Сценарий: синхронизация справочников поставщика

NOTE

Статус: Target design. Документ описывает целевую доменную модель. Соответствующий код реализован частично (см. backend/internal/core/) или пока не начат. Правила маркировки — в 50-processes/documentation-standard.md.

Триггер

Один из:

  • Schedule — еженедельный (kind=manufacturer_dictionary_refresh / characteristic_dictionary_refresh / classification_dictionary_refresh / taxonomy_dictionary_refresh), ежедневный (client_sku_map_import, distributor_office_refresh).
  • Event: SupplierCredentialActivated → сразу client_sku_map_import (если connector имеет dictionary_exports capability).
  • Event: CapabilitiesChanged для connector (новая версия задекларировала доступ к dictionary endpoints).
  • User: operator в admin UI «Обновить справочник ETM».
  • Incremental cursor advance (для поставщиков с cursor support, ADR-0029): поток новых записей dictionary приходит inline с прочими данными, отдельного «refresh job» может не потребоваться.

Federated taxonomy sync (ADR-0028) имеет отдельный подпоток:

  • JobKind=taxonomy_dictionary_refresh{standard=etim_v7} — загружает официальный ETIM dump (от ETIM International) в TaxonomyCharacteristicMapping / TaxonomyClassMapping. Это не supplier-specific — dump один для всей системы.
  • Supplier-side ETIM export (DKC /etim/*) — cross-check против Tracium canonical registry. Расхождения → Moderation case etim_drift_detected.

Участники

BCРоль
IngestionOwner процесса fetch’а. kind=*_dictionary_refresh jobs.
CredentialsВыдаёт CredentialContext с feature dictionary_exports. Хранит ClientSkuMap как aggregate.
CatalogOwner aggregate’ов SupplierManufacturerMapping, SupplierCharacteristicMapping, SupplierClassificationMapping. Merge входящих записей.
ModerationAI-агенты manufacturer_alias_proposal, characteristic_mapping_proposal, classification_mapping_proposal — для неоднозначных кейсов.
Supplier NetworkМожет получить новые Supplier{role=manufacturer} узлы при первом обнаружении.
ExternalDictionary endpoint поставщика (ETM: GET /info/search/r-manuf/, SgGds для client SKU, опционально — proprietary endpoints для char/class словарей).

Sequence diagram

sequenceDiagram
    autonumber
    participant SCH as Schedule / CapabilitiesChanged
    participant ING as Ingestion
    participant CR as Credentials
    participant API as 🌐 Supplier dictionary API
    participant S3 as S3 (raw)
    participant CAT as Catalog
    participant MOD as Moderation
    participant SN as Supplier Network

    SCH->>ING: 🟦 EnqueueEnrichmentJob{kind=manufacturer_dictionary_refresh}
    ING-->>ING: 🟧 EnrichmentJobEnqueued
    ING->>CR: ForSystem(supplier) / ForCustomer
    CR-->>ING: CredentialContext (features ∋ dictionary_exports)
    ING->>API: GET dictionary endpoint
    API-->>ING: rows[]
    ING->>S3: raw dictionary snapshot
    ING-->>ING: 🟧 RawPayloadStored{payload_type=dictionary_manufacturer}
    ING-->>CAT: 🟧 RawPayloadStored
    CAT->>CAT: 🟦 ImportManufacturerDictionary
    loop по строкам
        CAT->>CAT: lookup existing SupplierManufacturerMapping{supplier_ref, supplier_mnf_code}
        alt mapping есть + name совпал
            CAT-->>CAT: no-op
        else mapping есть + name changed
            CAT-->>MOD: 🟧 ManufacturerAliasProposalCreated
        else mapping нет + Manufacturer с таким name существует
            CAT-->>CAT: 🟧 SupplierManufacturerMappingAdded{source=imported, confidence=auto}
        else mapping нет + Manufacturer отсутствует
            CAT-->>MOD: 🟧 ManufacturerProposalCreated (create Manufacturer + mapping)
        end
    end
    MOD-->>MOD: Agent verdict
    MOD-->>CAT: 🟦 ApplyModerationDecision
    CAT-->>CAT: 🟧 SupplierManufacturerMappingAdded / ManufacturerCreated
    CAT-->>SN: 🟧 ManufacturerCreated
    SN->>SN: auto-link Supplier{kind=manufacturer} (если нет)
    CAT-->>ING: ack
    ING-->>ING: 🟧 EnrichmentJobCompleted

Шаги

  1. Scheduled trigger (или activate event) → EnqueueEnrichmentJob{kind=<dict_kind>, target=supplier_id, credential_context}.
  2. Dedup: pending/running job того же kind за последние 24ч → EnrichmentJobDeduplicated.
  3. Credential acquire: ForSystem(supplier) или ForCustomer(customer, supplier) для client-specific экспортов. Требуется CredentialFeatures ∋ dictionary_exports.
  4. Connector.FetchDictionary (sync или async-report паттерн):
    • Manufacturer dictionary (ETM: GET /info/search/r-manuf/) — sync.
    • Client SKU map (ETM: POST /job/create → GET /job/{uuid} → download file) — async-report, job проходит через awaiting_report → fetching → completed.
  5. RawPayloadStored с payload_type ∈ {dictionary_manufacturer, dictionary_characteristic, dictionary_classification, client_sku_map}.
  6. Catalog consumer (для первых трёх) / Credentials consumer (для client_sku_map):
    • Для manufacturer: merge в SupplierManufacturerMapping. Новые Manufacturer — publish ManufacturerProposalCreated → Moderation (manufacturer_alias_proposal).
    • Для characteristic: merge в SupplierCharacteristicMapping. Неоднозначные mappings (например, supplier отдал два разных char_code для одинакового name, или один code изменил семантику) → CharacteristicMappingProposalOpened → Moderation.
    • Для classification: merge в SupplierClassificationMapping (multi-tag). Иерархия поставщика распаковывается в plain ClassificationTag set.
    • Для client_sku_map: merge в ClientSkuMap{customer_ref, supplier_ref, credential_ref}. Коллизии (same client_sku → different supplier_sku) → fallback на observed_in_payload + audit.
  7. Moderation outcomes: AgentDecisionEmittedApplyModerationDecision → Catalog команда (AddManufacturerAlias, AcceptCharacteristicMapping, …). Source BC применяет + ack DecisionApplied.
  8. Supplier Network side effect: ManufacturerCreated триггерит auto-link Supplier{kind=manufacturer} с manufacturer.supplier_ref.
  9. Downstream invalidation: catalog.mappings.v1 event → Matching / Enrichment invalidate кеши по supplier_ref.
  10. Job completion: EnrichmentJobCompleted{kind=<dict>, stats: {added, updated, proposed, conflicts}}.

Decision points

  • Словарь изменился частично — diff применяется построчно, не целиком. Удалённые записи не очищают существующий mapping (policy: mapping постоянен, supplier мог ошибочно удалить из справочника).
  • Supplier отдал новый char_code, но имя совпадает с существующим Characteristic — auto-merge с MappingSource.imported, MappingConfidence.method=name_match.
  • Характеристика требует unit_override (например, supplier отдаёт «мм», Tracium ожидает канонический unit «m») — оператор/AI задаёт unit_override через Moderation.
  • Client SKU map содержит коды, уже наблюдавшиеся в raw payloads (через SkuIdentityPack.client_sku) — consolidate, verified_at обновляется.
  • Async-report SgGds таймаутитRemoteReportTimedOut, retry через 1 час, на 3-м таймауте — alert + EnrichmentJobFailed до ручного разбора.

Edge cases

СлучайПоведение
Dictionary endpoint временно недоступенTransient → exponential backoff; retry до max_wait — далее job остаётся queued до следующего schedule.
Supplier вернул дубликаты одного supplier_code с разным nameПоследняя встреченная — primary. Предыдущие — сохраняются в aliases[] aggregate.
Новый Manufacturer конфликтует с существующим по normalized namemanufacturer_alias_proposal → Moderation. Auto-merge только при polish-string equality + brand_code совпадает.
Client SKU map содержит коды, ссылающиеся на supplier_sku, которого нет в TraciumOK: mapping сохраняется, reverse-lookup работает; при первом observation для этого supplier_sku уже будет client_sku.
Характеристика mapping изменилась (supplier переопределил char_code=55 с “Тип изделия” на “Thread type”)CharacteristicMappingProposalOpened(kind=semantic_drift) → Moderation с высокой severity; автоматический merge запрещён, т.к. может сломать существующие canonicals.
Classification дерево поставщика стало глубже (было 3 уровня, стало 4)Multi-tag расширяется до всех 4 уровней, но IdentityProfile / EquivalenceClass не перестраиваются без ADR.
client_sku_map_import возвращает file 1GB+Потоковый парсер ndjson + пагинация записи ClientSkuMappingAdded батчами по 1000 записей. EnrichmentJobPartial допустим при частичной обработке.

Инварианты сценария

  1. Dictionary refresh никогда не удаляет существующие mappings — только добавляет/обновляет. Удаление — только через явный operator command + audit.
  2. AI-агент manufacturer_alias_proposal / characteristic_mapping_proposal / classification_mapping_proposal не имеет прямого write-доступа к aggregate’ам. Только через ProposedAction + ack-loop.
  3. ClientSkuMap хранится в scope (customer_ref, supplier_ref, credential_ref). Member’ы одной SupplierCredentialGroup не делят свои ClientSkuMap автоматически (privacy invariant).
  4. SupplierCharacteristicMapping.unit_override обязателен, если Characteristic.default_unit ≠ единице, в которой supplier отдаёт значение.
  5. Import никогда не падает полностью из-за одной строки — unrecoverable rows идут в DLQ с {row_context, raw_payload_ref}.

Метрики и observability

  • dictionary_sync_runs_total{supplier, kind, status} — counters.
  • dictionary_sync_latency_seconds{supplier, kind} — histogram.
  • dictionary_mappings_added_total{supplier, kind}, dictionary_mappings_updated_total.
  • dictionary_moderation_proposals_total{supplier, kind, agent_verdict} — сколько ушло в Moderation и чем завершилось.
  • client_sku_map_size{customer, supplier} — текущий размер.
  • Alerts:
    • dictionary_sync_not_succeeded_hours > 168 (неделя без успеха) → page.
    • dictionary_moderation_proposals_total{kind=char} всплеск > 3σ → review (возможный breaking change в справочнике поставщика).
    • client_sku_map_size резко упал > 20% → investigate (supplier мог ошибочно отдать пустой экспорт).

Связанные файлы