Ingestion pipelines
NOTE
Статус: Live (HEAD
5e4ce54, 2026-05-06). Документ описывает три pipeline’а, через которыеsupplier-syncполучает и нормализует данные поставщиков. Все три уже работают в коде; распределение поставщиков фиксируется capability-флагами.
Введение BulkSnapshotPipeline (commit 78763bf 2026-05-06) разделило ingestion на три параллельных pipeline’а, выбираемых PipelineRouter на основе Capabilities.
Маршрутизация (PipelineRouter)
backend/internal/core/ingestion/app/pipeline_router.go:
PipelineRouter.Route(supplier)
│
┌──────────────────────┼─────────────────────────────────┐
▼ ▼ ▼
IncrementalSync.Kind= ScheduledRefresh.UsesBulkSnapshot() otherwise
revision_based ─► true ─► legacy per-SKU
date_based (planned) ─► BulkSnapshotPipeline full-catalog
│
▼
CursorPipelineРешение принимается один раз на supplier — capabilities статичны (ADR-0024 §7).
CursorPipeline
backend/internal/core/ingestion/app/cursor_pipeline.go (введён в Cycle 4 — ADR-0050).
- Использует delta-cursor поставщика. На сегодня — DKC (
revision_based). - Systeme Electric retrofit на
date_basedcursor запланирован (см.tracium_path_to_launch.md). - Хранение последнего revision/cursor в
supplier_cursorтаблице; advisory-lock на per-supplier ticker гарантирует exclusivity.
BulkSnapshotPipeline
backend/internal/core/ingestion/app/bulk_snapshot_pipeline.go — 2026-05-06.
Per-supplier BulkSnapshotSource стримит BulkFetchResult items, pipeline их по очереди гонит через normalization → upsert → canonical provisioning → optional charnorm raw write.
Активные источники:
| Поставщик | Источник | Backing API |
|---|---|---|
| ETM | infra/etm/bulk_snapshot_source.go | seed catalog + per-SKU price/remains |
| IEK | infra/iek/bulk_snapshot_source.go | catalog + price + stock endpoint’ы |
| Systeme | infra/systeme/bulk_snapshot_source.go | catalog + price + delivery |
| Russvet | infra/russvet/bulk_snapshot_source.go | massprice + residue/all + position |
| DKC | infra/dkc/bulk_snapshot_source.go | fallback к cursor (используется при initial seed) |
Capabilities.ScheduledRefresh.UsesBulkSnapshot() возвращает true если любая из axis (Catalog/Prices/Stock/Specs) имеет стратегию batch | batch_by_warehouse | paged | incremental.
Outcome reporting
type BulkSnapshotReport struct {
Supplier string
Duration time.Duration
ItemCount int
OK int
Partial int
NotFound int
Errored int
}OK— full row written.Partial— некоторые axis не пришли (price есть, stock нет etc).NotFound— supplier ответил 404 на specific item.Errored— non-recoverable error.
Гарантии
- Per-credential iteration: bulk-pull под одной credential завершается до перехода к следующей (строит снимок на per-customer-credentials cohort).
- Errors внутри stream’а не прерывают ingestion — логируются, инкрементируют
Errored. - Logger через
slog; events —platform/observability/StartPhase(stable nameingestion.bulk.phase). - Clock через
clock.Clockport (notime.Now()direct).
Legacy per-SKU full-catalog pipeline
Используется только для поставщиков без IncrementalSync и без bulk-стратегий. На 2026-05-06 такие поставщики отсутствуют — все 5 проходят либо через CursorPipeline (DKC), либо через BulkSnapshotPipeline.
Сохраняется в коде до полной верификации, что bulk-pipeline покрывает все edge-cases.
DI wiring
backend/internal/core/ingestion/di.go собирает:
Sources map[string]BulkSnapshotSource— каждый supplier-fx-модуль регистрирует свой source черезfx.Group("ingestion.bulk_sources").CursorPipeline— отдельный fx provider; реестр cursor’ов отдельно.PipelineRouterпринимает оба + legacy fallback.
Связанные документы
connectors/framework.md— общий контракт connector’а.connectors/russvet.md,connectors/etm.md,connectors/iek.md— per-supplier specifics.../../20-architecture/adr/0024-supplier-connector-contract.md— connector contract.../../20-architecture/adr/0043-ingestion-orchestration.md— orchestration audit (per-supplier ticker, advisory lock).../../20-architecture/adr/0050-dkc-adapter-and-taxonomy.md— Cycle 4: cursor-based incremental + federated taxonomy.