Фреймворк коннекторов

NOTE

Статус: Target service boundary. Документ описывает целевую сервисную границу. Код либо полностью отсутствует, либо существует только как scaffold — смотрите секцию «Статус документа» ниже для точного указания на код. Правила маркировки — в 50-processes/documentation-standard.md.

Этот документ описывает целевой connector framework внутри ingestion. Он нужен, чтобы проектировать новые интеграции поставщиков и не путать общую connector-модель с текущим состоянием репозитория, где выделенного ingestor runtime пока нет.

Интерфейс

package connector
 
import (
    "context"
    "time"
)
 
// Connector — интерфейс, реализуемый каждым модулем поставщика.
// Connector НЕ выбирает credential сам — ему передают CredentialContext.
type Connector interface {
    ID() string
    Capabilities() Capabilities
    Schema() SchemaDescriptor
    Fetch(ctx context.Context, cred CredentialContext, job FetchJob) (<-chan RawPayload, error)
 
    // Validate проверяет работоспособность credential через тестовый запрос.
    // Используется при создании credential и периодически.
    Validate(ctx context.Context, cred CredentialContext) error
}
 
type CredentialContext struct {
    CredentialID    string
    Scope           CredentialScope    // system | customer
    CustomerID      string             // если scope=customer
    SecretRef       string             // secret://connectors/<supplier>/<scope>
    RateLimitBucket string             // ключ rate bucket'а для этой credential
    WarehouseScope  []string           // ограничение по складам, если есть
    Features        []string           // capabilities, разрешённые этой credential
}
 
type CredentialScope string
 
const (
    ScopeSystem   CredentialScope = "system"
    ScopeCustomer CredentialScope = "customer"
)
 
type Capabilities struct {
    Catalog          bool
    Characteristics  bool
    Prices           bool
    Stock            bool
    AsyncJobs        bool
    Webhooks         bool
    IncrementalSince bool
    MultiCredential  bool   // умеет ли коннектор корректно обрабатывать несколько credentials параллельно
}
 
type FetchJob struct {
    Type     FetchJobType
    Window   TimeWindow
    Filters  map[string]any
    DryRun   bool
}
 
type FetchJobType string
 
const (
    FetchFullCatalog    FetchJobType = "full_catalog"
    FetchIncremental    FetchJobType = "incremental"
    FetchPriceBatch     FetchJobType = "price_batch"
    FetchStockBatch     FetchJobType = "stock_batch"
    FetchSingleSKU      FetchJobType = "single_sku"
)
 
type RawPayload struct {
    SupplierSKU  string
    PayloadType  string // "characteristics" | "price" | "stock" | "catalog-line"
    Data         []byte
    FetchedAt    time.Time
    Source       SourceRef
    CredentialID string  // через какую credential получено — важно для observation routing
}
 
type SourceRef struct {
    Endpoint   string
    Parameters map[string]string
    HTTPStatus int
}
 
type SchemaDescriptor struct {
    // Машиночитаемое описание: какие поля отдаёт поставщик, какие типы.
    Fields []FieldDescriptor
}

Маршрутизация credentials (вне коннектора)

Коннектор сам не выбирает credential — это ответственность CredentialRouter в целевом ingestion-модуле. Логика:

  • Catalog discovery → роутер возвращает следующую credential по round-robin среди allowed_use IN (own, shared_for_catalog).
  • Pricing/Stock для customer → credential customer’а (если есть) или системная (fallback).
  • Системные нужды → системная credential.

Коннектор получает готовый CredentialContext, открывает по нему сессию (или переиспользует), делает запрос и возвращает payload с пометкой CredentialID.

Жизненный цикл

  1. Регистрация — в целевом backend/cmd/ingestor/main.go вызывается registry.Register(...).
  2. Запуск — коннектор получает контекст и начинает слушать triggers из scheduler / webhook / ручного запуска.
  3. Fetch — по каждому вызову Fetch() возвращает поток RawPayload’ов.
  4. Shutdown — graceful: завершить текущие задачи, вернуть активные сессии.

Стандартная инфраструктура, доступная коннекторам

Через backend/internal/platform/*:

  • platform/rate-limiter — централизованный token bucket на Redis.
  • platform/messaging — Kafka producer.
  • platform/storage — PG, S3 helpers.
  • platform/observability — OTel tracer, метрики, logger.
  • platform/auth — загрузка и расшифровка credentials поставщиков.

Общие паттерны в base-пакете

В целевом backend/internal/connectors/base/:

  • SessionManager — общий паттерн «login + refresh».
  • RateLimitedHTTPClient — обёртка над http.Client с автоматическим acquire из rate limiter.
  • AsyncJobPoller — generic polling с backoff.
  • RawPayloadWriter — writer в S3 + publish в Kafka.

Коннектор реализует специфичное, переиспользует общее.

Тестирование

Каждый коннектор обязан иметь:

  • Unit-тесты парсинга (с golden fixtures из реальных payload’ов).
  • Integration-тест против mock-сервера (моки API поставщика).
  • Compatibility-тест: раз в N дней прогон на sandbox-окружении поставщика (если есть test-credentials).

Правила (нужно / нельзя)

Нужно:

  • Выделять слои domain, app, infra в каждом коннекторе.
  • Писать тесты парсинга на golden fixtures.
  • Собирать метрики <supplier>_<endpoint>_*.
  • Логировать correlation_id.

Нельзя:

  • Обходить rate limiter.
  • Писать напрямую в ES / ClickHouse / canonical-таблицы.
  • Ходить в ядро: связь только через Kafka-события.
  • Хардкодить credentials — только через platform/auth.