incorporate observer interface into components

This commit is contained in:
Jay
2026-05-19 21:20:00 -04:00
parent ce0b13e914
commit 30e9881dae
7 changed files with 118 additions and 78 deletions
+71 -34
View File
@@ -5,6 +5,7 @@ import (
"fmt" "fmt"
honeybee "git.wisehodl.dev/jay/go-honeybee/outbound" honeybee "git.wisehodl.dev/jay/go-honeybee/outbound"
"git.wisehodl.dev/jay/go-mana-component" "git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-mana-prism/observer"
"git.wisehodl.dev/jay/go-roots-ws" "git.wisehodl.dev/jay/go-roots-ws"
"log/slog" "log/slog"
"sync" "sync"
@@ -54,50 +55,58 @@ type InboxMessage struct {
ReceivedAt time.Time ReceivedAt time.Time
} }
type Embassy struct { // ----------------------------------------------------------------------------
pool EmbassyPlugin // Options
envoys map[string]*Envoy // ----------------------------------------------------------------------------
eventSubs map[string]chan<- OutboundPoolEvent
inboxSubs map[string]chan<- InboxMessage
ctx context.Context type EmbassyConfig struct {
cancel context.CancelFunc handler slog.Handler
mu sync.RWMutex observer observer.Observer
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
} }
type Envoy struct { type EmbassyOption func(*EmbassyConfig)
url string
connected bool
terminate func()
queue chan []byte
send func(data []byte) error
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
eventSubs []chan<- OutboundPoolEvent
labelledInboxSubs map[string][]chan<- InboxMessage
inboxSubs []chan<- InboxMessage
ctx context.Context func WithEmbassyHandler(h slog.Handler) EmbassyOption {
cancel context.CancelFunc return func(c *EmbassyConfig) {
mu sync.RWMutex c.handler = h
wg sync.WaitGroup }
handler slog.Handler }
logger *slog.Logger
func WithEmbassyObserver(o observer.Observer) EmbassyOption {
return func(c *EmbassyConfig) {
c.observer = o
}
} }
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// Embassy // Embassy
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type Embassy struct {
pool EmbassyPlugin
envoys map[string]*Envoy
eventSubs map[string]chan<- OutboundPoolEvent
inboxSubs map[string]chan<- InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
observer observer.Observer
handler slog.Handler
logger *slog.Logger
}
func NewEmbassy( func NewEmbassy(
ctx context.Context, ctx context.Context,
pool EmbassyPlugin, pool EmbassyPlugin,
handler slog.Handler, opts ...EmbassyOption,
) *Embassy { ) *Embassy {
ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy")) ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy"))
cfg := EmbassyConfig{observer: observer.NullObserver{}}
for _, o := range opts {
o(&cfg)
}
e := &Embassy{ e := &Embassy{
pool: pool, pool: pool,
@@ -106,12 +115,13 @@ func NewEmbassy(
inboxSubs: make(map[string]chan<- InboxMessage), inboxSubs: make(map[string]chan<- InboxMessage),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
observer: cfg.observer,
handler: cfg.handler,
} }
if handler != nil { if e.handler != nil {
comp := component.FromContext(ctx) comp := component.FromContext(ctx)
e.handler = handler e.logger = slog.New(cfg.handler).With(slog.Any("component", comp))
e.logger = slog.New(handler).With(slog.Any("component", comp))
} }
e.wg.Add(2) e.wg.Add(2)
@@ -144,7 +154,7 @@ func (e *Embassy) Dispatch(url string) error {
events := e.subscribeEventsLock(url) events := e.subscribeEventsLock(url)
inbox := e.subscribeInboxLock(url) inbox := e.subscribeInboxLock(url)
e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.handler) e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.observer, e.handler)
e.mu.Unlock() e.mu.Unlock()
@@ -301,6 +311,27 @@ func (e *Embassy) routeInbox() {
// Envoy // Envoy
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
type Envoy struct {
url string
connected bool
terminate func()
queue chan []byte
send func(data []byte) error
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
eventSubs []chan<- OutboundPoolEvent
labelledInboxSubs map[string][]chan<- InboxMessage
inboxSubs []chan<- InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
observer observer.Observer
handler slog.Handler
logger *slog.Logger
}
func newEnvoy( func newEnvoy(
ctx context.Context, ctx context.Context,
url string, url string,
@@ -308,6 +339,7 @@ func newEnvoy(
send func(data []byte) error, send func(data []byte) error,
events <-chan OutboundPoolEvent, events <-chan OutboundPoolEvent,
inbox <-chan InboxMessage, inbox <-chan InboxMessage,
observer observer.Observer,
handler slog.Handler, handler slog.Handler,
) *Envoy { ) *Envoy {
ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy")) ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy"))
@@ -322,11 +354,12 @@ func newEnvoy(
labelledInboxSubs: make(map[string][]chan<- InboxMessage), labelledInboxSubs: make(map[string][]chan<- InboxMessage),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
observer: observer,
handler: handler,
} }
if handler != nil { if handler != nil {
comp := component.FromContext(ctx) comp := component.FromContext(ctx)
e.handler = handler
e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url) e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url)
} }
@@ -351,6 +384,10 @@ func (e *Envoy) Handler() slog.Handler {
return e.handler return e.handler
} }
func (e *Envoy) Observer() observer.Observer {
return e.observer
}
func (e *Envoy) Dismiss() { func (e *Envoy) Dismiss() {
e.terminate() e.terminate()
e.cancel() e.cancel()
+1 -1
View File
@@ -12,7 +12,7 @@ import (
func TestEmbassy_Dispatch(t *testing.T) { func TestEmbassy_Dispatch(t *testing.T) {
p := newMockPool(t) p := newMockPool(t)
embassy := NewEmbassy(p.ctx, p.plugin, nil) embassy := NewEmbassy(p.ctx, p.plugin)
embassy.Dispatch(p.url) embassy.Dispatch(p.url)
envoy := embassy.Call(p.url) envoy := embassy.Call(p.url)
assert.NotNil(t, envoy) assert.NotNil(t, envoy)
+5 -5
View File
@@ -21,7 +21,7 @@ func TestEnvoy_Dismiss(t *testing.T) {
terminated = true terminated = true
} }
envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil) envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil, nil)
eventSub := envoy.SubscribeEvents() eventSub := envoy.SubscribeEvents()
inboxSub := envoy.SubscribeInbox([]string{"A", "B"}) inboxSub := envoy.SubscribeInbox([]string{"A", "B"})
@@ -48,7 +48,7 @@ func TestEnvoy_Send(t *testing.T) {
return nil return nil
} }
envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil) envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil, nil)
envoy.Send([]byte("hello")) envoy.Send([]byte("hello"))
Eventually(t, func() bool { Eventually(t, func() bool {
@@ -62,7 +62,7 @@ func TestEnvoy_IsConnected(t *testing.T) {
url := "wss://test" url := "wss://test"
events := make(chan OutboundPoolEvent) events := make(chan OutboundPoolEvent)
envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil, nil)
eventSub := envoy.SubscribeEvents() eventSub := envoy.SubscribeEvents()
gotEvents := []OutboundPoolEvent{} gotEvents := []OutboundPoolEvent{}
@@ -88,7 +88,7 @@ func TestEnvoy_Events(t *testing.T) {
url := "wss://test" url := "wss://test"
events := make(chan OutboundPoolEvent) events := make(chan OutboundPoolEvent)
envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil, nil)
eventSub := envoy.SubscribeEvents() eventSub := envoy.SubscribeEvents()
gotEvents := []OutboundPoolEvent{} gotEvents := []OutboundPoolEvent{}
@@ -120,7 +120,7 @@ func TestEnvoy_Inbox(t *testing.T) {
url := "wss://test" url := "wss://test"
inbox := make(chan InboxMessage) inbox := make(chan InboxMessage)
envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil) envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil, nil)
inboxSub := envoy.SubscribeInbox([]string{"EVENT"}) inboxSub := envoy.SubscribeInbox([]string{"EVENT"})
gotInbox := []InboxMessage{} gotInbox := []InboxMessage{}
+1 -1
View File
@@ -96,7 +96,7 @@ func newMockEnvoy(t *testing.T) (*mockPool, *Envoy) {
t.Helper() t.Helper()
p := newMockPool(t) p := newMockPool(t)
emb := NewEmbassy(p.ctx, p.plugin, nil) emb := NewEmbassy(p.ctx, p.plugin)
err := emb.Dispatch(p.url) err := emb.Dispatch(p.url)
assert.NoError(t, err) assert.NoError(t, err)
envoy := emb.Call(p.url) envoy := emb.Call(p.url)
+8 -8
View File
@@ -1,19 +1,19 @@
// Package peerstat defines the contract for collecting peer-level statistics. // Package observer defines the contract for collecting peer-level statistics.
package peerstat package observer
// Sink is the interface through which add-ons report statistics. // Observer is the interface through which add-ons report statistics.
// Add-ons should call Record() with a peer ID and a typed event when // Add-ons should call Record() with a peer ID and a typed event when
// significant events occur. The event parameter is expected to be a // significant events occur. The event parameter is expected to be a
// domain-specific struct defined in the add-on's package. // domain-specific struct defined in the add-on's package.
type Sink interface { type Observer interface {
// Record reports a statistic event for a given peer. // Record reports a statistic event for a given peer.
// The event parameter contains the domain-specific details. // The event parameter contains the domain-specific details.
Record(peerID string, event any) Record(peerID string, event any)
} }
// NoopSink is a null implementation of Sink that does nothing. // NullObserver is a null implementation of Observer that does nothing.
// It is used when no actual statistics collector is wired in. // It is used when no actual statistics collector is wired in.
type NoopSink struct{} type NullObserver struct{}
// Record implements Sink for NoopSink, doing nothing. // Record implements Observer for NullObserver, doing nothing.
func (NoopSink) Record(_ string, _ any) {} func (NullObserver) Record(_ string, _ any) {}
@@ -1,15 +1,15 @@
package peerstat package observer
import ( import (
"testing" "testing"
) )
func TestNoopSink(t *testing.T) { func TestNullObserver(t *testing.T) {
// Test that NoopSink implements the Sink interface // Test that NullObserver implements the Observer interface
var _ Sink = NoopSink{} var _ Observer = NullObserver{}
// Test that calling Record doesn't panic or crash // Test that calling Record doesn't panic or crash
sink := NoopSink{} sink := NullObserver{}
sink.Record("peer1", "test event") sink.Record("peer1", "test event")
sink.Record("", nil) sink.Record("", nil)
+27 -24
View File
@@ -6,6 +6,7 @@ import (
"encoding/base32" "encoding/base32"
"fmt" "fmt"
"git.wisehodl.dev/jay/go-mana-component" "git.wisehodl.dev/jay/go-mana-component"
"git.wisehodl.dev/jay/go-mana-prism/observer"
"git.wisehodl.dev/jay/go-roots-ws" "git.wisehodl.dev/jay/go-roots-ws"
"log/slog" "log/slog"
"sync" "sync"
@@ -28,21 +29,6 @@ type ReqClosed struct {
Data string Data string
} }
type RequestManager struct {
reqs map[string]*request
envoy *Envoy
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
handler slog.Handler
logger *slog.Logger
}
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
// ID Generation // ID Generation
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
@@ -80,6 +66,26 @@ func WithLabel(label string) RequestOption {
return func(o *requestOptions) { o.label = label } return func(o *requestOptions) { o.label = label }
} }
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
type RequestManager struct {
reqs map[string]*request
envoy *Envoy
events <-chan OutboundPoolEvent
inbox <-chan InboxMessage
ctx context.Context
cancel context.CancelFunc
mu sync.RWMutex
wg sync.WaitGroup
observer observer.Observer
handler slog.Handler
logger *slog.Logger
}
type request struct { type request struct {
id string id string
filters [][]byte filters [][]byte
@@ -95,10 +101,6 @@ type request struct {
closedOnce sync.Once closedOnce sync.Once
} }
// ----------------------------------------------------------------------------
// Request Manager
// ----------------------------------------------------------------------------
func NewRequestManager(e *Envoy) *RequestManager { func NewRequestManager(e *Envoy) *RequestManager {
ctx, cancel := context.WithCancel( ctx, cancel := context.WithCancel(
component.MustExtend(e.Context(), "request_manager")) component.MustExtend(e.Context(), "request_manager"))
@@ -110,14 +112,15 @@ func NewRequestManager(e *Envoy) *RequestManager {
events: e.SubscribeEvents(), events: e.SubscribeEvents(),
inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), inbox: e.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
observer: e.Observer(),
handler: e.Handler(),
} }
if h := e.Handler(); h != nil { if m.handler != nil {
comp := component.FromContext(ctx) comp := component.FromContext(ctx)
m.handler = h m.logger = slog.New(m.handler).With(slog.Any("component", comp))
m.logger = slog.New(h).With(slog.Any("component", comp))
} }
m.wg.Add(2) m.wg.Add(2)