Various performance and correctness improvements.
This commit is contained in:
+37
-6
@@ -7,9 +7,11 @@ import (
|
||||
type CloseHandler func(code int, text string) error
|
||||
|
||||
type ConnectionConfig struct {
|
||||
CloseHandler CloseHandler
|
||||
WriteTimeout time.Duration
|
||||
Retry *RetryConfig
|
||||
CloseHandler CloseHandler
|
||||
WriteTimeout time.Duration
|
||||
IncomingBufferSize int
|
||||
ErrorsBufferSize int
|
||||
Retry *RetryConfig
|
||||
}
|
||||
|
||||
type RetryConfig struct {
|
||||
@@ -34,9 +36,11 @@ func NewConnectionConfig(options ...ConnectionOption) (*ConnectionConfig, error)
|
||||
|
||||
func GetDefaultConnectionConfig() *ConnectionConfig {
|
||||
return &ConnectionConfig{
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IncomingBufferSize: 100,
|
||||
ErrorsBufferSize: 10,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -100,6 +104,13 @@ func validateWriteTimeout(value time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateBufferSize(value int) error {
|
||||
if value < 1 {
|
||||
return InvalidBufferSize
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func validateMaxRetries(value int) error {
|
||||
if value < 0 {
|
||||
return InvalidRetryMaxRetries
|
||||
@@ -147,6 +158,26 @@ func WithWriteTimeout(value time.Duration) ConnectionOption {
|
||||
}
|
||||
}
|
||||
|
||||
func WithIncomingBufferSize(value int) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if err := validateBufferSize(value); err != nil {
|
||||
return err
|
||||
}
|
||||
c.IncomingBufferSize = value
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithErrorsBufferSize(value int) ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
if err := validateBufferSize(value); err != nil {
|
||||
return err
|
||||
}
|
||||
c.ErrorsBufferSize = value
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
func WithoutRetry() ConnectionOption {
|
||||
return func(c *ConnectionConfig) error {
|
||||
c.Retry = nil
|
||||
|
||||
@@ -13,9 +13,11 @@ func TestNewConnectionConfig(t *testing.T) {
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, conf, &ConnectionConfig{
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IncomingBufferSize: 100,
|
||||
ErrorsBufferSize: 10,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
})
|
||||
|
||||
// errors propagate
|
||||
@@ -32,9 +34,11 @@ func TestDefaultConnectionConfig(t *testing.T) {
|
||||
conf := GetDefaultConnectionConfig()
|
||||
|
||||
assert.Equal(t, conf, &ConnectionConfig{
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
CloseHandler: nil,
|
||||
WriteTimeout: 30 * time.Second,
|
||||
IncomingBufferSize: 100,
|
||||
ErrorsBufferSize: 10,
|
||||
Retry: GetDefaultRetryConfig(),
|
||||
})
|
||||
}
|
||||
|
||||
@@ -55,12 +59,16 @@ func TestApplyConnectionOptions(t *testing.T) {
|
||||
conf := &ConnectionConfig{}
|
||||
err := applyConnectionOptions(
|
||||
conf,
|
||||
WithIncomingBufferSize(256),
|
||||
WithErrorsBufferSize(100),
|
||||
WithRetryMaxRetries(0),
|
||||
WithRetryInitialDelay(3*time.Second),
|
||||
WithRetryJitterFactor(0.5),
|
||||
)
|
||||
|
||||
assert.NoError(t, err)
|
||||
assert.Equal(t, 256, conf.IncomingBufferSize)
|
||||
assert.Equal(t, 100, conf.ErrorsBufferSize)
|
||||
assert.Equal(t, 0, conf.Retry.MaxRetries)
|
||||
assert.Equal(t, 3*time.Second, conf.Retry.InitialDelay)
|
||||
assert.Equal(t, 0.5, conf.Retry.JitterFactor)
|
||||
|
||||
@@ -78,8 +78,8 @@ func NewConnection(urlStr string, config *ConnectionConfig, logger *slog.Logger)
|
||||
socket: nil,
|
||||
config: config,
|
||||
logger: logger,
|
||||
incoming: make(chan []byte, 100),
|
||||
errors: make(chan error, 10),
|
||||
incoming: make(chan []byte, config.IncomingBufferSize),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
state: StateDisconnected,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
@@ -108,8 +108,8 @@ func NewConnectionFromSocket(
|
||||
socket: socket,
|
||||
config: config,
|
||||
logger: logger,
|
||||
incoming: make(chan []byte, 100),
|
||||
errors: make(chan error, 10),
|
||||
incoming: make(chan []byte, config.IncomingBufferSize),
|
||||
errors: make(chan error, config.ErrorsBufferSize),
|
||||
state: StateConnected,
|
||||
done: make(chan struct{}),
|
||||
}
|
||||
|
||||
@@ -9,6 +9,7 @@ var (
|
||||
|
||||
// Configuration Errors
|
||||
InvalidWriteTimeout = errors.New("write timeout cannot be negative")
|
||||
InvalidBufferSize = errors.New("buffer size must be greater than zero")
|
||||
InvalidRetryMaxRetries = errors.New("max retry count cannot be negative")
|
||||
InvalidRetryInitialDelay = errors.New("initial delay must be positive")
|
||||
InvalidRetryMaxDelay = errors.New("max delay must be positive")
|
||||
|
||||
+23
-1
@@ -9,12 +9,24 @@ import (
|
||||
type RetryManager struct {
|
||||
config *RetryConfig
|
||||
retryCount int
|
||||
saturation int
|
||||
}
|
||||
|
||||
func NewRetryManager(config *RetryConfig) *RetryManager {
|
||||
// saturationCount: retry count at which base delay meets or exceeds MaxDelay.
|
||||
// Conservative by one to preserve jitter variance near the boundary.
|
||||
saturation := 0
|
||||
if config != nil &&
|
||||
config.InitialDelay > 0 &&
|
||||
config.InitialDelay <= config.MaxDelay {
|
||||
ratio := float64(config.MaxDelay) / float64(config.InitialDelay)
|
||||
saturation = int(math.Ceil(math.Log2(ratio))) + 2
|
||||
}
|
||||
|
||||
return &RetryManager{
|
||||
config: config,
|
||||
retryCount: 0,
|
||||
saturation: saturation,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -40,8 +52,18 @@ func (r *RetryManager) CalculateDelay() time.Duration {
|
||||
return 0
|
||||
}
|
||||
|
||||
// if saturation is reached, calculated backoff will always be higher than
|
||||
// the maximum delay
|
||||
if r.config != nil && r.retryCount >= r.saturation {
|
||||
return r.config.MaxDelay
|
||||
}
|
||||
|
||||
// Exponential backoff: InitialDelay * 2^(attempts-1)
|
||||
backoffMultiplier := math.Pow(2, float64(r.retryCount-1))
|
||||
shift := r.retryCount - 1
|
||||
if shift > 62 {
|
||||
shift = 62
|
||||
} // prevent overflow
|
||||
backoffMultiplier := float64(int64(1) << shift)
|
||||
baseDelay := float64(r.config.InitialDelay) * backoffMultiplier
|
||||
|
||||
// Apply jitter: delay * (1 + jitterFactor * (random - 0.5))
|
||||
|
||||
Reference in New Issue
Block a user