ADR-0029: Revision-based incremental sync

Status: accepted Date: 2026-04-18 Deciders: команда проекта

Контекст

Два из пяти поставщиков поддерживают revision-based incremental sync:

  • DKC (api.dkc.ru): endpoints /revisions/last/size, /revisions/last, /revisions/{materials|drawings|certificates|packings|nodes|distributorOffices} — полный набор revision cursors. Stock revision формируется 1 раз в 2 часа.
  • Systeme Electric (api.systeme.ru): /getdeltaproducts?since=<date> — date-based incremental.

Остальные (ETM, smart-shop.pro, КЭАЗ): full refresh либо fetch-by-SKU.

Текущая модель: JobKind={refresh_observation, refresh_characteristics, full_catalog} — все делают pull-all-known-SKUs. При 100K+ товарах от одного поставщика это:

  • 100K запросов к price endpoint каждые 6 часов = ~5 req/сек sustained. Для большинства поставщиков это за пределами rate budget.
  • Невозможно поймать новые SKU быстро (только через periodic full_catalog, который даже дороже).
  • Сеть / storage / observability затраты пропорциональны всему каталогу, а не объёму реальных изменений.

DKC’s revision pattern даёт O(changes) вместо O(catalog).

Решение

Revision-based incremental sync — первоклассный паттерн в Ingestion BC. Не замена full refresh, а дополнение: поставщики, умеющие revisions, используют их по умолчанию, full refresh остаётся fallback’ом на восстановление.

1. IncrementalSyncCursor aggregate (Ingestion BC)

IncrementalSyncCursor
├── id: (supplier_ref, data_scope)
│     data_scope: materials | stock | price | characteristics | certificates |
│                 packings | nodes | distributor_offices | etim
├── cursor_kind: revision | date | opaque_token
├── cursor_value: string   // "12345" для revision; ISO-8601 для date; opaque для token
├── last_applied_at
├── last_applied_rev_size? // для revision: сколько событий было в revision
├── last_completed_job_id
├── health: healthy | stale | broken

Event-sourced. Один cursor per (supplier_ref, data_scope). Сохранение cursor’а — в той же транзакции, что и обработка RawPayloads (idempotency + no-loss gaurantee).

2. JobKind=incremental_sync

Новая kind в EnrichmentJob. В target кладётся:

{
  data_scope: materials|stock|...,
  from_cursor: <value>,     // берётся из IncrementalSyncCursor
  to_cursor?: <value>       // если поставщик поддерживает upper bound
}

Результат job: новый cursor сохраняется, поток OfferObservationRecorded / OfferCharacteristicsUpdated генерится по свежим payload’ам, старые товары не трогаются.

3. Two-tier schedule

  • Primary: incremental_sync — каждые 5-15 минут per data_scope. Легковесно. Дешёвые revision-размеры типа DKC — десятки KB.
  • Secondary: full_catalog / refresh_observation — ежедневно или еженедельно, в часы low-traffic. Защита от пропущенных изменений, повреждений cursor’а, backfill при изменении нашей модели.

4. Health + recovery

Cursor может деградировать:

  • stale: last_applied_at старше N * interval (пропуск, network issue, worker down).
  • broken: поставщик вернул «unknown cursor» / HTTP 400 на передачу нашего cursor_value.

Политика:

HealthДействие
healthyНормальный incremental flow.
stale N → 2N intervalsAlert cursor_stale{supplier, scope}. Auto-retry.
stale >= 3 * intervalAuto-fallback: запускается full_catalog job для scope; cursor помечается broken до успеха.
brokenБлокируется incremental; принудительный full_catalog → новый cursor seed’ится из response.

5. Capabilities

Capabilities.IncrementalSync:

IncrementalSync {
  Supported           bool
  DataScopes          []DataScope         // какие scope-ы поддерживаются
  CursorKind          revision | date | opaque_token
  MinFetchInterval    duration            // минимальная рекомендуемая частота poll
  MaxFetchInterval    duration            // «не реже чем» — иначе cursor может протухнуть на стороне поставщика
  CursorLifetime      duration?           // TTL cursor'а (DKC не документирует; считаем бесконечным, пока не ответит 400)
  SupportsBulkDeltas  bool                // можно ли запросить delta между двумя cursor'ами (не всегда с last)
}

Composition root отказывается стартовать incremental_sync jobs для connector с Supported=false.

6. Ordering + idempotency

Revision-based incremental гарантирует:

  • Упорядочивание событий внутри scope (DKC номерует revisions монотонно).
  • At-least-once delivery — consumer может увидеть один payload дважды, если job фейлится после фетча, до commit cursor.

Observation append-only + UNIQUE (offer_id, credential_id, seller_ref, observed_at) уже даёт идемпотентность (дублирующий payload → UNIQUE violation → ignored). Cursor commit safe:

BEGIN;
  INSERT observations ...; -- UNIQUE подавляет дубли
  UPDATE incremental_sync_cursor SET cursor_value=?, last_applied_at=now() WHERE id=?;
COMMIT;

7. События

СобытиеПричина
КурсорИнкрементальнойСинхронизацииПродвинут (IncrementalSyncCursorAdvanced)Успешный incremental fetch
КурсорИнкрементальнойСинхронизацииПротух (IncrementalSyncCursorStaled)stale timeout
КурсорИнкрементальнойСинхронизацииСломан (IncrementalSyncCursorBroken)Поставщик отверг cursor
ИнкрементальныйFallbackЗапущен (IncrementalFallbackToFullCatalogTriggered)Policy fallback

Топик: supplier.incremental.v1. Для observability dashboard.

8. Seed cursor

При подключении нового поставщика с incremental support:

  1. Первый job — full_catalog (full fetch).
  2. В response поставщика (или отдельным запросом /revisions/last) читается текущий top cursor.
  3. Cursor стартует с этого значения → дальше идут incremental.

Последствия

Плюсы

  • Массивно снижает нагрузку на API поставщиков (DKC: 100K товаров — 50 запросов/день вместо 100K × 4).
  • Реальная реактивность: новые цены / товары / сертификаты видны через 5-15 минут.
  • Cursor — explicit state, можно debug’ить (где застряли, почему).
  • Бэкап full_catalog защищает от cursor loss (корраптед БД, миграция, поставщик ресетнул revision table).

Минусы

  • Новый aggregate + lifecycle → сложность Ingestion BC растёт.
  • Multiple scopes = multiple cursors = координация (если materials обновлены, но characteristics cursor отстал — observation может сослаться на устаревшие characteristics).
  • Ошибка cursor commit после успешного fetch → повторная обработка событий (решается idempotency UNIQUE).

Нейтральные последствия

  • Non-incremental поставщики (ETM, smart-shop.pro, КЭАЗ) работают как раньше — IncrementalSync.Supported=false.
  • Full refresh становится запасным путём, а не основным — его частота может быть снижена (еженедельно для incremental-поставщиков).

Рассмотренные альтернативы

A. Полагаться только на full refresh с TTL

Текущий подход. Работает для ETM. Не работает для DKC (100K+ товаров, страшные costs).

B. Webhook-based push

Ни один из текущих поставщиков не умеет webhooks. Требовать у каждого — нереалистично. Оставлен как Pattern 3 для будущих поставщиков.

C. Single global cursor per supplier

Слишком грубо. У DKC отдельные revision streams для materials/stock/certificates/packings, нужна тонкость per-scope.

Ссылки

  • ADR-0024 (connector contract) — Capabilities.IncrementalSync новый блок.
  • ADR-0025 (observation extensions) — idempotency UNIQUE + observed_at используются для safe incremental apply.
  • ADR-0028 (federated identity taxonomies) — taxonomy registries (ETIM/eCl@ss/UNSPSC) тоже incremental-eligible (data_scope=taxonomy_).
  • ../../10-business/contexts/ingestion.mdIncrementalSyncCursor live здесь.
  • DKC swagger — api.dkc.ru/documentation/ (примерный reference).