From 72f0793047c474f6e98f7dd1d341e0c9be776015 Mon Sep 17 00:00:00 2001 From: Jay Date: Sun, 19 Apr 2026 08:00:30 -0400 Subject: [PATCH] Wrote runReader tests, runSession body --- initiatorpool/worker.go | 86 +++++++++-- initiatorpool/worker_test.go | 286 ++++++++++++++++++++++++++--------- 2 files changed, 293 insertions(+), 79 deletions(-) diff --git a/initiatorpool/worker.go b/initiatorpool/worker.go index fb58e08..ac5b761 100644 --- a/initiatorpool/worker.go +++ b/initiatorpool/worker.go @@ -54,9 +54,20 @@ func NewWorker( return w, nil } +func (w *Worker) Start( + ctx WorkerContext, + wg *sync.WaitGroup, +) { +} + +func (w *Worker) Stop() { + w.cancel() +} + func (w *Worker) Send(data []byte) error { conn := w.conn.Load() if conn == nil { + // connection not established by session return NewWorkerError(w.id, ErrConnectionUnavailable) } @@ -74,16 +85,6 @@ func (w *Worker) Send(data []byte) error { return nil } -func (w *Worker) Start( - ctx WorkerContext, - wg *sync.WaitGroup, -) { -} - -func (w *Worker) Stop() { - w.cancel() -} - func (w *Worker) runSession( ctx context.Context, wctx WorkerContext, @@ -92,9 +93,72 @@ func (w *Worker) runSession( dial chan<- struct{}, keepalive <-chan struct{}, - outbound <-chan []byte, newConn <-chan *transport.Connection, ) { + for { + // request new connection + select { + case dial <- struct{}{}: + default: + } + + // obtain new connection + var conn *transport.Connection + preConn: + for { + select { + case <-ctx.Done(): + return + case <-keepalive: + select { + case dial <- struct{}{}: + default: + } + case conn = <-newConn: + break preConn + } + } + + // set up new connection + w.conn.Store(conn) + wctx.Events <- PoolEvent{ID: w.id, Kind: EventConnected} + + // set up session + sessionDone := make(chan struct{}) + var once sync.Once + onStop := func() { + once.Do(func() { close(sessionDone) }) + } + + // start session + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + w.runReader(conn, messages, sessionDone, onStop) + }() + go func() { + defer wg.Done() + w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + }() + + // complete session + wg.Wait() + + // tear down connection + w.conn.Store(nil) + wctx.Events <- PoolEvent{ID: w.id, Kind: EventDisconnected} + + // exit if worker is shutting down + select { + case <-ctx.Done(): + return + default: + } + + // refresh session + } + } func (w *Worker) runReader( diff --git a/initiatorpool/worker_test.go b/initiatorpool/worker_test.go index 829a734..139aa14 100644 --- a/initiatorpool/worker_test.go +++ b/initiatorpool/worker_test.go @@ -6,14 +6,231 @@ import ( "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" + "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" + "io" "net/http" - "sync" "sync/atomic" "testing" "time" ) +func TestRunSession(t *testing.T) { + +} + +func TestRunReader(t *testing.T) { + t.Run("message arrives with correct data and non-zero receivedAt", func(t *testing.T) { + conn, _, incomingData, _ := setupWorkerTestConnection(t) + defer conn.Close() + + messages := make(chan receivedMessage, 1) + heartbeat := make(chan struct{}) + sessionDone := make(chan struct{}) + onStop := func() {} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w := &Worker{ + ctx: ctx, + cancel: cancel, + id: "wss://test", + heartbeat: heartbeat, + } + go func() { + for range heartbeat { + } + }() + go w.runReader(conn, messages, sessionDone, onStop) + + before := time.Now() + incomingData <- honeybeetest.MockIncomingData{ + MsgType: websocket.TextMessage, + Data: []byte("hello"), + } + + assert.Eventually(t, func() bool { + select { + case msg := <-messages: + return string(msg.data) == "hello" && msg.receivedAt.After(before) + default: + return false + } + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) + + t.Run("heartbeat receives one signal per message", func(t *testing.T) { + conn, _, incomingData, _ := setupWorkerTestConnection(t) + defer conn.Close() + + messages := make(chan receivedMessage, 10) + heartbeat := make(chan struct{}) + sessionDone := make(chan struct{}) + onStop := func() {} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + w := &Worker{ + ctx: ctx, + cancel: cancel, + id: "wss://test", + heartbeat: heartbeat, + } + + received := atomic.Int32{} + go func() { + for range heartbeat { + received.Add(1) + } + }() + go func() { + for range messages { + } + }() + go w.runReader(conn, messages, sessionDone, onStop) + + const count = 3 + for i := 0; i < count; i++ { + incomingData <- honeybeetest.MockIncomingData{ + MsgType: websocket.TextMessage, + Data: []byte(fmt.Sprintf("msg-%d", i)), + } + } + + assert.Eventually(t, func() bool { + return received.Load() == count + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + }) + + t.Run("incoming channel close calls conn.Close and onStop", func(t *testing.T) { + conn, _, incomingData, _ := setupWorkerTestConnection(t) + + messages := make(chan receivedMessage, 1) + heartbeat := make(chan struct{}) + sessionDone := make(chan struct{}) + onStopCalled := atomic.Bool{} + onStop := func() { onStopCalled.Store(true) } + ctx := context.Background() + + w := &Worker{ + ctx: ctx, + id: "wss://test", + heartbeat: heartbeat, + } + go func() { + for range heartbeat { + } + }() + go func() { + for range messages { + } + }() + go w.runReader(conn, messages, sessionDone, onStop) + + // simulate remote close + incomingData <- honeybeetest.MockIncomingData{Err: io.EOF} + + assert.Eventually(t, func() bool { + return connClosed(conn) + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + assert.True(t, onStopCalled.Load()) + }) + + t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { + conn, _, _, _ := setupWorkerTestConnection(t) + + messages := make(chan receivedMessage, 1) + heartbeat := make(chan struct{}) + sessionDone := make(chan struct{}) + onStopCalled := atomic.Bool{} + onStop := func() { onStopCalled.Store(true) } + ctx := context.Background() + + w := &Worker{ + ctx: ctx, + id: "wss://test", + heartbeat: heartbeat, + } + go w.runReader(conn, messages, sessionDone, onStop) + + close(sessionDone) + + assert.Eventually(t, func() bool { + return connClosed(conn) + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + assert.True(t, onStopCalled.Load()) + }) +} + +func TestRunStopMonitor(t *testing.T) { + t.Run("keepalive signal calls conn.Close and onStop", func(t *testing.T) { + conn, _, _, _ := setupWorkerTestConnection(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + keepalive := make(chan struct{}, 1) + sessionDone := make(chan struct{}) + onStopCalled := atomic.Bool{} + onStop := func() { onStopCalled.Store(true) } + + w := &Worker{id: "wss://test"} + go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + + keepalive <- struct{}{} + + assert.Eventually(t, func() bool { + return connClosed(conn) + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + assert.True(t, onStopCalled.Load()) + }) + + t.Run("ctx.Done calls conn.Close and onStop", func(t *testing.T) { + conn, _, _, _ := setupWorkerTestConnection(t) + ctx, cancel := context.WithCancel(context.Background()) + + keepalive := make(chan struct{}) + sessionDone := make(chan struct{}) + onStopCalled := atomic.Bool{} + onStop := func() { onStopCalled.Store(true) } + + w := &Worker{id: "wss://test"} + go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + + cancel() + + assert.Eventually(t, func() bool { + return connClosed(conn) + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + assert.True(t, onStopCalled.Load()) + }) + + t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { + conn, _, _, _ := setupWorkerTestConnection(t) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + keepalive := make(chan struct{}) + sessionDone := make(chan struct{}) + onStopCalled := atomic.Bool{} + onStop := func() { onStopCalled.Store(true) } + + w := &Worker{id: "wss://test"} + go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) + + close(sessionDone) + + assert.Eventually(t, func() bool { + return connClosed(conn) + }, honeybeetest.TestTimeout, honeybeetest.TestTick) + + assert.True(t, onStopCalled.Load()) + }) +} + func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { messages := make(chan receivedMessage, 1) @@ -180,73 +397,6 @@ func TestRunKeepalive(t *testing.T) { }) } -func TestRunStopMonitor(t *testing.T) { - t.Run("keepalive signal calls conn.Close and onStop", func(t *testing.T) { - conn, _, _, _ := setupWorkerTestConnection(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - keepalive := make(chan struct{}, 1) - sessionDone := make(chan struct{}) - onStopCalled := atomic.Bool{} - onStop := func() { onStopCalled.Store(true) } - - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) - - keepalive <- struct{}{} - - assert.Eventually(t, func() bool { - return connClosed(conn) - }, honeybeetest.TestTimeout, honeybeetest.TestTick) - - assert.True(t, onStopCalled.Load()) - }) - - t.Run("ctx.Done calls conn.Close and onStop", func(t *testing.T) { - conn, _, _, _ := setupWorkerTestConnection(t) - ctx, cancel := context.WithCancel(context.Background()) - - keepalive := make(chan struct{}) - sessionDone := make(chan struct{}) - onStopCalled := atomic.Bool{} - onStop := func() { onStopCalled.Store(true) } - - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) - - cancel() - - assert.Eventually(t, func() bool { - return connClosed(conn) - }, honeybeetest.TestTimeout, honeybeetest.TestTick) - - assert.True(t, onStopCalled.Load()) - }) - - t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { - conn, _, _, _ := setupWorkerTestConnection(t) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - keepalive := make(chan struct{}) - sessionDone := make(chan struct{}) - onStopCalled := atomic.Bool{} - onStop := func() { onStopCalled.Store(true) } - - w := &Worker{id: "wss://test"} - go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop) - - close(sessionDone) - - assert.Eventually(t, func() bool { - return connClosed(conn) - }, honeybeetest.TestTimeout, honeybeetest.TestTick) - - assert.True(t, onStopCalled.Load()) - }) -} - func TestRunDialer(t *testing.T) { t.Run("successful dial delivers connection to newConn", func(t *testing.T) { w := &Worker{id: "wss://test"}