diff --git a/request.go b/request.go index f9a0000..f5a02d1 100644 --- a/request.go +++ b/request.go @@ -292,7 +292,21 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) { default: } case "CLOSED": - // route to session and request + subID, message, err := envelope.FindClosed(msg.Data) + if err != nil { + return + } + m.mu.RLock() + req, reqOk := m.reqs[subID] + sub, subOk := m.inboxSubs[subID] + m.mu.RUnlock() + if reqOk { + req.closed <- ReqClosed{ + PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message} + } + if subOk { + sub.closed <- struct{}{} + } } } diff --git a/request_test.go b/request_test.go index 5ac608b..0db27fe 100644 --- a/request_test.go +++ b/request_test.go @@ -350,11 +350,59 @@ func TestRequestManager_Stream(t *testing.T) { }) t.Run("closed deregisters and signals caller", func(t *testing.T) { - // connect, call Stream - // inject a CLOSED envelope with a reason string - // assert the closed channel yields a ReqClosed with the correct message - // assert the events channel eventually closes (buffer drained and deregistered) - // assert the registration is removed from reqs + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + id, events, closed := m.Stream(filters) + + // drain the REQ send + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + + p.receive(envelope.EncloseClosed(id, "error: test")) + + var got ReqClosed + Eventually(t, func() bool { + select { + case got = <-closed: + return true + default: + return false + } + }, "expected closed signal") + assert.Equal(t, "error: test", got.Data) + + Eventually(t, func() bool { + select { + case _, ok := <-events: + return !ok + default: + return false + } + }, "events channel should close after deregistration") + + Eventually(t, func() bool { + select { + case _, ok := <-closed: + return !ok + default: + return false + } + }, "closed channel should close after deregistration") + + m.mu.RLock() + _, ok := m.reqs[id] + m.mu.RUnlock() + assert.False(t, ok, "registration should be removed from reqs") }) }