diff --git a/embassy.go b/embassy.go index ce08e52..21ca5c8 100644 --- a/embassy.go +++ b/embassy.go @@ -5,6 +5,7 @@ import ( "fmt" honeybee "git.wisehodl.dev/jay/go-honeybee/outbound" "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-mana-prism/observer" "git.wisehodl.dev/jay/go-roots-ws" "log/slog" "sync" @@ -54,50 +55,58 @@ type InboxMessage struct { ReceivedAt time.Time } -type Embassy struct { - pool EmbassyPlugin - envoys map[string]*Envoy - eventSubs map[string]chan<- OutboundPoolEvent - inboxSubs map[string]chan<- InboxMessage +// ---------------------------------------------------------------------------- +// Options +// ---------------------------------------------------------------------------- - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - handler slog.Handler - logger *slog.Logger +type EmbassyConfig struct { + handler slog.Handler + observer observer.Observer } -type Envoy struct { - url string - connected bool - terminate func() - queue chan []byte - send func(data []byte) error - events <-chan OutboundPoolEvent - inbox <-chan InboxMessage - eventSubs []chan<- OutboundPoolEvent - labelledInboxSubs map[string][]chan<- InboxMessage - inboxSubs []chan<- InboxMessage +type EmbassyOption func(*EmbassyConfig) - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - handler slog.Handler - logger *slog.Logger +func WithEmbassyHandler(h slog.Handler) EmbassyOption { + return func(c *EmbassyConfig) { + c.handler = h + } +} + +func WithEmbassyObserver(o observer.Observer) EmbassyOption { + return func(c *EmbassyConfig) { + c.observer = o + } } // ---------------------------------------------------------------------------- // Embassy // ---------------------------------------------------------------------------- +type Embassy struct { + pool EmbassyPlugin + envoys map[string]*Envoy + eventSubs map[string]chan<- OutboundPoolEvent + inboxSubs map[string]chan<- InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + observer observer.Observer + handler slog.Handler + logger *slog.Logger +} + func NewEmbassy( ctx context.Context, pool EmbassyPlugin, - handler slog.Handler, + opts ...EmbassyOption, ) *Embassy { ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy")) + cfg := EmbassyConfig{observer: observer.NullObserver{}} + for _, o := range opts { + o(&cfg) + } e := &Embassy{ pool: pool, @@ -106,12 +115,13 @@ func NewEmbassy( inboxSubs: make(map[string]chan<- InboxMessage), ctx: ctx, cancel: cancel, + observer: cfg.observer, + handler: cfg.handler, } - if handler != nil { + if e.handler != nil { comp := component.FromContext(ctx) - e.handler = handler - e.logger = slog.New(handler).With(slog.Any("component", comp)) + e.logger = slog.New(cfg.handler).With(slog.Any("component", comp)) } e.wg.Add(2) @@ -144,7 +154,7 @@ func (e *Embassy) Dispatch(url string) error { events := e.subscribeEventsLock(url) inbox := e.subscribeInboxLock(url) - e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.handler) + e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.observer, e.handler) e.mu.Unlock() @@ -301,6 +311,27 @@ func (e *Embassy) routeInbox() { // Envoy // ---------------------------------------------------------------------------- +type Envoy struct { + url string + connected bool + terminate func() + queue chan []byte + send func(data []byte) error + events <-chan OutboundPoolEvent + inbox <-chan InboxMessage + eventSubs []chan<- OutboundPoolEvent + labelledInboxSubs map[string][]chan<- InboxMessage + inboxSubs []chan<- InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + observer observer.Observer + handler slog.Handler + logger *slog.Logger +} + func newEnvoy( ctx context.Context, url string, @@ -308,6 +339,7 @@ func newEnvoy( send func(data []byte) error, events <-chan OutboundPoolEvent, inbox <-chan InboxMessage, + observer observer.Observer, handler slog.Handler, ) *Envoy { ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy")) @@ -322,11 +354,12 @@ func newEnvoy( labelledInboxSubs: make(map[string][]chan<- InboxMessage), ctx: ctx, cancel: cancel, + observer: observer, + handler: handler, } if handler != nil { comp := component.FromContext(ctx) - e.handler = handler e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url) } @@ -351,6 +384,10 @@ func (e *Envoy) Handler() slog.Handler { return e.handler } +func (e *Envoy) Observer() observer.Observer { + return e.observer +} + func (e *Envoy) Dismiss() { e.terminate() e.cancel() diff --git a/embassy_test.go b/embassy_test.go index bee0f4e..2dbf1b7 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -12,7 +12,7 @@ import ( func TestEmbassy_Dispatch(t *testing.T) { p := newMockPool(t) - embassy := NewEmbassy(p.ctx, p.plugin, nil) + embassy := NewEmbassy(p.ctx, p.plugin) embassy.Dispatch(p.url) envoy := embassy.Call(p.url) assert.NotNil(t, envoy) diff --git a/envoy_test.go b/envoy_test.go index 8c020ab..faaca82 100644 --- a/envoy_test.go +++ b/envoy_test.go @@ -21,7 +21,7 @@ func TestEnvoy_Dismiss(t *testing.T) { terminated = true } - envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil) + envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil, nil) eventSub := envoy.SubscribeEvents() inboxSub := envoy.SubscribeInbox([]string{"A", "B"}) @@ -48,7 +48,7 @@ func TestEnvoy_Send(t *testing.T) { return nil } - envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil) + envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil, nil) envoy.Send([]byte("hello")) Eventually(t, func() bool { @@ -62,7 +62,7 @@ func TestEnvoy_IsConnected(t *testing.T) { url := "wss://test" events := make(chan OutboundPoolEvent) - envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) + envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil, nil) eventSub := envoy.SubscribeEvents() gotEvents := []OutboundPoolEvent{} @@ -88,7 +88,7 @@ func TestEnvoy_Events(t *testing.T) { url := "wss://test" events := make(chan OutboundPoolEvent) - envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) + envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil, nil) eventSub := envoy.SubscribeEvents() gotEvents := []OutboundPoolEvent{} @@ -120,7 +120,7 @@ func TestEnvoy_Inbox(t *testing.T) { url := "wss://test" inbox := make(chan InboxMessage) - envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil) + envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil, nil) inboxSub := envoy.SubscribeInbox([]string{"EVENT"}) gotInbox := []InboxMessage{} diff --git a/helpers_test.go b/helpers_test.go index 45d2c11..fa67a3d 100644 --- a/helpers_test.go +++ b/helpers_test.go @@ -96,7 +96,7 @@ func newMockEnvoy(t *testing.T) (*mockPool, *Envoy) { t.Helper() p := newMockPool(t) - emb := NewEmbassy(p.ctx, p.plugin, nil) + emb := NewEmbassy(p.ctx, p.plugin) err := emb.Dispatch(p.url) assert.NoError(t, err) envoy := emb.Call(p.url) diff --git a/peerstat/sink.go b/observer/observer.go similarity index 52% rename from peerstat/sink.go rename to observer/observer.go index 35694e6..c570785 100644 --- a/peerstat/sink.go +++ b/observer/observer.go @@ -1,19 +1,19 @@ -// Package peerstat defines the contract for collecting peer-level statistics. -package peerstat +// Package observer defines the contract for collecting peer-level statistics. +package observer -// Sink is the interface through which add-ons report statistics. +// Observer is the interface through which add-ons report statistics. // Add-ons should call Record() with a peer ID and a typed event when // significant events occur. The event parameter is expected to be a // domain-specific struct defined in the add-on's package. -type Sink interface { +type Observer interface { // Record reports a statistic event for a given peer. // The event parameter contains the domain-specific details. Record(peerID string, event any) } -// NoopSink is a null implementation of Sink that does nothing. +// NullObserver is a null implementation of Observer that does nothing. // It is used when no actual statistics collector is wired in. -type NoopSink struct{} +type NullObserver struct{} -// Record implements Sink for NoopSink, doing nothing. -func (NoopSink) Record(_ string, _ any) {} +// Record implements Observer for NullObserver, doing nothing. +func (NullObserver) Record(_ string, _ any) {} diff --git a/peerstat/sink_test.go b/observer/observer_test.go similarity index 54% rename from peerstat/sink_test.go rename to observer/observer_test.go index 5fc8390..a2989bb 100644 --- a/peerstat/sink_test.go +++ b/observer/observer_test.go @@ -1,15 +1,15 @@ -package peerstat +package observer import ( "testing" ) -func TestNoopSink(t *testing.T) { - // Test that NoopSink implements the Sink interface - var _ Sink = NoopSink{} +func TestNullObserver(t *testing.T) { + // Test that NullObserver implements the Observer interface + var _ Observer = NullObserver{} // Test that calling Record doesn't panic or crash - sink := NoopSink{} + sink := NullObserver{} sink.Record("peer1", "test event") sink.Record("", nil) diff --git a/request.go b/request.go index 444916f..a4435a7 100644 --- a/request.go +++ b/request.go @@ -6,6 +6,7 @@ import ( "encoding/base32" "fmt" "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-mana-prism/observer" "git.wisehodl.dev/jay/go-roots-ws" "log/slog" "sync" @@ -28,21 +29,6 @@ type ReqClosed struct { Data string } -type RequestManager struct { - reqs map[string]*request - - envoy *Envoy - events <-chan OutboundPoolEvent - inbox <-chan InboxMessage - - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - handler slog.Handler - logger *slog.Logger -} - // ---------------------------------------------------------------------------- // ID Generation // ---------------------------------------------------------------------------- @@ -80,6 +66,26 @@ func WithLabel(label string) RequestOption { return func(o *requestOptions) { o.label = label } } +// ---------------------------------------------------------------------------- +// Request Manager +// ---------------------------------------------------------------------------- + +type RequestManager struct { + reqs map[string]*request + + envoy *Envoy + events <-chan OutboundPoolEvent + inbox <-chan InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + observer observer.Observer + handler slog.Handler + logger *slog.Logger +} + type request struct { id string filters [][]byte @@ -95,10 +101,6 @@ type request struct { closedOnce sync.Once } -// ---------------------------------------------------------------------------- -// Request Manager -// ---------------------------------------------------------------------------- - func NewRequestManager(e *Envoy) *RequestManager { ctx, cancel := context.WithCancel( component.MustExtend(e.Context(), "request_manager")) @@ -110,14 +112,15 @@ func NewRequestManager(e *Envoy) *RequestManager { events: e.SubscribeEvents(), inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), - ctx: ctx, - cancel: cancel, + ctx: ctx, + cancel: cancel, + observer: e.Observer(), + handler: e.Handler(), } - if h := e.Handler(); h != nil { + if m.handler != nil { comp := component.FromContext(ctx) - m.handler = h - m.logger = slog.New(h).With(slog.Any("component", comp)) + m.logger = slog.New(m.handler).With(slog.Any("component", comp)) } m.wg.Add(2)