diff --git a/initiatorpool/helper_test.go b/initiatorpool/helper_test.go index 85bebcd..c359449 100644 --- a/initiatorpool/helper_test.go +++ b/initiatorpool/helper_test.go @@ -17,8 +17,8 @@ func setupWorkerTestConnection(t *testing.T) ( ) { t.Helper() - incomingData = make(chan honeybeetest.MockIncomingData, 100) - outgoingData = make(chan honeybeetest.MockOutgoingData, 100) + incomingData = make(chan honeybeetest.MockIncomingData, 10) + outgoingData = make(chan honeybeetest.MockOutgoingData, 10) mockSocket = honeybeetest.NewMockSocket() mockSocket.CloseFunc = func() error { @@ -28,7 +28,10 @@ func setupWorkerTestConnection(t *testing.T) ( mockSocket.ReadMessageFunc = func() (int, []byte, error) { select { - case data := <-incomingData: + case data, ok := <-incomingData: + if !ok { + return 0, nil, io.EOF + } return data.MsgType, data.Data, data.Err case <-mockSocket.Closed: return 0, nil, io.EOF diff --git a/initiatorpool/worker_test.go b/initiatorpool/worker_test.go index f1ddefb..23993a8 100644 --- a/initiatorpool/worker_test.go +++ b/initiatorpool/worker_test.go @@ -15,6 +15,18 @@ import ( "time" ) +func drainEvent(t *testing.T, events <-chan PoolEvent, kind PoolEventKind) { + t.Helper() + honeybeetest.Eventually(t, func() bool { + select { + case e := <-events: + return e.Kind == kind + default: + return false + } + }, fmt.Sprintf("expected %s event", kind)) +} + func TestRunSessionDial(t *testing.T) { setup := func(t *testing.T) ( w *Worker, @@ -98,6 +110,293 @@ func TestRunSessionDial(t *testing.T) { }) } +func TestRunSessionConnect(t *testing.T) { + setup := func(t *testing.T) ( + w *Worker, + ctx context.Context, + cancel context.CancelFunc, + dial chan struct{}, + keepalive chan struct{}, + newConn chan *transport.Connection, + messages chan receivedMessage, + ) { + t.Helper() + ctx, cancel = context.WithCancel(context.Background()) + w = &Worker{ + ctx: ctx, + cancel: cancel, + id: "wss://test", + config: GetDefaultWorkerConfig(), + heartbeat: make(chan struct{}), + } + dial = make(chan struct{}, 1) + keepalive = make(chan struct{}, 1) + newConn = make(chan *transport.Connection, 1) + messages = make(chan receivedMessage, 256) + return + } + + t.Run("w.conn set after newConn received", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages := setup(t) + wctx := WorkerContext{Events: make(chan PoolEvent, 10)} + defer cancel() + + conn, _, _, _ := setupWorkerTestConnection(t) + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + + newConn <- conn + + honeybeetest.Eventually(t, func() bool { + return w.conn.Load() != nil + }, "expected w.conn to be set") + }) + + t.Run("EventConnected emitted", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + defer cancel() + + conn, _, _, _ := setupWorkerTestConnection(t) + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + + newConn <- conn + + honeybeetest.Eventually(t, func() bool { + select { + case event := <-events: + return event.ID == w.id && event.Kind == EventConnected + default: + return false + } + }, "expected EventConnected") + }) +} + +func TestRunSessionDisconnect(t *testing.T) { + setup := func(t *testing.T) ( + w *Worker, + ctx context.Context, + cancel context.CancelFunc, + dial chan struct{}, + keepalive chan struct{}, + newConn chan *transport.Connection, + messages chan receivedMessage, + conn *transport.Connection, + incomingData chan honeybeetest.MockIncomingData, + ) { + t.Helper() + ctx, cancel = context.WithCancel(context.Background()) + w = &Worker{ + ctx: ctx, + cancel: cancel, + id: "wss://test", + config: GetDefaultWorkerConfig(), + heartbeat: make(chan struct{}), + } + dial = make(chan struct{}, 1) + keepalive = make(chan struct{}, 1) + newConn = make(chan *transport.Connection, 1) + messages = make(chan receivedMessage, 256) + conn, _, incomingData, _ = setupWorkerTestConnection(t) + return + } + + t.Run("EventDisconnected emitted on connection close", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + defer cancel() + + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + newConn <- conn + drainEvent(t, events, EventConnected) + + close(incomingData) + + drainEvent(t, events, EventDisconnected) + }) + + t.Run("w.conn cleared after disconnect", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + defer cancel() + + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + newConn <- conn + drainEvent(t, events, EventConnected) + + close(incomingData) + drainEvent(t, events, EventDisconnected) + + honeybeetest.Eventually(t, func() bool { + return w.conn.Load() == nil + }, "expected w.conn to be cleared") + }) + + t.Run("dial fires again after disconnect", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + defer cancel() + + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + newConn <- conn + drainEvent(t, events, EventConnected) + + // drain the initial dial signal before disconnecting + <-dial + + close(incomingData) + drainEvent(t, events, EventDisconnected) + + honeybeetest.Eventually(t, func() bool { + select { + case <-dial: + return true + default: + return false + } + }, "expected dial signal after disconnect") + }) + + t.Run("second connection cycle emits EventConnected", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + defer cancel() + + go w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + newConn <- conn + drainEvent(t, events, EventConnected) + + close(incomingData) + drainEvent(t, events, EventDisconnected) + + conn2, _, _, _ := setupWorkerTestConnection(t) + newConn <- conn2 + + drainEvent(t, events, EventConnected) + }) +} + +func TestRunSessionCancellation(t *testing.T) { + setup := func(t *testing.T) ( + w *Worker, + ctx context.Context, + cancel context.CancelFunc, + dial chan struct{}, + keepalive chan struct{}, + newConn chan *transport.Connection, + messages chan receivedMessage, + ) { + t.Helper() + ctx, cancel = context.WithCancel(context.Background()) + w = &Worker{ + ctx: ctx, + cancel: cancel, + id: "wss://test", + config: GetDefaultWorkerConfig(), + heartbeat: make(chan struct{}), + } + dial = make(chan struct{}, 1) + keepalive = make(chan struct{}, 1) + newConn = make(chan *transport.Connection, 1) + messages = make(chan receivedMessage, 256) + return + } + + t.Run("ctx cancelled pre-connection exits without emitting events", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + + done := make(chan struct{}) + go func() { + defer close(done) + w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + }() + + cancel() + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected runSession to exit") + + honeybeetest.Never(t, func() bool { + select { + case <-events: + return true + default: + return false + } + }, "expected no events emitted") + }) + + t.Run("ctx cancelled post-connection emits EventDisconnected", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + + conn, _, _, _ := setupWorkerTestConnection(t) + + done := make(chan struct{}) + go func() { + defer close(done) + w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + }() + + newConn <- conn + + drainEvent(t, events, EventConnected) + + cancel() + + drainEvent(t, events, EventDisconnected) + + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected runSession to exit") + }) + + t.Run("ctx cancelled post-connection clears w.conn", func(t *testing.T) { + w, ctx, cancel, dial, keepalive, newConn, messages := setup(t) + events := make(chan PoolEvent, 10) + wctx := WorkerContext{Events: events} + + conn, _, _, _ := setupWorkerTestConnection(t) + + done := make(chan struct{}) + go func() { + defer close(done) + w.runSession(ctx, wctx, messages, dial, keepalive, newConn) + }() + + newConn <- conn + + drainEvent(t, events, EventConnected) + + cancel() + + drainEvent(t, events, EventDisconnected) + + honeybeetest.Eventually(t, func() bool { + return w.conn.Load() == nil + }, "expected w.conn to clear") + }) +} + func TestRunReader(t *testing.T) { t.Run("message arrives with correct data and non-zero receivedAt", func(t *testing.T) { conn, _, incomingData, _ := setupWorkerTestConnection(t) diff --git a/transport/connection_test.go b/transport/connection_test.go index b0fb660..8c7f8e6 100644 --- a/transport/connection_test.go +++ b/transport/connection_test.go @@ -505,7 +505,10 @@ func setupTestConnection(t *testing.T, config *ConnectionConfig) ( // Wire ReadMessage to pull from incomingData channel mockSocket.ReadMessageFunc = func() (int, []byte, error) { select { - case data := <-incomingData: + case data, ok := <-incomingData: + if !ok { + return 0, nil, io.EOF + } return data.MsgType, data.Data, data.Err case <-mockSocket.Closed: return 0, nil, io.EOF