Files
go-honeybee/inbound/worker_test.go
T
2026-04-24 10:59:33 -04:00

265 lines
6.1 KiB
Go

package inbound
import (
"context"
"fmt"
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"sync"
"sync/atomic"
"testing"
"time"
)
// workerTestVars bundles everything setupWorkerTest builds so that individual
// tests can drive the worker and observe its effects.
type workerTestVars struct {
	// Worker under test and the connection it was constructed with.
	worker *DefaultWorker
	conn   *transport.Connection

	// Channels returned by setupTestConnection alongside the connection.
	incoming chan honeybeetest.MockIncomingData
	outgoing chan honeybeetest.MockOutgoingData

	// pool is the plugin handed to worker.Start; inbox and events are the same
	// channels that were assigned to pool.Inbox and pool.Events.
	pool   PoolPlugin
	inbox  chan InboxMessage
	events chan PoolEvent

	// exitKind holds the first WorkerExitKind passed to pool.OnExit; Load
	// returns nil while OnExit has not been called.
	exitKind *atomic.Value

	// wg is created with a count of one by setupWorkerTest.
	wg *sync.WaitGroup
}
// setupWorkerTest builds a worker with ID "peer-1" on top of a test connection
// and returns it together with the channels and plugin needed to drive it.
//
// The returned PoolPlugin records the first exit kind reported through OnExit
// in exitKind; later calls are ignored. The returned WaitGroup already has a
// count of one, reserved for a goroutine the caller may spawn to run
// worker.Start and then call wg.Done.
//
// The test is aborted immediately if the worker cannot be constructed.
func setupWorkerTest(t *testing.T) workerTestVars {
	t.Helper()

	conn, _, incoming, outgoing := setupTestConnection(t)

	ctx, cancel := context.WithCancel(context.Background())
	worker, err := NewWorker(ctx, "peer-1", conn, nil, nil)
	if !assert.NoError(t, err) {
		// assert.NoError only marks the test as failed; stop here so the
		// worker is never dereferenced after a failed construction.
		cancel()
		t.FailNow()
	}
	// Replace the worker's cancel func with the one tied to the context
	// created above.
	worker.cancel = cancel

	inbox := make(chan InboxMessage, 256)
	events := make(chan PoolEvent, 10)

	exitKind := &atomic.Value{}
	var once sync.Once
	pool := PoolPlugin{
		Inbox:  inbox,
		Events: events,
		OnExit: func(kind WorkerExitKind) {
			// Only the first reported exit kind is kept.
			once.Do(func() { exitKind.Store(kind) })
		},
	}

	wg := &sync.WaitGroup{}
	wg.Add(1)

	return workerTestVars{
		worker:   worker,
		conn:     conn,
		incoming: incoming,
		outgoing: outgoing,
		pool:     pool,
		inbox:    inbox,
		events:   events,
		exitKind: exitKind,
		wg:       wg,
	}
}
func TestWorkerStart(t *testing.T) {
t.Run("socket data arrives on inbox", func(t *testing.T) {
v := setupWorkerTest(t)
defer v.worker.Stop()
go v.worker.Start(v.pool)
v.incoming <- honeybeetest.MockIncomingData{
MsgType: websocket.TextMessage,
Data: []byte("hello"),
}
honeybeetest.Eventually(t, func() bool {
select {
case msg := <-v.inbox:
return msg.ID == "peer-1" && string(msg.Data) == "hello"
default:
return false
}
}, "expected message on inbox")
})
t.Run("clean peer close calls OnExit with ExitCleanDisconnect", func(t *testing.T) {
v := setupWorkerTest(t)
defer v.worker.Stop()
go v.worker.Start(v.pool)
v.incoming <- honeybeetest.MockIncomingData{
Err: &websocket.CloseError{Code: websocket.CloseNormalClosure},
}
honeybeetest.Eventually(t, func() bool {
val := v.exitKind.Load()
return val != nil && val.(WorkerExitKind) == ExitDisconnected
}, "expected ExitCleanDisconnect")
})
t.Run("unexpected peer close calls OnExit with ExitUnexpectedDrop", func(t *testing.T) {
v := setupWorkerTest(t)
defer v.worker.Stop()
go v.worker.Start(v.pool)
v.incoming <- honeybeetest.MockIncomingData{
Err: &websocket.CloseError{Code: websocket.CloseProtocolError},
}
honeybeetest.Eventually(t, func() bool {
val := v.exitKind.Load()
return val != nil && val.(WorkerExitKind) == ExitUnexpectedClose
}, "expected ExitUnexpectedDrop")
})
t.Run("watchdog timeout calls OnExit with ExitInactive", func(t *testing.T) {
conn, _, _, _ := setupTestConnection(t)
ctx, cancel := context.WithCancel(context.Background())
worker, err := NewWorker(ctx, "peer-1", conn, &WorkerConfig{
InactivityTimeout: 20 * time.Millisecond,
}, nil)
assert.NoError(t, err)
worker.cancel = cancel
defer worker.Stop()
exitKind := &atomic.Value{}
var once sync.Once
pool := PoolPlugin{
Inbox: make(chan InboxMessage, 256),
Events: make(chan PoolEvent, 10),
OnExit: func(kind WorkerExitKind) {
once.Do(func() { exitKind.Store(kind) })
},
}
var wg sync.WaitGroup
wg.Add(1)
go func() {
worker.Start(pool)
wg.Done()
}()
honeybeetest.Eventually(t, func() bool {
val := exitKind.Load()
return val != nil && val.(WorkerExitKind) == ExitPolicy
}, "expected ExitInactive")
})
}
// TestWorkerStop checks that Start returns once Stop has been called and that
// stopping the worker locally does not report an exit through OnExit.
func TestWorkerStop(t *testing.T) {
	v := setupWorkerTest(t)

	// setupWorkerTest already reserved one WaitGroup slot for this goroutine.
	go func() {
		v.worker.Start(v.pool)
		v.wg.Done()
	}()

	v.worker.Stop()

	// Bridge the WaitGroup to a channel so it can be polled without blocking.
	drained := make(chan struct{})
	go func() {
		v.wg.Wait()
		close(drained)
	}()

	startReturned := func() bool {
		select {
		case <-drained:
			return true
		default:
			return false
		}
	}
	honeybeetest.Eventually(t, startReturned, "expected wg to drain")

	// does not call onExit
	assert.Nil(t, v.exitKind.Load())
}
// TestWorkerSend covers Send: delivery to the connection, the heartbeat each
// Send produces, and the error returned once the connection is closed.
func TestWorkerSend(t *testing.T) {
	t.Run("Send delivers data to socket", func(t *testing.T) {
		v := setupWorkerTest(t)
		defer v.worker.Stop()
		go v.worker.Start(v.pool)

		err := v.worker.Send([]byte("hello"))
		assert.NoError(t, err)

		honeybeetest.ExpectWrite(t, v.outgoing, websocket.TextMessage, []byte("hello"))
	})

	t.Run("Send produces heartbeats", func(t *testing.T) {
		v := setupWorkerTest(t)
		defer v.worker.Stop()

		// done stops the drain goroutine when the subtest returns, so it
		// does not outlive the test if the heartbeat channel stays open.
		// Deferred after Stop, so it is closed before Stop runs.
		done := make(chan struct{})
		defer close(done)

		var count atomic.Int32
		go func() {
			for {
				select {
				case _, ok := <-v.worker.heartbeat:
					if !ok {
						// Channel closed: nothing more to count.
						return
					}
					count.Add(1)
				case <-done:
					return
				}
			}
		}()

		// do not start the worker, allow heartbeats to be drained manually
		for i := 0; i < 3; i++ {
			err := v.worker.Send([]byte(fmt.Sprintf("msg-%d", i)))
			assert.NoError(t, err)
		}

		honeybeetest.Eventually(t, func() bool {
			return count.Load() == 3
		}, "expected heartbeats")
	})

	t.Run("Send returns error after connection closed", func(t *testing.T) {
		v := setupWorkerTest(t)
		defer v.worker.Stop()
		go v.worker.Start(v.pool)

		v.conn.Close()
		// Wait for the connection to report the closed state before sending.
		honeybeetest.Eventually(t, func() bool {
			return v.conn.State() == transport.StateClosed
		}, "expected connection closed")

		err := v.worker.Send([]byte("hello"))
		assert.Error(t, err)
	})
}
// TestHeartbeatForwarder checks that a pong seen at the connection level is
// forwarded onto the heartbeat channel passed to RunHeartbeatForwarder.
func TestHeartbeatForwarder(t *testing.T) {
	t.Run("connection level heartbeat propagates", func(t *testing.T) {
		socket, _, _ := honeybeetest.SetupTestSocket(t)

		// The handler may be registered from a goroutine other than the test
		// goroutine (the test polls for it below), so every access to
		// pongHandler goes through mu to stay clean under -race.
		var (
			mu          sync.Mutex
			pongHandler func(string) error
		)
		socket.SetPongHandlerFunc = func(h func(string) error) {
			mu.Lock()
			defer mu.Unlock()
			pongHandler = h
		}
		registered := func() func(string) error {
			mu.Lock()
			defer mu.Unlock()
			return pongHandler
		}

		conn, err := transport.NewConnectionFromSocket(socket, nil, nil)
		if !assert.NoError(t, err) {
			// Do not hand an unusable connection to the forwarder.
			t.FailNow()
		}

		heartbeat := make(chan struct{}, 1)
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		go RunHeartbeatForwarder(ctx, conn, heartbeat, nil)

		honeybeetest.Eventually(t, func() bool {
			return registered() != nil
		}, "expected Connection to register PongHandler")

		handler := registered()
		if handler == nil {
			t.Fatal("pong handler was never set")
		}

		// Trigger pong. The handler's own return value is deliberately
		// ignored: only propagation to the heartbeat channel is under test.
		_ = handler("")

		select {
		case <-heartbeat:
		case <-time.After(time.Second):
			t.Fatal("pong did not propagate to worker heartbeat")
		}
	})
}