From ba5484e0dd6c8f13cdfdb8d716ac0ff8b1e9f6e1 Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 20 May 2026 08:37:44 -0400 Subject: [PATCH] collapse queue/forwarder path: reader writes directly to pool inbox --- outbound/worker.go | 72 +++++---------------------- outbound/worker_forwarder_test.go | 33 ------------ outbound/worker_session_inner_test.go | 22 ++++---- outbound/worker_session_test.go | 28 +++++------ outbound/worker_start_test.go | 2 - 5 files changed, 38 insertions(+), 119 deletions(-) delete mode 100644 outbound/worker_forwarder_test.go diff --git a/outbound/worker.go b/outbound/worker.go index 560e187..aca3fc0 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -3,7 +3,6 @@ package outbound import ( "context" "git.wisehodl.dev/jay/go-honeybee/logging" - "git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/transport" "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" @@ -24,8 +23,6 @@ type Worker interface { type WorkerStats struct { IncomingAvailable bool ChanIncoming int - ChanQueue int - ChanForwarder int BufferDepth int64 ConnectionAvailable bool @@ -41,9 +38,7 @@ type DefaultWorker struct { id string conn atomic.Pointer[transport.Connection] - heartbeat chan struct{} - toQueue chan types.ReceivedMessage - toForwarder chan types.ReceivedMessage + heartbeat chan struct{} processedCount *atomic.Uint64 droppedCount *atomic.Uint64 @@ -75,8 +70,6 @@ func NewWorker( id: id, config: config, heartbeat: make(chan struct{}), - toQueue: make(chan types.ReceivedMessage, 256), - toForwarder: make(chan types.ReceivedMessage, 256), processedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, @@ -100,7 +93,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { keepalive := make(chan struct{}, 1) var wg sync.WaitGroup - wg.Add(5) + wg.Add(3) go func() { defer wg.Done() @@ -112,22 +105,12 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { RunKeepalive(w.ctx, w.heartbeat, keepalive, w.config.KeepaliveTimeout, w.logger) }() - go func() { - defer wg.Done() - queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth) - }() - - go func() { - defer wg.Done() - RunForwarder(w.id, w.ctx, w.toForwarder, pool.Inbox, w.processedCount, pool.InboxCounter) - }() - go func() { defer wg.Done() session := &Session{ id: w.id, connPtr: &w.conn, - messages: w.toQueue, + poolInbox: pool.Inbox, heartbeat: w.heartbeat, dial: dial, keepalive: keepalive, @@ -193,8 +176,6 @@ func (w *DefaultWorker) Stats() WorkerStats { return WorkerStats{ IncomingAvailable: connectionAvailable, ChanIncoming: incomingLen, - ChanQueue: len(w.toQueue), - ChanForwarder: len(w.toForwarder), BufferDepth: w.bufferDepth.Load(), ConnectionAvailable: connectionAvailable, @@ -211,7 +192,7 @@ type Session struct { id string connPtr *atomic.Pointer[transport.Connection] - messages chan<- types.ReceivedMessage + poolInbox chan<- types.InboxMessage heartbeat chan<- struct{} dial chan<- struct{} @@ -276,7 +257,7 @@ func (s *Session) Start( wg.Add(3) go func() { defer wg.Done() - RunReader(sctx, onStop, conn, s.messages, s.heartbeat, s.logger) + RunReader(s.id, sctx, onStop, conn, s.poolInbox, s.heartbeat, s.logger) }() go func() { defer wg.Done() @@ -317,10 +298,11 @@ func (s *Session) Start( } func RunReader( + id string, ctx context.Context, onStop func(), conn *transport.Connection, - messages chan<- types.ReceivedMessage, + poolInbox chan<- types.InboxMessage, heartbeat chan<- struct{}, logger *slog.Logger, ) { @@ -347,7 +329,11 @@ func RunReader( } // send message forward - messages <- types.ReceivedMessage{Data: data, ReceivedAt: time.Now()} + poolInbox <- types.InboxMessage{ + ID: id, + Data: data, + ReceivedAt: time.Now(), + } // send heartbeat select { @@ -407,38 +393,6 @@ func RunStopMonitor( } } -func RunForwarder( - id string, - ctx context.Context, - messages <-chan types.ReceivedMessage, - inbox chan<- types.InboxMessage, - workerProcessedCount *atomic.Uint64, - poolInboxCount *atomic.Uint64, -) { - for { - select { - case <-ctx.Done(): - return - case msg, ok := <-messages: - if !ok { - return - } - select { - case <-ctx.Done(): - return - - case inbox <- types.InboxMessage{ - ID: id, - Data: msg.Data, - ReceivedAt: msg.ReceivedAt, - }: - workerProcessedCount.Add(1) - poolInboxCount.Add(1) - } - } - } -} - func RunKeepalive( ctx context.Context, heartbeat <-chan struct{}, @@ -558,7 +512,7 @@ func RunDialer( goto drained } } - drained: + drained: // send the new connection or close and exit select { diff --git a/outbound/worker_forwarder_test.go b/outbound/worker_forwarder_test.go deleted file mode 100644 index 7b1e519..0000000 --- a/outbound/worker_forwarder_test.go +++ /dev/null @@ -1,33 +0,0 @@ -package outbound - -import ( - "context" - "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "git.wisehodl.dev/jay/go-honeybee/types" - "sync/atomic" - "testing" - "time" -) - -func TestRunForwarder(t *testing.T) { - t.Run("message passes through to inbox", func(t *testing.T) { - id := "wss://test" - messages := make(chan types.ReceivedMessage, 1) - inbox := make(chan types.InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - go RunForwarder(id, ctx, messages, inbox, &atomic.Uint64{}, &atomic.Uint64{}) - - messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} - - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-inbox: - return string(msg.Data) == "hello" && msg.ID == "wss://test" - default: - return false - } - }, "expected message") - }) -} diff --git a/outbound/worker_session_inner_test.go b/outbound/worker_session_inner_test.go index f92689d..d3777d5 100644 --- a/outbound/worker_session_inner_test.go +++ b/outbound/worker_session_inner_test.go @@ -19,7 +19,7 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan types.ReceivedMessage, 1) + inbox := make(chan types.InboxMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -28,7 +28,7 @@ func TestRunReader(t *testing.T) { for range heartbeat { } }() - go RunReader(ctx, cancel, conn, messages, heartbeat, nil) + go RunReader("wss://test", ctx, cancel, conn, inbox, heartbeat, nil) before := time.Now() incomingData <- honeybeetest.MockIncomingData{ @@ -38,7 +38,7 @@ func TestRunReader(t *testing.T) { honeybeetest.Eventually(t, func() bool { select { - case msg := <-messages: + case msg := <-inbox: return string(msg.Data) == "hello" && msg.ReceivedAt.After(before) default: return false @@ -50,7 +50,7 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan types.ReceivedMessage, 10) + inbox := make(chan types.InboxMessage, 10) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -62,10 +62,10 @@ func TestRunReader(t *testing.T) { } }() go func() { - for range messages { + for range inbox { } }() - go RunReader(ctx, cancel, conn, messages, heartbeat, nil) + go RunReader("wss://test", ctx, cancel, conn, inbox, heartbeat, nil) const count = 3 for i := 0; i < count; i++ { @@ -83,7 +83,7 @@ func TestRunReader(t *testing.T) { t.Run("incoming channel close calls conn.Close and onStop", func(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) - messages := make(chan types.ReceivedMessage, 1) + inbox := make(chan types.InboxMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -93,10 +93,10 @@ func TestRunReader(t *testing.T) { } }() go func() { - for range messages { + for range inbox { } }() - go RunReader(ctx, cancel, conn, messages, heartbeat, nil) + go RunReader("wss://test", ctx, cancel, conn, inbox, heartbeat, nil) // induce connection closure via reader incomingData <- honeybeetest.MockIncomingData{Err: io.EOF} @@ -121,11 +121,11 @@ func TestRunReader(t *testing.T) { t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { conn, _, _, _ := setupTestConnection(t) - messages := make(chan types.ReceivedMessage, 1) + inbox := make(chan types.InboxMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) - go RunReader(ctx, cancel, conn, messages, heartbeat, nil) + go RunReader("wss://test", ctx, cancel, conn, inbox, heartbeat, nil) cancel() diff --git a/outbound/worker_session_test.go b/outbound/worker_session_test.go index 4d16c30..298312d 100644 --- a/outbound/worker_session_test.go +++ b/outbound/worker_session_test.go @@ -29,7 +29,7 @@ type testVars struct { keepalive chan struct{} heartbeat chan struct{} newConn chan *transport.Connection - messages chan types.ReceivedMessage + poolInbox chan types.InboxMessage conn *transport.Connection mockSocket *honeybeetest.MockSocket @@ -53,7 +53,7 @@ func setup(t *testing.T) ( keepalive: make(chan struct{}, 1), heartbeat: make(chan struct{}, 1), newConn: make(chan *transport.Connection, 1), - messages: make(chan types.ReceivedMessage, 256), + poolInbox: make(chan types.InboxMessage, 256), conn: conn, mockSocket: mockSocket, incomingData: incomingData, @@ -84,7 +84,7 @@ func TestRunSessionDial(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -104,7 +104,7 @@ func TestRunSessionDial(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -128,7 +128,7 @@ func TestRunSessionDial(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -156,7 +156,7 @@ func TestRunSessionConnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -181,7 +181,7 @@ func TestRunSessionConnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -213,7 +213,7 @@ func TestRunSessionDisconnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -240,7 +240,7 @@ func TestRunSessionDisconnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -270,7 +270,7 @@ func TestRunSessionDisconnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -308,7 +308,7 @@ func TestRunSessionDisconnect(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -338,7 +338,7 @@ func TestRunSessionCancellation(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -379,7 +379,7 @@ func TestRunSessionCancellation(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, @@ -415,7 +415,7 @@ func TestRunSessionCancellation(t *testing.T) { session := &Session{ id: v.id, connPtr: v.connPtr, - messages: v.messages, + poolInbox: v.poolInbox, heartbeat: v.heartbeat, dial: v.dial, keepalive: v.keepalive, diff --git a/outbound/worker_start_test.go b/outbound/worker_start_test.go index 5f4fff3..2f79cab 100644 --- a/outbound/worker_start_test.go +++ b/outbound/worker_start_test.go @@ -40,8 +40,6 @@ func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *D id: "wss://test", config: config, heartbeat: make(chan struct{}), - toQueue: make(chan types.ReceivedMessage, 256), - toForwarder: make(chan types.ReceivedMessage, 256), processedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{},