Реестр Kafka-топиков
NOTE
Статус: Target schemas. Описание целевых схем данных. Миграции и реальные таблицы в
backend/migrations/могут отставать от документа. Правила маркировки — в50-processes/documentation-standard.md.
Полный каталог топиков, их конфигурации, producer’ов и consumer’ов.
Phase 1-a registry (iter-1 state)
Phase 1-a ingestion emits only one topic; остальные declared so Phase 2 consumers могут compile-time зафиксировать subjects против outbox- агностичного producer’а (ADR-0020).
Каждое сообщение несёт outbox-envelope заголовки: message_id (32-hex),
occurred_at (RFC3339Nano UTC), schema_version. Consumers ОБЯЗАНЫ
deduplicate по message_id.
| Topic | Producer | Payload version | Purpose | Iter-1 state |
|---|---|---|---|---|
raw.supplier.etm.payload.v1 | supplier-sync | 1 | MinIO raw-payload references per SKU + kind | declared, not emitted iter-1 |
offer.observation.v2 | supplier-sync | 2 | One row per (offer, credential) observation per tick | emitted iter-1 by offers.UpsertService |
offer.price_set_changed.v1 | supplier-sync | 1 | Diff signal when PriceSet changes between ticks | declared, not emitted iter-1 |
offer.forecast_changed.v1 | supplier-sync | 1 | Diff signal for StockForecast | declared, not emitted iter-1 |
canonical.events.v1 | api-server / supplier-sync | 1 | Canonical lifecycle (created/updated/merged) | declared, not emitted iter-1 |
offer.observation.v2 — iter-1 canonical payload
{
"offer_id": "uuid",
"supplier": "etm",
"supplier_sku": "ABC123",
"canonical_id": "uuid",
"credential_ref": "etm-system",
"observed_at": "2026-04-21T10:00:00Z"
}Partition key: canonical_id — всё observation для одного canonical
продукта ложится в одну партицию, так consumers поддерживают per-product
ordering без реперегрузок. Целевой full envelope (prices, stock_*,
delivery_term, source_outcome, reliability) описан ниже в секции
offer.observation.v2; iter-1 публикует подмножество.
Деferred топики
Declared-but-unused топики зафиксированы здесь чтобы Phase 2 consumers могли bind schema registry subjects без вмешательства в iter-1 producer (ADR-0020 outbox остаётся agnostic к числу consumer’ов).
Governance
- Новые имена топиков ДОЛЖНЫ следовать
<domain>.<aggregate>.<event>.v<N>per ADR-0010. - Version bump (v1 → v2) требует создать НОВЫЙ топик, не переписывать payload существующего — правила cutover consumer’ов в ADR-0020.
Правила именования
- Формат:
<domain>.<entity>.<action>.v<N>. - Примеры:
offer.price.changed.v1,canonical.events.v1. - Только lowercase, без дефисов, через точки и подчёркивания.
Каталог
raw.supplier.{supplier_id}.payload.v1
- Назначение: сырые payload’ы от поставщика (после fetch, до нормализации).
- Ключ партиции:
supplier_sku(чтобы гарантировать порядок по товару). - Compaction: нет.
- Retention: 30 дней.
- Партиции: 12 на старте.
- Producer:
connectors/<n>. - Consumer:
core/normalizer.
offer.normalized.v1
- Назначение: нормализованные offer’ы (после парсинга).
- Ключ:
supplier_id:supplier_sku. - Compaction: нет.
- Retention: 14 дней.
- Партиции: 24.
- Producer:
core/normalizer. - Consumer:
core/matching.
offer.price_changed.v1 (DEPRECATED — use offer.price_set_changed.v1)
- Статус: deprecated ADR-0025. Dual-write до миграции consumers; удаляется через 30 дней после полной миграции.
offer.price_set_changed.v1
- Назначение: derived событие об изменении
PriceSet(любое изnet / gross / list_tarif / retail_rec / vat_rate / on_request). - Ключ:
supplier_id:supplier_sku:credential_id. - Compaction: нет.
- Retention: 7 дней.
- Партиции: 24.
- Payload:
{previous_price_set, new_price_set, changed_fields: [...], observed_at}. - Consumer:
core/pricing-projection,clickhouse-sink(price_history).
offer.stock_changed.v1
- Назначение: изменения текущих остатков offer’а (
StockCurrentdiff). - Ключ:
supplier_id:supplier_sku. - Compaction: нет.
- Retention: 7 дней.
- Партиции: 24.
- Consumer:
core/offers-projection,clickhouse-sink.
offer.forecast_changed.v1
- Назначение: изменения
StockForecast(incoming / in_way / backorders). - Ключ:
supplier_id:supplier_sku:warehouse_ref. - Compaction: нет.
- Retention: 14 дней.
- Партиции: 12.
- Consumer:
core/estimate-optimizer(lead_time),core/search-projection(фильтр «скоро будет»),clickhouse-sink.
offer.delivery_term_changed.v1
- Назначение: изменения
DeliveryTerm. - Ключ:
supplier_id:supplier_sku. - Compaction: нет.
- Retention: 14 дней.
- Партиции: 12.
- Consumer:
core/pricing-projection(logistics rules),core/estimate-optimizer.
offer.observation.v1 (DEPRECATED — use offer.observation.v2)
- Статус: deprecated ADR-0025. Dual-write до миграции consumers; удаление через 30 дней после.
offer.observation.v2
- Назначение: новое observation по тройке
(SupplierOffer, SupplierCredential, Seller?)— снимокPriceSet + StockCurrent + StockForecast + DeliveryTermпосле очередного fetch. Для marketplace-поставщиков один fetch публикует N сообщений (по одному на seller’а); для обычных — одно сообщение сseller_ref=null. - Ключ:
supplier_id:supplier_sku:credential_id:seller_ref(seller_ref =noneдля non-marketplace). - Compaction: нет.
- Retention: 14 дней.
- Партиции: 24.
- Producer:
core/offers(после нормализации observation). - Consumer:
core/pricing(кэш-инвалидация),public-api ws server(push),clickhouse-sink(price_history / stock_history / forecast_history). - Payload envelope:
{offer_ref, credential_ref, seller_ref?, observed_at, prices: MultiCurrencyPrices (map<ISO4217, PriceSet>), stock_current: StockCurrent, stock_forecast?: StockForecast, delivery: DeliveryTerm, reliability: regular|degraded, supply_chain_trace_ref}.PriceSetможет иметьvalidity_windowдля promo периодов (ADR-0025). - Замечание: при стабильных полях допускается дедупликация на стороне producer’а — повторное observation без изменений можно не публиковать в этот топик, но всегда писать derived topic’и (
offer.price_set_changed.v1/offer.stock_changed.v1/offer.forecast_changed.v1/offer.delivery_term_changed.v1) при фактических изменениях. - Marketplace specifics (ADR-0026): каждый seller — отдельное сообщение. Partition key гарантирует order per
(offer, seller), не across sellers.
canonical.events.v1
- Назначение: все события canonical product (для projections + подписчиков). Включает
MediaAssetAdded/Removed,CanonicalMediaChanged. - Ключ:
canonical_id. - Compaction: да.
- Retention: бесконечно (compacted).
- Партиции: 24.
- Producer:
core/catalog(через outbox relay). - Consumer:
core/search-projection(ES),core/offers-matching,clickhouse-sink.
catalog.mappings.v1
- Назначение: события ACL-маппингов
SupplierCharacteristicMapping/SupplierClassificationMapping/SupplierManufacturerMapping. - Ключ:
supplier_ref. - Compaction: да.
- Retention: бесконечно (compacted).
- Партиции: 12.
- Producer:
core/catalog(через outbox relay). - Consumer:
core/matching— invalidate decisions по затронутым offer’ам.core/enrichment— invalidate extractor cache.core/search-projection— обновление переведённых classification tags.
catalog.taxonomies.v1
- Назначение: события federated identity standards:
IdentityStandardRegistered/Changed/Deprecated,TaxonomyCharacteristicMappingAdded/Changed/Removed,TaxonomyClassMappingAdded/Changed/Removed,TaxonomyValueMappingAdded/Changed/Removed. ADR-0028. - Ключ:
standard_ref(для standard events) или(standard_ref, external_code)(для mapping events). - Compaction: да (current state per standard / mapping).
- Retention: бесконечно.
- Партиции: 6.
- Producer:
core/catalog. - Consumer:
core/matching— invalidate при изменении mapping’а.core/enrichment— пересчёт extractor для затронутых features.admin-ui api— preview changes для operator.
catalog.external_identities.v1
- Назначение:
ExternalIdentityAssigned / ExternalIdentityRevokedна CanonicalProduct. ADR-0028. - Ключ:
canonical_id. - Compaction: да (как часть canonical state, но отдельный поток для streamed matching).
- Retention: бесконечно.
- Партиции: 12.
- Consumer:
core/matching(identity signal),core/search-projection(дополнительные lookup keys — поиск по GTIN/MPN).
supplier.incremental.v1
- Назначение: события
IncrementalSyncCursorAdvanced / Staled / Broken,IncrementalFallbackToFullCatalogTriggered. ADR-0029. - Ключ:
(supplier_ref, data_scope). - Compaction: да (current cursor state).
- Retention: бесконечно.
- Партиции: 6.
- Consumer:
admin-ui api(health dashboard),clickhouse-sink(cursor drift analytics), alerting (cursor_stale/cursor_broken).
enrichment.job.events.v2
- Статус: в дополнение к
v1. Schema расширена статусамиawaiting_report/fetchingи полямиremote_job_ref,poll_strategy(ADR-0024).v1dual-write 30 дней, далее deprecated. - Ключ:
job_id. - Compaction: да.
- Retention: бесконечно.
- Партиции: 12.
- Consumer:
public-api ws server,clickhouse-sink, audit.
canonical.snapshots.v1
- Назначение: periodic snapshots canonical для fast-rebuild проекций.
- Ключ:
canonical_id. - Compaction: да.
- Retention: бесконечно.
- Партиции: 24.
matching.decided.v1
- Назначение: результаты матчинга (для аудита, моделей обучения).
- Ключ:
supplier_id:supplier_sku. - Compaction: нет.
- Retention: 30 дней.
- Партиции: 12.
moderation.requested.v1
- Назначение: запросы на ручную модерацию (неизвестный алиас, низкий confidence, …).
- Ключ:
request_id. - Compaction: нет.
- Retention: 90 дней.
- Партиции: 6.
estimate.events.v1
- Назначение: события сметы (создана, версионирована, оптимизирована).
- Ключ:
estimate_id. - Compaction: да.
- Partition: 12.
supplier.sellers.v1
- Назначение: события Seller entity (marketplace aggregator sub-node) —
SellerRegistered,SellerRatingUpdated,SellerStatusChanged,SellerPromotedToSupplier. ADR-0026. - Ключ:
aggregator_supplier_id(Partition order per aggregator). - Compaction: да (current state per seller).
- Retention: бесконечно.
- Партиции: 12.
- Producer:
core/supplier-network. - Consumers:
core/pricing— invalidate cache при смене rating / status / blocked.core/search-projection— seller-aware фильтры.admin-ui api— live updates.clickhouse-sink— analytics (rating trends).
supplier.graph.events.v1
- Назначение: все события supplier graph: Supplier, SupplierRole, SupplierRelationship, dictionary kinds.
- Ключ:
supplier_idдля events of supplier;relationship_idдля relationship events. - Compaction: да (для restoration).
- Retention: бесконечно (compacted).
- Партиции: 12.
- Producer:
core/catalog(через outbox relay). - Consumers:
supply-chain-recalculator— пересчёт affected SupplyChainTraces.search-projection— admin search.public-api ws server— admin live notifications.clickhouse-sink— analytics.
Типы событий:
SupplierCreated,SupplierUpdated,SupplierDeprecatedSupplierRoleAdded,SupplierRoleRemoved,SupplierRoleScopeChangedSupplierRelationshipCreated,SupplierRelationshipUpdated,SupplierRelationshipRevokedSupplierRoleKindAdded,SupplierRoleKindUpdatedSupplierRelationshipKindAdded,SupplierRelationshipKindUpdatedSupplyChainTraceRecomputed
enrichment.job.events.v1
- Назначение: события async EnrichmentJob (queued, started, completed, failed).
- Ключ:
job_id. - Compaction: да (для текущего состояния).
- Retention: бесконечно.
- Партиции: 12.
- Consumers:
public-api ws server— для уведомлений клиентам, подписавшимся на конкретный job.clickhouse-sink— analytics.- audit.
Dead-letter топики
Для каждого основного топика — соответствующий DLQ <topic>.dlq:
- retention 30 дней;
- метрика:
kafka_dlq_messages_total; - алерт при появлении сообщений.
Консьюмер-группы
| Топик | Consumer group | Сервис |
|---|---|---|
raw.supplier.etm.payload.v1 | normalizer | normalization |
offer.normalized.v1 | matcher | matching |
canonical.events.v1 | search-projection | search |
canonical.events.v1 | ch-catalog-sink | clickhouse-sink |
offer.price_changed.v1 | pricing-projection | pricing |
offer.price_changed.v1 | ch-price-sink | clickhouse-sink |
Schema Registry
Рекомендация — использовать Schema Registry (Confluent) даже без Avro (поддерживает JSON Schema). Все изменения — через registry с проверкой compatibility level BACKWARD_TRANSITIVE для всех топиков по умолчанию.