Паттерны интеграции

NOTE

Статус: Target design. Документ описывает целевую архитектуру. Сервисы, модули и контракты, упомянутые ниже, могут ещё не существовать в backend/. Правила маркировки — в 50-processes/documentation-standard.md.

Стандартные паттерны для интеграции с внешними поставщиками.

Принципиальное ограничение: no-proxy + graceful degradation

Все запросы к поставщикам — асинхронные, в background-процессах. Клиентский запрос никогда не блокируется ожиданием поставщика.

См. ADR-0011 и ADR-0014.

EnrichmentJob — единая модель async-задачи к поставщику

EnrichmentJob
├── id
├── kind                — discovery | refresh_observation | refresh_characteristics |
│                          rebuild_supply_chain | full_catalog | stock_batch |
│                          manufacturer_dictionary_refresh | characteristic_dictionary_refresh |
│                          classification_dictionary_refresh | client_sku_map_import
├── target              — JSONB: что именно искать/обновлять (canonical_id, supplier_sku, etc.)
├── credential_context  — какая credential (+ ключи AuthBucket / DataBucket)
├── priority            — high | normal | low
├── status              — queued | running | awaiting_report | fetching | completed | failed | skipped
├── remote_job_ref      — для async-report pattern: {uuid, provider, submitted_at, deadline}
├── poll_strategy       — {initial_delay, interval, max_wait, backoff}
├── enqueued_at, started_at, completed_at
├── triggered_by        — user_request | schedule | system_event
├── parent_job_id       — для chain'ов
├── result_summary
└── error               — ConnectorError taxonomy (ADR-0024)

Дедупликация

Перед созданием job проверяем: есть ли pending/running job с тем же (kind, target, credential_context). Если да — присоединяемся, не создаём дубль. Возвращаем существующий job_id вызывающему.

Троттлинг

На один target — не чаще одного user-request triggered job в N секунд (по умолчанию 60s). Schedule jobs не считаются.

Приоритеты

  • high — по запросу клиента (он что-то просит, ждёт).
  • normal — scheduled периодические.
  • low — background обогащение.

Worker pool обрабатывает с приоритетом, но без полного starvation low (минимальная пропорция).

Подписки

Клиент может подписаться на EnrichmentJob.completed через webhook или SSE — тогда после завершения job клиент сразу получает обновлённые данные.

Pattern 1: REST API connector

Базовый паттерн для поставщиков с HTTP API (ETM, большинство дистрибьюторов).

Компоненты

  • Session Manager — получение и переиспользование access-токена / сессии.
  • Rate Limiter — централизованный (Redis), per-supplier budget.
  • Async Job Poller — если поставщик генерирует отчёты асинхронно.
  • Pagination walker — если ответы постраничные.
  • Retry + Backoff — экспоненциальный backoff на transient errors.
  • Circuit Breaker — при длительной недоступности.
  • Raw Payload Writer — сохраняем все сырые ответы в S3.
  • Parser / Normalizer — парсинг в SupplierOffer.

Контракт connector.Connector (ADR-0024)

type Connector interface {
    ID() string
    Capabilities() Capabilities
    Schema() SchemaDescriptor
 
    // Sync data fetch.
    Fetch(ctx context.Context, cred CredentialContext, job FetchJob) (<-chan RawPayload, error)
 
    // Async report pattern.
    SubmitRemoteJob(ctx context.Context, cred CredentialContext, job FetchJob) (RemoteJobRef, error)
    PollRemoteJob(ctx context.Context, cred CredentialContext, ref RemoteJobRef) (RemoteJobState, error)
    FetchRemoteJobResult(ctx context.Context, cred CredentialContext, ref RemoteJobRef) (<-chan RawPayload, error)
 
    // Dictionary exports (manufacturer / characteristic / classification / client_sku_map).
    FetchDictionary(ctx context.Context, cred CredentialContext, kind DictionaryKind) (<-chan RawPayload, error)
}
 
type CredentialContext struct {
    CredentialID       string
    Scope              CredentialScope    // system | customer
    CustomerID         string
    SecretRef          string
    AuthBucketKey      string             // ключ AuthBucket (обслуживает login / session refresh)
    DataBucketKeys     map[string]string  // ключи DataBucket per endpoint_class
    WarehouseScope     []string
    Features           CredentialFeatures // catalog / prices / stock / place_orders /
                                          // media_unwatermarked / dictionary_exports / async_reports
    EgressPolicy       EgressPolicy       // fixed_ip_required, allowed_cidrs
}
 
type Capabilities struct {
    Catalog           bool
    Characteristics   bool
    Prices            bool
    PriceSet          PriceSetShape      // Net / Gross / ListTarif / RetailRec
    Stock             bool
    StockForecast     bool
    WarehouseKinds    []WarehouseKind
    AsyncJobs         bool
    Webhooks          bool
    Media             MediaCapability
    CodeTypes         []SkuCodeType      // ClientSku / SupplierSku / ManufacturerArticle
    DictionaryExports []DictionaryKind
}
 
type FetchJob struct {
    Type     FetchJobType  // FullCatalog | Incremental | PriceBatch | StockBatch | SingleSKU
    Window   TimeWindow
    Filters  map[string]any
}
 
type RawPayload struct {
    IdentityPack SkuIdentityPack   // supplier_sku, manufacturer_article?, manufacturer_code?, client_sku?
    PayloadType  string            // "characteristics" | "price" | "stock" | "catalog-line" | "dictionary_*"
    Data         []byte
    FetchedAt    time.Time
    Source       SourceRef
    CredentialID string
}
 
// ConnectorError — нормализованная taxonomy (ADR-0024).
type ConnectorErrorKind int
const (
    AuthRejected ConnectorErrorKind = iota
    SessionExpired
    RateLimited
    NotFound
    OnRequest
    Transient
    Permanent
    PartialSuccess
)

Маршрутизация credentials

Выбор credential для запроса делает не connector, а CredentialRouter в core/ingestion:

type CredentialRouter interface {
    // Для catalog discovery: вернуть credential по round-robin
    // среди всех credentials с allowed_use IN (own, shared_for_catalog).
    NextForCatalog(supplierID string) (CredentialContext, error)
 
    // Для customer-specific pricing/stock: вернуть credential customer'а или fallback на system.
    ForCustomer(supplierID, customerID string, requireFresh bool) (CredentialContext, error)
 
    // Для системных потоков: системная credential.
    ForSystem(supplierID string) (CredentialContext, error)
}

Таким образом connector полностью изолирован от логики выбора credential — ему просто передают, какой credential context использовать.

Как реализуется

В connectors/<n>/:

  • domain/ — value objects (SupplierSKU, SupplierReference) и доменные ошибки.
  • app/:
    • command/fetch_full_catalog.go — use case полной выгрузки.
    • command/fetch_prices_batch.go, fetch_stock_batch.go.
    • port/supplier_client.go — интерфейс HTTP-клиента.
    • port/session_store.go — интерфейс хранилища сессий.
  • infra/:
    • integration/http_client.go — реализация port’а, с rate limiter, retries.
    • integration/session_manager.go — реализация port’а (Redis).
    • integration/parser.go — из RawPayload в структуры.
    • persistence/raw_payload_writer.go — S3.
  • api/:
    • consumer/scheduled_fetch.go — consumer сообщений от scheduler’а.
    • consumer/async_job_poller.go.

Конфигурация

Каждый connector имеет свой конфиг-блок:

connectors:
  etm:
    enabled: true
    base_url: "https://ipro.etm.ru/api/v1"
    credentials_ref: "secret://connectors/etm/system"
    rate_limit:
      auth:
        login:
          rate: 1
          period: 120s
          mode: wait            # блокирующее ожидание — нельзя fail (ADR-0008)
      data:
        goods:
          rate: 1
          period: 1s
          mode: skip_and_reschedule
        price:
          rate: 1
          period: 1s
          mode: skip_and_reschedule
        remains:
          rate: 1
          period: 1s
          mode: skip_and_reschedule
        async_poll:
          rate: 2
          period: 1s
          mode: skip_and_reschedule
    session_ttl: 120m            # 2 часа по spec ETM
    session_refresh_before: 5m
    warehouses: [11, 12, 13, 14, 15, 16, 17, 18, 19, 35, 36]
    warehouse_kinds:
      11: regional_center
      12: regional_center
      36: logistics_center
    async_jobs:
      poll_strategy:
        initial_delay: 60s
        interval: 60s
        max_wait: 3h
        backoff: { factor: 1.0 }  # у ETM poll не экспоненциальный
    code_types: [cli, supplier, manufacturer]
    price_set_shape: [net, gross, list_tarif, retail_rec]
    media:
      supports_images: true
      supports_videos: true
      supports_certificates: true
      watermark_policy: requires_feature_media_unwatermarked
    egress:
      fixed_ip_required: true   # ETM требует whitelisted IP для foreign hosting

Pattern 2: File feed connector

Если поставщик отдаёт данные через файловые выгрузки (FTP / S3 / HTTP-скачивание). Пример: КЭАЗ (https://files.keaz.ru/ftp/keaz.xls?<timestamp> + keaz.zip).

Варианты формата

FileFeed.Format:

  • xls / xlsx — табличные прайс-листы (типично РФ-дистрибьюторы).
  • csv — flat data (классический exchange format).
  • json / ndjson — структурированные выгрузки.
  • xml — старые ERP / OASIS / Open Trans формата.
  • bmecat — специализированный XML для промышленных каталогов (включая ETIM payloads).

Сжатие

FileFeed.Compression:

  • none.
  • zip — архив с одним файлом внутри (КЭАЗ: keaz.zip → keaz.xls).
  • gzip.

Check mode (когда качать)

FileFeed.CheckMode:

  • etag — полагаемся на HTTP ETag header, conditional If-None-Match.
  • last_modifiedIf-Modified-Since. КЭАЗ: возвращает Last-Modified.
  • timestamp_in_url — URL содержит query-param с timestamp (?1776499168). Изменение timestamp = новый контент. Самый быстрый способ узнать про обновление.
  • always — скачиваем по расписанию вне зависимости от headers (fallback).

Encoding

FileFeed.Encoding: utf-8 | windows-1251 | iso-8859-1. РФ-выгрузки часто в windows-1251 — обязательно явно указывать, парсер не должен полагаться на autodetect.

Шаги

  1. Scheduler / Cron запускает EnrichmentJob{kind=full_catalog} или {kind=refresh_observation}.
  2. Check: HEAD-запрос с If-None-Match / If-Modified-Since. Если 304 Not Modified — EnrichmentJobSkipped{reason=not_modified}.
  3. Download (GET) с Authorization если требуется. Для КЭАЗ — public, без auth.
  4. Decompress если Compression != none.
  5. Raw Writer сохраняет archive + extracted file в S3.
  6. Parser (format-specific) обрабатывает построчно. Для больших файлов — streaming (не загружать весь XLS в память).
  7. Fan-out: каждая строка → RawPayload с payload_type=catalog-line. Observations генерятся пачками.

Конфигурация (пример КЭАЗ)

connectors:
  keaz:
    enabled: true
    transport: file_feed
    file_feed:
      url: "https://files.keaz.ru/ftp/keaz.zip"
      format: xls
      compression: zip
      check_mode: timestamp_in_url          # URL несёт ?<ts>, изменение ts = новый файл
      encoding: windows-1251                 # типично для РФ-выгрузок
      update_schedule: "0 */4 * * *"         # каждые 4 часа check; сам файл обновляется 1-2 раза в день
    capabilities:
      catalog: true
      characteristics: true
      prices: true
      stock: true
      stock_forecast: false
      media: false                            # XLS не содержит media; отдельный catalog-etim для images
      taxonomies:
        supported_standards: [etim_v7]        # ETIM caталог отдельный
        native_coding: false
      incremental_sync:
        supported: false                       # КЭАЗ всегда full file
    auth:
      schema: public_no_auth
    rate_limit:
      data:
        download:
          rate: 1
          period: 3600s                        # не чаще 1 раз в час скачиваем 12MB файл
          mode: skip_and_reschedule

Конфигурация Systeme Electric (REST + API key + ETIM native + /getdeltaproducts)

connectors:
  systeme-electric:
    enabled: true
    transport: rest
    base_url: "https://api.systeme.ru/new-api/JSON"
    auth:
      schema: api_key
      key_param: accessCode
      key_ref: "secret://connectors/systeme-electric/system"
    rate_limit:
      auth: {}                                # API key — single-shot, не нужен bucket
      data:
        getdata:        { rate: 5, period: 1s, mode: skip_and_reschedule }
        getprice:       { rate: 5, period: 1s, mode: skip_and_reschedule }
        getstocks:      { rate: 5, period: 1s, mode: skip_and_reschedule }
        getdeltaproducts: { rate: 1, period: 10s, mode: skip_and_reschedule }
    pagination:
      kind: query_param
      page_param: page
      page_size_param: pageSize
      max_page_size: 50
    batching:
      kind: comma_separated
      param: commercialRef
      max_items: 50
    capabilities:
      prices: true
      price_set_shape: [list_tarif]           # tariff price; non-tariff требует manager request
      stock: true
      stock_forecast: false
      taxonomies:
        supported_standards: [etim_v5, etim_v6, etim_v7]
        native_coding: true                   # payload содержит ETIM коды напрямую
        exports_dictionary: false             # ETIM registry берём из официального dump
      incremental_sync:
        supported: true
        data_scopes: [materials, price, stock]
        cursor_kind: date                      # /getdeltaproducts?since=<date>
        min_fetch_interval: 5m
        max_fetch_interval: 24h
      multi_currency:
        supported_currencies: [RUB]
        default_currency: RUB
      code_types: [supplier, manufacturer]     # commercialRef + MPN
      media:
        supports_images: true
        supports_videos: true
        supports_certificates: true

Конфигурация DKC (REST + MasterKey→AccessToken + revision incremental + multi-currency + ETIM export)

connectors:
  dkc:
    enabled: true
    transport: rest
    base_url: "https://api.dkc.ru/v1"
    auth:
      schema: master_key_token_exchange
      master_key_ref: "secret://connectors/dkc/system/master_key"
      exchange_endpoint: "/auth.access.token/{MasterKey}"
      access_token_lifetime: 3600s             # оценка; SessionManager proactively refresh'ит за 5 мин до истечения
    rate_limit:
      auth:
        token_exchange: { rate: 1, period: 60s, mode: wait }
      data:
        catalog_material:     { rate: 5, period: 1s, mode: skip_and_reschedule }
        catalog_material_stock: { rate: 5, period: 1s, mode: skip_and_reschedule }
        revisions:            { rate: 2, period: 1s, mode: skip_and_reschedule }
        etim:                 { rate: 1, period: 5s, mode: skip_and_reschedule }
    capabilities:
      prices: true
      price_set_shape: [retail_rec]            # retailPrice + minRetailPrice
      stock: true
      stock_forecast: false
      taxonomies:
        supported_standards: [etim_v7]
        native_coding: true
        exports_dictionary: true                # /etim/* endpoints отдают полный dump
      incremental_sync:
        supported: true
        data_scopes:
          - materials
          - stock
          - certificates
          - packings
          - nodes
          - distributor_offices
          - taxonomy_etim_v7
        cursor_kind: revision                    # /revisions/last/size + /revisions/last
        min_fetch_interval: 5m
        max_fetch_interval: 2h                   # DKC revision формируется каждые 2h (stock)
        supports_bulk_deltas: false
      multi_currency:
        supported_currencies: [RUB, KZT]         # /catalog/material/price/kzt и /retailPrice
        default_currency: RUB
        parallel_currency_fetch: false           # отдельный endpoint per currency
      code_types: [supplier, manufacturer]
      media:
        supports_images: true
        supports_videos: true
        supports_certificates: true

Pattern 3: Webhook-driven

Если поставщик умеет push. Мы поднимаем endpoint в connector’е, валидируем подпись, пишем raw payload, далее — общий pipeline.

Pattern 4: HTML scraper (ADR-0027)

Для поставщиков без API (пример: smart-shop.pro). Данные через парсинг HTML страниц.

Compliance gate (обязательно до implementation)

  1. robots.txt проверен на наличие запретов для нужных путей.
  2. TOS проанализирован — есть ли прямой запрет автоматического сбора?
  3. Если запрет или неоднозначность — письменное разрешение от поставщика (email / договор), артефакт compliance/written-permission.md.
  4. User-Agent идентифицируемый: Tracium-Bot/1.0 (+https://tracium.tld/bot; contact=catalog-ops@tracium.tld).
  5. Rate budget start conservative (≤ 0.5 req/сек).
  6. Retry-After compliance — без игнора 429/503.

CI compliance-gate stage блокирует merge при отсутствии артефактов.

Компоненты

  • Session Manager — cookie jar + CSRF token + логин через form data.
  • Rate Limiter — двухуровневый (ADR-0008), значения агрессивно консервативные.
  • HTML Fetchernet/http клиент с encoding detection (windows-1251!), decompression, cookie jar; либо Playwright / chromedp для CSR-сайтов.
  • SelectorRegistry — CSS/XPath per connector в yaml (изменение markup ≠ код).
  • Parser — golden fixtures tests, markup drift detection.
  • Raw Payload Writer — HTML blob (gzipped) в S3 + структурированные fields в RawPayload.Data.
  • Headless Browser Pod (опционально, если RequiresHeadlessBrowser=true): Chromium container, 1 session per pod, liveness probe.

Capabilities specifics

Capabilities{
  TransportKind:           html_scrape,
  RequiresHeadlessBrowser: true,       // если SPA
  RespectsRobotsTxt:       true,       // обязательно для scraper'ов
  Marketplace: Marketplace{
    IsMarketplace:        true,        // если сайт — агрегатор
    SellerExports:        true,
    CrossSellerInventory: false,
  },
  // ...
}

Observation от html_scrape по умолчанию помечается reliability=degraded (scraping менее надёжен чем API).

Конфигурация (пример smart-shop.pro)

connectors:
  smart-shop-pro:
    enabled: true
    base_url: "https://smart-shop.pro"
    transport: html_scrape
    renders_csr: true
    compliance:
      robots_txt_checked_at: 2026-04-18
      tos_analysis_ref: "compliance/smart-shop-pro/tos-analysis.md"
      written_permission_ref: "compliance/smart-shop-pro/permission-2026.pdf"
    user_agent: "Tracium-Bot/1.0 (+https://tracium.tld/bot; contact=catalog-ops@tracium.tld)"
    headless:
      image: mcr.microsoft.com/playwright/go:latest
      cpu: 500m
      memory: 1Gi
      page_timeout: 30s
      block_resource_types: [image, font, media, stylesheet]
    rate_limit:
      auth:
        login:
          rate: 1
          period: 300s
          mode: wait
      data:
        listing:
          rate: 1
          period: 2s
          mode: skip_and_reschedule
        itemcard:
          rate: 1
          period: 2s
          mode: skip_and_reschedule
    pagination:
      kind: query_param
      param: page
      start: 1
    url_templates:
      category: "/catalogs/{group_code}/{category_code}"
      itemcard: "/itemcard/{brand}/{article}"
      analogs:  "/item/{composite_id}/analogs"
    selectors_ref: "connectors/smart-shop-pro/selectors.yaml"
    marketplace:
      enabled: true
      seller_row_selector: ".seller-offer-row"
      default_seller_trust: unverified
    capabilities:
      prices: true
      stock: true
      stock_forecast: false
      characteristics: true
      media: true
      code_types: [supplier, manufacturer]
      price_set_shape: [gross]
    egress:
      user_agent_signature_required: true

Markup drift detection

Parser сверяет с baseline:

  • количество item_row на listing ≥ min_threshold;
  • presence обязательных полей (price, article, title);
  • regex match price format \d[\d\s]*([,.]\d+)?\s*(?:руб|₽);
  • seller block detection consistency.

При drift — ConnectorError.Permanent + alert connector_markup_drift_total{supplier}. Никакого guessing — fail-fast.

Ethical crawling

  • Identified User-Agent (выше).
  • Conditional requests (If-Modified-Since / ETag где поддерживается).
  • No parallel crawl внутри одного host’а — один worker на хост.
  • Sitemap-first, если доступен.
  • Skip auth-walls если TOS неоднозначен; customer credential — только по явному consent customer’а.
  • Honor 429 / 503 / Retry-After безусловно.

Общие требования ко всем паттернам

  1. Идемпотентность: повторная обработка того же payload’а не создаёт дублей.
  2. Observability: структурированные логи с supplier_id, correlation_id, метрики fetch success/failure, latency, rate-limit hits.
  3. Graceful failure: падение одного коннектора не влияет на остальные.
  4. No silent drops: необработанные payload’ы копятся в dead-letter queue с алертом.
  5. Raw retention: сырые payload’ы хранятся минимум 30 дней (точная политика — в open questions).
  6. Connector contract compliance: все 9 пунктов ADR-0024 выполняются (SessionManager, двухуровневый rate, SKU identity pack, error taxonomy, async-report, media rights, capabilities, egress, provider contract tests).
  7. Marketplace awareness (ADR-0026): если Capabilities.Marketplace.IsMarketplace=true — один fetch генерирует N observations с seller_ref. Core обрабатывает эту размерность транспарентно.
  8. Scraper compliance (ADR-0027): для TransportKind=html_scrape обязательны robots.txt / TOS / written permission артефакты + identified User-Agent.

См. также ../50-processes/adding-new-supplier.md, ADR-0024, ADR-0026, ADR-0027.