ADR-0060: Universal cursor-based catalog ingestion для batch suppliers

Status: accepted Date: 2026-05-17 Deciders: Maxim Belkanov

Контекст

Catalog ingestion для batch suppliers (ETM / IEK / Systeme / Russvet) выполнял full sweep каждый tick без сохранения позиции. Любой прерывающий событие — перезапуск контейнера, stale_timeout (15 мин, hardcoded), сбой supplier API — означал полную потерю прогресса.

Конкретная боль — IEK: 71 000 SKU @ 1 rps ≈ 20 ч sweep. Контейнер supplier-sync перезапускался каждые ~15 мин → catalog sweep никогда не завершался. Данные IEK в БД фактически не обновлялись.

DKC уже использовал отдельный CursorPipeline (cycle 4, ADR-0050) — cursor-based incremental синхронизация с сохранением токена в таблице incremental_sync_cursor. Задача — дистиллировать этот паттерн и применить его ко всем batch suppliers через generic абстракцию вместо дублирования кода.

Дополнительные проблемы, устранённые попутно:

  • stale_threshold_seconds был hardcoded 900 с глобально; catalog sweep длиннее commerce tick, что вызывало false-positive /readyz предупреждения.
  • Rate-limit был общим for всех поставщиков без возможности per-supplier настройки через env.
  • /readyz мог войти в restart-loop из-за одного transient blip.
  • Container SIGTERM не логировал goroutine dump → невозможно отследить зависший sweep.
  • ETM commerce checkpoint отсутствовал: весь batch без прогрессивного checkpoint’а.

Решение

CursorStrategy интерфейс

Новый файл ingestion/domain/cursor_strategy.go вводит:

type CursorStrategy interface {
    Kind() string
    Initial(ctx context.Context, supplier SupplierKind) (PageBoundary, error)
    Advance(prev PageBoundary, marker string) (next PageBoundary, done bool, err error)
}

PageBoundary — discriminated struct (поля Kind / PageNumber / Token / HighWatermark / Revision / Done / TotalPages). Выбор поля per-supplier по Kind строке. Сериализуется в JSON для хранения в incremental_sync_cursor.

StreamRequest и FetchResult

BulkSnapshotSource теряет метод Stream(ctx, cred) и приобретает:

StreamRequest(ctx context.Context, cred CredentialContext, req FetchRequest) (FetchResult, error)

FetchRequest.Cursor несёт PageBoundary для resume; FetchResult.PageMarker (строка) эмитируется после каждой страницы. Все четыре source (ETM / IEK / Systeme / Russvet) мигрировали на новую сигнатуру.

Durable-page advance в BulkSnapshotPipeline.RunOnce

Алгоритм:

  1. CursorRepo.Get(scopeKey) → если ErrCursorNotFound, вызвать strategy.Initial для seed.
  2. Per-page loop: source.StreamRequest(ctx, cred, req{Cursor: current}).
  3. Если страница обработана без ошибок (pendingErrors == 0): вызвать strategy.Advance → сохранить следующий курсор через CursorRepo.Save.
  4. Если страница содержала ошибки: курсор не сдвигается; при следующем restart страница переиграется.
  5. Sweep complete → CursorRepo.Delete(scopeKey) → следующий Get вернёт ErrCursorNotFoundstrategy.Initial seed (= новый full sweep).

Scope key: <jobKind>/credential/<url.PathEscape(CredentialRef)>. Catalog и commerce сохраняются в отдельных rows одной и той же таблицы.

Per-supplier стратегии

SupplierKindМеханизм курсора
IEKpage_offsetPageNumber int
ETMpage_offsetPageNumber int (SgGds job pages)
SystemeopaqueToken string из Link header
Russvetiso_dateHighWatermark дата последней записи

Выбор стратегии — compile-time per supplier capabilities (нет reflection).

Adjacent fixes

  • stale_threshold_seconds per-schedule: schedules таблица получила колонку stale_threshold_seconds; BulkSnapshotPipeline читает её через LEFT JOIN с fallback 900 с для commerce и 14 400 с (4 ч) для catalog.
  • Rate-limit per-supplier env-driven: SUPPLIER_<NAME>_RATE_RPS env vars; ingdom.RatePolicy хранит per-supplier bucket вместо глобального.
  • readyz dampening: 3 последовательных 503 перед установкой unhealthy; одиночный blip игнорируется.
  • SIGTERM goroutine dump: signal.NotifyContext + runtime.Stack в main.go при получении SIGTERM для диагностики зависших sweep.
  • DB-driven tickTracker hooks: scheduler.Ticker получил OnTick callback-хуки; tickTracker регистрирует tick start/end в PG, что позволяет админ-UI отображать live-статус sweep.
  • ETM commerce checkpoint: PageMarker эмитируется каждые 10 batch (50 SKU каждый) → при restart ETM commerce переиграет максимум 10 batch, не весь commerce tick.

I.1 решение: E2E cursor-resume тест

Отдельный E2E full-wiring тест для cursor resume mid-sweep не реализован. Решение обосновано покрытием на уровне per-supplier integration тестов: bulk_snapshot_source_test.go каждого supplier проверяет resume-with-cursor + idempotent upsert. Дублирование в E2E теоретически даёт signal о wiring, но не о дополнительных edge-cases. Стоимость (test-infra, testcontainers, latency) не оправдана на текущем масштабе. Решение зафиксировано в этом ADR.

Последствия

Позитивные:

  • IEK catalog sweep теперь завершается за ~4 ч (rate 5 rps default) без потери прогресса при перезапуске.
  • Container restart mid-sweep → resume c последней успешно подтверждённой страницы.
  • Partial-page errors freeze cursor → page replay при следующем tick (idempotent upsert гарантирует safe replay).
  • stale_threshold false positives устранены: catalog schedules получают 4-ч порог вместо 900 с.
  • readyz transient blip (1 503) больше не триггерит restart-loop.
  • SIGTERM горутин-дамп ускоряет диагностику зависших sweep в production.

Стоимость:

  • Все mock-реализации CursorRepo (в тестах) потребовали добавления метода Delete.
  • BulkSnapshotSource.Stream сигнатура удалена — все 4 source мигрировали на StreamRequest.
  • ETM source извлёк streamWarehousePairs helper, IEK — streamPages, Systeme — streamCatalogRows, Russvet — streamCatalog (рефактор внутри source, внешний интерфейс не изменился).
  • Live IEK production sweep verification отложена как manual operator task (требует деплоя в production с наполненным credential pool).

Связано

  • ADR-0050 (DKC + taxonomy cycle 4) — оригинальный cursor pattern, дистиллированный в этом цикле.
  • ADR-0043 (ingestion audit) — per-supplier scheduler.Ticker + /readyz семантика, расширенная dampening’ом.
  • ADR-0057 (connector events) — observability events, пропагируемые через sweep.
  • ADR-0058 (retention) — monthly partitioning connector_events.
  • ADR-0059 (ETM batch shape) — pair-iteration foundation, которую cursor strategy переиспользует для ETM commerce path.