From a6922182d6ff3c91a849df5173b57cafa72bdc41 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 11:43:11 -0400 Subject: [PATCH] session: ignores eose if stream; unify send and message loop into single for/select --- request.go | 25 ++++++++++++------------ request_test.go | 52 +++++++++++++++++++++++++++++++++++++++---------- 2 files changed, 55 insertions(+), 22 deletions(-) diff --git a/request.go b/request.go index 84d5b0d..86d4066 100644 --- a/request.go +++ b/request.go @@ -223,21 +223,22 @@ func (s *session) run() { sent := make(chan error, 1) go func() { sent <- s.send(s.req) }() - select { - case err := <-sent: - if err != nil { - s.terminate(termSendFailed) + for { + select { + case err := <-sent: + if err != nil { + s.terminate(termSendFailed) + return + } + case <-s.done: + s.terminate(termExternal) return + case <-s.ctx.Done(): + s.terminate(termExternal) + return + case <-s.eose: } - case <-s.done: - s.terminate(termExternal) - return - case <-s.ctx.Done(): - s.terminate(termExternal) - return } - - // TODO: main message loop (eose, closed, done, ctx) -- deferred to later tests } func (s *session) Close() { diff --git a/request_test.go b/request_test.go index e5bb555..a8fbcac 100644 --- a/request_test.go +++ b/request_test.go @@ -12,6 +12,7 @@ import ( // Helpers type mockSessionHarness struct { + ctx context.Context id string filters [][]byte req []byte @@ -25,6 +26,7 @@ type mockSessionHarness struct { } func newMockSessionHarness() *mockSessionHarness { + ctx := component.MustNew(context.Background(), "prism", "test") filters := [][]byte{[]byte(`{}`)} id := "TESTREQ" sent := make(chan []byte, 2) @@ -36,6 +38,7 @@ func newMockSessionHarness() *mockSessionHarness { terminate := func(r terminateReason) { terminatedWith <- r } return &mockSessionHarness{ + ctx: ctx, id: id, filters: filters, req: envelope.EncloseReq(id, filters), @@ -57,10 +60,8 @@ func newMockSessionHarness() *mockSessionHarness { func TestRequestManager_Session(t *testing.T) { t.Run("sends req on start", func(t *testing.T) { h := newMockSessionHarness() - - ctx := component.MustNew(context.Background(), "prism", "test") s := newSession( - ctx, h.id, h.req, h.eose, h.closed, h.done, + h.ctx, h.id, h.req, h.eose, h.closed, h.done, h.send, h.terminate, false, nil) go s.run() @@ -79,12 +80,10 @@ func TestRequestManager_Session(t *testing.T) { t.Run("terminates on failed req send", func(t *testing.T) { h := newMockSessionHarness() - send := func([]byte) error { return fmt.Errorf("send failed") } - ctx := component.MustNew(context.Background(), "prism", "test") s := newSession( - ctx, h.id, h.req, h.eose, h.closed, h.done, + h.ctx, h.id, h.req, h.eose, h.closed, h.done, send, h.terminate, false, nil) go s.run() @@ -99,10 +98,43 @@ func TestRequestManager_Session(t *testing.T) { }) t.Run("ignores eose if stream", func(t *testing.T) { - // construct a session with closeOnEOSE = false - // send a value into the eose channel - // assert terminate is never called - // assert a subsequent event still causes the session to run normally + h := newMockSessionHarness() + s := newSession( + h.ctx, h.id, h.req, h.eose, h.closed, h.done, + h.send, h.terminate, false, nil) + go s.run() + + // wait for initial REQ send before proceeding + Eventually(t, func() bool { + select { + case <-h.sent: + return true + default: + return false + } + }, "expected initial send") + + h.eose <- struct{}{} + + Never(t, func() bool { + select { + case <-h.terminatedWith: + return true + default: + return false + } + }, "terminate should not be called on eose for stream") + + // session is still running; done closes it cleanly + close(h.done) + Eventually(t, func() bool { + select { + case r := <-h.terminatedWith: + return r == termExternal + default: + return false + } + }, "expected termExternal after done closed") }) t.Run("sends close on eose if query", func(t *testing.T) {