From 1e1d197b85921a819af6c98358ad03a132f7e92d Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 12:13:30 -0400 Subject: [PATCH] stream: register request, spawn session, send REQ on connect --- request.go | 71 +++++++++++++++++++++++++++++++++++++++++++++---- request_test.go | 32 +++++++++++++++++++--- 2 files changed, 94 insertions(+), 9 deletions(-) diff --git a/request.go b/request.go index cea588b..30f4ce5 100644 --- a/request.go +++ b/request.go @@ -138,11 +138,31 @@ func NewRequestManager(e *Envoy) *RequestManager { func (m *RequestManager) Stream( filters [][]byte, ) (string, <-chan ReqEvent, <-chan ReqClosed) { - // generate id - // create channels - // register request - // spawn session if connected - return "", nil, nil + id := generateID() + buffer := make(chan ReqEvent, 64) + events := make(chan ReqEvent) + closed := make(chan ReqClosed, 1) + + req := &request{ + id: id, + filters: filters, + buffer: buffer, + events: events, + closed: closed, + } + + m.mu.Lock() + m.reqs[id] = req + go func() { + bufferedPipe(buffer, events) + close(events) + }() + if m.envoy.IsConnected() { + m.spawnSession(req) + } + m.mu.Unlock() + + return id, events, closed } func (m *RequestManager) Query( @@ -162,6 +182,47 @@ func (m *RequestManager) Close() { m.wg.Wait() } +func (m *RequestManager) spawnSession(req *request) { + eose := make(chan struct{}) + closed := make(chan struct{}) + + sub := &sessionSub{eose: eose, closed: closed} + m.inboxSubs[req.id] = sub + + var once sync.Once + terminate := func(r terminateReason) { + once.Do(func() { + m.mu.Lock() + close(eose) + close(closed) + delete(m.inboxSubs, req.id) + delete(m.sessions, req.id) + m.mu.Unlock() + m.sessionWg.Done() + if r == termReceivedClosed { + req.once.Do(func() { + close(req.buffer) + close(req.closed) + }) + m.mu.Lock() + delete(m.reqs, req.id) + m.mu.Unlock() + } + }) + } + + req_env := envelope.EncloseReq(req.id, req.filters) + sess := newSession( + m.ctx, req.id, req_env, + eose, closed, m.done, + m.envoy.Send, terminate, + false, m.handler, + ) + m.sessions[req.id] = sess + m.sessionWg.Add(1) + go sess.run() +} + func (m *RequestManager) start() { // start all request sessions } diff --git a/request_test.go b/request_test.go index f8daa86..9e97590 100644 --- a/request_test.go +++ b/request_test.go @@ -208,10 +208,34 @@ func TestRequestManager_Session(t *testing.T) { func TestRequestManager_Stream(t *testing.T) { t.Run("spawns session and sends req when connected", func(t *testing.T) { - // connect the envoy before calling Stream - // call Stream with filters - // assert the mock send was called with a REQ envelope - // assert the generated id appears in the REQ envelope + p := newMockPool(t) + emb := NewEmbassy(p.ctx, p.plugin, nil) + err := emb.Dispatch(p.url) + assert.NoError(t, err) + envoy := emb.Call(p.url) + + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + filters := [][]byte{[]byte(`{}`)} + id, events, closed := m.Stream(filters) + + assert.NotEmpty(t, id) + assert.NotNil(t, events) + assert.NotNil(t, closed) + + var got []byte + Eventually(t, func() bool { + select { + case got = <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + + assert.Equal(t, []byte(envelope.EncloseReq(id, filters)), got) }) t.Run("registers but does not spawn session when disconnected", func(t *testing.T) {