From 96d7ab027bcc05f5b99a4ff1174eed9b8d75b2b3 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 16:34:03 -0400 Subject: [PATCH] query: implement Query; spawnSession accepts query bool --- request.go | 61 +++++++++++++++++++++++++++++++++++++++++-------- request_test.go | 47 ++++++++++++++++++++++++++++++++----- 2 files changed, 92 insertions(+), 16 deletions(-) diff --git a/request.go b/request.go index 1f5043b..84bb4e3 100644 --- a/request.go +++ b/request.go @@ -163,7 +163,7 @@ func (m *RequestManager) Stream( close(events) }() if m.envoy.IsConnected() { - m.spawnSession(req) + m.spawnSession(req, false) } m.mu.Unlock() @@ -174,12 +174,53 @@ func (m *RequestManager) Query( filters [][]byte, timeout time.Duration, ) (events []ReqEvent, closed *ReqClosed) { - // return if disconnected - // generate id - // create channels - // spawn session - // collect events - return + if !m.envoy.IsConnected() { + return nil, nil + } + + id := generateID() + buffer := make(chan ReqEvent, 64) + eventsCh := make(chan ReqEvent) + closedCh := make(chan ReqClosed, 1) + + req := &request{ + id: id, + filters: filters, + buffer: buffer, + events: eventsCh, + closed: closedCh, + } + + m.mu.Lock() + m.reqs[id] = req + go func() { + bufferedPipe(buffer, eventsCh) + close(eventsCh) + }() + + m.spawnSession(req, true) + m.mu.Unlock() + + ctx, cancel := context.WithTimeout(m.ctx, timeout) + defer cancel() + + var result []ReqEvent + for { + select { + case ev, ok := <-eventsCh: + if !ok { + return result, nil + } + result = append(result, ev) + case cl, ok := <-closedCh: + if !ok { + return result, nil + } + return result, &cl + case <-ctx.Done(): + return result, nil + } + } } func (m *RequestManager) Cancel(id string) error { @@ -232,7 +273,7 @@ func (m *RequestManager) Close() { m.mu.Unlock() } -func (m *RequestManager) spawnSession(req *request) { +func (m *RequestManager) spawnSession(req *request, query bool) { eose := make(chan struct{}) closed := make(chan struct{}) @@ -249,7 +290,7 @@ func (m *RequestManager) spawnSession(req *request) { delete(m.sessions, req.id) m.mu.Unlock() m.sessionWg.Done() - if r == termReceivedClosed { + if r == termReceivedClosed || r == termCloseSent { req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) @@ -266,7 +307,7 @@ func (m *RequestManager) spawnSession(req *request) { m.ctx, req.id, req_env, eose, closed, m.done, m.envoy.Send, terminate, - false, m.handler, + query, m.handler, ) m.sessions[req.id] = sess m.sessionWg.Add(1) diff --git a/request_test.go b/request_test.go index 537fa7a..2bdc3a8 100644 --- a/request_test.go +++ b/request_test.go @@ -5,6 +5,7 @@ import ( "git.wisehodl.dev/jay/go-roots-ws" "github.com/stretchr/testify/assert" "testing" + "time" ) // Session tests exercise the session struct in isolation. @@ -541,12 +542,46 @@ func TestRequestManager_Cancel(t *testing.T) { func TestRequestManager_Query(t *testing.T) { t.Run("returns events and nil closed on eose", func(t *testing.T) { - // connect the envoy - // in a goroutine: inject three EVENT envelopes then EOSE for the query sub id - // call Query (blocks until return) - // assert the returned slice contains exactly three events - // assert closed is nil - // assert mock send was called with a CLOSE envelope (closeOnEOSE behavior) + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + t.Cleanup(func() { m.Close() }) + + filters := [][]byte{[]byte(`{}`)} + eventData := []byte(`{"id":"abc"}`) + + go func() { + // wait for the REQ to arrive, extract the sub ID + reqBytes := <-p.sent + subID, _, err := envelope.FindReq(reqBytes) + if err != nil { + t.Errorf("FindReq: %v", err) + return + } + // inject three EVENTs then EOSE + for range 3 { + raw := envelope.EncloseSubscriptionEvent(subID, eventData) + p.receive([]byte(raw)) + } + p.receive(envelope.EncloseEOSE(subID)) + }() + + events, closed := m.Query(filters, TestTimeout) + + assert.Len(t, events, 3) + assert.Nil(t, closed) + + // CLOSE envelope should have been sent by the session + var closeEnv []byte + select { + case closeEnv = <-p.sent: + case <-time.After(TestTimeout): + t.Fatal("timed out waiting for CLOSE envelope") + } + closeLabel, _ := envelope.GetLabel(closeEnv) + assert.Equal(t, "CLOSE", string(closeLabel)) }) t.Run("returns empty events and closed on relay closed", func(t *testing.T) {