From c82e0184f5c68e211e87cab5fa5e39c8c186378f Mon Sep 17 00:00:00 2001 From: Jay Date: Tue, 26 May 2026 14:11:03 -0400 Subject: [PATCH] config: flatten ConnectionConfig to value type in PoolConfig --- config.go | 16 +++++++--------- config_pool_test.go | 23 +++++++++++++---------- pool.go | 3 ++- worker.go | 3 ++- worker_test.go | 13 +++++++------ 5 files changed, 31 insertions(+), 27 deletions(-) diff --git a/config.go b/config.go index 396b1e7..4c6e70c 100644 --- a/config.go +++ b/config.go @@ -14,7 +14,7 @@ import ( type PoolConfig struct { InboxBufferSize int EventsBufferSize int - ConnectionConfig *transport.ConnectionConfig + ConnectionConfig transport.ConnectionConfig WorkerFactory WorkerFactory WorkerConfig *WorkerConfig } @@ -38,7 +38,7 @@ func GetDefaultPoolConfig() *PoolConfig { return &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ConnectionConfig: nil, + ConnectionConfig: *transport.GetDefaultConnectionConfig(), WorkerFactory: nil, WorkerConfig: nil, } @@ -58,11 +58,9 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error { func ValidatePoolConfig(config *PoolConfig) error { var err error - if config.ConnectionConfig != nil { - err = transport.ValidateConnectionConfig(config.ConnectionConfig) - if err != nil { - return err - } + err = transport.ValidateConnectionConfig(&config.ConnectionConfig) + if err != nil { + return err } if config.WorkerConfig != nil { @@ -104,9 +102,9 @@ func WithEventsBufferSize(value int) PoolOption { } } -func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption { +func WithConnectionConfig(cc transport.ConnectionConfig) PoolOption { return func(c *PoolConfig) error { - err := transport.ValidateConnectionConfig(cc) + err := transport.ValidateConnectionConfig(&cc) if err != nil { return err } diff --git a/config_pool_test.go b/config_pool_test.go index 0a93e2a..6a6aebc 100644 --- a/config_pool_test.go +++ b/config_pool_test.go @@ -14,7 +14,7 @@ func TestNewPoolConfig(t *testing.T) { assert.Equal(t, conf, &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ConnectionConfig: nil, + ConnectionConfig: *transport.GetDefaultConnectionConfig(), WorkerConfig: nil, WorkerFactory: nil, }) @@ -26,7 +26,7 @@ func TestDefaultPoolConfig(t *testing.T) { assert.Equal(t, conf, &PoolConfig{ InboxBufferSize: 256, EventsBufferSize: 10, - ConnectionConfig: nil, + ConnectionConfig: *transport.GetDefaultConnectionConfig(), WorkerConfig: nil, WorkerFactory: nil, }) @@ -36,7 +36,7 @@ func TestApplyPoolOptions(t *testing.T) { conf := &PoolConfig{} err := applyPoolOptions( conf, - WithConnectionConfig(&transport.ConnectionConfig{ + WithConnectionConfig(transport.ConnectionConfig{ Retry: transport.RetryConfig{Disabled: true}, }), ) @@ -59,19 +59,18 @@ func TestWithBufferSizes(t *testing.T) { func TestWithConnectionConfig(t *testing.T) { conf := &PoolConfig{} - opt := WithConnectionConfig(&transport.ConnectionConfig{ + opt := WithConnectionConfig(transport.ConnectionConfig{ WriteTimeout: 1 * time.Second, Retry: transport.RetryConfig{Disabled: true}, }) err := applyPoolOptions(conf, opt) assert.NoError(t, err) - assert.NotNil(t, conf.ConnectionConfig) assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout) // invalid config is rejected conf = &PoolConfig{} opt = WithConnectionConfig( - &transport.ConnectionConfig{ + transport.ConnectionConfig{ WriteTimeout: -1 * time.Second, Retry: transport.RetryConfig{Disabled: true}, }) @@ -87,8 +86,12 @@ func TestValidatePoolConfig(t *testing.T) { wantErrText string }{ { - name: "valid empty", - conf: *&PoolConfig{}, + name: "valid empty (retry disabled)", + conf: PoolConfig{ + ConnectionConfig: transport.ConnectionConfig{ + Retry: transport.RetryConfig{Disabled: true}, + }, + }, }, { name: "valid defaults", @@ -97,7 +100,7 @@ func TestValidatePoolConfig(t *testing.T) { { name: "valid complete", conf: PoolConfig{ - ConnectionConfig: &transport.ConnectionConfig{ + ConnectionConfig: transport.ConnectionConfig{ Retry: transport.RetryConfig{Disabled: true}, }, }, @@ -105,7 +108,7 @@ func TestValidatePoolConfig(t *testing.T) { { name: "invalid connection config", conf: PoolConfig{ - ConnectionConfig: &transport.ConnectionConfig{ + ConnectionConfig: transport.ConnectionConfig{ Retry: transport.RetryConfig{ InitialDelay: 10 * time.Second, MaxDelay: 1 * time.Second, diff --git a/pool.go b/pool.go index d7c17df..6139cc9 100644 --- a/pool.go +++ b/pool.go @@ -57,7 +57,7 @@ type PoolPlugin struct { Events chan<- PoolEvent InboxCounter *atomic.Uint64 Dialer types.Dialer - ConnectionConfig *transport.ConnectionConfig + ConnectionConfig transport.ConnectionConfig } // ---------------------------------------------------------------------------- @@ -264,6 +264,7 @@ func (p *Pool) Connect(id string) error { InboxCounter: p.inboxCounter, Dialer: p.dialer, ConnectionConfig: p.config.ConnectionConfig, + // ConnectionConfig is assigned by value — each worker gets its own copy } p.wg.Go(func() { diff --git a/worker.go b/worker.go index 24b207c..a95dc28 100644 --- a/worker.go +++ b/worker.go @@ -334,7 +334,8 @@ func connect( pool PoolPlugin, handler slog.Handler, ) (*transport.Connection, error) { - conn, err := transport.NewConnection(ctx, id, pool.ConnectionConfig, handler) + cc := pool.ConnectionConfig + conn, err := transport.NewConnection(ctx, id, &cc, handler) if err != nil { return nil, err } diff --git a/worker_test.go b/worker_test.go index 658f1f3..5054f4f 100644 --- a/worker_test.go +++ b/worker_test.go @@ -25,9 +25,10 @@ func makeWorkerContext(t *testing.T) ( inbox = make(chan types.InboxMessage, 256) events = make(chan PoolEvent, 10) pool = PoolPlugin{ - Inbox: inbox, - Events: events, - InboxCounter: &atomic.Uint64{}, + Inbox: inbox, + Events: events, + InboxCounter: &atomic.Uint64{}, + ConnectionConfig: *transport.GetDefaultConnectionConfig(), } return } @@ -95,7 +96,7 @@ func TestWorkerSession(t *testing.T) { }, } cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled()) - pool.ConnectionConfig = cc + pool.ConnectionConfig = *cc var wg sync.WaitGroup wg.Go(func() { w.Start(pool) }) @@ -151,7 +152,7 @@ func TestWorkerSession(t *testing.T) { }, } cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled()) - pool.ConnectionConfig = cc + pool.ConnectionConfig = *cc var wg sync.WaitGroup wg.Go(func() { w.Start(pool) }) @@ -176,7 +177,7 @@ func TestWorkerSession(t *testing.T) { }, } cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled()) - pool.ConnectionConfig = cc + pool.ConnectionConfig = *cc var wg sync.WaitGroup wg.Go(func() { w.Start(pool) })