diff --git a/request.go b/request.go index 6a82415..352bdb8 100644 --- a/request.go +++ b/request.go @@ -204,8 +204,24 @@ func (m *RequestManager) Cancel(id string) error { func (m *RequestManager) Close() { m.cancel() m.wg.Wait() - // call session.Close for each open session - // manually deregister and close each registered request. + + m.mu.Lock() + for _, sess := range m.sessions { + sess.Close() + } + m.mu.Unlock() + + m.sessionWg.Wait() + + m.mu.Lock() + for id, req := range m.reqs { + req.once.Do(func() { + close(req.buffer) + close(req.closed) + }) + delete(m.reqs, id) + } + m.mu.Unlock() } func (m *RequestManager) spawnSession(req *request) { diff --git a/request_test.go b/request_test.go index d39ee65..11cf861 100644 --- a/request_test.go +++ b/request_test.go @@ -598,19 +598,93 @@ func TestRequestManager_InboxRouting(t *testing.T) { func TestRequestManager_Close(t *testing.T) { t.Run("terminates all sessions without deadlock", func(t *testing.T) { - // connect, open three streams - // call manager.Close() - // assert Close returns (does not deadlock) - // assert all sessions are terminated (sessions map empty) + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + m.Stream(filters) + m.Stream(filters) + m.Stream(filters) + + // drain all three REQ sends + for range 3 { + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + } + + done := make(chan struct{}) + go func() { + m.Close() + close(done) + }() + + Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "Close should return without deadlock") + + m.mu.RLock() + count := len(m.sessions) + m.mu.RUnlock() + assert.Equal(t, 0, count, "all sessions should be terminated") }) t.Run("deregisters all requests on close", func(t *testing.T) { - // connect, open two streams - // call manager.Close() - // -- calls session.Close for each registration - // -- manually cleans up the rest - // all sessions are stopped - // all request registrations are removed - // all registration channels close + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + _, eventsA, _ := m.Stream(filters) + _, eventsB, _ := m.Stream(filters) + + for range 2 { + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + } + + m.Close() + + m.mu.RLock() + count := len(m.reqs) + m.mu.RUnlock() + assert.Equal(t, 0, count, "all registrations should be removed") + + Eventually(t, func() bool { + select { + case _, ok := <-eventsA: + return !ok + default: + return false + } + }, "eventsA should close after manager close") + + Eventually(t, func() bool { + select { + case _, ok := <-eventsB: + return !ok + default: + return false + } + }, "eventsB should close after manager close") }) }