ADR-0060: Universal cursor-based catalog ingestion для batch suppliers
Status: accepted Date: 2026-05-17 Deciders: Maxim Belkanov
Контекст
Catalog ingestion для batch suppliers (ETM / IEK / Systeme / Russvet) выполнял full sweep каждый tick без сохранения позиции. Любой прерывающий событие — перезапуск контейнера, stale_timeout (15 мин, hardcoded), сбой supplier API — означал полную потерю прогресса.
Конкретная боль — IEK: 71 000 SKU @ 1 rps ≈ 20 ч sweep. Контейнер
supplier-sync перезапускался каждые ~15 мин → catalog sweep никогда не
завершался. Данные IEK в БД фактически не обновлялись.
DKC уже использовал отдельный CursorPipeline (cycle 4, ADR-0050) —
cursor-based incremental синхронизация с сохранением токена в таблице
incremental_sync_cursor. Задача — дистиллировать этот паттерн и применить его
ко всем batch suppliers через generic абстракцию вместо дублирования кода.
Дополнительные проблемы, устранённые попутно:
stale_threshold_secondsбыл hardcoded 900 с глобально; catalog sweep длиннее commerce tick, что вызывало false-positive/readyzпредупреждения.- Rate-limit был общим for всех поставщиков без возможности per-supplier настройки через env.
/readyzмог войти в restart-loop из-за одного transient blip.- Container SIGTERM не логировал goroutine dump → невозможно отследить зависший sweep.
- ETM commerce checkpoint отсутствовал: весь batch без прогрессивного checkpoint’а.
Решение
CursorStrategy интерфейс
Новый файл ingestion/domain/cursor_strategy.go вводит:
type CursorStrategy interface {
Kind() string
Initial(ctx context.Context, supplier SupplierKind) (PageBoundary, error)
Advance(prev PageBoundary, marker string) (next PageBoundary, done bool, err error)
}PageBoundary — discriminated struct (поля Kind / PageNumber / Token / HighWatermark / Revision / Done / TotalPages). Выбор поля per-supplier по
Kind строке. Сериализуется в JSON для хранения в incremental_sync_cursor.
StreamRequest и FetchResult
BulkSnapshotSource теряет метод Stream(ctx, cred) и приобретает:
StreamRequest(ctx context.Context, cred CredentialContext, req FetchRequest) (FetchResult, error)FetchRequest.Cursor несёт PageBoundary для resume; FetchResult.PageMarker
(строка) эмитируется после каждой страницы. Все четыре source (ETM / IEK /
Systeme / Russvet) мигрировали на новую сигнатуру.
Durable-page advance в BulkSnapshotPipeline.RunOnce
Алгоритм:
CursorRepo.Get(scopeKey)→ еслиErrCursorNotFound, вызватьstrategy.Initialдля seed.- Per-page loop:
source.StreamRequest(ctx, cred, req{Cursor: current}). - Если страница обработана без ошибок (
pendingErrors == 0): вызватьstrategy.Advance→ сохранить следующий курсор черезCursorRepo.Save. - Если страница содержала ошибки: курсор не сдвигается; при следующем restart страница переиграется.
- Sweep complete →
CursorRepo.Delete(scopeKey)→ следующийGetвернётErrCursorNotFound→strategy.Initialseed (= новый full sweep).
Scope key: <jobKind>/credential/<url.PathEscape(CredentialRef)>. Catalog и
commerce сохраняются в отдельных rows одной и той же таблицы.
Per-supplier стратегии
| Supplier | Kind | Механизм курсора |
|---|---|---|
| IEK | page_offset | PageNumber int |
| ETM | page_offset | PageNumber int (SgGds job pages) |
| Systeme | opaque | Token string из Link header |
| Russvet | iso_date | HighWatermark дата последней записи |
Выбор стратегии — compile-time per supplier capabilities (нет reflection).
Adjacent fixes
- stale_threshold_seconds per-schedule: schedules таблица получила колонку
stale_threshold_seconds;BulkSnapshotPipelineчитает её через LEFT JOIN с fallback 900 с для commerce и 14 400 с (4 ч) для catalog. - Rate-limit per-supplier env-driven:
SUPPLIER_<NAME>_RATE_RPSenv vars;ingdom.RatePolicyхранит per-supplier bucket вместо глобального. - readyz dampening: 3 последовательных 503 перед установкой unhealthy; одиночный blip игнорируется.
- SIGTERM goroutine dump:
signal.NotifyContext+runtime.Stackвmain.goпри получении SIGTERM для диагностики зависших sweep. - DB-driven tickTracker hooks:
scheduler.TickerполучилOnTickcallback-хуки;tickTrackerрегистрирует tick start/end в PG, что позволяет админ-UI отображать live-статус sweep. - ETM commerce checkpoint:
PageMarkerэмитируется каждые 10 batch (50 SKU каждый) → при restart ETM commerce переиграет максимум 10 batch, не весь commerce tick.
I.1 решение: E2E cursor-resume тест
Отдельный E2E full-wiring тест для cursor resume mid-sweep не реализован.
Решение обосновано покрытием на уровне per-supplier integration тестов:
bulk_snapshot_source_test.go каждого supplier проверяет resume-with-cursor +
idempotent upsert. Дублирование в E2E теоретически даёт signal о wiring, но не
о дополнительных edge-cases. Стоимость (test-infra, testcontainers, latency) не
оправдана на текущем масштабе. Решение зафиксировано в этом ADR.
Последствия
Позитивные:
- IEK catalog sweep теперь завершается за ~4 ч (rate 5 rps default) без потери прогресса при перезапуске.
- Container restart mid-sweep → resume c последней успешно подтверждённой страницы.
- Partial-page errors freeze cursor → page replay при следующем tick (idempotent upsert гарантирует safe replay).
stale_thresholdfalse positives устранены: catalog schedules получают 4-ч порог вместо 900 с.- readyz transient blip (1 503) больше не триггерит restart-loop.
- SIGTERM горутин-дамп ускоряет диагностику зависших sweep в production.
Стоимость:
- Все mock-реализации
CursorRepo(в тестах) потребовали добавления методаDelete. BulkSnapshotSource.Streamсигнатура удалена — все 4 source мигрировали наStreamRequest.- ETM source извлёк
streamWarehousePairshelper, IEK —streamPages, Systeme —streamCatalogRows, Russvet —streamCatalog(рефактор внутри source, внешний интерфейс не изменился). - Live IEK production sweep verification отложена как manual operator task (требует деплоя в production с наполненным credential pool).
Связано
- ADR-0050 (DKC + taxonomy cycle 4) — оригинальный cursor pattern, дистиллированный в этом цикле.
- ADR-0043 (ingestion audit) — per-supplier scheduler.Ticker + /readyz семантика, расширенная dampening’ом.
- ADR-0057 (connector events) — observability events, пропагируемые через sweep.
- ADR-0058 (retention) — monthly partitioning connector_events.
- ADR-0059 (ETM batch shape) — pair-iteration foundation, которую cursor strategy переиспользует для ETM commerce path.