Сервис загрузки данных

NOTE

Статус: Target service boundary. Документ описывает целевую сервисную границу. Код либо полностью отсутствует, либо существует только как scaffold — смотрите секцию «Статус документа» ниже для точного указания на код. Правила маркировки — в 50-processes/documentation-standard.md.

Этот README описывает целевую сервисную границу ingestion. Читайте его, когда нужно понять, как система получает данные от поставщиков, управляет сессиями и публикует raw payload в пайплайн.

Назначение

Оркестрация получения данных от поставщиков: запуск коннекторов, управление сессиями и rate limits, сохранение raw payload и публикация событий в pipeline.

Статус документа

  • Тип знания: target service boundary
  • Статус реализации: отдельного standalone-сервиса ingestion нет, но worker runtime supplier-sync уже реализован
  • Текущее место кода: backend/cmd/supplier-sync, backend/internal/core/ingestion, backend/internal/core/normalization
  • Что читать дальше: connectors/framework.md, pipelines.md (live — 3 pipeline’а: cursor / bulk-snapshot / legacy), supplier-sync-runbook.md, ../../20-architecture/integration-patterns.md, ../normalization/README.md
  • Текущий supplier roster (2026-05-06): ETM, IEK, Systeme Electric, DKC, Russvet — 5 адаптеров. См. connectors/{etm,iek,systeme,dkc,russvet}.md.

Область действия

Входит:

  • Запуск fetch-задач по расписанию и по триггерам.
  • Управление сессиями поставщиков и refresh auth.
  • Централизованный rate limiter.
  • Polling асинхронных поставщиковых задач.
  • Сохранение сырых payload’ов в S3 и публикация raw-событий.

Не входит:

  • Парсинг и нормализация payload’ов.
  • Матчинг и каталожные решения.

Публичный контракт

Вход

  • HTTP (internal): POST /v1/jobs/fetch для ручного запуска fetch-зачи.
  • Триггеры планировщика per supplier.
  • Webhooks поставщиков, если поставщик поддерживает push-модель.

Выход

  • Kafka-топики raw.supplier.<supplier_id>.payload.v1.
  • Запись raw payload в S3/MinIO.

Внутренняя архитектура

В целевом состоянии ingestion собирает connector registry, scheduler, session manager, rate limiter и raw payload writer. Коннекторы регистрируются в composition root ingestion-процесса, а не зашиваются в ядро.

Текущий runtime supplier-sync уже использует этот каркас для seed-based интеграций etm, iek и systeme.

Как данные проходят от ETM до API

Ниже зафиксирован целевой поток данных для Phase 1-a: как данные забираются у ETM, где хранятся, как преобразуются и как попадают в публичный API. Это рабочая схема для чтения архитектуры целиком, без необходимости собирать картину по plan/spec документам.

flowchart LR
    A["Шаг 1: supplier-sync<br/>берёт список SKU из backend/config/etm-seed.yaml"] --> B["Шаг 2: ETM API<br/>goods / price / remains"]
    B --> C["Шаг 3: MinIO / S3<br/>сохраняем исходные ответы ETM"]
    B --> D["Шаг 4: Нормализация<br/>преобразуем ответы ETM в нашу внутреннюю модель"]
    C -. raw_refs .-> E
    D --> E["Шаг 5: PostgreSQL<br/>supplier_offers + offer_observations + canonical_products + warehouses"]
    E --> F["Шаг 6: Outbox<br/>фиксируем события в той же транзакции"]
    F --> G["Шаг 7: Kafka<br/>raw.supplier.etm.payload.v1<br/>offer.observation.v2<br/>offer.price_set_changed.v1<br/>offer.forecast_changed.v1<br/>canonical.events.v1"]
    E --> H["Шаг 8: api-server<br/>читает только PostgreSQL"]
    H --> I["Шаг 9: Внешний API<br/>GET /v1/products"]
    H --> J["Шаг 10: Внутренний API<br/>GET /v1/admin/offers/supplier-offers"]

Пошаговый сценарий

  1. supplier-sync по расписанию запускает очередной проход загрузки и читает список SKU из backend/config/etm-seed.yaml.
  2. Для каждого SKU коннектор ETM получает данные из ETM API: карточку товара, цены и остатки.
  3. Каждый исходный ответ ETM отдельно сохраняется в MinIO/S3. Это нужно, чтобы не терять исходные данные и иметь возможность повторно их обработать.
  4. Затем слой нормализации преобразует ответы ETM в нашу внутреннюю модель:
    • NormalizedOffer — описание товара поставщика;
    • Observation — наблюдение по ценам, остаткам и срокам поставки.
  5. Итог записывается в PostgreSQL:
    • supplier_offers — текущее представление товара у поставщика;
    • offer_observations — последнее наблюдение по ценам, остаткам и срокам;
    • canonical_products — канонический товар, который будет отдаваться наружу;
    • warehouses — используемые склады и их типы.
  6. В той же транзакции создаются записи в outbox_events. Это гарантирует, что данные в PostgreSQL и события для шины не разъедутся.
  7. api-server поднимает singleton-dispatcher outbox, который читает outbox_events, публикует сообщения в Kafka и помечает строки как отправленные.
  8. Публичный и внутренний API не ходят в ETM напрямую. Они читают только PostgreSQL.
  9. GET /v1/products отдаёт клиенту каноническую карточку товара: идентификатор TR-XXXXXXX, характеристики, цены, остатки и сроки.
  10. GET /v1/admin/offers/supplier-offers отдаёт внутреннее представление данных поставщика: supplier SKU, наблюдения, категории, производителя и другую техническую информацию.

Что где хранится

  • Исходные ответы ETM: MinIO/S3.
  • Нормализованные данные для чтения API: PostgreSQL.
  • События для downstream-систем: Kafka, через transactional outbox.

Где смотреть точные контракты

Зависимости

  • PostgreSQL для supplier/session/job metadata.
  • Kafka.
  • Redis для rate limiting и session cache.
  • S3/MinIO.
  • Secret backend / master key provider для credentials.

Хранилище

  • Читает: supplier, supplier_warehouse.
  • Пишет: supplier_session, async_supplier_job, raw_payload_ref.
  • Raw payloads в S3/MinIO считаются временным operational cache. Product-facing данные должны быть материализованы в нормализованных таблицах; старые raw objects удаляются retention-механизмом, чтобы ingestion не блокировался на free-space threshold MinIO.

Конфигурация

Env varDefaultОписание
INGESTION_DB_DSNPostgreSQL DSN
INGESTION_KAFKA_BROKERSсписок брокеров
INGESTION_REDIS_ADDRRedis
INGESTION_S3_ENDPOINTendpoint объектного хранилища
INGESTION_S3_BUCKETbucket для raw payload
S3_RAW_EXPIRE_DAYS7lifecycle TTL для raw bucket
S3_RAW_CLEANER_OLDER_THAN24hвозраст raw objects для активной очистки MinIO
S3_RAW_CLEANER_INTERVAL_SECONDS3600период запуска активной очистки MinIO
INGESTION_VAULT_ADDRbackend для credentials

Локальный запуск

Отдельного standalone-процесса ingestion в текущем репозитории нет, но есть worker supplier-sync. Для его локального прогона смотрите supplier-sync-runbook.md. Из общих команд доступно:

make infra-up
make web-up

Тестирование

  • Для целевого ingestion обязательны unit-тесты scheduler/session logic.
  • Нужны provider contract tests для коннекторов на фикстурах.
  • Нужны integration-тесты на S3, Kafka и rate limiter.

Наблюдаемость

В целевом состоянии сервис должен публиковать метрики:

  • ingestion_fetch_attempts_total{supplier,endpoint,status}
  • ingestion_fetch_duration_seconds{supplier,endpoint}
  • ingestion_rate_limit_wait_seconds{supplier}
  • ingestion_raw_payloads_written_total{supplier}

Сейчас доступны только общие логи локального контура.

Открытые вопросы / TODO

  • Добавить replay сырого payload из S3 обратно в pipeline.
  • Спроектировать graceful shutdown для длинных fetch-задач.

Связанные документы