stream: route closed to session and request; closed deregisters and signals caller

This commit is contained in:
Jay
2026-05-17 12:43:01 -04:00
parent f7948c08b8
commit de3a59d6a6
2 changed files with 68 additions and 6 deletions
+15 -1
View File
@@ -292,7 +292,21 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
default: default:
} }
case "CLOSED": case "CLOSED":
// route to session and request subID, message, err := envelope.FindClosed(msg.Data)
if err != nil {
return
}
m.mu.RLock()
req, reqOk := m.reqs[subID]
sub, subOk := m.inboxSubs[subID]
m.mu.RUnlock()
if reqOk {
req.closed <- ReqClosed{
PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message}
}
if subOk {
sub.closed <- struct{}{}
}
} }
} }
+53 -5
View File
@@ -350,11 +350,59 @@ func TestRequestManager_Stream(t *testing.T) {
}) })
t.Run("closed deregisters and signals caller", func(t *testing.T) { t.Run("closed deregisters and signals caller", func(t *testing.T) {
// connect, call Stream p, envoy := newMockEnvoy(t)
// inject a CLOSED envelope with a reason string p.connect()
// assert the closed channel yields a ReqClosed with the correct message Eventually(t, envoy.IsConnected, "envoy should be connected")
// assert the events channel eventually closes (buffer drained and deregistered)
// assert the registration is removed from reqs m := NewRequestManager(envoy)
filters := [][]byte{[]byte(`{}`)}
id, events, closed := m.Stream(filters)
// drain the REQ send
Eventually(t, func() bool {
select {
case <-p.sent:
return true
default:
return false
}
}, "expected REQ send")
p.receive(envelope.EncloseClosed(id, "error: test"))
var got ReqClosed
Eventually(t, func() bool {
select {
case got = <-closed:
return true
default:
return false
}
}, "expected closed signal")
assert.Equal(t, "error: test", got.Data)
Eventually(t, func() bool {
select {
case _, ok := <-events:
return !ok
default:
return false
}
}, "events channel should close after deregistration")
Eventually(t, func() bool {
select {
case _, ok := <-closed:
return !ok
default:
return false
}
}, "closed channel should close after deregistration")
m.mu.RLock()
_, ok := m.reqs[id]
m.mu.RUnlock()
assert.False(t, ok, "registration should be removed from reqs")
}) })
} }