Фреймворк коннекторов
NOTE
Статус: Target service boundary. Документ описывает целевую сервисную границу. Код либо полностью отсутствует, либо существует только как scaffold — смотрите секцию «Статус документа» ниже для точного указания на код. Правила маркировки — в
50-processes/documentation-standard.md.
Этот документ описывает целевой connector framework внутри ingestion. Он нужен, чтобы проектировать новые интеграции поставщиков и не путать общую connector-модель с текущим состоянием репозитория, где выделенного ingestor runtime пока нет.
Интерфейс
package connector
import (
"context"
"time"
)
// Connector — интерфейс, реализуемый каждым модулем поставщика.
// Connector НЕ выбирает credential сам — ему передают CredentialContext.
type Connector interface {
ID() string
Capabilities() Capabilities
Schema() SchemaDescriptor
Fetch(ctx context.Context, cred CredentialContext, job FetchJob) (<-chan RawPayload, error)
// Validate проверяет работоспособность credential через тестовый запрос.
// Используется при создании credential и периодически.
Validate(ctx context.Context, cred CredentialContext) error
}
type CredentialContext struct {
CredentialID string
Scope CredentialScope // system | customer
CustomerID string // если scope=customer
SecretRef string // secret://connectors/<supplier>/<scope>
RateLimitBucket string // ключ rate bucket'а для этой credential
WarehouseScope []string // ограничение по складам, если есть
Features []string // capabilities, разрешённые этой credential
}
type CredentialScope string
const (
ScopeSystem CredentialScope = "system"
ScopeCustomer CredentialScope = "customer"
)
type Capabilities struct {
Catalog bool
Characteristics bool
Prices bool
Stock bool
AsyncJobs bool
Webhooks bool
IncrementalSince bool
MultiCredential bool // умеет ли коннектор корректно обрабатывать несколько credentials параллельно
}
type FetchJob struct {
Type FetchJobType
Window TimeWindow
Filters map[string]any
DryRun bool
}
type FetchJobType string
const (
FetchFullCatalog FetchJobType = "full_catalog"
FetchIncremental FetchJobType = "incremental"
FetchPriceBatch FetchJobType = "price_batch"
FetchStockBatch FetchJobType = "stock_batch"
FetchSingleSKU FetchJobType = "single_sku"
)
type RawPayload struct {
SupplierSKU string
PayloadType string // "characteristics" | "price" | "stock" | "catalog-line"
Data []byte
FetchedAt time.Time
Source SourceRef
CredentialID string // через какую credential получено — важно для observation routing
}
type SourceRef struct {
Endpoint string
Parameters map[string]string
HTTPStatus int
}
type SchemaDescriptor struct {
// Машиночитаемое описание: какие поля отдаёт поставщик, какие типы.
Fields []FieldDescriptor
}Маршрутизация credentials (вне коннектора)
Коннектор сам не выбирает credential — это ответственность CredentialRouter в целевом ingestion-модуле. Логика:
- Catalog discovery → роутер возвращает следующую credential по round-robin среди
allowed_use IN (own, shared_for_catalog). - Pricing/Stock для customer → credential customer’а (если есть) или системная (fallback).
- Системные нужды → системная credential.
Коннектор получает готовый CredentialContext, открывает по нему сессию (или переиспользует), делает запрос и возвращает payload с пометкой CredentialID.
Жизненный цикл
- Регистрация — в целевом
backend/cmd/ingestor/main.goвызываетсяregistry.Register(...). - Запуск — коннектор получает контекст и начинает слушать triggers из scheduler / webhook / ручного запуска.
- Fetch — по каждому вызову
Fetch()возвращает потокRawPayload’ов. - Shutdown — graceful: завершить текущие задачи, вернуть активные сессии.
Стандартная инфраструктура, доступная коннекторам
Через backend/internal/platform/*:
platform/rate-limiter— централизованный token bucket на Redis.platform/messaging— Kafka producer.platform/storage— PG, S3 helpers.platform/observability— OTel tracer, метрики, logger.platform/auth— загрузка и расшифровка credentials поставщиков.
Общие паттерны в base-пакете
В целевом backend/internal/connectors/base/:
SessionManager— общий паттерн «login + refresh».RateLimitedHTTPClient— обёртка над http.Client с автоматическим acquire из rate limiter.AsyncJobPoller— generic polling с backoff.RawPayloadWriter— writer в S3 + publish в Kafka.
Коннектор реализует специфичное, переиспользует общее.
Тестирование
Каждый коннектор обязан иметь:
- Unit-тесты парсинга (с golden fixtures из реальных payload’ов).
- Integration-тест против mock-сервера (моки API поставщика).
- Compatibility-тест: раз в N дней прогон на sandbox-окружении поставщика (если есть test-credentials).
Правила (нужно / нельзя)
Нужно:
- Выделять слои domain, app, infra в каждом коннекторе.
- Писать тесты парсинга на golden fixtures.
- Собирать метрики
<supplier>_<endpoint>_*. - Логировать correlation_id.
Нельзя:
- Обходить rate limiter.
- Писать напрямую в ES / ClickHouse / canonical-таблицы.
- Ходить в ядро: связь только через Kafka-события.
- Хардкодить credentials — только через
platform/auth.