Сервис загрузки данных
NOTE
Статус: Target service boundary. Документ описывает целевую сервисную границу. Код либо полностью отсутствует, либо существует только как scaffold — смотрите секцию «Статус документа» ниже для точного указания на код. Правила маркировки — в
50-processes/documentation-standard.md.
Этот README описывает целевую сервисную границу ingestion. Читайте его, когда нужно понять, как система получает данные от поставщиков, управляет сессиями и публикует raw payload в пайплайн.
Назначение
Оркестрация получения данных от поставщиков: запуск коннекторов, управление сессиями и rate limits, сохранение raw payload и публикация событий в pipeline.
Статус документа
- Тип знания:
target service boundary - Статус реализации: отдельного standalone-сервиса
ingestionнет, но worker runtimesupplier-syncуже реализован - Текущее место кода:
backend/cmd/supplier-sync,backend/internal/core/ingestion,backend/internal/core/normalization - Что читать дальше:
connectors/framework.md,pipelines.md(live — 3 pipeline’а: cursor / bulk-snapshot / legacy),supplier-sync-runbook.md,../../20-architecture/integration-patterns.md,../normalization/README.md - Текущий supplier roster (2026-05-06): ETM, IEK, Systeme Electric, DKC, Russvet — 5 адаптеров. См.
connectors/{etm,iek,systeme,dkc,russvet}.md.
Область действия
Входит:
- Запуск fetch-задач по расписанию и по триггерам.
- Управление сессиями поставщиков и refresh auth.
- Централизованный rate limiter.
- Polling асинхронных поставщиковых задач.
- Сохранение сырых payload’ов в S3 и публикация raw-событий.
Не входит:
- Парсинг и нормализация payload’ов.
- Матчинг и каталожные решения.
Публичный контракт
Вход
- HTTP (internal):
POST /v1/jobs/fetchдля ручного запуска fetch-зачи. - Триггеры планировщика per supplier.
- Webhooks поставщиков, если поставщик поддерживает push-модель.
Выход
- Kafka-топики
raw.supplier.<supplier_id>.payload.v1. - Запись raw payload в S3/MinIO.
Внутренняя архитектура
В целевом состоянии ingestion собирает connector registry, scheduler, session manager, rate limiter и raw payload writer. Коннекторы регистрируются в composition root ingestion-процесса, а не зашиваются в ядро.
Текущий runtime supplier-sync уже использует этот каркас для seed-based
интеграций etm, iek и systeme.
Как данные проходят от ETM до API
Ниже зафиксирован целевой поток данных для Phase 1-a: как данные забираются у ETM, где хранятся, как преобразуются и как попадают в публичный API. Это рабочая схема для чтения архитектуры целиком, без необходимости собирать картину по plan/spec документам.
flowchart LR A["Шаг 1: supplier-sync<br/>берёт список SKU из backend/config/etm-seed.yaml"] --> B["Шаг 2: ETM API<br/>goods / price / remains"] B --> C["Шаг 3: MinIO / S3<br/>сохраняем исходные ответы ETM"] B --> D["Шаг 4: Нормализация<br/>преобразуем ответы ETM в нашу внутреннюю модель"] C -. raw_refs .-> E D --> E["Шаг 5: PostgreSQL<br/>supplier_offers + offer_observations + canonical_products + warehouses"] E --> F["Шаг 6: Outbox<br/>фиксируем события в той же транзакции"] F --> G["Шаг 7: Kafka<br/>raw.supplier.etm.payload.v1<br/>offer.observation.v2<br/>offer.price_set_changed.v1<br/>offer.forecast_changed.v1<br/>canonical.events.v1"] E --> H["Шаг 8: api-server<br/>читает только PostgreSQL"] H --> I["Шаг 9: Внешний API<br/>GET /v1/products"] H --> J["Шаг 10: Внутренний API<br/>GET /v1/admin/offers/supplier-offers"]
Пошаговый сценарий
supplier-syncпо расписанию запускает очередной проход загрузки и читает список SKU изbackend/config/etm-seed.yaml.- Для каждого SKU коннектор ETM получает данные из ETM API: карточку товара, цены и остатки.
- Каждый исходный ответ ETM отдельно сохраняется в MinIO/S3. Это нужно, чтобы не терять исходные данные и иметь возможность повторно их обработать.
- Затем слой нормализации преобразует ответы ETM в нашу внутреннюю модель:
NormalizedOffer— описание товара поставщика;Observation— наблюдение по ценам, остаткам и срокам поставки.
- Итог записывается в PostgreSQL:
supplier_offers— текущее представление товара у поставщика;offer_observations— последнее наблюдение по ценам, остаткам и срокам;canonical_products— канонический товар, который будет отдаваться наружу;warehouses— используемые склады и их типы.
- В той же транзакции создаются записи в
outbox_events. Это гарантирует, что данные в PostgreSQL и события для шины не разъедутся. api-serverподнимает singleton-dispatcher outbox, который читаетoutbox_events, публикует сообщения в Kafka и помечает строки как отправленные.- Публичный и внутренний API не ходят в ETM напрямую. Они читают только PostgreSQL.
GET /v1/productsотдаёт клиенту каноническую карточку товара: идентификаторTR-XXXXXXX, характеристики, цены, остатки и сроки.GET /v1/admin/offers/supplier-offersотдаёт внутреннее представление данных поставщика: supplier SKU, наблюдения, категории, производителя и другую техническую информацию.
Что где хранится
- Исходные ответы ETM: MinIO/S3.
- Нормализованные данные для чтения API: PostgreSQL.
- События для downstream-систем: Kafka, через transactional outbox.
Где смотреть точные контракты
- Целевой технический дизайн потока:
docs/superpowers/specs/2026-04-20-ingestion-phase1-a-design.md - Публичный API:
../../20-architecture/schemas/api/public-api.openapi.yaml - Публичный сервисный слой:
../public-api/README.md - Нормализация:
../normalization/README.md - ETM connector reference:
connectors/etm/README.md - IEK connector reference:
connectors/iek.md - Systeme connector reference:
connectors/systeme/README.md
Зависимости
- PostgreSQL для supplier/session/job metadata.
- Kafka.
- Redis для rate limiting и session cache.
- S3/MinIO.
- Secret backend / master key provider для credentials.
Хранилище
- Читает:
supplier,supplier_warehouse. - Пишет:
supplier_session,async_supplier_job,raw_payload_ref. - Raw payloads в S3/MinIO считаются временным operational cache. Product-facing данные должны быть материализованы в нормализованных таблицах; старые raw objects удаляются retention-механизмом, чтобы ingestion не блокировался на free-space threshold MinIO.
Конфигурация
| Env var | Default | Описание |
|---|---|---|
INGESTION_DB_DSN | — | PostgreSQL DSN |
INGESTION_KAFKA_BROKERS | — | список брокеров |
INGESTION_REDIS_ADDR | — | Redis |
INGESTION_S3_ENDPOINT | — | endpoint объектного хранилища |
INGESTION_S3_BUCKET | — | bucket для raw payload |
S3_RAW_EXPIRE_DAYS | 7 | lifecycle TTL для raw bucket |
S3_RAW_CLEANER_OLDER_THAN | 24h | возраст raw objects для активной очистки MinIO |
S3_RAW_CLEANER_INTERVAL_SECONDS | 3600 | период запуска активной очистки MinIO |
INGESTION_VAULT_ADDR | — | backend для credentials |
Локальный запуск
Отдельного standalone-процесса ingestion в текущем репозитории нет, но
есть worker supplier-sync. Для его локального прогона смотрите
supplier-sync-runbook.md. Из общих команд
доступно:
make infra-up
make web-upТестирование
- Для целевого ingestion обязательны unit-тесты scheduler/session logic.
- Нужны provider contract tests для коннекторов на фикстурах.
- Нужны integration-тесты на S3, Kafka и rate limiter.
Наблюдаемость
В целевом состоянии сервис должен публиковать метрики:
ingestion_fetch_attempts_total{supplier,endpoint,status}ingestion_fetch_duration_seconds{supplier,endpoint}ingestion_rate_limit_wait_seconds{supplier}ingestion_raw_payloads_written_total{supplier}
Сейчас доступны только общие логи локального контура.
Открытые вопросы / TODO
- Добавить replay сырого payload из S3 обратно в pipeline.
- Спроектировать graceful shutdown для длинных fetch-задач.