Decoupled worker from goroutines.

This commit is contained in:
Jay
2026-04-20 08:45:04 -04:00
parent 9859796338
commit 9b29592a39
10 changed files with 458 additions and 429 deletions
+244 -199
View File
@@ -5,6 +5,7 @@ import (
"fmt"
"git.wisehodl.dev/jay/go-honeybee/honeybeetest"
"git.wisehodl.dev/jay/go-honeybee/transport"
"sync/atomic"
"testing"
)
@@ -20,145 +21,180 @@ func drainEvent(t *testing.T, events <-chan PoolEvent, kind PoolEventKind) {
}, fmt.Sprintf("expected %s event", kind))
}
func TestRunSessionDial(t *testing.T) {
setup := func(t *testing.T) (
w *DefaultWorker,
ctx context.Context,
cancel context.CancelFunc,
dial chan struct{},
keepalive chan struct{},
newConn chan *transport.Connection,
) {
t.Helper()
ctx, cancel = context.WithCancel(context.Background())
w = &DefaultWorker{
Ctx: ctx,
Cancel: cancel,
Id: "wss://test",
Config: GetDefaultWorkerConfig(),
Heartbeat: make(chan struct{}),
type testVars struct {
id string
dial chan struct{}
keepalive chan struct{}
heartbeat chan struct{}
newConn chan *transport.Connection
messages chan ReceivedMessage
conn *transport.Connection
mockSocket *honeybeetest.MockSocket
incomingData chan honeybeetest.MockIncomingData
outgoingData chan honeybeetest.MockOutgoingData
connPtr *atomic.Pointer[transport.Connection]
}
func setup(t *testing.T) (
ctx context.Context,
cancel context.CancelFunc,
vars testVars,
) {
t.Helper()
ctx, cancel = context.WithCancel(context.Background())
conn, mockSocket, incomingData, outgoingData := setupWorkerTestConnection(t)
vars = testVars{
id: "wss://test",
dial: make(chan struct{}, 1),
keepalive: make(chan struct{}, 1),
heartbeat: make(chan struct{}, 1),
newConn: make(chan *transport.Connection, 1),
messages: make(chan ReceivedMessage, 256),
conn: conn,
mockSocket: mockSocket,
incomingData: incomingData,
outgoingData: outgoingData,
connPtr: &atomic.Pointer[transport.Connection]{},
}
return
}
func expectDial(t *testing.T, dial <-chan struct{}) {
t.Helper()
honeybeetest.Eventually(t, func() bool {
select {
case <-dial:
return true
default:
return false
}
dial = make(chan struct{}, 1)
keepalive = make(chan struct{}, 1)
newConn = make(chan *transport.Connection, 1)
return
}
expectDial := func(t *testing.T, dial <-chan struct{}) {
t.Helper()
honeybeetest.Eventually(t, func() bool {
select {
case <-dial:
return true
default:
return false
}
}, "expected dial signal")
}
}, "expected dial signal")
}
func TestRunSessionDial(t *testing.T) {
t.Run("fires dial immediately on entry", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn := setup(t)
ctx, cancel, v := setup(t)
defer cancel()
messages := make(chan ReceivedMessage, 1)
wctx := WorkerContext{Events: make(chan PoolEvent, 10)}
pool := PoolPlugin{Events: make(chan PoolEvent, 10)}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
go session.Start(ctx, pool)
expectDial(t, dial)
expectDial(t, v.dial)
})
t.Run("keepalive fires dial", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn := setup(t)
ctx, cancel, v := setup(t)
defer cancel()
messages := make(chan ReceivedMessage, 1)
wctx := WorkerContext{Events: make(chan PoolEvent, 10)}
pool := PoolPlugin{Events: make(chan PoolEvent, 10)}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
go session.Start(ctx, pool)
// drain initial dial
expectDial(t, dial)
expectDial(t, v.dial)
keepalive <- struct{}{}
expectDial(t, dial)
v.keepalive <- struct{}{}
expectDial(t, v.dial)
})
t.Run("multiple keepalive signals each fire dial", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn := setup(t)
ctx, cancel, v := setup(t)
defer cancel()
messages := make(chan ReceivedMessage, 1)
wctx := WorkerContext{Events: make(chan PoolEvent, 10)}
pool := PoolPlugin{Events: make(chan PoolEvent, 10)}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
go session.Start(ctx, pool)
// drain initial dial
expectDial(t, dial)
expectDial(t, v.dial)
for i := 0; i < 3; i++ {
keepalive <- struct{}{}
expectDial(t, dial)
v.keepalive <- struct{}{}
expectDial(t, v.dial)
}
})
}
func TestRunSessionConnect(t *testing.T) {
setup := func(t *testing.T) (
w *DefaultWorker,
ctx context.Context,
cancel context.CancelFunc,
dial chan struct{},
keepalive chan struct{},
newConn chan *transport.Connection,
messages chan ReceivedMessage,
) {
t.Helper()
ctx, cancel = context.WithCancel(context.Background())
w = &DefaultWorker{
Ctx: ctx,
Cancel: cancel,
Id: "wss://test",
Config: GetDefaultWorkerConfig(),
Heartbeat: make(chan struct{}),
}
dial = make(chan struct{}, 1)
keepalive = make(chan struct{}, 1)
newConn = make(chan *transport.Connection, 1)
messages = make(chan ReceivedMessage, 256)
return
}
t.Run("w.conn set after newConn received", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages := setup(t)
wctx := WorkerContext{Events: make(chan PoolEvent, 10)}
t.Run("connection pointer set after newConn received", func(t *testing.T) {
ctx, cancel, v := setup(t)
defer cancel()
conn, _, _, _ := setupWorkerTestConnection(t)
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
pool := PoolPlugin{Events: make(chan PoolEvent, 10)}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
newConn <- conn
go session.Start(ctx, pool)
v.newConn <- v.conn
honeybeetest.Eventually(t, func() bool {
return w.Conn.Load() != nil
}, "expected w.conn to be set")
return v.connPtr.Load() != nil
}, "expected connection pointer to be set")
})
t.Run("EventConnected emitted", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
ctx, cancel, v := setup(t)
defer cancel()
conn, _, _, _ := setupWorkerTestConnection(t)
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
events := make(chan PoolEvent, 10)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
newConn <- conn
go session.Start(ctx, pool)
v.newConn <- v.conn
honeybeetest.Eventually(t, func() bool {
select {
case event := <-events:
return event.ID == w.Id && event.Kind == EventConnected
return event.ID == v.id && event.Kind == EventConnected
default:
return false
}
@@ -167,86 +203,91 @@ func TestRunSessionConnect(t *testing.T) {
}
func TestRunSessionDisconnect(t *testing.T) {
setup := func(t *testing.T) (
w *DefaultWorker,
ctx context.Context,
cancel context.CancelFunc,
dial chan struct{},
keepalive chan struct{},
newConn chan *transport.Connection,
messages chan ReceivedMessage,
conn *transport.Connection,
incomingData chan honeybeetest.MockIncomingData,
) {
t.Helper()
ctx, cancel = context.WithCancel(context.Background())
w = &DefaultWorker{
Ctx: ctx,
Cancel: cancel,
Id: "wss://test",
Config: GetDefaultWorkerConfig(),
Heartbeat: make(chan struct{}),
}
dial = make(chan struct{}, 1)
keepalive = make(chan struct{}, 1)
newConn = make(chan *transport.Connection, 1)
messages = make(chan ReceivedMessage, 256)
conn, _, incomingData, _ = setupWorkerTestConnection(t)
return
}
t.Run("EventDisconnected emitted on connection close", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
ctx, cancel, v := setup(t)
defer cancel()
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
newConn <- conn
events := make(chan PoolEvent, 10)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go session.Start(ctx, pool)
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
close(incomingData)
close(v.incomingData)
drainEvent(t, events, EventDisconnected)
})
t.Run("w.conn cleared after disconnect", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
t.Run("connection pointer cleared after disconnect", func(t *testing.T) {
ctx, cancel, v := setup(t)
defer cancel()
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
newConn <- conn
events := make(chan PoolEvent, 10)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go session.Start(ctx, pool)
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
close(incomingData)
close(v.incomingData)
drainEvent(t, events, EventDisconnected)
honeybeetest.Eventually(t, func() bool {
return w.Conn.Load() == nil
}, "expected w.conn to be cleared")
return v.connPtr.Load() == nil
}, "expected connection pointer to be nil")
})
t.Run("dial fires again after disconnect", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
ctx, cancel, v := setup(t)
defer cancel()
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
newConn <- conn
events := make(chan PoolEvent, 10)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go session.Start(ctx, pool)
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
// drain the initial dial signal before disconnecting
<-dial
<-v.dial
close(incomingData)
close(v.incomingData)
drainEvent(t, events, EventDisconnected)
honeybeetest.Eventually(t, func() bool {
select {
case <-dial:
case <-v.dial:
return true
default:
return false
@@ -255,60 +296,54 @@ func TestRunSessionDisconnect(t *testing.T) {
})
t.Run("second connection cycle emits EventConnected", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages, conn, incomingData := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
ctx, cancel, v := setup(t)
defer cancel()
go w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
newConn <- conn
events := make(chan PoolEvent, 10)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
go session.Start(ctx, pool)
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
close(incomingData)
close(v.incomingData)
drainEvent(t, events, EventDisconnected)
conn2, _, _, _ := setupWorkerTestConnection(t)
newConn <- conn2
v.newConn <- conn2
drainEvent(t, events, EventConnected)
})
}
func TestRunSessionCancellation(t *testing.T) {
setup := func(t *testing.T) (
w *DefaultWorker,
ctx context.Context,
cancel context.CancelFunc,
dial chan struct{},
keepalive chan struct{},
newConn chan *transport.Connection,
messages chan ReceivedMessage,
) {
t.Helper()
ctx, cancel = context.WithCancel(context.Background())
w = &DefaultWorker{
Ctx: ctx,
Cancel: cancel,
Id: "wss://test",
Config: GetDefaultWorkerConfig(),
Heartbeat: make(chan struct{}),
}
dial = make(chan struct{}, 1)
keepalive = make(chan struct{}, 1)
newConn = make(chan *transport.Connection, 1)
messages = make(chan ReceivedMessage, 256)
return
}
t.Run("ctx cancelled pre-connection exits without emitting events", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages := setup(t)
ctx, cancel, v := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
done := make(chan struct{})
go func() {
defer close(done)
w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
session.Start(ctx, pool)
}()
cancel()
@@ -333,24 +368,29 @@ func TestRunSessionCancellation(t *testing.T) {
})
t.Run("ctx cancelled post-connection emits EventDisconnected", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages := setup(t)
ctx, cancel, v := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
conn, _, _, _ := setupWorkerTestConnection(t)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
done := make(chan struct{})
go func() {
defer close(done)
w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
session.Start(ctx, pool)
}()
newConn <- conn
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
cancel()
drainEvent(t, events, EventDisconnected)
honeybeetest.Eventually(t, func() bool {
@@ -363,29 +403,34 @@ func TestRunSessionCancellation(t *testing.T) {
}, "expected runSession to exit")
})
t.Run("ctx cancelled post-connection clears w.conn", func(t *testing.T) {
w, ctx, cancel, dial, keepalive, newConn, messages := setup(t)
t.Run("ctx cancelled post-connection clears connection pointer", func(t *testing.T) {
ctx, cancel, v := setup(t)
events := make(chan PoolEvent, 10)
wctx := WorkerContext{Events: events}
conn, _, _, _ := setupWorkerTestConnection(t)
pool := PoolPlugin{Events: events}
session := &Session{
id: v.id,
connPtr: v.connPtr,
messages: v.messages,
heartbeat: v.heartbeat,
dial: v.dial,
keepalive: v.keepalive,
newConn: v.newConn,
}
done := make(chan struct{})
go func() {
defer close(done)
w.RunSession(ctx, wctx, messages, dial, keepalive, newConn)
session.Start(ctx, pool)
}()
newConn <- conn
v.newConn <- v.conn
drainEvent(t, events, EventConnected)
cancel()
drainEvent(t, events, EventDisconnected)
honeybeetest.Eventually(t, func() bool {
return w.Conn.Load() == nil
}, "expected w.conn to clear")
return v.connPtr.Load() == nil
}, "expected connection pointer to be nil")
})
}