stream: guard closed and eose sends with Once; test duplicate closed
This commit is contained in:
+27
-18
@@ -48,12 +48,13 @@ type RequestManager struct {
|
||||
}
|
||||
|
||||
type request struct {
|
||||
id string
|
||||
filters [][]byte
|
||||
buffer chan ReqEvent
|
||||
events chan ReqEvent
|
||||
closed chan ReqClosed
|
||||
once sync.Once
|
||||
id string
|
||||
filters [][]byte
|
||||
buffer chan ReqEvent
|
||||
events chan ReqEvent
|
||||
closed chan ReqClosed
|
||||
deregisterOnce sync.Once
|
||||
closedOnce sync.Once
|
||||
}
|
||||
|
||||
type session struct {
|
||||
@@ -74,8 +75,10 @@ type session struct {
|
||||
}
|
||||
|
||||
type sessionSub struct {
|
||||
eose chan<- struct{}
|
||||
closed chan<- struct{}
|
||||
eose chan<- struct{}
|
||||
closed chan<- struct{}
|
||||
eoseOnce sync.Once
|
||||
closedOnce sync.Once
|
||||
}
|
||||
|
||||
type terminateReason int
|
||||
@@ -192,7 +195,7 @@ func (m *RequestManager) Cancel(id string) error {
|
||||
sess.Close()
|
||||
}
|
||||
|
||||
req.once.Do(func() {
|
||||
req.deregisterOnce.Do(func() {
|
||||
close(req.buffer)
|
||||
close(req.closed)
|
||||
})
|
||||
@@ -220,7 +223,7 @@ func (m *RequestManager) Close() {
|
||||
|
||||
m.mu.Lock()
|
||||
for id, req := range m.reqs {
|
||||
req.once.Do(func() {
|
||||
req.deregisterOnce.Do(func() {
|
||||
close(req.buffer)
|
||||
close(req.closed)
|
||||
})
|
||||
@@ -247,7 +250,7 @@ func (m *RequestManager) spawnSession(req *request) {
|
||||
m.mu.Unlock()
|
||||
m.sessionWg.Done()
|
||||
if r == termReceivedClosed {
|
||||
req.once.Do(func() {
|
||||
req.deregisterOnce.Do(func() {
|
||||
close(req.buffer)
|
||||
close(req.closed)
|
||||
})
|
||||
@@ -333,10 +336,12 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
select {
|
||||
case sub.eose <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
sub.eoseOnce.Do(func() {
|
||||
select {
|
||||
case sub.eose <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
})
|
||||
case "CLOSED":
|
||||
subID, message, err := envelope.FindClosed(msg.Data)
|
||||
if err != nil {
|
||||
@@ -347,11 +352,15 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
||||
sub, subOk := m.inboxSubs[subID]
|
||||
m.mu.RUnlock()
|
||||
if reqOk {
|
||||
req.closed <- ReqClosed{
|
||||
PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message}
|
||||
req.closedOnce.Do(func() {
|
||||
req.closed <- ReqClosed{
|
||||
PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: message}
|
||||
})
|
||||
}
|
||||
if subOk {
|
||||
sub.closed <- struct{}{}
|
||||
sub.closedOnce.Do(func() {
|
||||
sub.closed <- struct{}{}
|
||||
})
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -404,6 +404,39 @@ func TestRequestManager_Stream(t *testing.T) {
|
||||
m.mu.RUnlock()
|
||||
assert.False(t, ok, "registration should be removed from reqs")
|
||||
})
|
||||
|
||||
t.Run("duplicate closed does not panic", func(t *testing.T) {
|
||||
p, envoy := newMockEnvoy(t)
|
||||
p.connect()
|
||||
Eventually(t, envoy.IsConnected, "envoy should be connected")
|
||||
|
||||
m := NewRequestManager(envoy)
|
||||
filters := [][]byte{[]byte(`{}`)}
|
||||
id, _, _ := m.Stream(filters)
|
||||
|
||||
// drain the REQ send
|
||||
Eventually(t, func() bool {
|
||||
select {
|
||||
case <-p.sent:
|
||||
return true
|
||||
default:
|
||||
return false
|
||||
}
|
||||
}, "expected REQ send")
|
||||
|
||||
// inject both before the router can process either
|
||||
p.receive(envelope.EncloseClosed(id, "error: first"))
|
||||
p.receive(envelope.EncloseClosed(id, "error: second"))
|
||||
|
||||
// if the router panics, the test will fail with a goroutine crash;
|
||||
// assert the session eventually terminates cleanly as a liveness check
|
||||
Eventually(t, func() bool {
|
||||
m.mu.RLock()
|
||||
_, ok := m.sessions[id]
|
||||
m.mu.RUnlock()
|
||||
return !ok
|
||||
}, "session should terminate after closed")
|
||||
})
|
||||
}
|
||||
|
||||
func TestRequestManager_Cancel(t *testing.T) {
|
||||
|
||||
Reference in New Issue
Block a user