Контекст: загрузка данных
NOTE
Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см.
backend/internal/core/) или пока не начат. Правила маркировки — в50-processes/documentation-standard.md.
Назначение
Ingestion поддерживает per-credential refresh (обязательно для proposal pipeline,
search-proposal.md). E2E обязан прогонять pipeline над данными собранными per credential. Стратегия обхода задаётся per-job/per-supplier, не заставляет scheduler обходить всех customer credentials каждый раз (soft invariant ADR-0035 §13.20).
Принимает данные от внешних поставщиков и доставляет их в систему. Управляет очередью задач (EnrichmentJob), выбирает credential для каждого запроса, соблюдает rate limits, сохраняет сырые ответы, парсит их в стандартизованную форму и публикует событиями для downstream BC (Offers, Matching, Supplier Network).
Ingestion — единственный BC, который имеет физический контакт с внешним миром поставщика. Все остальные BC получают данные асинхронно через события.
Главный смысл
Клиент никогда не блокируется ожиданием поставщика. Ingestion — асинхронный pipeline между внешним API и внутренним event store.
Агрегаты / сущности / value objects
| Имя | Тип | Назначение |
|---|---|---|
EnrichmentJob | 🟨 Aggregate | Async-задача на опрос поставщика. Lifecycle, retries, dedupe. |
JobKind | VO enum | discovery / refresh_observation / refresh_characteristics / rebuild_supply_chain / full_catalog / stock_batch / incremental_sync / manufacturer_dictionary_refresh / characteristic_dictionary_refresh / classification_dictionary_refresh / taxonomy_dictionary_refresh / distributor_office_refresh / client_sku_map_import. incremental_sync — revision/date-based delta, efficient для поставщиков c IncrementalSync.Supported=true (ADR-0029). |
IncrementalSyncCursor | 🟨 Aggregate | (supplier_ref, data_scope) → {cursor_kind, cursor_value, last_applied_at, health}. Один cursor per scope. Event-sourced. ADR-0029. |
DataScope | VO enum | materials / stock / price / characteristics / certificates / packings / nodes / distributor_offices / taxonomy_<standard>. |
CursorKind | VO enum | revision / date / opaque_token. |
JobStatus | VO enum | queued / running / awaiting_report / fetching / completed / failed / skipped. awaiting_report + fetching — для async-report паттерна (ADR-0024). |
JobPriority | VO enum | high / normal / low. |
RemoteJobRef | VO | (uuid, provider, submitted_at, deadline) — для poll async-report. |
PollStrategy | VO | (initial_delay, interval, max_wait, backoff). Poll считается запросом в DataBucket[async_poll], не DataBucket[data]. |
RawPayload | VO | Тело ответа поставщика: (supplier_sku, payload_type, data, fetched_at, source, credential_id, identity_pack: SkuIdentityPack). |
ConnectorError | VO enum | AuthRejected / SessionExpired / RateLimited / NotFound / OnRequest / Transient / Permanent / PartialSuccess / MarkupDrift — единая taxonomy (ADR-0024, ADR-0027). MarkupDrift специфичен для scraper’ов. |
TransportKind | VO enum | rest / file_feed / webhook / html_scrape (ADR-0024). Декларируется connector через Capabilities(). |
Connector | ⚙️ Port | Интерфейс реализации поставщика (Fetch(ctx, cred, job), Capabilities(), PollRemoteJob(ctx, cred, remote_ref)). |
CredentialRouter | ⚙️ Port | Выбирает credential для запроса (NextForCatalog, ForCustomer, ForSystem). |
RateLimiter | ⚙️ Port | Двухуровневый: AuthBucket (блокирующее ожидание) + DataBucket[endpoint_class] (skip_and_reschedule). ADR-0008. |
SessionManager | ⚙️ Port | Acquire/release session handle. Проактивный refresh за min(5min, TTL/10) до истечения. |
Доменные события
| Событие | Причина |
|---|---|
ЗапросНаПоступлениеДанныхСоздан (EnrichmentJobEnqueued) | Schedule / user request / system event |
ЗапросНаПоступлениеДанныхДедуплицирован (EnrichmentJobDeduplicated) | Уже есть pending job с тем же (kind, target, credential_context) |
ЗапросНаПоступлениеДанныхНачат (EnrichmentJobStarted) | Worker взял job из очереди |
ОтчётПоставщикаЗапущен (RemoteReportRequested) | Connector отправил POST /job/create (async report pattern) → получен remote_job_ref |
ОтчётПоставщикаГотов (RemoteReportReady) | Poll вернул state=ready → можно скачивать файл |
ОтчётПоставщикаЗавис (RemoteReportTimedOut) | max_wait истёк без ready → EnrichmentJobFailed |
СырыеДанныеПолучены (RawPayloadStored) | Connector вернул RawPayload, записан в S3 |
ЗапросНаПоступлениеДанныхЗавершён (EnrichmentJobCompleted) | Все RawPayload сохранены |
ЗапросНаПоступлениеДанныхЧастично (EnrichmentJobPartial) | Часть payload’ов ок, часть — ConnectorError=Permanent (details в result_summary) |
ЗапросНаПоступлениеДанныхПровален (EnrichmentJobFailed) | ConnectorError=Permanent / AuthRejected после retry / RemoteReportTimedOut |
ЗапросНаПоступлениеДанныхПропущен (EnrichmentJobSkipped) | DataBucket исчерпан |
БюджетЗапросовИсчерпан (RateBudgetExhausted) | Token bucket пуст (публикуется с {bucket_kind: auth|data, endpoint_class?}) |
СессияОбновлена (SessionRefreshed) | Re-login / token refresh (проактивный или по SessionExpired) |
СессияИстекла (SessionExpired) | Сервер поставщика вернул «сессия невалидна» (у ETM 403) — re-login затем retry запроса |
СессияНевалидна (SessionAuthFailed) | AuthRejected после свежего login — кандидат на MarkFailing |
КурсорИнкрементальнойСинхронизацииПродвинут (IncrementalSyncCursorAdvanced) | Успешный incremental fetch — новый cursor_value сохранён |
КурсорИнкрементальнойСинхронизацииПротух (IncrementalSyncCursorStaled) | last_applied_at старше max_fetch_interval × N |
КурсорИнкрементальнойСинхронизацииСломан (IncrementalSyncCursorBroken) | Поставщик вернул 400 на передачу cursor’а |
ИнкрементальныйFallbackЗапущен (IncrementalFallbackToFullCatalogTriggered) | Policy: cursor broken → автоматический full_catalog |
Команды
| Команда | Актор | Целевой агрегат | Результат |
|---|---|---|---|
СоздатьЗапросНаПоступление (EnqueueEnrichmentJob) | User / Schedule / Policy | EnrichmentJob | EnrichmentJobEnqueued |
НачатьЗапрос (StartJob) | Worker | EnrichmentJob | EnrichmentJobStarted |
СохранитьСырыеДанные (StoreRawPayload) | Worker | — | RawPayloadStored |
ЗавершитьЗапрос (CompleteJob) | Worker | EnrichmentJob | EnrichmentJobCompleted |
ОтметитьПровал (MarkJobFailed) | Worker | EnrichmentJob | EnrichmentJobFailed |
ОбновитьСессию (RefreshSession) | SessionManager | — | SessionRefreshed |
Политики
| Триггер | Реакция |
|---|---|
EnrichmentJobEnqueued + есть pending для (kind, target, cred_ctx) | → EnrichmentJobDeduplicated (присоединиться к существующему) |
EnrichmentJobStarted + DataBucket[endpoint] пуст | → EnrichmentJobSkipped + reschedule с backoff (DataBucket.backoff). НЕ fail. |
EnrichmentJobStarted + AuthBucket пуст | → блокирующее ожидание токена (ADR-0008), job остаётся в running |
ConnectorError=SessionExpired | → re-login через SessionManager; retry запроса один раз. НЕ считается AuthRejected. |
ConnectorError=AuthRejected + свежий login | → SessionAuthFailed. 3 подряд за окно 10 мин → MarkCredentialFailing (Credentials BC) |
ConnectorError=NotFound | → RawPayloadStored c пустым content-маркером (товар исчез); не fail |
ConnectorError=OnRequest | → RecordObservation с PriceSet.on_request=true; не fail |
ConnectorError=RateLimited (server-side 429) | → EnrichmentJobSkipped + увеличить backoff |
ConnectorError=PartialSuccess | → EnrichmentJobPartial + опубликовать success’ные payload’ы, неудачные в DLQ |
RawPayloadStored | → команда ОбработатьСырыеДанные (Offers BC) |
Kind=full_catalog с async-report у connector’а | → RemoteReportRequested, переход в awaiting_report; PollRemoteJob по PollStrategy |
RemoteReportReady | → переход в fetching, скачать файл, сгенерировать RawPayloadStored на каждую строку |
| Schedule «опросить поставщика X каждые 6 ч» | → EnqueueEnrichmentJob{kind=refresh_observation} для connectors без incremental support |
| Schedule (incremental-ready connector, каждые 5-15 мин per data_scope) | → EnqueueEnrichmentJob{kind=incremental_sync, target: {data_scope, from_cursor}} (ADR-0029) |
IncrementalSyncCursorBroken | → Auto-fallback: запустить full_catalog job для scope; cursor помечается broken до успеха |
IncrementalSyncCursorStaled (> 3 × max_fetch_interval) | → Alert + auto-retry; после 2 retry → fallback на full_catalog |
| Schedule (weekly) | → EnqueueEnrichmentJob{kind=manufacturer_dictionary_refresh} / characteristic_dictionary_refresh / classification_dictionary_refresh / taxonomy_dictionary_refresh / distributor_office_refresh (если connector поддерживает) |
CustomerCredentialActivated (Credentials BC) | → EnqueueEnrichmentJob{kind=discovery, credential=new} + client_sku_map_import если SgGds-like endpoint доступен |
EnrichmentJobCompleted + клиент подписан | → push webhook / SSE notification |
Read-модели
- 🟩
enrichment_job_queue(PG) — очередь задач с приоритетами. - 🟩
enrichment_job_history(CH) — для аналитики latency / success rate. - 🟩
connector_metrics(CH) —(supplier_id, endpoint, outcome, latency).
Инварианты
- Идемпотентность: повторная обработка того же RawPayload не создаёт дублей.
- Дедупликация активна: на (kind, target, credential_context) — максимум один job в статусах
queued/running/awaiting_report/fetching. - Throttling user-request: на один target — не чаще одного user-triggered job в N секунд (default 60s).
- Ни одно из не-пропускаемое событий не теряется: при rate-limit job отправляется на reschedule, не drop’ается.
- RawPayload хранится не менее 30 дней (политика retention в open questions).
- Connector не выбирает credential сам — только
CredentialRouter. - На границе BC Connector работает только с
CredentialContext(VO Shared Kernel), не сSupplierCredentialaggregate. - Connector нормализует любую ошибку в
ConnectorErrortaxonomy (ADR-0024); «сырые» HTTP-коды не поднимаются в core. - Два bucket rate limiter’а изолированы:
AuthBucketне делит бюджет сDataBucket. Исчерпание data-бюджета не мешает восстановлению session через login. SessionExpired≠AuthRejected: первое приводит к одной re-login попытке; три подрядAuthRejectedпосле свежего login →MarkFailing.- Job в
awaiting_reportне считается зависшим покаnow() - submitted_at < PollStrategy.max_wait. Проактивный timeout — черезRemoteReportTimedOut. - Poll async-отчёта списывает токены из
DataBucket[async_poll], отдельного от основного data-бюджета.
Интеграционные события (публикуем)
Топик: ingestion.events.v1. Partition key: supplier_id.
| Имя | Когда |
|---|---|
RawPayloadStored | Successful fetch + S3 write |
EnrichmentJobCompleted | Все payload’ы по job сохранены |
EnrichmentJobFailed | Job провалена окончательно (после всех retries) |
RateBudgetExhausted | Для алертов и dashboards |
Подписанные интеграционные события
| Источник | Событие | Реакция |
|---|---|---|
| Credentials | SupplierCredentialActivated | Discovery job для новой credential |
| Credentials | SupplierCredentialRevoked | Cancel pending jobs с этой credential |
| Customer | CustomerEstimateRequested | High-priority refresh observation для линий сметы |
| Catalog | CanonicalCreated | Discovery: попросить других поставщиков, есть ли у них этот товар |
| Supplier Network | SupplierRelationshipChanged | Возможный rebuild_supply_chain job |
Связи в context map
| BC | Паттерн | Назначение |
|---|---|---|
| Credentials | OHS (Credentials → Ingestion) | CredentialContext — стабильный API для опроса |
| Offers | Published Language | Ingestion публикует RawPayloadStored; Offers строит SupplierOffer + Observation |
| Supplier Network | Customer/Supplier | Ingestion (downstream) использует граф при построении trace |
| External (поставщик API) | Anti-Corruption Layer | Connector нормализует контракт поставщика в RawPayload |
Мини event storming
flowchart LR subgraph SCH["Schedule / User"] T["🟫 Cron / Customer"] end subgraph CRED["Credentials BC"] CC["🟦 CredentialContext"] end subgraph ING["Ingestion BC"] CMD1["🟦 EnqueueEnrichmentJob"] J["🟨 EnrichmentJob"] E1["🟧 EnrichmentJobEnqueued"] P1["🟪 Dedup policy"] E2["🟧 EnrichmentJobStarted"] W["🟦 Connector.Fetch"] E3["🟧 RawPayloadStored"] E4["🟧 EnrichmentJobCompleted"] end subgraph EXT["🌐 Внешний API поставщика"] API["Supplier API"] end subgraph OFF["Offers BC"] SUB["🟪 Subscribe RawPayloadStored"] end T --> CMD1 --> J --> E1 --> P1 --> E2 --> W CC -.OHS.-> W W --> API API --> W W --> E3 --> E4 E3 -.PL.-> SUB
Связанные файлы
- Архитектура:
../../20-architecture/integration-patterns.md. - ADR-0008 — двухуровневый rate limiter.
- ADR-0011 — no-proxy архитектура.
- ADR-0014 — graceful degradation без warmup.
- ADR-0024 — supplier connector contract (обязательный контракт: session, rate, error taxonomy, async report, identity pack, media rights, capabilities, egress, TransportKind, Marketplace, Taxonomies, IncrementalSync, FileFeed, MultiCurrency).
- ADR-0026 — marketplace observations + seller axis (fan-out fetch → N observations per seller).
- ADR-0027 — HTML scraper connector pattern (TransportKind.html_scrape, compliance gate, markup drift detection).
- ADR-0028 — federated identity taxonomies (ETIM, eCl@ss, GS1, MPN, OKPD2).
- ADR-0029 — revision-based incremental sync (IncrementalSyncCursor aggregate, JobKind=incremental_sync).
- Сценарий:
../scenarios/ingestion-flow.md,../scenarios/update-and-diff.md,../scenarios/supplier-dictionary-sync.md. credentials.md,offers.md,supplier-network.md.