Relay

Custom Store

Implement your own store backend.

Relay's store layer is interface-based. You can implement a custom backend by satisfying the store.Store interface.

The interface

type Store interface {
    catalog.Store
    endpoint.Store
    event.Store
    delivery.Store
    dlq.Store

    Migrate(ctx context.Context) error
    Ping(ctx context.Context) error
    Close() error
}

This composes five subsystem interfaces plus three lifecycle methods.

Subsystem interfaces

catalog.Store

type Store interface {
    RegisterType(ctx context.Context, et *EventType) error
    GetType(ctx context.Context, name string) (*EventType, error)
    GetTypeByID(ctx context.Context, etID id.ID) (*EventType, error)
    ListTypes(ctx context.Context, opts ListOpts) ([]*EventType, error)
    MatchTypes(ctx context.Context, eventType string) ([]*EventType, error)
    DeleteType(ctx context.Context, name string) error
}

endpoint.Store

type Store interface {
    CreateEndpoint(ctx context.Context, ep *Endpoint) error
    GetEndpoint(ctx context.Context, epID id.ID) (*Endpoint, error)
    UpdateEndpoint(ctx context.Context, ep *Endpoint) error
    DeleteEndpoint(ctx context.Context, epID id.ID) error
    ListEndpoints(ctx context.Context, tenantID string, opts ListOpts) ([]*Endpoint, error)
    SetEnabled(ctx context.Context, epID id.ID, enabled bool) error
    Resolve(ctx context.Context, tenantID string, eventType string) ([]*Endpoint, error)
}

delivery.Store

type Store interface {
    EnqueueBatch(ctx context.Context, deliveries []*Delivery) error
    Dequeue(ctx context.Context, limit int) ([]*Delivery, error)
    GetDelivery(ctx context.Context, delID id.ID) (*Delivery, error)
    UpdateDelivery(ctx context.Context, d *Delivery) error
    ListByEndpoint(ctx context.Context, epID id.ID, opts ListOpts) ([]*Delivery, error)
    ListByEvent(ctx context.Context, evtID id.ID, opts ListOpts) ([]*Delivery, error)
}

Implementation tips

  1. Start with the memory store source as a reference (store/memory/store.go).
  2. The Resolve() method must filter by tenant ID, enabled status, and match event type patterns against endpoint subscriptions.
  3. Dequeue() should atomically claim pending deliveries whose NextAttemptAt is in the past.
  4. Migrate() should be idempotent (safe to call multiple times).
  5. Run the existing test suite against your implementation to verify correctness.

On this page