Ingestion pipelines

NOTE

Статус: Live (HEAD 5e4ce54, 2026-05-06). Документ описывает три pipeline’а, через которые supplier-sync получает и нормализует данные поставщиков. Все три уже работают в коде; распределение поставщиков фиксируется capability-флагами.

Введение BulkSnapshotPipeline (commit 78763bf 2026-05-06) разделило ingestion на три параллельных pipeline’а, выбираемых PipelineRouter на основе Capabilities.

Маршрутизация (PipelineRouter)

backend/internal/core/ingestion/app/pipeline_router.go:

                 PipelineRouter.Route(supplier)

       ┌──────────────────────┼─────────────────────────────────┐
       ▼                      ▼                                 ▼
IncrementalSync.Kind=     ScheduledRefresh.UsesBulkSnapshot()  otherwise
revision_based  ─►        true                                  ─► legacy per-SKU
date_based   (planned)    ─► BulkSnapshotPipeline                 full-catalog


CursorPipeline

Решение принимается один раз на supplier — capabilities статичны (ADR-0024 §7).

CursorPipeline

backend/internal/core/ingestion/app/cursor_pipeline.go (введён в Cycle 4 — ADR-0050).

  • Использует delta-cursor поставщика. На сегодня — DKC (revision_based).
  • Systeme Electric retrofit на date_based cursor запланирован (см. tracium_path_to_launch.md).
  • Хранение последнего revision/cursor в supplier_cursor таблице; advisory-lock на per-supplier ticker гарантирует exclusivity.

BulkSnapshotPipeline

backend/internal/core/ingestion/app/bulk_snapshot_pipeline.go — 2026-05-06.

Per-supplier BulkSnapshotSource стримит BulkFetchResult items, pipeline их по очереди гонит через normalization → upsert → canonical provisioning → optional charnorm raw write.

Активные источники:

ПоставщикИсточникBacking API
ETMinfra/etm/bulk_snapshot_source.goseed catalog + per-SKU price/remains
IEKinfra/iek/bulk_snapshot_source.gocatalog + price + stock endpoint’ы
Systemeinfra/systeme/bulk_snapshot_source.gocatalog + price + delivery
Russvetinfra/russvet/bulk_snapshot_source.gomassprice + residue/all + position
DKCinfra/dkc/bulk_snapshot_source.gofallback к cursor (используется при initial seed)

Capabilities.ScheduledRefresh.UsesBulkSnapshot() возвращает true если любая из axis (Catalog/Prices/Stock/Specs) имеет стратегию batch | batch_by_warehouse | paged | incremental.

Outcome reporting

type BulkSnapshotReport struct {
    Supplier  string
    Duration  time.Duration
    ItemCount int
    OK        int
    Partial   int
    NotFound  int
    Errored   int
}
  • OK — full row written.
  • Partial — некоторые axis не пришли (price есть, stock нет etc).
  • NotFound — supplier ответил 404 на specific item.
  • Errored — non-recoverable error.

Гарантии

  • Per-credential iteration: bulk-pull под одной credential завершается до перехода к следующей (строит снимок на per-customer-credentials cohort).
  • Errors внутри stream’а не прерывают ingestion — логируются, инкрементируют Errored.
  • Logger через slog; events — platform/observability/StartPhase (stable name ingestion.bulk.phase).
  • Clock через clock.Clock port (no time.Now() direct).

Legacy per-SKU full-catalog pipeline

Используется только для поставщиков без IncrementalSync и без bulk-стратегий. На 2026-05-06 такие поставщики отсутствуют — все 5 проходят либо через CursorPipeline (DKC), либо через BulkSnapshotPipeline.

Сохраняется в коде до полной верификации, что bulk-pipeline покрывает все edge-cases.

DI wiring

backend/internal/core/ingestion/di.go собирает:

  • Sources map[string]BulkSnapshotSource — каждый supplier-fx-модуль регистрирует свой source через fx.Group("ingestion.bulk_sources").
  • CursorPipeline — отдельный fx provider; реестр cursor’ов отдельно.
  • PipelineRouter принимает оба + legacy fallback.

Связанные документы