From 57e66e0281f1e81c7bbad3c0088df9bbd6e2e3ee Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 16:41:32 -0400 Subject: [PATCH] query: test returns empty events and closed on relay closed --- request.go | 6 +++--- request_test.go | 33 +++++++++++++++++++++++++++------ 2 files changed, 30 insertions(+), 9 deletions(-) diff --git a/request.go b/request.go index 84bb4e3..871b282 100644 --- a/request.go +++ b/request.go @@ -85,7 +85,7 @@ type terminateReason int const ( termSendFailed terminateReason = iota - termCloseSent + termClosedOnEOSE termReceivedClosed termDone termCancelled @@ -290,7 +290,7 @@ func (m *RequestManager) spawnSession(req *request, query bool) { delete(m.sessions, req.id) m.mu.Unlock() m.sessionWg.Done() - if r == termReceivedClosed || r == termCloseSent { + if r == termReceivedClosed || r == termClosedOnEOSE { req.deregisterOnce.Do(func() { close(req.buffer) close(req.closed) @@ -461,7 +461,7 @@ func (s *session) run() { case <-s.eose: if s.closeOnEOSE { s.send(envelope.EncloseClose(s.id)) - s.terminate(termCloseSent) + s.terminate(termClosedOnEOSE) return } case <-s.closed: diff --git a/request_test.go b/request_test.go index 2bdc3a8..3d0f674 100644 --- a/request_test.go +++ b/request_test.go @@ -114,7 +114,7 @@ func TestRequestManager_Session(t *testing.T) { Eventually(t, func() bool { select { case r := <-h.terminatedWith: - return r == termCloseSent + return r == termClosedOnEOSE default: return false } @@ -585,11 +585,32 @@ func TestRequestManager_Query(t *testing.T) { }) t.Run("returns empty events and closed on relay closed", func(t *testing.T) { - // connect the envoy - // in a goroutine: inject a CLOSED envelope before any EVENT - // call Query - // assert the returned slice is empty - // assert closed is non-nil and contains the relay's reason string + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + t.Cleanup(func() { m.Close() }) + + filters := [][]byte{[]byte(`{}`)} + const reason = "rate-limited: slow down" + + go func() { + reqBytes := <-p.sent + subID, _, err := envelope.FindReq(reqBytes) + if err != nil { + t.Errorf("FindReq: %v", err) + return + } + p.receive(envelope.EncloseClosed(subID, reason)) + }() + + events, closed := m.Query(filters, TestTimeout) + + assert.Empty(t, events) + if assert.NotNil(t, closed) { + assert.Equal(t, reason, closed.Data) + } }) t.Run("returns partial events on timeout", func(t *testing.T) {