Wrote runReader tests, runSession body

This commit is contained in:
Jay
2026-04-19 08:00:30 -04:00
parent 29020369e1
commit 72f0793047
2 changed files with 293 additions and 79 deletions
+75 -11
View File
@@ -54,9 +54,20 @@ func NewWorker(
return w, nil
}
func (w *Worker) Start(
ctx WorkerContext,
wg *sync.WaitGroup,
) {
}
func (w *Worker) Stop() {
w.cancel()
}
func (w *Worker) Send(data []byte) error {
conn := w.conn.Load()
if conn == nil {
// connection not established by session
return NewWorkerError(w.id, ErrConnectionUnavailable)
}
@@ -74,16 +85,6 @@ func (w *Worker) Send(data []byte) error {
return nil
}
func (w *Worker) Start(
ctx WorkerContext,
wg *sync.WaitGroup,
) {
}
func (w *Worker) Stop() {
w.cancel()
}
func (w *Worker) runSession(
ctx context.Context,
wctx WorkerContext,
@@ -92,9 +93,72 @@ func (w *Worker) runSession(
dial chan<- struct{},
keepalive <-chan struct{},
outbound <-chan []byte,
newConn <-chan *transport.Connection,
) {
for {
// request new connection
select {
case dial <- struct{}{}:
default:
}
// obtain new connection
var conn *transport.Connection
preConn:
for {
select {
case <-ctx.Done():
return
case <-keepalive:
select {
case dial <- struct{}{}:
default:
}
case conn = <-newConn:
break preConn
}
}
// set up new connection
w.conn.Store(conn)
wctx.Events <- PoolEvent{ID: w.id, Kind: EventConnected}
// set up session
sessionDone := make(chan struct{})
var once sync.Once
onStop := func() {
once.Do(func() { close(sessionDone) })
}
// start session
var wg sync.WaitGroup
wg.Add(2)
go func() {
defer wg.Done()
w.runReader(conn, messages, sessionDone, onStop)
}()
go func() {
defer wg.Done()
w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
}()
// complete session
wg.Wait()
// tear down connection
w.conn.Store(nil)
wctx.Events <- PoolEvent{ID: w.id, Kind: EventDisconnected}
// exit if worker is shutting down
select {
case <-ctx.Done():
return
default:
}
// refresh session
}
}
func (w *Worker) runReader(
+218 -68
View File
@@ -6,14 +6,231 @@ import (
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"git.wisehodl.dev/jay/go-honeybee/types"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"io"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
)
func TestRunSession(t *testing.T) {
}
func TestRunReader(t *testing.T) {
t.Run("message arrives with correct data and non-zero receivedAt", func(t *testing.T) {
conn, _, incomingData, _ := setupWorkerTestConnection(t)
defer conn.Close()
messages := make(chan receivedMessage, 1)
heartbeat := make(chan struct{})
sessionDone := make(chan struct{})
onStop := func() {}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := &Worker{
ctx: ctx,
cancel: cancel,
id: "wss://test",
heartbeat: heartbeat,
}
go func() {
for range heartbeat {
}
}()
go w.runReader(conn, messages, sessionDone, onStop)
before := time.Now()
incomingData <- honeybeetest.MockIncomingData{
MsgType: websocket.TextMessage,
Data: []byte("hello"),
}
assert.Eventually(t, func() bool {
select {
case msg := <-messages:
return string(msg.data) == "hello" && msg.receivedAt.After(before)
default:
return false
}
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
})
t.Run("heartbeat receives one signal per message", func(t *testing.T) {
conn, _, incomingData, _ := setupWorkerTestConnection(t)
defer conn.Close()
messages := make(chan receivedMessage, 10)
heartbeat := make(chan struct{})
sessionDone := make(chan struct{})
onStop := func() {}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
w := &Worker{
ctx: ctx,
cancel: cancel,
id: "wss://test",
heartbeat: heartbeat,
}
received := atomic.Int32{}
go func() {
for range heartbeat {
received.Add(1)
}
}()
go func() {
for range messages {
}
}()
go w.runReader(conn, messages, sessionDone, onStop)
const count = 3
for i := 0; i < count; i++ {
incomingData <- honeybeetest.MockIncomingData{
MsgType: websocket.TextMessage,
Data: []byte(fmt.Sprintf("msg-%d", i)),
}
}
assert.Eventually(t, func() bool {
return received.Load() == count
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
})
t.Run("incoming channel close calls conn.Close and onStop", func(t *testing.T) {
conn, _, incomingData, _ := setupWorkerTestConnection(t)
messages := make(chan receivedMessage, 1)
heartbeat := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
ctx := context.Background()
w := &Worker{
ctx: ctx,
id: "wss://test",
heartbeat: heartbeat,
}
go func() {
for range heartbeat {
}
}()
go func() {
for range messages {
}
}()
go w.runReader(conn, messages, sessionDone, onStop)
// simulate remote close
incomingData <- honeybeetest.MockIncomingData{Err: io.EOF}
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
messages := make(chan receivedMessage, 1)
heartbeat := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
ctx := context.Background()
w := &Worker{
ctx: ctx,
id: "wss://test",
heartbeat: heartbeat,
}
go w.runReader(conn, messages, sessionDone, onStop)
close(sessionDone)
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
}
func TestRunStopMonitor(t *testing.T) {
t.Run("keepalive signal calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keepalive := make(chan struct{}, 1)
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
keepalive <- struct{}{}
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
t.Run("ctx.Done calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
keepalive := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
cancel()
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keepalive := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
close(sessionDone)
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
}
func TestRunForwarder(t *testing.T) {
t.Run("message passes through to inbox", func(t *testing.T) {
messages := make(chan receivedMessage, 1)
@@ -180,73 +397,6 @@ func TestRunKeepalive(t *testing.T) {
})
}
func TestRunStopMonitor(t *testing.T) {
t.Run("keepalive signal calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keepalive := make(chan struct{}, 1)
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
keepalive <- struct{}{}
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
t.Run("ctx.Done calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
keepalive := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
cancel()
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) {
conn, _, _, _ := setupWorkerTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
keepalive := make(chan struct{})
sessionDone := make(chan struct{})
onStopCalled := atomic.Bool{}
onStop := func() { onStopCalled.Store(true) }
w := &Worker{id: "wss://test"}
go w.runStopMonitor(ctx, conn, keepalive, sessionDone, onStop)
close(sessionDone)
assert.Eventually(t, func() bool {
return connClosed(conn)
}, honeybeetest.TestTimeout, honeybeetest.TestTick)
assert.True(t, onStopCalled.Load())
})
}
func TestRunDialer(t *testing.T) {
t.Run("successful dial delivers connection to newConn", func(t *testing.T) {
w := &Worker{id: "wss://test"}