added queue optimizations and depth monitoring.

This commit is contained in:
Jay
2026-05-08 14:47:28 -04:00
parent c87a8cce6f
commit 56c2539249
5 changed files with 21 additions and 9 deletions
+5 -1
View File
@@ -32,6 +32,7 @@ type WorkerStats struct {
ChanIncoming int ChanIncoming int
ChanQueue int ChanQueue int
ChanForwarder int ChanForwarder int
BufferDepth int64
TotalProcessed uint64 TotalProcessed uint64
TotalDropped uint64 TotalDropped uint64
@@ -49,6 +50,7 @@ type DefaultWorker struct {
processedCount *atomic.Uint64 processedCount *atomic.Uint64
droppedCount *atomic.Uint64 droppedCount *atomic.Uint64
outgoingCount *atomic.Uint64 outgoingCount *atomic.Uint64
bufferDepth *atomic.Int64
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@@ -80,6 +82,7 @@ func NewWorker(
processedCount: &atomic.Uint64{}, processedCount: &atomic.Uint64{},
droppedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{},
bufferDepth: &atomic.Int64{},
config: config, config: config,
ctx: wctx, ctx: wctx,
cancel: cancel, cancel: cancel,
@@ -107,7 +110,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
go func() { go func() {
defer wg.Done() defer wg.Done()
queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth)
}() }()
go func() { go func() {
@@ -158,6 +161,7 @@ func (w *DefaultWorker) Stats() WorkerStats {
ChanIncoming: len(w.conn.Incoming()), ChanIncoming: len(w.conn.Incoming()),
ChanQueue: len(w.toQueue), ChanQueue: len(w.toQueue),
ChanForwarder: len(w.toForwarder), ChanForwarder: len(w.toForwarder),
BufferDepth: w.bufferDepth.Load(),
TotalProcessed: w.processedCount.Load(), TotalProcessed: w.processedCount.Load(),
TotalDropped: w.droppedCount.Load(), TotalDropped: w.droppedCount.Load(),
+5 -1
View File
@@ -26,6 +26,7 @@ type WorkerStats struct {
ChanIncoming int ChanIncoming int
ChanQueue int ChanQueue int
ChanForwarder int ChanForwarder int
BufferDepth int64
ConnectionAvailable bool ConnectionAvailable bool
Connection transport.ConnectionStats Connection transport.ConnectionStats
@@ -48,6 +49,7 @@ type DefaultWorker struct {
droppedCount *atomic.Uint64 droppedCount *atomic.Uint64
outgoingCount *atomic.Uint64 outgoingCount *atomic.Uint64
restartCount *atomic.Uint64 restartCount *atomic.Uint64
bufferDepth *atomic.Int64
config *WorkerConfig config *WorkerConfig
ctx context.Context ctx context.Context
@@ -79,6 +81,7 @@ func NewWorker(
droppedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{},
restartCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{},
bufferDepth: &atomic.Int64{},
ctx: wctx, ctx: wctx,
cancel: wcancel, cancel: wcancel,
logger: logger, logger: logger,
@@ -111,7 +114,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
go func() { go func() {
defer wg.Done() defer wg.Done()
queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount, w.bufferDepth)
}() }()
go func() { go func() {
@@ -192,6 +195,7 @@ func (w *DefaultWorker) Stats() WorkerStats {
ChanIncoming: incomingLen, ChanIncoming: incomingLen,
ChanQueue: len(w.toQueue), ChanQueue: len(w.toQueue),
ChanForwarder: len(w.toForwarder), ChanForwarder: len(w.toForwarder),
BufferDepth: w.bufferDepth.Load(),
ConnectionAvailable: connectionAvailable, ConnectionAvailable: connectionAvailable,
Connection: connStats, Connection: connStats,
+1
View File
@@ -51,6 +51,7 @@ func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *D
droppedCount: &atomic.Uint64{}, droppedCount: &atomic.Uint64{},
outgoingCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{},
restartCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{},
bufferDepth: &atomic.Int64{},
} }
} }
+7 -4
View File
@@ -13,13 +13,14 @@ func RunQueue(
out chan<- types.ReceivedMessage, out chan<- types.ReceivedMessage,
maxQueueSize int, maxQueueSize int,
droppedCount *atomic.Uint64, droppedCount *atomic.Uint64,
bufferDepth *atomic.Int64,
) { ) {
var next types.ReceivedMessage var next types.ReceivedMessage
var queue messageQueue var queue messageQueue
if maxQueueSize > 0 { if maxQueueSize > 0 {
queue = newBoundedRing(maxQueueSize) queue = newBoundedRing(maxQueueSize)
} else { } else {
queue = newUnboundedRing(64) queue = newUnboundedRing(1024)
} }
for { for {
@@ -40,12 +41,15 @@ func RunQueue(
// drop oldest message // drop oldest message
_ = queue.pop() _ = queue.pop()
droppedCount.Add(1) droppedCount.Add(1)
bufferDepth.Add(-1)
} }
// add new message // add new message
queue.push(msg) queue.push(msg)
bufferDepth.Add(1)
// send next message to out channel // send next message to out channel
case outOrNil <- next: case outOrNil <- next:
_ = queue.pop() _ = queue.pop()
bufferDepth.Add(-1)
} }
} }
} }
@@ -118,9 +122,8 @@ func newUnboundedRing(initialCap int) *unboundedRing {
func (u *unboundedRing) push(m types.ReceivedMessage) { func (u *unboundedRing) push(m types.ReceivedMessage) {
if u.size == len(u.buf) { if u.size == len(u.buf) {
bigger := make([]types.ReceivedMessage, len(u.buf)*2) bigger := make([]types.ReceivedMessage, len(u.buf)*2)
for i := 0; i < u.size; i++ { n := copy(bigger, u.buf[u.head:])
bigger[i] = u.buf[(u.head+i)%len(u.buf)] copy(bigger[n:], u.buf[:u.head])
}
u.buf = bigger u.buf = bigger
u.head = 0 u.head = 0
} }
+3 -3
View File
@@ -18,7 +18,7 @@ func TestRunQueue(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
defer cancel() defer cancel()
go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{})
inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()}
@@ -50,7 +50,7 @@ func TestRunQueue(t *testing.T) {
} }
}() }()
go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}) go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}, &atomic.Int64{})
// send three messages while the gated inbox is blocked // send three messages while the gated inbox is blocked
inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()} inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()}
@@ -88,7 +88,7 @@ func TestRunQueue(t *testing.T) {
done := make(chan struct{}) done := make(chan struct{})
go func() { go func() {
RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}, &atomic.Int64{})
close(done) close(done)
}() }()