Add granunal logging config controls.

This commit is contained in:
Jay
2026-04-23 18:48:47 -04:00
parent 727dc18b57
commit 2a6cd3a487
10 changed files with 248 additions and 101 deletions
+118 -82
View File
@@ -8,88 +8,6 @@ import (
"time" "time"
) )
// Worker Config
type WorkerConfig struct {
MaxQueueSize int
InactivityTimeout time.Duration
}
type WorkerOption func(*WorkerConfig) error
func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) {
conf := GetDefaultWorkerConfig()
if err := applyWorkerOptions(conf, options...); err != nil {
return nil, err
}
if err := ValidateWorkerConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
MaxQueueSize: 0, // queue can grow indefinitely by default
InactivityTimeout: 0, // eviction disabled by default
}
}
func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
}
}
return nil
}
func ValidateWorkerConfig(config *WorkerConfig) error {
if err := validateMaxQueueSize(config.MaxQueueSize); err != nil {
return err
}
if err := validateInactivityTimeout(config.InactivityTimeout); err != nil {
return err
}
return nil
}
func validateMaxQueueSize(value int) error {
if value < 0 {
return InvalidMaxQueueSize
}
return nil
}
func validateInactivityTimeout(value time.Duration) error {
if value < 0 {
return InvalidInactivityTimeout
}
return nil
}
// When MaxQueueSize is zero, queue limits are disabled.
func WithMaxQueueSize(value int) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateMaxQueueSize(value); err != nil {
return err
}
c.MaxQueueSize = value
return nil
}
}
// When InactivityTimeout is zero, the watchdog is disabled.
func WithInactivityTimeout(value time.Duration) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateInactivityTimeout(value); err != nil {
return err
}
c.InactivityTimeout = value
return nil
}
}
// Pool Config // Pool Config
type WorkerFactory func( type WorkerFactory func(
@@ -104,6 +22,8 @@ type PoolConfig struct {
InboxBufferSize int InboxBufferSize int
EventsBufferSize int EventsBufferSize int
ErrorsBufferSize int ErrorsBufferSize int
LoggingEnabled bool
LogLevel *slog.Level
ConnectionConfig *transport.ConnectionConfig ConnectionConfig *transport.ConnectionConfig
WorkerConfig *WorkerConfig WorkerConfig *WorkerConfig
WorkerFactory WorkerFactory WorkerFactory WorkerFactory
@@ -127,6 +47,8 @@ func GetDefaultPoolConfig() *PoolConfig {
InboxBufferSize: 256, InboxBufferSize: 256,
EventsBufferSize: 10, EventsBufferSize: 10,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
ConnectionConfig: nil, ConnectionConfig: nil,
WorkerConfig: nil, WorkerConfig: nil,
WorkerFactory: nil, WorkerFactory: nil,
@@ -193,6 +115,20 @@ func WithErrorsBufferSize(value int) PoolOption {
} }
} }
func WithPoolLoggingEnabled(value bool) PoolOption {
return func(c *PoolConfig) error {
c.LoggingEnabled = value
return nil
}
}
func WithPoolLogLevel(level slog.Level) PoolOption {
return func(c *PoolConfig) error {
c.LogLevel = &level
return nil
}
}
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
return func(c *PoolConfig) error { return func(c *PoolConfig) error {
if err := transport.ValidateConnectionConfig(cc); err != nil { if err := transport.ValidateConnectionConfig(cc); err != nil {
@@ -219,3 +155,103 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption {
return nil return nil
} }
} }
// Worker Config
type WorkerConfig struct {
MaxQueueSize int
InactivityTimeout time.Duration
LoggingEnabled bool
LogLevel *slog.Level
}
type WorkerOption func(*WorkerConfig) error
func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) {
conf := GetDefaultWorkerConfig()
if err := applyWorkerOptions(conf, options...); err != nil {
return nil, err
}
if err := ValidateWorkerConfig(conf); err != nil {
return nil, err
}
return conf, nil
}
func GetDefaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{
MaxQueueSize: 0, // queue can grow indefinitely by default
InactivityTimeout: 0, // eviction disabled by default
LoggingEnabled: true,
LogLevel: nil,
}
}
func applyWorkerOptions(config *WorkerConfig, options ...WorkerOption) error {
for _, option := range options {
if err := option(config); err != nil {
return err
}
}
return nil
}
func ValidateWorkerConfig(config *WorkerConfig) error {
if err := validateMaxQueueSize(config.MaxQueueSize); err != nil {
return err
}
if err := validateInactivityTimeout(config.InactivityTimeout); err != nil {
return err
}
return nil
}
func validateMaxQueueSize(value int) error {
if value < 0 {
return InvalidMaxQueueSize
}
return nil
}
func validateInactivityTimeout(value time.Duration) error {
if value < 0 {
return InvalidInactivityTimeout
}
return nil
}
// When MaxQueueSize is zero, queue limits are disabled.
func WithMaxQueueSize(value int) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateMaxQueueSize(value); err != nil {
return err
}
c.MaxQueueSize = value
return nil
}
}
// When InactivityTimeout is zero, the watchdog is disabled.
func WithInactivityTimeout(value time.Duration) WorkerOption {
return func(c *WorkerConfig) error {
if err := validateInactivityTimeout(value); err != nil {
return err
}
c.InactivityTimeout = value
return nil
}
}
func WithWorkerLoggingEnabled(value bool) WorkerOption {
return func(c *WorkerConfig) error {
c.LoggingEnabled = value
return nil
}
}
func WithWorkerLogLevel(level slog.Level) WorkerOption {
return func(c *WorkerConfig) error {
c.LogLevel = &level
return nil
}
}
+4
View File
@@ -19,6 +19,8 @@ func TestDefaultWorkerConfig(t *testing.T) {
assert.Equal(t, &WorkerConfig{ assert.Equal(t, &WorkerConfig{
MaxQueueSize: 0, MaxQueueSize: 0,
InactivityTimeout: 0, InactivityTimeout: 0,
LoggingEnabled: true,
LogLevel: nil,
}, conf) }, conf)
} }
@@ -104,6 +106,8 @@ func TestDefaultPoolConfig(t *testing.T) {
InboxBufferSize: 256, InboxBufferSize: 256,
EventsBufferSize: 10, EventsBufferSize: 10,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
ConnectionConfig: nil, ConnectionConfig: nil,
WorkerConfig: nil, WorkerConfig: nil,
WorkerFactory: nil, WorkerFactory: nil,
+10 -8
View File
@@ -90,7 +90,6 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha
// The factory function should be non-blocking or else Connect() may cause // The factory function should be non-blocking or else Connect() may cause
// deadlocks. // deadlocks.
if config.WorkerFactory == nil { if config.WorkerFactory == nil {
// TODO: Construct worker logger
config.WorkerFactory = func( config.WorkerFactory = func(
ctx context.Context, ctx context.Context,
id string, id string,
@@ -109,8 +108,9 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha
pctx, cancel := context.WithCancel(ctx) pctx, cancel := context.WithCancel(ctx)
var logger *slog.Logger var logger *slog.Logger
if handler != nil { if handler != nil && config.LoggingEnabled {
logger = logging.NewInboundPoolLogger(handler, id) logger = logging.NewInboundPoolLogger(
logging.WrapOrDefault(config.LogLevel, handler), id)
} }
return &Pool{ return &Pool{
@@ -249,8 +249,9 @@ func (p *Pool) Send(id string, data []byte) error {
// addLocked constructs and registers a peer. Caller must hold p.mu write lock. // addLocked constructs and registers a peer. Caller must hold p.mu write lock.
func (p *Pool) addLocked(id string, socket types.Socket) error { func (p *Pool) addLocked(id string, socket types.Socket) error {
var logger *slog.Logger var logger *slog.Logger
if p.handler != nil { if p.handler != nil && p.config.ConnectionConfig.LoggingEnabled {
logger = logging.NewConnectionLogger(p.handler, p.id, id) logger = logging.NewConnectionLogger(
logging.WrapOrDefault(p.config.ConnectionConfig.LogLevel, p.handler), p.id, id)
} }
conn, err := transport.NewConnectionFromSocket( conn, err := transport.NewConnectionFromSocket(
@@ -259,12 +260,13 @@ func (p *Pool) addLocked(id string, socket types.Socket) error {
return err return err
} }
// The worker factory must be non-blocking to avoid deadlocks
wctx, cancel := context.WithCancel(p.ctx) wctx, cancel := context.WithCancel(p.ctx)
if p.handler != nil { if p.handler != nil && p.config.WorkerConfig.LoggingEnabled {
logger = logging.NewInboundWorkerLogger(p.handler, p.id, id) logger = logging.NewInboundWorkerLogger(
logging.WrapOrDefault(p.config.WorkerConfig.LogLevel, p.handler), p.id, id)
} }
// The worker factory must be non-blocking to avoid deadlocks
worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger) worker, err := p.config.WorkerFactory(wctx, id, conn, p.config.WorkerConfig, logger)
if err != nil { if err != nil {
cancel() cancel()
+39 -5
View File
@@ -1,6 +1,7 @@
package logging package logging
import ( import (
"context"
"log/slog" "log/slog"
) )
@@ -21,7 +22,7 @@ const COMPONENT_INBOUND_WORKER = "inbound_worker"
const COMPONENT_CONNECTION = "connection" const COMPONENT_CONNECTION = "connection"
// Outbound loggers // Constructors
func NewOutboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { func NewOutboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger {
return newLogger(handler, return newLogger(handler,
@@ -40,8 +41,6 @@ func NewOutboundWorkerLogger(handler slog.Handler, poolID string, peerID string)
) )
} }
// Inbound loggers
func NewInboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger { func NewInboundPoolLogger(handler slog.Handler, poolID string) *slog.Logger {
return newLogger(handler, return newLogger(handler,
KEY_MODULE, MODULE_NAME, KEY_MODULE, MODULE_NAME,
@@ -59,8 +58,6 @@ func NewInboundWorkerLogger(handler slog.Handler, poolID string, peerID string)
) )
} }
// Connection logger
func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger { func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *slog.Logger {
return newLogger(handler, return newLogger(handler,
KEY_MODULE, MODULE_NAME, KEY_MODULE, MODULE_NAME,
@@ -75,3 +72,40 @@ func NewConnectionLogger(handler slog.Handler, poolID string, peerID string) *sl
func newLogger(handler slog.Handler, attrs ...any) *slog.Logger { func newLogger(handler slog.Handler, attrs ...any) *slog.Logger {
return slog.New(handler).With(attrs...) return slog.New(handler).With(attrs...)
} }
// Handlers
type ForcedLevelHandler struct {
level slog.Level
next slog.Handler
}
func NewForcedLevelHandler(level slog.Level, next slog.Handler) slog.Handler {
return &ForcedLevelHandler{
level: level,
next: next,
}
}
func (h *ForcedLevelHandler) Enabled(_ context.Context, l slog.Level) bool {
return l >= h.level
}
func (h *ForcedLevelHandler) Handle(ctx context.Context, r slog.Record) error {
return h.next.Handle(ctx, r)
}
func (h *ForcedLevelHandler) WithAttrs(attrs []slog.Attr) slog.Handler {
return &ForcedLevelHandler{next: h.next.WithAttrs(attrs)}
}
func (h *ForcedLevelHandler) WithGroup(name string) slog.Handler {
return &ForcedLevelHandler{next: h.next.WithGroup(name)}
}
func WrapOrDefault(level *slog.Level, handler slog.Handler) slog.Handler {
if level != nil {
return NewForcedLevelHandler(*level, handler)
}
return handler
}
+36
View File
@@ -21,6 +21,8 @@ type PoolConfig struct {
InboxBufferSize int InboxBufferSize int
EventsBufferSize int EventsBufferSize int
ErrorsBufferSize int ErrorsBufferSize int
LoggingEnabled bool
LogLevel *slog.Level
ConnectionConfig *transport.ConnectionConfig ConnectionConfig *transport.ConnectionConfig
WorkerFactory WorkerFactory WorkerFactory WorkerFactory
WorkerConfig *WorkerConfig WorkerConfig *WorkerConfig
@@ -44,6 +46,8 @@ func GetDefaultPoolConfig() *PoolConfig {
InboxBufferSize: 256, InboxBufferSize: 256,
EventsBufferSize: 10, EventsBufferSize: 10,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
ConnectionConfig: nil, ConnectionConfig: nil,
WorkerFactory: nil, WorkerFactory: nil,
WorkerConfig: nil, WorkerConfig: nil,
@@ -116,6 +120,20 @@ func WithErrorsBufferSize(value int) PoolOption {
} }
} }
func WithPoolLoggingEnabled(value bool) PoolOption {
return func(c *PoolConfig) error {
c.LoggingEnabled = value
return nil
}
}
func WithPoolLogLevel(level slog.Level) PoolOption {
return func(c *PoolConfig) error {
c.LogLevel = &level
return nil
}
}
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
return func(c *PoolConfig) error { return func(c *PoolConfig) error {
err := transport.ValidateConnectionConfig(cc) err := transport.ValidateConnectionConfig(cc)
@@ -150,6 +168,8 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption {
type WorkerConfig struct { type WorkerConfig struct {
KeepaliveTimeout time.Duration KeepaliveTimeout time.Duration
MaxQueueSize int MaxQueueSize int
LoggingEnabled bool
LogLevel *slog.Level
} }
type WorkerOption func(*WorkerConfig) error type WorkerOption func(*WorkerConfig) error
@@ -169,6 +189,8 @@ func GetDefaultWorkerConfig() *WorkerConfig {
return &WorkerConfig{ return &WorkerConfig{
KeepaliveTimeout: 20 * time.Second, KeepaliveTimeout: 20 * time.Second,
MaxQueueSize: 0, // disabled by default MaxQueueSize: 0, // disabled by default
LoggingEnabled: true,
LogLevel: nil,
} }
} }
@@ -232,3 +254,17 @@ func WithMaxQueueSize(value int) WorkerOption {
return nil return nil
} }
} }
func WithWorkerLoggingEnabled(value bool) WorkerOption {
return func(c *WorkerConfig) error {
c.LoggingEnabled = value
return nil
}
}
func WithWorkerLogLevel(level slog.Level) WorkerOption {
return func(c *WorkerConfig) error {
c.LogLevel = &level
return nil
}
}
+4
View File
@@ -15,6 +15,8 @@ func TestNewPoolConfig(t *testing.T) {
InboxBufferSize: 256, InboxBufferSize: 256,
EventsBufferSize: 10, EventsBufferSize: 10,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
ConnectionConfig: nil, ConnectionConfig: nil,
WorkerConfig: nil, WorkerConfig: nil,
WorkerFactory: nil, WorkerFactory: nil,
@@ -28,6 +30,8 @@ func TestDefaultPoolConfig(t *testing.T) {
InboxBufferSize: 256, InboxBufferSize: 256,
EventsBufferSize: 10, EventsBufferSize: 10,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
ConnectionConfig: nil, ConnectionConfig: nil,
WorkerConfig: nil, WorkerConfig: nil,
WorkerFactory: nil, WorkerFactory: nil,
+6 -4
View File
@@ -95,8 +95,9 @@ func NewPool(ctx context.Context, id string, config *PoolConfig, handler slog.Ha
pctx, cancel := context.WithCancel(ctx) pctx, cancel := context.WithCancel(ctx)
var logger *slog.Logger var logger *slog.Logger
if handler != nil { if handler != nil && config.LoggingEnabled {
logger = logging.NewOutboundPoolLogger(handler, id) logger = logging.NewOutboundPoolLogger(
logging.WrapOrDefault(config.LogLevel, handler), id)
} }
return &Pool{ return &Pool{
@@ -185,8 +186,9 @@ func (p *Pool) Connect(id string) error {
} }
var logger *slog.Logger var logger *slog.Logger
if p.handler != nil { if p.handler != nil && p.config.WorkerConfig.LoggingEnabled {
logger = logging.NewOutboundWorkerLogger(p.handler, p.id, id) logger = logging.NewOutboundWorkerLogger(
logging.WrapOrDefault(p.config.WorkerConfig.LogLevel, p.handler), p.id, id)
} }
// The worker factory must be non-blocking to avoid deadlocks // The worker factory must be non-blocking to avoid deadlocks
+3 -2
View File
@@ -338,8 +338,9 @@ func connect(
pool PoolPlugin, pool PoolPlugin,
) (*transport.Connection, error) { ) (*transport.Connection, error) {
var logger *slog.Logger var logger *slog.Logger
if pool.Handler != nil { if pool.Handler != nil && pool.ConnectionConfig.LoggingEnabled {
logger = logging.NewConnectionLogger(pool.Handler, pool.ID, id) logger = logging.NewConnectionLogger(
logging.WrapOrDefault(pool.ConnectionConfig.LogLevel, pool.Handler), pool.ID, id)
} }
conn, err := transport.NewConnection(id, pool.ConnectionConfig, logger) conn, err := transport.NewConnection(id, pool.ConnectionConfig, logger)
+19
View File
@@ -1,6 +1,7 @@
package transport package transport
import ( import (
"log/slog"
"time" "time"
) )
@@ -11,6 +12,8 @@ type ConnectionConfig struct {
WriteTimeout time.Duration WriteTimeout time.Duration
IncomingBufferSize int IncomingBufferSize int
ErrorsBufferSize int ErrorsBufferSize int
LoggingEnabled bool
LogLevel *slog.Level
Retry *RetryConfig Retry *RetryConfig
} }
@@ -40,6 +43,8 @@ func GetDefaultConnectionConfig() *ConnectionConfig {
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
IncomingBufferSize: 100, IncomingBufferSize: 100,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
Retry: GetDefaultRetryConfig(), Retry: GetDefaultRetryConfig(),
} }
} }
@@ -178,6 +183,20 @@ func WithErrorsBufferSize(value int) ConnectionOption {
} }
} }
func WithLoggingEnabled(value bool) ConnectionOption {
return func(c *ConnectionConfig) error {
c.LoggingEnabled = value
return nil
}
}
func WithLogLevel(level slog.Level) ConnectionOption {
return func(c *ConnectionConfig) error {
c.LogLevel = &level
return nil
}
}
func WithoutRetry() ConnectionOption { func WithoutRetry() ConnectionOption {
return func(c *ConnectionConfig) error { return func(c *ConnectionConfig) error {
c.Retry = nil c.Retry = nil
+9
View File
@@ -2,6 +2,7 @@ package transport
import ( import (
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"log/slog"
"testing" "testing"
"time" "time"
) )
@@ -17,6 +18,8 @@ func TestNewConnectionConfig(t *testing.T) {
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
IncomingBufferSize: 100, IncomingBufferSize: 100,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
Retry: GetDefaultRetryConfig(), Retry: GetDefaultRetryConfig(),
}) })
@@ -38,6 +41,8 @@ func TestDefaultConnectionConfig(t *testing.T) {
WriteTimeout: 30 * time.Second, WriteTimeout: 30 * time.Second,
IncomingBufferSize: 100, IncomingBufferSize: 100,
ErrorsBufferSize: 10, ErrorsBufferSize: 10,
LoggingEnabled: true,
LogLevel: nil,
Retry: GetDefaultRetryConfig(), Retry: GetDefaultRetryConfig(),
}) })
} }
@@ -61,6 +66,8 @@ func TestApplyConnectionOptions(t *testing.T) {
conf, conf,
WithIncomingBufferSize(256), WithIncomingBufferSize(256),
WithErrorsBufferSize(100), WithErrorsBufferSize(100),
WithLoggingEnabled(false),
WithLogLevel(slog.LevelError),
WithRetryMaxRetries(0), WithRetryMaxRetries(0),
WithRetryInitialDelay(3*time.Second), WithRetryInitialDelay(3*time.Second),
WithRetryJitterFactor(0.5), WithRetryJitterFactor(0.5),
@@ -69,6 +76,8 @@ func TestApplyConnectionOptions(t *testing.T) {
assert.NoError(t, err) assert.NoError(t, err)
assert.Equal(t, 256, conf.IncomingBufferSize) assert.Equal(t, 256, conf.IncomingBufferSize)
assert.Equal(t, 100, conf.ErrorsBufferSize) assert.Equal(t, 100, conf.ErrorsBufferSize)
assert.False(t, conf.LoggingEnabled)
assert.Equal(t, slog.LevelError, *conf.LogLevel)
assert.Equal(t, 0, conf.Retry.MaxRetries) assert.Equal(t, 0, conf.Retry.MaxRetries)
assert.Equal(t, 3*time.Second, conf.Retry.InitialDelay) assert.Equal(t, 3*time.Second, conf.Retry.InitialDelay)
assert.Equal(t, 0.5, conf.Retry.JitterFactor) assert.Equal(t, 0.5, conf.Retry.JitterFactor)