config: flatten ConnectionConfig to value type in PoolConfig
This commit is contained in:
@@ -14,7 +14,7 @@ import (
|
|||||||
type PoolConfig struct {
|
type PoolConfig struct {
|
||||||
InboxBufferSize int
|
InboxBufferSize int
|
||||||
EventsBufferSize int
|
EventsBufferSize int
|
||||||
ConnectionConfig *transport.ConnectionConfig
|
ConnectionConfig transport.ConnectionConfig
|
||||||
WorkerFactory WorkerFactory
|
WorkerFactory WorkerFactory
|
||||||
WorkerConfig *WorkerConfig
|
WorkerConfig *WorkerConfig
|
||||||
}
|
}
|
||||||
@@ -38,7 +38,7 @@ func GetDefaultPoolConfig() *PoolConfig {
|
|||||||
return &PoolConfig{
|
return &PoolConfig{
|
||||||
InboxBufferSize: 256,
|
InboxBufferSize: 256,
|
||||||
EventsBufferSize: 10,
|
EventsBufferSize: 10,
|
||||||
ConnectionConfig: nil,
|
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||||
WorkerFactory: nil,
|
WorkerFactory: nil,
|
||||||
WorkerConfig: nil,
|
WorkerConfig: nil,
|
||||||
}
|
}
|
||||||
@@ -58,11 +58,9 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error {
|
|||||||
func ValidatePoolConfig(config *PoolConfig) error {
|
func ValidatePoolConfig(config *PoolConfig) error {
|
||||||
var err error
|
var err error
|
||||||
|
|
||||||
if config.ConnectionConfig != nil {
|
err = transport.ValidateConnectionConfig(&config.ConnectionConfig)
|
||||||
err = transport.ValidateConnectionConfig(config.ConnectionConfig)
|
if err != nil {
|
||||||
if err != nil {
|
return err
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if config.WorkerConfig != nil {
|
if config.WorkerConfig != nil {
|
||||||
@@ -104,9 +102,9 @@ func WithEventsBufferSize(value int) PoolOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func WithConnectionConfig(cc *transport.ConnectionConfig) PoolOption {
|
func WithConnectionConfig(cc transport.ConnectionConfig) PoolOption {
|
||||||
return func(c *PoolConfig) error {
|
return func(c *PoolConfig) error {
|
||||||
err := transport.ValidateConnectionConfig(cc)
|
err := transport.ValidateConnectionConfig(&cc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|||||||
+13
-10
@@ -14,7 +14,7 @@ func TestNewPoolConfig(t *testing.T) {
|
|||||||
assert.Equal(t, conf, &PoolConfig{
|
assert.Equal(t, conf, &PoolConfig{
|
||||||
InboxBufferSize: 256,
|
InboxBufferSize: 256,
|
||||||
EventsBufferSize: 10,
|
EventsBufferSize: 10,
|
||||||
ConnectionConfig: nil,
|
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||||
WorkerConfig: nil,
|
WorkerConfig: nil,
|
||||||
WorkerFactory: nil,
|
WorkerFactory: nil,
|
||||||
})
|
})
|
||||||
@@ -26,7 +26,7 @@ func TestDefaultPoolConfig(t *testing.T) {
|
|||||||
assert.Equal(t, conf, &PoolConfig{
|
assert.Equal(t, conf, &PoolConfig{
|
||||||
InboxBufferSize: 256,
|
InboxBufferSize: 256,
|
||||||
EventsBufferSize: 10,
|
EventsBufferSize: 10,
|
||||||
ConnectionConfig: nil,
|
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||||
WorkerConfig: nil,
|
WorkerConfig: nil,
|
||||||
WorkerFactory: nil,
|
WorkerFactory: nil,
|
||||||
})
|
})
|
||||||
@@ -36,7 +36,7 @@ func TestApplyPoolOptions(t *testing.T) {
|
|||||||
conf := &PoolConfig{}
|
conf := &PoolConfig{}
|
||||||
err := applyPoolOptions(
|
err := applyPoolOptions(
|
||||||
conf,
|
conf,
|
||||||
WithConnectionConfig(&transport.ConnectionConfig{
|
WithConnectionConfig(transport.ConnectionConfig{
|
||||||
Retry: transport.RetryConfig{Disabled: true},
|
Retry: transport.RetryConfig{Disabled: true},
|
||||||
}),
|
}),
|
||||||
)
|
)
|
||||||
@@ -59,19 +59,18 @@ func TestWithBufferSizes(t *testing.T) {
|
|||||||
|
|
||||||
func TestWithConnectionConfig(t *testing.T) {
|
func TestWithConnectionConfig(t *testing.T) {
|
||||||
conf := &PoolConfig{}
|
conf := &PoolConfig{}
|
||||||
opt := WithConnectionConfig(&transport.ConnectionConfig{
|
opt := WithConnectionConfig(transport.ConnectionConfig{
|
||||||
WriteTimeout: 1 * time.Second,
|
WriteTimeout: 1 * time.Second,
|
||||||
Retry: transport.RetryConfig{Disabled: true},
|
Retry: transport.RetryConfig{Disabled: true},
|
||||||
})
|
})
|
||||||
err := applyPoolOptions(conf, opt)
|
err := applyPoolOptions(conf, opt)
|
||||||
assert.NoError(t, err)
|
assert.NoError(t, err)
|
||||||
assert.NotNil(t, conf.ConnectionConfig)
|
|
||||||
assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout)
|
assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout)
|
||||||
|
|
||||||
// invalid config is rejected
|
// invalid config is rejected
|
||||||
conf = &PoolConfig{}
|
conf = &PoolConfig{}
|
||||||
opt = WithConnectionConfig(
|
opt = WithConnectionConfig(
|
||||||
&transport.ConnectionConfig{
|
transport.ConnectionConfig{
|
||||||
WriteTimeout: -1 * time.Second,
|
WriteTimeout: -1 * time.Second,
|
||||||
Retry: transport.RetryConfig{Disabled: true},
|
Retry: transport.RetryConfig{Disabled: true},
|
||||||
})
|
})
|
||||||
@@ -87,8 +86,12 @@ func TestValidatePoolConfig(t *testing.T) {
|
|||||||
wantErrText string
|
wantErrText string
|
||||||
}{
|
}{
|
||||||
{
|
{
|
||||||
name: "valid empty",
|
name: "valid empty (retry disabled)",
|
||||||
conf: *&PoolConfig{},
|
conf: PoolConfig{
|
||||||
|
ConnectionConfig: transport.ConnectionConfig{
|
||||||
|
Retry: transport.RetryConfig{Disabled: true},
|
||||||
|
},
|
||||||
|
},
|
||||||
},
|
},
|
||||||
{
|
{
|
||||||
name: "valid defaults",
|
name: "valid defaults",
|
||||||
@@ -97,7 +100,7 @@ func TestValidatePoolConfig(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "valid complete",
|
name: "valid complete",
|
||||||
conf: PoolConfig{
|
conf: PoolConfig{
|
||||||
ConnectionConfig: &transport.ConnectionConfig{
|
ConnectionConfig: transport.ConnectionConfig{
|
||||||
Retry: transport.RetryConfig{Disabled: true},
|
Retry: transport.RetryConfig{Disabled: true},
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
@@ -105,7 +108,7 @@ func TestValidatePoolConfig(t *testing.T) {
|
|||||||
{
|
{
|
||||||
name: "invalid connection config",
|
name: "invalid connection config",
|
||||||
conf: PoolConfig{
|
conf: PoolConfig{
|
||||||
ConnectionConfig: &transport.ConnectionConfig{
|
ConnectionConfig: transport.ConnectionConfig{
|
||||||
Retry: transport.RetryConfig{
|
Retry: transport.RetryConfig{
|
||||||
InitialDelay: 10 * time.Second,
|
InitialDelay: 10 * time.Second,
|
||||||
MaxDelay: 1 * time.Second,
|
MaxDelay: 1 * time.Second,
|
||||||
|
|||||||
@@ -57,7 +57,7 @@ type PoolPlugin struct {
|
|||||||
Events chan<- PoolEvent
|
Events chan<- PoolEvent
|
||||||
InboxCounter *atomic.Uint64
|
InboxCounter *atomic.Uint64
|
||||||
Dialer types.Dialer
|
Dialer types.Dialer
|
||||||
ConnectionConfig *transport.ConnectionConfig
|
ConnectionConfig transport.ConnectionConfig
|
||||||
}
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
@@ -264,6 +264,7 @@ func (p *Pool) Connect(id string) error {
|
|||||||
InboxCounter: p.inboxCounter,
|
InboxCounter: p.inboxCounter,
|
||||||
Dialer: p.dialer,
|
Dialer: p.dialer,
|
||||||
ConnectionConfig: p.config.ConnectionConfig,
|
ConnectionConfig: p.config.ConnectionConfig,
|
||||||
|
// ConnectionConfig is assigned by value — each worker gets its own copy
|
||||||
}
|
}
|
||||||
|
|
||||||
p.wg.Go(func() {
|
p.wg.Go(func() {
|
||||||
|
|||||||
@@ -334,7 +334,8 @@ func connect(
|
|||||||
pool PoolPlugin,
|
pool PoolPlugin,
|
||||||
handler slog.Handler,
|
handler slog.Handler,
|
||||||
) (*transport.Connection, error) {
|
) (*transport.Connection, error) {
|
||||||
conn, err := transport.NewConnection(ctx, id, pool.ConnectionConfig, handler)
|
cc := pool.ConnectionConfig
|
||||||
|
conn, err := transport.NewConnection(ctx, id, &cc, handler)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|||||||
+7
-6
@@ -25,9 +25,10 @@ func makeWorkerContext(t *testing.T) (
|
|||||||
inbox = make(chan types.InboxMessage, 256)
|
inbox = make(chan types.InboxMessage, 256)
|
||||||
events = make(chan PoolEvent, 10)
|
events = make(chan PoolEvent, 10)
|
||||||
pool = PoolPlugin{
|
pool = PoolPlugin{
|
||||||
Inbox: inbox,
|
Inbox: inbox,
|
||||||
Events: events,
|
Events: events,
|
||||||
InboxCounter: &atomic.Uint64{},
|
InboxCounter: &atomic.Uint64{},
|
||||||
|
ConnectionConfig: *transport.GetDefaultConnectionConfig(),
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@@ -95,7 +96,7 @@ func TestWorkerSession(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||||
pool.ConnectionConfig = cc
|
pool.ConnectionConfig = *cc
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Go(func() { w.Start(pool) })
|
wg.Go(func() { w.Start(pool) })
|
||||||
@@ -151,7 +152,7 @@ func TestWorkerSession(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||||
pool.ConnectionConfig = cc
|
pool.ConnectionConfig = *cc
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Go(func() { w.Start(pool) })
|
wg.Go(func() { w.Start(pool) })
|
||||||
@@ -176,7 +177,7 @@ func TestWorkerSession(t *testing.T) {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
cc, _ := transport.NewConnectionConfig(transport.WithRetryDisabled())
|
||||||
pool.ConnectionConfig = cc
|
pool.ConnectionConfig = *cc
|
||||||
|
|
||||||
var wg sync.WaitGroup
|
var wg sync.WaitGroup
|
||||||
wg.Go(func() { w.Start(pool) })
|
wg.Go(func() { w.Start(pool) })
|
||||||
|
|||||||
Reference in New Issue
Block a user