Паттерны интеграции
NOTE
Статус: Target design. Документ описывает целевую архитектуру. Сервисы, модули и контракты, упомянутые ниже, могут ещё не существовать в
backend/. Правила маркировки — в50-processes/documentation-standard.md.
Стандартные паттерны для интеграции с внешними поставщиками.
Принципиальное ограничение: no-proxy + graceful degradation
Все запросы к поставщикам — асинхронные, в background-процессах. Клиентский запрос никогда не блокируется ожиданием поставщика.
EnrichmentJob — единая модель async-задачи к поставщику
EnrichmentJob
├── id
├── kind — discovery | refresh_observation | refresh_characteristics |
│ rebuild_supply_chain | full_catalog | stock_batch |
│ manufacturer_dictionary_refresh | characteristic_dictionary_refresh |
│ classification_dictionary_refresh | client_sku_map_import
├── target — JSONB: что именно искать/обновлять (canonical_id, supplier_sku, etc.)
├── credential_context — какая credential (+ ключи AuthBucket / DataBucket)
├── priority — high | normal | low
├── status — queued | running | awaiting_report | fetching | completed | failed | skipped
├── remote_job_ref — для async-report pattern: {uuid, provider, submitted_at, deadline}
├── poll_strategy — {initial_delay, interval, max_wait, backoff}
├── enqueued_at, started_at, completed_at
├── triggered_by — user_request | schedule | system_event
├── parent_job_id — для chain'ов
├── result_summary
└── error — ConnectorError taxonomy (ADR-0024)
Дедупликация
Перед созданием job проверяем: есть ли pending/running job с тем же (kind, target, credential_context). Если да — присоединяемся, не создаём дубль. Возвращаем существующий job_id вызывающему.
Троттлинг
На один target — не чаще одного user-request triggered job в N секунд (по умолчанию 60s). Schedule jobs не считаются.
Приоритеты
high— по запросу клиента (он что-то просит, ждёт).normal— scheduled периодические.low— background обогащение.
Worker pool обрабатывает с приоритетом, но без полного starvation low (минимальная пропорция).
Подписки
Клиент может подписаться на EnrichmentJob.completed через webhook или SSE — тогда после завершения job клиент сразу получает обновлённые данные.
Pattern 1: REST API connector
Базовый паттерн для поставщиков с HTTP API (ETM, большинство дистрибьюторов).
Компоненты
- Session Manager — получение и переиспользование access-токена / сессии.
- Rate Limiter — централизованный (Redis), per-supplier budget.
- Async Job Poller — если поставщик генерирует отчёты асинхронно.
- Pagination walker — если ответы постраничные.
- Retry + Backoff — экспоненциальный backoff на transient errors.
- Circuit Breaker — при длительной недоступности.
- Raw Payload Writer — сохраняем все сырые ответы в S3.
- Parser / Normalizer — парсинг в
SupplierOffer.
Контракт connector.Connector (ADR-0024)
type Connector interface {
ID() string
Capabilities() Capabilities
Schema() SchemaDescriptor
// Sync data fetch.
Fetch(ctx context.Context, cred CredentialContext, job FetchJob) (<-chan RawPayload, error)
// Async report pattern.
SubmitRemoteJob(ctx context.Context, cred CredentialContext, job FetchJob) (RemoteJobRef, error)
PollRemoteJob(ctx context.Context, cred CredentialContext, ref RemoteJobRef) (RemoteJobState, error)
FetchRemoteJobResult(ctx context.Context, cred CredentialContext, ref RemoteJobRef) (<-chan RawPayload, error)
// Dictionary exports (manufacturer / characteristic / classification / client_sku_map).
FetchDictionary(ctx context.Context, cred CredentialContext, kind DictionaryKind) (<-chan RawPayload, error)
}
type CredentialContext struct {
CredentialID string
Scope CredentialScope // system | customer
CustomerID string
SecretRef string
AuthBucketKey string // ключ AuthBucket (обслуживает login / session refresh)
DataBucketKeys map[string]string // ключи DataBucket per endpoint_class
WarehouseScope []string
Features CredentialFeatures // catalog / prices / stock / place_orders /
// media_unwatermarked / dictionary_exports / async_reports
EgressPolicy EgressPolicy // fixed_ip_required, allowed_cidrs
}
type Capabilities struct {
Catalog bool
Characteristics bool
Prices bool
PriceSet PriceSetShape // Net / Gross / ListTarif / RetailRec
Stock bool
StockForecast bool
WarehouseKinds []WarehouseKind
AsyncJobs bool
Webhooks bool
Media MediaCapability
CodeTypes []SkuCodeType // ClientSku / SupplierSku / ManufacturerArticle
DictionaryExports []DictionaryKind
}
type FetchJob struct {
Type FetchJobType // FullCatalog | Incremental | PriceBatch | StockBatch | SingleSKU
Window TimeWindow
Filters map[string]any
}
type RawPayload struct {
IdentityPack SkuIdentityPack // supplier_sku, manufacturer_article?, manufacturer_code?, client_sku?
PayloadType string // "characteristics" | "price" | "stock" | "catalog-line" | "dictionary_*"
Data []byte
FetchedAt time.Time
Source SourceRef
CredentialID string
}
// ConnectorError — нормализованная taxonomy (ADR-0024).
type ConnectorErrorKind int
const (
AuthRejected ConnectorErrorKind = iota
SessionExpired
RateLimited
NotFound
OnRequest
Transient
Permanent
PartialSuccess
)Маршрутизация credentials
Выбор credential для запроса делает не connector, а CredentialRouter в core/ingestion:
type CredentialRouter interface {
// Для catalog discovery: вернуть credential по round-robin
// среди всех credentials с allowed_use IN (own, shared_for_catalog).
NextForCatalog(supplierID string) (CredentialContext, error)
// Для customer-specific pricing/stock: вернуть credential customer'а или fallback на system.
ForCustomer(supplierID, customerID string, requireFresh bool) (CredentialContext, error)
// Для системных потоков: системная credential.
ForSystem(supplierID string) (CredentialContext, error)
}Таким образом connector полностью изолирован от логики выбора credential — ему просто передают, какой credential context использовать.
Как реализуется
В connectors/<n>/:
domain/— value objects (SupplierSKU, SupplierReference) и доменные ошибки.app/:command/fetch_full_catalog.go— use case полной выгрузки.command/fetch_prices_batch.go,fetch_stock_batch.go.port/supplier_client.go— интерфейс HTTP-клиента.port/session_store.go— интерфейс хранилища сессий.
infra/:integration/http_client.go— реализация port’а, с rate limiter, retries.integration/session_manager.go— реализация port’а (Redis).integration/parser.go— из RawPayload в структуры.persistence/raw_payload_writer.go— S3.
api/:consumer/scheduled_fetch.go— consumer сообщений от scheduler’а.consumer/async_job_poller.go.
Конфигурация
Каждый connector имеет свой конфиг-блок:
connectors:
etm:
enabled: true
base_url: "https://ipro.etm.ru/api/v1"
credentials_ref: "secret://connectors/etm/system"
rate_limit:
auth:
login:
rate: 1
period: 120s
mode: wait # блокирующее ожидание — нельзя fail (ADR-0008)
data:
goods:
rate: 1
period: 1s
mode: skip_and_reschedule
price:
rate: 1
period: 1s
mode: skip_and_reschedule
remains:
rate: 1
period: 1s
mode: skip_and_reschedule
async_poll:
rate: 2
period: 1s
mode: skip_and_reschedule
session_ttl: 120m # 2 часа по spec ETM
session_refresh_before: 5m
warehouses: [11, 12, 13, 14, 15, 16, 17, 18, 19, 35, 36]
warehouse_kinds:
11: regional_center
12: regional_center
36: logistics_center
async_jobs:
poll_strategy:
initial_delay: 60s
interval: 60s
max_wait: 3h
backoff: { factor: 1.0 } # у ETM poll не экспоненциальный
code_types: [cli, supplier, manufacturer]
price_set_shape: [net, gross, list_tarif, retail_rec]
media:
supports_images: true
supports_videos: true
supports_certificates: true
watermark_policy: requires_feature_media_unwatermarked
egress:
fixed_ip_required: true # ETM требует whitelisted IP для foreign hostingPattern 2: File feed connector
Если поставщик отдаёт данные через файловые выгрузки (FTP / S3 / HTTP-скачивание). Пример: КЭАЗ (https://files.keaz.ru/ftp/keaz.xls?<timestamp> + keaz.zip).
Варианты формата
FileFeed.Format:
xls/xlsx— табличные прайс-листы (типично РФ-дистрибьюторы).csv— flat data (классический exchange format).json/ndjson— структурированные выгрузки.xml— старые ERP / OASIS / Open Trans формата.bmecat— специализированный XML для промышленных каталогов (включая ETIM payloads).
Сжатие
FileFeed.Compression:
none.zip— архив с одним файлом внутри (КЭАЗ:keaz.zip → keaz.xls).gzip.
Check mode (когда качать)
FileFeed.CheckMode:
etag— полагаемся на HTTPETagheader, conditionalIf-None-Match.last_modified—If-Modified-Since. КЭАЗ: возвращаетLast-Modified.timestamp_in_url— URL содержит query-param с timestamp (?1776499168). Изменение timestamp = новый контент. Самый быстрый способ узнать про обновление.always— скачиваем по расписанию вне зависимости от headers (fallback).
Encoding
FileFeed.Encoding: utf-8 | windows-1251 | iso-8859-1. РФ-выгрузки часто в windows-1251 — обязательно явно указывать, парсер не должен полагаться на autodetect.
Шаги
- Scheduler / Cron запускает
EnrichmentJob{kind=full_catalog}или{kind=refresh_observation}. - Check: HEAD-запрос с
If-None-Match/If-Modified-Since. Если 304 Not Modified —EnrichmentJobSkipped{reason=not_modified}. - Download (GET) с
Authorizationесли требуется. Для КЭАЗ — public, без auth. - Decompress если
Compression != none. - Raw Writer сохраняет archive + extracted file в S3.
- Parser (format-specific) обрабатывает построчно. Для больших файлов — streaming (не загружать весь XLS в память).
- Fan-out: каждая строка →
RawPayloadсpayload_type=catalog-line. Observations генерятся пачками.
Конфигурация (пример КЭАЗ)
connectors:
keaz:
enabled: true
transport: file_feed
file_feed:
url: "https://files.keaz.ru/ftp/keaz.zip"
format: xls
compression: zip
check_mode: timestamp_in_url # URL несёт ?<ts>, изменение ts = новый файл
encoding: windows-1251 # типично для РФ-выгрузок
update_schedule: "0 */4 * * *" # каждые 4 часа check; сам файл обновляется 1-2 раза в день
capabilities:
catalog: true
characteristics: true
prices: true
stock: true
stock_forecast: false
media: false # XLS не содержит media; отдельный catalog-etim для images
taxonomies:
supported_standards: [etim_v7] # ETIM caталог отдельный
native_coding: false
incremental_sync:
supported: false # КЭАЗ всегда full file
auth:
schema: public_no_auth
rate_limit:
data:
download:
rate: 1
period: 3600s # не чаще 1 раз в час скачиваем 12MB файл
mode: skip_and_rescheduleКонфигурация Systeme Electric (REST + API key + ETIM native + /getdeltaproducts)
connectors:
systeme-electric:
enabled: true
transport: rest
base_url: "https://api.systeme.ru/new-api/JSON"
auth:
schema: api_key
key_param: accessCode
key_ref: "secret://connectors/systeme-electric/system"
rate_limit:
auth: {} # API key — single-shot, не нужен bucket
data:
getdata: { rate: 5, period: 1s, mode: skip_and_reschedule }
getprice: { rate: 5, period: 1s, mode: skip_and_reschedule }
getstocks: { rate: 5, period: 1s, mode: skip_and_reschedule }
getdeltaproducts: { rate: 1, period: 10s, mode: skip_and_reschedule }
pagination:
kind: query_param
page_param: page
page_size_param: pageSize
max_page_size: 50
batching:
kind: comma_separated
param: commercialRef
max_items: 50
capabilities:
prices: true
price_set_shape: [list_tarif] # tariff price; non-tariff требует manager request
stock: true
stock_forecast: false
taxonomies:
supported_standards: [etim_v5, etim_v6, etim_v7]
native_coding: true # payload содержит ETIM коды напрямую
exports_dictionary: false # ETIM registry берём из официального dump
incremental_sync:
supported: true
data_scopes: [materials, price, stock]
cursor_kind: date # /getdeltaproducts?since=<date>
min_fetch_interval: 5m
max_fetch_interval: 24h
multi_currency:
supported_currencies: [RUB]
default_currency: RUB
code_types: [supplier, manufacturer] # commercialRef + MPN
media:
supports_images: true
supports_videos: true
supports_certificates: trueКонфигурация DKC (REST + MasterKey→AccessToken + revision incremental + multi-currency + ETIM export)
connectors:
dkc:
enabled: true
transport: rest
base_url: "https://api.dkc.ru/v1"
auth:
schema: master_key_token_exchange
master_key_ref: "secret://connectors/dkc/system/master_key"
exchange_endpoint: "/auth.access.token/{MasterKey}"
access_token_lifetime: 3600s # оценка; SessionManager proactively refresh'ит за 5 мин до истечения
rate_limit:
auth:
token_exchange: { rate: 1, period: 60s, mode: wait }
data:
catalog_material: { rate: 5, period: 1s, mode: skip_and_reschedule }
catalog_material_stock: { rate: 5, period: 1s, mode: skip_and_reschedule }
revisions: { rate: 2, period: 1s, mode: skip_and_reschedule }
etim: { rate: 1, period: 5s, mode: skip_and_reschedule }
capabilities:
prices: true
price_set_shape: [retail_rec] # retailPrice + minRetailPrice
stock: true
stock_forecast: false
taxonomies:
supported_standards: [etim_v7]
native_coding: true
exports_dictionary: true # /etim/* endpoints отдают полный dump
incremental_sync:
supported: true
data_scopes:
- materials
- stock
- certificates
- packings
- nodes
- distributor_offices
- taxonomy_etim_v7
cursor_kind: revision # /revisions/last/size + /revisions/last
min_fetch_interval: 5m
max_fetch_interval: 2h # DKC revision формируется каждые 2h (stock)
supports_bulk_deltas: false
multi_currency:
supported_currencies: [RUB, KZT] # /catalog/material/price/kzt и /retailPrice
default_currency: RUB
parallel_currency_fetch: false # отдельный endpoint per currency
code_types: [supplier, manufacturer]
media:
supports_images: true
supports_videos: true
supports_certificates: truePattern 3: Webhook-driven
Если поставщик умеет push. Мы поднимаем endpoint в connector’е, валидируем подпись, пишем raw payload, далее — общий pipeline.
Pattern 4: HTML scraper (ADR-0027)
Для поставщиков без API (пример: smart-shop.pro). Данные через парсинг HTML страниц.
Compliance gate (обязательно до implementation)
robots.txtпроверен на наличие запретов для нужных путей.- TOS проанализирован — есть ли прямой запрет автоматического сбора?
- Если запрет или неоднозначность — письменное разрешение от поставщика (email / договор), артефакт
compliance/written-permission.md. - User-Agent идентифицируемый:
Tracium-Bot/1.0 (+https://tracium.tld/bot; contact=catalog-ops@tracium.tld). - Rate budget start conservative (≤ 0.5 req/сек).
Retry-Aftercompliance — без игнора 429/503.
CI compliance-gate stage блокирует merge при отсутствии артефактов.
Компоненты
- Session Manager — cookie jar + CSRF token + логин через form data.
- Rate Limiter — двухуровневый (ADR-0008), значения агрессивно консервативные.
- HTML Fetcher —
net/httpклиент с encoding detection (windows-1251!), decompression, cookie jar; либо Playwright / chromedp для CSR-сайтов. - SelectorRegistry — CSS/XPath per connector в yaml (изменение markup ≠ код).
- Parser — golden fixtures tests, markup drift detection.
- Raw Payload Writer — HTML blob (gzipped) в S3 + структурированные fields в
RawPayload.Data. - Headless Browser Pod (опционально, если
RequiresHeadlessBrowser=true): Chromium container, 1 session per pod, liveness probe.
Capabilities specifics
Capabilities{
TransportKind: html_scrape,
RequiresHeadlessBrowser: true, // если SPA
RespectsRobotsTxt: true, // обязательно для scraper'ов
Marketplace: Marketplace{
IsMarketplace: true, // если сайт — агрегатор
SellerExports: true,
CrossSellerInventory: false,
},
// ...
}Observation от html_scrape по умолчанию помечается reliability=degraded (scraping менее надёжен чем API).
Конфигурация (пример smart-shop.pro)
connectors:
smart-shop-pro:
enabled: true
base_url: "https://smart-shop.pro"
transport: html_scrape
renders_csr: true
compliance:
robots_txt_checked_at: 2026-04-18
tos_analysis_ref: "compliance/smart-shop-pro/tos-analysis.md"
written_permission_ref: "compliance/smart-shop-pro/permission-2026.pdf"
user_agent: "Tracium-Bot/1.0 (+https://tracium.tld/bot; contact=catalog-ops@tracium.tld)"
headless:
image: mcr.microsoft.com/playwright/go:latest
cpu: 500m
memory: 1Gi
page_timeout: 30s
block_resource_types: [image, font, media, stylesheet]
rate_limit:
auth:
login:
rate: 1
period: 300s
mode: wait
data:
listing:
rate: 1
period: 2s
mode: skip_and_reschedule
itemcard:
rate: 1
period: 2s
mode: skip_and_reschedule
pagination:
kind: query_param
param: page
start: 1
url_templates:
category: "/catalogs/{group_code}/{category_code}"
itemcard: "/itemcard/{brand}/{article}"
analogs: "/item/{composite_id}/analogs"
selectors_ref: "connectors/smart-shop-pro/selectors.yaml"
marketplace:
enabled: true
seller_row_selector: ".seller-offer-row"
default_seller_trust: unverified
capabilities:
prices: true
stock: true
stock_forecast: false
characteristics: true
media: true
code_types: [supplier, manufacturer]
price_set_shape: [gross]
egress:
user_agent_signature_required: trueMarkup drift detection
Parser сверяет с baseline:
- количество
item_rowна listing ≥ min_threshold; - presence обязательных полей (price, article, title);
- regex match price format
\d[\d\s]*([,.]\d+)?\s*(?:руб|₽); - seller block detection consistency.
При drift — ConnectorError.Permanent + alert connector_markup_drift_total{supplier}. Никакого guessing — fail-fast.
Ethical crawling
- Identified User-Agent (выше).
- Conditional requests (
If-Modified-Since/ETagгде поддерживается). - No parallel crawl внутри одного host’а — один worker на хост.
- Sitemap-first, если доступен.
- Skip auth-walls если TOS неоднозначен; customer credential — только по явному consent customer’а.
- Honor
429 / 503 / Retry-Afterбезусловно.
Общие требования ко всем паттернам
- Идемпотентность: повторная обработка того же payload’а не создаёт дублей.
- Observability: структурированные логи с
supplier_id,correlation_id, метрики fetch success/failure, latency, rate-limit hits. - Graceful failure: падение одного коннектора не влияет на остальные.
- No silent drops: необработанные payload’ы копятся в dead-letter queue с алертом.
- Raw retention: сырые payload’ы хранятся минимум 30 дней (точная политика — в open questions).
- Connector contract compliance: все 9 пунктов ADR-0024 выполняются (SessionManager, двухуровневый rate, SKU identity pack, error taxonomy, async-report, media rights, capabilities, egress, provider contract tests).
- Marketplace awareness (ADR-0026): если
Capabilities.Marketplace.IsMarketplace=true— один fetch генерирует N observations сseller_ref. Core обрабатывает эту размерность транспарентно. - Scraper compliance (ADR-0027): для
TransportKind=html_scrapeобязательны robots.txt / TOS / written permission артефакты + identified User-Agent.
См. также ../50-processes/adding-new-supplier.md, ADR-0024, ADR-0026, ADR-0027.