From de3c997279b773c891cf594a6f9300723cd01532 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 20 Apr 2026 09:26:19 -0400 Subject: [PATCH] Wrote worker tests. --- responderpool/pool.go | 1 - responderpool/worker.go | 7 +- responderpool/worker_forwarder_test.go | 102 +++++++++++++ responderpool/worker_reader_test.go | 189 +++++++++++++++++++++++++ responderpool/worker_watchdog_test.go | 103 ++++++++++++++ 5 files changed, 399 insertions(+), 3 deletions(-) diff --git a/responderpool/pool.go b/responderpool/pool.go index 2201002..72faedc 100644 --- a/responderpool/pool.go +++ b/responderpool/pool.go @@ -9,7 +9,6 @@ type PoolEventKind string const ( EventPeerDisconnected PoolEventKind = "disconnected" EventPeerDropped PoolEventKind = "dropped" - EventPeerInactive PoolEventKind = "inactive" EventPeerEvicted PoolEventKind = "evicted" ) diff --git a/responderpool/worker.go b/responderpool/worker.go index a6d8990..29cc450 100644 --- a/responderpool/worker.go +++ b/responderpool/worker.go @@ -33,6 +33,9 @@ func RunReader( // by default, the peer dropped unexpectedly kind := EventPeerDropped select { + // the peer-side error is sent before the connection is closed, + // so a non-blocking call here is correct + // if an error is not sent, then assume the default event kind case err := <-conn.Errors(): if errors.Is(err, transport.ErrPeerClosedClean) { kind = EventPeerDisconnected @@ -101,7 +104,7 @@ func RunForwarder( func RunWatchdog( ctx context.Context, - onTimeout onEventFunc, + onInactive func(), heartbeat <-chan struct{}, timeout time.Duration, ) { @@ -133,7 +136,7 @@ func RunWatchdog( // timer completed case <-timer.C: // signal peer is inactive - onTimeout(EventPeerInactive) + onInactive() return } } diff --git a/responderpool/worker_forwarder_test.go b/responderpool/worker_forwarder_test.go index 1d0ef5c..c0459f8 100644 --- a/responderpool/worker_forwarder_test.go +++ b/responderpool/worker_forwarder_test.go @@ -1 +1,103 @@ package responderpool + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestRunForwarder(t *testing.T) { + t.Run("message passes through to inbox", func(t *testing.T) { + id := "wss://test" + messages := make(chan ReceivedMessage, 1) + inbox := make(chan InboxMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go RunForwarder(id, ctx, messages, inbox, 0) + + messages <- ReceivedMessage{data: []byte("hello"), receivedAt: time.Now()} + + honeybeetest.Eventually(t, func() bool { + select { + case msg := <-inbox: + return string(msg.Data) == "hello" && msg.ID == "wss://test" + default: + return false + } + }, "expected message") + }) + + t.Run("oldest message dropped when queue is full", func(t *testing.T) { + id := "wss://test" + messages := make(chan ReceivedMessage, 1) + inbox := make(chan InboxMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gate := make(chan struct{}) + gatedInbox := make(chan InboxMessage) + + // gate the inbox from receiving messages until the gate is opened + go func() { + <-gate + for msg := range gatedInbox { + inbox <- msg + } + }() + + go RunForwarder(id, ctx, messages, gatedInbox, 2) + + // send three messages while the gated inbox is blocked + messages <- ReceivedMessage{data: []byte("first"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("second"), receivedAt: time.Now()} + messages <- ReceivedMessage{data: []byte("third"), receivedAt: time.Now()} + + // allow time for the first message to be dropped + time.Sleep(20 * time.Millisecond) + + // close the gate, draining messages into the inbox + close(gate) + + // receive messages from the inbox + var received []string + honeybeetest.Eventually(t, func() bool { + select { + case msg := <-inbox: + received = append(received, string(msg.Data)) + default: + } + return len(received) == 2 + }, "expected messages") + + // first message was dropped + assert.Equal(t, []string{"second", "third"}, received) + + }) + + t.Run("exits on context cancellation", func(t *testing.T) { + id := "wss://test" + messages := make(chan ReceivedMessage, 1) + inbox := make(chan InboxMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + RunForwarder(id, ctx, messages, inbox, 0) + close(done) + }() + + cancel() + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected done signal") + }) +} diff --git a/responderpool/worker_reader_test.go b/responderpool/worker_reader_test.go index 1d0ef5c..3e5cc1a 100644 --- a/responderpool/worker_reader_test.go +++ b/responderpool/worker_reader_test.go @@ -1 +1,190 @@ package responderpool + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + "github.com/gorilla/websocket" + "github.com/stretchr/testify/assert" + "io" + "sync/atomic" + "testing" + "time" +) + +func TestRunReader(t *testing.T) { + t.Run("message forwarded with correct data and non-zero receivedAt", func(t *testing.T) { + conn, _, incoming, _ := setupReaderTestConnection(t) + defer conn.Close() + + messages := make(chan ReceivedMessage, 1) + heartbeat := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go RunReader(ctx, func(PoolEventKind) {}, conn, messages, heartbeat) + + before := time.Now() + incoming <- honeybeetest.MockIncomingData{MsgType: websocket.TextMessage, Data: []byte("hello")} + + honeybeetest.Eventually(t, func() bool { + select { + case msg := <-messages: + return string(msg.data) == "hello" && msg.receivedAt.After(before) + default: + return false + } + }, "expected message") + }) + + t.Run("heartbeat sent per forwarded message", func(t *testing.T) { + conn, _, incoming, _ := setupReaderTestConnection(t) + defer conn.Close() + + messages := make(chan ReceivedMessage, 10) + heartbeat := make(chan struct{}, 10) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + count := atomic.Int32{} + go func() { + for range heartbeat { + count.Add(1) + } + }() + go func() { + for range messages { + } + }() + go RunReader(ctx, func(PoolEventKind) {}, conn, messages, heartbeat) + + const n = 3 + for i := 0; i < n; i++ { + incoming <- honeybeetest.MockIncomingData{MsgType: websocket.TextMessage, Data: []byte("msg")} + } + + honeybeetest.Eventually(t, func() bool { + return count.Load() == n + }, "expected heartbeats") + }) + + t.Run("clean close calls onPeerClose with EventPeerDisconnected", func(t *testing.T) { + conn, mock, _, _ := setupReaderTestConnection(t) + mock.ReadMessageFunc = func() (int, []byte, error) { + return 0, nil, &websocket.CloseError{Code: websocket.CloseNormalClosure} + } + + messages := make(chan ReceivedMessage, 1) + heartbeat := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var gotKind PoolEventKind + done := make(chan struct{}) + go RunReader(ctx, func(kind PoolEventKind) { + gotKind = kind + close(done) + }, conn, messages, heartbeat) + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected onPeerClose") + + assert.Equal(t, EventPeerDisconnected, gotKind) + }) + + t.Run("unexpected close calls onPeerClose with EventPeerDropped", func(t *testing.T) { + conn, mock, _, _ := setupReaderTestConnection(t) + mock.ReadMessageFunc = func() (int, []byte, error) { + return 0, nil, &websocket.CloseError{Code: websocket.CloseProtocolError} + } + + messages := make(chan ReceivedMessage, 1) + heartbeat := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var gotKind PoolEventKind + done := make(chan struct{}) + go RunReader(ctx, func(kind PoolEventKind) { + gotKind = kind + close(done) + }, conn, messages, heartbeat) + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected onPeerClose") + + assert.Equal(t, EventPeerDropped, gotKind) + }) + + t.Run("read error calls onPeerClose with EventPeerDropped", func(t *testing.T) { + conn, mock, _, _ := setupReaderTestConnection(t) + mock.ReadMessageFunc = func() (int, []byte, error) { + return 0, nil, io.EOF + } + + messages := make(chan ReceivedMessage, 1) + heartbeat := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var gotKind PoolEventKind + done := make(chan struct{}) + go RunReader(ctx, func(kind PoolEventKind) { + gotKind = kind + close(done) + }, conn, messages, heartbeat) + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected onPeerClose") + + assert.Equal(t, EventPeerDropped, gotKind) + }) + + t.Run("ctx.Done exits without calling onPeerClose", func(t *testing.T) { + conn, _, _, _ := setupReaderTestConnection(t) + defer conn.Close() + + messages := make(chan ReceivedMessage, 1) + heartbeat := make(chan struct{}, 1) + ctx, cancel := context.WithCancel(context.Background()) + + called := atomic.Bool{} + done := make(chan struct{}) + go func() { + RunReader(ctx, func(PoolEventKind) { + called.Store(true) + }, conn, messages, heartbeat) + close(done) + }() + + cancel() + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected RunReader to exit") + + assert.False(t, called.Load()) + }) +} diff --git a/responderpool/worker_watchdog_test.go b/responderpool/worker_watchdog_test.go index 1d0ef5c..a612607 100644 --- a/responderpool/worker_watchdog_test.go +++ b/responderpool/worker_watchdog_test.go @@ -1 +1,104 @@ package responderpool + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + "github.com/stretchr/testify/assert" + "sync/atomic" + "testing" + "time" +) + +func TestRunWatchdog(t *testing.T) { + t.Run("heartbeat resets timer, onInactive not called", func(t *testing.T) { + heartbeat := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + called := atomic.Bool{} + go RunWatchdog(ctx, func() { called.Store(true) }, heartbeat, 200*time.Millisecond) + + for i := 0; i < 5; i++ { + time.Sleep(20 * time.Millisecond) + heartbeat <- struct{}{} + } + + honeybeetest.Never(t, func() bool { + return called.Load() + }, "unexpected onInactive call") + }) + + t.Run("timeout fires onInactive exactly once", func(t *testing.T) { + heartbeat := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + count := atomic.Int32{} + done := make(chan struct{}) + go RunWatchdog(ctx, func() { + count.Add(1) + close(done) + }, heartbeat, 20*time.Millisecond) + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected onInactive") + + assert.Equal(t, int32(1), count.Load()) + }) + + t.Run("ctx.Done exits without calling onInactive", func(t *testing.T) { + heartbeat := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + called := atomic.Bool{} + done := make(chan struct{}) + go func() { + RunWatchdog(ctx, func() { called.Store(true) }, heartbeat, 20*time.Second) + close(done) + }() + + cancel() + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected RunWatchdog to exit") + + assert.False(t, called.Load()) + }) + + t.Run("zero timeout exits on ctx.Done without firing", func(t *testing.T) { + heartbeat := make(chan struct{}) + ctx, cancel := context.WithCancel(context.Background()) + + called := atomic.Bool{} + done := make(chan struct{}) + go func() { + RunWatchdog(ctx, func() { called.Store(true) }, heartbeat, 0) + close(done) + }() + + cancel() + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected RunWatchdog to exit") + + assert.False(t, called.Load()) + }) +}