From e4c7cffb142517b8fb1d591e5873228d7cafb85c Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 15:58:14 -0400 Subject: [PATCH] stream: guard closed and eose sends with Once; test duplicate closed --- request.go | 45 +++++++++++++++++++++++++++------------------ request_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/request.go b/request.go index 759d139..1f5043b 100644 --- a/request.go +++ b/request.go @@ -48,12 +48,13 @@ type RequestManager struct { } type request struct { - id string - filters [][]byte - buffer chan ReqEvent - events chan ReqEvent - closed chan ReqClosed - once sync.Once + id string + filters [][]byte + buffer chan ReqEvent + events chan ReqEvent + closed chan ReqClosed + deregisterOnce sync.Once + closedOnce sync.Once } type session struct { @@ -74,8 +75,10 @@ type session struct { } type sessionSub struct { - eose chan<- struct{} - closed chan<- struct{} + eose chan<- struct{} + closed chan<- struct{} + eoseOnce sync.Once + closedOnce sync.Once } type terminateReason int @@ -192,7 +195,7 @@ func (m *RequestManager) Cancel(id string) error { sess.Close() } - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -220,7 +223,7 @@ func (m *RequestManager) Close() { m.mu.Lock() for id, req := range m.reqs { - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -247,7 +250,7 @@ func (m *RequestManager) spawnSession(req *request) { m.mu.Unlock() m.sessionWg.Done() if r == termReceivedClosed { - req.once.Do(func() { + req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) }) @@ -333,10 +336,12 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { if !ok { return } - select { - case sub.eose <- struct{}{}: - default: - } + sub.eoseOnce.Do(func() { + select { + case sub.eose <- struct{}{}: + default: + } + }) case "CLOSED": subID, message, err := envelope.FindClosed(msg.Data) if err != nil { @@ -347,11 +352,15 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { sub, subOk := m.inboxSubs[subID] m.mu.RUnlock() if reqOk { - req.closed <- ReqClosed{ - PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message} + req.closedOnce.Do(func() { + req.closed <- ReqClosed{ + PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message} + }) } if subOk { - sub.closed <- struct{}{} + sub.closedOnce.Do(func() { + sub.closed <- struct{}{} + }) } } } diff --git a/request_test.go b/request_test.go index 11cf861..11d4f47 100644 --- a/request_test.go +++ b/request_test.go @@ -404,6 +404,39 @@ func TestRequestManager_Stream(t *testing.T) { m.mu.RUnlock() assert.False(t, ok, "registration should be removed from reqs") }) + + t.Run("duplicate closed does not panic", func(t *testing.T) { + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + id, _, _ := m.Stream(filters) + + // drain the REQ send + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + + // inject both before the router can process either + p.receive(envelope.EncloseClosed(id, "error: first")) + p.receive(envelope.EncloseClosed(id, "error: second")) + + // if the router panics, the test will fail with a goroutine crash; + // assert the session eventually terminates cleanly as a liveness check + Eventually(t, func() bool { + m.mu.RLock() + _, ok := m.sessions[id] + m.mu.RUnlock() + return !ok + }, "session should terminate after closed") + }) } func TestRequestManager_Cancel(t *testing.T) {