Контекст: загрузка данных

NOTE

Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см. backend/internal/core/) или пока не начат. Правила маркировки — в 50-processes/documentation-standard.md.

Назначение

Ingestion поддерживает per-credential refresh (обязательно для proposal pipeline, search-proposal.md). E2E обязан прогонять pipeline над данными собранными per credential. Стратегия обхода задаётся per-job/per-supplier, не заставляет scheduler обходить всех customer credentials каждый раз (soft invariant ADR-0035 §13.20).

Принимает данные от внешних поставщиков и доставляет их в систему. Управляет очередью задач (EnrichmentJob), выбирает credential для каждого запроса, соблюдает rate limits, сохраняет сырые ответы, парсит их в стандартизованную форму и публикует событиями для downstream BC (Offers, Matching, Supplier Network).

Ingestion — единственный BC, который имеет физический контакт с внешним миром поставщика. Все остальные BC получают данные асинхронно через события.

Главный смысл

Клиент никогда не блокируется ожиданием поставщика. Ingestion — асинхронный pipeline между внешним API и внутренним event store.

Агрегаты / сущности / value objects

ИмяТипНазначение
EnrichmentJob🟨 AggregateAsync-задача на опрос поставщика. Lifecycle, retries, dedupe.
JobKindVO enumdiscovery / refresh_observation / refresh_characteristics / rebuild_supply_chain / full_catalog / stock_batch / incremental_sync / manufacturer_dictionary_refresh / characteristic_dictionary_refresh / classification_dictionary_refresh / taxonomy_dictionary_refresh / distributor_office_refresh / client_sku_map_import. incremental_sync — revision/date-based delta, efficient для поставщиков c IncrementalSync.Supported=true (ADR-0029).
IncrementalSyncCursor🟨 Aggregate(supplier_ref, data_scope) → {cursor_kind, cursor_value, last_applied_at, health}. Один cursor per scope. Event-sourced. ADR-0029.
DataScopeVO enummaterials / stock / price / characteristics / certificates / packings / nodes / distributor_offices / taxonomy_<standard>.
CursorKindVO enumrevision / date / opaque_token.
JobStatusVO enumqueued / running / awaiting_report / fetching / completed / failed / skipped. awaiting_report + fetching — для async-report паттерна (ADR-0024).
JobPriorityVO enumhigh / normal / low.
RemoteJobRefVO(uuid, provider, submitted_at, deadline) — для poll async-report.
PollStrategyVO(initial_delay, interval, max_wait, backoff). Poll считается запросом в DataBucket[async_poll], не DataBucket[data].
RawPayloadVOТело ответа поставщика: (supplier_sku, payload_type, data, fetched_at, source, credential_id, identity_pack: SkuIdentityPack).
ConnectorErrorVO enumAuthRejected / SessionExpired / RateLimited / NotFound / OnRequest / Transient / Permanent / PartialSuccess / MarkupDrift — единая taxonomy (ADR-0024, ADR-0027). MarkupDrift специфичен для scraper’ов.
TransportKindVO enumrest / file_feed / webhook / html_scrape (ADR-0024). Декларируется connector через Capabilities().
Connector⚙️ PortИнтерфейс реализации поставщика (Fetch(ctx, cred, job), Capabilities(), PollRemoteJob(ctx, cred, remote_ref)).
CredentialRouter⚙️ PortВыбирает credential для запроса (NextForCatalog, ForCustomer, ForSystem).
RateLimiter⚙️ PortДвухуровневый: AuthBucket (блокирующее ожидание) + DataBucket[endpoint_class] (skip_and_reschedule). ADR-0008.
SessionManager⚙️ PortAcquire/release session handle. Проактивный refresh за min(5min, TTL/10) до истечения.

Доменные события

СобытиеПричина
ЗапросНаПоступлениеДанныхСоздан (EnrichmentJobEnqueued)Schedule / user request / system event
ЗапросНаПоступлениеДанныхДедуплицирован (EnrichmentJobDeduplicated)Уже есть pending job с тем же (kind, target, credential_context)
ЗапросНаПоступлениеДанныхНачат (EnrichmentJobStarted)Worker взял job из очереди
ОтчётПоставщикаЗапущен (RemoteReportRequested)Connector отправил POST /job/create (async report pattern) → получен remote_job_ref
ОтчётПоставщикаГотов (RemoteReportReady)Poll вернул state=ready → можно скачивать файл
ОтчётПоставщикаЗавис (RemoteReportTimedOut)max_wait истёк без readyEnrichmentJobFailed
СырыеДанныеПолучены (RawPayloadStored)Connector вернул RawPayload, записан в S3
ЗапросНаПоступлениеДанныхЗавершён (EnrichmentJobCompleted)Все RawPayload сохранены
ЗапросНаПоступлениеДанныхЧастично (EnrichmentJobPartial)Часть payload’ов ок, часть — ConnectorError=Permanent (details в result_summary)
ЗапросНаПоступлениеДанныхПровален (EnrichmentJobFailed)ConnectorError=Permanent / AuthRejected после retry / RemoteReportTimedOut
ЗапросНаПоступлениеДанныхПропущен (EnrichmentJobSkipped)DataBucket исчерпан
БюджетЗапросовИсчерпан (RateBudgetExhausted)Token bucket пуст (публикуется с {bucket_kind: auth|data, endpoint_class?})
СессияОбновлена (SessionRefreshed)Re-login / token refresh (проактивный или по SessionExpired)
СессияИстекла (SessionExpired)Сервер поставщика вернул «сессия невалидна» (у ETM 403) — re-login затем retry запроса
СессияНевалидна (SessionAuthFailed)AuthRejected после свежего login — кандидат на MarkFailing
КурсорИнкрементальнойСинхронизацииПродвинут (IncrementalSyncCursorAdvanced)Успешный incremental fetch — новый cursor_value сохранён
КурсорИнкрементальнойСинхронизацииПротух (IncrementalSyncCursorStaled)last_applied_at старше max_fetch_interval × N
КурсорИнкрементальнойСинхронизацииСломан (IncrementalSyncCursorBroken)Поставщик вернул 400 на передачу cursor’а
ИнкрементальныйFallbackЗапущен (IncrementalFallbackToFullCatalogTriggered)Policy: cursor broken → автоматический full_catalog

Команды

КомандаАкторЦелевой агрегатРезультат
СоздатьЗапросНаПоступление (EnqueueEnrichmentJob)User / Schedule / PolicyEnrichmentJobEnrichmentJobEnqueued
НачатьЗапрос (StartJob)WorkerEnrichmentJobEnrichmentJobStarted
СохранитьСырыеДанные (StoreRawPayload)WorkerRawPayloadStored
ЗавершитьЗапрос (CompleteJob)WorkerEnrichmentJobEnrichmentJobCompleted
ОтметитьПровал (MarkJobFailed)WorkerEnrichmentJobEnrichmentJobFailed
ОбновитьСессию (RefreshSession)SessionManagerSessionRefreshed

Политики

ТриггерРеакция
EnrichmentJobEnqueued + есть pending для (kind, target, cred_ctx)EnrichmentJobDeduplicated (присоединиться к существующему)
EnrichmentJobStarted + DataBucket[endpoint] пустEnrichmentJobSkipped + reschedule с backoff (DataBucket.backoff). НЕ fail.
EnrichmentJobStarted + AuthBucket пуст→ блокирующее ожидание токена (ADR-0008), job остаётся в running
ConnectorError=SessionExpired→ re-login через SessionManager; retry запроса один раз. НЕ считается AuthRejected.
ConnectorError=AuthRejected + свежий loginSessionAuthFailed. 3 подряд за окно 10 мин → MarkCredentialFailing (Credentials BC)
ConnectorError=NotFoundRawPayloadStored c пустым content-маркером (товар исчез); не fail
ConnectorError=OnRequestRecordObservation с PriceSet.on_request=true; не fail
ConnectorError=RateLimited (server-side 429)EnrichmentJobSkipped + увеличить backoff
ConnectorError=PartialSuccessEnrichmentJobPartial + опубликовать success’ные payload’ы, неудачные в DLQ
RawPayloadStored→ команда ОбработатьСырыеДанные (Offers BC)
Kind=full_catalog с async-report у connector’аRemoteReportRequested, переход в awaiting_report; PollRemoteJob по PollStrategy
RemoteReportReady→ переход в fetching, скачать файл, сгенерировать RawPayloadStored на каждую строку
Schedule «опросить поставщика X каждые 6 ч»EnqueueEnrichmentJob{kind=refresh_observation} для connectors без incremental support
Schedule (incremental-ready connector, каждые 5-15 мин per data_scope)EnqueueEnrichmentJob{kind=incremental_sync, target: {data_scope, from_cursor}} (ADR-0029)
IncrementalSyncCursorBroken→ Auto-fallback: запустить full_catalog job для scope; cursor помечается broken до успеха
IncrementalSyncCursorStaled (> 3 × max_fetch_interval)→ Alert + auto-retry; после 2 retry → fallback на full_catalog
Schedule (weekly)EnqueueEnrichmentJob{kind=manufacturer_dictionary_refresh} / characteristic_dictionary_refresh / classification_dictionary_refresh / taxonomy_dictionary_refresh / distributor_office_refresh (если connector поддерживает)
CustomerCredentialActivated (Credentials BC)EnqueueEnrichmentJob{kind=discovery, credential=new} + client_sku_map_import если SgGds-like endpoint доступен
EnrichmentJobCompleted + клиент подписан→ push webhook / SSE notification

Read-модели

  • 🟩 enrichment_job_queue (PG) — очередь задач с приоритетами.
  • 🟩 enrichment_job_history (CH) — для аналитики latency / success rate.
  • 🟩 connector_metrics (CH) — (supplier_id, endpoint, outcome, latency).

Инварианты

  1. Идемпотентность: повторная обработка того же RawPayload не создаёт дублей.
  2. Дедупликация активна: на (kind, target, credential_context) — максимум один job в статусах queued/running/awaiting_report/fetching.
  3. Throttling user-request: на один target — не чаще одного user-triggered job в N секунд (default 60s).
  4. Ни одно из не-пропускаемое событий не теряется: при rate-limit job отправляется на reschedule, не drop’ается.
  5. RawPayload хранится не менее 30 дней (политика retention в open questions).
  6. Connector не выбирает credential сам — только CredentialRouter.
  7. На границе BC Connector работает только с CredentialContext (VO Shared Kernel), не с SupplierCredential aggregate.
  8. Connector нормализует любую ошибку в ConnectorError taxonomy (ADR-0024); «сырые» HTTP-коды не поднимаются в core.
  9. Два bucket rate limiter’а изолированы: AuthBucket не делит бюджет с DataBucket. Исчерпание data-бюджета не мешает восстановлению session через login.
  10. SessionExpiredAuthRejected: первое приводит к одной re-login попытке; три подряд AuthRejected после свежего login → MarkFailing.
  11. Job в awaiting_report не считается зависшим пока now() - submitted_at < PollStrategy.max_wait. Проактивный timeout — через RemoteReportTimedOut.
  12. Poll async-отчёта списывает токены из DataBucket[async_poll], отдельного от основного data-бюджета.

Интеграционные события (публикуем)

Топик: ingestion.events.v1. Partition key: supplier_id.

ИмяКогда
RawPayloadStoredSuccessful fetch + S3 write
EnrichmentJobCompletedВсе payload’ы по job сохранены
EnrichmentJobFailedJob провалена окончательно (после всех retries)
RateBudgetExhaustedДля алертов и dashboards

Подписанные интеграционные события

ИсточникСобытиеРеакция
CredentialsSupplierCredentialActivatedDiscovery job для новой credential
CredentialsSupplierCredentialRevokedCancel pending jobs с этой credential
CustomerCustomerEstimateRequestedHigh-priority refresh observation для линий сметы
CatalogCanonicalCreatedDiscovery: попросить других поставщиков, есть ли у них этот товар
Supplier NetworkSupplierRelationshipChangedВозможный rebuild_supply_chain job

Связи в context map

BCПаттернНазначение
CredentialsOHS (Credentials → Ingestion)CredentialContext — стабильный API для опроса
OffersPublished LanguageIngestion публикует RawPayloadStored; Offers строит SupplierOffer + Observation
Supplier NetworkCustomer/SupplierIngestion (downstream) использует граф при построении trace
External (поставщик API)Anti-Corruption LayerConnector нормализует контракт поставщика в RawPayload

Мини event storming

flowchart LR
    subgraph SCH["Schedule / User"]
        T["🟫 Cron / Customer"]
    end
    subgraph CRED["Credentials BC"]
        CC["🟦 CredentialContext"]
    end
    subgraph ING["Ingestion BC"]
        CMD1["🟦 EnqueueEnrichmentJob"]
        J["🟨 EnrichmentJob"]
        E1["🟧 EnrichmentJobEnqueued"]
        P1["🟪 Dedup policy"]
        E2["🟧 EnrichmentJobStarted"]
        W["🟦 Connector.Fetch"]
        E3["🟧 RawPayloadStored"]
        E4["🟧 EnrichmentJobCompleted"]
    end
    subgraph EXT["🌐 Внешний API поставщика"]
        API["Supplier API"]
    end
    subgraph OFF["Offers BC"]
        SUB["🟪 Subscribe RawPayloadStored"]
    end

    T --> CMD1 --> J --> E1 --> P1 --> E2 --> W
    CC -.OHS.-> W
    W --> API
    API --> W
    W --> E3 --> E4
    E3 -.PL.-> SUB

Связанные файлы

  • Архитектура: ../../20-architecture/integration-patterns.md.
  • ADR-0008 — двухуровневый rate limiter.
  • ADR-0011 — no-proxy архитектура.
  • ADR-0014 — graceful degradation без warmup.
  • ADR-0024 — supplier connector contract (обязательный контракт: session, rate, error taxonomy, async report, identity pack, media rights, capabilities, egress, TransportKind, Marketplace, Taxonomies, IncrementalSync, FileFeed, MultiCurrency).
  • ADR-0026 — marketplace observations + seller axis (fan-out fetch → N observations per seller).
  • ADR-0027 — HTML scraper connector pattern (TransportKind.html_scrape, compliance gate, markup drift detection).
  • ADR-0028 — federated identity taxonomies (ETIM, eCl@ss, GS1, MPN, OKPD2).
  • ADR-0029 — revision-based incremental sync (IncrementalSyncCursor aggregate, JobKind=incremental_sync).
  • Сценарий: ../scenarios/ingestion-flow.md, ../scenarios/update-and-diff.md, ../scenarios/supplier-dictionary-sync.md.
  • credentials.md, offers.md, supplier-network.md.