From 7e7b18bb2a842be3915b892355dc0ae72a276bca Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 23 Apr 2026 21:26:40 -0400 Subject: [PATCH] Increase keepalive timeout. Add reconnection delay. --- outbound/config.go | 22 +++++++++++++++++++++- outbound/errors.go | 1 + outbound/worker.go | 20 ++++++++++++-------- 3 files changed, 34 insertions(+), 9 deletions(-) diff --git a/outbound/config.go b/outbound/config.go index 5673095..7e4b2f3 100644 --- a/outbound/config.go +++ b/outbound/config.go @@ -168,6 +168,7 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { type WorkerConfig struct { KeepaliveTimeout time.Duration + ReconnectDelay time.Duration MaxQueueSize int LoggingEnabled bool LogLevel *slog.Level @@ -188,7 +189,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ - KeepaliveTimeout: 20 * time.Second, + KeepaliveTimeout: 60 * time.Second, + ReconnectDelay: 2 * time.Second, MaxQueueSize: 0, // disabled by default LoggingEnabled: true, LogLevel: nil, @@ -232,6 +234,13 @@ func validateKeepaliveTimeout(value time.Duration) error { return nil } +func validateReconnectDelay(value time.Duration) error { + if value < 0 { + return InvalidReconnectDelay + } + return nil +} + // When KeepaliveTimeout is set to zero, keepalive timeouts are disabled. func WithKeepaliveTimeout(value time.Duration) WorkerOption { return func(c *WorkerConfig) error { @@ -244,6 +253,17 @@ func WithKeepaliveTimeout(value time.Duration) WorkerOption { } } +func WithReconnectDelay(value time.Duration) WorkerOption { + return func(c *WorkerConfig) error { + err := validateReconnectDelay(value) + if err != nil { + return err + } + c.ReconnectDelay = value + return nil + } +} + // When MaxQueueSize is set to zero, queue limits are disabled. func WithMaxQueueSize(value int) WorkerOption { return func(c *WorkerConfig) error { diff --git a/outbound/errors.go b/outbound/errors.go index dbed788..e308151 100644 --- a/outbound/errors.go +++ b/outbound/errors.go @@ -6,6 +6,7 @@ import "fmt" var ( // Config errors InvalidKeepaliveTimeout = errors.New("keepalive timeout cannot be negative") + InvalidReconnectDelay = errors.New("reconnect delay cannot be negative") InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") InvalidBufferSize = errors.New("buffer size must be greater than zero") diff --git a/outbound/worker.go b/outbound/worker.go index 66e81bc..7a4d85b 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -93,14 +93,15 @@ func (w *DefaultWorker) Start(pool PoolPlugin) { go func() { defer wg.Done() session := &Session{ - id: w.id, - connPtr: &w.conn, - messages: toQueue, - heartbeat: w.heartbeat, - dial: dial, - keepalive: keepalive, - newConn: newConn, - logger: w.logger, + id: w.id, + connPtr: &w.conn, + messages: toQueue, + heartbeat: w.heartbeat, + dial: dial, + keepalive: keepalive, + newConn: newConn, + reconnectDelay: w.config.ReconnectDelay, + logger: w.logger, } session.Start(w.ctx, pool) }() @@ -153,6 +154,8 @@ type Session struct { keepalive <-chan struct{} newConn <-chan *transport.Connection + reconnectDelay time.Duration + logger *slog.Logger } @@ -238,6 +241,7 @@ func (s *Session) Start( } // refresh session + time.Sleep(s.reconnectDelay) } }