Сценарий: поступление данных от поставщика

NOTE

Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см. backend/internal/core/) или пока не начат. Правила маркировки — в 50-processes/documentation-standard.md.

Триггер

Один из:

  • Schedule — cron «опросить поставщика X каждые 6 ч».
  • User — клиент инициировал refresh через UI / API.
  • System eventCustomerEstimateRequested (high-priority refresh для линий сметы), SupplierCredentialActivated (discovery новой credential), CanonicalCreated (попросить других поставщиков).

Участники

BCРоль
IngestionOwner процесса. Выбор credential, опрос API, сохранение raw, парсинг.
CredentialsВыдаёт CredentialContext через router. Может перейти в failing при auth-ошибке.
Supplier NetworkПолучает SupplierOfferSeen → пересчитывает SupplyChainTrace.
OffersПринимает RawPayloadStored → строит SupplierOffer + Observation.
MatchingСлушает SupplierOfferSeen / OfferCharacteristicsUpdatedAttemptMatch.
ExternalAPI поставщика.

Sequence diagram

sequenceDiagram
    autonumber
    participant SCH as Schedule/User/System
    participant ING as Ingestion
    participant CR as Credentials
    participant API as 🌐 Supplier API
    participant S3 as S3 (raw)
    participant OFR as Offers
    participant SN as Supplier Network
    participant M as Matching

    SCH->>ING: 🟦 EnqueueEnrichmentJob
    ING-->>ING: 🟧 EnrichmentJobEnqueued
    Note over ING: 🟪 dedup policy
    ING->>CR: NextForCatalog / ForCustomer / ForSystem
    CR-->>ING: 🟦 CredentialContext
    ING->>ING: 🟧 EnrichmentJobStarted
    ING->>API: HTTP fetch (with auth)
    API-->>ING: response
    ING->>S3: write raw payload
    ING-->>ING: 🟧 RawPayloadStored
    par publish PL
        ING-->>OFR: 🟧 RawPayloadStored
        ING-->>SN: 🟧 RawPayloadStored
    end
    OFR->>OFR: 🟦 IngestRawPayload (parse)
    OFR-->>OFR: 🟧 SupplierOfferSeen<br/>(или OfferCharacteristicsUpdated)
    OFR->>OFR: 🟦 RecordObservation
    OFR-->>OFR: 🟧 OfferObservationRecorded
    OFR-->>M: 🟧 SupplierOfferSeen
    OFR-->>SN: 🟧 SupplierOfferSeen
    SN->>SN: 🟦 RecomputeTrace
    SN-->>SN: 🟧 SupplyChainTraceRecomputed
    SN-->>OFR: SupplyChainTrace VO snapshot
    M->>M: 🟦 AttemptMatch
    M-->>M: 🟧 MatchDecided
    ING->>ING: 🟧 EnrichmentJobCompleted

Шаги

  1. Job создаётся (Schedule / user / system event). Команда EnqueueEnrichmentJob{kind, target, credential_context}.
    • Для connectors с Capabilities.IncrementalSync.Supported=true (DKC, Systeme) — primary path kind=incremental_sync c target.from_cursor из IncrementalSyncCursor aggregate. full_catalog — только fallback при broken cursor или seed.
    • Для file_feed (КЭАЗ) — kind=full_catalog с conditional If-Modified-Since / URL-timestamp проверкой (если not modified → EnrichmentJobSkipped{reason=not_modified}).
  2. Дедупликация policy: если уже есть pending/running/awaiting_report/fetching для тех же (kind, target, cred_ctx)EnrichmentJobDeduplicated, присоединиться.
  3. Worker берёт job: StartJobEnrichmentJobStarted. Запрашивает у CredentialRouter CredentialContext (c ключами AuthBucket и DataBucket[endpoint]).
  4. Rate budget check (двухуровневый, ADR-0008):
    • DataBucket[endpoint] пуст → EnrichmentJobSkipped + reschedule с backoff. Никаких drop’ов.
    • AuthBucket пуст → блокирующее ожидание (не fail). Нужен для восстановления session.
  5. Session acquire: SessionManager.Acquire. Если нет живой session — login (через AuthBucket). Проактивный refresh при remaining_ttl < min(5min, TTL/10).
  6. Connector.Fetch (sync endpoint) или Connector.SubmitRemoteJob (async-report endpoint):
    • Sync: HTTP-запрос, получен response.
    • Async-report (например, ETM SgGds): POST create → RemoteJobRef{uuid}RemoteReportRequested, job → awaiting_report. PollRemoteJob по PollStrategy (списываются токены из DataBucket[async_poll]). state=readyRemoteReportReady + fetching. max_wait истёк → RemoteReportTimedOut + EnrichmentJobFailed.
  7. Error normalization: connector нормализует ответ в ConnectorError (ADR-0024).
    • SessionExpired → re-login, retry один раз.
    • AuthRejected (после свежего login) → SessionAuthFailed; три подряд за 10 мин → MarkFailing.
    • NotFound → пустой observation-маркер (товар исчез), не fail.
    • OnRequest → observation с PriceSet.on_request=true.
    • RateLimitedEnrichmentJobSkipped + backoff.
    • Transient → exponential backoff.
    • Permanent / parse error → EnrichmentJobFailed + DLQ.
  8. Parse + Save: каждая RawPayload (с полным SkuIdentityPack) пишется в S3 (immutable), генерится RawPayloadStored.
  9. Offers consumer (Policy RawPayloadStored):
    • IngestRawPayload → создаёт/обновляет SupplierOffer. Если (supplier_ref, supplier_sku) нов — SupplierOfferSeen. Иначе — OfferCharacteristicsUpdated / OfferMediaUpdated (по содержимому diff).
    • Если Capabilities.Marketplace.IsMarketplace=true (ADR-0026): один payload содержит N seller-rows. Consumer fan-out’ит:
      • для каждого seller: lookup/создать Seller entity (SellerRegistered если новый) → SupplierRelationshipEstablished{kind=listed_on, source=observed}.
      • RecordObservation N раз, каждое с seller_ref: append-only OfferObservationRecorded{offer, credential, seller_ref, prices, stock, ...}. Все N observations имеют одинаковый observed_at (время fetch’а aggregator’а).
      • Seller rating в payload → SellerRatingUpdated (если diff ≥ min_delta).
      • Seller отсутствует в payload > N fetch’ей подряд → SellerStatusChanged{status=inactive}.
    • Если IsMarketplace=false (ETM-like): RecordObservation один раз, seller_ref=null.
    • Diff к предыдущему observation (per (offer, credential, seller_ref)) — derived OfferPriceSetChanged (с маской изменённых полей), OfferStockChanged, OfferForecastUpdated, OfferDeliveryTermChanged.
    • Observations от connector’а с TransportKind=html_scrape (ADR-0027) помечаются reliability=degraded.
    • Multi-currency payload (ADR-0025 extended): если connector отдаёт несколько валют (DKC RUB+KZT) — observation.prices заполняется как MultiCurrencyPrices{"RUB":..., "KZT":...} в одном observation. Не множим observations per currency.
    • External identities (ADR-0028): если payload несёт ETIM class / GTIN / MPN — после MatchDecided для связанного Canonical отправляется команда AssignExternalIdentityExternalIdentityAssigned event в catalog.external_identities.v1. 10a. Incremental sync commit (ADR-0029): при kind=incremental_sync — обработка RawPayloads и cursor update происходят в одной транзакции:
    BEGIN;
      INSERT observations... (UNIQUE подавляет дубли при at-least-once);
      UPDATE incremental_sync_cursor SET cursor_value=<new>, last_applied_at=now() WHERE id=?;
    COMMIT;
    
    Если поставщик вернул 400 unknown cursorIncrementalSyncCursorBroken → auto-fallback на full_catalog.
  10. Supplier Network consumer (Policy SupplierOfferSeen / change events): RecomputeTraceSupplyChainTraceRecomputed. Snapshot (включая warehouse_chain) вкладывается обратно в SupplierOffer.supply_chain_trace. Если connector вернул InfoSuppStores-подобный payload с новым складом — auto-создание Warehouse{kind=manufacturer_warehouse, trust_level=origin}.
  11. Matching consumer (Policy SupplierOfferSeen / OfferCharacteristicsUpdated): AttemptMatchMatchDecided (см. matching-flow.md). Характеристики проходят через SupplierCharacteristicMapping; неизвестные supplier_char_code — в Moderation.
  12. Job completion: EnrichmentJobCompleted / EnrichmentJobPartial. Если кто-то подписан (webhook / SSE) — push-нотификация.

Decision points

  • Дедуп найден → join вместо нового запроса.
  • SessionExpired (ETM 403) → re-login + retry один раз. Не считается AuthRejected.
  • Три подряд AuthRejected после свежего login за 10 мин → publish SessionAuthFailedMarkCredentialFailing (Credentials).
  • Rate limit: DataBucket — skip + backoff; AuthBucket — wait.
  • Async-report ожидает более max_waitRemoteReportTimedOutEnrichmentJobFailed.
  • Parse error / schema violationEnrichmentJobFailed + DLQ alert.
  • canonical_id уже existed для (supplier_ref, supplier_sku) → не создаём новый offer, идёт update.
  • Connector вернул watermarked media, credential имеет media_unwatermarked → Offers сохраняет обе версии; canonical-level media выбирается по priority watermarked=false.

Edge cases

СлучайПоведение
API возвращает пустой ответOfferObservationRecorded со stock=0 (не drop) — отслеживаем «исчезновение». Через N таких observations подряд → OfferLifecycleStatusChanged=discontinued.
Несколько credentials видят один offerOfferSourceCredentialAdded (append-only), one offer canvas — multiple observations.
SupplyChainTrace зависит от ещё не созданного supplier (race)RecomputeTrace делает best-effort + ставит follow-up через RematchRequested. Daily scheduled rebuilder фиксирует расхождения.
Credential перешла в failing посреди jobJob завершается тем, что успело; новые job этой credential — skip до Validated.
Поставщик вернул один SKU дважды в одном payloadИдемпотентность: второй вход нодаёт duplicate observation за тот же observed_at (UNIQUE constraint per (offer_id, credential_id, seller_ref, observed_at)).
Marketplace: seller пропал из payloadPolicy N missed fetches → SellerStatusChanged{status=inactive}. Observations seller’а перестают попадать в Pricing tie-break (invariant 12 контекста Pricing).
Marketplace: seller сменил external_seller_id (ребрендинг)Детектируется heuristic’ой (совпадение legal_name + rating history) → SellerProposalOpened в Moderation. Автоматического merge нет — рискованно.
Scraper: markup driftParser сверяет с baseline; нарушение → ConnectorError.Permanent, alert connector_markup_drift_total. Observation не записывается (fail-fast).
Scraper: 429 / 503 Retry-AfterConnector возвращает ConnectorError.RateLimited + поле retry_after_ms. RateLimiter учитывает Retry-After поверх своего бюджета.
Scraper: headless browser crashChromium pod restart’ится (liveness probe); job — Transient, retry с backoff. После 3 подряд — EnrichmentJobFailed + alert.

Инварианты сценария

  1. Ни один payload не теряется без следа (либо успешно обработан, либо в DLQ с alert).
  2. Connector никогда не выбирает credential сам — только CredentialRouter.
  3. Observation append-only: после записи — никогда не меняется.
  4. На клиента ответ не блокируется ожиданием поставщика. Customer-triggered jobs имеют priority=high, но customer не ждёт результат — получает обновление через event/webhook/SSE.

Метрики и observability

  • enrichment_jobs_total{supplier, kind, status} — counters.
  • enrichment_job_latency_seconds{supplier, kind} — histogram.
  • connector_request_total{supplier, endpoint, outcome}success / auth_failed / rate_limited / timeout.
  • outbox_lag_seconds — задержка публикации RawPayloadStored (см. ADR-0020).
  • Alerts: outbox_lag > 10s, auth_failure_rate{supplier} > 5%, dlq_size > 0.

Связанные файлы