From 49ce2eb2ac440e0eaeaa0ef1befe6fe6b9985418 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 11 May 2026 21:55:51 -0400 Subject: [PATCH] completed stream request flow and tests. restructured other parts of the code. --- adapter.go | 20 +- clerk.go | 6 +- journal.go | 4 - journal_test.go | 6 +- post.go | 22 +- postmaster_test.go | 234 +++++++---------- req.go | 524 +++++++++++++++++++++++++++++--------- streamreq_test.go | 612 +++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 1121 insertions(+), 307 deletions(-) diff --git a/adapter.go b/adapter.go index 5ebb92f..85724b7 100644 --- a/adapter.go +++ b/adapter.go @@ -107,7 +107,7 @@ type Hotel struct { func NewEmbassy( ctx context.Context, pool EmbassyPlugin, - jc *JournalCollector, + collector *JournalCollector, handler slog.Handler, ) *Embassy { ctx, cancel := context.WithCancel( @@ -121,16 +121,14 @@ func NewEmbassy( cancel: cancel, } - if jc != nil { + if collector != nil { e.journals = make(chan JournalEntry, 16) - jc.Enroll(e.journals) + collector.Enroll(e.journals) } if handler != nil { - c, ok := component.Get(ctx) - if ok { - e.logger = slog.New(handler).With(slog.Any("component", c)) - } + c := component.FromContext(ctx) + e.logger = slog.New(handler).With(slog.Any("component", c)) } e.wg.Add(1) @@ -156,7 +154,7 @@ func (e *Embassy) Dispatch(url string) error { at := time.Now() if e.journals != nil { - c, _ := component.Get(e.ctx) + c := component.FromContext(e.ctx) select { case <-e.ctx.Done(): return fmt.Errorf("closing") @@ -192,7 +190,7 @@ func (e *Embassy) Dismiss(url string) error { at := time.Now() if e.journals != nil { - c, _ := component.Get(e.ctx) + c := component.FromContext(e.ctx) select { case <-e.ctx.Done(): return fmt.Errorf("closing") @@ -332,14 +330,14 @@ func (e *Embassy) runEventRouter() { if canJournal { switch kind { case EventConnected: - c, _ := component.Get(e.ctx) + c := component.FromContext(e.ctx) select { case <-e.ctx.Done(): case e.journals <- NewPeerConnectedJournal( url, c, PeerConnectedData{At: ev.At}): } case EventDisconnected: - c, _ := component.Get(e.ctx) + c := component.FromContext(e.ctx) select { case <-e.ctx.Done(): case e.journals <- NewPeerDisconnectedJournal( diff --git a/clerk.go b/clerk.go index d46f79b..e48a947 100644 --- a/clerk.go +++ b/clerk.go @@ -86,10 +86,8 @@ func NewClerk( } if handler != nil { - comp, ok := component.Get(ctx) - if ok { - c.logger = slog.New(handler).With(slog.Any("component", comp)) - } + comp := component.FromContext(ctx) + c.logger = slog.New(handler).With(slog.Any("component", comp)) } return c diff --git a/journal.go b/journal.go index 8afb21f..f358ae9 100644 --- a/journal.go +++ b/journal.go @@ -182,7 +182,6 @@ type ReqQueuedData struct { SubID string LetterID uint64 QueuedAt time.Time - Err error } func NewReqQueuedJournal( @@ -202,7 +201,6 @@ type CloseQueuedData struct { SubID string LetterID uint64 QueuedAt time.Time - Err error } func NewCloseQueuedJournal( @@ -225,7 +223,6 @@ type ReqSendOutcomeData struct { SentAt time.Time MissedAt time.Time RetryCount int - Err error } func NewReqSendOutcomeJournal( @@ -248,7 +245,6 @@ type CloseSendOutcomeData struct { SentAt time.Time MissedAt time.Time RetryCount int - Err error } func NewCloseSendOutcomeJournal( diff --git a/journal_test.go b/journal_test.go index 407c111..656a305 100644 --- a/journal_test.go +++ b/journal_test.go @@ -22,7 +22,7 @@ func TestJournalCollector_SingleProducer(t *testing.T) { jc.Enroll(ch) ctx := component.MustNew(context.Background(), "test", "emitter") - comp, _ := component.Get(ctx) + comp := component.FromContext(ctx) e1 := newTestEntry("peer1", comp) e2 := newTestEntry("peer2", comp) @@ -52,7 +52,7 @@ func TestJournalCollector_MultipleProducers(t *testing.T) { jc.Enroll(ch2) ctx := component.MustNew(context.Background(), "test", "emitter") - comp, _ := component.Get(ctx) + comp := component.FromContext(ctx) ch1 <- newTestEntry("p1", comp) ch2 <- newTestEntry("p2", comp) @@ -138,7 +138,7 @@ func TestJournalCollector_ComponentIdentity(t *testing.T) { mod := "test-mod" path := "a.b.c" ctx := component.MustNew(context.Background(), mod, path) - comp, _ := component.Get(ctx) + comp := component.FromContext(ctx) entry := newTestEntry("peer-id", comp) ch <- entry diff --git a/post.go b/post.go index 1c78dcb..e4e6c29 100644 --- a/post.go +++ b/post.go @@ -159,11 +159,9 @@ func NewPostmaster( } if handler != nil { - comp, ok := component.Get(ctx) - if ok { - pm.handler = handler - pm.logger = slog.New(handler).With(slog.Any("component", comp)) - } + comp := component.FromContext(ctx) + pm.handler = handler + pm.logger = slog.New(handler).With(slog.Any("component", comp)) } pm.wg.Add(1) @@ -178,7 +176,7 @@ func (pm *Postmaster) Send( data Envelope, callback func(LetterOutcome), opts ...SendOption, -) context.CancelFunc { +) (uint64, context.CancelFunc) { cfg := sendConfig{deadline: pm.cfg.defaultDeadline} for _, opt := range opts { opt(&cfg) @@ -191,12 +189,12 @@ func (pm *Postmaster) Send( peerID, ok := pm.poolHasPeer(peerID) if !ok { go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) - return func() {} + return 0, func() {} } courier, ok := pm.couriers[peerID] if !ok { go callback(LetterOutcome{PeerID: peerID, Kind: OutcomeRejected}) - return func() {} + return 0, func() {} } ctx, cancel := context.WithTimeout(ctx, cfg.deadline) @@ -210,7 +208,7 @@ func (pm *Postmaster) Send( courier.Enqueue(letter, callback) - return cancel + return letter.id, cancel } func (pm *Postmaster) Peers() []string { @@ -331,10 +329,8 @@ func NewCourier( } if handler != nil { - comp, ok := component.Get(ctx) - if ok { - c.logger = slog.New(handler).With(slog.Any("component", comp)) - } + comp := component.FromContext(ctx) + c.logger = slog.New(handler).With(slog.Any("component", comp)) } c.wg.Add(1) diff --git a/postmaster_test.go b/postmaster_test.go index 9fbd14c..6215740 100644 --- a/postmaster_test.go +++ b/postmaster_test.go @@ -2,79 +2,101 @@ package prism import ( "context" + "fmt" "github.com/stretchr/testify/assert" "testing" "time" ) -const testURL = "wss://test" +// Helpers -func TestPostmasterUnknownPeerSend(t *testing.T) { - ctx := context.Background() +func mockPostmaster( + ctx context.Context, +) (pm *Postmaster, poolEvents chan PoolEvent) { poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) + poolEvents = make(chan PoolEvent, 4) poolSendFunc := func(id string, data Envelope) error { return nil } + pm = NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + return +} - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - - called := make(chan LetterOutcome, 1) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) +func expectLetterOutcome( + t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind, +) { + t.Helper() var outcome LetterOutcome Eventually(t, func() bool { select { default: return false - case outcome = <-called: + case outcome = <-ch: return true } }, "should have received outcome") - assert.Equal(t, OutcomeRejected, outcome.Kind) + assert.Equal(t, kind, outcome.Kind) +} + +func expectAllLetterOutcomes( + t *testing.T, ch chan LetterOutcome, kind LetterOutcomeKind, count int, +) { + t.Helper() + + outcomes := make([]LetterOutcome, 0, count) + Eventually(t, func() bool { + select { + default: + return false + case o := <-ch: + outcomes = append(outcomes, o) + return len(outcomes) == count + } + }, fmt.Sprintf("should have returned %d outcomes", count)) + + if len(outcomes) >= count { + for i := range count { + assert.Equal(t, OutcomeCancelled, outcomes[i].Kind) + } + } +} + +// Tests + +func TestPostmasterUnknownPeerSend(t *testing.T) { + ctx := context.Background() + pm, _ := mockPostmaster(ctx) + + called := make(chan LetterOutcome, 1) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) + + expectLetterOutcome(t, called, OutcomeRejected) } func TestPostmasterSend(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } + pm, poolEvents := mockPostmaster(ctx) - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} - poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") called := make(chan LetterOutcome, 1) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have received outcome") - - assert.Equal(t, OutcomeSent, outcome.Kind) + expectLetterOutcome(t, called, OutcomeSent) } func TestPostmasterCancelInFlight(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } + pm, poolEvents := mockPostmaster(ctx) - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") called := make(chan LetterOutcome, 1) - cancel := pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + _, cancel := pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) // wait for letter to queue time.Sleep(100 * time.Millisecond) @@ -83,133 +105,74 @@ func TestPostmasterCancelInFlight(t *testing.T) { cancel() // connect the pool - poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have received outcome") - - // letter should drain out of the queue and return cancelled - assert.Equal(t, OutcomeCancelled, outcome.Kind) + expectLetterOutcome(t, called, OutcomeCancelled) } func TestPostmasterExpire(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } + pm, poolEvents := mockPostmaster(ctx) - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) - - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") called := make(chan LetterOutcome, 1) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }, + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }, WithDeadline(1*time.Millisecond)) // wait for letter to queue and expire time.Sleep(100 * time.Millisecond) // connect the pool - poolEvents <- PoolEvent{ID: testURL, Kind: EventConnected, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have received outcome") - - // letter should drain out of the queue and return expired - assert.Equal(t, OutcomeExpired, outcome.Kind) + expectLetterOutcome(t, called, OutcomeExpired) } func TestPostmasterPeerRemoved(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } - - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + pm, poolEvents := mockPostmaster(ctx) // add peer, but do not connect - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // send two letters - outcomes := make([]LetterOutcome, 0, 2) called := make(chan LetterOutcome, 2) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) // wait for them to hit the courier queue time.Sleep(100 * time.Millisecond) // remove the peer - poolEvents <- PoolEvent{ID: testURL, Kind: EventRemoved, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()} // expect each letter to return cancelled - Eventually(t, func() bool { - select { - default: - return false - case o := <-called: - outcomes = append(outcomes, o) - return len(outcomes) == 2 - } - }, "should have returned 2 outcomes") - - if len(outcomes) >= 2 { - assert.Equal(t, OutcomeCancelled, outcomes[0].Kind) - assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) - } + expectAllLetterOutcomes(t, called, OutcomeCancelled, 2) // subsequent sends should fail - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have received outcome") - - assert.Equal(t, OutcomeRejected, outcome.Kind) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) + expectLetterOutcome(t, called, OutcomeRejected) } func TestPostmasterCourierCloseRace(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } - - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + pm, poolEvents := mockPostmaster(ctx) // add peer, but do not connect - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // remove the peer - poolEvents <- PoolEvent{ID: testURL, Kind: EventRemoved, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventRemoved, At: time.Now()} // send a letter time.Sleep(5 * time.Microsecond) // small wait lines up the race condition var outcome *LetterOutcome called := make(chan LetterOutcome, 1) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) Eventually(t, func() bool { select { @@ -236,21 +199,16 @@ func TestPostmasterCourierCloseRace(t *testing.T) { func TestPostmasterClose(t *testing.T) { ctx := context.Background() - poolHasPeer := func(id string) (string, bool) { return id, true } - poolEvents := make(chan PoolEvent, 4) - poolSendFunc := func(id string, data Envelope) error { return nil } - - pm := NewPostmaster(ctx, poolHasPeer, poolEvents, poolSendFunc, nil) + pm, poolEvents := mockPostmaster(ctx) // add peer, but do not connect - poolEvents <- PoolEvent{ID: testURL, Kind: EventAdded, At: time.Now()} + poolEvents <- PoolEvent{ID: "peer", Kind: EventAdded, At: time.Now()} Eventually(t, func() bool { return len(pm.Peers()) > 0 }, "should add peer") // send two letters - outcomes := make([]LetterOutcome, 0, 2) called := make(chan LetterOutcome, 2) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) // wait for them to hit the courier queue time.Sleep(100 * time.Millisecond) @@ -259,33 +217,9 @@ func TestPostmasterClose(t *testing.T) { pm.Close() // expect each letter to return cancelled - Eventually(t, func() bool { - select { - default: - return false - case o := <-called: - outcomes = append(outcomes, o) - return len(outcomes) == 2 - } - }, "should have returned 2 outcomes") - - if len(outcomes) >= 2 { - assert.Equal(t, OutcomeCancelled, outcomes[0].Kind) - assert.Equal(t, OutcomeCancelled, outcomes[1].Kind) - } + expectAllLetterOutcomes(t, called, OutcomeCancelled, 2) // subsequent sends should be rejected - pm.Send(ctx, testURL, nil, func(o LetterOutcome) { called <- o }) - - var outcome LetterOutcome - Eventually(t, func() bool { - select { - default: - return false - case outcome = <-called: - return true - } - }, "should have received outcome") - - assert.Equal(t, OutcomeRejected, outcome.Kind) + pm.Send(ctx, "peer", nil, func(o LetterOutcome) { called <- o }) + expectLetterOutcome(t, called, OutcomeRejected) } diff --git a/req.go b/req.go index 98356e7..d58684d 100644 --- a/req.go +++ b/req.go @@ -3,11 +3,18 @@ package prism import ( "context" "encoding/base32" + "fmt" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" "log/slog" "sync" "time" ) +var ( + _ fmt.Formatter +) + // ---------------------------------------------------------------------------- // Types // ---------------------------------------------------------------------------- @@ -17,13 +24,13 @@ import ( type ReqEvent struct { PeerID string ReceivedAt time.Time - Event []byte + Data []byte } type ReqMessage struct { PeerID string ReceivedAt time.Time - Message string + Data string } // Options @@ -84,13 +91,14 @@ type request struct { messages chan ReqMessage postmaster *Postmaster - journals chan<- JournalEntry + journals chan JournalEntry isConnected func(peerID string) bool onClose func() - sendCtx context.Context - wg sync.WaitGroup - logger *slog.Logger + ctx context.Context + wg sync.WaitGroup + peerWg sync.WaitGroup + logger *slog.Logger } // Stream Request @@ -101,6 +109,7 @@ type StreamReq struct { } type streamPeer struct { + reqSent bool closeSent bool closed bool closeOnce sync.Once @@ -115,69 +124,13 @@ type QueryReq struct { } type queryPeer struct { + reqSent bool eoseTimer *time.Timer closeSent bool closed bool closeOnce sync.Once } -// Request Tasks - -type reqTask interface{ reqTask() } // gates task channel - -type taskRecordReqOutcome struct { - peerID string - outcome LetterOutcome -} - -func (taskRecordReqOutcome) reqTask() {} - -type taskRecordCloseOutcome struct { - peerID string - outcome LetterOutcome -} - -func (taskRecordCloseOutcome) reqTask() {} - -type taskReceiveEvent struct { - peerID string - at time.Time - data Envelope -} - -func (taskReceiveEvent) reqTask() {} - -type taskReceiveEOSE struct { - peerID string - at time.Time -} - -func (taskReceiveEOSE) reqTask() {} - -type taskReceiveClosed struct { - peerID string - at time.Time - message string -} - -func (taskReceiveClosed) reqTask() {} - -type taskClosePeer struct{ peerID string } - -func (taskClosePeer) reqTask() {} - -type taskCloseReq struct{} - -func (taskCloseReq) reqTask() {} - -type taskHandleReconnect struct{ peerID string } - -func (taskHandleReconnect) reqTask() {} - -type taskHandleEOSETimeout struct{ peerID string } - -func (taskHandleEOSETimeout) reqTask() {} - // ---------------------------------------------------------------------------- // Request Options // ---------------------------------------------------------------------------- @@ -260,7 +213,7 @@ func (m *ReqManager) CloseReq(id string) error { func (m *ReqManager) Close() {} -func (m *ReqManager) makeOnClose(subID, peers []string) func() { +func (m *ReqManager) makeOnClose(subID string, peers []string) func() { return func() {} } @@ -292,22 +245,49 @@ func generateID(prefix string) string { // Base Request // ---------------------------------------------------------------------------- +func (r *request) runReturnEvents() { + defer r.wg.Done() + defer close(r.events) + defer close(r.messages) + bufferedPipe(r.buffer, r.events) +} + +func (r *request) dispatchEvent(task taskEvent) { + select { + case <-r.done: + case r.buffer <- ReqEvent{ + PeerID: task.peerID, ReceivedAt: task.at, Data: task.data}: + } +} + func (r *request) emit(entry JournalEntry) { - // send into journal entry channel - // selects on r.done and r.journals + select { + case <-r.done: + case r.journals <- entry: + } } func (r *request) order(task reqTask) { - // send into task queue - // selects on r.done and r.tasks + select { + case <-r.done: + case r.tasks <- task: + } } -func (r *request) send( - peerID string, - data Envelope, - makeOutcomeTask func(peerID string, outcome LetterOutcome) reqTask, -) error { - return nil +func (r *request) Close() { + r.order(newCloseReq()) + r.wg.Wait() +} + +func (r *request) terminate() { + defer r.wg.Done() + r.peerWg.Wait() + close(r.done) + close(r.buffer) + if r.journals != nil { + close(r.journals) + } + r.onClose() } // ---------------------------------------------------------------------------- @@ -321,48 +301,252 @@ func NewStreamReq( peers []string, postmaster *Postmaster, isConnected func(string) bool, - journals chan<- JournalEntry, + collector *JournalCollector, onClose func(), handler slog.Handler, ) *StreamReq { - // start buffered pipe to event output - // pipe return drives channel closures - return nil -} + ctx = component.MustExtend(ctx, "stream") -func (r *StreamReq) Peers() []string { - return nil -} + r := &StreamReq{ + request: &request{ + id: id, + req: envelope.EncloseReq(id, filters), -func (r *StreamReq) Close() {} + tasks: make(chan reqTask, len(peers)*16), + done: make(chan struct{}), -func (r *StreamReq) sendReq(peerID string) error { - return nil -} + buffer: make(chan ReqEvent, len(peers)*16), + events: make(chan ReqEvent), + messages: make(chan ReqMessage, len(peers)), -func (r *StreamReq) sendClose(peerID string) error { - return nil + postmaster: postmaster, + isConnected: isConnected, + onClose: onClose, + + ctx: ctx, + }, + peers: make(map[string]*streamPeer), + } + + if collector != nil { + r.journals = make(chan JournalEntry, len(peers)*16) + collector.Enroll(r.journals) + } + + if handler != nil { + c := component.FromContext(ctx) + r.logger = slog.New(handler).With(slog.Any("component", c)) + } + + for _, peerID := range peers { + r.peers[peerID] = &streamPeer{} + r.peerWg.Add(1) + } + + r.wg.Add(2) + go r.run() + go r.runReturnEvents() + + // send initial REQs + for id := range r.peers { + if r.isConnected(id) { + r.sendReq(id) + } + } + + return r } func (r *StreamReq) run() { - // switches on task type + defer r.wg.Done() + + for { + select { + case <-r.done: + return + case t := <-r.tasks: + r.dispatch(t) + } + } } -func (r *StreamReq) applyRecordReqOutcome(task taskRecordReqOutcome) {} +func (r *StreamReq) Peers() []string { + peers := make([]string, 0, len(r.peers)) + for p := range r.peers { + peers = append(peers, p) + } + return peers +} -func (r *StreamReq) applyRecordCloseOutcome(task taskRecordCloseOutcome) {} +func (r *StreamReq) sendReq(peerID string) { + _, ok := r.peers[peerID] + if !ok { + return + } -func (r *StreamReq) applyReceiveEvent(task taskReceiveEvent) {} + id, _ := r.postmaster.Send(r.ctx, peerID, r.req, + func(o LetterOutcome) { r.order(newReqOutcomeTask(peerID, o)) }) -func (r *StreamReq) applyReceiveEOSE(task taskReceiveEOSE) {} + c := component.FromContext(r.ctx) + r.emit(NewReqQueuedJournal(peerID, c, ReqQueuedData{ + SubID: r.id, LetterID: id, QueuedAt: time.Now(), + })) +} -func (r *StreamReq) applyReceiveClosed(task taskReceiveClosed) {} +func (r *StreamReq) sendClose(peerID string) { + peer, ok := r.peers[peerID] + if !ok || peer.closeSent { + return + } -func (r *StreamReq) applyClosePeer(task taskClosePeer) {} + if !peer.reqSent { + r.closePeer(peerID) + return + } -func (r *StreamReq) applyCloseReq(task taskCloseReq) {} + id, _ := r.postmaster.Send(r.ctx, peerID, envelope.EncloseClose(r.id), + func(o LetterOutcome) { r.order(newCloseOutcomeTask(peerID, o)) }) -func (r *StreamReq) applyHandleReconnect(task taskHandleReconnect) {} + peer.closeSent = true + c := component.FromContext(r.ctx) + r.emit(NewCloseQueuedJournal(peerID, c, CloseQueuedData{ + SubID: r.id, LetterID: id, QueuedAt: time.Now(), + })) +} + +func (r *StreamReq) closePeer(peerID string) { + peer, ok := r.peers[peerID] + if !ok { + return + } + + peer.closeOnce.Do(func() { + r.peerWg.Done() + peer.closed = true + }) +} + +func (r *StreamReq) dispatch(task reqTask) { + switch t := task.(type) { + case taskReqOutcome: + r.dispatchReqOutcome(t) + + case taskCloseOutcome: + r.dispatchCloseOutcome(t) + + case taskEvent: + r.dispatchEvent(t) + + case taskEOSE: + r.dispatchEOSE(t) + + case taskClosed: + r.dispatchClosed(t) + + case taskClosePeer: + r.dispatchClosePeer(t) + + case taskCloseReq: + r.dispatchCloseReq(t) + + case taskHandleReconnect: + r.dispatchHandleReconnect(t) + } +} + +func (r *StreamReq) dispatchReqOutcome(task taskReqOutcome) { + peer := r.peers[task.peerID] + if task.outcome.Kind == OutcomeSent { + peer.reqSent = true + } + + c := component.FromContext(r.ctx) + r.emit(NewReqSendOutcomeJournal(task.peerID, c, ReqSendOutcomeData{ + SubID: r.id, + LetterID: task.outcome.LetterID, + Outcome: task.outcome.Kind, + SentAt: task.outcome.SentAt, + MissedAt: task.outcome.MissedAt, + RetryCount: task.outcome.Retries, + })) +} + +func (r *StreamReq) dispatchCloseOutcome(task taskCloseOutcome) { + r.closePeer(task.peerID) + + c := component.FromContext(r.ctx) + r.emit(NewCloseSendOutcomeJournal(task.peerID, c, CloseSendOutcomeData{ + SubID: r.id, + LetterID: task.outcome.LetterID, + Outcome: task.outcome.Kind, + SentAt: task.outcome.SentAt, + MissedAt: task.outcome.MissedAt, + RetryCount: task.outcome.Retries, + })) +} + +func (r *StreamReq) dispatchEOSE(task taskEOSE) { + c := component.FromContext(r.ctx) + r.emit(NewReceivedEOSEJournal(task.peerID, c, ReceivedEOSEData{ + SubID: r.id, At: task.at, + })) +} + +func (r *StreamReq) dispatchClosed(task taskClosed) { + c := component.FromContext(r.ctx) + r.emit(NewReceivedClosedJournal(task.peerID, c, ReceivedClosedData{ + SubID: r.id, At: task.at, Message: task.message, + })) + + peer := r.peers[task.peerID] + if peer.closed { + return + } + + select { + case <-r.done: + case r.messages <- ReqMessage{ + PeerID: task.peerID, + ReceivedAt: task.at, + Data: task.message, + }: + } + + r.closePeer(task.peerID) +} + +func (r *StreamReq) dispatchClosePeer(task taskClosePeer) { + r.closePeer(task.peerID) +} + +func (r *StreamReq) dispatchCloseReq(task taskCloseReq) { + if r.closing { + return + } + + r.closing = true + + for id, peer := range r.peers { + if !peer.closed { + if !r.isConnected(id) { + r.closePeer(id) + } else { + r.sendClose(id) + } + } + } + + r.wg.Add(1) + go r.terminate() +} + +func (r *StreamReq) dispatchHandleReconnect(task taskHandleReconnect) { + peer, ok := r.peers[task.peerID] + if !ok || peer.closed || r.closing || peer.closeSent { + return + } + r.sendReq(task.peerID) +} // ---------------------------------------------------------------------------- // Query Request @@ -386,11 +570,13 @@ func NewQueryReq( } func (r *QueryReq) Peers() []string { - return nil + peers := make([]string, 0, len(r.peers)) + for p := range r.peers { + peers = append(peers, p) + } + return peers } -func (r *QueryReq) Close() {} - func (r *QueryReq) sendReq(peerID string) error { return nil } @@ -400,50 +586,144 @@ func (r *QueryReq) sendClose(peerID string) error { } func (r *QueryReq) run() { - // switches on task type + defer r.wg.Done() + + for { + select { + case <-r.done: + return + case t := <-r.tasks: + r.dispatch(t) + } + } } -func (r *QueryReq) applyRecordReqOutcome(task taskRecordReqOutcome) {} +func (r *QueryReq) dispatch(task reqTask) { + switch t := task.(type) { + case taskReqOutcome: + r.dispatchReqOutcome(t) -func (r *QueryReq) applyRecordCloseOutcome(task taskRecordCloseOutcome) {} + case taskCloseOutcome: + r.dispatchCloseOutcome(t) -func (r *QueryReq) applyReceiveEvent(task taskReceiveEvent) {} + case taskEvent: + r.dispatchEvent(t) -func (r *QueryReq) applyReceiveEOSE(task taskReceiveEOSE) {} + case taskEOSE: + r.dispatchEOSE(t) -func (r *QueryReq) applyReceiveClosed(task taskReceiveClosed) {} + case taskClosed: + r.dispatchClosed(t) -func (r *QueryReq) applyClosePeer(task taskClosePeer) {} + case taskClosePeer: + r.dispatchClosePeer(t) -func (r *QueryReq) applyCloseReq(task taskCloseReq) {} + case taskCloseReq: + r.dispatchCloseReq(t) -func (r *QueryReq) applyHandleEOSETimeout(task taskHandleEOSETimeout) {} + case taskMissedEOSE: + r.dispatchMissedEOSE(t) + } +} + +func (r *QueryReq) dispatchReqOutcome(task taskReqOutcome) {} + +func (r *QueryReq) dispatchCloseOutcome(task taskCloseOutcome) {} + +func (r *QueryReq) dispatchEOSE(task taskEOSE) {} + +func (r *QueryReq) dispatchClosed(task taskClosed) {} + +func (r *QueryReq) dispatchClosePeer(task taskClosePeer) {} + +func (r *QueryReq) dispatchCloseReq(task taskCloseReq) {} + +func (r *QueryReq) dispatchMissedEOSE(task taskMissedEOSE) {} // ---------------------------------------------------------------------------- // Request Tasks // ---------------------------------------------------------------------------- -func newRecordReqOutcome(peerID string, outcome LetterOutcome) taskRecordReqOutcome { - return taskRecordReqOutcome{peerID: peerID, outcome: outcome} +// Types + +type reqTask interface{ reqTask() } // gates task channel + +type taskReqOutcome struct { + peerID string + outcome LetterOutcome } -func newRecordCloseOutcome(peerID string, outcome LetterOutcome) taskRecordCloseOutcome { - return taskRecordCloseOutcome{peerID: peerID, outcome: outcome} +func (taskReqOutcome) reqTask() {} + +type taskCloseOutcome struct { + peerID string + outcome LetterOutcome } -func newReceiveEvent(peerID string, at time.Time, data Envelope) taskReceiveEvent { - return taskReceiveEvent{peerID: peerID, at: at, data: data} +func (taskCloseOutcome) reqTask() {} + +type taskEvent struct { + peerID string + at time.Time + data Envelope } -func newReceiveEOSE(peerID string, at time.Time) taskReceiveEOSE { - return taskReceiveEOSE{peerID: peerID, at: at} +func (taskEvent) reqTask() {} + +type taskEOSE struct { + peerID string + at time.Time } -func newReceiveClosed(peerID string, at time.Time, message string) taskReceiveClosed { - return taskReceiveClosed{peerID: peerID, at: at, message: message} +func (taskEOSE) reqTask() {} + +type taskClosed struct { + peerID string + at time.Time + message string } -func newClosePeer(peerID string) taskClosePeer { +func (taskClosed) reqTask() {} + +type taskClosePeer struct{ peerID string } + +func (taskClosePeer) reqTask() {} + +type taskCloseReq struct{} + +func (taskCloseReq) reqTask() {} + +type taskHandleReconnect struct{ peerID string } + +func (taskHandleReconnect) reqTask() {} + +type taskMissedEOSE struct{ peerID string } + +func (taskMissedEOSE) reqTask() {} + +// Constructors + +func newReqOutcomeTask(peerID string, outcome LetterOutcome) taskReqOutcome { + return taskReqOutcome{peerID: peerID, outcome: outcome} +} + +func newCloseOutcomeTask(peerID string, outcome LetterOutcome) taskCloseOutcome { + return taskCloseOutcome{peerID: peerID, outcome: outcome} +} + +func newEventTask(peerID string, at time.Time, data Envelope) taskEvent { + return taskEvent{peerID: peerID, at: at, data: data} +} + +func newEOSETask(peerID string, at time.Time) taskEOSE { + return taskEOSE{peerID: peerID, at: at} +} + +func newClosedTask(peerID string, at time.Time, message string) taskClosed { + return taskClosed{peerID: peerID, at: at, message: message} +} + +func newClosePeerTask(peerID string) taskClosePeer { return taskClosePeer{peerID: peerID} } @@ -455,6 +735,6 @@ func newHandleReconnect(peerID string) taskHandleReconnect { return taskHandleReconnect{peerID: peerID} } -func newHandleEOSETimeout(peerID string) taskHandleEOSETimeout { - return taskHandleEOSETimeout{peerID: peerID} +func newMissedEOSETask(peerID string) taskMissedEOSE { + return taskMissedEOSE{peerID: peerID} } diff --git a/streamreq_test.go b/streamreq_test.go index 8828353..42687d5 100644 --- a/streamreq_test.go +++ b/streamreq_test.go @@ -1 +1,613 @@ package prism + +import ( + "context" + "fmt" + "git.wisehodl.dev/jay/go-mana-component" + "git.wisehodl.dev/jay/go-roots-ws" + "github.com/stretchr/testify/assert" + "reflect" + "slices" + "sync" + "sync/atomic" + "testing" + "time" +) + +// TODO: remove +var ( + _ context.Context + _ assert.Assertions + _ testing.T + _ time.Time + _ fmt.Formatter +) + +// Helpers + +type reqTestHarness struct { + ctx context.Context + pm *Postmaster + events chan PoolEvent + sent map[string][]string + sentMu *sync.RWMutex + isConnected func(string) bool + collector *JournalCollector + journals <-chan JournalEntry + closed atomic.Bool +} + +func setupReqHarness(t *testing.T, peers []string) reqTestHarness { + ctx := component.MustNew(context.Background(), "prism", "test") + pm, poolEvents, sent, sentMu, isConnected := mockReqPostmaster(t, ctx, peers) + collector := NewJournalCollector() + journals := collector.Out() + return reqTestHarness{ + ctx: ctx, + pm: pm, + events: poolEvents, + sent: sent, + sentMu: sentMu, + isConnected: isConnected, + collector: collector, + journals: journals, + } +} + +func mockReqPostmaster( + t *testing.T, + ctx context.Context, + peers []string, +) ( + pm *Postmaster, + poolEvents chan PoolEvent, + sent map[string][]string, + sentMu *sync.RWMutex, + isConnected func(id string) bool, +) { + t.Helper() + + poolHasPeer := func(id string) (string, bool) { + if ok := slices.Contains(peers, id); ok { + return id, true + } + return "", false + } + + poolEvents = make(chan PoolEvent, 4) + pmEvents := make(chan PoolEvent, 4) + + connected := make(map[string]bool) + connMu := sync.RWMutex{} + isConnected = func(id string) bool { + connMu.RLock() + defer connMu.RUnlock() + return connected[id] + } + + go func() { + for ev := range poolEvents { + connMu.Lock() + switch ev.Kind { + case EventConnected: + connected[ev.ID] = true + case EventDisconnected: + connected[ev.ID] = false + } + connMu.Unlock() + pmEvents <- ev + } + }() + + sent = make(map[string][]string) + sentMu = &sync.RWMutex{} + poolSendFunc := func(id string, data Envelope) error { + sentMu.Lock() + defer sentMu.Unlock() + sent[id] = append(sent[id], string(data)) + return nil + } + + pm = NewPostmaster(ctx, poolHasPeer, pmEvents, poolSendFunc, nil) + + for _, id := range peers { + poolEvents <- PoolEvent{ID: id, Kind: EventAdded, At: time.Now()} + connected[id] = false + } + + Eventually(t, func() bool { return len(pm.Peers()) == len(peers) }, + "should add peers") + + return +} + +func expectSentMessage(t *testing.T, + sent map[string][]string, + mu *sync.RWMutex, + peerID string, + msg []byte, + index int, +) { + t.Helper() + + Eventually(t, func() bool { + mu.RLock() + defer mu.RUnlock() + if len(sent[peerID]) <= index { + return false + } + return sent[peerID][index] == string(msg) + }, fmt.Sprintf("expected message to be sent to %q: %s", peerID, string(msg))) +} + +func neverSentMessage(t *testing.T, + sent map[string][]string, + mu *sync.RWMutex, + peerID string, + msg []byte, + index int, +) { + t.Helper() + + Never(t, func() bool { + mu.RLock() + defer mu.RUnlock() + if len(sent[peerID]) <= index { + return false + } + return sent[peerID][index] == string(msg) + }, fmt.Sprintf("unexpected message sent to %q: %s", peerID, string(msg))) +} + +func expectJournalEntry(t *testing.T, + journals <-chan JournalEntry, + expected reflect.Type, +) { + t.Helper() + + Eventually(t, func() bool { + select { + default: + return false + case entry := <-journals: + got := reflect.TypeOf(entry) + return expected == got + } + }, fmt.Sprintf("expected journal entry: %s", expected)) +} + +// Tests + +func TestStreamReq_InitialReq(t *testing.T) { + t.Run("sends req to one peer", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // connect to peer + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqQueuedJournal{})) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + // close req + req.Close() + + expectJournalEntry(t, h.journals, reflect.TypeOf(CloseQueuedJournal{})) + expectJournalEntry(t, h.journals, reflect.TypeOf(CloseSendOutcomeJournal{})) + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("doesn't send to disconnected peer", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + // close req + req.Close() + + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("sends req to multiple peers", func(t *testing.T) { + peers := []string{"peer1", "peer2"} + h := setupReqHarness(t, peers) + + // connect to peers + h.events <- PoolEvent{ID: "peer1", Kind: EventConnected, At: time.Now()} + h.events <- PoolEvent{ID: "peer2", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer1") }, + "expected peer 1 to connect") + Eventually(t, func() bool { return h.isConnected("peer2") }, + "expected peer 2 to connect") + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedReq, 0) + expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedReq, 0) + + // expect two req outcomes + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + + // close req + req.Close() + + expectSentMessage(t, h.sent, h.sentMu, "peer1", expectedClose, 1) + expectSentMessage(t, h.sent, h.sentMu, "peer2", expectedClose, 1) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) +} + +func TestStreamReq_EventForwarding(t *testing.T) { + t.Run("events are forwarded", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + // simulate receive event + req.order(newEventTask("peer", time.Now(), []byte("event"))) + + // receive event + var event ReqEvent + Eventually(t, func() bool { + select { + default: + return false + case event = <-req.events: + return true + } + }, "expected event") + + assert.Equal(t, "peer", event.PeerID) + assert.False(t, event.ReceivedAt.IsZero()) + assert.Equal(t, []byte("event"), event.Data) + }) + + t.Run("events channel closes on close", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + // close req + req.Close() + + Eventually(t, func() bool { + select { + default: + return false + case _, ok := <-req.events: + // expect channel close + return !ok + } + }, "expected event channel to close") + }) +} + +func TestStreamReq_EOSEHandling(t *testing.T) { + t.Run("eose emits journal", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + // simulate EOSE + req.order(newEOSETask("peer", time.Now())) + + expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedEOSEJournal{})) + }) +} + +func TestStreamReq_ClosedHandling(t *testing.T) { + t.Run("closed forwards message once", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + // simulate closed + req.order(newClosedTask("peer", time.Now(), "closed")) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{})) + + // receive message + var message ReqMessage + Eventually(t, func() bool { + select { + default: + return false + case message = <-req.messages: + return true + } + }, "expected closed message") + + assert.Equal(t, "peer", message.PeerID) + assert.False(t, message.ReceivedAt.IsZero()) + assert.Equal(t, "closed", message.Data) + + // multiple closed emit journals + req.order(newClosedTask("peer", time.Now(), "closed")) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReceivedClosedJournal{})) + + // but do not emit more than one message to the caller + Never(t, func() bool { + select { + default: + return false + case <-req.messages: + return true + } + }, "second closed message should not arrive") + + // close req + req.Close() + + // expect messages channel to close + Eventually(t, func() bool { + select { + default: + return false + case _, ok := <-req.messages: + // expect channel close + return !ok + } + }, "expected messages channel to close") + }) +} + +func TestStreamReq_Reconnect(t *testing.T) { + t.Run("req replays after reconnect", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + // initial req is sent + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + + // cycle disconnect-reconnect + h.events <- PoolEvent{ID: "peer", Kind: EventDisconnected, At: time.Now()} + Eventually(t, func() bool { return !h.isConnected("peer") }, + "expected peer to disconnect") + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // simulate req manager handling connect event + req.order(newHandleReconnect("peer")) + + // expect replayed req + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 1) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + + // close req + req.Close() + + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("delayed connection sends req", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + // postmaster-side connect + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // simulate req manager handling connect event + req.order(newHandleReconnect("peer")) + + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + + // close req + req.Close() + + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("no replay when closing", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + // postmaster-side connect + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // close req + req.Close() + + // reconnect during or after close + req.order(newHandleReconnect("peer")) + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) +} + +func TestStreamReq_Terminal(t *testing.T) { +} + +func TestStreamReq_Close(t *testing.T) { + t.Run("close is idempotent", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // connect to peer + h.events <- PoolEvent{ID: "peer", Kind: EventConnected, At: time.Now()} + Eventually(t, func() bool { return h.isConnected("peer") }, + "expected peer to connect") + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + expectJournalEntry(t, h.journals, reflect.TypeOf(ReqSendOutcomeJournal{})) + + // close req twice + req.Close() + req.Close() + + // only expect one close + expectSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 1) + + // second close never arrives + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 2) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("close not sent if req was never sent", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedReq := envelope.EncloseReq("REQ", filters) + expectedClose := envelope.EncloseClose("REQ") + + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedReq, 0) + + // close req + req.Close() + + // req was never sent, so a close should not be sent + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) + + t.Run("close not sent if req was cancelled", func(t *testing.T) { + peers := []string{"peer"} + h := setupReqHarness(t, peers) + + // open req + filters := [][]byte{[]byte("{}")} + req := NewStreamReq( + h.ctx, "REQ", filters, peers, h.pm, h.isConnected, + h.collector, func() { h.closed.Store(true) }, nil) + + expectedClose := envelope.EncloseClose("REQ") + + // simulate cancelled req outcome + req.order(newReqOutcomeTask("peer", LetterOutcome{Kind: OutcomeCancelled})) + + // close req + req.Close() + + // req was never sent, so a close should not be sent + neverSentMessage(t, h.sent, h.sentMu, "peer", expectedClose, 0) + + Eventually(t, func() bool { return h.closed.Load() }, + "expected close callback to be called") + }) +} + +func TestStreamReq_Journals(t *testing.T) { +}