From 56c2539249bc93ca1b23722f42b73bcf5f0d79f8 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 8 May 2026 14:47:28 -0400 Subject: [PATCH] added queue optimizations and depth monitoring. --- inbound/worker.go | 6 +++++- outbound/worker.go | 6 +++++- outbound/worker_start_test.go | 1 + queue/queue.go | 11 +++++++---- queue/queue_test.go | 6 +++--- 5 files changed, 21 insertions(+), 9 deletions(-) diff --git a/inbound/worker.go b/inbound/worker.go index 40eaa76..ec7f73d 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -32,6 +32,7 @@ type WorkerStats struct { ChanIncoming int ChanQueue int ChanForwarder int + BufferDepth int64 TotalProcessed uint64 TotalDropped uint64 @@ -49,6 +50,7 @@ type DefaultWorker struct { processedCount *atomic.Uint64 droppedCount *atomic.Uint64 outgoingCount *atomic.Uint64 + bufferDepth *atomic.Int64 ctx context.Context cancel context.CancelFunc @@ -80,6 +82,7 @@ func NewWorker( processedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, + bufferDepth: &atomic.Int64{}, config: config, ctx: wctx, cancel: cancel, @@ -107,7 +110,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() - queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) + queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth) }() go func() { @@ -158,6 +161,7 @@ func (w *DefaultWorker) Stats() WorkerStats { ChanIncoming: len(w.conn.Incoming()), ChanQueue: len(w.toQueue), ChanForwarder: len(w.toForwarder), + BufferDepth: w.bufferDepth.Load(), TotalProcessed: w.processedCount.Load(), TotalDropped: w.droppedCount.Load(), diff --git a/outbound/worker.go b/outbound/worker.go index 492b42f..06fb092 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -26,6 +26,7 @@ type WorkerStats struct { ChanIncoming int ChanQueue int ChanForwarder int + BufferDepth int64 ConnectionAvailable bool Connection transport.ConnectionStats @@ -48,6 +49,7 @@ type DefaultWorker struct { droppedCount *atomic.Uint64 outgoingCount *atomic.Uint64 restartCount *atomic.Uint64 + bufferDepth *atomic.Int64 config *WorkerConfig ctx context.Context @@ -79,6 +81,7 @@ func NewWorker( droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{}, + bufferDepth: &atomic.Int64{}, ctx: wctx, cancel: wcancel, logger: logger, @@ -111,7 +114,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() - queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) + queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth) }() go func() { @@ -192,6 +195,7 @@ func (w *DefaultWorker) Stats() WorkerStats { ChanIncoming: incomingLen, ChanQueue: len(w.toQueue), ChanForwarder: len(w.toForwarder), + BufferDepth: w.bufferDepth.Load(), ConnectionAvailable: connectionAvailable, Connection: connStats, diff --git a/outbound/worker_start_test.go b/outbound/worker_start_test.go index 6a67d35..cff479a 100644 --- a/outbound/worker_start_test.go +++ b/outbound/worker_start_test.go @@ -51,6 +51,7 @@ func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *D droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{}, + bufferDepth: &atomic.Int64{}, } } diff --git a/queue/queue.go b/queue/queue.go index 022a9a1..43b4b0f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -13,13 +13,14 @@ func RunQueue( out chan<- types.ReceivedMessage, maxQueueSize int, droppedCount *atomic.Uint64, + bufferDepth *atomic.Int64, ) { var next types.ReceivedMessage var queue messageQueue if maxQueueSize > 0 { queue = newBoundedRing(maxQueueSize) } else { - queue = newUnboundedRing(64) + queue = newUnboundedRing(1024) } for { @@ -40,12 +41,15 @@ func RunQueue( // drop oldest message _ = queue.pop() droppedCount.Add(1) + bufferDepth.Add(-1) } // add new message queue.push(msg) + bufferDepth.Add(1) // send next message to out channel case outOrNil <- next: _ = queue.pop() + bufferDepth.Add(-1) } } } @@ -118,9 +122,8 @@ func newUnboundedRing(initialCap int) *unboundedRing { func (u *unboundedRing) push(m types.ReceivedMessage) { if u.size == len(u.buf) { bigger := make([]types.ReceivedMessage, len(u.buf)*2) - for i := 0; i < u.size; i++ { - bigger[i] = u.buf[(u.head+i)%len(u.buf)] - } + n := copy(bigger, u.buf[u.head:]) + copy(bigger[n:], u.buf[:u.head]) u.buf = bigger u.head = 0 } diff --git a/queue/queue_test.go b/queue/queue_test.go index efb5ab5..5711ebe 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -18,7 +18,7 @@ func TestRunQueue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) + go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{}) inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} @@ -50,7 +50,7 @@ func TestRunQueue(t *testing.T) { } }() - go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}) + go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}, &atomic.Int64{}) // send three messages while the gated inbox is blocked inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()} @@ -88,7 +88,7 @@ func TestRunQueue(t *testing.T) { done := make(chan struct{}) go func() { - RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) + RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{}) close(done) }()