diff --git a/inbound/config.go b/inbound/config.go index b8bf6b5..6c2d4d5 100644 --- a/inbound/config.go +++ b/inbound/config.go @@ -8,88 +8,6 @@ import ( "time" ) -// Worker Config - -type WorkerConfig struct { - MaxQueueSize int - InactivityTimeout time.Duration -} - -type WorkerOption func(*WorkerConfig) error - -func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { - conf := GetDefaultWorkerConfig() - if err := applyWorkerOptions(conf, options...); err != nil { - return nil, err - } - if err := ValidateWorkerConfig(conf); err != nil { - return nil, err - } - return conf, nil -} - -func GetDefaultWorkerConfig() *WorkerConfig { - return &WorkerConfig{ - MaxQueueSize: 0, // queue can grow indefinitely by default - InactivityTimeout: 0, // eviction disabled by default - } -} - -func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { - for _, option := range options { - if err := option(config); err != nil { - return err - } - } - return nil -} - -func ValidateWorkerConfig(config *WorkerConfig) error { - if err := validateMaxQueueSize(config.MaxQueueSize); err != nil { - return err - } - if err := validateInactivityTimeout(config.InactivityTimeout); err != nil { - return err - } - return nil -} - -func validateMaxQueueSize(value int) error { - if value < 0 { - return InvalidMaxQueueSize - } - return nil -} - -func validateInactivityTimeout(value time.Duration) error { - if value < 0 { - return InvalidInactivityTimeout - } - return nil -} - -// When MaxQueueSize is zero, queue limits are disabled. -func WithMaxQueueSize(value int) WorkerOption { - return func(c *WorkerConfig) error { - if err := validateMaxQueueSize(value); err != nil { - return err - } - c.MaxQueueSize = value - return nil - } -} - -// When InactivityTimeout is zero, the watchdog is disabled. -func WithInactivityTimeout(value time.Duration) WorkerOption { - return func(c *WorkerConfig) error { - if err := validateInactivityTimeout(value); err != nil { - return err - } - c.InactivityTimeout = value - return nil - } -} - // Pool Config type WorkerFactory func( @@ -104,6 +22,8 @@ type PoolConfig struct { InboxBufferSize int EventsBufferSize int ErrorsBufferSize int + LoggingEnabled bool + LogLevel *slog.Level ConnectionConfig *transport.ConnectionConfig WorkerConfig *WorkerConfig WorkerFactory WorkerFactory @@ -127,6 +47,8 @@ func GetDefaultPoolConfig() *PoolConfig { InboxBufferSize: 256, EventsBufferSize: 10, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, @@ -193,6 +115,20 @@ func WithErrorsBufferSize(value int) PoolOption { } } +func WithPoolLoggingEnabled(value bool) PoolOption { + return func(c *PoolConfig) error { + c.LoggingEnabled = value + return nil + } +} + +func WithPoolLogLevel(level slog.Level) PoolOption { + return func(c *PoolConfig) error { + c.LogLevel = &level + return nil + } +} + func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { return func(c *PoolConfig) error { if err := transport.ValidateConnectionConfig(cc); err != nil { @@ -219,3 +155,103 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { return nil } } + +// Worker Config + +type WorkerConfig struct { + MaxQueueSize int + InactivityTimeout time.Duration + LoggingEnabled bool + LogLevel *slog.Level +} + +type WorkerOption func(*WorkerConfig) error + +func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) { + conf := GetDefaultWorkerConfig() + if err := applyWorkerOptions(conf, options...); err != nil { + return nil, err + } + if err := ValidateWorkerConfig(conf); err != nil { + return nil, err + } + return conf, nil +} + +func GetDefaultWorkerConfig() *WorkerConfig { + return &WorkerConfig{ + MaxQueueSize: 0, // queue can grow indefinitely by default + InactivityTimeout: 0, // eviction disabled by default + LoggingEnabled: true, + LogLevel: nil, + } +} + +func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error { + for _, option := range options { + if err := option(config); err != nil { + return err + } + } + return nil +} + +func ValidateWorkerConfig(config *WorkerConfig) error { + if err := validateMaxQueueSize(config.MaxQueueSize); err != nil { + return err + } + if err := validateInactivityTimeout(config.InactivityTimeout); err != nil { + return err + } + return nil +} + +func validateMaxQueueSize(value int) error { + if value < 0 { + return InvalidMaxQueueSize + } + return nil +} + +func validateInactivityTimeout(value time.Duration) error { + if value < 0 { + return InvalidInactivityTimeout + } + return nil +} + +// When MaxQueueSize is zero, queue limits are disabled. +func WithMaxQueueSize(value int) WorkerOption { + return func(c *WorkerConfig) error { + if err := validateMaxQueueSize(value); err != nil { + return err + } + c.MaxQueueSize = value + return nil + } +} + +// When InactivityTimeout is zero, the watchdog is disabled. +func WithInactivityTimeout(value time.Duration) WorkerOption { + return func(c *WorkerConfig) error { + if err := validateInactivityTimeout(value); err != nil { + return err + } + c.InactivityTimeout = value + return nil + } +} + +func WithWorkerLoggingEnabled(value bool) WorkerOption { + return func(c *WorkerConfig) error { + c.LoggingEnabled = value + return nil + } +} + +func WithWorkerLogLevel(level slog.Level) WorkerOption { + return func(c *WorkerConfig) error { + c.LogLevel = &level + return nil + } +} diff --git a/inbound/config_test.go b/inbound/config_test.go index 4b4a1fe..4f804e9 100644 --- a/inbound/config_test.go +++ b/inbound/config_test.go @@ -19,6 +19,8 @@ func TestDefaultWorkerConfig(t *testing.T) { assert.Equal(t, &WorkerConfig{ MaxQueueSize: 0, InactivityTimeout: 0, + LoggingEnabled: true, + LogLevel: nil, }, conf) } @@ -104,6 +106,8 @@ func TestDefaultPoolConfig(t *testing.T) { InboxBufferSize: 256, EventsBufferSize: 10, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, diff --git a/inbound/pool.go b/inbound/pool.go index d18889f..ee07ddd 100644 --- a/inbound/pool.go +++ b/inbound/pool.go @@ -90,7 +90,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha // The factory function should be non-blocking or else Connect() may cause // deadlocks. if config.WorkerFactory == nil { - // TODO: Construct worker logger config.WorkerFactory = func( ctx context.Context, id string, @@ -109,8 +108,9 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha pctx, cancel := context.WithCancel(ctx) var logger *slog.Logger - if handler != nil { - logger = logging.NewInboundPoolLogger(handler, id) + if handler != nil && config.LoggingEnabled { + logger = logging.NewInboundPoolLogger( + logging.WrapOrDefault(config.LogLevel, handler), id) } return &Pool{ @@ -249,8 +249,9 @@ func (p *Pool) Send(id string, data []byte) error { // addLocked constructs and registers a peer. Caller must hold p.mu write lock. func (p *Pool) addLocked(id string, socket types.Socket) error { var logger *slog.Logger - if p.handler != nil { - logger = logging.NewConnectionLogger(p.handler, p.id, id) + if p.handler != nil && p.config.ConnectionConfig.LoggingEnabled { + logger = logging.NewConnectionLogger( + logging.WrapOrDefault(p.config.ConnectionConfig.LogLevel, p.handler), p.id, id) } conn, err := transport.NewConnectionFromSocket( @@ -259,12 +260,13 @@ func (p *Pool) addLocked(id string, socket types.Socket) error { return err } - // The worker factory must be non-blocking to avoid deadlocks wctx, cancel := context.WithCancel(p.ctx) - if p.handler != nil { - logger = logging.NewInboundWorkerLogger(p.handler, p.id, id) + if p.handler != nil && p.config.WorkerConfig.LoggingEnabled { + logger = logging.NewInboundWorkerLogger( + logging.WrapOrDefault(p.config.WorkerConfig.LogLevel, p.handler), p.id, id) } + // The worker factory must be non-blocking to avoid deadlocks worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger) if err != nil { cancel() diff --git a/logging/logging.go b/logging/logging.go index 637699d..624e889 100644 --- a/logging/logging.go +++ b/logging/logging.go @@ -1,6 +1,7 @@ package logging import ( + "context" "log/slog" ) @@ -21,7 +22,7 @@ const COMPONENT_INBOUND_WORKER = "inbound_worker" const COMPONENT_CONNECTION = "connection" -// Outbound loggers +// Constructors func NewOutboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { return newLogger(handler, @@ -40,8 +41,6 @@ func NewOutboundWorkerLogger(handler slog.Handler, poolID string, peerID string) ) } -// Inbound loggers - func NewInboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { return newLogger(handler, KEY_MODULE, MODULE_NAME, @@ -59,8 +58,6 @@ func NewInboundWorkerLogger(handler slog.Handler, poolID string, peerID string) ) } -// Connection logger - func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger { return newLogger(handler, KEY_MODULE, MODULE_NAME, @@ -75,3 +72,40 @@ func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *sl func newLogger(handler slog.Handler, attrs ...any) *slog.Logger { return slog.New(handler).With(attrs...) } + +// Handlers + +type ForcedLevelHandler struct { + level slog.Level + next slog.Handler +} + +func NewForcedLevelHandler(level slog.Level, next slog.Handler) slog.Handler { + return &ForcedLevelHandler{ + level: level, + next: next, + } +} + +func (h *ForcedLevelHandler) Enabled(_ context.Context, l slog.Level) bool { + return l >= h.level +} + +func (h *ForcedLevelHandler) Handle(ctx context.Context, r slog.Record) error { + return h.next.Handle(ctx, r) +} + +func (h *ForcedLevelHandler) WithAttrs(attrs []slog.Attr) slog.Handler { + return &ForcedLevelHandler{next: h.next.WithAttrs(attrs)} +} + +func (h *ForcedLevelHandler) WithGroup(name string) slog.Handler { + return &ForcedLevelHandler{next: h.next.WithGroup(name)} +} + +func WrapOrDefault(level *slog.Level, handler slog.Handler) slog.Handler { + if level != nil { + return NewForcedLevelHandler(*level, handler) + } + return handler +} diff --git a/outbound/config.go b/outbound/config.go index c05eff1..f59d2bd 100644 --- a/outbound/config.go +++ b/outbound/config.go @@ -21,6 +21,8 @@ type PoolConfig struct { InboxBufferSize int EventsBufferSize int ErrorsBufferSize int + LoggingEnabled bool + LogLevel *slog.Level ConnectionConfig *transport.ConnectionConfig WorkerFactory WorkerFactory WorkerConfig *WorkerConfig @@ -44,6 +46,8 @@ func GetDefaultPoolConfig() *PoolConfig { InboxBufferSize: 256, EventsBufferSize: 10, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, ConnectionConfig: nil, WorkerFactory: nil, WorkerConfig: nil, @@ -116,6 +120,20 @@ func WithErrorsBufferSize(value int) PoolOption { } } +func WithPoolLoggingEnabled(value bool) PoolOption { + return func(c *PoolConfig) error { + c.LoggingEnabled = value + return nil + } +} + +func WithPoolLogLevel(level slog.Level) PoolOption { + return func(c *PoolConfig) error { + c.LogLevel = &level + return nil + } +} + func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { return func(c *PoolConfig) error { err := transport.ValidateConnectionConfig(cc) @@ -150,6 +168,8 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption { type WorkerConfig struct { KeepaliveTimeout time.Duration MaxQueueSize int + LoggingEnabled bool + LogLevel *slog.Level } type WorkerOption func(*WorkerConfig) error @@ -169,6 +189,8 @@ func GetDefaultWorkerConfig() *WorkerConfig { return &WorkerConfig{ KeepaliveTimeout: 20 * time.Second, MaxQueueSize: 0, // disabled by default + LoggingEnabled: true, + LogLevel: nil, } } @@ -232,3 +254,17 @@ func WithMaxQueueSize(value int) WorkerOption { return nil } } + +func WithWorkerLoggingEnabled(value bool) WorkerOption { + return func(c *WorkerConfig) error { + c.LoggingEnabled = value + return nil + } +} + +func WithWorkerLogLevel(level slog.Level) WorkerOption { + return func(c *WorkerConfig) error { + c.LogLevel = &level + return nil + } +} diff --git a/outbound/config_pool_test.go b/outbound/config_pool_test.go index b8e735e..b4ba604 100644 --- a/outbound/config_pool_test.go +++ b/outbound/config_pool_test.go @@ -15,6 +15,8 @@ func TestNewPoolConfig(t *testing.T) { InboxBufferSize: 256, EventsBufferSize: 10, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, @@ -28,6 +30,8 @@ func TestDefaultPoolConfig(t *testing.T) { InboxBufferSize: 256, EventsBufferSize: 10, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, ConnectionConfig: nil, WorkerConfig: nil, WorkerFactory: nil, diff --git a/outbound/pool.go b/outbound/pool.go index f8415ce..fd14d42 100644 --- a/outbound/pool.go +++ b/outbound/pool.go @@ -95,8 +95,9 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha pctx, cancel := context.WithCancel(ctx) var logger *slog.Logger - if handler != nil { - logger = logging.NewOutboundPoolLogger(handler, id) + if handler != nil && config.LoggingEnabled { + logger = logging.NewOutboundPoolLogger( + logging.WrapOrDefault(config.LogLevel, handler), id) } return &Pool{ @@ -185,8 +186,9 @@ func (p *Pool) Connect(id string) error { } var logger *slog.Logger - if p.handler != nil { - logger = logging.NewOutboundWorkerLogger(p.handler, p.id, id) + if p.handler != nil && p.config.WorkerConfig.LoggingEnabled { + logger = logging.NewOutboundWorkerLogger( + logging.WrapOrDefault(p.config.WorkerConfig.LogLevel, p.handler), p.id, id) } // The worker factory must be non-blocking to avoid deadlocks diff --git a/outbound/worker.go b/outbound/worker.go index 8ae8c10..cb0c02a 100644 --- a/outbound/worker.go +++ b/outbound/worker.go @@ -338,8 +338,9 @@ func connect( pool PoolPlugin, ) (*transport.Connection, error) { var logger *slog.Logger - if pool.Handler != nil { - logger = logging.NewConnectionLogger(pool.Handler, pool.ID, id) + if pool.Handler != nil && pool.ConnectionConfig.LoggingEnabled { + logger = logging.NewConnectionLogger( + logging.WrapOrDefault(pool.ConnectionConfig.LogLevel, pool.Handler), pool.ID, id) } conn, err := transport.NewConnection(id, pool.ConnectionConfig, logger) diff --git a/transport/config.go b/transport/config.go index 0df8403..8885d55 100644 --- a/transport/config.go +++ b/transport/config.go @@ -1,6 +1,7 @@ package transport import ( + "log/slog" "time" ) @@ -11,6 +12,8 @@ type ConnectionConfig struct { WriteTimeout time.Duration IncomingBufferSize int ErrorsBufferSize int + LoggingEnabled bool + LogLevel *slog.Level Retry *RetryConfig } @@ -40,6 +43,8 @@ func GetDefaultConnectionConfig() *ConnectionConfig { WriteTimeout: 30 * time.Second, IncomingBufferSize: 100, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, Retry: GetDefaultRetryConfig(), } } @@ -178,6 +183,20 @@ func WithErrorsBufferSize(value int) ConnectionOption { } } +func WithLoggingEnabled(value bool) ConnectionOption { + return func(c *ConnectionConfig) error { + c.LoggingEnabled = value + return nil + } +} + +func WithLogLevel(level slog.Level) ConnectionOption { + return func(c *ConnectionConfig) error { + c.LogLevel = &level + return nil + } +} + func WithoutRetry() ConnectionOption { return func(c *ConnectionConfig) error { c.Retry = nil diff --git a/transport/config_test.go b/transport/config_test.go index e116bb6..d72994f 100644 --- a/transport/config_test.go +++ b/transport/config_test.go @@ -2,6 +2,7 @@ package transport import ( "github.com/stretchr/testify/assert" + "log/slog" "testing" "time" ) @@ -17,6 +18,8 @@ func TestNewConnectionConfig(t *testing.T) { WriteTimeout: 30 * time.Second, IncomingBufferSize: 100, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, Retry: GetDefaultRetryConfig(), }) @@ -38,6 +41,8 @@ func TestDefaultConnectionConfig(t *testing.T) { WriteTimeout: 30 * time.Second, IncomingBufferSize: 100, ErrorsBufferSize: 10, + LoggingEnabled: true, + LogLevel: nil, Retry: GetDefaultRetryConfig(), }) } @@ -61,6 +66,8 @@ func TestApplyConnectionOptions(t *testing.T) { conf, WithIncomingBufferSize(256), WithErrorsBufferSize(100), + WithLoggingEnabled(false), + WithLogLevel(slog.LevelError), WithRetryMaxRetries(0), WithRetryInitialDelay(3*time.Second), WithRetryJitterFactor(0.5), @@ -69,6 +76,8 @@ func TestApplyConnectionOptions(t *testing.T) { assert.NoError(t, err) assert.Equal(t, 256, conf.IncomingBufferSize) assert.Equal(t, 100, conf.ErrorsBufferSize) + assert.False(t, conf.LoggingEnabled) + assert.Equal(t, slog.LevelError, *conf.LogLevel) assert.Equal(t, 0, conf.Retry.MaxRetries) assert.Equal(t, 3*time.Second, conf.Retry.InitialDelay) assert.Equal(t, 0.5, conf.Retry.JitterFactor)