Хранение через события (Event Sourcing)
NOTE
Статус: Target design. Документ описывает целевую архитектуру. Сервисы, модули и контракты, упомянутые ниже, могут ещё не существовать в
backend/. Правила маркировки — в50-processes/documentation-standard.md.
Применение Event Sourcing в системе. Где применяется, как устроено хранение, как работают проекции.
Что хранится через события, а что нет
| Агрегат / данные | Event Sourced | Причина |
|---|---|---|
| Canonical Product | да | история изменений характеристик критична для объяснимости |
| Manufacturer (+aliases) | да | slowly-changing, важна история алиасов для аудита матчинга |
| Identity Profile | да | изменение профиля → пересчёт identity, история нужна |
| Matching Decision | да | аудит автоматических решений |
| Equivalence Class | да | явное управление жизненным циклом |
| Supplier Offer metadata | да (лёгкий ES) | отслеживание появления/пропадания, lifecycle, изменения товарной канвы (характеристики/media). См. ADR-0003 §“Supplier Offer ES scope”. Цены/остатки/условия/forecast — в SupplierOfferObservation, append-only, не event-sourced. |
SupplierCharacteristicMapping / SupplierClassificationMapping / SupplierManufacturerMapping | да | ACL маппинг — критический для explainability матчинга; кто/когда/как добавил. |
ClientSkuMap | да (лёгкий ES) | per-credential cross-reference; изменение — аудитируемое. |
BasePriceKindPolicy | да | редкие изменения, критичны для финансового аудита. |
PreferredSellerPolicy | да | customer-visible policy, аудит обязателен. |
Warehouse (с kind) | да (лёгкий ES) | Reclassify операция — структурная, аудит обязателен. |
Seller (marketplace) | да (лёгкий ES) | lifecycle (registered → active → inactive → promoted), rating history — в CH. ADR-0026. |
| Price (current) | нет | сам current — read-only проекция из observation |
| PriceSet history | нет (append-only в CH через offer.observation.v2) | объём большой, для ES избыточно; observations immutable by construction |
| Stock / StockForecast | нет (append-only в CH) | то же |
| Raw Payload | нет (immutable в S3) | сами по себе событие |
| Session, cache | нет | временное состояние |
| EnrichmentJob runtime state (awaiting_report, fetching) | нет | operational state, живёт в PG jobs table |
| Estimate | да (лёгкий ES) | версии важны для клиента |
Event store в PostgreSQL
Основная таблица:
CREATE TABLE event_store (
id bigserial PRIMARY KEY,
aggregate_type text NOT NULL,
aggregate_id uuid NOT NULL,
version int NOT NULL,
event_type text NOT NULL,
event_data jsonb NOT NULL,
metadata jsonb NOT NULL,
occurred_at timestamptz NOT NULL DEFAULT now(),
UNIQUE (aggregate_type, aggregate_id, version)
);
CREATE INDEX event_store_aggregate_idx
ON event_store (aggregate_type, aggregate_id, version);
CREATE INDEX event_store_type_occurred_idx
ON event_store (aggregate_type, occurred_at);
CREATE TABLE snapshot_store (
aggregate_type text NOT NULL,
aggregate_id uuid NOT NULL,
version int NOT NULL,
state jsonb NOT NULL,
created_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (aggregate_type, aggregate_id)
);
CREATE TABLE outbox (
id bigserial PRIMARY KEY,
event_id bigint NOT NULL REFERENCES event_store(id),
topic text NOT NULL,
partition_key text NOT NULL,
payload jsonb NOT NULL,
headers jsonb NOT NULL DEFAULT '{}'::jsonb,
enqueued_at timestamptz NOT NULL DEFAULT now(),
published_at timestamptz,
attempts int NOT NULL DEFAULT 0
);
CREATE INDEX outbox_unpublished_idx
ON outbox (enqueued_at) WHERE published_at IS NULL;Метаданные события
Каждое событие содержит в metadata:
{
"actor": {
"type": "user" | "system" | "connector",
"id": "user-42 | ingestor | etm"
},
"correlation_id": "uuid",
"trace_id": "otel-trace-id",
"source": "admin-api | matcher | normalizer | ...",
"reason": "human-readable optional"
}Это даёт полную аудит-трейл без дополнительных таблиц.
Транзакция записи
Запись события и публикация в Kafka — через outbox pattern:
BEGIN;
INSERT INTO event_store (aggregate_type, aggregate_id, version, event_type, event_data, metadata)
VALUES (...);
UPDATE aggregate_snapshot_or_projection ...; -- локальная проекция, если нужна немедленно
INSERT INTO outbox (event_id, topic, partition_key, payload, headers)
VALUES (...);
COMMIT;
Отдельный процесс Outbox Relay читает неопубликованные строки и публикует в Kafka. После подтверждения брокером — проставляет published_at.
Гарантии:
- At-least-once публикация — consumer’ы должны быть идемпотентны.
- Порядок в рамках одного aggregate_id гарантируется (partition_key = aggregate_id).
Снапшоты
Когда создаются
- Периодически по числу событий: каждые
N(по умолчанию 50) событий агрегата. - Периодически по времени: если агрегат не менялся
Mчасов (по умолчанию 24), но имел снапшот старееKдней (по умолчанию 7) — пересоздать. - По триггеру: после “крупного” события (merge/split canonical, изменение identity profile, массовый rematch).
Как используются
При загрузке агрегата:
- Читаем snapshot из
snapshot_store. - Читаем события из
event_storeсversion > snapshot.version. - Применяем события к состоянию snapshot.
Если snapshot отсутствует — загружаем с нуля.
Проекции
Проекции — read-модели, обновляемые из событий. Они не часть event-sourced агрегата, их можно пересоздать заново.
Типы проекций
- In-PG projections — денормализованные таблицы в том же PG (для быстрого чтения без джойнов). Обновляются в той же транзакции, что и event (через
UPDATEпо ключу). - Elasticsearch projections —
canonical_productsиндекс, обновляется consumer’ом из Kafka. - ClickHouse projections — агрегаты для аналитики, обновляются Kafka engine в CH.
- Redis projections — “горячие” кэши (например, текущий снапшот top-товаров).
Пересборка
Каждая проекция должна уметь пересобираться:
- Читаем все события из
event_storeпоaggregate_typeупорядоченно по(aggregate_id, version). - Применяем к пустой проекции.
- Переключаем чтение на новую проекцию (blue-green).
Для ES это происходит автоматически через blue-green reindex.
Evolution событий
Схемы событий версионируются:
- Имя топика включает версию:
canonical.events.v1. - JSON Schema в
schemas/events/canonical.events.v1.json. - Добавление optional поля — не breaking.
- Удаление поля / изменение типа — breaking: новый топик
.v2, писатели пишут в оба до полной миграции читателей.
Замечания по производительности
- Для агрегатов с очень частыми событиями (потенциальный риск у matching decisions) — отдельная стратегия с большим N в snapshot.
- Outbox relay — в 2+ инстансах с распределением по partition hash.
- Архивация
event_store: старые события агрегатов, снятых с активного использования, переносятся вevent_store_archive(та же структура) после 1+ года.
Архивация event_store
Триггер архивации:
- агрегат не имел новых событий ≥ 365 дней;
- ИЛИ агрегат в терминальном состоянии (
canonical lifecycle = discontinuedбез активных offers;estimate finalized + закрытый заказ).
Процесс (job event-store-archiver, daily):
- Идентифицирует агрегаты, попавшие под триггер.
- Создаёт snapshot последнего состояния в
snapshot_store(если ещё нет). - Перемещает события в
event_store_archive(та же схема + полеarchived_at). - Записывает
AggregateArchivedсобытие в основной store (для аудита).
Восстановление архивированного агрегата:
- Чтение из snapshot из
snapshot_store. - Если нужны исторические события —
SELECT FROM event_store_archive WHERE aggregate_type = ? AND aggregate_id = ?. - Архивный store доступен только read-only из application code; writes — только через archiver job.
RTO для архивированного агрегата (если потребовалось вернуть в активную работу): < 5 минут (загрузка snapshot + опциональная replay архивных событий). Это предусматривает редкий сценарий (например, реактивация discontinued canonical при появлении нового offer).
SLA публикации outbox
См. ADR-0020. Кратко:
- p95 outbox lag (enqueued_at → published_at) < 1 с.
- alert при > 10 000 unpublished строк или при
attempts ≥ 5.
Replay из event_store
Полный rebuild проекции (ES, ClickHouse projection, in-PG read model) — операция предусмотренная архитектурой, выполняется через blue-green:
- Инициализируется новая пустая проекция.
SELECT FROM event_store ORDER BY aggregate_type, aggregate_id, version(с пагинацией по offset/cursor).- Применяются обработчики проекции в pure-функциональной форме.
- По завершении — atomic switch чтения (для ES — alias swap, для CH — alter view).
Replay performance budget: ~10 000 events/sec на одной replay-реплике. Полный rebuild ES при 100M canonical events — порядка 2.5–3 часа в один поток, до часа при шардинге replay’ы по aggregate_id % N.