stream: route eose to session, ignores eose subtest
This commit is contained in:
+16
-3
@@ -273,11 +273,24 @@ func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case req.buffer <- ReqEvent{PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}:
|
case req.buffer <- ReqEvent{
|
||||||
default:
|
PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}:
|
||||||
}
|
}
|
||||||
case "EOSE":
|
case "EOSE":
|
||||||
// route to session
|
subID, err := envelope.FindEOSE(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
sub, ok := m.inboxSubs[subID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case sub.eose <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
case "CLOSED":
|
case "CLOSED":
|
||||||
// route to session and request
|
// route to session and request
|
||||||
}
|
}
|
||||||
|
|||||||
+48
-5
@@ -299,11 +299,54 @@ func TestRequestManager_Stream(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ignores eose", func(t *testing.T) {
|
t.Run("ignores eose", func(t *testing.T) {
|
||||||
// connect, call Stream
|
p, envoy := newMockEnvoy(t)
|
||||||
// inject an EOSE envelope for the sub id
|
p.connect()
|
||||||
// assert the events channel does not close
|
Eventually(t, envoy.IsConnected, "envoy should be connected")
|
||||||
// assert the closed channel receives nothing
|
|
||||||
// assert a subsequent EVENT is still forwarded
|
m := NewRequestManager(envoy)
|
||||||
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
|
id, events, closed := m.Stream(filters)
|
||||||
|
|
||||||
|
// drain the REQ send
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-p.sent:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected REQ send")
|
||||||
|
|
||||||
|
p.receive(envelope.EncloseEOSE(id))
|
||||||
|
|
||||||
|
Never(t, func() bool {
|
||||||
|
m.mu.RLock()
|
||||||
|
_, ok := m.sessions[id]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
return !ok
|
||||||
|
}, "session should not terminate after eose")
|
||||||
|
|
||||||
|
Never(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-closed:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "closed should not signal on eose for stream")
|
||||||
|
|
||||||
|
// assert a subsequent event is still forwarded
|
||||||
|
eventA := []byte(`{"id":"a"}`)
|
||||||
|
p.receive(envelope.EncloseSubscriptionEvent(id, eventA))
|
||||||
|
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case ev := <-events:
|
||||||
|
return assert.Equal(t, eventA, ev.Data)
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected event after eose")
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("closed deregisters and signals caller", func(t *testing.T) {
|
t.Run("closed deregisters and signals caller", func(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user