diff --git a/config.go b/config.go index bc5de23..37a2650 100644 --- a/config.go +++ b/config.go @@ -8,35 +8,42 @@ import ( // Types type CloseHandler func(code int, text string) error +type WorkerFactory func( + id string, + conn *Connection, + onReconnect func() (*Connection, error), +) Worker -// Pool Config +// Initiator Pool Config -type PoolConfig struct { +type InitiatorPoolConfig struct { ConnectionConfig *ConnectionConfig - IdleTimeout time.Duration + WorkerFactory WorkerFactory + WorkerConfig *InitiatorWorkerConfig } -type PoolOption func(*PoolConfig) error +type InitiatorPoolOption func(*InitiatorPoolConfig) error -func NewPoolConfig(options ...PoolOption) (*PoolConfig, error) { - conf := GetDefaultPoolConfig() - if err := applyPoolOptions(conf, options...); err != nil { +func NewInitiatorPoolConfig(options ...InitiatorPoolOption) (*InitiatorPoolConfig, error) { + conf := GetDefaultInitiatorPoolConfig() + if err := applyInitiatorPoolOptions(conf, options...); err != nil { return nil, err } - if err := validatePoolConfig(conf); err != nil { + if err := validateInitiatorPoolConfig(conf); err != nil { return nil, err } return conf, nil } -func GetDefaultPoolConfig() *PoolConfig { - return &PoolConfig{ - IdleTimeout: 20 * time.Second, +func GetDefaultInitiatorPoolConfig() *InitiatorPoolConfig { + return &InitiatorPoolConfig{ ConnectionConfig: nil, + WorkerFactory: nil, + WorkerConfig: nil, } } -func applyPoolOptions(config *PoolConfig, options ...PoolOption) error { +func applyInitiatorPoolOptions(config *InitiatorPoolConfig, options ...InitiatorPoolOption) error { for _, option := range options { if err := option(config); err != nil { return err @@ -45,11 +52,8 @@ func applyPoolOptions(config *PoolConfig, options ...PoolOption) error { return nil } -func validatePoolConfig(config *PoolConfig) error { - err := validateIdleTimeout(config.IdleTimeout) - if err != nil { - return err - } +func validateInitiatorPoolConfig(config *InitiatorPoolConfig) error { + var err error if config.ConnectionConfig != nil { err = validateConnectionConfig(config.ConnectionConfig) @@ -58,30 +62,18 @@ func validatePoolConfig(config *PoolConfig) error { } } - return nil -} - -func validateIdleTimeout(value time.Duration) error { - if value < 0 { - return errors.InvalidIdleTimeout - } - return nil -} - -// When IdleTimeout is set to zero, idle timeouts are disabled. -func WithIdleTimeout(value time.Duration) PoolOption { - return func(c *PoolConfig) error { - err := validateIdleTimeout(value) + if config.WorkerConfig != nil { + err = validateInitiatorWorkerConfig(config.WorkerConfig) if err != nil { return err } - c.IdleTimeout = value - return nil } + + return nil } -func WithConnectionConfig(cc *ConnectionConfig) PoolOption { - return func(c *PoolConfig) error { +func WithInitiatorConnectionConfig(cc *ConnectionConfig) InitiatorPoolOption { + return func(c *InitiatorPoolConfig) error { err := validateConnectionConfig(cc) if err != nil { return err @@ -91,6 +83,32 @@ func WithConnectionConfig(cc *ConnectionConfig) PoolOption { } } +func WithInitiatorWorkerConfig(wc *InitiatorWorkerConfig) InitiatorPoolOption { + return func(c *InitiatorPoolConfig) error { + err := validateInitiatorWorkerConfig(wc) + if err != nil { + return err + } + c.WorkerConfig = wc + return nil + } +} + +func WithInitiatorWorkerFactory(wf WorkerFactory) InitiatorPoolOption { + return func(c *InitiatorPoolConfig) error { + c.WorkerFactory = wf + return nil + } +} + +// Responder Pool Config + +type ResponderPoolConfig struct { + ConnectionConfig *ConnectionConfig + WorkerFactory WorkerFactory + WorkerConfig *ResponderWorkerConfig +} + // Connection Config type ConnectionConfig struct { @@ -310,3 +328,95 @@ func WithRetryJitterFactor(value float64) ConnectionOption { return nil } } + +// Initiator Worker Config + +type InitiatorWorkerConfig struct { + IdleTimeout time.Duration + MaxQueueSize int +} + +type InitiatorWorkerOption func(*InitiatorWorkerConfig) error + +func NewInitiatorWorkerConfig(options ...InitiatorWorkerOption) (*InitiatorWorkerConfig, error) { + conf := GetDefaultInitiatorWorkerConfig() + if err := applyInitiatorWorkerOptions(conf, options...); err != nil { + return nil, err + } + if err := validateInitiatorWorkerConfig(conf); err != nil { + return nil, err + } + return conf, nil +} + +func GetDefaultInitiatorWorkerConfig() *InitiatorWorkerConfig { + return &InitiatorWorkerConfig{ + IdleTimeout: 20 * time.Second, + MaxQueueSize: 0, // disabled by default + } +} + +func applyInitiatorWorkerOptions(config *InitiatorWorkerConfig, options ...InitiatorWorkerOption) error { + for _, option := range options { + if err := option(config); err != nil { + return err + } + } + return nil +} + +func validateInitiatorWorkerConfig(config *InitiatorWorkerConfig) error { + err := validateIdleTimeout(config.IdleTimeout) + if err != nil { + return err + } + + err = validateMaxQueueSize(config.MaxQueueSize) + if err != nil { + return err + } + + return nil +} + +func validateMaxQueueSize(value int) error { + if value < 0 { + return errors.InvalidMaxQueueSize + } + return nil +} + +func validateIdleTimeout(value time.Duration) error { + if value < 0 { + return errors.InvalidIdleTimeout + } + return nil +} + +// When IdleTimeout is set to zero, idle timeouts are disabled. +func WithIdleTimeout(value time.Duration) InitiatorWorkerOption { + return func(c *InitiatorWorkerConfig) error { + err := validateIdleTimeout(value) + if err != nil { + return err + } + c.IdleTimeout = value + return nil + } +} + +// When MaxQueueSize is set to zero, queue limits are disabled. +func WithMaxQueueSize(value int) InitiatorWorkerOption { + return func(c *InitiatorWorkerConfig) error { + err := validateMaxQueueSize(value) + if err != nil { + return err + } + c.MaxQueueSize = value + return nil + } +} + +// Responder Worker Config + +type ResponderWorkerConfig struct{} diff --git a/config_pool_test.go b/config_pool_test.go index 7c066ea..9200c7a 100644 --- a/config_pool_test.go +++ b/config_pool_test.go @@ -1,118 +1,82 @@ package honeybee import ( - "git.wisehodl.dev/jay/go-honeybee/errors" "github.com/stretchr/testify/assert" "testing" "time" ) func TestNewPoolConfig(t *testing.T) { - conf, err := NewPoolConfig() + conf, err := NewInitiatorPoolConfig() assert.NoError(t, err) - assert.Equal(t, conf, &PoolConfig{ - IdleTimeout: 20 * time.Second, + assert.Equal(t, conf, &InitiatorPoolConfig{ ConnectionConfig: nil, + WorkerConfig: nil, + WorkerFactory: nil, }) - - // errors propagate - _, err = NewPoolConfig(WithIdleTimeout(-1)) - assert.Error(t, err) } func TestDefaultPoolConfig(t *testing.T) { - conf := GetDefaultPoolConfig() + conf := GetDefaultInitiatorPoolConfig() - assert.Equal(t, conf, &PoolConfig{ - IdleTimeout: 20 * time.Second, + assert.Equal(t, conf, &InitiatorPoolConfig{ ConnectionConfig: nil, + WorkerConfig: nil, + WorkerFactory: nil, }) } func TestApplyPoolOptions(t *testing.T) { - conf := &PoolConfig{} - err := applyPoolOptions( + conf := &InitiatorPoolConfig{} + err := applyInitiatorPoolOptions( conf, - WithIdleTimeout(15), - WithConnectionConfig(&ConnectionConfig{}), + WithInitiatorConnectionConfig(&ConnectionConfig{}), ) assert.NoError(t, err) - assert.Equal(t, time.Duration(15), conf.IdleTimeout) assert.Equal(t, 0*time.Second, conf.ConnectionConfig.WriteTimeout) - - // errors propagate - err = applyPoolOptions( - conf, - WithIdleTimeout(-1), - ) - - assert.ErrorIs(t, err, errors.InvalidIdleTimeout) -} - -func TestWithIdleTimeout(t *testing.T) { - conf := &PoolConfig{} - opt := WithIdleTimeout(30) - err := applyPoolOptions(conf, opt) - assert.NoError(t, err) - assert.Equal(t, conf.IdleTimeout, time.Duration(30)) - - // zero allowed - conf = &PoolConfig{} - opt = WithIdleTimeout(0) - err = applyPoolOptions(conf, opt) - assert.NoError(t, err) - assert.Equal(t, conf.IdleTimeout, time.Duration(0)) - - // negative disallowed - conf = &PoolConfig{} - opt = WithIdleTimeout(-30) - err = applyPoolOptions(conf, opt) - assert.ErrorIs(t, err, errors.InvalidIdleTimeout) - assert.ErrorContains(t, err, "idle timeout cannot be negative") } func TestWithConnectionConfig(t *testing.T) { - conf := &PoolConfig{} - opt := WithConnectionConfig(&ConnectionConfig{WriteTimeout: 1 * time.Second}) - err := applyPoolOptions(conf, opt) + conf := &InitiatorPoolConfig{} + opt := WithInitiatorConnectionConfig(&ConnectionConfig{WriteTimeout: 1 * time.Second}) + err := applyInitiatorPoolOptions(conf, opt) assert.NoError(t, err) assert.NotNil(t, conf.ConnectionConfig) assert.Equal(t, 1*time.Second, conf.ConnectionConfig.WriteTimeout) // invalid config is rejected - conf = &PoolConfig{} - opt = WithConnectionConfig(&ConnectionConfig{WriteTimeout: -1 * time.Second}) - err = applyPoolOptions(conf, opt) + conf = &InitiatorPoolConfig{} + opt = WithInitiatorConnectionConfig(&ConnectionConfig{WriteTimeout: -1 * time.Second}) + err = applyInitiatorPoolOptions(conf, opt) assert.Error(t, err) } func TestValidatePoolConfig(t *testing.T) { cases := []struct { name string - conf PoolConfig + conf InitiatorPoolConfig wantErr error wantErrText string }{ { name: "valid empty", - conf: *&PoolConfig{}, + conf: *&InitiatorPoolConfig{}, }, { name: "valid defaults", - conf: *GetDefaultPoolConfig(), + conf: *GetDefaultInitiatorPoolConfig(), }, { name: "valid complete", - conf: PoolConfig{ - IdleTimeout: 15 * time.Second, + conf: InitiatorPoolConfig{ ConnectionConfig: &ConnectionConfig{}, }, }, { name: "invalid connection config", - conf: PoolConfig{ + conf: InitiatorPoolConfig{ ConnectionConfig: &ConnectionConfig{ Retry: &RetryConfig{ InitialDelay: 10 * time.Second, @@ -126,7 +90,7 @@ func TestValidatePoolConfig(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - err := validatePoolConfig(&tc.conf) + err := validateInitiatorPoolConfig(&tc.conf) if tc.wantErr != nil || tc.wantErrText != "" { if tc.wantErr != nil { diff --git a/errors/errors.go b/errors/errors.go index 9e38c89..d768a9f 100644 --- a/errors/errors.go +++ b/errors/errors.go @@ -14,6 +14,7 @@ var ( InvalidRetryInitialDelay = errors.New("initial delay must be positive") InvalidRetryMaxDelay = errors.New("max delay must be positive") InvalidRetryJitterFactor = errors.New("jitter factor must be between 0.0 and 1.0") + InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative") ) func NewConfigError(text string) error { diff --git a/pool.go b/pool.go index 62e5df6..0e68edd 100644 --- a/pool.go +++ b/pool.go @@ -62,7 +62,7 @@ type pool struct { errors chan error done chan struct{} - config *PoolConfig + config *InitiatorPoolConfig logger *slog.Logger mu sync.RWMutex @@ -147,12 +147,12 @@ type InitiatorPool struct { dialer Dialer } -func NewInitiatorPool(config *PoolConfig, logger *slog.Logger) (*InitiatorPool, error) { +func NewInitiatorPool(config *InitiatorPoolConfig, logger *slog.Logger) (*InitiatorPool, error) { if config == nil { - config = GetDefaultPoolConfig() + config = GetDefaultInitiatorPoolConfig() } - if err := validatePoolConfig(config); err != nil { + if err := validateInitiatorPoolConfig(config); err != nil { return nil, err } diff --git a/pool_test.go b/pool_test.go index 70a5a36..0b06902 100644 --- a/pool_test.go +++ b/pool_test.go @@ -70,7 +70,7 @@ func TestPoolConnect(t *testing.T) { t.Run("fails to add connection", func(t *testing.T) { pool, err := NewInitiatorPool( - &PoolConfig{ + &InitiatorPoolConfig{ ConnectionConfig: &ConnectionConfig{ Retry: &RetryConfig{ MaxRetries: 1, diff --git a/worker.go b/worker.go index e701842..a187674 100644 --- a/worker.go +++ b/worker.go @@ -1,18 +1,107 @@ package honeybee +import ( + "log/slog" + "sync" + "time" +) + // Types // Worker Implementation -type Worker interface{} +type Worker interface { + Start( + ctx *WorkerContext, + wg *sync.WaitGroup, + ) +} + +type WorkerContext struct { + Inbox chan<- InboxMessage + Events chan<- PoolEvent + Errors chan<- error + Stop <-chan struct{} + PoolDone <-chan struct{} + Logger *slog.Logger +} // Base Struct -type worker struct{} +type worker struct { + id string +} + +func (w *worker) runForwarder( + messages <-chan []byte, + inbox chan<- []byte, + stop <-chan struct{}, + poolDone <-chan struct{}, + maxQueueSize int, +) { +} // Initiator Worker -type InitiatorWorker struct{} +type InitiatorWorker struct { + *worker + config *InitiatorWorkerConfig + onReconnect func() (*Connection, error) +} + +func newInitiatorWorker( + id string, + config *InitiatorWorkerConfig, + onReconnect func() (*Connection, error), + logger *slog.Logger, + +) (*InitiatorWorker, error) { + w := &InitiatorWorker{ + worker: &worker{ + id: id, + logger: logger, + }, + config: config, + onReconnect: onReconnect, + } + + return w, nil +} + +func (w *InitiatorWorker) Start( + inbox chan<- InboxMessage, + events chan<- PoolEvent, + stop <-chan struct{}, + poolDone <-chan struct{}, + wg *sync.WaitGroup, +) { +} + +func runReader(conn *Connection, + messages chan<- []byte, + heartbeat chan<- time.Time, + reconnect chan<- struct{}, + newConn <-chan *Connection, + stop <-chan struct{}, + poolDone <-chan struct{}, + +) { +} + +func runHealthMonitor( + heartbeat <-chan time.Time, + stop <-chan struct{}, + poolDone <-chan struct{}, +) { +} + +func runReconnector( + reconnect <-chan struct{}, + newConn chan<- *Connection, + stop <-chan struct{}, + poolDone <-chan struct{}, +) { +} // Responder Worker diff --git a/worker_test.go b/worker_test.go index 72aab9a..53eb7a8 100644 --- a/worker_test.go +++ b/worker_test.go @@ -1 +1,7 @@ package honeybee + +import ( +// "github.com/stretchr/testify/assert" +// "testing" +// "time" +)