Increase keepalive timeout. Add reconnection delay.
This commit is contained in:
+21
-1
@@ -168,6 +168,7 @@ func WithWorkerFactory(wf WorkerFactory) PoolOption {
|
|||||||
|
|
||||||
type WorkerConfig struct {
|
type WorkerConfig struct {
|
||||||
KeepaliveTimeout time.Duration
|
KeepaliveTimeout time.Duration
|
||||||
|
ReconnectDelay time.Duration
|
||||||
MaxQueueSize int
|
MaxQueueSize int
|
||||||
LoggingEnabled bool
|
LoggingEnabled bool
|
||||||
LogLevel *slog.Level
|
LogLevel *slog.Level
|
||||||
@@ -188,7 +189,8 @@ func NewWorkerConfig(options ...WorkerOption) (*WorkerConfig, error) {
|
|||||||
|
|
||||||
func GetDefaultWorkerConfig() *WorkerConfig {
|
func GetDefaultWorkerConfig() *WorkerConfig {
|
||||||
return &WorkerConfig{
|
return &WorkerConfig{
|
||||||
KeepaliveTimeout: 20 * time.Second,
|
KeepaliveTimeout: 60 * time.Second,
|
||||||
|
ReconnectDelay: 2 * time.Second,
|
||||||
MaxQueueSize: 0, // disabled by default
|
MaxQueueSize: 0, // disabled by default
|
||||||
LoggingEnabled: true,
|
LoggingEnabled: true,
|
||||||
LogLevel: nil,
|
LogLevel: nil,
|
||||||
@@ -232,6 +234,13 @@ func validateKeepaliveTimeout(value time.Duration) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func validateReconnectDelay(value time.Duration) error {
|
||||||
|
if value < 0 {
|
||||||
|
return InvalidReconnectDelay
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// When KeepaliveTimeout is set to zero, keepalive timeouts are disabled.
|
// When KeepaliveTimeout is set to zero, keepalive timeouts are disabled.
|
||||||
func WithKeepaliveTimeout(value time.Duration) WorkerOption {
|
func WithKeepaliveTimeout(value time.Duration) WorkerOption {
|
||||||
return func(c *WorkerConfig) error {
|
return func(c *WorkerConfig) error {
|
||||||
@@ -244,6 +253,17 @@ func WithKeepaliveTimeout(value time.Duration) WorkerOption {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func WithReconnectDelay(value time.Duration) WorkerOption {
|
||||||
|
return func(c *WorkerConfig) error {
|
||||||
|
err := validateReconnectDelay(value)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
c.ReconnectDelay = value
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// When MaxQueueSize is set to zero, queue limits are disabled.
|
// When MaxQueueSize is set to zero, queue limits are disabled.
|
||||||
func WithMaxQueueSize(value int) WorkerOption {
|
func WithMaxQueueSize(value int) WorkerOption {
|
||||||
return func(c *WorkerConfig) error {
|
return func(c *WorkerConfig) error {
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import "fmt"
|
|||||||
var (
|
var (
|
||||||
// Config errors
|
// Config errors
|
||||||
InvalidKeepaliveTimeout = errors.New("keepalive timeout cannot be negative")
|
InvalidKeepaliveTimeout = errors.New("keepalive timeout cannot be negative")
|
||||||
|
InvalidReconnectDelay = errors.New("reconnect delay cannot be negative")
|
||||||
InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative")
|
InvalidMaxQueueSize = errors.New("maximum queue size cannot be negative")
|
||||||
InvalidBufferSize = errors.New("buffer size must be greater than zero")
|
InvalidBufferSize = errors.New("buffer size must be greater than zero")
|
||||||
|
|
||||||
|
|||||||
@@ -100,6 +100,7 @@ func (w *DefaultWorker) Start(pool PoolPlugin) {
|
|||||||
dial: dial,
|
dial: dial,
|
||||||
keepalive: keepalive,
|
keepalive: keepalive,
|
||||||
newConn: newConn,
|
newConn: newConn,
|
||||||
|
reconnectDelay: w.config.ReconnectDelay,
|
||||||
logger: w.logger,
|
logger: w.logger,
|
||||||
}
|
}
|
||||||
session.Start(w.ctx, pool)
|
session.Start(w.ctx, pool)
|
||||||
@@ -153,6 +154,8 @@ type Session struct {
|
|||||||
keepalive <-chan struct{}
|
keepalive <-chan struct{}
|
||||||
newConn <-chan *transport.Connection
|
newConn <-chan *transport.Connection
|
||||||
|
|
||||||
|
reconnectDelay time.Duration
|
||||||
|
|
||||||
logger *slog.Logger
|
logger *slog.Logger
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -238,6 +241,7 @@ func (s *Session) Start(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// refresh session
|
// refresh session
|
||||||
|
time.Sleep(s.reconnectDelay)
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user