From f7948c08b874334859d05b658e32fe9976a5a8b2 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 12:29:42 -0400 Subject: [PATCH] stream: route eose to session, ignores eose subtest --- request.go | 19 +++++++++++++++--- request_test.go | 53 ++++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 64 insertions(+), 8 deletions(-) diff --git a/request.go b/request.go index 0445581..f9a0000 100644 --- a/request.go +++ b/request.go @@ -273,11 +273,24 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { return } select { - case req.buffer <- ReqEvent{PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}: - default: + case req.buffer <- ReqEvent{ + PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}: } case "EOSE": - // route to session + subID, err := envelope.FindEOSE(msg.Data) + if err != nil { + return + } + m.mu.RLock() + sub, ok := m.inboxSubs[subID] + m.mu.RUnlock() + if !ok { + return + } + select { + case sub.eose <- struct{}{}: + default: + } case "CLOSED": // route to session and request } diff --git a/request_test.go b/request_test.go index f4b41bb..5ac608b 100644 --- a/request_test.go +++ b/request_test.go @@ -299,11 +299,54 @@ func TestRequestManager_Stream(t *testing.T) { }) t.Run("ignores eose", func(t *testing.T) { - // connect, call Stream - // inject an EOSE envelope for the sub id - // assert the events channel does not close - // assert the closed channel receives nothing - // assert a subsequent EVENT is still forwarded + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + id, events, closed := m.Stream(filters) + + // drain the REQ send + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + + p.receive(envelope.EncloseEOSE(id)) + + Never(t, func() bool { + m.mu.RLock() + _, ok := m.sessions[id] + m.mu.RUnlock() + return !ok + }, "session should not terminate after eose") + + Never(t, func() bool { + select { + case <-closed: + return true + default: + return false + } + }, "closed should not signal on eose for stream") + + // assert a subsequent event is still forwarded + eventA := []byte(`{"id":"a"}`) + p.receive(envelope.EncloseSubscriptionEvent(id, eventA)) + + Eventually(t, func() bool { + select { + case ev := <-events: + return assert.Equal(t, eventA, ev.Data) + default: + return false + } + }, "expected event after eose") }) t.Run("closed deregisters and signals caller", func(t *testing.T) {