From daf9f7534e6a3eb701477c00c4da156ed6508912 Mon Sep 17 00:00:00 2001 From: Jay Date: Mon, 20 Apr 2026 22:41:37 -0400 Subject: [PATCH] Various performance and correctness improvements. --- README.md | 42 +++++---- honeybee.go | 22 +++-- inbound/config.go | 43 +++++++++ inbound/config_test.go | 17 ++++ inbound/errors.go | 1 + inbound/pool.go | 19 ++-- inbound/worker.go | 70 ++++++--------- inbound/worker_forwarder_test.go | 79 +--------------- inbound/worker_reader_test.go | 15 ++-- outbound/config.go | 43 +++++++++ outbound/config_pool_test.go | 19 ++++ outbound/errors.go | 1 + outbound/pool.go | 6 +- outbound/worker.go | 72 ++++++--------- outbound/worker_forwarder_test.go | 79 +--------------- outbound/worker_session_inner_test.go | 11 +-- outbound/worker_session_test.go | 5 +- queue/queue.go | 125 ++++++++++++++++++++++++++ queue/queue_test.go | 104 +++++++++++++++++++++ transport/config.go | 43 +++++++-- transport/config_test.go | 20 +++-- transport/connection.go | 8 +- transport/errors.go | 1 + transport/retry.go | 24 ++++- types/types.go | 5 ++ 25 files changed, 577 insertions(+), 297 deletions(-) create mode 100644 queue/queue.go create mode 100644 queue/queue_test.go diff --git a/README.md b/README.md index 391dee1..a6b44b5 100644 --- a/README.md +++ b/README.md @@ -272,13 +272,15 @@ Three config types cover three scopes. Connection and retry: +- `WithCloseHandler(func)` installs a close handler on the socket. +- `WithWriteTimeout(duration)` sets per-message write deadline. +- `WithIncomingBufferSize(int)` sets the connection's incoming message channel buffer. +- `WithErrorsBufferSize(int)` sets the connection's errors channel buffer. - `WithoutRetry()` disables retry entirely. - `WithRetryMaxRetries(int)` caps retry attempts; zero means infinite. - `WithRetryInitialDelay(duration)` sets the first backoff interval. - `WithRetryMaxDelay(duration)` caps the backoff interval. - `WithRetryJitterFactor(float64)` adds randomization to backoff, range 0.0 to 1.0. -- `WithWriteTimeout(duration)` sets per-message write deadline. -- `WithCloseHandler(func)` installs a close handler on the socket. Inbound worker: @@ -292,6 +294,9 @@ Outbound worker: Pool wiring (both directions have inbound and outbound variants): +- `With{Inbound,Outbound}InboxBufferSize(int)` sets the pool's inbox channel buffer. +- `With{Inbound,Outbound}EventsBufferSize(int)` sets the pool's events channel buffer. +- `With{Inbound,Outbound}ErrorsBufferSize(int)` sets the pool's errors channel buffer. - `With{Inbound,Outbound}ConnectionConfig(*ConnectionConfig)` - `With{Inbound,Outbound}WorkerConfig(*WorkerConfig)` - `With{Inbound,Outbound}WorkerFactory(WorkerFactory)` @@ -300,20 +305,25 @@ All option functions validate their inputs. Invalid values return errors at appl ### Defaults -| Setting | Default | Disabled Value | Notes | -| ---------------------------- | ------- | ---------------- | ------------------------------- | -| `WriteTimeout` | 30s | `0` | Per-message write deadline | -| `Retry` enabled | yes | `WithoutRetry()` | Applies to `Connect()` only | -| `Retry.MaxRetries` | `0` | — | `0` means infinite | -| `Retry.InitialDelay` | 1s | — | Must be positive | -| `Retry.MaxDelay` | 5s | — | Must be at least `InitialDelay` | -| `Retry.JitterFactor` | 0.5 | `0.0` | Range [0.0, 1.0] | -| Inbound `MaxQueueSize` | `0` | `0` | `0` means unbounded | -| Inbound `InactivityTimeout` | `0` | `0` | `0` disables watchdog | -| Outbound `KeepaliveTimeout` | 20s | `0` | `0` disables keepalive | -| Outbound `MaxQueueSize` | `0` | `0` | `0` means unbounded | -| Connection inbox buffer | 100 | — | Not configurable | -| Connection errors buffer | 10 | — | Not configurable | +| Setting | Default | Disabled Value | Notes | +|----------------------------------|---------|------------------|---------------------------------| +| `WriteTimeout` | 30s | `0` | Per-message write deadline | +| `Retry` enabled | yes | `WithoutRetry()` | Applies to `Connect()` only | +| `Retry.MaxRetries` | `0` | — | `0` means infinite | +| `Retry.InitialDelay` | 1s | — | Must be positive | +| `Retry.MaxDelay` | 5s | — | Must be at least `InitialDelay` | +| `Retry.JitterFactor` | 0.5 | `0.0` | Range [0.0, 1.0] | +| Inbound `MaxQueueSize` | `0` | `0` | `0` means unbounded | +| Inbound `InactivityTimeout` | `0` | `0` | `0` disables watchdog | +| Outbound `KeepaliveTimeout` | 20s | `0` | `0` disables keepalive | +| Outbound `MaxQueueSize` | `0` | `0` | `0` means unbounded | +| Connection `IncomingBufferSize` | 100 | — | Must be positive | +| Connection `ErrorsBufferSize` | 10 | — | Must be positive | +| Inbound pool `InboxBufferSize` | 256 | — | Must be positive | +| Inbound pool `EventsBufferSize` | 10 | — | Must be positive | +| Outbound pool `InboxBufferSize` | 256 | — | Must be positive | +| Outbound pool `EventsBufferSize` | 10 | — | Must be positive | +| Outbound pool `ErrorsBufferSize` | 10 | — | Must be positive | ## Testing diff --git a/honeybee.go b/honeybee.go index 4b691cf..1c15aac 100644 --- a/honeybee.go +++ b/honeybee.go @@ -78,13 +78,15 @@ func NewConnectionConfig(opts ...ConnectionOption) (*ConnectionConfig, error) { // Connection options var ( - WithoutRetry = transport.WithoutRetry - WithRetryMaxRetries = transport.WithRetryMaxRetries - WithRetryInitialDelay = transport.WithRetryInitialDelay - WithRetryMaxDelay = transport.WithRetryMaxDelay - WithRetryJitterFactor = transport.WithRetryJitterFactor - WithWriteTimeout = transport.WithWriteTimeout - WithCloseHandler = transport.WithCloseHandler + WithIncomingBufferSize = transport.WithIncomingBufferSize + WithErrorsBufferSize = transport.WithErrorsBufferSize + WithoutRetry = transport.WithoutRetry + WithRetryMaxRetries = transport.WithRetryMaxRetries + WithRetryInitialDelay = transport.WithRetryInitialDelay + WithRetryMaxDelay = transport.WithRetryMaxDelay + WithRetryJitterFactor = transport.WithRetryJitterFactor + WithWriteTimeout = transport.WithWriteTimeout + WithCloseHandler = transport.WithCloseHandler ) // Outbound Pool constructors @@ -104,6 +106,9 @@ func NewOutboundWorkerConfig(opts ...OutboundWorkerOption) (*OutboundWorkerConfi // Outbound Pool options var ( + WithOutboundInboxBufferSize = outbound.WithInboxBufferSize + WithOutboundEventsBufferSize = outbound.WithEventsBufferSize + WithOutboundErrorsBufferSize = outbound.WithErrorsBufferSize WithOutboundConnectionConfig = outbound.WithConnectionConfig WithOutboundWorkerConfig = outbound.WithWorkerConfig WithOutboundWorkerFactory = outbound.WithWorkerFactory @@ -133,6 +138,9 @@ func NewInboundWorkerConfig(opts ...InboundWorkerOption) (*InboundWorkerConfig, // Inbound Pool options var ( + WithInboundInboxBufferSize = inbound.WithInboxBufferSize + WithInboundEventsBufferSize = inbound.WithEventsBufferSize + WithInboundErrorsBufferSize = inbound.WithErrorsBufferSize WithInboundConnectionConfig = inbound.WithConnectionConfig WithInboundWorkerConfig = inbound.WithWorkerConfig WithInboundWorkerFactory = inbound.WithWorkerFactory diff --git a/inbound/config.go b/inbound/config.go index d8aeaed..ceb5e61 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -99,6 +99,9 @@ type WorkerFactory func( ) (Worker, error) type PoolConfig struct { + InboxBufferSize int + EventsBufferSize int + ErrorsBufferSize int ConnectionConfig *transport.ConnectionConfig WorkerConfig *WorkerConfig WorkerFactory WorkerFactory @@ -119,6 +122,9 @@ func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) { func GetDefaultPoolConfig() *PoolConfig { return &PoolConfig{ + InboxBufferSize: 256, + EventsBufferSize: 10, + ErrorsBufferSize: 10, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, @@ -148,6 +154,43 @@ func ValidatePoolConfig(config *PoolConfig) error { return nil } +func validateBufferSize(value int) error { + if value < 1 { + return InvalidBufferSize + } + return nil +} + +func WithInboxBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.InboxBufferSize = value + return nil + } +} + +func WithEventsBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.EventsBufferSize = value + return nil + } +} + +func WithErrorsBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.ErrorsBufferSize = value + return nil + } +} + func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { return func(c *PoolConfig) error { if err := transport.ValidateConnectionConfig(cc); err != nil { diff --git a/inbound/config_test.go b/inbound/config_test.go index ed135c2..4b4a1fe 100644 --- a/inbound/config_test.go +++ b/inbound/config_test.go @@ -101,8 +101,12 @@ func TestNewPoolConfig(t *testing.T) { func TestDefaultPoolConfig(t *testing.T) { conf := GetDefaultPoolConfig() assert.Equal(t, &PoolConfig{ + InboxBufferSize: 256, + EventsBufferSize: 10, + ErrorsBufferSize: 10, ConnectionConfig: nil, WorkerConfig: nil, + WorkerFactory: nil, }, conf) } @@ -160,6 +164,19 @@ func TestValidatePoolConfig(t *testing.T) { } } +func TestWithBufferSizes(t *testing.T) { + conf := &PoolConfig{} + + err := applyPoolOptions(conf, + WithInboxBufferSize(100), + WithEventsBufferSize(20), + WithErrorsBufferSize(20), + ) + assert.NoError(t, err) + assert.Equal(t, 100, conf.InboxBufferSize) + assert.Equal(t, 20, conf.EventsBufferSize) +} + func TestWithConnectionConfig(t *testing.T) { conf := &PoolConfig{} diff --git a/inbound/errors.go b/inbound/errors.go index dc5cb19..9acbd7b 100644 --- a/inbound/errors.go +++ b/inbound/errors.go @@ -12,4 +12,5 @@ var ( // Config errors InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") InvalidInactivityTimeout = errors.New("inactivity timeout cannot be negative") + InvalidBufferSize = errors.New("buffer size must be greater than zero") ) diff --git a/inbound/pool.go b/inbound/pool.go index efe9026..4ccefed 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -42,6 +42,7 @@ type InboxMessage struct { type PoolPlugin struct { Inbox chan<- InboxMessage Events chan<- PoolEvent + Errors chan<- error Logger *slog.Logger OnExit OnExitFunction } @@ -62,6 +63,7 @@ type Pool struct { peers map[string]*Peer inbox chan InboxMessage events chan PoolEvent + errors chan error config *PoolConfig logger *slog.Logger @@ -100,8 +102,9 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger) (*Poo ctx: pctx, cancel: cancel, peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, 256), - events: make(chan PoolEvent, 10), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), config: config, logger: logger, }, nil @@ -127,6 +130,10 @@ func (p *Pool) Events() <-chan PoolEvent { return p.events } +func (p *Pool) Errors() <-chan error { + return p.errors +} + func (p *Pool) Close() { p.mu.Lock() if p.closed { @@ -137,21 +144,22 @@ func (p *Pool) Close() { p.closed = true p.cancel() - // remove all peers - p.peers = make(map[string]*Peer) - // close all connections for _, peer := range p.peers { peer.worker.Stop() peer.conn.Close() } + // remove all peers + p.peers = make(map[string]*Peer) + p.mu.Unlock() go func() { p.wg.Wait() close(p.inbox) close(p.events) + close(p.errors) }() } @@ -263,6 +271,7 @@ func (p *Pool) addLocked(id string, socket types.Socket) error { pool := PoolPlugin{ Inbox: p.inbox, Events: p.events, + Errors: p.errors, Logger: logger, OnExit: onExit, } diff --git a/inbound/worker.go b/inbound/worker.go index d5c10fa..2ca50c2 100644 --- a/inbound/worker.go +++ b/inbound/worker.go @@ -1,10 +1,11 @@ package inbound import ( - "container/list" "context" "errors" + "git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "sync" "time" ) @@ -23,11 +24,6 @@ const ( ExitPolicy ) -type ReceivedMessage struct { - data []byte - receivedAt time.Time -} - type DefaultWorker struct { id string conn *transport.Connection @@ -62,19 +58,25 @@ func NewWorker( } func (w *DefaultWorker) Start(pool PoolPlugin) { - messages := make(chan ReceivedMessage, 256) + toQueue := make(chan types.ReceivedMessage, 256) + toForwarder := make(chan types.ReceivedMessage, 256) var wg sync.WaitGroup - wg.Add(3) + wg.Add(4) go func() { defer wg.Done() - RunReader(w.ctx, pool.OnExit, w.conn, messages, w.heartbeat) + RunReader(w.ctx, pool.OnExit, w.conn, toQueue, w.heartbeat) }() go func() { defer wg.Done() - RunForwarder(w.id, w.ctx, messages, pool.Inbox, w.config.MaxQueueSize) + queue.RunQueue(w.id, w.ctx, toQueue, toForwarder, w.config.MaxQueueSize) + }() + + go func() { + defer wg.Done() + RunForwarder(w.id, w.ctx, toForwarder, pool.Inbox) }() go func() { @@ -107,7 +109,7 @@ func RunReader( onPeerClose OnExitFunction, conn *transport.Connection, - messages chan<- ReceivedMessage, + messages chan<- types.ReceivedMessage, heartbeat chan<- struct{}, ) { for { @@ -134,7 +136,7 @@ func RunReader( return } - messages <- ReceivedMessage{data: data, receivedAt: time.Now()} + messages <- types.ReceivedMessage{Data: data, ReceivedAt: time.Now()} select { case heartbeat <- struct{}{}: @@ -148,43 +150,27 @@ func RunReader( func RunForwarder( id string, ctx context.Context, - messages <-chan ReceivedMessage, + messages <-chan types.ReceivedMessage, inbox chan<- InboxMessage, - maxQueueSize int, ) { - queue := list.New() - for { - var out chan<- InboxMessage - var next ReceivedMessage - - // enable inbox if it is populated - if queue.Len() > 0 { - out = inbox - - // read the first message in the queue - next = queue.Front().Value.(ReceivedMessage) - } - select { case <-ctx.Done(): return - case msg := <-messages: - // limit queue size if maximum is configured - if maxQueueSize > 0 && queue.Len() >= maxQueueSize { - // drop oldest message - queue.Remove(queue.Front()) + case msg, ok := <-messages: + if !ok { + return + } + select { + case <-ctx.Done(): + return + + case inbox <- InboxMessage{ + ID: id, + Data: msg.Data, + ReceivedAt: msg.ReceivedAt, + }: } - // add new message - queue.PushBack(msg) - // send next message to inbox - case out <- InboxMessage{ - ID: id, - Data: next.data, - ReceivedAt: next.receivedAt, - }: - // drop message from queue - queue.Remove(queue.Front()) } } } diff --git a/inbound/worker_forwarder_test.go b/inbound/worker_forwarder_test.go index 18b05ca..ecec080 100644 --- a/inbound/worker_forwarder_test.go +++ b/inbound/worker_forwarder_test.go @@ -3,7 +3,7 @@ package inbound import ( "context" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "github.com/stretchr/testify/assert" + "git.wisehodl.dev/jay/go-honeybee/types" "testing" "time" ) @@ -11,14 +11,14 @@ import ( func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { id := "wss://test" - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) inbox := make(chan InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunForwarder(id, ctx, messages, inbox, 0) + go RunForwarder(id, ctx, messages, inbox) - messages <- ReceivedMessage{data: []byte("hello"), receivedAt: time.Now()} + messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} honeybeetest.Eventually(t, func() bool { select { @@ -29,75 +29,4 @@ func TestRunForwarder(t *testing.T) { } }, "expected message") }) - - t.Run("oldest message dropped when queue is full", func(t *testing.T) { - id := "wss://test" - messages := make(chan ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - gate := make(chan struct{}) - gatedInbox := make(chan InboxMessage) - - // gate the inbox from receiving messages until the gate is opened - go func() { - <-gate - for msg := range gatedInbox { - inbox <- msg - } - }() - - go RunForwarder(id, ctx, messages, gatedInbox, 2) - - // send three messages while the gated inbox is blocked - messages <- ReceivedMessage{data: []byte("first"), receivedAt: time.Now()} - messages <- ReceivedMessage{data: []byte("second"), receivedAt: time.Now()} - messages <- ReceivedMessage{data: []byte("third"), receivedAt: time.Now()} - - // allow time for the first message to be dropped - time.Sleep(20 * time.Millisecond) - - // close the gate, draining messages into the inbox - close(gate) - - // receive messages from the inbox - var received []string - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-inbox: - received = append(received, string(msg.Data)) - default: - } - return len(received) == 2 - }, "expected messages") - - // first message was dropped - assert.Equal(t, []string{"second", "third"}, received) - - }) - - t.Run("exits on context cancellation", func(t *testing.T) { - id := "wss://test" - messages := make(chan ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - done := make(chan struct{}) - go func() { - RunForwarder(id, ctx, messages, inbox, 0) - close(done) - }() - - cancel() - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected done signal") - }) } diff --git a/inbound/worker_reader_test.go b/inbound/worker_reader_test.go index f62d000..fb55e83 100644 --- a/inbound/worker_reader_test.go +++ b/inbound/worker_reader_test.go @@ -4,6 +4,7 @@ import ( "context" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "io" @@ -17,7 +18,7 @@ func TestRunReader(t *testing.T) { conn, _, incoming, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -30,7 +31,7 @@ func TestRunReader(t *testing.T) { honeybeetest.Eventually(t, func() bool { select { case msg := <-messages: - return string(msg.data) == "hello" && msg.receivedAt.After(before) + return string(msg.Data) == "hello" && msg.ReceivedAt.After(before) default: return false } @@ -41,7 +42,7 @@ func TestRunReader(t *testing.T) { conn, _, incoming, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan ReceivedMessage, 10) + messages := make(chan types.ReceivedMessage, 10) heartbeat := make(chan struct{}, 10) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -81,7 +82,7 @@ func TestRunReader(t *testing.T) { conn, err := transport.NewConnectionFromSocket(mock, nil, nil) assert.NoError(t, err) - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -118,7 +119,7 @@ func TestRunReader(t *testing.T) { conn, err := transport.NewConnectionFromSocket(mock, nil, nil) assert.NoError(t, err) - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -155,7 +156,7 @@ func TestRunReader(t *testing.T) { conn, err := transport.NewConnectionFromSocket(mock, nil, nil) assert.NoError(t, err) - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -183,7 +184,7 @@ func TestRunReader(t *testing.T) { conn, _, _, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}, 1) ctx, cancel := context.WithCancel(context.Background()) diff --git a/outbound/config.go b/outbound/config.go index 43cf587..4aa0558 100644 --- a/outbound/config.go +++ b/outbound/config.go @@ -13,6 +13,9 @@ type WorkerFactory func(ctx context.Context, id string) (Worker, error) // Pool Config type PoolConfig struct { + InboxBufferSize int + EventsBufferSize int + ErrorsBufferSize int ConnectionConfig *transport.ConnectionConfig WorkerFactory WorkerFactory WorkerConfig *WorkerConfig @@ -33,6 +36,9 @@ func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) { func GetDefaultPoolConfig() *PoolConfig { return &PoolConfig{ + InboxBufferSize: 256, + EventsBufferSize: 10, + ErrorsBufferSize: 10, ConnectionConfig: nil, WorkerFactory: nil, WorkerConfig: nil, @@ -68,6 +74,43 @@ func ValidatePoolConfig(config *PoolConfig) error { return nil } +func validateBufferSize(value int) error { + if value < 1 { + return InvalidBufferSize + } + return nil +} + +func WithInboxBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.InboxBufferSize = value + return nil + } +} + +func WithEventsBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.EventsBufferSize = value + return nil + } +} + +func WithErrorsBufferSize(value int) PoolOption { + return func(c *PoolConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.ErrorsBufferSize = value + return nil + } +} + func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { return func(c *PoolConfig) error { err := transport.ValidateConnectionConfig(cc) diff --git a/outbound/config_pool_test.go b/outbound/config_pool_test.go index 4bbe9f1..b8e735e 100644 --- a/outbound/config_pool_test.go +++ b/outbound/config_pool_test.go @@ -12,6 +12,9 @@ func TestNewPoolConfig(t *testing.T) { assert.NoError(t, err) assert.Equal(t, conf, &PoolConfig{ + InboxBufferSize: 256, + EventsBufferSize: 10, + ErrorsBufferSize: 10, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, @@ -22,6 +25,9 @@ func TestDefaultPoolConfig(t *testing.T) { conf := GetDefaultPoolConfig() assert.Equal(t, conf, &PoolConfig{ + InboxBufferSize: 256, + EventsBufferSize: 10, + ErrorsBufferSize: 10, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, @@ -39,6 +45,19 @@ func TestApplyPoolOptions(t *testing.T) { assert.Equal(t, 0*time.Second, conf.ConnectionConfig.WriteTimeout) } +func TestWithBufferSizes(t *testing.T) { + conf := &PoolConfig{} + + err := applyPoolOptions(conf, + WithInboxBufferSize(100), + WithEventsBufferSize(20), + WithErrorsBufferSize(20), + ) + assert.NoError(t, err) + assert.Equal(t, 100, conf.InboxBufferSize) + assert.Equal(t, 20, conf.EventsBufferSize) +} + func TestWithConnectionConfig(t *testing.T) { conf := &PoolConfig{} opt := WithConnectionConfig(&transport.ConnectionConfig{WriteTimeout: 1 * time.Second}) diff --git a/outbound/errors.go b/outbound/errors.go index 9282aed..45accc5 100644 --- a/outbound/errors.go +++ b/outbound/errors.go @@ -7,6 +7,7 @@ var ( // Config errors InvalidKeepaliveTimeout = errors.New("keepalive timeout cannot be negative") InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") + InvalidBufferSize = errors.New("buffer size must be greater than zero") // Pool errors ErrPoolClosed = errors.New("pool is closed") diff --git a/outbound/pool.go b/outbound/pool.go index 424273c..d0ad314 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -89,9 +89,9 @@ func NewPool(ctx context.Context, config *PoolConfig, logger *slog.Logger, ctx: pctx, cancel: cancel, peers: make(map[string]*Peer), - inbox: make(chan InboxMessage, 256), - events: make(chan PoolEvent, 10), - errors: make(chan error, 10), + inbox: make(chan InboxMessage, config.InboxBufferSize), + events: make(chan PoolEvent, config.EventsBufferSize), + errors: make(chan error, config.ErrorsBufferSize), dialer: transport.NewDialer(), config: config, logger: logger, diff --git a/outbound/worker.go b/outbound/worker.go index b11bf3b..57fce15 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -1,9 +1,10 @@ package outbound import ( - "container/list" "context" + "git.wisehodl.dev/jay/go-honeybee/queue" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "sync" "sync/atomic" "time" @@ -17,11 +18,6 @@ type Worker interface { Send(data []byte) error } -type ReceivedMessage struct { - data []byte - receivedAt time.Time -} - type DefaultWorker struct { id string conn atomic.Pointer[transport.Connection] @@ -59,11 +55,12 @@ func NewWorker( func (w *DefaultWorker) Start(pool PoolPlugin) { dial := make(chan struct{}, 1) newConn := make(chan *transport.Connection, 1) - messages := make(chan ReceivedMessage, 256) + toQueue := make(chan types.ReceivedMessage, 256) + toForwarder := make(chan types.ReceivedMessage, 256) keepalive := make(chan struct{}, 1) var wg sync.WaitGroup - wg.Add(4) + wg.Add(5) go func() { defer wg.Done() @@ -77,7 +74,12 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() - RunForwarder(w.id, w.ctx, messages, pool.Inbox, w.config.MaxQueueSize) + queue.RunQueue(w.id, w.ctx, toQueue, toForwarder, w.config.MaxQueueSize) + }() + + go func() { + defer wg.Done() + RunForwarder(w.id, w.ctx, toForwarder, pool.Inbox) }() go func() { @@ -85,7 +87,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { session := &Session{ id: w.id, connPtr: &w.conn, - messages: messages, + messages: toQueue, heartbeat: w.heartbeat, dial: dial, keepalive: keepalive, @@ -124,7 +126,7 @@ type Session struct { id string connPtr *atomic.Pointer[transport.Connection] - messages chan<- ReceivedMessage + messages chan<- types.ReceivedMessage heartbeat chan<- struct{} dial chan<- struct{} @@ -203,7 +205,7 @@ func RunReader( ctx context.Context, onStop func(), conn *transport.Connection, - messages chan<- ReceivedMessage, + messages chan<- types.ReceivedMessage, heartbeat chan<- struct{}, ) { defer func() { @@ -222,7 +224,7 @@ func RunReader( } // send message forward - messages <- ReceivedMessage{data: data, receivedAt: time.Now()} + messages <- types.ReceivedMessage{Data: data, ReceivedAt: time.Now()} // send heartbeat select { @@ -254,43 +256,27 @@ func RunStopMonitor( func RunForwarder( id string, ctx context.Context, - messages <-chan ReceivedMessage, + messages <-chan types.ReceivedMessage, inbox chan<- InboxMessage, - maxQueueSize int, ) { - queue := list.New() - for { - var out chan<- InboxMessage - var next ReceivedMessage - - // enable inbox if it is populated - if queue.Len() > 0 { - out = inbox - - // read the first message in the queue - next = queue.Front().Value.(ReceivedMessage) - } - select { case <-ctx.Done(): return - case msg := <-messages: - // limit queue size if maximum is configured - if maxQueueSize > 0 && queue.Len() >= maxQueueSize { - // drop oldest message - queue.Remove(queue.Front()) + case msg, ok := <-messages: + if !ok { + return + } + select { + case <-ctx.Done(): + return + + case inbox <- InboxMessage{ + ID: id, + Data: msg.Data, + ReceivedAt: msg.ReceivedAt, + }: } - // add new message - queue.PushBack(msg) - // send next message to inbox - case out <- InboxMessage{ - ID: id, - Data: next.data, - ReceivedAt: next.receivedAt, - }: - // drop message from queue - queue.Remove(queue.Front()) } } } diff --git a/outbound/worker_forwarder_test.go b/outbound/worker_forwarder_test.go index c16288f..b34ccdf 100644 --- a/outbound/worker_forwarder_test.go +++ b/outbound/worker_forwarder_test.go @@ -3,7 +3,7 @@ package outbound import ( "context" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" - "github.com/stretchr/testify/assert" + "git.wisehodl.dev/jay/go-honeybee/types" "testing" "time" ) @@ -11,14 +11,14 @@ import ( func TestRunForwarder(t *testing.T) { t.Run("message passes through to inbox", func(t *testing.T) { id := "wss://test" - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) inbox := make(chan InboxMessage, 1) ctx, cancel := context.WithCancel(context.Background()) defer cancel() - go RunForwarder(id, ctx, messages, inbox, 0) + go RunForwarder(id, ctx, messages, inbox) - messages <- ReceivedMessage{data: []byte("hello"), receivedAt: time.Now()} + messages <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} honeybeetest.Eventually(t, func() bool { select { @@ -29,75 +29,4 @@ func TestRunForwarder(t *testing.T) { } }, "expected message") }) - - t.Run("oldest message dropped when queue is full", func(t *testing.T) { - id := "wss://test" - messages := make(chan ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - gate := make(chan struct{}) - gatedInbox := make(chan InboxMessage) - - // gate the inbox from receiving messages until the gate is opened - go func() { - <-gate - for msg := range gatedInbox { - inbox <- msg - } - }() - - go RunForwarder(id, ctx, messages, gatedInbox, 2) - - // send three messages while the gated inbox is blocked - messages <- ReceivedMessage{data: []byte("first"), receivedAt: time.Now()} - messages <- ReceivedMessage{data: []byte("second"), receivedAt: time.Now()} - messages <- ReceivedMessage{data: []byte("third"), receivedAt: time.Now()} - - // allow time for the first message to be dropped - time.Sleep(20 * time.Millisecond) - - // close the gate, draining messages into the inbox - close(gate) - - // receive messages from the inbox - var received []string - honeybeetest.Eventually(t, func() bool { - select { - case msg := <-inbox: - received = append(received, string(msg.Data)) - default: - } - return len(received) == 2 - }, "expected messages") - - // first message was dropped - assert.Equal(t, []string{"second", "third"}, received) - - }) - - t.Run("exits on context cancellation", func(t *testing.T) { - id := "wss://test" - messages := make(chan ReceivedMessage, 1) - inbox := make(chan InboxMessage, 1) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - - done := make(chan struct{}) - go func() { - RunForwarder(id, ctx, messages, inbox, 0) - close(done) - }() - - cancel() - honeybeetest.Eventually(t, func() bool { - select { - case <-done: - return true - default: - return false - } - }, "expected done signal") - }) } diff --git a/outbound/worker_session_inner_test.go b/outbound/worker_session_inner_test.go index adeb160..c9da1f6 100644 --- a/outbound/worker_session_inner_test.go +++ b/outbound/worker_session_inner_test.go @@ -5,6 +5,7 @@ import ( "fmt" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "github.com/gorilla/websocket" "github.com/stretchr/testify/assert" "io" @@ -18,7 +19,7 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -38,7 +39,7 @@ func TestRunReader(t *testing.T) { honeybeetest.Eventually(t, func() bool { select { case msg := <-messages: - return string(msg.data) == "hello" && msg.receivedAt.After(before) + return string(msg.Data) == "hello" && msg.ReceivedAt.After(before) default: return false } @@ -49,7 +50,7 @@ func TestRunReader(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) defer conn.Close() - messages := make(chan ReceivedMessage, 10) + messages := make(chan types.ReceivedMessage, 10) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -82,7 +83,7 @@ func TestRunReader(t *testing.T) { t.Run("incoming channel close calls conn.Close and onStop", func(t *testing.T) { conn, _, incomingData, _ := setupTestConnection(t) - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -120,7 +121,7 @@ func TestRunReader(t *testing.T) { t.Run("sessionDone close calls conn.Close and onStop", func(t *testing.T) { conn, _, _, _ := setupTestConnection(t) - messages := make(chan ReceivedMessage, 1) + messages := make(chan types.ReceivedMessage, 1) heartbeat := make(chan struct{}) ctx, cancel := context.WithCancel(context.Background()) diff --git a/outbound/worker_session_test.go b/outbound/worker_session_test.go index ad21468..32775a8 100644 --- a/outbound/worker_session_test.go +++ b/outbound/worker_session_test.go @@ -5,6 +5,7 @@ import ( "fmt" "git.wisehodl.dev/jay/go-honeybee/honeybeetest" "git.wisehodl.dev/jay/go-honeybee/transport" + "git.wisehodl.dev/jay/go-honeybee/types" "sync/atomic" "testing" ) @@ -28,7 +29,7 @@ type testVars struct { keepalive chan struct{} heartbeat chan struct{} newConn chan *transport.Connection - messages chan ReceivedMessage + messages chan types.ReceivedMessage conn *transport.Connection mockSocket *honeybeetest.MockSocket @@ -52,7 +53,7 @@ func setup(t *testing.T) ( keepalive: make(chan struct{}, 1), heartbeat: make(chan struct{}, 1), newConn: make(chan *transport.Connection, 1), - messages: make(chan ReceivedMessage, 256), + messages: make(chan types.ReceivedMessage, 256), conn: conn, mockSocket: mockSocket, incomingData: incomingData, diff --git a/queue/queue.go b/queue/queue.go new file mode 100644 index 0000000..89b28db --- /dev/null +++ b/queue/queue.go @@ -0,0 +1,125 @@ +package queue + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee/types" +) + +func RunQueue( + id string, + ctx context.Context, + in <-chan types.ReceivedMessage, + out chan<- types.ReceivedMessage, + maxQueueSize int, +) { + var next types.ReceivedMessage + var queue messageQueue + if maxQueueSize > 0 { + queue = newBoundedRing(maxQueueSize) + } else { + queue = newUnboundedRing(64) + } + + for { + var outOrNil chan<- types.ReceivedMessage + + // enable out channel if queue is populated + if queue.len() > 0 { + outOrNil = out + next = queue.peek() + } + + select { + case <-ctx.Done(): + return + case msg := <-in: + // limit queue size if maximum is configured + if maxQueueSize > 0 && queue.len() >= maxQueueSize { + // drop oldest message + _ = queue.pop() + } + // add new message + queue.push(msg) + // send next message to out channel + case outOrNil <- next: + _ = queue.pop() + } + } +} + +// Ring Buffer Queue + +type messageQueue interface { + push(types.ReceivedMessage) + pop() types.ReceivedMessage + peek() types.ReceivedMessage + len() int +} + +type ring struct { + buf []types.ReceivedMessage + head int + size int +} + +func (r *ring) len() int { return r.size } + +func (r *ring) pop() types.ReceivedMessage { + m := r.buf[r.head] + var zero types.ReceivedMessage + r.buf[r.head] = zero // release reference for GC + r.head = (r.head + 1) % len(r.buf) + r.size-- + return m +} + +func (r *ring) peek() types.ReceivedMessage { + m := r.buf[r.head] + return m +} + +// shared write at logical tail; caller guarantees space exists +func (r *ring) writeTail(m types.ReceivedMessage) { + r.buf[(r.head+r.size)%len(r.buf)] = m + r.size++ +} + +// Bounded ring + +type boundedRing struct{ ring } + +func newBoundedRing(cap int) *boundedRing { + return &boundedRing{ring{buf: make([]types.ReceivedMessage, cap)}} +} + +func (b *boundedRing) push(m types.ReceivedMessage) { + if b.size == len(b.buf) { + b.buf[b.head] = m + b.head = (b.head + 1) % len(b.buf) + return + } + b.writeTail(m) +} + +// Unbounded Ring + +type unboundedRing struct{ ring } + +func newUnboundedRing(initialCap int) *unboundedRing { + if initialCap < 1 { + initialCap = 1 + } + return &unboundedRing{ring{buf: make([]types.ReceivedMessage, initialCap)}} +} + +func (u *unboundedRing) push(m types.ReceivedMessage) { + if u.size == len(u.buf) { + bigger := make([]types.ReceivedMessage, len(u.buf)*2) + for i := 0; i < u.size; i++ { + bigger[i] = u.buf[(u.head+i)%len(u.buf)] + } + u.buf = bigger + u.head = 0 + } + u.writeTail(m) +} diff --git a/queue/queue_test.go b/queue/queue_test.go new file mode 100644 index 0000000..de00209 --- /dev/null +++ b/queue/queue_test.go @@ -0,0 +1,104 @@ +package queue + +import ( + "context" + "git.wisehodl.dev/jay/go-honeybee/honeybeetest" + "git.wisehodl.dev/jay/go-honeybee/types" + "github.com/stretchr/testify/assert" + "testing" + "time" +) + +func TestRunQueue(t *testing.T) { + t.Run("message passes through to inbox", func(t *testing.T) { + id := "wss://test" + inChan := make(chan types.ReceivedMessage, 1) + outChan := make(chan types.ReceivedMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + go RunQueue(id, ctx, inChan, outChan, 0) + + inChan <- types.ReceivedMessage{Data: []byte("hello"), ReceivedAt: time.Now()} + + honeybeetest.Eventually(t, func() bool { + select { + case msg := <-outChan: + return string(msg.Data) == "hello" + default: + return false + } + }, "expected message") + }) + + t.Run("oldest message dropped when queue is full", func(t *testing.T) { + id := "wss://test" + inChan := make(chan types.ReceivedMessage, 1) + outChan := make(chan types.ReceivedMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + gate := make(chan struct{}) + gatedInbox := make(chan types.ReceivedMessage) + + // gate the inbox from receiving messages until the gate is opened + go func() { + <-gate + for msg := range gatedInbox { + outChan <- msg + } + }() + + go RunQueue(id, ctx, inChan, gatedInbox, 2) + + // send three messages while the gated inbox is blocked + inChan <- types.ReceivedMessage{Data: []byte("first"), ReceivedAt: time.Now()} + inChan <- types.ReceivedMessage{Data: []byte("second"), ReceivedAt: time.Now()} + inChan <- types.ReceivedMessage{Data: []byte("third"), ReceivedAt: time.Now()} + + // allow time for the first message to be dropped + time.Sleep(20 * time.Millisecond) + + // close the gate, draining messages into the inbox + close(gate) + + // receive messages from the inbox + var received []string + honeybeetest.Eventually(t, func() bool { + select { + case msg := <-outChan: + received = append(received, string(msg.Data)) + default: + } + return len(received) == 2 + }, "expected messages") + + // first message was dropped + assert.Equal(t, []string{"second", "third"}, received) + + }) + + t.Run("exits on context cancellation", func(t *testing.T) { + id := "wss://test" + inChan := make(chan types.ReceivedMessage, 1) + outChan := make(chan types.ReceivedMessage, 1) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + done := make(chan struct{}) + go func() { + RunQueue(id, ctx, inChan, outChan, 0) + close(done) + }() + + cancel() + honeybeetest.Eventually(t, func() bool { + select { + case <-done: + return true + default: + return false + } + }, "expected done signal") + }) +} diff --git a/transport/config.go b/transport/config.go index 13c3a9b..0df8403 100644 --- a/transport/config.go +++ b/transport/config.go @@ -7,9 +7,11 @@ import ( type CloseHandler func(code int, text string) error type ConnectionConfig struct { - CloseHandler CloseHandler - WriteTimeout time.Duration - Retry *RetryConfig + CloseHandler CloseHandler + WriteTimeout time.Duration + IncomingBufferSize int + ErrorsBufferSize int + Retry *RetryConfig } type RetryConfig struct { @@ -34,9 +36,11 @@ func NewConnectionConfig(options ...ConnectionOption) (*ConnectionConfig, error) func GetDefaultConnectionConfig() *ConnectionConfig { return &ConnectionConfig{ - CloseHandler: nil, - WriteTimeout: 30 * time.Second, - Retry: GetDefaultRetryConfig(), + CloseHandler: nil, + WriteTimeout: 30 * time.Second, + IncomingBufferSize: 100, + ErrorsBufferSize: 10, + Retry: GetDefaultRetryConfig(), } } @@ -100,6 +104,13 @@ func validateWriteTimeout(value time.Duration) error { return nil } +func validateBufferSize(value int) error { + if value < 1 { + return InvalidBufferSize + } + return nil +} + func validateMaxRetries(value int) error { if value < 0 { return InvalidRetryMaxRetries @@ -147,6 +158,26 @@ func WithWriteTimeout(value time.Duration) ConnectionOption { } } +func WithIncomingBufferSize(value int) ConnectionOption { + return func(c *ConnectionConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.IncomingBufferSize = value + return nil + } +} + +func WithErrorsBufferSize(value int) ConnectionOption { + return func(c *ConnectionConfig) error { + if err := validateBufferSize(value); err != nil { + return err + } + c.ErrorsBufferSize = value + return nil + } +} + func WithoutRetry() ConnectionOption { return func(c *ConnectionConfig) error { c.Retry = nil diff --git a/transport/config_test.go b/transport/config_test.go index 179be1b..e116bb6 100644 --- a/transport/config_test.go +++ b/transport/config_test.go @@ -13,9 +13,11 @@ func TestNewConnectionConfig(t *testing.T) { assert.NoError(t, err) assert.Equal(t, conf, &ConnectionConfig{ - CloseHandler: nil, - WriteTimeout: 30 * time.Second, - Retry: GetDefaultRetryConfig(), + CloseHandler: nil, + WriteTimeout: 30 * time.Second, + IncomingBufferSize: 100, + ErrorsBufferSize: 10, + Retry: GetDefaultRetryConfig(), }) // errors propagate @@ -32,9 +34,11 @@ func TestDefaultConnectionConfig(t *testing.T) { conf := GetDefaultConnectionConfig() assert.Equal(t, conf, &ConnectionConfig{ - CloseHandler: nil, - WriteTimeout: 30 * time.Second, - Retry: GetDefaultRetryConfig(), + CloseHandler: nil, + WriteTimeout: 30 * time.Second, + IncomingBufferSize: 100, + ErrorsBufferSize: 10, + Retry: GetDefaultRetryConfig(), }) } @@ -55,12 +59,16 @@ func TestApplyConnectionOptions(t *testing.T) { conf := &ConnectionConfig{} err := applyConnectionOptions( conf, + WithIncomingBufferSize(256), + WithErrorsBufferSize(100), WithRetryMaxRetries(0), WithRetryInitialDelay(3*time.Second), WithRetryJitterFactor(0.5), ) assert.NoError(t, err) + assert.Equal(t, 256, conf.IncomingBufferSize) + assert.Equal(t, 100, conf.ErrorsBufferSize) assert.Equal(t, 0, conf.Retry.MaxRetries) assert.Equal(t, 3*time.Second, conf.Retry.InitialDelay) assert.Equal(t, 0.5, conf.Retry.JitterFactor) diff --git a/transport/connection.go b/transport/connection.go index 41a5273..95ea008 100644 --- a/transport/connection.go +++ b/transport/connection.go @@ -78,8 +78,8 @@ func NewConnection(urlStr string, config *ConnectionConfig, logger *slog.Logger) socket: nil, config: config, logger: logger, - incoming: make(chan []byte, 100), - errors: make(chan error, 10), + incoming: make(chan []byte, config.IncomingBufferSize), + errors: make(chan error, config.ErrorsBufferSize), state: StateDisconnected, done: make(chan struct{}), } @@ -108,8 +108,8 @@ func NewConnectionFromSocket( socket: socket, config: config, logger: logger, - incoming: make(chan []byte, 100), - errors: make(chan error, 10), + incoming: make(chan []byte, config.IncomingBufferSize), + errors: make(chan error, config.ErrorsBufferSize), state: StateConnected, done: make(chan struct{}), } diff --git a/transport/errors.go b/transport/errors.go index 84c1bd0..c31c0cf 100644 --- a/transport/errors.go +++ b/transport/errors.go @@ -9,6 +9,7 @@ var ( // Configuration Errors InvalidWriteTimeout = errors.New("write timeout cannot be negative") + InvalidBufferSize = errors.New("buffer size must be greater than zero") InvalidRetryMaxRetries = errors.New("max retry count cannot be negative") InvalidRetryInitialDelay = errors.New("initial delay must be positive") InvalidRetryMaxDelay = errors.New("max delay must be positive") diff --git a/transport/retry.go b/transport/retry.go index 16063ba..bfb5048 100644 --- a/transport/retry.go +++ b/transport/retry.go @@ -9,12 +9,24 @@ import ( type RetryManager struct { config *RetryConfig retryCount int + saturation int } func NewRetryManager(config *RetryConfig) *RetryManager { + // saturationCount: retry count at which base delay meets or exceeds MaxDelay. + // Conservative by one to preserve jitter variance near the boundary. + saturation := 0 + if config != nil && + config.InitialDelay > 0 && + config.InitialDelay <= config.MaxDelay { + ratio := float64(config.MaxDelay) / float64(config.InitialDelay) + saturation = int(math.Ceil(math.Log2(ratio))) + 2 + } + return &RetryManager{ config: config, retryCount: 0, + saturation: saturation, } } @@ -40,8 +52,18 @@ func (r *RetryManager) CalculateDelay() time.Duration { return 0 } + // if saturation is reached, calculated backoff will always be higher than + // the maximum delay + if r.config != nil && r.retryCount >= r.saturation { + return r.config.MaxDelay + } + // Exponential backoff: InitialDelay * 2^(attempts-1) - backoffMultiplier := math.Pow(2, float64(r.retryCount-1)) + shift := r.retryCount - 1 + if shift > 62 { + shift = 62 + } // prevent overflow + backoffMultiplier := float64(int64(1) << shift) baseDelay := float64(r.config.InitialDelay) * backoffMultiplier // Apply jitter: delay * (1 + jitterFactor * (random - 0.5)) diff --git a/types/types.go b/types/types.go index 1dfa361..bd139da 100644 --- a/types/types.go +++ b/types/types.go @@ -22,3 +22,8 @@ type Socket interface { SetWriteDeadline(t time.Time) error SetCloseHandler(h func(code int, text string) error) } + +type ReceivedMessage struct { + Data []byte + ReceivedAt time.Time +}