Хранение через события (Event Sourcing)

NOTE

Статус: Target design. Документ описывает целевую архитектуру. Сервисы, модули и контракты, упомянутые ниже, могут ещё не существовать в backend/. Правила маркировки — в 50-processes/documentation-standard.md.

Применение Event Sourcing в системе. Где применяется, как устроено хранение, как работают проекции.

Что хранится через события, а что нет

Агрегат / данныеEvent SourcedПричина
Canonical Productдаистория изменений характеристик критична для объяснимости
Manufacturer (+aliases)даslowly-changing, важна история алиасов для аудита матчинга
Identity Profileдаизменение профиля → пересчёт identity, история нужна
Matching Decisionдааудит автоматических решений
Equivalence Classдаявное управление жизненным циклом
Supplier Offer metadataда (лёгкий ES)отслеживание появления/пропадания, lifecycle, изменения товарной канвы (характеристики/media). См. ADR-0003 §“Supplier Offer ES scope”. Цены/остатки/условия/forecast — в SupplierOfferObservation, append-only, не event-sourced.
SupplierCharacteristicMapping / SupplierClassificationMapping / SupplierManufacturerMappingдаACL маппинг — критический для explainability матчинга; кто/когда/как добавил.
ClientSkuMapда (лёгкий ES)per-credential cross-reference; изменение — аудитируемое.
BasePriceKindPolicyдаредкие изменения, критичны для финансового аудита.
PreferredSellerPolicyдаcustomer-visible policy, аудит обязателен.
Warehouse (с kind)да (лёгкий ES)Reclassify операция — структурная, аудит обязателен.
Seller (marketplace)да (лёгкий ES)lifecycle (registered → active → inactive → promoted), rating history — в CH. ADR-0026.
Price (current)нетсам current — read-only проекция из observation
PriceSet historyнет (append-only в CH через offer.observation.v2)объём большой, для ES избыточно; observations immutable by construction
Stock / StockForecastнет (append-only в CH)то же
Raw Payloadнет (immutable в S3)сами по себе событие
Session, cacheнетвременное состояние
EnrichmentJob runtime state (awaiting_report, fetching)нетoperational state, живёт в PG jobs table
Estimateда (лёгкий ES)версии важны для клиента

Event store в PostgreSQL

Основная таблица:

CREATE TABLE event_store (
    id              bigserial PRIMARY KEY,
    aggregate_type  text NOT NULL,
    aggregate_id    uuid NOT NULL,
    version         int  NOT NULL,
    event_type      text NOT NULL,
    event_data      jsonb NOT NULL,
    metadata        jsonb NOT NULL,
    occurred_at     timestamptz NOT NULL DEFAULT now(),
    UNIQUE (aggregate_type, aggregate_id, version)
);
 
CREATE INDEX event_store_aggregate_idx
    ON event_store (aggregate_type, aggregate_id, version);
 
CREATE INDEX event_store_type_occurred_idx
    ON event_store (aggregate_type, occurred_at);
 
CREATE TABLE snapshot_store (
    aggregate_type  text NOT NULL,
    aggregate_id    uuid NOT NULL,
    version         int  NOT NULL,
    state           jsonb NOT NULL,
    created_at      timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (aggregate_type, aggregate_id)
);
 
CREATE TABLE outbox (
    id              bigserial PRIMARY KEY,
    event_id        bigint NOT NULL REFERENCES event_store(id),
    topic           text NOT NULL,
    partition_key   text NOT NULL,
    payload         jsonb NOT NULL,
    headers         jsonb NOT NULL DEFAULT '{}'::jsonb,
    enqueued_at     timestamptz NOT NULL DEFAULT now(),
    published_at    timestamptz,
    attempts        int NOT NULL DEFAULT 0
);
 
CREATE INDEX outbox_unpublished_idx
    ON outbox (enqueued_at) WHERE published_at IS NULL;

Метаданные события

Каждое событие содержит в metadata:

{
  "actor": {
    "type": "user" | "system" | "connector",
    "id": "user-42 | ingestor | etm"
  },
  "correlation_id": "uuid",
  "trace_id": "otel-trace-id",
  "source": "admin-api | matcher | normalizer | ...",
  "reason": "human-readable optional"
}

Это даёт полную аудит-трейл без дополнительных таблиц.

Транзакция записи

Запись события и публикация в Kafka — через outbox pattern:

BEGIN;
  INSERT INTO event_store (aggregate_type, aggregate_id, version, event_type, event_data, metadata)
         VALUES (...);
  UPDATE aggregate_snapshot_or_projection ...;  -- локальная проекция, если нужна немедленно
  INSERT INTO outbox (event_id, topic, partition_key, payload, headers)
         VALUES (...);
COMMIT;

Отдельный процесс Outbox Relay читает неопубликованные строки и публикует в Kafka. После подтверждения брокером — проставляет published_at.

Гарантии:

  • At-least-once публикация — consumer’ы должны быть идемпотентны.
  • Порядок в рамках одного aggregate_id гарантируется (partition_key = aggregate_id).

Снапшоты

Когда создаются

  • Периодически по числу событий: каждые N (по умолчанию 50) событий агрегата.
  • Периодически по времени: если агрегат не менялся M часов (по умолчанию 24), но имел снапшот старее K дней (по умолчанию 7) — пересоздать.
  • По триггеру: после “крупного” события (merge/split canonical, изменение identity profile, массовый rematch).

Как используются

При загрузке агрегата:

  1. Читаем snapshot из snapshot_store.
  2. Читаем события из event_store с version > snapshot.version.
  3. Применяем события к состоянию snapshot.

Если snapshot отсутствует — загружаем с нуля.

Проекции

Проекции — read-модели, обновляемые из событий. Они не часть event-sourced агрегата, их можно пересоздать заново.

Типы проекций

  • In-PG projections — денормализованные таблицы в том же PG (для быстрого чтения без джойнов). Обновляются в той же транзакции, что и event (через UPDATE по ключу).
  • Elasticsearch projectionscanonical_products индекс, обновляется consumer’ом из Kafka.
  • ClickHouse projections — агрегаты для аналитики, обновляются Kafka engine в CH.
  • Redis projections — “горячие” кэши (например, текущий снапшот top-товаров).

Пересборка

Каждая проекция должна уметь пересобираться:

  1. Читаем все события из event_store по aggregate_type упорядоченно по (aggregate_id, version).
  2. Применяем к пустой проекции.
  3. Переключаем чтение на новую проекцию (blue-green).

Для ES это происходит автоматически через blue-green reindex.

Evolution событий

Схемы событий версионируются:

  • Имя топика включает версию: canonical.events.v1.
  • JSON Schema в schemas/events/canonical.events.v1.json.
  • Добавление optional поля — не breaking.
  • Удаление поля / изменение типа — breaking: новый топик .v2, писатели пишут в оба до полной миграции читателей.

Замечания по производительности

  • Для агрегатов с очень частыми событиями (потенциальный риск у matching decisions) — отдельная стратегия с большим N в snapshot.
  • Outbox relay — в 2+ инстансах с распределением по partition hash.
  • Архивация event_store: старые события агрегатов, снятых с активного использования, переносятся в event_store_archive (та же структура) после 1+ года.

Архивация event_store

Триггер архивации:

  • агрегат не имел новых событий ≥ 365 дней;
  • ИЛИ агрегат в терминальном состоянии (canonical lifecycle = discontinued без активных offers; estimate finalized + закрытый заказ).

Процесс (job event-store-archiver, daily):

  1. Идентифицирует агрегаты, попавшие под триггер.
  2. Создаёт snapshot последнего состояния в snapshot_store (если ещё нет).
  3. Перемещает события в event_store_archive (та же схема + поле archived_at).
  4. Записывает AggregateArchived событие в основной store (для аудита).

Восстановление архивированного агрегата:

  • Чтение из snapshot из snapshot_store.
  • Если нужны исторические события — SELECT FROM event_store_archive WHERE aggregate_type = ? AND aggregate_id = ?.
  • Архивный store доступен только read-only из application code; writes — только через archiver job.

RTO для архивированного агрегата (если потребовалось вернуть в активную работу): < 5 минут (загрузка snapshot + опциональная replay архивных событий). Это предусматривает редкий сценарий (например, реактивация discontinued canonical при появлении нового offer).

SLA публикации outbox

См. ADR-0020. Кратко:

  • p95 outbox lag (enqueued_at → published_at) < 1 с.
  • alert при > 10 000 unpublished строк или при attempts ≥ 5.

Replay из event_store

Полный rebuild проекции (ES, ClickHouse projection, in-PG read model) — операция предусмотренная архитектурой, выполняется через blue-green:

  1. Инициализируется новая пустая проекция.
  2. SELECT FROM event_store ORDER BY aggregate_type, aggregate_id, version (с пагинацией по offset/cursor).
  3. Применяются обработчики проекции в pure-функциональной форме.
  4. По завершении — atomic switch чтения (для ES — alias swap, для CH — alter view).

Replay performance budget: ~10 000 events/sec на одной replay-реплике. Полный rebuild ES при 100M canonical events — порядка 2.5–3 часа в один поток, до часа при шардинге replay’ы по aggregate_id % N.