From a096450fc7a0d0142f2c3242879dce49eaf5548a Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 13 May 2026 16:51:09 -0400 Subject: [PATCH] refactor to peer-centric architecture --- .gitignore | 1 + adapter.go | 399 ------------------------ clerk.go | 209 ------------- clerk_test.go | 171 ----------- courier_test.go | 251 --------------- embassy.go | 467 ++++++++++++++++++++++++++++ embassy_test.go | 425 +++++++------------------- envoy_test.go | 151 +++++++++ journal.go | 327 -------------------- journal_test.go | 165 ---------- post.go | 537 -------------------------------- postmaster_test.go | 225 -------------- queryreq_test.go | 1 - req.go | 740 --------------------------------------------- reqmanager_test.go | 1 - request.go | 339 +++++++++++++++++++++ streamreq_test.go | 613 ------------------------------------- 17 files changed, 1063 insertions(+), 3959 deletions(-) create mode 100644 .gitignore delete mode 100644 adapter.go delete mode 100644 clerk.go delete mode 100644 clerk_test.go delete mode 100644 courier_test.go create mode 100644 embassy.go create mode 100644 envoy_test.go delete mode 100644 journal.go delete mode 100644 journal_test.go delete mode 100644 post.go delete mode 100644 postmaster_test.go delete mode 100644 queryreq_test.go delete mode 100644 req.go delete mode 100644 reqmanager_test.go create mode 100644 request.go delete mode 100644 streamreq_test.go diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..f3d4377 --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +draft diff --git a/adapter.go b/adapter.go deleted file mode 100644 index 85724b7..0000000 --- a/adapter.go +++ /dev/null @@ -1,399 +0,0 @@ -package prism - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-honeybee" - "git.wisehodl.dev/jay/go-mana-component" - "log/slog" - "sync" - "time" -) - -// ---------------------------------------------------------------------------- -// Types -// ---------------------------------------------------------------------------- - -type Envelope = []byte -type PoolSendFunc = func(id string, data Envelope) error - -// Pool Plugins - -type EmbassyPlugin struct { - Connect func(id string) error - Remove func(id string) error - Send PoolSendFunc - Events <-chan honeybee.OutboundPoolEvent -} - -type HotelPlugin struct { - Add func(id string, socket honeybee.Socket) error - Replace func(id string, socket honeybee.Socket) error - Remove func(id string) error - Send PoolSendFunc - Events <-chan honeybee.InboundPoolEvent -} - -// Events - -type PoolEventKind = int - -const ( - EventConnected PoolEventKind = iota - EventDisconnected - EventAdded - EventRemoved -) - -type PoolEvent struct { - ID string - Kind PoolEventKind - At time.Time -} - -func NewPoolEvent(id string, kind PoolEventKind, at time.Time) PoolEvent { - return PoolEvent{ID: id, Kind: kind, At: at} -} - -var convertPoolEvent = map[honeybee.OutboundPoolEventKind]PoolEventKind{ - honeybee.OutboundEventConnected: EventConnected, - honeybee.OutboundEventDisconnected: EventDisconnected, -} - -// Adapter - -type Adapter interface { - Peers() []string - HasPeer(id string) (string, bool) - IsConnected(id string) bool - Subscribe() <-chan PoolEvent - Send(id string, data Envelope) error -} - -// Embassy - -type Embassy struct { - pool EmbassyPlugin - peers map[string]bool // peerID: isConnected - journals chan JournalEntry - eventSubs []chan PoolEvent - - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - logger *slog.Logger -} - -// Hotel - -type Hotel struct { - pool HotelPlugin - peers map[string]bool // peerID: isConnected - journals chan JournalEntry - eventSubs []chan PoolEvent - - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - logger *slog.Logger -} - -// ---------------------------------------------------------------------------- -// Embassy (Outbound Adapter) -// ---------------------------------------------------------------------------- - -func NewEmbassy( - ctx context.Context, - pool EmbassyPlugin, - collector *JournalCollector, - handler slog.Handler, -) *Embassy { - ctx, cancel := context.WithCancel( - component.MustNew(ctx, "prism", "embassy")) - - e := &Embassy{ - pool: pool, - peers: make(map[string]bool), - eventSubs: make([]chan PoolEvent, 0), - ctx: ctx, - cancel: cancel, - } - - if collector != nil { - e.journals = make(chan JournalEntry, 16) - collector.Enroll(e.journals) - } - - if handler != nil { - c := component.FromContext(ctx) - e.logger = slog.New(handler).With(slog.Any("component", c)) - } - - e.wg.Add(1) - go e.runEventRouter() - - return e -} - -func (e *Embassy) Dispatch(url string) error { - url, err := honeybee.NormalizeURL(url) - if err != nil { - return fmt.Errorf("invalid url: %s", url) - } - - if err := e.pool.Connect(url); err != nil { - return fmt.Errorf("dispatch: %w", err) - } - - e.mu.Lock() - e.peers[url] = false - subs := e.eventSubs - e.mu.Unlock() - - at := time.Now() - if e.journals != nil { - c := component.FromContext(e.ctx) - select { - case <-e.ctx.Done(): - return fmt.Errorf("closing") - case e.journals <- NewPeerAddedJournal(url, c, PeerAddedData{At: at}): - } - } - - for _, ch := range subs { - select { - case <-e.ctx.Done(): - return fmt.Errorf("closing") - case ch <- NewPoolEvent(url, EventAdded, at): - } - } - - return nil -} - -func (e *Embassy) Dismiss(url string) error { - url, err := honeybee.NormalizeURL(url) - if err != nil { - return fmt.Errorf("invalid url: %s", url) - } - - if err := e.pool.Remove(url); err != nil { - return fmt.Errorf("dismiss: %w", err) - } - - e.mu.Lock() - delete(e.peers, url) - subs := e.eventSubs - e.mu.Unlock() - - at := time.Now() - if e.journals != nil { - c := component.FromContext(e.ctx) - select { - case <-e.ctx.Done(): - return fmt.Errorf("closing") - case e.journals <- NewPeerRemovedJournal(url, c, PeerRemovedData{At: at}): - } - } - - for _, ch := range subs { - select { - case <-e.ctx.Done(): - return fmt.Errorf("closing") - case ch <- NewPoolEvent(url, EventRemoved, at): - } - } - - return nil -} - -func (e *Embassy) Close() { - e.mu.Lock() - peers := e.peers - e.mu.Unlock() - - // dismiss peers - for peer, _ := range peers { - e.Dismiss(peer) - } - - e.cancel() - e.wg.Wait() - - e.mu.Lock() - // reset peers after dismissal - e.peers = make(map[string]bool) - - subs := e.eventSubs - e.eventSubs = make([]chan PoolEvent, 0) - e.mu.Unlock() - - // close subs - for _, sub := range subs { - close(sub) - } - - if e.journals != nil { - close(e.journals) - } -} - -func (e *Embassy) Peers() []string { - e.mu.RLock() - defer e.mu.RUnlock() - - peers := make([]string, 0, len(e.peers)) - for p, _ := range e.peers { - peers = append(peers, p) - } - return peers -} - -func (e *Embassy) HasPeer(url string) (string, bool) { - url, err := honeybee.NormalizeURL(url) - if err != nil { - return "", false - } - - e.mu.RLock() - defer e.mu.RUnlock() - - _, ok := e.peers[url] - return url, ok -} - -func (e *Embassy) IsConnected(url string) bool { - url, err := honeybee.NormalizeURL(url) - if err != nil { - return false - } - - e.mu.RLock() - defer e.mu.RUnlock() - - connected, _ := e.peers[url] - return connected -} - -func (e *Embassy) Subscribe() <-chan PoolEvent { - e.mu.Lock() - defer e.mu.Unlock() - - ch := make(chan PoolEvent, 16) - e.eventSubs = append(e.eventSubs, ch) - - return ch -} - -func (e *Embassy) Send(id string, data Envelope) error { - return nil -} - -// Internal - -func (e *Embassy) runEventRouter() { - defer e.wg.Done() - - for { - select { - case <-e.ctx.Done(): - return - case ev, ok := <-e.pool.Events: - if !ok { - return - } - - url, err := honeybee.NormalizeURL(ev.ID) - if err != nil { - continue - } - - if _, ok := e.HasPeer(url); !ok { - continue - } - - kind := convertPoolEvent[ev.Kind] - - e.mu.Lock() - switch kind { - case EventConnected: - e.peers[url] = true - case EventDisconnected: - e.peers[url] = false - } - subs := e.eventSubs - canJournal := e.journals != nil - e.mu.Unlock() - - if canJournal { - switch kind { - case EventConnected: - c := component.FromContext(e.ctx) - select { - case <-e.ctx.Done(): - case e.journals <- NewPeerConnectedJournal( - url, c, PeerConnectedData{At: ev.At}): - } - case EventDisconnected: - c := component.FromContext(e.ctx) - select { - case <-e.ctx.Done(): - case e.journals <- NewPeerDisconnectedJournal( - url, c, PeerDisconnectedData{At: ev.At}): - } - } - } - - for _, ch := range subs { - select { - case <-e.ctx.Done(): - case ch <- NewPoolEvent(url, kind, ev.At): - } - } - } - } -} - -// ---------------------------------------------------------------------------- -// Hotel (Inbound Adapter) -// ---------------------------------------------------------------------------- - -func NewHotel() *Hotel { - return nil -} - -func (h *Hotel) Welcome(id string, socket honeybee.Socket) error { - return nil -} - -func (h *Hotel) WelcomeBack(id string, socket honeybee.Socket) error { - return nil -} - -func (h *Hotel) Farewell(id string) error { - return nil -} - -func (h *Hotel) Close() {} - -func (h *Hotel) Peers() []string { - return nil -} - -func (h *Hotel) HasPeer(id string) (string, bool) { - return "", false -} - -func (h *Hotel) IsConnected(id string) bool { - return false -} - -func (h *Hotel) Subscribe() <-chan PoolEvent { - return nil -} - -func (h *Hotel) Send(id string, data Envelope) error { - return nil -} diff --git a/clerk.go b/clerk.go deleted file mode 100644 index e48a947..0000000 --- a/clerk.go +++ /dev/null @@ -1,209 +0,0 @@ -package prism - -import ( - "context" - "errors" - "fmt" - "git.wisehodl.dev/jay/go-honeybee" - "git.wisehodl.dev/jay/go-mana-component" - "git.wisehodl.dev/jay/go-roots-ws" - "log/slog" - "sync" - "time" -) - -// ---------------------------------------------------------------------------- -// Errors -// ---------------------------------------------------------------------------- - -var ( - ErrAlreadyStarted = errors.New("clerk already started") - ErrUnknownLabel = errors.New("unknown label") -) - -// ---------------------------------------------------------------------------- -// Types -// ---------------------------------------------------------------------------- - -// Letters - -type InboundLetter struct { - ID string - Data Envelope - At time.Time -} - -// Clerk - -type Clerk struct { - inbox <-chan honeybee.InboxMessage - - // wiring phase - mu sync.Mutex - started bool - pending []clerkSub - known map[string]struct{} - - // runtime phase - routes clerkRoutes - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - logger *slog.Logger -} - -type clerkSub struct { - ch chan InboundLetter - labels map[string]struct{} -} - -type clerkRoutes = map[string][]chan InboundLetter - -// ---------------------------------------------------------------------------- -// Clerk -// ---------------------------------------------------------------------------- - -func NewClerk( - ctx context.Context, - inbox <-chan honeybee.InboxMessage, - knownLabels map[string]struct{}, - handler slog.Handler, -) *Clerk { - ctx, cancel := context.WithCancel( - component.MustNew(ctx, "prism", "clerk")) - - known := make(map[string]struct{}, len(knownLabels)) - for label := range knownLabels { - known[label] = struct{}{} - } - - c := &Clerk{ - inbox: inbox, - known: known, - ctx: ctx, - cancel: cancel, - } - - if handler != nil { - comp := component.FromContext(ctx) - c.logger = slog.New(handler).With(slog.Any("component", comp)) - } - - return c -} - -func (c *Clerk) Subscribe( - labels map[string]struct{}, - buffer int, -) (<-chan InboundLetter, error) { - c.mu.Lock() - defer c.mu.Unlock() - - if c.started { - return nil, ErrAlreadyStarted - } - - for label := range labels { - if _, ok := c.known[label]; !ok { - return nil, fmt.Errorf("%w: %s", ErrUnknownLabel, label) - } - } - - subLabels := make(map[string]struct{}, len(labels)) - for label := range labels { - subLabels[label] = struct{}{} - } - - ch := make(chan InboundLetter, buffer) - c.pending = append(c.pending, clerkSub{ch: ch, labels: subLabels}) - - return ch, nil -} - -func (c *Clerk) Start() error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.started { - return ErrAlreadyStarted - } - - routes := make(clerkRoutes, len(c.known)) - for _, sub := range c.pending { - for label := range sub.labels { - routes[label] = append(routes[label], sub.ch) - } - } - - c.routes = routes - c.started = true - - c.wg.Add(1) - go c.run() - - return nil -} - -func (c *Clerk) Close() { - c.cancel() - c.wg.Wait() - - c.mu.Lock() - defer c.mu.Unlock() - - for _, sub := range c.pending { - close(sub.ch) - } - - // prevent double channel closes if Close() is called twice - c.pending = nil -} - -func (c *Clerk) run() { - defer c.wg.Done() - - for { - select { - case <-c.ctx.Done(): - return - - case msg, ok := <-c.inbox: - if !ok { - // inbox closed externally, close clerk - c.cancel() - return - } - - labelBytes, err := envelope.GetLabel(msg.Data) - if err != nil { - if c.logger != nil { - c.logger.Warn("invalid envelope", - "peer_id", msg.ID, - "received_at", msg.ReceivedAt, - ) - } - continue - } - - subs, ok := c.routes[string(labelBytes)] - if !ok { - continue - } - - letter := InboundLetter{ - ID: msg.ID, - Data: msg.Data, - At: msg.ReceivedAt, - } - - for _, ch := range subs { - select { - case ch <- letter: - case <-c.ctx.Done(): - return - } - } - } - } -} diff --git a/clerk_test.go b/clerk_test.go deleted file mode 100644 index 5ea374f..0000000 --- a/clerk_test.go +++ /dev/null @@ -1,171 +0,0 @@ -package prism - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -// ---------------------------------------------------------------------------- -// Helpers -// ---------------------------------------------------------------------------- - -func mockInbox() (chan honeybee.InboxMessage, func(label string)) { - ch := make(chan honeybee.InboxMessage, 8) - inject := func(label string) { - ch <- honeybee.InboxMessage{ - ID: "wss://test", - Data: []byte(`["` + label + `","payload"]`), - ReceivedAt: time.Now(), - } - } - return ch, inject -} - -func makeClerk(inbox chan honeybee.InboxMessage) *Clerk { - known := map[string]struct{}{ - "EVENT": {}, - "EOSE": {}, - "CLOSE": {}, - } - return NewClerk(context.Background(), inbox, known, nil) -} - -// ---------------------------------------------------------------------------- -// Tests -// ---------------------------------------------------------------------------- - -func TestClerkRouting(t *testing.T) { - inbox, inject := mockInbox() - c := makeClerk(inbox) - - subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) - assert.NoError(t, err) - - subB, err := c.Subscribe(map[string]struct{}{"EVENT": {}, "EOSE": {}}, 4) - assert.NoError(t, err) - - assert.NoError(t, c.Start()) - - inject("EVENT") - inject("EOSE") - - // A receives exactly one letter (EVENT only) - Eventually(t, func() bool { - select { - case l := <-subA: - return string(l.Data) == `["EVENT","payload"]` - default: - return false - } - }, "subA should receive the EVENT letter") - - Never(t, func() bool { - select { - case <-subA: - return true - default: - return false - } - }, "subA should receive no further letters") - - // B receives two letters (EVENT and EOSE) - count := 0 - Eventually(t, func() bool { - select { - case <-subB: - count++ - default: - } - return count == 2 - }, "subB should receive both letters") -} - -func TestClerkStartup(t *testing.T) { - inbox, _ := mockInbox() - c := makeClerk(inbox) - - assert.NoError(t, c.Start()) - - _, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) - assert.ErrorIs(t, err, ErrAlreadyStarted) - - c.Close() -} - -func TestClerkUnknownSubscriptionLabel(t *testing.T) { - inbox, _ := mockInbox() - c := makeClerk(inbox) - - _, err := c.Subscribe(map[string]struct{}{"UNKNOWN": {}}, 4) - assert.ErrorIs(t, err, ErrUnknownLabel) -} - -func TestClerkUnknownInboxLabel(t *testing.T) { - inbox, inject := mockInbox() - c := makeClerk(inbox) - - // subscribe to every known label - sub, err := c.Subscribe( - map[string]struct{}{"EVENT": {}, "EOSE": {}, "CLOSE": {}}, 4) - assert.NoError(t, err) - assert.NoError(t, c.Start()) - - // inject a valid nostr label, but is not in the test label set - inject("NOTICE") - - Never(t, func() bool { - select { - case <-sub: - return true - default: - return false - } - }, "no subscriber should receive an unknown label") -} - -func TestClerkInboxClose(t *testing.T) { - inbox, _ := mockInbox() - c := makeClerk(inbox) - - sub, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) - assert.NoError(t, err) - assert.NoError(t, c.Start()) - - // close the inbox as the pool would on shutdown - close(inbox) - - // internal waitgroup should clear - Eventually(t, func() bool { - c.wg.Wait() - return true - }, "wg should clear") - - // subscriptions remain open. Close() must be called to completely shut down - Never(t, func() bool { - _, ok := <-sub - return !ok - }, "sub should remain open") -} - -func TestClerkClose(t *testing.T) { - inbox, _ := mockInbox() - c := makeClerk(inbox) - - subA, err := c.Subscribe(map[string]struct{}{"EVENT": {}}, 4) - assert.NoError(t, err) - subB, err := c.Subscribe(map[string]struct{}{"EOSE": {}}, 4) - assert.NoError(t, err) - - assert.NoError(t, c.Start()) - - c.Close() - - Eventually(t, func() bool { - _, okA := <-subA - _, okB := <-subB - return !okA && !okB - }, "all subscriber channels should be closed after Close()") -} diff --git a/courier_test.go b/courier_test.go deleted file mode 100644 index 0b4988f..0000000 --- a/courier_test.go +++ /dev/null @@ -1,251 +0,0 @@ -package prism - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-mana-component" - "github.com/stretchr/testify/assert" - "sync/atomic" - "testing" - "time" -) - -// Helpers - -func newTestLetter(ctx context.Context, id uint64) OutboundLetter { - ctx, cancel := context.WithCancel( - component.MustExtend(ctx, "test_letter")) - return OutboundLetter{ - id: id, - peerID: "wss://test", - data: []byte("[]"), - ctx: ctx, - cancel: cancel, - } -} - -// Tests - -func TestCourierSendsAfterConnect(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - sendFunc := func(data Envelope) error { - sendCount.Add(1) - return nil - } - - c := NewCourier(ctx, sendFunc, nil) - called := make(chan LetterOutcome, 1) - c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) - - Never(t, func() bool { return sendCount.Load() > 0 }, - "should not have sent while disconnected") - - c.HandleConnect() - - Eventually(t, func() bool { return sendCount.Load() > 0 }, - "should have sent after connect") - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have returned outcome") - - assert.Equal(t, uint64(1), outcome.LetterID) - assert.Equal(t, "wss://test", outcome.PeerID) - assert.Equal(t, OutcomeSent, outcome.Kind) - assert.False(t, outcome.SentAt.IsZero()) - assert.True(t, outcome.MissedAt.IsZero()) - assert.Equal(t, 0, outcome.Retries) -} - -func TestCourierMultipleSends(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - sendFunc := func(data Envelope) error { - sendCount.Add(1) - return nil - } - - c := NewCourier(ctx, sendFunc, nil) - c.HandleConnect() - - outcomes := make([]LetterOutcome, 0, 2) - called := make(chan LetterOutcome, 2) - c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) - c.Enqueue(newTestLetter(ctx, 2), func(o LetterOutcome) { called <- o }) - - Eventually(t, func() bool { return sendCount.Load() == 2 }, - "should have sent letters") - - Eventually(t, func() bool { - select { - default: - return false - case o := <-called: - outcomes = append(outcomes, o) - return len(outcomes) == 2 - } - }, "should have returned 2 outcomes") - - // callbacks are called in goroutines and may arrive out of order - assert.Equal(t, OutcomeSent, outcomes[0].Kind) - assert.Equal(t, OutcomeSent, outcomes[1].Kind) -} - -func TestCourierSkipsCancelledLetter(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - sendFunc := func(data Envelope) error { - sendCount.Add(1) - return nil - } - - c := NewCourier(ctx, sendFunc, nil) - c.HandleConnect() - - l := newTestLetter(ctx, 1) - l.cancel() - - called := make(chan LetterOutcome, 1) - c.Enqueue(l, func(o LetterOutcome) { called <- o }) - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have returned outcome") - - assert.Equal(t, OutcomeCancelled, outcome.Kind) -} - -func TestCourierRetryOnFailure(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - sendFunc := func(data Envelope) error { - sendCount.Add(1) - if sendCount.Load() < 3 { - return fmt.Errorf("transient failure") - } - return nil - } - - c := NewCourier(ctx, sendFunc, nil) - c.HandleConnect() - - called := make(chan LetterOutcome, 1) - c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) - - Eventually(t, func() bool { return sendCount.Load() > 0 }, - "should send eventually") - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have returned outcome") - - assert.Equal(t, OutcomeSent, outcome.Kind) - assert.Equal(t, 2, outcome.Retries) -} - -func TestCourierPauseOnDisconnect(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - var gate atomic.Bool - gate.Store(false) - sendFunc := func(data Envelope) error { - // gated send - if gate.Load() { - sendCount.Add(1) - return nil - } - - return fmt.Errorf("gate is closed") - } - - c := NewCourier(ctx, sendFunc, nil) - c.HandleConnect() - - // queue a letter - called := make(chan LetterOutcome, 1) - c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) - - // manually wait for letters to queue - time.Sleep(100 * time.Millisecond) - - // manually wait for disconnect toggle - c.HandleDisconnect() - time.Sleep(100 * time.Millisecond) - - // open gate - gate.Store(true) - - // should never have sent in this time - Never(t, func() bool { return sendCount.Load() > 0 }, - "should not have sent while disconnected") - - // reconnect, gate is open, letter should send - c.HandleConnect() - Eventually(t, func() bool { return sendCount.Load() > 0 }, - "should have sent") -} - -func TestCourierDrainOnClose(t *testing.T) { - ctx := component.MustNew(context.Background(), "prism", "test") - - var sendCount atomic.Uint32 - sendFunc := func(data Envelope) error { - sendCount.Add(1) - return nil - } - - c := NewCourier(ctx, sendFunc, nil) - - // do not connect, queue some letters - outcomes := make([]LetterOutcome, 0, 2) - called := make(chan LetterOutcome, 4) - c.Enqueue(newTestLetter(ctx, 1), func(o LetterOutcome) { called <- o }) - c.Enqueue(newTestLetter(ctx, 2), func(o LetterOutcome) { called <- o }) - - // should not send any letters - Never(t, func() bool { return sendCount.Load() > 0 }, - "should not have sent letters") - - // close the courier - c.Close() - - // expect each letter to return cancelled - Eventually(t, func() bool { - select { - default: - return false - case o := <-called: - outcomes = append(outcomes, o) - return len(outcomes) == 2 - } - }, "should have returned 2 outcomes") - - if len(outcomes) >= 2 { - assert.Equal(t, OutcomeCancelled, outcomes[0].Kind) - assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) - } -} diff --git a/embassy.go b/embassy.go new file mode 100644 index 0000000..d201a39 --- /dev/null +++ b/embassy.go @@ -0,0 +1,467 @@ +package prism + +import ( + "context" + "fmt" + "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" + "log/slog" + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +type EmbassyPlugin struct { + Connect func(url string) error + Remove func(url string) error + Send func(url string, data []byte) error + Events <-chan honeybee.OutboundPoolEvent + Inbox <-chan honeybee.InboxMessage +} + +type EmbassyEventKind int + +const ( + EventConnected EmbassyEventKind = iota + EventDisconnected + EventEmbassyUnknown +) + +func mapEmbassyEvent(kind honeybee.OutboundPoolEventKind) EmbassyEventKind { + switch kind { + case honeybee.OutboundEventConnected: + return EventConnected + case honeybee.OutboundEventDisconnected: + return EventDisconnected + default: + return EventEmbassyUnknown + } +} + +type OutboundPoolEvent struct { + ID string + Kind EmbassyEventKind + At time.Time +} + +type InboxMessage struct { + ID string + Data []byte + ReceivedAt time.Time +} + +type Embassy struct { + pool EmbassyPlugin + envoys map[string]*Envoy + eventSubs map[string]chan<- OutboundPoolEvent + inboxSubs map[string]chan<- InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + handler slog.Handler + logger *slog.Logger +} + +type Envoy struct { + url string + connected bool + terminate func() + queue chan []byte + send func(data []byte) error + events <-chan OutboundPoolEvent + inbox <-chan InboxMessage + eventSubs []chan<- OutboundPoolEvent + inboxSubs map[string][]chan<- InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + handler slog.Handler + logger *slog.Logger +} + +// ---------------------------------------------------------------------------- +// Embassy +// ---------------------------------------------------------------------------- + +func NewEmbassy( + ctx context.Context, + pool EmbassyPlugin, + handler slog.Handler, +) *Embassy { + ctx, cancel := context.WithCancel(component.MustNew(ctx, "prism", "embassy")) + + e := &Embassy{ + pool: pool, + envoys: make(map[string]*Envoy), + eventSubs: make(map[string]chan<- OutboundPoolEvent), + inboxSubs: make(map[string]chan<- InboxMessage), + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp := component.FromContext(ctx) + e.handler = handler + e.logger = slog.New(handler).With(slog.Any("component", comp)) + } + + e.wg.Add(2) + go e.routeEvents() + go e.routeInbox() + + return e +} + +func (e *Embassy) Dispatch(url string) error { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return fmt.Errorf("invalid url: %w", err) + } + + e.mu.RLock() + _, exists := e.envoys[url] + if exists { + e.mu.RUnlock() + return fmt.Errorf("already dispatched: %s", url) + } + e.mu.RUnlock() + + e.mu.Lock() + + e.pool.Connect(url) + + terminate := func() { e.dismiss(url) } + send := func(data []byte) error { return e.send(url, data) } + events := e.subscribeEventsLock(url) + inbox := e.subscribeInboxLock(url) + + e.envoys[url] = newEnvoy(e.ctx, url, terminate, send, events, inbox, e.handler) + + e.mu.Unlock() + + return nil +} + +func (e *Embassy) Envoys() []string { + e.mu.RLock() + defer e.mu.RUnlock() + var envoys []string + for url := range e.envoys { + envoys = append(envoys, url) + } + return envoys +} + +func (e *Embassy) Call(url string) *Envoy { + url, err := honeybee.NormalizeURL(url) + if err != nil { + return nil + } + + e.mu.RLock() + defer e.mu.RUnlock() + + envoy, ok := e.envoys[url] + if !ok { + return nil + } + return envoy +} + +func (e *Embassy) Close() { + e.cancel() + e.wg.Wait() +} + +func (e *Embassy) dismiss(url string) { + e.mu.Lock() + defer e.mu.Unlock() + + e.pool.Remove(url) + e.unsubscribeEventsLock(url) + e.unsubscribeInboxLock(url) + delete(e.envoys, url) +} + +func (e *Embassy) send(url string, data []byte) error { + return e.pool.Send(url, data) +} + +func (e *Embassy) subscribeEventsLock(url string) <-chan OutboundPoolEvent { + ch := make(chan OutboundPoolEvent) + e.eventSubs[url] = ch + return ch +} + +func (e *Embassy) unsubscribeEventsLock(url string) { + ch, ok := e.eventSubs[url] + if !ok { + return + } + close(ch) + delete(e.eventSubs, url) +} + +func (e *Embassy) subscribeInboxLock(url string) <-chan InboxMessage { + ch := make(chan InboxMessage) + e.inboxSubs[url] = ch + return ch +} + +func (e *Embassy) unsubscribeInboxLock(url string) { + ch, ok := e.inboxSubs[url] + if !ok { + return + } + close(ch) + delete(e.inboxSubs, url) +} + +func (e *Embassy) routeEvents() { + defer e.wg.Done() + + for { + select { + case <-e.ctx.Done(): + return + case ev, ok := <-e.pool.Events: + if !ok { + return + } + + url, err := honeybee.NormalizeURL(ev.ID) + if err != nil { + continue + } + + e.mu.RLock() + sub, ok := e.eventSubs[url] + e.mu.RUnlock() + + if !ok { + continue + } + + select { + case <-e.ctx.Done(): + return + case sub <- OutboundPoolEvent{ + ID: ev.ID, Kind: mapEmbassyEvent(ev.Kind), At: ev.At, + }: + } + } + } +} + +func (e *Embassy) routeInbox() { + defer e.wg.Done() + + for { + select { + case <-e.ctx.Done(): + return + case ev, ok := <-e.pool.Inbox: + if !ok { + return + } + + url, err := honeybee.NormalizeURL(ev.ID) + if err != nil { + continue + } + + e.mu.RLock() + sub, ok := e.inboxSubs[url] + e.mu.RUnlock() + + if !ok { + continue + } + + select { + case <-e.ctx.Done(): + return + case sub <- InboxMessage{ + ID: ev.ID, Data: ev.Data, ReceivedAt: ev.ReceivedAt}: + } + } + } +} + +// ---------------------------------------------------------------------------- +// Envoy +// ---------------------------------------------------------------------------- + +func newEnvoy( + ctx context.Context, + url string, + terminate func(), + send func(data []byte) error, + events <-chan OutboundPoolEvent, + inbox <-chan InboxMessage, + handler slog.Handler, +) *Envoy { + ctx, cancel := context.WithCancel(component.MustExtend(ctx, "envoy")) + + e := &Envoy{ + url: url, + terminate: terminate, + queue: make(chan []byte), + send: send, + events: events, + inbox: inbox, + inboxSubs: make(map[string][]chan<- InboxMessage), + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp := component.FromContext(ctx) + e.handler = handler + e.logger = slog.New(handler).With(slog.Any("component", comp)).With("peer", url) + } + + e.wg.Add(2) + go e.publishEvents() + go e.routeInbox() + + return e +} + +func (e *Envoy) IsConnected() bool { + e.mu.RLock() + defer e.mu.RUnlock() + return e.connected +} + +func (e *Envoy) Context() context.Context { + return e.ctx +} + +func (e *Envoy) Handler() slog.Handler { + return e.handler +} + +func (e *Envoy) Dismiss() { + e.terminate() + e.cancel() + e.wg.Wait() + + e.mu.Lock() + defer e.mu.Unlock() + + for _, sub := range e.eventSubs { + close(sub) + } + + for _, subs := range e.inboxSubs { + for _, sub := range subs { + close(sub) + } + } + + e.eventSubs = nil + e.inboxSubs = make(map[string][]chan<- InboxMessage) +} + +func (e *Envoy) Send(data []byte) error { + return e.send(data) +} + +func (e *Envoy) SubscribeEvents() <-chan OutboundPoolEvent { + e.mu.Lock() + defer e.mu.Unlock() + ch := make(chan OutboundPoolEvent) + e.eventSubs = append(e.eventSubs, ch) + return ch +} + +func (e *Envoy) SubscribeInbox(labels []string) <-chan InboxMessage { + e.mu.Lock() + defer e.mu.Unlock() + ch := make(chan InboxMessage) + for _, label := range labels { + if _, ok := e.inboxSubs[label]; !ok { + e.inboxSubs[label] = make([]chan<- InboxMessage, 0) + } + e.inboxSubs[label] = append(e.inboxSubs[label], ch) + } + return ch +} + +func (e *Envoy) publishEvents() { + defer e.wg.Done() + + for { + select { + case <-e.ctx.Done(): + return + case ev, ok := <-e.events: + if !ok { + return + } + + e.mu.Lock() + switch ev.Kind { + case EventConnected: + e.connected = true + case EventDisconnected: + e.connected = false + } + subs := e.eventSubs + e.mu.Unlock() + + for _, ch := range subs { + select { + case <-e.ctx.Done(): + return + case ch <- ev: + } + } + } + } +} + +func (e *Envoy) routeInbox() { + defer e.wg.Done() + + for { + select { + case <-e.ctx.Done(): + return + case msg, ok := <-e.inbox: + if !ok { + return + } + + label, err := envelope.GetLabel(msg.Data) + if err != nil { + continue + } + + e.mu.RLock() + subs, ok := e.inboxSubs[string(label)] + e.mu.RUnlock() + + if !ok { + continue + } + + for _, ch := range subs { + select { + case <-e.ctx.Done(): + return + case ch <- msg: + } + } + } + } +} diff --git a/embassy_test.go b/embassy_test.go index cf8bec1..714906f 100644 --- a/embassy_test.go +++ b/embassy_test.go @@ -3,348 +3,133 @@ package prism import ( "context" "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" + "sync/atomic" "testing" "time" ) -func TestEmbassyPoolEvents(t *testing.T) { +func TestEmbassy_TEMPLATE(t *testing.T) { ctx := context.Background() - eventsCh := make(chan honeybee.OutboundPoolEvent) + url := "wss://test" + connect := func(url string) error { return nil } + remove := func(url string) error { return nil } + sent := false + _ = sent + send := func(url string, data []byte) error { + sent = true + return nil + } + events := make(chan honeybee.OutboundPoolEvent) + inbox := make(chan honeybee.InboxMessage) pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: eventsCh, + Connect: connect, + Remove: remove, + Send: send, + Events: events, + Inbox: inbox, } - e := NewEmbassy(ctx, pool, nil, nil) - sub := e.Subscribe() - - t.Run("added then removed", func(t *testing.T) { - err := e.Dispatch("wss://test") - assert.NoError(t, err) - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub: - return ev.Kind == EventAdded - } - }, "expected added event") - - err = e.Dismiss("wss://test") - assert.NoError(t, err) - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub: - return ev.Kind == EventRemoved - } - }, "expected removed event") - }) - - t.Run("connected", func(t *testing.T) { - e.Dispatch("wss://test") - - eventsCh <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventConnected, - At: time.Now(), - } - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub: - return ev.Kind == EventConnected - } - }, "expected connected event") - }) - - t.Run("disconnected", func(t *testing.T) { - e.Dispatch("wss://test") - - eventsCh <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventDisconnected, - At: time.Now(), - } - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub: - return ev.Kind == EventDisconnected - } - }, "expected disconnected event") - }) + embassy := NewEmbassy(ctx, pool, nil) + embassy.Dispatch(url) + envoy := embassy.Call(url) + assert.NotNil(t, envoy) } -func TestEmbassyPeerRegistry(t *testing.T) { +func TestEmbassy_Dispatch(t *testing.T) { ctx := context.Background() - eventsCh := make(chan honeybee.OutboundPoolEvent) + url := "wss://test" + connectCalled := make(chan struct{}) + removeCalled := make(chan struct{}) + connect := func(url string) error { close(connectCalled); return nil } + remove := func(url string) error { close(removeCalled); return nil } + sent := false + send := func(url string, data []byte) error { + sent = true + return nil + } + events := make(chan honeybee.OutboundPoolEvent) + inbox := make(chan honeybee.InboxMessage) pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: eventsCh, + Connect: connect, + Remove: remove, + Send: send, + Events: events, + Inbox: inbox, } - e := NewEmbassy(ctx, pool, nil, nil) + embassy := NewEmbassy(ctx, pool, nil) + embassy.Dispatch(url) + envoy := embassy.Call(url) + assert.NotNil(t, envoy) - // add - e.Dispatch("wss://test") - - url, ok := e.HasPeer("wss://test/") - assert.Equal(t, "wss://test", url) - assert.True(t, ok) - assert.False(t, e.IsConnected("wss://test")) - - // connect - eventsCh <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventConnected, - At: time.Now(), - } - - Eventually(t, func() bool { - _, exists := e.HasPeer("wss://test") - connected := e.IsConnected("wss://test") - return exists && connected - }, "expected: exists, connected") - - // disconnect - eventsCh <- honeybee.OutboundPoolEvent{ - ID: "wss://test", - Kind: honeybee.OutboundEventDisconnected, - At: time.Now(), - } - - Eventually(t, func() bool { - _, exists := e.HasPeer("wss://test") - connected := e.IsConnected("wss://test") - return exists && !connected - }, "expected: exists, disconnected") - - // remove - e.Dismiss("wss://test") - - _, ok = e.HasPeer("wss://test") + _, ok := <-connectCalled assert.False(t, ok) - assert.False(t, e.IsConnected("wss://test")) -} -func TestEmbassyPeers(t *testing.T) { - ctx := context.Background() + eventSub := envoy.SubscribeEvents() + inboxSub := envoy.SubscribeInbox([]string{"EVENT"}) - pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: nil, - } - - e := NewEmbassy(ctx, pool, nil, nil) - - assert.Len(t, e.Peers(), 0) - - e.Dispatch("wss://test1") - e.Dispatch("wss://test2") - assert.Len(t, e.Peers(), 2) - - e.Dismiss("wss://test2") - assert.Len(t, e.Peers(), 1) -} - -func TestEmbassySubFanout(t *testing.T) { - ctx := context.Background() - eventsCh := make(chan honeybee.OutboundPoolEvent) - - pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: eventsCh, - } - - e := NewEmbassy(ctx, pool, nil, nil) - sub1 := e.Subscribe() - sub2 := e.Subscribe() - - e.Dispatch("wss://test") - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub1: - return ev.Kind == EventAdded - } - }, "expected added event on sub1") - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub2: - return ev.Kind == EventAdded - } - }, "expected added event on sub2") -} - -func TestEmbassyClose(t *testing.T) { - ctx := context.Background() - eventsCh := make(chan honeybee.OutboundPoolEvent, 1) - - pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: eventsCh, - } - - e := NewEmbassy(ctx, pool, nil, nil) - sub1 := e.Subscribe() - sub2 := e.Subscribe() - - e.Dispatch("wss://test") - - e.Close() - - // peer gets removed - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub1: - return ev.ID == "wss://test" && ev.Kind == EventRemoved - } - }, "expected peer removed") - - Eventually(t, func() bool { - select { - default: - return false - case ev := <-sub2: - return ev.ID == "wss://test" && ev.Kind == EventRemoved - } - }, "expected peer removed") - - // peer list is empty - _, ok := e.HasPeer("wss://test") - assert.False(t, ok) - assert.Len(t, e.Peers(), 0) - - // subs close - Eventually(t, func() bool { - _, ok1 := <-sub1 - _, ok2 := <-sub2 - return !ok1 && !ok2 - }, "subs should close") -} - -func TestEmbassyJournals(t *testing.T) { - ctx := context.Background() - jc := NewJournalCollector() - eventsCh := make(chan honeybee.OutboundPoolEvent, 1) - - pool := EmbassyPlugin{ - Connect: func(id string) error { return nil }, - Remove: func(id string) error { return nil }, - Send: func(id string, data []byte) error { return nil }, - Events: eventsCh, - } - - e := NewEmbassy(ctx, pool, jc, nil) - out := jc.Out() - peer := "wss://test" - - // added - e.Dispatch(peer) - Eventually(t, func() bool { - select { - case entry := <-out: - _, ok := entry.(PeerAddedJournal) - return ok - default: - return false - } - }, "expected PeerAddedJournal") - - // connected - eventsCh <- honeybee.OutboundPoolEvent{ - ID: peer, - Kind: honeybee.OutboundEventConnected, - At: time.Now(), - } - Eventually(t, func() bool { - select { - case entry := <-out: - e, ok := entry.(PeerConnectedJournal) - - // ensure fields are correct - peerOk := e.PeerID() == "wss://test" - modOk := e.Component().Module() == "prism" - pathOk := e.Component().PathString() == "embassy" - - return ok && peerOk && modOk && pathOk - default: - return false - } - }, "expected PeerConnectedJournal") - - // disconnected - eventsCh <- honeybee.OutboundPoolEvent{ - ID: peer, - Kind: honeybee.OutboundEventDisconnected, - At: time.Now(), - } - Eventually(t, func() bool { - select { - case entry := <-out: - _, ok := entry.(PeerDisconnectedJournal) - return ok - default: - return false - } - }, "expected PeerDisconnectedJournal") - - // removed - e.Dismiss(peer) - Eventually(t, func() bool { - select { - case entry := <-out: - _, ok := entry.(PeerRemovedJournal) - return ok - default: - return false - } - }, "expected PeerRemovedJournal") - - // close embassy: closes journal channel - e.Close() - - // Ensure jc can close now that embassy has closed its journal channel - jcClosed := make(chan struct{}) + gotEvent := atomic.Int64{} + gotInbox := atomic.Int64{} + eventDone := make(chan struct{}) + inboxDone := make(chan struct{}) go func() { - jc.Close() - close(jcClosed) + for range eventSub { + gotEvent.Add(1) + } + close(eventDone) + }() + go func() { + for range inboxSub { + gotInbox.Add(1) + } + close(inboxDone) }() - Eventually(t, func() bool { - select { - case <-jcClosed: - return true - default: - return false - } - }, "JournalCollector.Close() should return after Embassy.Close()") + events <- honeybee.OutboundPoolEvent{ + ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()} + events <- honeybee.OutboundPoolEvent{ + ID: "wss://other", Kind: honeybee.OutboundEventConnected, At: time.Now()} + inbox <- honeybee.InboxMessage{ + ID: url, + Data: envelope.EncloseEvent([]byte("{}")), + ReceivedAt: time.Now(), + } + inbox <- honeybee.InboxMessage{ + ID: "wss://other", + Data: envelope.EncloseEvent([]byte("{}")), + ReceivedAt: time.Now(), + } + + Eventually(t, func() bool { return gotEvent.Load() > 0 }, + "should have gotten event") + Eventually(t, func() bool { return gotInbox.Load() > 0 }, + "should have gotten inbox message") + Eventually(t, func() bool { return envoy.IsConnected() }, + "state should have toggled") + Never(t, func() bool { return gotEvent.Load() > 1 }, + "should have only gotten one event") + Never(t, func() bool { return gotInbox.Load() > 1 }, + "should have only gotten one inbox message") + + envoy.Send([]byte("hello")) + assert.True(t, sent) + + envoy.Dismiss() + + _, ok = <-removeCalled + assert.False(t, ok) + + _, ok = <-eventDone + assert.False(t, ok) + + _, ok = <-inboxDone + assert.False(t, ok) + + // envoy no longer in embassy + envoy = embassy.Call(url) + assert.Nil(t, envoy) } diff --git a/envoy_test.go b/envoy_test.go new file mode 100644 index 0000000..c7f0a3c --- /dev/null +++ b/envoy_test.go @@ -0,0 +1,151 @@ +package prism + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" + "github.com/stretchr/testify/assert" + "sync" + "testing" + "time" +) + +func TestEnvoy_Dismiss(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + mu := sync.RWMutex{} + url := "wss://test" + terminated := false + terminate := func() { + mu.Lock() + defer mu.Unlock() + terminated = true + } + + envoy := newEnvoy(ctx, url, terminate, nil, nil, nil, nil) + envoy.Dismiss() + + mu.RLock() + defer mu.RUnlock() + assert.True(t, terminated) +} + +func TestEnvoy_Send(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + url := "wss://test" + var sent bool + send := func(data []byte) error { + sent = true + return nil + } + + envoy := newEnvoy(ctx, url, nil, send, nil, nil, nil) + envoy.Send([]byte("hello")) + + Eventually(t, func() bool { + return sent + }, "should have sent") +} + +func TestEnvoy_IsConnected(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + mu := sync.RWMutex{} + url := "wss://test" + events := make(chan honeybee.OutboundPoolEvent) + + envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) + eventSub := envoy.SubscribeEvents() + + gotEvents := []honeybee.OutboundPoolEvent{} + go func() { + for ev := range eventSub { + mu.Lock() + gotEvents = append(gotEvents, ev) + mu.Unlock() + } + }() + + events <- honeybee.OutboundPoolEvent{ + ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()} + + Eventually(t, func() bool { + return envoy.IsConnected() + }, "state should have toggled") +} + +func TestEnvoy_Events(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + mu := sync.RWMutex{} + url := "wss://test" + events := make(chan honeybee.OutboundPoolEvent) + + envoy := newEnvoy(ctx, url, nil, nil, events, nil, nil) + eventSub := envoy.SubscribeEvents() + + gotEvents := []honeybee.OutboundPoolEvent{} + go func() { + for ev := range eventSub { + mu.Lock() + gotEvents = append(gotEvents, ev) + mu.Unlock() + } + }() + + events <- honeybee.OutboundPoolEvent{ + ID: url, Kind: honeybee.OutboundEventConnected, At: time.Now()} + + Eventually(t, func() bool { + mu.RLock() + defer mu.RUnlock() + return len(gotEvents) > 0 + }, "should have gotten event") + + mu.RLock() + assert.Equal(t, honeybee.OutboundEventConnected, gotEvents[0].Kind) + mu.RUnlock() +} + +func TestEnvoy_Inbox(t *testing.T) { + ctx := component.MustNew(context.Background(), "prism", "test") + mu := sync.RWMutex{} + url := "wss://test" + inbox := make(chan honeybee.InboxMessage) + + envoy := newEnvoy(ctx, url, nil, nil, nil, inbox, nil) + inboxSub := envoy.SubscribeInbox([]string{"EVENT"}) + + gotInbox := []honeybee.InboxMessage{} + go func() { + for ev := range inboxSub { + mu.Lock() + gotInbox = append(gotInbox, ev) + mu.Unlock() + } + }() + + inbox <- honeybee.InboxMessage{ + ID: url, + Data: envelope.EncloseEvent([]byte("{}")), + ReceivedAt: time.Now(), + } + inbox <- honeybee.InboxMessage{ + ID: url, + Data: envelope.EncloseOK("id", true, "ok"), + ReceivedAt: time.Now(), + } + + Eventually(t, func() bool { + mu.RLock() + defer mu.RUnlock() + return len(gotInbox) > 0 + }, "should have gotten inbox message") + + // should only receive the EVENT message + assert.Len(t, gotInbox, 1) + + mu.RLock() + data, err := envelope.FindEvent(gotInbox[0].Data) + mu.RUnlock() + assert.NoError(t, err) + assert.Equal(t, "{}", string(data)) +} diff --git a/journal.go b/journal.go deleted file mode 100644 index f358ae9..0000000 --- a/journal.go +++ /dev/null @@ -1,327 +0,0 @@ -package prism - -import ( - "fmt" - "git.wisehodl.dev/jay/go-mana-component" - "sync" - "time" -) - -// ---------------------------------------------------------------------------- -// Types -// ---------------------------------------------------------------------------- - -// JournalCollector - -type JournalCollector struct { - out chan JournalEntry - buffer chan JournalEntry - mu sync.Mutex - wg sync.WaitGroup - closing bool -} - -// JournalEntry - -type JournalEntry interface { - PeerID() string - SealedAt() time.Time - Component() component.Component -} - -type entry struct { - peerID string - sealedAt time.Time - component component.Component -} - -func (e *entry) PeerID() string { return e.peerID } -func (e *entry) SealedAt() time.Time { return e.sealedAt } -func (e *entry) Component() component.Component { return e.component } - -// ---------------------------------------------------------------------------- -// Journal Collector -// ---------------------------------------------------------------------------- - -func NewJournalCollector() *JournalCollector { - c := &JournalCollector{ - out: make(chan JournalEntry), - buffer: make(chan JournalEntry, 1024), - } - - go func() { - bufferedPipe(c.buffer, c.out) - close(c.out) - }() - - return c -} - -func (c *JournalCollector) Enroll(ch <-chan JournalEntry) error { - c.mu.Lock() - defer c.mu.Unlock() - - if c.closing { - return fmt.Errorf("journal collector is closing") - } - - c.wg.Add(1) - go func() { - defer c.wg.Done() - for e := range ch { - c.buffer <- e - } - }() - - return nil -} - -func (c *JournalCollector) Close() { - c.mu.Lock() - if c.closing { - c.mu.Unlock() - return - } - c.closing = true - c.mu.Unlock() - - c.wg.Wait() - close(c.buffer) -} - -func (c *JournalCollector) Out() <-chan JournalEntry { return c.out } - -// ---------------------------------------------------------------------------- -// Journal Entries -// ---------------------------------------------------------------------------- - -func newEntry(peerID string, component component.Component) *entry { - return &entry{ - peerID: peerID, - component: component, - sealedAt: time.Now(), - } -} - -// PeerAdded - -type PeerAddedJournal struct { - *entry - Data PeerAddedData -} - -type PeerAddedData struct { - At time.Time -} - -func NewPeerAddedJournal( - peerID string, component component.Component, data PeerAddedData, -) PeerAddedJournal { - return PeerAddedJournal{entry: newEntry(peerID, component), Data: data} -} - -// PeerRemoved - -type PeerRemovedJournal struct { - *entry - Data PeerRemovedData -} - -type PeerRemovedData struct { - At time.Time -} - -func NewPeerRemovedJournal( - peerID string, component component.Component, data PeerRemovedData, -) PeerRemovedJournal { - return PeerRemovedJournal{entry: newEntry(peerID, component), Data: data} -} - -// PeerConnected - -type PeerConnectedJournal struct { - *entry - Data PeerConnectedData -} - -type PeerConnectedData struct { - At time.Time -} - -func NewPeerConnectedJournal( - peerID string, component component.Component, data PeerConnectedData, -) PeerConnectedJournal { - return PeerConnectedJournal{entry: newEntry(peerID, component), Data: data} -} - -// PeerDisconnected - -type PeerDisconnectedJournal struct { - *entry - Data PeerDisconnectedData -} - -type PeerDisconnectedData struct { - At time.Time -} - -func NewPeerDisconnectedJournal( - peerID string, component component.Component, data PeerDisconnectedData, -) PeerDisconnectedJournal { - return PeerDisconnectedJournal{entry: newEntry(peerID, component), Data: data} -} - -// ReqQueued - -type ReqQueuedJournal struct { - *entry - Data ReqQueuedData -} - -type ReqQueuedData struct { - SubID string - LetterID uint64 - QueuedAt time.Time -} - -func NewReqQueuedJournal( - peerID string, component component.Component, data ReqQueuedData, -) ReqQueuedJournal { - return ReqQueuedJournal{entry: newEntry(peerID, component), Data: data} -} - -// CloseQueued - -type CloseQueuedJournal struct { - *entry - Data CloseQueuedData -} - -type CloseQueuedData struct { - SubID string - LetterID uint64 - QueuedAt time.Time -} - -func NewCloseQueuedJournal( - peerID string, component component.Component, data CloseQueuedData, -) CloseQueuedJournal { - return CloseQueuedJournal{entry: newEntry(peerID, component), Data: data} -} - -// ReqSendOutcome - -type ReqSendOutcomeJournal struct { - *entry - Data ReqSendOutcomeData -} - -type ReqSendOutcomeData struct { - SubID string - LetterID uint64 - Outcome LetterOutcomeKind - SentAt time.Time - MissedAt time.Time - RetryCount int -} - -func NewReqSendOutcomeJournal( - peerID string, component component.Component, data ReqSendOutcomeData, -) ReqSendOutcomeJournal { - return ReqSendOutcomeJournal{entry: newEntry(peerID, component), Data: data} -} - -// CloseSendOutcome - -type CloseSendOutcomeJournal struct { - *entry - Data CloseSendOutcomeData -} - -type CloseSendOutcomeData struct { - SubID string - LetterID uint64 - Outcome LetterOutcomeKind - SentAt time.Time - MissedAt time.Time - RetryCount int -} - -func NewCloseSendOutcomeJournal( - peerID string, component component.Component, data CloseSendOutcomeData, -) CloseSendOutcomeJournal { - return CloseSendOutcomeJournal{entry: newEntry(peerID, component), Data: data} -} - -// ReceivedEOSE - -type ReceivedEOSEJournal struct { - *entry - Data ReceivedEOSEData -} - -type ReceivedEOSEData struct { - SubID string - At time.Time -} - -func NewReceivedEOSEJournal( - peerID string, component component.Component, data ReceivedEOSEData, -) ReceivedEOSEJournal { - return ReceivedEOSEJournal{entry: newEntry(peerID, component), Data: data} -} - -// MissedEOSE - -type MissedEOSEJournal struct { - *entry - Data MissedEOSEData -} - -type MissedEOSEData struct { - SubID string - At time.Time -} - -func NewMissedEOSEJournal( - peerID string, component component.Component, data MissedEOSEData, -) MissedEOSEJournal { - return MissedEOSEJournal{entry: newEntry(peerID, component), Data: data} -} - -// ReceivedClosed - -type ReceivedClosedJournal struct { - *entry - Data ReceivedClosedData -} - -type ReceivedClosedData struct { - SubID string - At time.Time - Message string -} - -func NewReceivedClosedJournal( - peerID string, component component.Component, data ReceivedClosedData, -) ReceivedClosedJournal { - return ReceivedClosedJournal{entry: newEntry(peerID, component), Data: data} -} - -// ReqClosed - -type ReqClosedJournal struct { - *entry - Data ReqClosedData -} - -type ReqClosedData struct { - SubID string - At time.Time -} - -func NewReqClosedJournal( - peerID string, component component.Component, data ReqClosedData, -) ReqClosedJournal { - return ReqClosedJournal{entry: newEntry(peerID, component), Data: data} -} diff --git a/journal_test.go b/journal_test.go deleted file mode 100644 index 656a305..0000000 --- a/journal_test.go +++ /dev/null @@ -1,165 +0,0 @@ -package prism - -import ( - "context" - "git.wisehodl.dev/jay/go-mana-component" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -type testJournalEntry struct { - *entry -} - -func newTestEntry(peerID string, comp component.Component) JournalEntry { - return &testJournalEntry{entry: newEntry(peerID, comp)} -} - -func TestJournalCollector_SingleProducer(t *testing.T) { - jc := NewJournalCollector() - ch := make(chan JournalEntry, 10) - jc.Enroll(ch) - - ctx := component.MustNew(context.Background(), "test", "emitter") - comp := component.FromContext(ctx) - e1 := newTestEntry("peer1", comp) - e2 := newTestEntry("peer2", comp) - - ch <- e1 - ch <- e2 - close(ch) - - var received []JournalEntry - out := jc.Out() - - // Wait for entries - Eventually(t, func() bool { - select { - case e := <-out: - received = append(received, e) - default: - } - return len(received) == 2 - }, "should receive all entries") -} - -func TestJournalCollector_MultipleProducers(t *testing.T) { - jc := NewJournalCollector() - ch1 := make(chan JournalEntry, 5) - ch2 := make(chan JournalEntry, 5) - jc.Enroll(ch1) - jc.Enroll(ch2) - - ctx := component.MustNew(context.Background(), "test", "emitter") - comp := component.FromContext(ctx) - - ch1 <- newTestEntry("p1", comp) - ch2 <- newTestEntry("p2", comp) - ch1 <- newTestEntry("p3", comp) - ch2 <- newTestEntry("p4", comp) - - close(ch1) - close(ch2) - - count := 0 - out := jc.Out() - Eventually(t, func() bool { - select { - case <-out: - count++ - default: - } - return count == 4 - }, "should merge entries from all producers") -} - -func TestJournalCollector_EnrollAfterClose(t *testing.T) { - jc := NewJournalCollector() - jc.Close() - - ch := make(chan JournalEntry) - err := jc.Enroll(ch) - assert.Error(t, err) - assert.Contains(t, err.Error(), "closing") -} - -func TestJournalCollector_CloseBlocks(t *testing.T) { - jc := NewJournalCollector() - ch := make(chan JournalEntry) - jc.Enroll(ch) - - closed := make(chan struct{}) - go func() { - jc.Close() - close(closed) - }() - - // Output (Out()) should still be open because the producer (ch) is open - select { - case <-jc.Out(): - t.Fatal("output channel closed prematurely") - case <-time.After(NegativeTestTimeout): - } - - // Output should not be reached yet - select { - case <-closed: - t.Fatal("Close() returned before producer closed") - default: - } - - close(ch) - - Eventually(t, func() bool { - select { - case _, ok := <-jc.Out(): - return !ok - default: - return false - } - }, "Out() should close after all producers close") - - Eventually(t, func() bool { - select { - case <-closed: - return true - default: - return false - } - }, "Close() should return after producers finish") -} - -func TestJournalCollector_ComponentIdentity(t *testing.T) { - jc := NewJournalCollector() - ch := make(chan JournalEntry, 1) - jc.Enroll(ch) - - mod := "test-mod" - path := "a.b.c" - ctx := component.MustNew(context.Background(), mod, path) - comp := component.FromContext(ctx) - - entry := newTestEntry("peer-id", comp) - ch <- entry - close(ch) - - out := jc.Out() - var received JournalEntry - Eventually(t, func() bool { - select { - case e := <-out: - received = e - return true - default: - return false - } - }, "should receive the entry") - - typed, ok := received.(*testJournalEntry) - assert.True(t, ok, "should be correct concrete type") - assert.Equal(t, mod, typed.Component().Module()) - assert.Equal(t, path, typed.Component().PathString()) - - jc.Close() -} diff --git a/post.go b/post.go deleted file mode 100644 index e4e6c29..0000000 --- a/post.go +++ /dev/null @@ -1,537 +0,0 @@ -package prism - -import ( - "container/list" - "context" - "git.wisehodl.dev/jay/go-mana-component" - "log/slog" - "sync" - "sync/atomic" - "time" -) - -// ---------------------------------------------------------------------------- -// Types -// ---------------------------------------------------------------------------- - -// Letters - -type LetterID = uint64 - -type OutboundLetter struct { - id uint64 - peerID string - data Envelope - ctx context.Context - cancel context.CancelFunc -} - -type LetterOutcomeKind int - -const ( - OutcomeSent LetterOutcomeKind = iota - OutcomeExpired - OutcomeCancelled - OutcomeRejected -) - -func (k LetterOutcomeKind) String() string { - switch k { - case OutcomeSent: - return "sent" - case OutcomeExpired: - return "expired" - case OutcomeCancelled: - return "cancelled" - case OutcomeRejected: - return "rejected" - default: - return "unknown" - } -} - -type LetterOutcome struct { - LetterID uint64 - PeerID string - Kind LetterOutcomeKind - SentAt time.Time - MissedAt time.Time - Retries int -} - -// Postmaster - -type Postmaster struct { - couriers map[string]*Courier - poolHasPeer func(id string) (string, bool) - poolEvents <-chan PoolEvent // Adapter.Subscribe - poolSend PoolSendFunc // Adapter.Send - counter atomic.Uint64 - - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - cfg postmasterConfig - handler slog.Handler - logger *slog.Logger -} - -// Courier - -type Courier struct { - tasks chan courierTask - sendFunc func(data Envelope) error - - // state - queue list.List - connected bool - sending bool - - ctx context.Context - cancel context.CancelFunc - wg sync.WaitGroup - logger *slog.Logger -} - -// Messages - -type courierTask interface { - dispatch(c *Courier) -} - -// Options - -const ( - DefaultPostmasterDeadline = 30 * time.Second -) - -type PostmasterOption func(*postmasterConfig) - -type postmasterConfig struct { - defaultDeadline time.Duration -} - -func WithDefaultDeadline(d time.Duration) PostmasterOption { - return func(c *postmasterConfig) { c.defaultDeadline = d } -} - -type SendOption func(*sendConfig) - -type sendConfig struct { - deadline time.Duration -} - -func WithDeadline(d time.Duration) SendOption { - return func(c *sendConfig) { c.deadline = d } -} - -// ---------------------------------------------------------------------------- -// Postmaster -// ---------------------------------------------------------------------------- - -func NewPostmaster( - ctx context.Context, - poolHasPeer func(id string) (string, bool), - poolEvents <-chan PoolEvent, - poolSendFunc PoolSendFunc, - handler slog.Handler, - opts ...PostmasterOption, -) *Postmaster { - ctx, cancel := context.WithCancel( - component.MustNew(ctx, "prism", "postmaster")) - - cfg := postmasterConfig{ - defaultDeadline: DefaultPostmasterDeadline, - } - for _, opt := range opts { - opt(&cfg) - } - - pm := &Postmaster{ - couriers: make(map[string]*Courier), - poolHasPeer: poolHasPeer, - poolEvents: poolEvents, - poolSend: poolSendFunc, - ctx: ctx, - cancel: cancel, - cfg: cfg, - } - - if handler != nil { - comp := component.FromContext(ctx) - pm.handler = handler - pm.logger = slog.New(handler).With(slog.Any("component", comp)) - } - - pm.wg.Add(1) - go pm.handlePoolEvents() - - return pm -} - -func (pm *Postmaster) Send( - ctx context.Context, - peerID string, - data Envelope, - callback func(LetterOutcome), - opts ...SendOption, -) (uint64, context.CancelFunc) { - cfg := sendConfig{deadline: pm.cfg.defaultDeadline} - for _, opt := range opts { - opt(&cfg) - } - - pm.mu.RLock() - defer pm.mu.RUnlock() - - // check if peer courier exists - peerID, ok := pm.poolHasPeer(peerID) - if !ok { - go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) - return 0, func() {} - } - courier, ok := pm.couriers[peerID] - if !ok { - go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) - return 0, func() {} - } - - ctx, cancel := context.WithTimeout(ctx, cfg.deadline) - letter := OutboundLetter{ - id: pm.counter.Add(1), - peerID: peerID, - data: data, - ctx: ctx, - cancel: cancel, - } - - courier.Enqueue(letter, callback) - - return letter.id, cancel -} - -func (pm *Postmaster) Peers() []string { - pm.mu.RLock() - defer pm.mu.RUnlock() - - peers := make([]string, 0, len(pm.couriers)) - for id, _ := range pm.couriers { - peers = append(peers, id) - } - return peers -} - -func (pm *Postmaster) Close() { - pm.cancel() - pm.wg.Wait() - - // close each courier - pm.mu.Lock() - couriers := pm.couriers - pm.couriers = make(map[string]*Courier) - pm.mu.Unlock() - - for _, courier := range couriers { - courier.Close() - } -} - -func (pm *Postmaster) handlePoolEvents() { - defer pm.wg.Done() - - for { - select { - case <-pm.ctx.Done(): - return - case ev := <-pm.poolEvents: - switch ev.Kind { - case EventAdded: - pm.mu.Lock() - _, exists := pm.couriers[ev.ID] - if exists { - pm.mu.Unlock() - continue - } - send := func(data Envelope) error { return pm.poolSend(ev.ID, data) } - courier := NewCourier(pm.ctx, send, pm.handler) - pm.couriers[ev.ID] = courier - pm.mu.Unlock() - - case EventRemoved: - pm.mu.Lock() - courier, exists := pm.couriers[ev.ID] - if exists { - delete(pm.couriers, ev.ID) - } - pm.mu.Unlock() - courier.Close() - - case EventConnected: - pm.mu.RLock() - courier, exists := pm.couriers[ev.ID] - if exists { - courier.HandleConnect() - } - pm.mu.RUnlock() - - case EventDisconnected: - pm.mu.RLock() - courier, exists := pm.couriers[ev.ID] - if exists { - courier.HandleDisconnect() - } - pm.mu.RUnlock() - } - } - } -} - -// ---------------------------------------------------------------------------- -// Courier -// ---------------------------------------------------------------------------- - -// Letter State - -type letterState struct { - letter OutboundLetter - onOutcome func(LetterOutcome) - - sentAt time.Time - missedAt time.Time - retries int - once sync.Once -} - -func (s *letterState) isCancelled() bool { - return s.letter.ctx.Err() != nil -} - -func (s *letterState) countRetry() { s.retries++ } -func (s *letterState) setSentAt(at time.Time) { s.sentAt = at } -func (s *letterState) setMissedAt(at time.Time) { s.missedAt = at } - -// Courier - -func NewCourier( - ctx context.Context, - sendFunc func(data Envelope) error, // func => PoolSendFunc(id) - handler slog.Handler, -) *Courier { - ctx, cancel := context.WithCancel( - component.MustExtend(ctx, "courier")) - - c := &Courier{ - tasks: make(chan courierTask, 64), - sendFunc: sendFunc, - ctx: ctx, - cancel: cancel, - } - - if handler != nil { - comp := component.FromContext(ctx) - c.logger = slog.New(handler).With(slog.Any("component", comp)) - } - - c.wg.Add(1) - go c.run() - - return c -} - -func (c *Courier) Enqueue(letter OutboundLetter, onOutcome func(LetterOutcome)) { - wrappedLetter := &letterState{ - letter: letter, - onOutcome: onOutcome, - } - c.order(taskEnqueue{letter: wrappedLetter}) -} - -func (c *Courier) HandleConnect() { - c.order(taskConnected{}) -} - -func (c *Courier) HandleDisconnect() { - c.order(taskDisconnected{}) -} - -func (c *Courier) Close() { - c.cancel() - c.wg.Wait() - c.terminate() -} - -// Internal - -func (c *Courier) order(task courierTask) { - select { - case <-c.ctx.Done(): - case c.tasks <- task: - } -} - -func (c *Courier) run() { - defer c.wg.Done() - - for { - select { - case <-c.ctx.Done(): - return - case task := <-c.tasks: - task.dispatch(c) - c.maybeSend() - } - } -} - -func (c *Courier) maybeSend() { - if !c.preflight() { - c.drain() - return - } - - s, ok := c.pop() - if !ok { - return - } - c.sending = true - - c.wg.Add(1) - go c.sendOnce(s) - -} - -func (c *Courier) sendOnce(s *letterState) { - defer c.wg.Done() - err := c.sendFunc(s.letter.data) - c.order(taskHandleSendResult{letter: s, at: time.Now(), err: err}) -} - -func (c *Courier) doneOnce(s *letterState) { - var kind LetterOutcomeKind - if s.isCancelled() { - // letter was cancelled - if s.letter.ctx.Err() == context.DeadlineExceeded { - // letter expired - kind = OutcomeExpired - } else { - // letter was cancelled externally - kind = OutcomeCancelled - } - } else { - // letter was sent - kind = OutcomeSent - } - - outcome := LetterOutcome{ - LetterID: s.letter.id, - PeerID: s.letter.peerID, - Kind: kind, - SentAt: s.sentAt, - MissedAt: s.missedAt, - Retries: s.retries, - } - - s.once.Do(func() { - s.letter.cancel() - go s.onOutcome(outcome) - }) -} - -func (c *Courier) terminate() { - // cancel remaining letters - for { - s, ok := c.pop() - if !ok { - break - } - s.letter.cancel() - s.setMissedAt(time.Now()) - c.doneOnce(s) - } -} - -// Helpers - -func (c *Courier) preflight() bool { - isConnected := c.connected - notAlreadySending := !c.sending - hasQueuedLetters := c.queue.Len() > 0 - return isConnected && notAlreadySending && hasQueuedLetters -} - -func (c *Courier) drain() { - for { - front := c.queue.Front() - if front == nil { - return - } - - s := front.Value.(*letterState) - if !s.isCancelled() { - return - } - - s.setMissedAt(time.Now()) - c.doneOnce(s) - c.queue.Remove(front) - } -} - -func (c *Courier) pop() (*letterState, bool) { - for { - front := c.queue.Front() - if front == nil { - return nil, false - } - - s := front.Value.(*letterState) - c.queue.Remove(front) - - if !s.isCancelled() { - return s, true - } - - s.setMissedAt(time.Now()) - c.doneOnce(s) - } -} - -// ---------------------------------------------------------------------------- -// Courier Messages -// ---------------------------------------------------------------------------- - -type taskEnqueue struct{ letter *letterState } - -func (t taskEnqueue) dispatch(c *Courier) { - c.queue.PushBack(t.letter) -} - -type taskConnected struct{} - -func (t taskConnected) dispatch(c *Courier) { - c.connected = true -} - -type taskDisconnected struct{} - -func (t taskDisconnected) dispatch(c *Courier) { - c.connected = false -} - -type taskHandleSendResult struct { - letter *letterState - at time.Time - err error -} - -func (t taskHandleSendResult) dispatch(c *Courier) { - c.sending = false - if t.err != nil { - t.letter.countRetry() - c.queue.PushFront(t.letter) - } else { - t.letter.setSentAt(t.at) - c.doneOnce(t.letter) - } -} diff --git a/postmaster_test.go b/postmaster_test.go deleted file mode 100644 index 6215740..0000000 --- a/postmaster_test.go +++ /dev/null @@ -1,225 +0,0 @@ -package prism - -import ( - "context" - "fmt" - "github.com/stretchr/testify/assert" - "testing" - "time" -) - -// Helpers - -func mockPostmaster( - ctx context.Context, -) (pm *Postmaster, poolEvents chan PoolEvent) { - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents = make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } - pm = NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - return -} - -func expectLetterOutcome( - t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind, -) { - t.Helper() - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-ch: - return true - } - }, "should have received outcome") - - assert.Equal(t, kind, outcome.Kind) -} - -func expectAllLetterOutcomes( - t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind, count int, -) { - t.Helper() - - outcomes := make([]LetterOutcome, 0, count) - Eventually(t, func() bool { - select { - default: - return false - case o := <-ch: - outcomes = append(outcomes, o) - return len(outcomes) == count - } - }, fmt.Sprintf("should have returned %d outcomes", count)) - - if len(outcomes) >= count { - for i := range count { - assert.Equal(t, OutcomeCancelled, outcomes[i].Kind) - } - } -} - -// Tests - -func TestPostmasterUnknownPeerSend(t *testing.T) { - ctx := context.Background() - pm, _ := mockPostmaster(ctx) - - called := make(chan LetterOutcome, 1) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - expectLetterOutcome(t, called, OutcomeRejected) -} - -func TestPostmasterSend(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - called := make(chan LetterOutcome, 1) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - expectLetterOutcome(t, called, OutcomeSent) -} - -func TestPostmasterCancelInFlight(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - called := make(chan LetterOutcome, 1) - _, cancel := pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - // wait for letter to queue - time.Sleep(100 * time.Millisecond) - - // cancel the letter using its callback - cancel() - - // connect the pool - poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - - expectLetterOutcome(t, called, OutcomeCancelled) -} - -func TestPostmasterExpire(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - called := make(chan LetterOutcome, 1) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }, - WithDeadline(1*time.Millisecond)) - - // wait for letter to queue and expire - time.Sleep(100 * time.Millisecond) - - // connect the pool - poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - - expectLetterOutcome(t, called, OutcomeExpired) -} - -func TestPostmasterPeerRemoved(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - // add peer, but do not connect - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - // send two letters - called := make(chan LetterOutcome, 2) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - // wait for them to hit the courier queue - time.Sleep(100 * time.Millisecond) - - // remove the peer - poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()} - - // expect each letter to return cancelled - expectAllLetterOutcomes(t, called, OutcomeCancelled, 2) - - // subsequent sends should fail - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - expectLetterOutcome(t, called, OutcomeRejected) -} - -func TestPostmasterCourierCloseRace(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - // add peer, but do not connect - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - // remove the peer - poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()} - - // send a letter - time.Sleep(5 * time.Microsecond) // small wait lines up the race condition - var outcome *LetterOutcome - called := make(chan LetterOutcome, 1) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - Eventually(t, func() bool { - select { - default: - return false - case o := <-called: - outcome = &o - return true - } - }, "should have returned 1 outcomes") - - if outcome == nil { - t.Fatal("did not receive an outcome") - } - - // depending on the race, the outcome could be: - // close, then send: send is rejected by the postmaster - // send, then close: send is cancelled by the courier - assert.Contains(t, - []LetterOutcomeKind{OutcomeCancelled, OutcomeRejected}, - outcome.Kind, - ) -} - -func TestPostmasterClose(t *testing.T) { - ctx := context.Background() - pm, poolEvents := mockPostmaster(ctx) - - // add peer, but do not connect - poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} - Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") - - // send two letters - called := make(chan LetterOutcome, 2) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - - // wait for them to hit the courier queue - time.Sleep(100 * time.Millisecond) - - // close postmaster - pm.Close() - - // expect each letter to return cancelled - expectAllLetterOutcomes(t, called, OutcomeCancelled, 2) - - // subsequent sends should be rejected - pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - expectLetterOutcome(t, called, OutcomeRejected) -} diff --git a/queryreq_test.go b/queryreq_test.go deleted file mode 100644 index 8828353..0000000 --- a/queryreq_test.go +++ /dev/null @@ -1 +0,0 @@ -package prism diff --git a/req.go b/req.go deleted file mode 100644 index d58684d..0000000 --- a/req.go +++ /dev/null @@ -1,740 +0,0 @@ -package prism - -import ( - "context" - "encoding/base32" - "fmt" - "git.wisehodl.dev/jay/go-mana-component" - "git.wisehodl.dev/jay/go-roots-ws" - "log/slog" - "sync" - "time" -) - -var ( - _ fmt.Formatter -) - -// ---------------------------------------------------------------------------- -// Types -// ---------------------------------------------------------------------------- - -// Outputs - -type ReqEvent struct { - PeerID string - ReceivedAt time.Time - Data []byte -} - -type ReqMessage struct { - PeerID string - ReceivedAt time.Time - Data string -} - -// Options - -const ( - defaultLabel = "REQ" - defaultEOSETimeout = 30 * time.Second -) - -type reqConfig struct { - id string - label string - eoseTimeout time.Duration -} - -type ReqOption func(*reqConfig) - -// Request Manager - -type ReqManager struct { - subs map[string]Request - byPeer map[string]map[string]struct{} // peerID -> subID set - - postmaster *Postmaster - collector *JournalCollector - journals chan JournalEntry // JournalCollector.Enroll - - isConnected func(peerID string) bool // Adapter.IsConnected - poolEvents <-chan PoolEvent // Adapter.Subscribe - poolInbox <-chan InboundLetter // Clerk.Subscribe - - ctx context.Context - cancel context.CancelFunc - mu sync.RWMutex - wg sync.WaitGroup - handler slog.Handler - logger *slog.Logger -} - -type Request interface { - order(reqTask) - Peers() []string - Close() -} - -// Base Request - -type request struct { - id string - req Envelope - - tasks chan reqTask - closing bool - done chan struct{} - - buffer chan ReqEvent - events chan ReqEvent - messages chan ReqMessage - - postmaster *Postmaster - journals chan JournalEntry - isConnected func(peerID string) bool - onClose func() - - ctx context.Context - wg sync.WaitGroup - peerWg sync.WaitGroup - logger *slog.Logger -} - -// Stream Request - -type StreamReq struct { - *request - peers map[string]*streamPeer -} - -type streamPeer struct { - reqSent bool - closeSent bool - closed bool - closeOnce sync.Once -} - -// Query Request - -type QueryReq struct { - *request - peers map[string]*queryPeer - eoseTimeout time.Duration -} - -type queryPeer struct { - reqSent bool - eoseTimer *time.Timer - closeSent bool - closed bool - closeOnce sync.Once -} - -// ---------------------------------------------------------------------------- -// Request Options -// ---------------------------------------------------------------------------- - -func newReqConfig(opts ...ReqOption) reqConfig { - cfg := reqConfig{ - id: "", - label: defaultLabel, - eoseTimeout: defaultEOSETimeout, - } - for _, opt := range opts { - opt(&cfg) - } - return cfg -} - -func WithID(id string) ReqOption { - return func(c *reqConfig) { - c.id = id - } -} - -func WithLabel(label string) ReqOption { - return func(c *reqConfig) { - c.label = label - } -} - -func WithEOSETimeout(timeout time.Duration) ReqOption { - return func(c *reqConfig) { - c.eoseTimeout = timeout - } -} - -// ---------------------------------------------------------------------------- -// Request Manager -// ---------------------------------------------------------------------------- - -func NewReqManager( - ctx context.Context, - postmaster *Postmaster, - isConnected func(string) bool, - poolEvents <-chan PoolEvent, - poolInbox <-chan InboundLetter, - collector *JournalCollector, - handler slog.Handler, -) *ReqManager { - return nil -} - -func (m *ReqManager) OpenStream( - filters [][]byte, - peers []string, - opts ...ReqOption, -) ( - id string, - events <-chan ReqEvent, - messages <-chan ReqMessage, - err error, -) { - return "", nil, nil, nil -} - -func (m *ReqManager) OpenQuery( - filters [][]byte, - peers []string, - opts ...ReqOption, -) ( - id string, - events <-chan ReqEvent, - messages <-chan ReqMessage, - err error, -) { - return "", nil, nil, nil -} - -func (m *ReqManager) CloseReq(id string) error { - return nil -} - -func (m *ReqManager) Close() {} - -func (m *ReqManager) makeOnClose(subID string, peers []string) func() { - return func() {} -} - -func (m *ReqManager) routeInbox() { - // parses envelope label and sub ID from the letter - // looks up in sub registry - // calls req.order() -} - -func (m *ReqManager) routeEvents() { - // reads PoolEvent - // looks up in m.byPeer - // calls req.order() on each matching request -} - -// Helpers - -func cleanPeers(peers []string) []string { - return nil -} - -var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) - -func generateID(prefix string) string { - return "" -} - -// ---------------------------------------------------------------------------- -// Base Request -// ---------------------------------------------------------------------------- - -func (r *request) runReturnEvents() { - defer r.wg.Done() - defer close(r.events) - defer close(r.messages) - bufferedPipe(r.buffer, r.events) -} - -func (r *request) dispatchEvent(task taskEvent) { - select { - case <-r.done: - case r.buffer <- ReqEvent{ - PeerID: task.peerID, ReceivedAt: task.at, Data: task.data}: - } -} - -func (r *request) emit(entry JournalEntry) { - select { - case <-r.done: - case r.journals <- entry: - } -} - -func (r *request) order(task reqTask) { - select { - case <-r.done: - case r.tasks <- task: - } -} - -func (r *request) Close() { - r.order(newCloseReq()) - r.wg.Wait() -} - -func (r *request) terminate() { - defer r.wg.Done() - r.peerWg.Wait() - close(r.done) - close(r.buffer) - if r.journals != nil { - close(r.journals) - } - r.onClose() -} - -// ---------------------------------------------------------------------------- -// Stream Request -// ---------------------------------------------------------------------------- - -func NewStreamReq( - ctx context.Context, - id string, - filters [][]byte, - peers []string, - postmaster *Postmaster, - isConnected func(string) bool, - collector *JournalCollector, - onClose func(), - handler slog.Handler, -) *StreamReq { - ctx = component.MustExtend(ctx, "stream") - - r := &StreamReq{ - request: &request{ - id: id, - req: envelope.EncloseReq(id, filters), - - tasks: make(chan reqTask, len(peers)*16), - done: make(chan struct{}), - - buffer: make(chan ReqEvent, len(peers)*16), - events: make(chan ReqEvent), - messages: make(chan ReqMessage, len(peers)), - - postmaster: postmaster, - isConnected: isConnected, - onClose: onClose, - - ctx: ctx, - }, - peers: make(map[string]*streamPeer), - } - - if collector != nil { - r.journals = make(chan JournalEntry, len(peers)*16) - collector.Enroll(r.journals) - } - - if handler != nil { - c := component.FromContext(ctx) - r.logger = slog.New(handler).With(slog.Any("component", c)) - } - - for _, peerID := range peers { - r.peers[peerID] = &streamPeer{} - r.peerWg.Add(1) - } - - r.wg.Add(2) - go r.run() - go r.runReturnEvents() - - // send initial REQs - for id := range r.peers { - if r.isConnected(id) { - r.sendReq(id) - } - } - - return r -} - -func (r *StreamReq) run() { - defer r.wg.Done() - - for { - select { - case <-r.done: - return - case t := <-r.tasks: - r.dispatch(t) - } - } -} - -func (r *StreamReq) Peers() []string { - peers := make([]string, 0, len(r.peers)) - for p := range r.peers { - peers = append(peers, p) - } - return peers -} - -func (r *StreamReq) sendReq(peerID string) { - _, ok := r.peers[peerID] - if !ok { - return - } - - id, _ := r.postmaster.Send(r.ctx, peerID, r.req, - func(o LetterOutcome) { r.order(newReqOutcomeTask(peerID, o)) }) - - c := component.FromContext(r.ctx) - r.emit(NewReqQueuedJournal(peerID, c, ReqQueuedData{ - SubID: r.id, LetterID: id, QueuedAt: time.Now(), - })) -} - -func (r *StreamReq) sendClose(peerID string) { - peer, ok := r.peers[peerID] - if !ok || peer.closeSent { - return - } - - if !peer.reqSent { - r.closePeer(peerID) - return - } - - id, _ := r.postmaster.Send(r.ctx, peerID, envelope.EncloseClose(r.id), - func(o LetterOutcome) { r.order(newCloseOutcomeTask(peerID, o)) }) - - peer.closeSent = true - c := component.FromContext(r.ctx) - r.emit(NewCloseQueuedJournal(peerID, c, CloseQueuedData{ - SubID: r.id, LetterID: id, QueuedAt: time.Now(), - })) -} - -func (r *StreamReq) closePeer(peerID string) { - peer, ok := r.peers[peerID] - if !ok { - return - } - - peer.closeOnce.Do(func() { - r.peerWg.Done() - peer.closed = true - }) -} - -func (r *StreamReq) dispatch(task reqTask) { - switch t := task.(type) { - case taskReqOutcome: - r.dispatchReqOutcome(t) - - case taskCloseOutcome: - r.dispatchCloseOutcome(t) - - case taskEvent: - r.dispatchEvent(t) - - case taskEOSE: - r.dispatchEOSE(t) - - case taskClosed: - r.dispatchClosed(t) - - case taskClosePeer: - r.dispatchClosePeer(t) - - case taskCloseReq: - r.dispatchCloseReq(t) - - case taskHandleReconnect: - r.dispatchHandleReconnect(t) - } -} - -func (r *StreamReq) dispatchReqOutcome(task taskReqOutcome) { - peer := r.peers[task.peerID] - if task.outcome.Kind == OutcomeSent { - peer.reqSent = true - } - - c := component.FromContext(r.ctx) - r.emit(NewReqSendOutcomeJournal(task.peerID, c, ReqSendOutcomeData{ - SubID: r.id, - LetterID: task.outcome.LetterID, - Outcome: task.outcome.Kind, - SentAt: task.outcome.SentAt, - MissedAt: task.outcome.MissedAt, - RetryCount: task.outcome.Retries, - })) -} - -func (r *StreamReq) dispatchCloseOutcome(task taskCloseOutcome) { - r.closePeer(task.peerID) - - c := component.FromContext(r.ctx) - r.emit(NewCloseSendOutcomeJournal(task.peerID, c, CloseSendOutcomeData{ - SubID: r.id, - LetterID: task.outcome.LetterID, - Outcome: task.outcome.Kind, - SentAt: task.outcome.SentAt, - MissedAt: task.outcome.MissedAt, - RetryCount: task.outcome.Retries, - })) -} - -func (r *StreamReq) dispatchEOSE(task taskEOSE) { - c := component.FromContext(r.ctx) - r.emit(NewReceivedEOSEJournal(task.peerID, c, ReceivedEOSEData{ - SubID: r.id, At: task.at, - })) -} - -func (r *StreamReq) dispatchClosed(task taskClosed) { - c := component.FromContext(r.ctx) - r.emit(NewReceivedClosedJournal(task.peerID, c, ReceivedClosedData{ - SubID: r.id, At: task.at, Message: task.message, - })) - - peer := r.peers[task.peerID] - if peer.closed { - return - } - - select { - case <-r.done: - case r.messages <- ReqMessage{ - PeerID: task.peerID, - ReceivedAt: task.at, - Data: task.message, - }: - } - - r.closePeer(task.peerID) -} - -func (r *StreamReq) dispatchClosePeer(task taskClosePeer) { - r.closePeer(task.peerID) -} - -func (r *StreamReq) dispatchCloseReq(task taskCloseReq) { - if r.closing { - return - } - - r.closing = true - - for id, peer := range r.peers { - if !peer.closed { - if !r.isConnected(id) { - r.closePeer(id) - } else { - r.sendClose(id) - } - } - } - - r.wg.Add(1) - go r.terminate() -} - -func (r *StreamReq) dispatchHandleReconnect(task taskHandleReconnect) { - peer, ok := r.peers[task.peerID] - if !ok || peer.closed || r.closing || peer.closeSent { - return - } - r.sendReq(task.peerID) -} - -// ---------------------------------------------------------------------------- -// Query Request -// ---------------------------------------------------------------------------- - -func NewQueryReq( - ctx context.Context, - id string, - filters [][]byte, - peers []string, - postmaster *Postmaster, - isConnected func(string) bool, - journals chan<- JournalEntry, - eoseTimeout time.Duration, - onClose func(), - handler slog.Handler, -) *QueryReq { - // start buffered pipe to event output - // pipe return drives channel closures - return nil -} - -func (r *QueryReq) Peers() []string { - peers := make([]string, 0, len(r.peers)) - for p := range r.peers { - peers = append(peers, p) - } - return peers -} - -func (r *QueryReq) sendReq(peerID string) error { - return nil -} - -func (r *QueryReq) sendClose(peerID string) error { - return nil -} - -func (r *QueryReq) run() { - defer r.wg.Done() - - for { - select { - case <-r.done: - return - case t := <-r.tasks: - r.dispatch(t) - } - } -} - -func (r *QueryReq) dispatch(task reqTask) { - switch t := task.(type) { - case taskReqOutcome: - r.dispatchReqOutcome(t) - - case taskCloseOutcome: - r.dispatchCloseOutcome(t) - - case taskEvent: - r.dispatchEvent(t) - - case taskEOSE: - r.dispatchEOSE(t) - - case taskClosed: - r.dispatchClosed(t) - - case taskClosePeer: - r.dispatchClosePeer(t) - - case taskCloseReq: - r.dispatchCloseReq(t) - - case taskMissedEOSE: - r.dispatchMissedEOSE(t) - } -} - -func (r *QueryReq) dispatchReqOutcome(task taskReqOutcome) {} - -func (r *QueryReq) dispatchCloseOutcome(task taskCloseOutcome) {} - -func (r *QueryReq) dispatchEOSE(task taskEOSE) {} - -func (r *QueryReq) dispatchClosed(task taskClosed) {} - -func (r *QueryReq) dispatchClosePeer(task taskClosePeer) {} - -func (r *QueryReq) dispatchCloseReq(task taskCloseReq) {} - -func (r *QueryReq) dispatchMissedEOSE(task taskMissedEOSE) {} - -// ---------------------------------------------------------------------------- -// Request Tasks -// ---------------------------------------------------------------------------- - -// Types - -type reqTask interface{ reqTask() } // gates task channel - -type taskReqOutcome struct { - peerID string - outcome LetterOutcome -} - -func (taskReqOutcome) reqTask() {} - -type taskCloseOutcome struct { - peerID string - outcome LetterOutcome -} - -func (taskCloseOutcome) reqTask() {} - -type taskEvent struct { - peerID string - at time.Time - data Envelope -} - -func (taskEvent) reqTask() {} - -type taskEOSE struct { - peerID string - at time.Time -} - -func (taskEOSE) reqTask() {} - -type taskClosed struct { - peerID string - at time.Time - message string -} - -func (taskClosed) reqTask() {} - -type taskClosePeer struct{ peerID string } - -func (taskClosePeer) reqTask() {} - -type taskCloseReq struct{} - -func (taskCloseReq) reqTask() {} - -type taskHandleReconnect struct{ peerID string } - -func (taskHandleReconnect) reqTask() {} - -type taskMissedEOSE struct{ peerID string } - -func (taskMissedEOSE) reqTask() {} - -// Constructors - -func newReqOutcomeTask(peerID string, outcome LetterOutcome) taskReqOutcome { - return taskReqOutcome{peerID: peerID, outcome: outcome} -} - -func newCloseOutcomeTask(peerID string, outcome LetterOutcome) taskCloseOutcome { - return taskCloseOutcome{peerID: peerID, outcome: outcome} -} - -func newEventTask(peerID string, at time.Time, data Envelope) taskEvent { - return taskEvent{peerID: peerID, at: at, data: data} -} - -func newEOSETask(peerID string, at time.Time) taskEOSE { - return taskEOSE{peerID: peerID, at: at} -} - -func newClosedTask(peerID string, at time.Time, message string) taskClosed { - return taskClosed{peerID: peerID, at: at, message: message} -} - -func newClosePeerTask(peerID string) taskClosePeer { - return taskClosePeer{peerID: peerID} -} - -func newCloseReq() taskCloseReq { - return taskCloseReq{} -} - -func newHandleReconnect(peerID string) taskHandleReconnect { - return taskHandleReconnect{peerID: peerID} -} - -func newMissedEOSETask(peerID string) taskMissedEOSE { - return taskMissedEOSE{peerID: peerID} -} diff --git a/reqmanager_test.go b/reqmanager_test.go deleted file mode 100644 index 8828353..0000000 --- a/reqmanager_test.go +++ /dev/null @@ -1 +0,0 @@ -package prism diff --git a/request.go b/request.go new file mode 100644 index 0000000..dedbe94 --- /dev/null +++ b/request.go @@ -0,0 +1,339 @@ +package prism + +import ( + "context" + "encoding/base32" + "fmt" + "git.wisehodl.dev/jay/go-honeybee" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" + "log/slog" + "sync" + "time" +) + +// ---------------------------------------------------------------------------- +// Types +// ---------------------------------------------------------------------------- + +type ReqEvent struct { + PeerID string + ReceivedAt time.Time + Data []byte +} + +type ReqClosed struct { + PeerID string + ReceivedAt time.Time + Data string +} + +type RequestManager struct { + reqs map[string][][]byte + inboxSubs map[string]chan<- InboxMessage + done chan struct{} + reqWg sync.WaitGroup + + envoy *Envoy + events <-chan OutboundPoolEvent + inbox <-chan InboxMessage + + ctx context.Context + cancel context.CancelFunc + mu sync.RWMutex + wg sync.WaitGroup + handler slog.Handler + logger *slog.Logger +} + +type request struct { + id string + req []byte + query bool + + inbox <-chan InboxMessage + stop chan struct{} + terminate func() + + events chan ReqEvent + closed chan ReqClosed + + ctx context.Context + cancel context.CancelFunc + wg sync.WaitGroup + logger *slog.Logger +} + +// ---------------------------------------------------------------------------- +// Helpers +// ---------------------------------------------------------------------------- + +var encoder = base32.StdEncoding.WithPadding(base32.NoPadding) + +func generateID() string { + return "" +} + +// ---------------------------------------------------------------------------- +// Request Manager +// ---------------------------------------------------------------------------- + +func NewRequestManager(envoy *Envoy) *RequestManager { + ctx, cancel := context.WithCancel( + component.MustExtend(envoy.Context(), "request_manager")) + + m := &RequestManager{ + reqs: make(map[string]*request), + envoy: envoy, + events: envoy.SubscribeEvents(), + inbox: envoy.SubscribeInbox([]string{"EVENT", "EOSE", "CLOSED"}), + ctx: ctx, + cancel: cancel, + } + + if h := envoy.Handler(); h != nil { + comp := component.FromContext(ctx) + m.handler = h + m.logger = slog.New(h).With(slog.Any("component", comp)) + } + + m.wg.Add(1) + go m.handleEvents() + + return m +} + +func (m *RequestManager) Stream( + filters [][]byte, +) ( + reqID string, + events <-chan ReqEvent, + closed <-chan ReqClosed, +) { + ctx := component.MustExtend(m.ctx, "stream") + id := generateID() + terminate := func() { + m.mu.Lock() + defer m.mu.Unlock() + m.unsubscribeInboxLock(id) + delete(m.reqs, id) + m.reqWg.Done() + } + + m.mu.Lock() + defer m.mu.Unlock() + + m.reqWg.Add(1) + r := newStreamRequest(ctx, id, envelope.EncloseReq(id, filters), + m.subscribeInboxLock(id), m.done, terminate, m.handler) + + m.reqs[id] = r + + return id, r.Events(), r.Closed() +} + +func (m *RequestManager) Query( + filters [][]byte, + timeout time.Duration, +) ( + events []ReqEvent, + closed *ReqClosed, +) { + ctx, _ := context.WithTimeout(component.MustExtend(m.ctx, "query"), timeout) + + id := generateID() + terminate := func() { + m.mu.Lock() + defer m.mu.Unlock() + m.unsubscribeInboxLock(id) + } + + m.mu.Lock() + r := newQueryRequest(ctx, id, envelope.EncloseReq(id, filters), + m.subscribeInboxLock(id), m.done, terminate, m.handler) + m.mu.Unlock() + + for { + select { + case <-m.ctx.Done(): + return + case rEvent, ok := <-r.Events(): + if !ok { + return + } + events = append(events, rEvent) + case rClosed := <-r.Closed(): + closed = &rClosed + return + } + } + +} + +func (m *RequestManager) Cancel(id string) error { + req, ok := m.reqs[id] + if !ok { + return fmt.Errorf("req not found: %s", id) + } + req.Close() + return nil +} + +func (m *RequestManager) Close() { + m.cancel() + m.wg.Wait() +} + +func (m *RequestManager) start() { + +} + +func (m *RequestManager) stop() { + +} + +func (m *RequestManager) subscribeInboxLock(id string) <-chan InboxMessage { + ch := make(chan InboxMessage) + m.inboxSubs[id] = ch + return ch +} + +func (m *RequestManager) unsubscribeInboxLock(id string) { + ch, ok := m.inboxSubs[id] + if !ok { + return + } + close(ch) + delete(m.inboxSubs, id) +} + +func (m *RequestManager) handleEvents() { + defer m.wg.Done() + + for { + select { + case <-m.ctx.Done(): + return + case ev := <-m.events: + switch ev.Kind { + case EventConnected: + m.start() + + case EventDisconnected: + m.stop() + } + } + } +} + +func (m *RequestManager) routeInbox() { + defer m.wg.Done() + + for { + select { + case <-m.ctx.Done(): + return + case ev, ok := <-m.inbox: + if !ok { + return + } + + url, err := honeybee.NormalizeURL(ev.ID) + if err != nil { + continue + } + + m.mu.RLock() + sub, ok := m.inboxSubs[url] + m.mu.RUnlock() + + if !ok { + continue + } + + select { + case <-m.ctx.Done(): + return + case sub <- ev: + } + } + } +} + +// ---------------------------------------------------------------------------- +// Request +// ---------------------------------------------------------------------------- + +func newStreamRequest( + ctx context.Context, + id string, + req []byte, + inbox <-chan InboxMessage, + stop chan struct{}, + terminate func(), + handler slog.Handler, +) *request { + ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request")) + + r := &request{ + id: id, + req: req, + query: false, + inbox: inbox, + stop: stop, + terminate: terminate, + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp := component.FromContext(ctx) + r.logger = slog.New(handler).With(slog.Any("component", comp)) + } + + return r +} + +func newQueryRequest( + ctx context.Context, + id string, + req []byte, + inbox <-chan InboxMessage, + stop chan struct{}, + terminate func(), + handler slog.Handler, +) *request { + ctx, cancel := context.WithCancel(component.MustExtend(ctx, "request")) + + r := &request{ + id: id, + req: req, + query: true, + inbox: inbox, + stop: stop, + terminate: terminate, + ctx: ctx, + cancel: cancel, + } + + if handler != nil { + comp := component.FromContext(ctx) + r.logger = slog.New(handler).With(slog.Any("component", comp)) + } + + return r +} + +func (r *request) Close() { + r.cancel() + r.wg.Wait() + r.terminate() +} + +func (r *request) Events() <-chan ReqEvent { + return r.events +} + +func (r *request) Closed() <-chan ReqClosed { + return r.closed +} diff --git a/streamreq_test.go b/streamreq_test.go deleted file mode 100644 index 42687d5..0000000 --- a/streamreq_test.go +++ /dev/null @@ -1,613 +0,0 @@ -package prism - -import ( - "context" - "fmt" - "git.wisehodl.dev/jay/go-mana-component" - "git.wisehodl.dev/jay/go-roots-ws" - "github.com/stretchr/testify/assert" - "reflect" - "slices" - "sync" - "sync/atomic" - "testing" - "time" -) - -// TODO: remove -var ( - _ context.Context - _ assert.Assertions - _ testing.T - _ time.Time - _ fmt.Formatter -) - -// Helpers - -type reqTestHarness struct { - ctx context.Context - pm *Postmaster - events chan PoolEvent - sent map[string][]string - sentMu *sync.RWMutex - isConnected func(string) bool - collector *JournalCollector - journals <-chan JournalEntry - closed atomic.Bool -} - -func setupReqHarness(t *testing.T, peers []string) reqTestHarness { - ctx := component.MustNew(context.Background(), "prism", "test") - pm, poolEvents, sent, sentMu, isConnected := mockReqPostmaster(t, ctx, peers) - collector := NewJournalCollector() - journals := collector.Out() - return reqTestHarness{ - ctx: ctx, - pm: pm, - events: poolEvents, - sent: sent, - sentMu: sentMu, - isConnected: isConnected, - collector: collector, - journals: journals, - } -} - -func mockReqPostmaster( - t *testing.T, - ctx context.Context, - peers []string, -) ( - pm *Postmaster, - poolEvents chan PoolEvent, - sent map[string][]string, - sentMu *sync.RWMutex, - isConnected func(id string) bool, -) { - t.Helper() - - poolHasPeer := func(id string) (string, bool) { - if ok := slices.Contains(peers, id); ok { - return id, true - } - return "", false - } - - poolEvents = make(chan PoolEvent, 4) - pmEvents := make(chan PoolEvent, 4) - - connected := make(map[string]bool) - connMu := sync.RWMutex{} - isConnected = func(id string) bool { - connMu.RLock() - defer connMu.RUnlock() - return connected[id] - } - - go func() { - for ev := range poolEvents { - connMu.Lock() - switch ev.Kind { - case EventConnected: - connected[ev.ID] = true - case EventDisconnected: - connected[ev.ID] = false - } - connMu.Unlock() - pmEvents <- ev - } - }() - - sent = make(map[string][]string) - sentMu = &sync.RWMutex{} - poolSendFunc := func(id string, data Envelope) error { - sentMu.Lock() - defer sentMu.Unlock() - sent[id] = append(sent[id], string(data)) - return nil - } - - pm = NewPostmaster(ctx, poolHasPeer, pmEvents, poolSendFunc, nil) - - for _, id := range peers { - poolEvents <- PoolEvent{ID: id, Kind: EventAdded, At: time.Now()} - connected[id] = false - } - - Eventually(t, func() bool { return len(pm.Peers()) == len(peers) }, - "should add peers") - - return -} - -func expectSentMessage(t *testing.T, - sent map[string][]string, - mu *sync.RWMutex, - peerID string, - msg []byte, - index int, -) { - t.Helper() - - Eventually(t, func() bool { - mu.RLock() - defer mu.RUnlock() - if len(sent[peerID]) <= index { - return false - } - return sent[peerID][index] == string(msg) - }, fmt.Sprintf("expected message to be sent to %q: %s", peerID, string(msg))) -} - -func neverSentMessage(t *testing.T, - sent map[string][]string, - mu *sync.RWMutex, - peerID string, - msg []byte, - index int, -) { - t.Helper() - - Never(t, func() bool { - mu.RLock() - defer mu.RUnlock() - if len(sent[peerID]) <= index { - return false - } - return sent[peerID][index] == string(msg) - }, fmt.Sprintf("unexpected message sent to %q: %s", peerID, string(msg))) -} - -func expectJournalEntry(t *testing.T, - journals <-chan JournalEntry, - expected reflect.Type, -) { - t.Helper() - - Eventually(t, func() bool { - select { - default: - return false - case entry := <-journals: - got := reflect.TypeOf(entry) - return expected == got - } - }, fmt.Sprintf("expected journal entry: %s", expected)) -} - -// Tests - -func TestStreamReq_InitialReq(t *testing.T) { - t.Run("sends req to one peer", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // connect to peer - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqQueuedJournal{})) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - // close req - req.Close() - - expectJournalEntry(t, h.journals, reflect.TypeOf(CloseQueuedJournal{})) - expectJournalEntry(t, h.journals, reflect.TypeOf(CloseSendOutcomeJournal{})) - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("doesn't send to disconnected peer", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - // close req - req.Close() - - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("sends req to multiple peers", func(t *testing.T) { - peers := []string{"peer1", "peer2"} - h := setupReqHarness(t, peers) - - // connect to peers - h.events <- PoolEvent{ID: "peer1", Kind: EventConnected, At: time.Now()} - h.events <- PoolEvent{ID: "peer2", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer1") }, - "expected peer 1 to connect") - Eventually(t, func() bool { return h.isConnected("peer2") }, - "expected peer 2 to connect") - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedReq, 0) - expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedReq, 0) - - // expect two req outcomes - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - - // close req - req.Close() - - expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedClose, 1) - expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedClose, 1) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) -} - -func TestStreamReq_EventForwarding(t *testing.T) { - t.Run("events are forwarded", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - // simulate receive event - req.order(newEventTask("peer", time.Now(), []byte("event"))) - - // receive event - var event ReqEvent - Eventually(t, func() bool { - select { - default: - return false - case event = <-req.events: - return true - } - }, "expected event") - - assert.Equal(t, "peer", event.PeerID) - assert.False(t, event.ReceivedAt.IsZero()) - assert.Equal(t, []byte("event"), event.Data) - }) - - t.Run("events channel closes on close", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - // close req - req.Close() - - Eventually(t, func() bool { - select { - default: - return false - case _, ok := <-req.events: - // expect channel close - return !ok - } - }, "expected event channel to close") - }) -} - -func TestStreamReq_EOSEHandling(t *testing.T) { - t.Run("eose emits journal", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - // simulate EOSE - req.order(newEOSETask("peer", time.Now())) - - expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedEOSEJournal{})) - }) -} - -func TestStreamReq_ClosedHandling(t *testing.T) { - t.Run("closed forwards message once", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - // simulate closed - req.order(newClosedTask("peer", time.Now(), "closed")) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{})) - - // receive message - var message ReqMessage - Eventually(t, func() bool { - select { - default: - return false - case message = <-req.messages: - return true - } - }, "expected closed message") - - assert.Equal(t, "peer", message.PeerID) - assert.False(t, message.ReceivedAt.IsZero()) - assert.Equal(t, "closed", message.Data) - - // multiple closed emit journals - req.order(newClosedTask("peer", time.Now(), "closed")) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{})) - - // but do not emit more than one message to the caller - Never(t, func() bool { - select { - default: - return false - case <-req.messages: - return true - } - }, "second closed message should not arrive") - - // close req - req.Close() - - // expect messages channel to close - Eventually(t, func() bool { - select { - default: - return false - case _, ok := <-req.messages: - // expect channel close - return !ok - } - }, "expected messages channel to close") - }) -} - -func TestStreamReq_Reconnect(t *testing.T) { - t.Run("req replays after reconnect", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - // initial req is sent - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - - // cycle disconnect-reconnect - h.events <- PoolEvent{ID: "peer", Kind: EventDisconnected, At: time.Now()} - Eventually(t, func() bool { return !h.isConnected("peer") }, - "expected peer to disconnect") - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // simulate req manager handling connect event - req.order(newHandleReconnect("peer")) - - // expect replayed req - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 1) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - - // close req - req.Close() - - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("delayed connection sends req", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - // postmaster-side connect - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // simulate req manager handling connect event - req.order(newHandleReconnect("peer")) - - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - - // close req - req.Close() - - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("no replay when closing", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - // postmaster-side connect - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // close req - req.Close() - - // reconnect during or after close - req.order(newHandleReconnect("peer")) - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) -} - -func TestStreamReq_Terminal(t *testing.T) { -} - -func TestStreamReq_Close(t *testing.T) { - t.Run("close is idempotent", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // connect to peer - h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - Eventually(t, func() bool { return h.isConnected("peer") }, - "expected peer to connect") - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) - - // close req twice - req.Close() - req.Close() - - // only expect one close - expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) - - // second close never arrives - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("close not sent if req was never sent", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedReq := envelope.EncloseReq("REQ", filters) - expectedClose := envelope.EncloseClose("REQ") - - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) - - // close req - req.Close() - - // req was never sent, so a close should not be sent - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) - - t.Run("close not sent if req was cancelled", func(t *testing.T) { - peers := []string{"peer"} - h := setupReqHarness(t, peers) - - // open req - filters := [][]byte{[]byte("{}")} - req := NewStreamReq( - h.ctx, "REQ", filters, peers, h.pm, h.isConnected, - h.collector, func() { h.closed.Store(true) }, nil) - - expectedClose := envelope.EncloseClose("REQ") - - // simulate cancelled req outcome - req.order(newReqOutcomeTask("peer", LetterOutcome{Kind: OutcomeCancelled})) - - // close req - req.Close() - - // req was never sent, so a close should not be sent - neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) - - Eventually(t, func() bool { return h.closed.Load() }, - "expected close callback to be called") - }) -} - -func TestStreamReq_Journals(t *testing.T) { -}