Реестр Kafka-топиков

NOTE

Статус: Target schemas. Описание целевых схем данных. Миграции и реальные таблицы в backend/migrations/ могут отставать от документа. Правила маркировки — в 50-processes/documentation-standard.md.

Полный каталог топиков, их конфигурации, producer’ов и consumer’ов.

Phase 1-a registry (iter-1 state)

Phase 1-a ingestion emits only one topic; остальные declared so Phase 2 consumers могут compile-time зафиксировать subjects против outbox- агностичного producer’а (ADR-0020).

Каждое сообщение несёт outbox-envelope заголовки: message_id (32-hex), occurred_at (RFC3339Nano UTC), schema_version. Consumers ОБЯЗАНЫ deduplicate по message_id.

TopicProducerPayload versionPurposeIter-1 state
raw.supplier.etm.payload.v1supplier-sync1MinIO raw-payload references per SKU + kinddeclared, not emitted iter-1
offer.observation.v2supplier-sync2One row per (offer, credential) observation per tickemitted iter-1 by offers.UpsertService
offer.price_set_changed.v1supplier-sync1Diff signal when PriceSet changes between ticksdeclared, not emitted iter-1
offer.forecast_changed.v1supplier-sync1Diff signal for StockForecastdeclared, not emitted iter-1
canonical.events.v1api-server / supplier-sync1Canonical lifecycle (created/updated/merged)declared, not emitted iter-1

offer.observation.v2 — iter-1 canonical payload

{
  "offer_id": "uuid",
  "supplier": "etm",
  "supplier_sku": "ABC123",
  "canonical_id": "uuid",
  "credential_ref": "etm-system",
  "observed_at": "2026-04-21T10:00:00Z"
}

Partition key: canonical_id — всё observation для одного canonical продукта ложится в одну партицию, так consumers поддерживают per-product ordering без реперегрузок. Целевой full envelope (prices, stock_*, delivery_term, source_outcome, reliability) описан ниже в секции offer.observation.v2; iter-1 публикует подмножество.

Деferred топики

Declared-but-unused топики зафиксированы здесь чтобы Phase 2 consumers могли bind schema registry subjects без вмешательства в iter-1 producer (ADR-0020 outbox остаётся agnostic к числу consumer’ов).

Governance

  • Новые имена топиков ДОЛЖНЫ следовать <domain>.<aggregate>.<event>.v<N> per ADR-0010.
  • Version bump (v1 → v2) требует создать НОВЫЙ топик, не переписывать payload существующего — правила cutover consumer’ов в ADR-0020.

Правила именования

  • Формат: <domain>.<entity>.<action>.v<N>.
  • Примеры: offer.price.changed.v1, canonical.events.v1.
  • Только lowercase, без дефисов, через точки и подчёркивания.

Каталог

raw.supplier.{supplier_id}.payload.v1

  • Назначение: сырые payload’ы от поставщика (после fetch, до нормализации).
  • Ключ партиции: supplier_sku (чтобы гарантировать порядок по товару).
  • Compaction: нет.
  • Retention: 30 дней.
  • Партиции: 12 на старте.
  • Producer: connectors/<n>.
  • Consumer: core/normalizer.

offer.normalized.v1

  • Назначение: нормализованные offer’ы (после парсинга).
  • Ключ: supplier_id:supplier_sku.
  • Compaction: нет.
  • Retention: 14 дней.
  • Партиции: 24.
  • Producer: core/normalizer.
  • Consumer: core/matching.

offer.price_changed.v1 (DEPRECATED — use offer.price_set_changed.v1)

  • Статус: deprecated ADR-0025. Dual-write до миграции consumers; удаляется через 30 дней после полной миграции.

offer.price_set_changed.v1

  • Назначение: derived событие об изменении PriceSet (любое из net / gross / list_tarif / retail_rec / vat_rate / on_request).
  • Ключ: supplier_id:supplier_sku:credential_id.
  • Compaction: нет.
  • Retention: 7 дней.
  • Партиции: 24.
  • Payload: {previous_price_set, new_price_set, changed_fields: [...], observed_at}.
  • Consumer: core/pricing-projection, clickhouse-sink (price_history).

offer.stock_changed.v1

  • Назначение: изменения текущих остатков offer’а (StockCurrent diff).
  • Ключ: supplier_id:supplier_sku.
  • Compaction: нет.
  • Retention: 7 дней.
  • Партиции: 24.
  • Consumer: core/offers-projection, clickhouse-sink.

offer.forecast_changed.v1

  • Назначение: изменения StockForecast (incoming / in_way / backorders).
  • Ключ: supplier_id:supplier_sku:warehouse_ref.
  • Compaction: нет.
  • Retention: 14 дней.
  • Партиции: 12.
  • Consumer: core/estimate-optimizer (lead_time), core/search-projection (фильтр «скоро будет»), clickhouse-sink.

offer.delivery_term_changed.v1

  • Назначение: изменения DeliveryTerm.
  • Ключ: supplier_id:supplier_sku.
  • Compaction: нет.
  • Retention: 14 дней.
  • Партиции: 12.
  • Consumer: core/pricing-projection (logistics rules), core/estimate-optimizer.

offer.observation.v1 (DEPRECATED — use offer.observation.v2)

  • Статус: deprecated ADR-0025. Dual-write до миграции consumers; удаление через 30 дней после.

offer.observation.v2

  • Назначение: новое observation по тройке (SupplierOffer, SupplierCredential, Seller?) — снимок PriceSet + StockCurrent + StockForecast + DeliveryTerm после очередного fetch. Для marketplace-поставщиков один fetch публикует N сообщений (по одному на seller’а); для обычных — одно сообщение с seller_ref=null.
  • Ключ: supplier_id:supplier_sku:credential_id:seller_ref (seller_ref = none для non-marketplace).
  • Compaction: нет.
  • Retention: 14 дней.
  • Партиции: 24.
  • Producer: core/offers (после нормализации observation).
  • Consumer: core/pricing (кэш-инвалидация), public-api ws server (push), clickhouse-sink (price_history / stock_history / forecast_history).
  • Payload envelope: {offer_ref, credential_ref, seller_ref?, observed_at, prices: MultiCurrencyPrices (map<ISO4217, PriceSet>), stock_current: StockCurrent, stock_forecast?: StockForecast, delivery: DeliveryTerm, reliability: regular|degraded, supply_chain_trace_ref}. PriceSet может иметь validity_window для promo периодов (ADR-0025).
  • Замечание: при стабильных полях допускается дедупликация на стороне producer’а — повторное observation без изменений можно не публиковать в этот топик, но всегда писать derived topic’и (offer.price_set_changed.v1 / offer.stock_changed.v1 / offer.forecast_changed.v1 / offer.delivery_term_changed.v1) при фактических изменениях.
  • Marketplace specifics (ADR-0026): каждый seller — отдельное сообщение. Partition key гарантирует order per (offer, seller), не across sellers.

canonical.events.v1

  • Назначение: все события canonical product (для projections + подписчиков). Включает MediaAssetAdded/Removed, CanonicalMediaChanged.
  • Ключ: canonical_id.
  • Compaction: да.
  • Retention: бесконечно (compacted).
  • Партиции: 24.
  • Producer: core/catalog (через outbox relay).
  • Consumer: core/search-projection (ES), core/offers-matching, clickhouse-sink.

catalog.mappings.v1

  • Назначение: события ACL-маппингов SupplierCharacteristicMapping / SupplierClassificationMapping / SupplierManufacturerMapping.
  • Ключ: supplier_ref.
  • Compaction: да.
  • Retention: бесконечно (compacted).
  • Партиции: 12.
  • Producer: core/catalog (через outbox relay).
  • Consumer:
    • core/matching — invalidate decisions по затронутым offer’ам.
    • core/enrichment — invalidate extractor cache.
    • core/search-projection — обновление переведённых classification tags.

catalog.taxonomies.v1

  • Назначение: события federated identity standards: IdentityStandardRegistered/Changed/Deprecated, TaxonomyCharacteristicMappingAdded/Changed/Removed, TaxonomyClassMappingAdded/Changed/Removed, TaxonomyValueMappingAdded/Changed/Removed. ADR-0028.
  • Ключ: standard_ref (для standard events) или (standard_ref, external_code) (для mapping events).
  • Compaction: да (current state per standard / mapping).
  • Retention: бесконечно.
  • Партиции: 6.
  • Producer: core/catalog.
  • Consumer:
    • core/matching — invalidate при изменении mapping’а.
    • core/enrichment — пересчёт extractor для затронутых features.
    • admin-ui api — preview changes для operator.

catalog.external_identities.v1

  • Назначение: ExternalIdentityAssigned / ExternalIdentityRevoked на CanonicalProduct. ADR-0028.
  • Ключ: canonical_id.
  • Compaction: да (как часть canonical state, но отдельный поток для streamed matching).
  • Retention: бесконечно.
  • Партиции: 12.
  • Consumer: core/matching (identity signal), core/search-projection (дополнительные lookup keys — поиск по GTIN/MPN).

supplier.incremental.v1

  • Назначение: события IncrementalSyncCursorAdvanced / Staled / Broken, IncrementalFallbackToFullCatalogTriggered. ADR-0029.
  • Ключ: (supplier_ref, data_scope).
  • Compaction: да (current cursor state).
  • Retention: бесконечно.
  • Партиции: 6.
  • Consumer: admin-ui api (health dashboard), clickhouse-sink (cursor drift analytics), alerting (cursor_stale / cursor_broken).

enrichment.job.events.v2

  • Статус: в дополнение к v1. Schema расширена статусами awaiting_report / fetching и полями remote_job_ref, poll_strategy (ADR-0024). v1 dual-write 30 дней, далее deprecated.
  • Ключ: job_id.
  • Compaction: да.
  • Retention: бесконечно.
  • Партиции: 12.
  • Consumer: public-api ws server, clickhouse-sink, audit.

canonical.snapshots.v1

  • Назначение: periodic snapshots canonical для fast-rebuild проекций.
  • Ключ: canonical_id.
  • Compaction: да.
  • Retention: бесконечно.
  • Партиции: 24.

matching.decided.v1

  • Назначение: результаты матчинга (для аудита, моделей обучения).
  • Ключ: supplier_id:supplier_sku.
  • Compaction: нет.
  • Retention: 30 дней.
  • Партиции: 12.

moderation.requested.v1

  • Назначение: запросы на ручную модерацию (неизвестный алиас, низкий confidence, …).
  • Ключ: request_id.
  • Compaction: нет.
  • Retention: 90 дней.
  • Партиции: 6.

estimate.events.v1

  • Назначение: события сметы (создана, версионирована, оптимизирована).
  • Ключ: estimate_id.
  • Compaction: да.
  • Partition: 12.

supplier.sellers.v1

  • Назначение: события Seller entity (marketplace aggregator sub-node) — SellerRegistered, SellerRatingUpdated, SellerStatusChanged, SellerPromotedToSupplier. ADR-0026.
  • Ключ: aggregator_supplier_id (Partition order per aggregator).
  • Compaction: да (current state per seller).
  • Retention: бесконечно.
  • Партиции: 12.
  • Producer: core/supplier-network.
  • Consumers:
    • core/pricing — invalidate cache при смене rating / status / blocked.
    • core/search-projection — seller-aware фильтры.
    • admin-ui api — live updates.
    • clickhouse-sink — analytics (rating trends).

supplier.graph.events.v1

  • Назначение: все события supplier graph: Supplier, SupplierRole, SupplierRelationship, dictionary kinds.
  • Ключ: supplier_id для events of supplier; relationship_id для relationship events.
  • Compaction: да (для restoration).
  • Retention: бесконечно (compacted).
  • Партиции: 12.
  • Producer: core/catalog (через outbox relay).
  • Consumers:
    • supply-chain-recalculator — пересчёт affected SupplyChainTraces.
    • search-projection — admin search.
    • public-api ws server — admin live notifications.
    • clickhouse-sink — analytics.

Типы событий:

  • SupplierCreated, SupplierUpdated, SupplierDeprecated
  • SupplierRoleAdded, SupplierRoleRemoved, SupplierRoleScopeChanged
  • SupplierRelationshipCreated, SupplierRelationshipUpdated, SupplierRelationshipRevoked
  • SupplierRoleKindAdded, SupplierRoleKindUpdated
  • SupplierRelationshipKindAdded, SupplierRelationshipKindUpdated
  • SupplyChainTraceRecomputed

enrichment.job.events.v1

  • Назначение: события async EnrichmentJob (queued, started, completed, failed).
  • Ключ: job_id.
  • Compaction: да (для текущего состояния).
  • Retention: бесконечно.
  • Партиции: 12.
  • Consumers:
    • public-api ws server — для уведомлений клиентам, подписавшимся на конкретный job.
    • clickhouse-sink — analytics.
    • audit.

Dead-letter топики

Для каждого основного топика — соответствующий DLQ <topic>.dlq:

  • retention 30 дней;
  • метрика: kafka_dlq_messages_total;
  • алерт при появлении сообщений.

Консьюмер-группы

ТопикConsumer groupСервис
raw.supplier.etm.payload.v1normalizernormalization
offer.normalized.v1matchermatching
canonical.events.v1search-projectionsearch
canonical.events.v1ch-catalog-sinkclickhouse-sink
offer.price_changed.v1pricing-projectionpricing
offer.price_changed.v1ch-price-sinkclickhouse-sink

Schema Registry

Рекомендация — использовать Schema Registry (Confluent) даже без Avro (поддерживает JSON Schema). Все изменения — через registry с проверкой compatibility level BACKWARD_TRANSITIVE для всех топиков по умолчанию.