diff --git a/request.go b/request.go index 30f4ce5..0445581 100644 --- a/request.go +++ b/request.go @@ -130,7 +130,8 @@ func NewRequestManager(e *Envoy) *RequestManager { } // start event handler - // start inbox router + m.wg.Add(1) + go m.routeInbox() return m } @@ -240,11 +241,46 @@ func (m *RequestManager) handleEvents() { func (m *RequestManager) routeInbox() { defer m.wg.Done() - // unpack/route inbox message - // events forward directly to request event buffer - // eose goes to session - // closed goes both to session and request - // uses read lock for map lookups + for { + select { + case <-m.ctx.Done(): + return + case msg, ok := <-m.inbox: + if !ok { + return + } + m.dispatchInbox(msg) + } + } +} + +func (m *RequestManager) dispatchInbox(msg InboxMessage) { + label, err := envelope.GetLabel(msg.Data) + if err != nil { + return + } + + switch string(label) { + case "EVENT": + subID, event, err := envelope.FindSubscriptionEvent(msg.Data) + if err != nil { + return + } + m.mu.RLock() + req, ok := m.reqs[subID] + m.mu.RUnlock() + if !ok { + return + } + select { + case req.buffer <- ReqEvent{PeerID: msg.ID, ReceivedAt: msg.ReceivedAt, Data: event}: + default: + } + case "EOSE": + // route to session + case "CLOSED": + // route to session and request + } } // ---------------------------------------------------------------------------- diff --git a/request_test.go b/request_test.go index d22b4fb..f4b41bb 100644 --- a/request_test.go +++ b/request_test.go @@ -256,10 +256,46 @@ func TestRequestManager_Stream(t *testing.T) { }) t.Run("forwards events to caller", func(t *testing.T) { - // connect, call Stream, get events channel - // inject two EVENT envelopes for the correct sub id into mock inbox - // inject one EVENT envelope for an unrelated sub id - // assert exactly two events appear on the caller's events channel + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + id, events, _ := m.Stream(filters) + + // drain the REQ send + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + + eventA := []byte(`{"id":"a"}`) + eventB := []byte(`{"id":"b"}`) + eventC := []byte(`{"id":"c"}`) + p.receive(envelope.EncloseSubscriptionEvent(id, eventA)) + p.receive(envelope.EncloseSubscriptionEvent(id, eventB)) + p.receive(envelope.EncloseSubscriptionEvent("unrelated", eventC)) + + var got []ReqEvent + Eventually(t, func() bool { + for { + select { + case ev := <-events: + got = append(got, ev) + default: + return len(got) >= 2 + } + } + }, "expected two events") + + assert.Len(t, got, 2) + assert.Equal(t, eventA, got[0].Data) + assert.Equal(t, eventB, got[1].Data) }) t.Run("ignores eose", func(t *testing.T) {