Сценарий: поступление данных от поставщика
NOTE
Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см.
backend/internal/core/) или пока не начат. Правила маркировки — в50-processes/documentation-standard.md.
Триггер
Один из:
Schedule— cron «опросить поставщика X каждые 6 ч».User— клиент инициировал refresh через UI / API.System event—CustomerEstimateRequested(high-priority refresh для линий сметы),SupplierCredentialActivated(discovery новой credential),CanonicalCreated(попросить других поставщиков).
Участники
| BC | Роль |
|---|---|
| Ingestion | Owner процесса. Выбор credential, опрос API, сохранение raw, парсинг. |
| Credentials | Выдаёт CredentialContext через router. Может перейти в failing при auth-ошибке. |
| Supplier Network | Получает SupplierOfferSeen → пересчитывает SupplyChainTrace. |
| Offers | Принимает RawPayloadStored → строит SupplierOffer + Observation. |
| Matching | Слушает SupplierOfferSeen / OfferCharacteristicsUpdated → AttemptMatch. |
| External | API поставщика. |
Sequence diagram
sequenceDiagram autonumber participant SCH as Schedule/User/System participant ING as Ingestion participant CR as Credentials participant API as 🌐 Supplier API participant S3 as S3 (raw) participant OFR as Offers participant SN as Supplier Network participant M as Matching SCH->>ING: 🟦 EnqueueEnrichmentJob ING-->>ING: 🟧 EnrichmentJobEnqueued Note over ING: 🟪 dedup policy ING->>CR: NextForCatalog / ForCustomer / ForSystem CR-->>ING: 🟦 CredentialContext ING->>ING: 🟧 EnrichmentJobStarted ING->>API: HTTP fetch (with auth) API-->>ING: response ING->>S3: write raw payload ING-->>ING: 🟧 RawPayloadStored par publish PL ING-->>OFR: 🟧 RawPayloadStored ING-->>SN: 🟧 RawPayloadStored end OFR->>OFR: 🟦 IngestRawPayload (parse) OFR-->>OFR: 🟧 SupplierOfferSeen<br/>(или OfferCharacteristicsUpdated) OFR->>OFR: 🟦 RecordObservation OFR-->>OFR: 🟧 OfferObservationRecorded OFR-->>M: 🟧 SupplierOfferSeen OFR-->>SN: 🟧 SupplierOfferSeen SN->>SN: 🟦 RecomputeTrace SN-->>SN: 🟧 SupplyChainTraceRecomputed SN-->>OFR: SupplyChainTrace VO snapshot M->>M: 🟦 AttemptMatch M-->>M: 🟧 MatchDecided ING->>ING: 🟧 EnrichmentJobCompleted
Шаги
- Job создаётся (Schedule / user / system event). Команда
EnqueueEnrichmentJob{kind, target, credential_context}.- Для connectors с
Capabilities.IncrementalSync.Supported=true(DKC, Systeme) — primary pathkind=incremental_syncctarget.from_cursorизIncrementalSyncCursoraggregate.full_catalog— только fallback при broken cursor или seed. - Для file_feed (КЭАЗ) —
kind=full_catalogс conditionalIf-Modified-Since/ URL-timestamp проверкой (если not modified →EnrichmentJobSkipped{reason=not_modified}).
- Для connectors с
- Дедупликация policy: если уже есть pending/running/awaiting_report/fetching для тех же
(kind, target, cred_ctx)—EnrichmentJobDeduplicated, присоединиться. - Worker берёт job:
StartJob→EnrichmentJobStarted. Запрашивает уCredentialRouterCredentialContext(c ключамиAuthBucketиDataBucket[endpoint]). - Rate budget check (двухуровневый, ADR-0008):
DataBucket[endpoint]пуст →EnrichmentJobSkipped+ reschedule с backoff. Никаких drop’ов.AuthBucketпуст → блокирующее ожидание (не fail). Нужен для восстановления session.
- Session acquire: SessionManager.Acquire. Если нет живой session — login (через
AuthBucket). Проактивный refresh приremaining_ttl < min(5min, TTL/10). - Connector.Fetch (sync endpoint) или Connector.SubmitRemoteJob (async-report endpoint):
- Sync: HTTP-запрос, получен response.
- Async-report (например, ETM
SgGds): POST create →RemoteJobRef{uuid}→RemoteReportRequested, job →awaiting_report.PollRemoteJobпоPollStrategy(списываются токены изDataBucket[async_poll]).state=ready→RemoteReportReady+fetching.max_waitистёк →RemoteReportTimedOut+EnrichmentJobFailed.
- Error normalization: connector нормализует ответ в
ConnectorError(ADR-0024).SessionExpired→ re-login, retry один раз.AuthRejected(после свежего login) →SessionAuthFailed; три подряд за 10 мин →MarkFailing.NotFound→ пустой observation-маркер (товар исчез), не fail.OnRequest→ observation сPriceSet.on_request=true.RateLimited→EnrichmentJobSkipped+ backoff.Transient→ exponential backoff.Permanent/ parse error →EnrichmentJobFailed+ DLQ.
- Parse + Save: каждая
RawPayload(с полнымSkuIdentityPack) пишется в S3 (immutable), генеритсяRawPayloadStored. - Offers consumer (Policy
RawPayloadStored):IngestRawPayload→ создаёт/обновляетSupplierOffer. Если(supplier_ref, supplier_sku)нов —SupplierOfferSeen. Иначе —OfferCharacteristicsUpdated/OfferMediaUpdated(по содержимому diff).- Если
Capabilities.Marketplace.IsMarketplace=true(ADR-0026): один payload содержит N seller-rows. Consumer fan-out’ит:- для каждого seller: lookup/создать
Sellerentity (SellerRegisteredесли новый) →SupplierRelationshipEstablished{kind=listed_on, source=observed}. RecordObservationN раз, каждое сseller_ref: append-onlyOfferObservationRecorded{offer, credential, seller_ref, prices, stock, ...}. Все N observations имеют одинаковыйobserved_at(время fetch’а aggregator’а).- Seller rating в payload →
SellerRatingUpdated(если diff ≥ min_delta). - Seller отсутствует в payload > N fetch’ей подряд →
SellerStatusChanged{status=inactive}.
- для каждого seller: lookup/создать
- Если
IsMarketplace=false(ETM-like):RecordObservationодин раз,seller_ref=null. - Diff к предыдущему observation (per
(offer, credential, seller_ref)) — derivedOfferPriceSetChanged(с маской изменённых полей),OfferStockChanged,OfferForecastUpdated,OfferDeliveryTermChanged. - Observations от connector’а с
TransportKind=html_scrape(ADR-0027) помечаютсяreliability=degraded. - Multi-currency payload (ADR-0025 extended): если connector отдаёт несколько валют (DKC RUB+KZT) —
observation.pricesзаполняется какMultiCurrencyPrices{"RUB":..., "KZT":...}в одном observation. Не множим observations per currency. - External identities (ADR-0028): если payload несёт ETIM class / GTIN / MPN — после MatchDecided для связанного Canonical отправляется команда
AssignExternalIdentity→ExternalIdentityAssignedevent вcatalog.external_identities.v1. 10a. Incremental sync commit (ADR-0029): приkind=incremental_sync— обработка RawPayloads и cursor update происходят в одной транзакции:
Если поставщик вернулBEGIN; INSERT observations... (UNIQUE подавляет дубли при at-least-once); UPDATE incremental_sync_cursor SET cursor_value=<new>, last_applied_at=now() WHERE id=?; COMMIT;400 unknown cursor—IncrementalSyncCursorBroken→ auto-fallback наfull_catalog. - Supplier Network consumer (Policy
SupplierOfferSeen/ change events):RecomputeTrace→SupplyChainTraceRecomputed. Snapshot (включаяwarehouse_chain) вкладывается обратно вSupplierOffer.supply_chain_trace. Если connector вернулInfoSuppStores-подобный payload с новым складом — auto-созданиеWarehouse{kind=manufacturer_warehouse, trust_level=origin}. - Matching consumer (Policy
SupplierOfferSeen/OfferCharacteristicsUpdated):AttemptMatch→MatchDecided(см. matching-flow.md). Характеристики проходят черезSupplierCharacteristicMapping; неизвестные supplier_char_code — в Moderation. - Job completion:
EnrichmentJobCompleted/EnrichmentJobPartial. Если кто-то подписан (webhook / SSE) — push-нотификация.
Decision points
- Дедуп найден → join вместо нового запроса.
SessionExpired(ETM 403) → re-login + retry один раз. Не считается AuthRejected.- Три подряд
AuthRejectedпосле свежего login за 10 мин → publishSessionAuthFailed→MarkCredentialFailing(Credentials). - Rate limit:
DataBucket— skip + backoff;AuthBucket— wait. - Async-report ожидает более
max_wait→RemoteReportTimedOut→EnrichmentJobFailed. - Parse error / schema violation →
EnrichmentJobFailed+ DLQ alert. - canonical_id уже existed для (supplier_ref, supplier_sku) → не создаём новый offer, идёт update.
- Connector вернул watermarked media, credential имеет
media_unwatermarked→ Offers сохраняет обе версии; canonical-level media выбирается по priority watermarked=false.
Edge cases
| Случай | Поведение |
|---|---|
| API возвращает пустой ответ | OfferObservationRecorded со stock=0 (не drop) — отслеживаем «исчезновение». Через N таких observations подряд → OfferLifecycleStatusChanged=discontinued. |
| Несколько credentials видят один offer | OfferSourceCredentialAdded (append-only), one offer canvas — multiple observations. |
SupplyChainTrace зависит от ещё не созданного supplier (race) | RecomputeTrace делает best-effort + ставит follow-up через RematchRequested. Daily scheduled rebuilder фиксирует расхождения. |
Credential перешла в failing посреди job | Job завершается тем, что успело; новые job этой credential — skip до Validated. |
| Поставщик вернул один SKU дважды в одном payload | Идемпотентность: второй вход нодаёт duplicate observation за тот же observed_at (UNIQUE constraint per (offer_id, credential_id, seller_ref, observed_at)). |
| Marketplace: seller пропал из payload | Policy N missed fetches → SellerStatusChanged{status=inactive}. Observations seller’а перестают попадать в Pricing tie-break (invariant 12 контекста Pricing). |
| Marketplace: seller сменил external_seller_id (ребрендинг) | Детектируется heuristic’ой (совпадение legal_name + rating history) → SellerProposalOpened в Moderation. Автоматического merge нет — рискованно. |
| Scraper: markup drift | Parser сверяет с baseline; нарушение → ConnectorError.Permanent, alert connector_markup_drift_total. Observation не записывается (fail-fast). |
| Scraper: 429 / 503 Retry-After | Connector возвращает ConnectorError.RateLimited + поле retry_after_ms. RateLimiter учитывает Retry-After поверх своего бюджета. |
| Scraper: headless browser crash | Chromium pod restart’ится (liveness probe); job — Transient, retry с backoff. После 3 подряд — EnrichmentJobFailed + alert. |
Инварианты сценария
- Ни один payload не теряется без следа (либо успешно обработан, либо в DLQ с alert).
- Connector никогда не выбирает credential сам — только
CredentialRouter. - Observation append-only: после записи — никогда не меняется.
- На клиента ответ не блокируется ожиданием поставщика. Customer-triggered jobs имеют
priority=high, но customer не ждёт результат — получает обновление через event/webhook/SSE.
Метрики и observability
enrichment_jobs_total{supplier, kind, status}— counters.enrichment_job_latency_seconds{supplier, kind}— histogram.connector_request_total{supplier, endpoint, outcome}—success/auth_failed/rate_limited/timeout.outbox_lag_seconds— задержка публикацииRawPayloadStored(см. ADR-0020).- Alerts:
outbox_lag > 10s,auth_failure_rate{supplier} > 5%,dlq_size > 0.
Связанные файлы
- Контексты:
../contexts/ingestion.md,../contexts/offers.md,../contexts/credentials.md,../contexts/supplier-network.md,../contexts/matching.md. - Архитектура:
../../20-architecture/integration-patterns.md,../../20-architecture/event-sourcing.md. - ADR-0008 (двухуровневый rate limiter), ADR-0011 (no-proxy), ADR-0014 (graceful degradation), ADR-0017 (event-driven supply chain), ADR-0020 (outbox SLA), ADR-0024 (supplier connector contract), ADR-0025 (observation extensions).