close: terminate sessions, deregister all requests
This commit is contained in:
+18
-2
@@ -204,8 +204,24 @@ func (m *RequestManager) Cancel(id string) error {
|
|||||||
func (m *RequestManager) Close() {
|
func (m *RequestManager) Close() {
|
||||||
m.cancel()
|
m.cancel()
|
||||||
m.wg.Wait()
|
m.wg.Wait()
|
||||||
// call session.Close for each open session
|
|
||||||
// manually deregister and close each registered request.
|
m.mu.Lock()
|
||||||
|
for _, sess := range m.sessions {
|
||||||
|
sess.Close()
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
|
|
||||||
|
m.sessionWg.Wait()
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
for id, req := range m.reqs {
|
||||||
|
req.once.Do(func() {
|
||||||
|
close(req.buffer)
|
||||||
|
close(req.closed)
|
||||||
|
})
|
||||||
|
delete(m.reqs, id)
|
||||||
|
}
|
||||||
|
m.mu.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *RequestManager) spawnSession(req *request) {
|
func (m *RequestManager) spawnSession(req *request) {
|
||||||
|
|||||||
+85
-11
@@ -598,19 +598,93 @@ func TestRequestManager_InboxRouting(t *testing.T) {
|
|||||||
|
|
||||||
func TestRequestManager_Close(t *testing.T) {
|
func TestRequestManager_Close(t *testing.T) {
|
||||||
t.Run("terminates all sessions without deadlock", func(t *testing.T) {
|
t.Run("terminates all sessions without deadlock", func(t *testing.T) {
|
||||||
// connect, open three streams
|
p, envoy := newMockEnvoy(t)
|
||||||
// call manager.Close()
|
p.connect()
|
||||||
// assert Close returns (does not deadlock)
|
Eventually(t, envoy.IsConnected, "envoy should be connected")
|
||||||
// assert all sessions are terminated (sessions map empty)
|
|
||||||
|
m := NewRequestManager(envoy)
|
||||||
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
|
m.Stream(filters)
|
||||||
|
m.Stream(filters)
|
||||||
|
m.Stream(filters)
|
||||||
|
|
||||||
|
// drain all three REQ sends
|
||||||
|
for range 3 {
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-p.sent:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected REQ send")
|
||||||
|
}
|
||||||
|
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
m.Close()
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "Close should return without deadlock")
|
||||||
|
|
||||||
|
m.mu.RLock()
|
||||||
|
count := len(m.sessions)
|
||||||
|
m.mu.RUnlock()
|
||||||
|
assert.Equal(t, 0, count, "all sessions should be terminated")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("deregisters all requests on close", func(t *testing.T) {
|
t.Run("deregisters all requests on close", func(t *testing.T) {
|
||||||
// connect, open two streams
|
p, envoy := newMockEnvoy(t)
|
||||||
// call manager.Close()
|
p.connect()
|
||||||
// -- calls session.Close for each registration
|
Eventually(t, envoy.IsConnected, "envoy should be connected")
|
||||||
// -- manually cleans up the rest
|
|
||||||
// all sessions are stopped
|
m := NewRequestManager(envoy)
|
||||||
// all request registrations are removed
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
// all registration channels close
|
_, eventsA, _ := m.Stream(filters)
|
||||||
|
_, eventsB, _ := m.Stream(filters)
|
||||||
|
|
||||||
|
for range 2 {
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-p.sent:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected REQ send")
|
||||||
|
}
|
||||||
|
|
||||||
|
m.Close()
|
||||||
|
|
||||||
|
m.mu.RLock()
|
||||||
|
count := len(m.reqs)
|
||||||
|
m.mu.RUnlock()
|
||||||
|
assert.Equal(t, 0, count, "all registrations should be removed")
|
||||||
|
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case _, ok := <-eventsA:
|
||||||
|
return !ok
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "eventsA should close after manager close")
|
||||||
|
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case _, ok := <-eventsB:
|
||||||
|
return !ok
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "eventsB should close after manager close")
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user