From 6facb6eed024213051a131c7f6c74d1a34024b3a Mon Sep 17 00:00:00 2001 From: Jay Date: Wed, 20 May 2026 08:53:46 -0400 Subject: [PATCH] remove references to the queue --- config.go | 26 -------------------------- worker.go | 8 -------- 2 files changed, 34 deletions(-) diff --git a/config.go b/config.go index 896e923..6dcb0e8 100644 --- a/config.go +++ b/config.go @@ -157,7 +157,6 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { type WorkerConfig struct { KeepaliveTimeout time.Duration ReconnectDelay time.Duration - MaxQueueSize int LoggingEnabled bool LogLevel *slog.Level } @@ -179,7 +178,6 @@ func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ KeepaliveTimeout: 60 * time.Second, ReconnectDelay: 2 * time.Second, - MaxQueueSize: 0, // disabled by default LoggingEnabled: true, LogLevel: nil, } @@ -200,18 +198,6 @@ func ValidateWorkerConfig(config *WorkerConfig) error { return err } - err = validateMaxQueueSize(config.MaxQueueSize) - if err != nil { - return err - } - - return nil -} - -func validateMaxQueueSize(value int) error { - if value < 0 { - return InvalidMaxQueueSize - } return nil } @@ -252,18 +238,6 @@ func WithReconnectDelay(value time.Duration) WorkerOption { } } -// When MaxQueueSize is set to zero, queue limits are disabled. -func WithMaxQueueSize(value int) WorkerOption { - return func(c *WorkerConfig) error { - err := validateMaxQueueSize(value) - if err != nil { - return err - } - c.MaxQueueSize = value - return nil - } -} - func WithWorkerLoggingEnabled(value bool) WorkerOption { return func(c *WorkerConfig) error { c.LoggingEnabled = value diff --git a/worker.go b/worker.go index bfb344f..c055cc0 100644 --- a/worker.go +++ b/worker.go @@ -23,13 +23,11 @@ type Worker interface { type WorkerStats struct { IncomingAvailable bool ChanIncoming int - BufferDepth int64 ConnectionAvailable bool Connection transport.ConnectionStats TotalProcessed uint64 - TotalDropped uint64 TotalSent uint64 TotalRestarts uint64 } @@ -41,10 +39,8 @@ type DefaultWorker struct { heartbeat chan struct{} processedCount *atomic.Uint64 - droppedCount *atomic.Uint64 outgoingCount *atomic.Uint64 restartCount *atomic.Uint64 - bufferDepth *atomic.Int64 config *WorkerConfig ctx context.Context @@ -71,10 +67,8 @@ func NewWorker( config: config, heartbeat: make(chan struct{}), processedCount: &atomic.Uint64{}, - droppedCount: &atomic.Uint64{}, outgoingCount: &atomic.Uint64{}, restartCount: &atomic.Uint64{}, - bufferDepth: &atomic.Int64{}, ctx: wctx, cancel: wcancel, logger: logger, @@ -176,13 +170,11 @@ func (w *DefaultWorker) Stats() WorkerStats { return WorkerStats{ IncomingAvailable: connectionAvailable, ChanIncoming: incomingLen, - BufferDepth: w.bufferDepth.Load(), ConnectionAvailable: connectionAvailable, Connection: connStats, TotalProcessed: w.processedCount.Load(), - TotalDropped: w.droppedCount.Load(), TotalRestarts: w.restartCount.Load(), TotalSent: w.outgoingCount.Load(), }