stream: route inbox events to request buffer
This commit is contained in:
+42
-6
@@ -130,7 +130,8 @@ func NewRequestManager(e *Envoy) *RequestManager {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// start event handler
|
// start event handler
|
||||||
// start inbox router
|
m.wg.Add(1)
|
||||||
|
go m.routeInbox()
|
||||||
|
|
||||||
return m
|
return m
|
||||||
}
|
}
|
||||||
@@ -240,11 +241,46 @@ func (m *RequestManager) handleEvents() {
|
|||||||
func (m *RequestManager) routeInbox() {
|
func (m *RequestManager) routeInbox() {
|
||||||
defer m.wg.Done()
|
defer m.wg.Done()
|
||||||
|
|
||||||
// unpack/route inbox message
|
for {
|
||||||
// events forward directly to request event buffer
|
select {
|
||||||
// eose goes to session
|
case <-m.ctx.Done():
|
||||||
// closed goes both to session and request
|
return
|
||||||
// uses read lock for map lookups
|
case msg, ok := <-m.inbox:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.dispatchInbox(msg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *RequestManager) dispatchInbox(msg InboxMessage) {
|
||||||
|
label, err := envelope.GetLabel(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
switch string(label) {
|
||||||
|
case "EVENT":
|
||||||
|
subID, event, err := envelope.FindSubscriptionEvent(msg.Data)
|
||||||
|
if err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
m.mu.RLock()
|
||||||
|
req, ok := m.reqs[subID]
|
||||||
|
m.mu.RUnlock()
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case req.buffer <- ReqEvent{PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
case "EOSE":
|
||||||
|
// route to session
|
||||||
|
case "CLOSED":
|
||||||
|
// route to session and request
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|||||||
+40
-4
@@ -256,10 +256,46 @@ func TestRequestManager_Stream(t *testing.T) {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.Run("forwards events to caller", func(t *testing.T) {
|
t.Run("forwards events to caller", func(t *testing.T) {
|
||||||
// connect, call Stream, get events channel
|
p, envoy := newMockEnvoy(t)
|
||||||
// inject two EVENT envelopes for the correct sub id into mock inbox
|
p.connect()
|
||||||
// inject one EVENT envelope for an unrelated sub id
|
Eventually(t, envoy.IsConnected, "envoy should be connected")
|
||||||
// assert exactly two events appear on the caller's events channel
|
|
||||||
|
m := NewRequestManager(envoy)
|
||||||
|
filters := [][]byte{[]byte(`{}`)}
|
||||||
|
id, events, _ := m.Stream(filters)
|
||||||
|
|
||||||
|
// drain the REQ send
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
select {
|
||||||
|
case <-p.sent:
|
||||||
|
return true
|
||||||
|
default:
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
}, "expected REQ send")
|
||||||
|
|
||||||
|
eventA := []byte(`{"id":"a"}`)
|
||||||
|
eventB := []byte(`{"id":"b"}`)
|
||||||
|
eventC := []byte(`{"id":"c"}`)
|
||||||
|
p.receive(envelope.EncloseSubscriptionEvent(id, eventA))
|
||||||
|
p.receive(envelope.EncloseSubscriptionEvent(id, eventB))
|
||||||
|
p.receive(envelope.EncloseSubscriptionEvent("unrelated", eventC))
|
||||||
|
|
||||||
|
var got []ReqEvent
|
||||||
|
Eventually(t, func() bool {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case ev := <-events:
|
||||||
|
got = append(got, ev)
|
||||||
|
default:
|
||||||
|
return len(got) >= 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}, "expected two events")
|
||||||
|
|
||||||
|
assert.Len(t, got, 2)
|
||||||
|
assert.Equal(t, eventA, got[0].Data)
|
||||||
|
assert.Equal(t, eventB, got[1].Data)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.Run("ignores eose", func(t *testing.T) {
|
t.Run("ignores eose", func(t *testing.T) {
|
||||||
|
|||||||
Reference in New Issue
Block a user