ADR-0043: Ingestion orchestration — per-supplier tickers + Postgres advisory lease

Status: accepted Date: 2026-04-29 Deciders: agent-claude

Контекст

Ingestion-worker cmd/supplier-sync использовал один глобальный scheduler.Ticker с последовательным проходом по списку поставщиков. Это давало три практические проблемы:

  1. Один медленный или зависший поставщик блокировал всех остальных в пределах одного тика — следующий тик начинался только после завершения предыдущего.
  2. Раннее if cfg.ETM.BaseURL == "" отключало весь scheduler; отсутствие одного поставщика глушило ingestion целиком.
  3. Два экземпляра процесса (blue-green deploy, scale-out, два cron’а на разных хостах) опрашивали одного поставщика одновременно — двойной HTTP-трафик к API поставщика, удвоенный расход rate-limit бюджета.

Кроме того, отсутствовала per-supplier observability — slog.Info("ingestion tick summary", ...) в IngestOrchestrator.RunOnce единственный сигнал; нет отдельных счётчиков success/fail/latency на алерты.

Решение

Реализуем variant C — полный аудит scheduler-side слоя.

1. Per-supplier scheduler.Ticker

На каждого включённого поставщика регистрируется отдельный *scheduler.Ticker с собственным интервалом и собственной goroutine. Медленный или сломанный поставщик не влияет на расписание других.

2. Postgres advisory lock

pg_try_advisory_lock(hashtext('ingestion:'+supplier)::bigint) — единственный механизм межпроцессной блокировки. Берётся в начале тика, удерживается через одно занятое соединение из pool’а до конца тика, освобождается через pg_advisory_unlock + conn.Release(). На падение процесса Postgres снимает lock автоматически (закрытие сессии).

ErrLeaseBusy — нормальный сценарий, не ошибка. Метрика tick_attempt{outcome="lease_busy"} инкрементится, тик пропускается.

3. Расписание в env-конфиге, расширенная структура

SUPPLIER_SYNC_SUPPLIERS=etm:6h:true,iek:12h:true,systeme:6h:false — список name:interval:enabled тройек. Парсер в platform/config. Validate обнаруживает дубликаты, нулевые интервалы, all-disabled, устаревшие env- переменные SUPPLIER_SYNC_TICK_INTERVAL/SUPPLIER_SYNC_ENABLED (помогает ловить half-migrated деплои).

DB-driven schedule отложен — env-конфиг достаточен на этом этапе, параметры меняются редко.

4. ETM-gate удалён

if cfg.ETM.BaseURL == "" больше нет. Per-supplier guard уже встроен в IngestOrchestrator.RunOnce через upfront lookup в ConnectorBundle map’е: если bundle для поставщика отсутствует, RunOnce возвращает typed-error, тикер логирует warn и продолжает.

5. IngestionMetrics в domain/

Интерфейс IngestionMetrics живёт в ingestion/domain/ для разрыва циклов app↔infra (тот же паттерн что P6 ExchangeMetrics). Реализация AtomicIngestionMetrics в ingestion/app/, slog-mirror, Prometheus- compliant naming (ingestion.metrics.tick_attempt, ingestion.metrics.tick_duration_seconds, ingestion.metrics.last_success_age_seconds, ingestion.metrics.skus_processed_total).

6. /readyz per-supplier

tickTracker хранит map[supplier]time.Time под mutex’ом. Loopbackhttp /readyz через расширенный интерфейс LastTickPerSupplier для каждого включённого поставщика проверяет now - last_success < 2 * interval. Если хоть один протух — 503 с детализацией в JSON-теле. Никогда не тикавшие поставщики (fresh boot) дают check value "none" без 503 — иначе сервис флапал бы unhealthy между стартом и первым RunImmediately.

Последствия

Плюсы:

  • Один сломанный поставщик не валит расписание других.
  • Blue-green деплой / scale-out не дублируют ingestion.
  • --tick-once для cron безопасен на нескольких хостах одновременно.
  • /readyz детектит per-supplier stale data — алерт на конкретного поставщика, а не “ingestion в целом”.

Минусы / принятые trade-offs:

  • Расписание в env — изменения требуют деплоя. Деплой как раз и есть место, где видна вся конфигурация в git.
  • Postgres advisory lock держит одно соединение из pool’а на всё время тика. Pool на supplier-sync — обычно 4-8 соединений; одно занято под lock — терпимо, тики обычно длятся минуты.
  • Heartbeat для очень длинных тиков не реализован — текущие тики измеряются минутами. Если тики приблизятся к нескольким часам — добавим heartbeat в follow-up цикле.

Альтернативы, отклонённые

A. Redis SETNX с TTL вместо Postgres advisory lock

Redis в проекте используется как кеш и rate-bucket store — non-critical, любой ops-инцидент даёт ложные срабатывания. Postgres — единственный устойчивый источник правды; lock’и здесь идиоматичны (outbox, event store, всё критичное — на Postgres). Auto-release “на смерть процесса” в Postgres встроен, не нужно угадывать TTL.

B. Ничего — положиться на upsert-идемпотентность

Postgres-уровень upsert защищает от дубликатов в supplier_offer_observations, но не от двойного HTTP трафика к поставщикам. На rate-limit бюджеты поставщиков (десятки запросов в минуту) такое не пройдёт.

C. DB-driven schedule (suppliers таблица + admin endpoint)

Преждевременно. Параметры расписания меняются редко; env-конфиг с расширенной структурой решает 95% операционных задач без новой инфраструктуры (миграция БД, admin API, кеширование). Если operational agility станет узким местом — выделим отдельный цикл.

Реализация

Затронутые компоненты:

backend/internal/platform/config/config.goSupplierSchedule struct, парсер envSupplierSchedules, Validate per-supplier checks + flag deprecated env vars.

backend/internal/core/ingestion/domain/:

  • metrics.goIngestionMetrics interface + NoopIngestionMetrics impl.

backend/internal/core/ingestion/app/:

  • metrics.goAtomicIngestionMetrics slog-mirror impl.
  • lease.goLeaseGuard.Acquire (returns release closure or ErrLeaseBusy).

backend/internal/core/ingestion/infra/pg/:

  • advisory_lock.goTryAdvisoryLock / AdvisoryUnlock helpers через pg_try_advisory_lock(hashtext($1)::bigint).

backend/internal/core/ingestion/di.go — провайдит LeaseGuard и AtomicIngestionMetrics (типизированный как ingdom.IngestionMetrics).

backend/internal/platform/loopbackhttp/server.goLastTickProviderLastTickPerSupplier; Server.Intervals снапшот для readyz threshold; per- supplier handler logic.

backend/cmd/supplier-sync/main.goregisterTickerregisterPerSupplierTickers; runSupplierTick per-tick callback с lease + metrics + tracker; runOnceAndExit обновлён под новую конфигурацию; tickTracker per-supplier map.

Deferred items

  1. Heartbeat extension lock’а для тиков длиннее N часов. Текущие тики укладываются в минуты, advisory lock на сессию приемлем.
  2. DB-driven schedule (suppliers таблица). Operational agility, не correctness.
  3. Cross-region / cluster-wide lease (Redis SETNX, etcd, FoundationDB). Postgres advisory покрывает single-region multi-instance.
  4. Per-credential schedule. Когда customer-credential тики потребуют отдельных rate-limit бюджетов.
  5. Pluggable error-classifier. Текущий classifyError различает fetch_error vs storage_error через component prefix; более тонкая классификация (rate-limit, auth, parse) — follow-up.
  6. /metrics HTTP endpoint + Prometheus exporter. Унаследован deferred-status от Live-D и P6.

Ссылки

  • ADR-0011 — No-proxy (ingestion async pipeline)
  • ADR-0024 — Supplier connector contract
  • ADR-0030 — Backend DI rule
  • Spec: docs/superpowers/specs/2026-04-29-ingestion-orchestration-audit-design.md
  • Plan: docs/superpowers/plans/2026-04-29-ingestion-orchestration-audit.md