Сценарий: обновления и определение разницы

NOTE

Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см. backend/internal/core/) или пока не начат. Правила маркировки — в 50-processes/documentation-standard.md.

Триггер

  • Schedule (типичный refresh interval per supplier).
  • TTL истёк для observation.
  • User-инициированный refresh.
  • CustomerEstimateRequested для линий конкретной сметы.
  • SupplierRelationshipChanged (Supplier Network) → rebuild trace + потенциально rematch.
  • IdentityProfileChanged (Catalog) → rematch.

Участники

BCРоль
IngestionПерезапрос данных.
OffersAppend observation; вычисление diff к предыдущему.
MatchingВозможный rematch.
CatalogОбновление характеристик canonical, если match=exact/strong.
Pricing / Search / EstimateInvalidate caches / projections / mark estimates stale.

Sequence diagram

sequenceDiagram
    autonumber
    participant SCH as Schedule/Trigger
    participant ING as Ingestion
    participant OFR as Offers
    participant DIFF as Offers Diff Engine
    participant CAT as Catalog
    participant M as Matching
    participant P as Pricing
    participant S as Search
    participant E as Estimate

    SCH->>ING: 🟦 EnqueueEnrichmentJob{kind=refresh_observation}
    ING->>OFR: 🟧 RawPayloadStored
    OFR->>OFR: 🟦 RecordObservation
    OFR-->>OFR: 🟧 OfferObservationRecorded (новый)
    OFR->>DIFF: compare с предыдущим max(observed_at)
    alt цена изменилась
        DIFF-->>P: 🟧 OfferPriceChanged (derived)
        DIFF-->>E: 🟧 OfferPriceChanged
    end
    alt stock изменился
        DIFF-->>S: 🟧 OfferStockChanged (derived)
    end
    alt characteristics изменились
        DIFF-->>OFR: 🟧 OfferCharacteristicsUpdated
        OFR-->>M: 🟧 OfferCharacteristicsUpdated
        M->>M: 🟦 AttemptMatch (single rematch)
        alt match=exact/strong
            M-->>CAT: 🟧 MatchDecided
            CAT->>CAT: 🟦 ОбновитьХарактеристикиТовара
            CAT-->>S: 🟧 CanonicalCharacteristicsChanged
        end
    end
    alt lifecycle changed (поставщик пометил discontinued)
        OFR-->>OFR: 🟧 OfferLifecycleStatusChanged
        alt последний active offer для canonical
            OFR-->>CAT: policy → 🟦 ChangeLifecycleStatus(deprecated)
            CAT-->>S: 🟧 CanonicalLifecycleStatusChanged
        end
    end
    P-->>P: invalidate cache (canonical_id)
    S-->>S: 🟦 ReindexCanonical
    E-->>E: mark estimate stale

Шаги

  1. Job создаётся — обычно scheduled.

  2. Connector опрашивает API, пишет RawPayload. Точно так же, как в ingestion-flow, но на уже существующий offer.

  3. Append observation — новое OfferObservationRecorded. Оригинал предыдущего не тронут.

  4. Diff engine (внутри Offers) сравнивает поля:

    • price_* отличается → OfferPriceChanged (derived).
    • stock_by_warehouse[] отличается → OfferStockChanged (derived).
    • characteristics / packaging / mediaOfferCharacteristicsUpdated.
    • lifecycle_statusOfferLifecycleStatusChanged.

    Derived events не источник истины — источник OfferObservationRecorded. Но downstream-системам удобно subscribe’иться на конкретное изменение.

  5. Rematching (Policy OfferCharacteristicsUpdated): single rematch для этого offer.

    • Если match upgraded (e.g. появился новый second signal) → MatchUpgraded.
    • Если match downgraded (новые данные противоречат) → MatchDowngraded + alert.
  6. Catalog update (если match=exact/strong): новые characteristics из offer применяются к canonical через ОбновитьХарактеристикиТовара. Conflict resolution rules (см. characteristics.md) — приоритет источников.

  7. Cascade lifecycle: если offer стал discontinued и был последним active для canonical → ChangeLifecycleStatus(deprecated) для canonical.

  8. Downstream invalidation:

    • Pricing: invalidate cache для затронутых (canonical_id, customer_id).
    • Search: ReindexCanonical (partial update — только изменившиеся поля).
    • Estimate: пометить смет(ы), содержащие этот canonical, флагом stale (breakdown сохраняется, но клиент видит «нужен пересчёт»).

Decision points

  • Diff = none → ничего не публикуется (оптимизация).
  • Match downgraded → не auto-применяем к canonical; ставим ModerationItemAdded.
  • pricing_mode сменился fixed → on_request → особый случай: для всех смет с этой позицией показать пометку.
  • Discontinued + есть replaced_by от поставщика → Catalog получает hint и предлагает оператору set ReplacedBySet.

Edge cases

СлучайПоведение
Поставщик прислал observed_at в прошлом ( max(existing))Все равно append (out-of-order observations допускаются), но не считается current. Логируется.
Поставщик «потерял» SKU из выдачиПосле N consecutive opaqness — синтетическое OfferObservationRecorded{stock=0, pricing_mode=on_request}. Если N+M прошло — OfferLifecycleStatusChanged=discontinued.
Critical attribute изменилсяЭто breaking для identity_signature. Offer может перестать матчиться к этому canonical → MatchDowngraded → moderation.
Множественные observations за короткое время от разных credentialsOK, append-only их различает по (credential_id, observed_at). Pricing использует best per algorithm.
Smoothing для частых апдейтов ценыОпционально: throttle derived OfferPriceChanged если delta < threshold (config).

Инварианты сценария

  1. Append-only observation: предыдущие никогда не изменяются и не удаляются (только архивация старых).
  2. Idempotency: повторная обработка того же RawPayload не создаёт duplicate OfferObservationRecorded (UNIQUE по (offer_id, credential_id, observed_at)).
  3. Любое изменение в Catalog characteristics через update от offer проходит conflict resolution (см. characteristics.md).
  4. Estimate с этим canonical не «портится» автоматически — только маркируется stale; решение пересчитать — за клиентом.

Метрики и observability

  • offer_observations_recorded_total{supplier}.
  • offer_price_changed_total{supplier}, offer_stock_changed_total{supplier}.
  • match_downgrades_total{from, to} — alert если spike.
  • canonical_characteristics_changed_total{canonical_id, source}.
  • estimates_marked_stale_total{customer_id}.

Связанные файлы