From a05e20ec6ae02c58f1e13c2b150486ddd08f0edd Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 17 May 2026 21:10:18 -0400 Subject: [PATCH] reconnect: test registrations survive, respawn, and resume events --- request_test.go | 193 ++++++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 178 insertions(+), 15 deletions(-) diff --git a/request_test.go b/request_test.go index d07a789..03f4320 100644 --- a/request_test.go +++ b/request_test.go @@ -456,26 +456,189 @@ func TestRequestManager_Reconnect(t *testing.T) { }) t.Run("registrations survive disconnect", func(t *testing.T) { - // connect, open two streams, hold both events and closed channels - // send a disconnect event - // after sessions terminate, assert both registrations remain in reqs - // assert both events channels are still open - // assert both closed channels are still open + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + t.Cleanup(func() { m.Close() }) + filters := [][]byte{[]byte(`{}`)} + idA, eventsA, closedA := m.Stream(filters) + idB, eventsB, closedB := m.Stream(filters) + + for range 2 { + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ send") + } + + p.disconnect() + + Eventually(t, func() bool { + m.mu.RLock() + defer m.mu.RUnlock() + reqA, okA := m.reqs[idA] + reqB, okB := m.reqs[idB] + return okA && okB && !reqA.active && !reqB.active + }, "both requests should be inactive after disconnect") + + m.mu.RLock() + _, okA := m.reqs[idA] + _, okB := m.reqs[idB] + m.mu.RUnlock() + assert.True(t, okA, "registration A should still exist after disconnect") + assert.True(t, okB, "registration B should still exist after disconnect") + + Never(t, func() bool { + select { + case _, ok := <-eventsA: + return !ok + default: + return false + } + }, "eventsA should remain open after disconnect") + + Never(t, func() bool { + select { + case _, ok := <-eventsB: + return !ok + default: + return false + } + }, "eventsB should remain open after disconnect") + + Never(t, func() bool { + select { + case _, ok := <-closedA: + return !ok + default: + return false + } + }, "closedA should remain open after disconnect") + + Never(t, func() bool { + select { + case _, ok := <-closedB: + return !ok + default: + return false + } + }, "closedB should remain open after disconnect") }) - t.Run("sessions respawn and resend req on reconnect", func(t *testing.T) { - // connect, open two streams - // disconnect, wait for sessions to terminate - // reconnect (send connect event) - // assert mock send is called again for each sub id (two new REQ envelopes) + t.Run("requests respawn and resend req on reconnect", func(t *testing.T) { + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + t.Cleanup(func() { m.Close() }) + filters := [][]byte{[]byte(`{}`)} + idA, _, _ := m.Stream(filters) + idB, _, _ := m.Stream(filters) + + for range 2 { + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected initial REQ send") + } + + p.disconnect() + Eventually(t, func() bool { + m.mu.RLock() + defer m.mu.RUnlock() + reqA, okA := m.reqs[idA] + reqB, okB := m.reqs[idB] + return okA && okB && !reqA.active && !reqB.active + }, "both requests should be inactive after disconnect") + + p.connect() + + var sentIDs []string + for range 2 { + Eventually(t, func() bool { + select { + case raw := <-p.sent: + subID, _, err := envelope.FindReq(raw) + if err == nil { + sentIDs = append(sentIDs, subID) + } + return err == nil + default: + return false + } + }, "expected REQ resend after reconnect") + } + + assert.ElementsMatch(t, []string{idA, idB}, sentIDs) + + Eventually(t, func() bool { + m.mu.RLock() + defer m.mu.RUnlock() + reqA, okA := m.reqs[idA] + reqB, okB := m.reqs[idB] + return okA && okB && reqA.active && reqB.active + }, "both requests should be active after reconnect") }) t.Run("events resume on same channel after reconnect", func(t *testing.T) { - // connect, open a stream, hold the events channel - // disconnect, reconnect - // inject an EVENT for the sub id - // assert the event appears on the original events channel - // the caller's reference to the channel is unaffected by the reconnect cycle + p, envoy := newMockEnvoy(t) + p.connect() + Eventually(t, envoy.IsConnected, "envoy should be connected") + + m := NewRequestManager(envoy) + t.Cleanup(func() { m.Close() }) + filters := [][]byte{[]byte(`{}`)} + id, events, _ := m.Stream(filters) + + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected initial REQ send") + + p.disconnect() + Eventually(t, func() bool { + m.mu.RLock() + defer m.mu.RUnlock() + req, ok := m.reqs[id] + return ok && !req.active + }, "request should be inactive after disconnect") + + p.connect() + Eventually(t, func() bool { + select { + case <-p.sent: + return true + default: + return false + } + }, "expected REQ resend after reconnect") + + eventData := []byte(`{"id":"z"}`) + p.receive(envelope.EncloseSubscriptionEvent(id, eventData)) + + Eventually(t, func() bool { + select { + case ev := <-events: + return assert.Equal(t, eventData, ev.Data) + default: + return false + } + }, "event should arrive on original channel after reconnect") }) }