From 6a3ba05fd55bca493329c877dea1f74c49b2a562 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 24 Apr 2026 13:48:52 -0400 Subject: [PATCH] introduce statistics collection --- honeybee.go | 5 ++ inbound/pool.go | 117 ++++++++++++++++++++++++------ inbound/worker.go | 80 +++++++++++++++----- inbound/worker_forwarder_test.go | 3 +- inbound/worker_test.go | 1 + outbound/pool.go | 96 +++++++++++++++++++++--- outbound/worker.go | 102 +++++++++++++++++++++----- outbound/worker_forwarder_test.go | 3 +- outbound/worker_send_test.go | 18 +++-- outbound/worker_session_test.go | 60 ++++++++------- outbound/worker_start_test.go | 24 ++++-- queue/queue.go | 3 + queue/queue_test.go | 7 +- transport/connection.go | 74 ++++++++++++++----- 14 files changed, 453 insertions(+), 140 deletions(-) diff --git a/honeybee.go b/honeybee.go index 9073d03..94ce177 100644 --- a/honeybee.go +++ b/honeybee.go @@ -22,6 +22,7 @@ type Connection = transport.Connection type ConnectionConfig = transport.ConnectionConfig type RetryConfig = transport.RetryConfig type ConnectionOption = transport.ConnectionOption +type ConnectionStats = transport.ConnectionStats // Outbound Pool types @@ -33,6 +34,8 @@ type OutboundWorkerOption = outbound.WorkerOption type OutboundInboxMessage = outbound.InboxMessage type OutboundPoolEvent = outbound.PoolEvent type OutboundPoolEventKind = outbound.PoolEventKind +type OutboundPoolStats = outbound.PoolStats +type OutboundWorkerStats = outbound.WorkerStats // Pool event constants @@ -54,6 +57,8 @@ type InboundWorkerExitKind = inbound.WorkerExitKind type InboundInboxMessage = inbound.InboxMessage type InboundPoolEvent = inbound.PoolEvent type InboundPoolEventKind = inbound.PoolEventKind +type InboundPoolStats = inbound.PoolStats +type InboundWorkerStats = inbound.WorkerStats // Inbound Pool event constants diff --git a/inbound/pool.go b/inbound/pool.go index a87d39d..5f3ecf7 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -8,6 +8,7 @@ import ( "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" "sync" + "sync/atomic" "time" ) @@ -36,6 +37,24 @@ type PoolEvent struct { Kind PoolEventKind } +type PoolStats struct { + ChanInbox int + ChanEvents int + ChanErrors int + + TotalReceived uint64 + TotalSent uint64 + + PeerCount int + PeerStats []PeerStats +} + +type PeerStats struct { + ID string + Worker WorkerStats + Connection transport.ConnectionStats +} + type InboxMessage struct { ID string Data []byte @@ -43,11 +62,12 @@ type InboxMessage struct { } type PoolPlugin struct { - Inbox chan<- InboxMessage - Events chan<- PoolEvent - Errors chan<- error - OnExit OnExitFunction - Handler slog.Handler + Inbox chan<- InboxMessage + Events chan<- PoolEvent + Errors chan<- error + InboxCounter *atomic.Uint64 + OnExit OnExitFunction + Handler slog.Handler } // Pool @@ -70,6 +90,9 @@ type Pool struct { events chan PoolEvent errors chan error + inboxCounter *atomic.Uint64 + outgoingCount *atomic.Uint64 + config *PoolConfig handler slog.Handler logger *slog.Logger @@ -116,16 +139,18 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha } return &Pool{ - ctx: pctx, - cancel: cancel, - id: id, - peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), - events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), - config: config, - handler: handler, - logger: logger, + ctx: pctx, + cancel: cancel, + id: id, + peers: make(map[string]*Peer), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), + inboxCounter: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + config: config, + handler: handler, + logger: logger, }, nil } @@ -153,6 +178,49 @@ func (p *Pool) Errors() <-chan error { return p.errors } +func (p *Pool) Stats() PoolStats { + p.mu.RLock() + defer p.mu.RUnlock() + + count := len(p.peers) + peerStats := make([]PeerStats, 0, count) + for id, peer := range p.peers { + peerStats = append(peerStats, PeerStats{ + ID: id, + Worker: peer.worker.Stats(), + Connection: peer.conn.Stats(), + }) + } + + return PoolStats{ + ChanInbox: len(p.inbox), + ChanEvents: len(p.events), + ChanErrors: len(p.errors), + + TotalReceived: p.inboxCounter.Load(), + TotalSent: p.outgoingCount.Load(), + + PeerCount: len(p.peers), + PeerStats: peerStats, + } +} + +func (p *Pool) PeerStats(id string) (PeerStats, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + peer, exists := p.peers[id] + if !exists { + return PeerStats{}, ErrPeerNotFound + } + + return PeerStats{ + ID: id, + Worker: peer.worker.Stats(), + Connection: peer.conn.Stats(), + }, nil +} + func (p *Pool) Close() { if p.logger != nil { p.logger.Debug("closing") @@ -266,7 +334,13 @@ func (p *Pool) Send(id string, data []byte) error { return ErrPeerNotFound } - return peer.worker.Send(data) + err := peer.worker.Send(data) + if err != nil { + return err + } + + p.outgoingCount.Add(1) + return nil } // addLocked constructs and registers a peer. Caller must hold p.mu write lock. @@ -315,11 +389,12 @@ func (p *Pool) addLocked(id string, socket types.Socket) error { } pool := PoolPlugin{ - Inbox: p.inbox, - Events: p.events, - Errors: p.errors, - OnExit: onExit, - Handler: p.handler, + Inbox: p.inbox, + Events: p.events, + Errors: p.errors, + InboxCounter: p.inboxCounter, + OnExit: onExit, + Handler: p.handler, } peer := &Peer{ diff --git a/inbound/worker.go b/inbound/worker.go index dc52465..9e521d1 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -8,6 +8,7 @@ import ( "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" "sync" + "sync/atomic" "time" ) @@ -15,6 +16,7 @@ type Worker interface { Start(pool PoolPlugin) Stop() Send(data []byte) error + Stats() WorkerStats } type WorkerExitKind string @@ -26,14 +28,32 @@ const ( ExitPolicy WorkerExitKind = "policy" ) +type WorkerStats struct { + ChanIncoming int + ChanQueue int + ChanForwarder int + + TotalProcessed uint64 + TotalDropped uint64 + TotalSent uint64 +} + type DefaultWorker struct { - id string - conn *transport.Connection - heartbeat chan struct{} - config *WorkerConfig - ctx context.Context - cancel context.CancelFunc - logger *slog.Logger + id string + conn *transport.Connection + + heartbeat chan struct{} + toQueue chan types.ReceivedMessage + toForwarder chan types.ReceivedMessage + + processedCount *atomic.Uint64 + droppedCount *atomic.Uint64 + outgoingCount *atomic.Uint64 + + ctx context.Context + cancel context.CancelFunc + config *WorkerConfig + logger *slog.Logger } func NewWorker( @@ -52,13 +72,18 @@ func NewWorker( wctx, cancel := context.WithCancel(ctx) return &DefaultWorker{ - id: id, - conn: conn, - heartbeat: make(chan struct{}), - config: config, - ctx: wctx, - cancel: cancel, - logger: logger, + id: id, + conn: conn, + heartbeat: make(chan struct{}), + toQueue: make(chan types.ReceivedMessage, 256), + toForwarder: make(chan types.ReceivedMessage, 256), + processedCount: &atomic.Uint64{}, + droppedCount: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + config: config, + ctx: wctx, + cancel: cancel, + logger: logger, }, nil } @@ -67,15 +92,12 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { w.logger.Debug("starting") } - toQueue := make(chan types.ReceivedMessage, 256) - toForwarder := make(chan types.ReceivedMessage, 256) - var wg sync.WaitGroup wg.Add(5) go func() { defer wg.Done() - RunReader(w.ctx, pool.OnExit, w.conn, toQueue, w.heartbeat, w.logger) + RunReader(w.ctx, pool.OnExit, w.conn, w.toQueue, w.heartbeat, w.logger) }() go func() { @@ -85,12 +107,12 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() - queue.RunQueue(w.id, w.ctx, toQueue, toForwarder, w.config.MaxQueueSize) + queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) }() go func() { defer wg.Done() - RunForwarder(w.id, w.ctx, toForwarder, pool.Inbox) + RunForwarder(w.id, w.ctx, w.toForwarder, pool.Inbox, w.processedCount, pool.InboxCounter) }() go func() { @@ -126,9 +148,23 @@ func (w *DefaultWorker) Send(data []byte) error { case <-w.ctx.Done(): } + w.outgoingCount.Add(1) + return nil } +func (w *DefaultWorker) Stats() WorkerStats { + return WorkerStats{ + ChanIncoming: len(w.conn.Incoming()), + ChanQueue: len(w.toQueue), + ChanForwarder: len(w.toForwarder), + + TotalProcessed: w.processedCount.Load(), + TotalDropped: w.droppedCount.Load(), + TotalSent: w.outgoingCount.Load(), + } +} + func RunReader( ctx context.Context, onPeerClose OnExitFunction, @@ -217,6 +253,8 @@ func RunForwarder( ctx context.Context, messages <-chan types.ReceivedMessage, inbox chan<- InboxMessage, + workerProcessedCount *atomic.Uint64, + poolInboxCount *atomic.Uint64, ) { for { select { @@ -235,6 +273,8 @@ func RunForwarder( Data: msg.Data, ReceivedAt: msg.ReceivedAt, }: + workerProcessedCount.Add(1) + poolInboxCount.Add(1) } } } diff --git a/inbound/worker_forwarder_test.go b/inbound/worker_forwarder_test.go index ecec080..f89157c 100644 --- a/inbound/worker_forwarder_test.go +++ b/inbound/worker_forwarder_test.go @@ -4,6 +4,7 @@ import ( "context" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/types" + "sync/atomic" "testing" "time" ) @@ -16,7 +17,7 @@ func TestRunForwarder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunForwarder(id, ctx, messages, inbox) + go RunForwarder(id, ctx, messages, inbox, &atomic.Uint64{}, &atomic.Uint64{}) messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} diff --git a/inbound/worker_test.go b/inbound/worker_test.go index 8b3aef5..2a9dff1 100644 --- a/inbound/worker_test.go +++ b/inbound/worker_test.go @@ -47,6 +47,7 @@ func setupWorkerTest(t *testing.T) workerTestVars { OnExit: func(kind WorkerExitKind) { once.Do(func() { exitKind.Store(kind) }) }, + InboxCounter: &atomic.Uint64{}, } wg := &sync.WaitGroup{} diff --git a/outbound/pool.go b/outbound/pool.go index bd8eee6..3777b0e 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -7,6 +7,7 @@ import ( "git.wisehodl.dev/jay/go-honeybee/types" "log/slog" "sync" + "sync/atomic" "time" ) @@ -24,6 +25,23 @@ type PoolEvent struct { Kind PoolEventKind } +type PoolStats struct { + ChanInbox int + ChanEvents int + ChanErrors int + + TotalReceived uint64 + TotalSent uint64 + + PeerCount int + PeerStats []PeerStats +} + +type PeerStats struct { + ID string + Worker WorkerStats +} + type InboxMessage struct { ID string Data []byte @@ -35,6 +53,7 @@ type PoolPlugin struct { Inbox chan<- InboxMessage Events chan<- PoolEvent Errors chan<- error + InboxCounter *atomic.Uint64 Dialer types.Dialer ConnectionConfig *transport.ConnectionConfig Handler slog.Handler @@ -58,6 +77,9 @@ type Pool struct { events chan PoolEvent errors chan error + inboxCounter *atomic.Uint64 + outgoingCount *atomic.Uint64 + dialer types.Dialer config *PoolConfig handler slog.Handler @@ -101,17 +123,19 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha } return &Pool{ - ctx: pctx, - cancel: cancel, - id: id, - peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, config.InboxBufferSize), - events: make(chan PoolEvent, config.EventsBufferSize), - errors: make(chan error, config.ErrorsBufferSize), - dialer: transport.NewDialer(), - config: config, - handler: handler, - logger: logger, + ctx: pctx, + cancel: cancel, + id: id, + peers: make(map[string]*Peer), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), + inboxCounter: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + dialer: transport.NewDialer(), + config: config, + handler: handler, + logger: logger, }, nil } @@ -138,6 +162,47 @@ func (p *Pool) Errors() <-chan error { return p.errors } +func (p *Pool) Stats() PoolStats { + p.mu.RLock() + defer p.mu.RUnlock() + + count := len(p.peers) + peerStats := make([]PeerStats, 0, count) + for id, peer := range p.peers { + peerStats = append(peerStats, PeerStats{ + ID: id, + Worker: peer.worker.Stats(), + }) + } + + return PoolStats{ + ChanInbox: len(p.inbox), + ChanEvents: len(p.events), + ChanErrors: len(p.errors), + + TotalReceived: p.inboxCounter.Load(), + TotalSent: p.outgoingCount.Load(), + + PeerCount: len(p.peers), + PeerStats: peerStats, + } +} + +func (p *Pool) PeerStats(id string) (PeerStats, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + peer, exists := p.peers[id] + if !exists { + return PeerStats{}, ErrPeerNotFound + } + + return PeerStats{ + ID: id, + Worker: peer.worker.Stats(), + }, nil +} + func (p *Pool) SetDialer(d types.Dialer) { if d == nil { panic("dialer cannot be nil") @@ -214,6 +279,7 @@ func (p *Pool) Connect(id string) error { Inbox: p.inbox, Events: p.events, Errors: p.errors, + InboxCounter: p.inboxCounter, Dialer: p.dialer, ConnectionConfig: p.config.ConnectionConfig, Handler: p.handler, @@ -284,5 +350,11 @@ func (p *Pool) Send(id string, data []byte) error { return NewPoolError(ErrPeerNotFound) } - return peer.worker.Send(data) + err = peer.worker.Send(data) + if err != nil { + return err + } + + p.outgoingCount.Add(1) + return nil } diff --git a/outbound/worker.go b/outbound/worker.go index 8566a10..a3bc613 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -18,16 +18,41 @@ type Worker interface { Start(pool PoolPlugin) Stop() Send(data []byte) error + Stats() WorkerStats +} + +type WorkerStats struct { + IncomingAvailable bool + ChanIncoming int + ChanQueue int + ChanForwarder int + + ConnectionAvailable bool + Connection transport.ConnectionStats + + TotalProcessed uint64 + TotalDropped uint64 + TotalSent uint64 + TotalRestarts uint64 } type DefaultWorker struct { - id string - conn atomic.Pointer[transport.Connection] - heartbeat chan struct{} - config *WorkerConfig - ctx context.Context - cancel context.CancelFunc - logger *slog.Logger + id string + conn atomic.Pointer[transport.Connection] + + heartbeat chan struct{} + toQueue chan types.ReceivedMessage + toForwarder chan types.ReceivedMessage + + processedCount *atomic.Uint64 + droppedCount *atomic.Uint64 + outgoingCount *atomic.Uint64 + restartCount *atomic.Uint64 + + config *WorkerConfig + ctx context.Context + cancel context.CancelFunc + logger *slog.Logger } func NewWorker( @@ -45,12 +70,18 @@ func NewWorker( wctx, wcancel := context.WithCancel(ctx) w := &DefaultWorker{ - id: id, - config: config, - heartbeat: make(chan struct{}), - ctx: wctx, - cancel: wcancel, - logger: logger, + id: id, + config: config, + heartbeat: make(chan struct{}), + toQueue: make(chan types.ReceivedMessage, 256), + toForwarder: make(chan types.ReceivedMessage, 256), + processedCount: &atomic.Uint64{}, + droppedCount: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + restartCount: &atomic.Uint64{}, + ctx: wctx, + cancel: wcancel, + logger: logger, } return w, nil @@ -63,8 +94,6 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) - toQueue := make(chan types.ReceivedMessage, 256) - toForwarder := make(chan types.ReceivedMessage, 256) keepalive := make(chan struct{}, 1) var wg sync.WaitGroup @@ -82,12 +111,12 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() - queue.RunQueue(w.id, w.ctx, toQueue, toForwarder, w.config.MaxQueueSize) + queue.RunQueue(w.id, w.ctx, w.toQueue, w.toForwarder, w.config.MaxQueueSize, w.droppedCount) }() go func() { defer wg.Done() - RunForwarder(w.id, w.ctx, toForwarder, pool.Inbox) + RunForwarder(w.id, w.ctx, w.toForwarder, pool.Inbox, w.processedCount, pool.InboxCounter) }() go func() { @@ -95,12 +124,13 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { session := &Session{ id: w.id, connPtr: &w.conn, - messages: toQueue, + messages: w.toQueue, heartbeat: w.heartbeat, dial: dial, keepalive: keepalive, newConn: newConn, reconnectDelay: w.config.ReconnectDelay, + restartCount: w.restartCount, logger: w.logger, } session.Start(w.ctx, pool) @@ -140,9 +170,39 @@ func (w *DefaultWorker) Send(data []byte) error { case <-w.ctx.Done(): } + w.outgoingCount.Add(1) + return nil } +func (w *DefaultWorker) Stats() WorkerStats { + connectionAvailable := false + incomingLen := 0 + connStats := transport.ConnectionStats{} + + conn := w.conn.Load() + if conn != nil { + connectionAvailable = true + incomingLen = len(conn.Incoming()) + connStats = conn.Stats() + } + + return WorkerStats{ + IncomingAvailable: connectionAvailable, + ChanIncoming: incomingLen, + ChanQueue: len(w.toQueue), + ChanForwarder: len(w.toForwarder), + + ConnectionAvailable: connectionAvailable, + Connection: connStats, + + TotalProcessed: w.processedCount.Load(), + TotalDropped: w.droppedCount.Load(), + TotalRestarts: w.restartCount.Load(), + TotalSent: w.outgoingCount.Load(), + } +} + type Session struct { id string connPtr *atomic.Pointer[transport.Connection] @@ -155,6 +215,7 @@ type Session struct { newConn <-chan *transport.Connection reconnectDelay time.Duration + restartCount *atomic.Uint64 logger *slog.Logger } @@ -246,6 +307,7 @@ func (s *Session) Start( // refresh session time.Sleep(s.reconnectDelay) + s.restartCount.Add(1) } } @@ -346,6 +408,8 @@ func RunForwarder( ctx context.Context, messages <-chan types.ReceivedMessage, inbox chan<- InboxMessage, + workerProcessedCount *atomic.Uint64, + poolInboxCount *atomic.Uint64, ) { for { select { @@ -364,6 +428,8 @@ func RunForwarder( Data: msg.Data, ReceivedAt: msg.ReceivedAt, }: + workerProcessedCount.Add(1) + poolInboxCount.Add(1) } } } diff --git a/outbound/worker_forwarder_test.go b/outbound/worker_forwarder_test.go index b34ccdf..60a4a72 100644 --- a/outbound/worker_forwarder_test.go +++ b/outbound/worker_forwarder_test.go @@ -4,6 +4,7 @@ import ( "context" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/types" + "sync/atomic" "testing" "time" ) @@ -16,7 +17,7 @@ func TestRunForwarder(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunForwarder(id, ctx, messages, inbox) + go RunForwarder(id, ctx, messages, inbox, &atomic.Uint64{}, &atomic.Uint64{}) messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} diff --git a/outbound/worker_send_test.go b/outbound/worker_send_test.go index 5058b0b..6e06ccb 100644 --- a/outbound/worker_send_test.go +++ b/outbound/worker_send_test.go @@ -20,10 +20,11 @@ func TestWorkerSend(t *testing.T) { heartbeatCount := atomic.Int32{} w := &DefaultWorker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + ctx: ctx, + cancel: cancel, + id: "wss://test", + heartbeat: heartbeat, + outgoingCount: &atomic.Uint64{}, } w.conn.Store(conn) defer w.cancel() @@ -64,10 +65,11 @@ func TestWorkerSend(t *testing.T) { heartbeatCount := atomic.Int32{} w := &DefaultWorker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - heartbeat: heartbeat, + ctx: ctx, + cancel: cancel, + id: "wss://test", + heartbeat: heartbeat, + outgoingCount: &atomic.Uint64{}, } w.conn.Store(conn) defer w.cancel() diff --git a/outbound/worker_session_test.go b/outbound/worker_session_test.go index 32775a8..4d16c30 100644 --- a/outbound/worker_session_test.go +++ b/outbound/worker_session_test.go @@ -211,13 +211,14 @@ func TestRunSessionDisconnect(t *testing.T) { events := make(chan PoolEvent, 10) pool := PoolPlugin{Events: events} session := &Session{ - id: v.id, - connPtr: v.connPtr, - messages: v.messages, - heartbeat: v.heartbeat, - dial: v.dial, - keepalive: v.keepalive, - newConn: v.newConn, + id: v.id, + connPtr: v.connPtr, + messages: v.messages, + heartbeat: v.heartbeat, + dial: v.dial, + keepalive: v.keepalive, + newConn: v.newConn, + restartCount: &atomic.Uint64{}, } go session.Start(ctx, pool) @@ -237,13 +238,14 @@ func TestRunSessionDisconnect(t *testing.T) { events := make(chan PoolEvent, 10) pool := PoolPlugin{Events: events} session := &Session{ - id: v.id, - connPtr: v.connPtr, - messages: v.messages, - heartbeat: v.heartbeat, - dial: v.dial, - keepalive: v.keepalive, - newConn: v.newConn, + id: v.id, + connPtr: v.connPtr, + messages: v.messages, + heartbeat: v.heartbeat, + dial: v.dial, + keepalive: v.keepalive, + newConn: v.newConn, + restartCount: &atomic.Uint64{}, } go session.Start(ctx, pool) @@ -266,13 +268,14 @@ func TestRunSessionDisconnect(t *testing.T) { events := make(chan PoolEvent, 10) pool := PoolPlugin{Events: events} session := &Session{ - id: v.id, - connPtr: v.connPtr, - messages: v.messages, - heartbeat: v.heartbeat, - dial: v.dial, - keepalive: v.keepalive, - newConn: v.newConn, + id: v.id, + connPtr: v.connPtr, + messages: v.messages, + heartbeat: v.heartbeat, + dial: v.dial, + keepalive: v.keepalive, + newConn: v.newConn, + restartCount: &atomic.Uint64{}, } go session.Start(ctx, pool) @@ -303,13 +306,14 @@ func TestRunSessionDisconnect(t *testing.T) { events := make(chan PoolEvent, 10) pool := PoolPlugin{Events: events} session := &Session{ - id: v.id, - connPtr: v.connPtr, - messages: v.messages, - heartbeat: v.heartbeat, - dial: v.dial, - keepalive: v.keepalive, - newConn: v.newConn, + id: v.id, + connPtr: v.connPtr, + messages: v.messages, + heartbeat: v.heartbeat, + dial: v.dial, + keepalive: v.keepalive, + newConn: v.newConn, + restartCount: &atomic.Uint64{}, } go session.Start(ctx, pool) diff --git a/outbound/worker_start_test.go b/outbound/worker_start_test.go index 32bcf75..e8b1f50 100644 --- a/outbound/worker_start_test.go +++ b/outbound/worker_start_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "net/http" "sync" + "sync/atomic" "testing" "time" ) @@ -25,9 +26,10 @@ func makeWorkerContext(t *testing.T) ( events = make(chan PoolEvent, 10) errors = make(chan error, 10) pool = PoolPlugin{ - Inbox: inbox, - Events: events, - Errors: errors, + Inbox: inbox, + Events: events, + Errors: errors, + InboxCounter: &atomic.Uint64{}, } return } @@ -38,11 +40,17 @@ func makeWorker(t *testing.T, ctx context.Context, cancel context.CancelFunc) *D WithReconnectDelay(0 * time.Second), ) return &DefaultWorker{ - ctx: ctx, - cancel: cancel, - id: "wss://test", - config: config, - heartbeat: make(chan struct{}), + ctx: ctx, + cancel: cancel, + id: "wss://test", + config: config, + heartbeat: make(chan struct{}), + toQueue: make(chan types.ReceivedMessage, 256), + toForwarder: make(chan types.ReceivedMessage, 256), + processedCount: &atomic.Uint64{}, + droppedCount: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + restartCount: &atomic.Uint64{}, } } diff --git a/queue/queue.go b/queue/queue.go index 89b28db..022a9a1 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -3,6 +3,7 @@ package queue import ( "context" "git.wisehodl.dev/jay/go-honeybee/types" + "sync/atomic" ) func RunQueue( @@ -11,6 +12,7 @@ func RunQueue( in <-chan types.ReceivedMessage, out chan<- types.ReceivedMessage, maxQueueSize int, + droppedCount *atomic.Uint64, ) { var next types.ReceivedMessage var queue messageQueue @@ -37,6 +39,7 @@ func RunQueue( if maxQueueSize > 0 && queue.len() >= maxQueueSize { // drop oldest message _ = queue.pop() + droppedCount.Add(1) } // add new message queue.push(msg) diff --git a/queue/queue_test.go b/queue/queue_test.go index de00209..efb5ab5 100644 --- a/queue/queue_test.go +++ b/queue/queue_test.go @@ -5,6 +5,7 @@ import ( "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/types" "github.com/stretchr/testify/assert" + "sync/atomic" "testing" "time" ) @@ -17,7 +18,7 @@ func TestRunQueue(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunQueue(id, ctx, inChan, outChan, 0) + go RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} @@ -49,7 +50,7 @@ func TestRunQueue(t *testing.T) { } }() - go RunQueue(id, ctx, inChan, gatedInbox, 2) + go RunQueue(id, ctx, inChan, gatedInbox, 2, &atomic.Uint64{}) // send three messages while the gated inbox is blocked inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()} @@ -87,7 +88,7 @@ func TestRunQueue(t *testing.T) { done := make(chan struct{}) go func() { - RunQueue(id, ctx, inChan, outChan, 0) + RunQueue(id, ctx, inChan, outChan, 0, &atomic.Uint64{}) close(done) }() diff --git a/transport/connection.go b/transport/connection.go index e3b43d8..1308675 100644 --- a/transport/connection.go +++ b/transport/connection.go @@ -8,6 +8,7 @@ import ( "math/rand" "net/url" "sync" + "sync/atomic" "time" "git.wisehodl.dev/jay/go-honeybee/types" @@ -38,6 +39,15 @@ func (s ConnectionState) String() string { } } +type ConnectionStats struct { + ChanIncoming int + ChanErrors int + + TotalReceived uint64 + TotalSent uint64 + TotalHeartbeats uint64 +} + type Connection struct { url *url.URL dialer types.Dialer @@ -50,6 +60,10 @@ type Connection struct { errors chan error done chan struct{} + incomingCount *atomic.Uint64 + outgoingCount *atomic.Uint64 + heartbeatCount *atomic.Uint64 + state ConnectionState wg sync.WaitGroup @@ -75,16 +89,19 @@ func NewConnection(urlStr string, config *ConnectionConfig, logger *slog.Logger) } conn := &Connection{ - url: url, - dialer: NewDialer(), - socket: nil, - config: config, - logger: logger, - incoming: make(chan []byte, config.IncomingBufferSize), - heartbeat: make(chan struct{}, 1), - errors: make(chan error, config.ErrorsBufferSize), - state: StateDisconnected, - done: make(chan struct{}), + url: url, + dialer: NewDialer(), + socket: nil, + config: config, + logger: logger, + incoming: make(chan []byte, config.IncomingBufferSize), + heartbeat: make(chan struct{}, 1), + errors: make(chan error, config.ErrorsBufferSize), + incomingCount: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + heartbeatCount: &atomic.Uint64{}, + state: StateDisconnected, + done: make(chan struct{}), } return conn, nil @@ -106,16 +123,19 @@ func NewConnectionFromSocket( } conn := &Connection{ - url: nil, - dialer: nil, - socket: socket, - config: config, - logger: logger, - incoming: make(chan []byte, config.IncomingBufferSize), - heartbeat: make(chan struct{}, 1), - errors: make(chan error, config.ErrorsBufferSize), - state: StateConnected, - done: make(chan struct{}), + url: nil, + dialer: nil, + socket: socket, + config: config, + logger: logger, + incoming: make(chan []byte, config.IncomingBufferSize), + heartbeat: make(chan struct{}, 1), + errors: make(chan error, config.ErrorsBufferSize), + incomingCount: &atomic.Uint64{}, + outgoingCount: &atomic.Uint64{}, + heartbeatCount: &atomic.Uint64{}, + state: StateConnected, + done: make(chan struct{}), } if config.CloseHandler != nil { @@ -336,6 +356,7 @@ func (c *Connection) startReader() { case <-c.done: return case c.incoming <- data: + c.incomingCount.Add(1) } } @@ -348,6 +369,7 @@ func (c *Connection) setupPongHandler() { c.socket.SetPongHandler(func(appData string) error { select { case c.heartbeat <- struct{}{}: + c.heartbeatCount.Add(1) default: } return nil @@ -410,6 +432,8 @@ func (c *Connection) Send(data []byte) error { return NewConnectionError(fmt.Errorf("%w: %w", ErrWriteFailed, err)) } + c.outgoingCount.Add(1) + return nil } @@ -431,6 +455,16 @@ func (c *Connection) State() ConnectionState { return c.state } +func (c *Connection) Stats() ConnectionStats { + return ConnectionStats{ + ChanIncoming: len(c.incoming), + ChanErrors: len(c.errors), + TotalReceived: c.incomingCount.Load(), + TotalSent: c.outgoingCount.Load(), + TotalHeartbeats: c.heartbeatCount.Load(), + } +} + func (c *Connection) SetDialer(d types.Dialer) { c.dialer = d }