Table of Contents
- Overall Nostr Websocket Data Transport Stack
- roots-ws
- Functional Architecture
- 1. Core Types
- 2. Envelope Handling
- 3. Envelope Creation Functions
- 4. Envelope Parsing Functions
- 5. Error Types
- Architecture Decision Records
- honeybee
- Functional Architecture
- 1. Connection Management
- 2. Reconnection Management
- 3. Connection Health Monitoring
- 4. Rate Limiting
- 5. Compression
- 6. Connection Metrics
- 7. Message Router Implementation
- 8. Write Queue Implementation
- 9. Initiator Functions
- 10. Responder Functions
- 11. Subscription Management
- 12. Connection Pool
- 13. Error Types
- 14. Main Package
- Cross-Language Translation: Go to TypeScript with RxJS
- Architecture Decision Records
- ADR 1: Practical WebSocket Implementation
- ADR 2: Event Loop Based Architecture
- ADR 3: Role-Specific Connection Patterns
- ADR 4: Simple State Management
- ADR 5: Basic Resilience Features
- ADR 6: Dependency Minimization
- ADR 7: Simplicity as a First-Class Feature
- ADR 8: Interface-First Design for Extension
- mana-ws
Overall Nostr Websocket Data Transport Stack
Architecture Overview
The Nostr WebSocket Data Transport Stack provides a layered approach to implementing the Nostr protocol over WebSockets. The stack consists of three complementary libraries that can be used independently or together, depending on application requirements:
- roots-ws: A foundation layer that provides pure functional primitives for working with Nostr protocol messages. This layer focuses on time-independent operations with no event loops or state management, implementing core message validation, serialization, and routing logic.
- honeybee: A practical implementation layer that builds on roots-ws to provide essential WebSocket transport functionality. This layer adds connection management, reconnection handling, and subscription management with appropriate event loops and state tracking.
- mana-ws: An extensions layer that provides specialized, optional functionality for advanced use cases. This layer implements sophisticated features like connection pooling, message compression, rate limiting, and detailed statistics tracking.
The architecture follows these key principles:
- Pure Core: The foundation is built on pure functions with minimal side effects
- Composition Over Inheritance: Higher layers compose functionality from lower layers rather than extending them
- Progressive Enhancement: Applications can start with just roots-ws and add higher layers as needs evolve
- Functional Paradigm: All layers use a functional approach with explicit state management
- Clear Separation of Concerns: Each layer has a distinct focus and responsibility
This layered approach allows applications to choose the appropriate level of functionality while maintaining a consistent programming model across the stack.
Architecture Decision Records
ADR 1: Layered Architecture with Unidirectional Dependencies
Context: Need to organize WebSocket transport functionality to support different levels of application complexity.
Decision: Split functionality into three layers (roots-ws, honeybee, mana-ws) with unidirectional dependencies.
Rationale:
- Allows applications to use only the functionality they need
- Creates natural boundaries between pure functionality and stateful behavior
- Promotes focused testing and simplified maintenance
- Enables incremental adoption for applications with evolving needs
- Follows the principle of separation of concerns
Consequences:
- Requires careful API design to ensure layers compose well
- May lead to some duplication between layers for independence
- Documentation must clearly explain the role of each layer
- Testing becomes more modular and focused on each layer's responsibilities
ADR 2: Functional Programming Paradigm Across Layers
Context: Need a consistent programming model that works well across all layers.
Decision: Adopt functional programming patterns with namespaced functions operating on explicit data structures.
Rationale:
- Creates a consistent mental model across all layers
- Improves testability through pure functions and explicit state
- Reduces coupling between components
- Aligns with Go's design philosophy
- Makes state management explicit and traceable
Consequences:
- May be less familiar to developers used to OOP
- Requires clear documentation on function usage patterns
- Improves composability of library components
- Makes state changes and side effects more visible
ADR 3: Transport-Only Focus with No Protocol Semantics
Context: Need to define the scope of the transport stack relative to other Nostr libraries.
Decision: Focus exclusively on transport concerns, deferring protocol semantics to other libraries.
Rationale:
- Creates a clear separation between transport and protocol semantics
- Allows the transport to be used with different protocol implementations
- Follows single responsibility principle
- Reduces coupling between transport and application logic
- Enables testing of transport independently from protocol semantics
Consequences:
- Applications need to compose transport with protocol libraries
- Requires clear API boundaries between transport and protocol concerns
- May require additional integration code in some applications
- Simplifies evolution of transport and protocol independently
ADR 4: Explicit Role-Based Connection Model
Context: Nostr protocol has inherent asymmetry in connection patterns.
Decision: Model connections explicitly as either initiator or responder roles rather than using client/relay terminology.
Rationale:
- Accurately represents the connection control model
- Avoids confusion with higher-level client/relay concepts
- Focuses on the transport protocol's communication pattern
- Creates clear API boundaries for each connection role
- Allows specialized behavior appropriate to each role
Consequences:
- API must make it explicit which operations are permitted for each role
- Requires role-specific implementations of some functionality
- May require additional validation to prevent role violations
- Improves clarity of connection behavior expectations
ADR 5: Tiered Configuration Strategy
Context: Different components require different levels of configurability.
Decision: Implement a tiered configuration approach with defaults, profiles, component-level options, and advanced parameters.
Rationale:
- Makes simple use cases simple with zero configuration
- Provides appropriate control for advanced users
- Organizes configuration into logical groupings
- Prevents configuration complexity from leaking into basic usage
- Allows applications to tune specific aspects without understanding all options
Consequences:
- Requires careful selection of default values
- Increases implementation complexity for configuration management
- Improves documentation by organizing options into logical tiers
- Enables incremental configuration as application needs grow
- May require configuration builders to maintain clean interfaces
roots-ws
Functional Architecture
1. Core Types
Purpose: Defines basic types for WebSocket connections and message handling.
// Core types
type ConnectionStatus int
const (
// StatusDisconnected indicates the connection is not active and no connection attempt is in progress.
StatusDisconnected ConnectionStatus = iota
// StatusConnecting indicates a connection attempt is currently in progress but not yet established.
StatusConnecting
// StatusConnected indicates the connection is active and ready for message exchange.
StatusConnected
// StatusClosing indicates the connection is in the process of shutting down gracefully.
StatusClosing
// StatusClosed indicates the connection has been fully closed with a clean shutdown.
StatusClosed
// StatusError indicates the connection has failed and is in an error state.
StatusError
// StatusBackoff indicates a temporary disconnected state during a reconnection attempt.
StatusBackoff
// StatusAwaiting indicates waiting for preconditions before attempting connection.
StatusAwaiting
// StatusUpgrading indicates WebSocket protocol upgrade in progress.
StatusUpgrading
)
2. Envelope Handling
Purpose: Provides a type and functions for working with Nostr protocol messages.
// Core envelope type
type Envelope []byte
// Get message label
func GetLabel(env Envelope) (string, error) {
// Parse JSON to extract first element as string (label)
// Handle errors if data isn't valid JSON or doesn't contain a label
}
// Get standard message labels
func GetStandardLabels() map[string]struct{} {
// Return map with standard Nostr message types:
// EVENT, REQ, CLOSE, CLOSED, EOSE, NOTICE, OK, AUTH
}
// Check if label is standard
func IsStandardLabel(label string) bool {
// Check against list of standard Nostr message labels
}
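For illustration, a minimal GetLabel built on encoding/json might look like the sketch below; the error values are those defined in section 5, and the only assumption is that an envelope is a JSON array whose first element is a string label.
func GetLabel(env Envelope) (string, error) {
    var arr []json.RawMessage
    if err := json.Unmarshal(env, &arr); err != nil {
        return "", InvalidJSON
    }
    if len(arr) == 0 {
        return "", InvalidEnvelope
    }
    var label string
    if err := json.Unmarshal(arr[0], &label); err != nil {
        return "", WrongFieldType
    }
    return label, nil
}
// IsStandardLabel then reduces to a map lookup
func IsStandardLabel(label string) bool {
    _, ok := GetStandardLabels()[label]
    return ok
}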
3. Envelope Creation Functions
Purpose: Creates standard Nostr protocol messages using pure functions.
// Create EVENT envelope for publishing events
func EncloseEvent(event []byte) Envelope {
// Create array with "EVENT" and event object
// Return as JSON bytes
}
// Create EVENT envelope for subscription events
func EncloseSubscriptionEvent(subID string, event []byte) Envelope {
// Create array with "EVENT", subID, and event
// Return as JSON bytes
}
// Create REQ envelope
func EncloseReq(subID string, filters [][]byte) Envelope {
// Create array with "REQ", subID, and filters
// Return as JSON bytes
}
// Create CLOSE envelope
func EncloseClose(subID string) Envelope {
// Create array with "CLOSE" and subID
// Return as JSON bytes
}
// Create CLOSED envelope
func EncloseClosed(subID string, message string) Envelope {
// Create array with "CLOSED", subID, and message
// Return as JSON bytes
}
// Create NOTICE envelope
func EncloseNotice(message string) Envelope {
// Create array with "NOTICE" and message
// Return as JSON bytes
}
// Create EOSE envelope
func EncloseEOSE(subID string) Envelope {
// Create array with "EOSE" and subID
// Return as JSON bytes
}
// Create OK envelope
func EncloseOK(eventID string, status bool, message string) Envelope {
// Create array with "OK", eventID, status, and message
// Return as JSON bytes
}
// Create AUTH challenge envelope
func EncloseAuthChallenge(challenge string) Envelope {
// Create array with "AUTH" and challenge
// Return as JSON bytes
}
// Create AUTH response envelope
func EncloseAuthResponse(event []byte) Envelope {
// Create array with "AUTH" and event
// Return as JSON bytes
}
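As a sketch of the creation pattern, each function can build a heterogeneous array and marshal it, using json.RawMessage so pre-serialized events and filters are embedded verbatim rather than re-encoded. EncloseEvent and EncloseOK are shown below; marshal errors are ignored here only because the inputs are assumed to be valid JSON and strings.
func EncloseEvent(event []byte) Envelope {
    // json.RawMessage embeds the event bytes without re-encoding
    out, _ := json.Marshal([]interface{}{"EVENT", json.RawMessage(event)})
    return Envelope(out)
}

func EncloseOK(eventID string, status bool, message string) Envelope {
    out, _ := json.Marshal([]interface{}{"OK", eventID, status, message})
    return Envelope(out)
}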
4. Envelope Parsing Functions
Purpose: Extracts data from standard Nostr protocol messages.
// Helper functions for validation
func CheckArrayLength(arr []json.RawMessage, minLen int) error {
// Ensure array has at least the minimum required elements
}
func CheckLabel(got, want string) error {
// Verify that envelope label matches expected value
}
func ParseElement(element json.RawMessage, value interface{}, position string) error {
// Unmarshal array element into provided value with error handling
}
// Extract event from EVENT envelope
func FindEvent(env Envelope) ([]byte, error) {
// Parse message as JSON array
// Validate array has correct structure for EVENT
// Extract event object
// Return event as JSON bytes
}
// Extract subscription event
func FindSubscriptionEvent(env Envelope) (subID string, event []byte, err error) {
// Parse message as JSON array
// Validate array has correct structure for subscription EVENT
// Extract subscription ID and event
// Return both with appropriate error handling
}
// Extract subscription request
func FindReq(env Envelope) (subID string, filters [][]byte, err error) {
// Parse message as JSON array
// Validate array has correct structure for REQ
// Extract subscription ID and filters
// Return both with appropriate error handling
}
// Extract OK response
func FindOK(env Envelope) (eventID string, status bool, message string, err error) {
// Parse message as JSON array
// Validate array has correct structure for OK
// Extract eventID, status, and message
// Return all with appropriate error handling
}
// Extract EOSE message
func FindEOSE(env Envelope) (subID string, err error) {
// Parse message as JSON array
// Validate array has correct structure for EOSE
// Extract subscription ID
// Return with appropriate error handling
}
// Extract CLOSE message
func FindClose(env Envelope) (subID string, err error) {
// Parse message as JSON array
// Validate array has correct structure for CLOSE
// Extract subscription ID
// Return with appropriate error handling
}
// Extract CLOSED message
func FindClosed(env Envelope) (subID string, message string, err error) {
// Parse message as JSON array
// Validate array has correct structure for CLOSED
// Extract subscription ID and message
// Return both with appropriate error handling
}
// Extract NOTICE message
func FindNotice(env Envelope) (message string, err error) {
// Parse message as JSON array
// Validate array has correct structure for NOTICE
// Extract message
// Return with appropriate error handling
}
// Extract AUTH challenge
func FindAuthChallenge(env Envelope) (challenge string, err error) {
// Parse message as JSON array
// Validate array has correct structure for AUTH challenge
// Extract challenge
// Return with appropriate error handling
}
// Extract AUTH response
func FindAuthResponse(env Envelope) (event []byte, err error) {
// Parse message as JSON array
// Validate array has correct structure for AUTH response
// Extract event
// Return with appropriate error handling
}
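Combining the helpers above, a parser such as FindEOSE might be implemented as in the following sketch; the names match the declarations in this section, and the error values come from section 5.
func FindEOSE(env Envelope) (subID string, err error) {
    var arr []json.RawMessage
    if err = json.Unmarshal(env, &arr); err != nil {
        return "", InvalidJSON
    }
    if err = CheckArrayLength(arr, 2); err != nil {
        return "", err
    }
    label, err := GetLabel(env)
    if err != nil {
        return "", err
    }
    if err = CheckLabel(label, "EOSE"); err != nil {
        return "", err
    }
    if err = ParseElement(arr[1], &subID, "subscription ID"); err != nil {
        return "", err
    }
    return subID, nil
}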
5. Error Types
Purpose: Defines error types for the library.
// Data Structure Errors
var (
InvalidJSON = errors.New("invalid JSON")
MissingField = errors.New("missing required field")
WrongFieldType = errors.New("wrong field type")
)
// Envelope Errors
var (
InvalidEnvelope = errors.New("invalid envelope format")
WrongEnvelopeLabel = errors.New("wrong envelope label")
)
Architecture Decision Records
ADR 1: Pure Functional Approach
Context: Need to define the programming paradigm for the foundation layer.
Decision: Implement roots-ws as a pure functional library with no side effects or state management.
Rationale:
- Creates a solid foundation for higher-level libraries to build upon
- Maximizes testability and predictability
- Eliminates concurrency issues at the foundation level
- Makes function composition more natural and explicit
- Aligns with functional programming best practices
Consequences:
- Cannot include any event loops or timers in this layer
- Must pass all state explicitly through function parameters
- All I/O operations must be handled by higher layers
- Library will feel incomplete when used standalone
- Function signatures will be more explicit about dependencies
ADR 2: Raw JSON Handling
Context: Need to decide how message content should be handled.
Decision: Process messages as raw JSON at this layer, deferring semantic validation to consumers.
Rationale:
- Avoids unnecessary serialization/deserialization cycles
- Maintains flexibility for consumers in how they process content
- Prevents tight coupling to specific JSON libraries
- Aligns with the transport-only focus of the library
- Makes message transformation more explicit
Consequences:
- Cannot validate message content semantics, only structure
- Consumers need to implement their own content parsing
- Performance benefits from reduced parsing overhead
- More flexibility for consumers to use different JSON libraries
- Must clearly document format expectations
ADR 3: Explicit Error Types
Context: Error handling approach needs to be defined.
Decision: Define explicit error types rather than using generic errors.
Rationale:
- Allows consumers to handle different error scenarios appropriately
- Creates clear documentation of possible failure modes
- Enables selective recovery strategies based on error type
- Improves debugging by providing specific error information
- Creates a consistent error handling pattern
Consequences:
- Increases code surface area with many error definitions
- Requires discipline to use the right error types
- Simplifies error handling for consumers
- Makes error handling expectations explicit in documentation
- May lead to error handling anti-patterns if not well documented
ADR 4: No Dependencies
Context: Need to determine dependency strategy for the foundation layer.
Decision: roots-ws should have no dependencies outside the standard library.
Rationale:
- Eliminates dependency conflicts for consumers
- Reduces maintenance burden from external API changes
- Creates a more stable foundation for higher layers
- Simplifies deployment and versioning
- Makes clear boundaries around functionality
Consequences:
- May need to implement functionality available in external libraries
- Limited to standard library capabilities
- Reduces feature bloat by forcing simplicity
- Improves long-term stability
- May require more code than using external libraries
ADR 5: Language-Specific Implementations
Context: Need to support multiple programming languages.
Decision: Provide idiomatic implementations for each supported language rather than a universal API.
Rationale:
- Allows each language implementation to follow best practices for that language
- Creates more natural APIs for consumers in each language
- Avoids awkward abstractions to force uniformity
- Enables leveraging language-specific features
- Improves adoption by feeling native to each language ecosystem
Consequences:
- Implementations will diverge in details across languages
- Requires maintaining multiple codebases
- Better developer experience in each language
- More challenging to maintain consistency in behavior
- More flexibility to evolve each implementation independently
ADR 6: Exported Helper Functions
Context: Need to decide which helper functions should be part of the public API to enable extensibility.
Decision: Export core helper functions (CheckArrayLength, CheckLabel, ParseElement) that enable consumers to create their own envelope types consistent with the library's patterns.
Rationale:
- Allows consumers to extend the library with custom envelope types
- Ensures consistency in validation and error handling across custom and built-in envelopes
- Enables protocol evolution without requiring library updates
- Follows the principle of providing tools rather than just finished products
- Creates a flexible extension point that preserves architectural integrity
Consequences:
- Increases the public API surface area
- Requires careful documentation of helper function behaviors and expectations
- Makes breaking changes to helpers more impactful
- Creates shared validation patterns across the ecosystem
- Encourages protocol innovation while maintaining implementation consistency
honeybee
Functional Architecture
1. Connection Management
Purpose: Manages WebSocket connection lifecycle with event loops.
Dependencies: roots-ws Connection types, Error types
package connection
// Core connection type
type Connection struct {
Status ConnectionStatus
URI string
InChan chan []byte // Socket → Application
OutChan chan []byte // Application → Socket
ErrChan chan error // Internal errors
socket *websocket.Conn
closeOnce sync.Once // Ensures single close execution
// Component references
ReconnectManager *reconnection.Manager // Optional reconnection handling
HealthMonitor *health.Monitor // Optional health monitoring
RateLimiter *ratelimit.Limiter // Optional rate limiting
CompressionEnabled bool // Whether compression is enabled
Metrics *metrics.Collector // Connection metrics tracking
done chan struct{} // Signal to shut down goroutines
}
// Configuration with sensible defaults
type Config struct {
// Connection timeout
ConnectionTimeout time.Duration
// Component configurations
ReconnectConfig *reconnection.Config // Optional reconnection config
HealthConfig *health.Config // Optional health monitoring config
RateLimitConfig *ratelimit.Config // Optional rate limiting config
CompressionConfig *compression.Config // Optional compression config
MetricsEnabled bool // Whether to collect metrics
// Message limits
MaxMessageSize int
}
// Connection status values
type ConnectionStatus int
const (
StatusDisconnected ConnectionStatus = iota
StatusConnecting
StatusConnected
StatusClosing
StatusClosed
StatusError
StatusBackoff
)
// Create new connection in disconnected state
func New(uri string, config Config) *Connection {
// Initialize connection with channels and disconnected status
// Initialize optional components if configs are provided
// Initialize metrics collector if enabled
// Return initialized connection
}
// Connect to websocket endpoint (for initiator)
func Connect(conn *Connection) error {
// Set connection status to StatusConnecting
// Create dialer with connection timeout from config
// Set up compression options if enabled
// Establish websocket connection with timeout
// Start reader and writer goroutines
// Initialize health monitoring if configured
// Set connection status to StatusConnected on success
}
// Accept existing websocket (for responder)
func FromSocket(socket *websocket.Conn, config Config) *Connection {
// Create connection from existing socket
// Start reader and writer goroutines
// Initialize health monitoring if configured
// Set up rate limiting if configured
// Enable compression if configured
// Initialize metrics if enabled
}
// Close connection
func Close(conn *Connection) error {
// Safely close the websocket using closeOnce
// Clean up goroutines and channels
// Record close event in metrics if enabled
}
// Get current connection state
func Status(conn *Connection) ConnectionStatus {
// Return current status
}
// Internal reader goroutine
func startReader(conn *Connection) {
// Read envelopes from socket
// Apply rate limiting if enabled
// Record metrics if enabled
// Send to InChan
// Handle errors via ErrChan
}
// Internal writer goroutine
func startWriter(conn *Connection) {
// Read from OutChan
// Apply compression if enabled
// Record metrics if enabled
// Write envelopes to socket
// Handle errors via ErrChan
}
// Handle onclose event from websocket
func setupCloseHandler(conn *Connection) {
// Set close handler on websocket
// Send close error through ErrChan
// Update connection status
// Record disconnection in metrics if enabled
// Trigger reconnection if enabled
}
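The closeOnce field exists so that Close is idempotent: the done channel and socket must be closed exactly once even if Close races with the close handler. A sketch under that assumption:
func Close(conn *Connection) error {
    var err error
    conn.closeOnce.Do(func() {
        conn.Status = StatusClosing
        close(conn.done)          // stops reader/writer goroutines
        err = conn.socket.Close() // closes the underlying websocket
        conn.Status = StatusClosed
    })
    return err
}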
2. Reconnection Management
Purpose: Handles automatic reconnection with exponential backoff.
Dependencies: roots-ws Error types
package reconnection
// Reconnection configuration with sensible defaults
type Config struct {
Enabled bool
MaxAttempts int // Maximum number of reconnection attempts (0 for unlimited)
InitialDelay time.Duration // Default: 1 second
MaxDelay time.Duration // Default: 30 seconds
BackoffFactor float64 // Default: 2.0 (doubles delay each attempt)
JitterFactor float64 // Default: 0.2 (adds ±20% randomization)
}
// Default reconnection configuration
func DefaultConfig() *Config {
return &Config{
Enabled: true,
MaxAttempts: 5,
InitialDelay: 1 * time.Second,
MaxDelay: 30 * time.Second,
BackoffFactor: 2.0,
JitterFactor: 0.2,
}
}
// Reconnection manager handles retry logic
type Manager struct {
Config *Config
attempts int
lastAttempt time.Time
callbacks []func() // Functions to call on successful reconnect
}
// Create new reconnection manager
func NewManager(config *Config) *Manager {
// Use default config if none provided
// Initialize manager with zero attempts
// Return initialized manager
}
// Register reconnection callback
func (m *Manager) OnReconnect(callback func()) {
// Add callback to callbacks slice
}
// Calculate backoff delay with jitter
func (m *Manager) CalculateDelay() time.Duration {
// Return zero if this is first attempt
// Calculate base delay: initialDelay * (backoffFactor ^ attempts)
// Add jitter: delay * (1 ± random * jitterFactor)
// Cap at maximum delay
// Return calculated delay
}
// Check if should attempt reconnection
func (m *Manager) ShouldAttempt() bool {
// Return false if disabled
// Check if max attempts exceeded (unless unlimited)
// Return true if should attempt
}
// Record reconnection attempt
func (m *Manager) RecordAttempt() {
// Increment attempts counter
// Update lastAttempt timestamp
}
// Reset after successful connection
func (m *Manager) Reset() {
// Reset attempts to zero
// Clear lastAttempt timestamp
}
// Handle successful reconnection
func (m *Manager) HandleReconnect() {
// Reset attempt counter
// Execute all registered callbacks
}
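A sketch of CalculateDelay following the comments above (one reading: the first retry waits InitialDelay, and jitter multiplies the base delay by 1 ± JitterFactor; requires math and math/rand):
func (m *Manager) CalculateDelay() time.Duration {
    if m.attempts == 0 {
        return 0
    }
    base := float64(m.Config.InitialDelay) *
        math.Pow(m.Config.BackoffFactor, float64(m.attempts-1))
    // Randomize by ±JitterFactor to avoid reconnection stampedes
    jitter := 1 + (rand.Float64()*2-1)*m.Config.JitterFactor
    delay := time.Duration(base * jitter)
    if delay > m.Config.MaxDelay {
        delay = m.Config.MaxDelay
    }
    return delay
}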
3. Connection Health Monitoring
Purpose: Monitors connection health using ping/pong heartbeats.
Dependencies: roots-ws Error types
package health
// Health monitoring configuration
type Config struct {
PingInterval time.Duration // Default: 25 seconds
PongTimeout time.Duration // Default: 15 seconds
PingFailureThreshold int // Default: 3 consecutive failures
}
// Default health configuration
func DefaultConfig() *Config {
return &Config{
PingInterval: 25 * time.Second,
PongTimeout: 15 * time.Second,
PingFailureThreshold: 3,
}
}
// Health monitor tracks connection health
type Monitor struct {
Config *Config
conn *websocket.Conn
lastPingSent time.Time
lastPongReceived time.Time
consecutiveFailures int
isHealthy bool
}
// Create new health monitor
func NewMonitor(conn *websocket.Conn, config *Config) *Monitor {
// Use default config if none provided
// Initialize monitor with healthy state
// Set up ping/pong handlers on connection
// Return initialized monitor
}
// Start health monitoring
func (m *Monitor) Start() {
// Start ticker with PingInterval
// Send pings at regular intervals
// Monitor pong responses
// Update health status based on responses
}
// Stop health monitoring
func (m *Monitor) Stop() {
// Stop ticker
// Clean up resources
}
// Check connection health
func (m *Monitor) IsHealthy() bool {
// Return current health status
}
// Get time since last successful pong
func (m *Monitor) TimeSinceLastPong() time.Duration {
// Return duration since lastPongReceived
// Return max duration if no pong received yet
}
// Handle ping send
func (m *Monitor) handlePingSend() {
// Send ping frame
// Update lastPingSent timestamp
// Schedule timeout check
}
// Handle pong received
func (m *Monitor) handlePongReceived() {
// Update lastPongReceived timestamp
// Reset consecutive failures counter
// Update health status to healthy
}
// Check for ping timeout
func (m *Monitor) checkPingTimeout() {
// Calculate time since last ping sent
// If exceeded PongTimeout:
// - Increment consecutive failures
// - Check if failures exceed threshold
// - Update health status if needed
}
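Assuming a gorilla/websocket-style connection, pong tracking can hook into SetPongHandler, and pings can go out via WriteControl, which is safe to call concurrently with the writer goroutine. A sketch:
func NewMonitor(conn *websocket.Conn, config *Config) *Monitor {
    if config == nil {
        config = DefaultConfig()
    }
    m := &Monitor{Config: config, conn: conn, isHealthy: true}
    conn.SetPongHandler(func(appData string) error {
        m.handlePongReceived() // records timestamp, resets failure count
        return nil
    })
    return m
}

func (m *Monitor) handlePingSend() {
    deadline := time.Now().Add(m.Config.PongTimeout)
    if err := m.conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
        m.consecutiveFailures++
        return
    }
    m.lastPingSent = time.Now()
}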
4. Rate Limiting
Purpose: Provides simple rate limiting to prevent message flooding.
Dependencies: roots-ws Error types
package ratelimit
// Rate limiting configuration
type Config struct {
Enabled bool // Whether rate limiting is active (default: false)
MessagesPerSec int // Maximum messages per second (default: 10)
BurstSize int // Maximum burst allowed (default: 30)
}
// Default rate limit configuration
func DefaultConfig() *Config {
return &Config{
Enabled: false, // Off by default
MessagesPerSec: 10, // Reasonable default
BurstSize: 30, // Allow short bursts
}
}
// Token bucket for rate limiting
type Limiter struct {
rate float64 // Tokens per second
maxTokens int // Maximum capacity
availableTokens float64 // Current token count
lastRefill time.Time // Last time tokens were added
mutex sync.Mutex // For thread safety
}
// Create new rate limiter with configuration
func NewLimiter(config *Config) *Limiter {
// If disabled, return nil
// Create token bucket with initial tokens
// Set rate based on configuration
// Return initialized limiter
}
// Check if operation is allowed by rate limit
func Allow(limiter *Limiter) bool {
// Return true if limiter is nil (disabled)
// Lock for thread safety
// Refill tokens based on time elapsed
// Check if bucket has at least one token
// Consume token if available and return true
// Return false if no tokens available
}
// Wait until operation is allowed with timeout
func Wait(limiter *Limiter, timeout time.Duration) error {
// Return nil if limiter is nil (disabled)
// Set up context with timeout
// Loop until context cancelled or operation allowed
// Calculate time until next token available
// Wait for token to become available or context cancelled
}
// Refill tokens based on elapsed time
func refillTokens(limiter *Limiter) {
// Calculate elapsed time since last refill
// Calculate tokens to add based on rate
// Add tokens up to max capacity
// Update last refill time
}
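The token bucket reduces to a few lines; this sketch matches the comments above, with refillTokens assumed to be called while holding the mutex:
func Allow(limiter *Limiter) bool {
    if limiter == nil {
        return true // nil limiter means rate limiting is disabled
    }
    limiter.mutex.Lock()
    defer limiter.mutex.Unlock()
    refillTokens(limiter)
    if limiter.availableTokens >= 1 {
        limiter.availableTokens--
        return true
    }
    return false
}

func refillTokens(limiter *Limiter) {
    now := time.Now()
    elapsed := now.Sub(limiter.lastRefill).Seconds()
    limiter.availableTokens += elapsed * limiter.rate
    if limiter.availableTokens > float64(limiter.maxTokens) {
        limiter.availableTokens = float64(limiter.maxTokens)
    }
    limiter.lastRefill = now
}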
5. Compression
Purpose: Implements WebSocket compression support for bandwidth efficiency.
Dependencies: None
package compression
// Compression configuration
type Config struct {
Enabled bool // Whether compression is enabled (default: false)
Level int // Compression level 0-9 (default: 6)
}
// Default compression configuration
func DefaultConfig() *Config {
return &Config{
Enabled: false, // Off by default
Level: 6, // Default is balanced compression
}
}
// Enable compression on WebSocket connection
func EnableCompression(conn *websocket.Conn, config *Config) error {
// Return early if compression not enabled
// Set compression parameters based on configuration
// Enable write compression on connection
// Return any errors from enabling compression
}
// Create dialer with compression support
func ConfigureDialer(dialer *websocket.Dialer, config *Config) {
// Return early if compression not enabled
// Enable compression on dialer
// Set compression level if supported by implementation
}
// Create compression options for server
func GetServerCompressionOptions(config *Config) *websocket.CompressionOptions {
// Return nil if compression not enabled
// Create compression options with configured level
// Set appropriate parameters for server-side compression
// Return compression options
}
// Check if message should be compressed
func ShouldCompress(size int, contentType string) bool {
// Skip very small messages (under 256 bytes)
// Skip already compressed content types
// Return true for compressible content
}
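Note that gorilla/websocket, the likely underlying library given the types used here, exposes compression through Dialer.EnableCompression plus per-connection EnableWriteCompression and SetCompressionLevel rather than a CompressionOptions type; under that assumption, the first two functions might look like:
func ConfigureDialer(dialer *websocket.Dialer, config *Config) {
    if config == nil || !config.Enabled {
        return
    }
    dialer.EnableCompression = true // negotiate permessage-deflate
}

func EnableCompression(conn *websocket.Conn, config *Config) error {
    if config == nil || !config.Enabled {
        return nil
    }
    conn.EnableWriteCompression(true)
    return conn.SetCompressionLevel(config.Level)
}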
6. Connection Metrics
Purpose: Collects basic connection statistics for monitoring and debugging.
Dependencies: None
package metrics
// Connection metrics collector
type Collector struct {
StartTime time.Time // When the connection was established
MessagesSent int64 // Total messages sent
MessagesReceived int64 // Total messages received
Errors int64 // Total errors encountered
ReconnectionCount int // Number of reconnection attempts
BytesSent int64 // Total bytes sent
BytesReceived int64 // Total bytes received
LastMessageSent time.Time // Timestamp of last outgoing message
LastMessageReceived time.Time // Timestamp of last incoming message
mutex sync.RWMutex // For thread-safe updates to non-atomic fields
}
// Create new metrics collector
func NewCollector() *Collector {
// Initialize collector with current timestamp
// Set initial counters to zero
// Return initialized collector
}
// Record sent message
func RecordSent(collector *Collector, size int) {
// Skip if collector is nil
// Atomically increment sent counter
// Atomically add size to bytes sent
// Update last message sent timestamp
}
// Record received message
func RecordReceived(collector *Collector, size int) {
// Skip if collector is nil
// Atomically increment received counter
// Atomically add size to bytes received
// Update last message received timestamp
}
// Record error
func RecordError(collector *Collector) {
// Skip if collector is nil
// Atomically increment error counter
}
// Record reconnection attempt
func RecordReconnection(collector *Collector) {
// Skip if collector is nil
// Safely increment reconnection counter
}
// Get connection uptime
func GetUptime(collector *Collector) time.Duration {
// Return zero if collector is nil
// Calculate duration since start time
}
// Calculate messages per second (recent rate)
func GetMessagesPerSecond(collector *Collector) float64 {
// Return zero if collector is nil or insufficient history
// Calculate messages per second based on total messages and uptime
}
// Reset metrics (typically after reconnection)
func Reset(collector *Collector, resetStartTime bool) {
// Skip if collector is nil
// Reset message counters to zero
// Reset byte counters to zero
// Reset timestamps if requested
// Keep reconnection count
}
// Get metrics snapshot as map
func GetSnapshot(collector *Collector) map[string]interface{} {
// Return empty map if collector is nil
// Create map with current metrics values
// Include derived metrics (uptime, message rates)
// Return complete snapshot
}
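Counter updates can use sync/atomic for the int64 fields, reserving the RWMutex for timestamps, as in this sketch of RecordSent:
func RecordSent(collector *Collector, size int) {
    if collector == nil {
        return
    }
    atomic.AddInt64(&collector.MessagesSent, 1)
    atomic.AddInt64(&collector.BytesSent, int64(size))
    collector.mutex.Lock()
    collector.LastMessageSent = time.Now()
    collector.mutex.Unlock()
}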
7. Message Router Implementation
Purpose: Implements event loops for message routing based on type.
Dependencies: roots-ws envelope label functions (GetLabel, IsStandardLabel)
package router
// Create routing channels
func CreateChannels(additionalLabels ...string) map[string]chan []byte {
// Initialize channels for each standard message type:
// EVENT, REQ, CLOSE, CLOSED, EOSE, NOTICE, OK, AUTH
// Set appropriate buffer sizes for each channel type
// based on expected message volume
}
// Route a message to appropriate channel
func RouteEnvelope(msg []byte, channels map[string]chan []byte) error {
// Extract envelope label using roots-ws GetLabel()
// Verify label is a standard (or registered additional) label
// Select the channel registered for that label
// Handle unknown labels gracefully
// Send envelope to channel with non-blocking behavior
// Return error if channel is full or label is unknown
}
// Process messages from connection
func StartRouting(conn *connection.Connection, channels map[string]chan []byte) {
// Launch goroutine for message processing
// Read from connection's raw message source
// Apply message validation and size checks using roots-ws functions
// Route each message to appropriate channel
// Handle routing errors without crashing the router
// Clean up when connection closes
}
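The non-blocking send in RouteEnvelope is the key detail: a full channel must not stall the connection's reader. A sketch, assuming roots-ws is imported as roots and the error values come from section 13:
func RouteEnvelope(msg []byte, channels map[string]chan []byte) error {
    label, err := roots.GetLabel(msg)
    if err != nil {
        return ErrUnknownEnvelopeType
    }
    ch, ok := channels[label]
    if !ok {
        return ErrUnknownEnvelopeType
    }
    select {
    case ch <- msg: // deliver to the per-label channel
        return nil
    default:
        return ErrChannelFull // never block the reader goroutine
    }
}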
8. Write Queue Implementation
Purpose: Provides thread-safe message transmission.
Dependencies: roots-ws Error types
package write
// Thread-safe write queue
type Queue struct {
MaxSize int
conn *connection.Connection
mutex sync.Mutex
}
// Create new write queue for a connection
func NewQueue(conn *connection.Connection, maxSize int) *Queue {
// Initialize queue with specified max size
// Return fully initialized queue
}
// Send message through queue
func Send(queue *Queue, msg []byte) error {
// Check if message exceeds max size
// Acquire mutex lock for thread safety
// Write message to connection's OutChan
// Release mutex lock
// Return any write errors encountered
}
// Start queue processing
func StartQueue(queue *Queue) {
// Start goroutine to monitor connection's OutChan
// Process messages in thread-safe manner
// Handle errors appropriately
}
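A sketch of Send follows; whether a full OutChan should block or fail fast is a design choice, and this version fails fast using the error values from section 13:
func Send(queue *Queue, msg []byte) error {
    if queue.MaxSize > 0 && len(msg) > queue.MaxSize {
        return ErrEnvelopeTooLarge
    }
    queue.mutex.Lock()
    defer queue.mutex.Unlock()
    select {
    case queue.conn.OutChan <- msg:
        return nil
    default:
        return ErrWriteFailed // outbound buffer is full
    }
}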
9. Initiator Functions
Purpose: Implements connection initiator patterns.
Dependencies: roots-ws envelope creation functions, Write Queue
package initiator
// Send EVENT envelope
func PublishEvent(queue *write.Queue, event []byte) error {
// Validate event format using roots-ws
// Create EVENT message using roots-ws
// Send through write queue
// Return any errors encountered
}
// Send REQ envelope
func SendReq(queue *write.Queue, subID string, filters [][]byte) error {
// Validate subscription ID format and filters
// Create REQ message using roots-ws
// Send through write queue
// Return any errors encountered
}
// Send CLOSE envelope
func SendClose(queue *write.Queue, subID string) error {
// Validate subscription ID format
// Create CLOSE message using roots-ws
// Send through write queue
// Return any errors encountered
}
// Send AUTH response envelope
func SendAuthResponse(queue *write.Queue, event []byte) error {
// Validate event format using roots-ws
// Create AUTH message using roots-ws
// Send through write queue
// Return any errors encountered
}
10. Responder Functions
Purpose: Implements connection responder patterns.
Dependencies: roots-ws envelope creation functions, Write Queue
package responder
// Send event within subscription context
func SendEvent(queue *write.Queue, subID string, event []byte) error {
// Validate subscription ID and event format
// Create EVENT message with subscription ID included
// Send through write queue
// Return any errors encountered
}
// Send EOSE for subscription
func SendEOSE(queue *write.Queue, subID string) error {
// Validate subscription ID format
// Create EOSE message for subscription
// Send through write queue
// Return any errors encountered
}
// Send NOTICE message
func SendNotice(queue *write.Queue, message string) error {
// Validate notice message content
// Create NOTICE message
// Send through write queue
// Return any errors encountered
}
// Send OK message for event
func SendOK(queue *write.Queue, eventID string, success bool, message string) error {
// Validate event ID format
// Create OK message with event ID, success flag, and message
// Send through write queue
// Return any errors encountered
}
// Send AUTH challenge
func SendAuthChallenge(queue *write.Queue, challenge string) error {
// Validate challenge format
// Create AUTH challenge message
// Send through write queue
// Return any errors encountered
}
// Send CLOSED message
func SendClosed(queue *write.Queue, subID string, message string) error {
// Validate subscription ID format
// Create CLOSED message with subscription ID and reason
// Send through write queue
// Return any errors encountered
}
11. Subscription Management
Purpose: Manages subscription state and handles reconnection.
Dependencies: Write Queue, Initiator functions
package subscription
// Registry tracks subscriptions for reconnection
type Registry struct {
mu sync.RWMutex
subscriptions map[string]Entry
}
type Entry struct {
ID string
Filters [][]byte
Active bool
}
// Create new registry
func NewRegistry() *Registry {
// Initialize empty registry with map
// Return initialized registry
}
// Add subscription to registry
func (r *Registry) Add(id string, filters [][]byte) {
// Lock mutex for thread safety
// Create entry with provided ID and filters
// Set Active to true
// Add to subscriptions map
// Unlock mutex
}
// Remove subscription from registry
func (r *Registry) Remove(id string) {
// Lock mutex for thread safety
// Delete entry from subscriptions map
// Unlock mutex
}
// Get active subscriptions
func (r *Registry) GetActive() map[string][][]byte {
// Lock mutex for read access
// Create map of active subscription IDs to filters
// Unlock mutex
// Return active subscriptions map
}
// Subscribe with registry tracking
func Subscribe(queue *write.Queue, registry *Registry, id string, filters [][]byte) error {
// Create REQ message using initiator.SendReq
// Register subscription in registry if successful
// Return any errors from send operation
}
// Unsubscribe with registry tracking
func Unsubscribe(queue *write.Queue, registry *Registry, id string) error {
// Create CLOSE message using initiator.SendClose
// Remove from registry regardless of send result
// Return any errors from send operation
}
// Register reconnect callback for a connection
func RegisterReconnectCallback(conn *connection.Connection, registry *Registry, queue *write.Queue) {
// Create callback function that reestablishes subscriptions
// Register callback with connection using OnReconnect
}
// Reestablish subscriptions after reconnection
func ReestablishSubscriptions(queue *write.Queue, registry *Registry) []error {
// Get active subscriptions from registry
// Attempt to resubscribe to each using initiator.SendReq
// Collect any errors encountered
// Return array of errors (empty if all succeeded)
}
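Given the registry and initiator APIs above, reestablishment is a short loop; a sketch:
func ReestablishSubscriptions(queue *write.Queue, registry *Registry) []error {
    var errs []error
    for id, filters := range registry.GetActive() {
        if err := initiator.SendReq(queue, id, filters); err != nil {
            errs = append(errs, err)
        }
    }
    return errs
}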
12. Connection Pool
Purpose: Manages multiple connections with shared operations.
Dependencies: Connection, Write Queue, Router, Subscription types
package pool
// Connection pool manages multiple relays
type Pool struct {
mu sync.RWMutex // Protects connections and queues maps
connections map[string]*connection.Connection
queues map[string]*write.Queue
Channels map[string]chan []byte
}
// Create new pool with optional initial relays
func New(urls []string, config connection.Config) (*Pool, error) {
// Initialize pool with empty maps
// Set up shared routing channels
// Add initial relays if provided
// Return initialized pool
}
// Add relay to the pool
func (p *Pool) AddRelay(url string) error {
// Lock mutex for thread safety
// Check if relay already exists
// Create new connection with config
// Create write queue for connection
// Start routing to shared channels
// Connect to relay
// Add to connections and queues maps
// Unlock mutex
// Return any errors encountered
}
// Remove relay from pool
func (p *Pool) RemoveRelay(url string) error {
// Lock mutex for thread safety
// Check if relay exists in pool
// Close connection
// Remove from connections and queues maps
// Unlock mutex
// Return any errors encountered
}
// Send to specific relay
func (p *Pool) SendTo(url string, env []byte) error {
// Lock mutex for read access
// Find queue for relay
// Unlock mutex
// Send envelope through queue
// Return any errors encountered
}
// Broadcast to all relays
func (p *Pool) Broadcast(env []byte) []error {
// Lock mutex for read access
// Create copy of queues map to prevent deadlock
// Unlock mutex
// Send to all relays in copy
// Collect any errors
// Return array of errors (empty if all succeeded)
}
// Subscribe to all relays
func (p *Pool) Subscribe(id string, filters [][]byte, registry *subscription.Registry) []error {
// Lock mutex for read access
// Create copy of queues map to prevent deadlock
// Unlock mutex
// Subscribe to all relays in copy
// Register subscription if successful
// Collect any errors
// Return array of errors (empty if all succeeded)
}
// Unsubscribe from all relays
func (p *Pool) Unsubscribe(id string, registry *subscription.Registry) []error {
// Lock mutex for read access
// Create copy of queues map to prevent deadlock
// Unlock mutex
// Unsubscribe from all relays in copy
// Remove from registry
// Collect any errors
// Return array of errors (empty if all succeeded)
}
// Enable automatic reconnection for all relays
func (p *Pool) EnableAutoReconnect(registry *subscription.Registry) {
// Lock mutex for read access
// Create copy of connections map to prevent deadlock
// Unlock mutex
// Register reconnect callbacks for all connections
}
// Get connection status
func (p *Pool) Status(url string) (connection.ConnectionStatus, error) {
// Lock mutex for read access
// Find connection for relay
// Unlock mutex
// Return status or error if not found
}
// Get all statuses
func (p *Pool) Statuses() map[string]connection.ConnectionStatus {
// Lock mutex for read access
// Create map of relay URLs to connection statuses
// Unlock mutex
// Return statuses map
}
// Close all connections
func (p *Pool) Close() []error {
// Lock mutex for read access
// Create copy of connections map to prevent deadlock
// Unlock mutex
// Close all connections in copy
// Collect any errors
// Return array of errors (empty if all succeeded)
}
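The copy-then-operate pattern referenced throughout the pool methods looks like this sketch of Broadcast: the lock is held only while snapshotting the map, never while sending.
func (p *Pool) Broadcast(env []byte) []error {
    p.mu.RLock()
    queues := make(map[string]*write.Queue, len(p.queues))
    for url, q := range p.queues {
        queues[url] = q // snapshot under the read lock
    }
    p.mu.RUnlock()
    var errs []error
    for _, q := range queues {
        if err := write.Send(q, env); err != nil {
            errs = append(errs, err)
        }
    }
    return errs
}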
13. Error Types
Purpose: Defines error types for the library.
package errors
import "errors"
// Connection errors
var (
ErrConnectionFailed = errors.New("connection failed")
ErrConnectionTimeout = errors.New("connection timeout")
ErrConnectionClosed = errors.New("connection closed unexpectedly")
ErrInvalidState = errors.New("invalid state for operation")
ErrMaxReconnectAttemptsExceeded = errors.New("maximum reconnection attempts exceeded")
ErrPongTimeout = errors.New("pong response timeout")
ErrConnectionUnhealthy = errors.New("connection deemed unhealthy")
)
// Write errors
var (
ErrEnvelopeTooLarge = errors.New("envelope exceeds maximum size")
ErrWriteFailed = errors.New("write failed")
)
// Router errors
var (
ErrUnknownEnvelopeType = errors.New("unknown envelope type")
ErrChannelFull = errors.New("channel buffer full")
)
// Pool errors
var (
ErrRelayNotFound = errors.New("relay not found in pool")
ErrRelayAlreadyExists = errors.New("relay already exists in pool")
)
// Subscription errors
var (
ErrSubscriptionNotFound = errors.New("subscription not found")
ErrSubscriptionDuplicate = errors.New("subscription ID already exists")
)
14. Main Package
Purpose: Exports key types and functions with sensible defaults.
Dependencies: All other packages
package honeybee
import (
"time"
"github.com/nostr-protocol/go-roots-ws"
"github.com/nostr-protocol/go-honeybee/connection"
"github.com/nostr-protocol/go-honeybee/errors"
"github.com/nostr-protocol/go-honeybee/health"
"github.com/nostr-protocol/go-honeybee/initiator"
"github.com/nostr-protocol/go-honeybee/pool"
"github.com/nostr-protocol/go-honeybee/reconnection"
"github.com/nostr-protocol/go-honeybee/responder"
"github.com/nostr-protocol/go-honeybee/router"
"github.com/nostr-protocol/go-honeybee/subscription"
"github.com/nostr-protocol/go-honeybee/write"
)
// Export key types and functions for convenient access
type Connection = connection.Connection
type ConnectionStatus = connection.ConnectionStatus
type ConnectionConfig = connection.Config
type WriteQueue = write.Queue
type Pool = pool.Pool
type SubscriptionRegistry = subscription.Registry
type ReconnectionManager = reconnection.Manager
type ReconnectionConfig = reconnection.Config
type HealthMonitor = health.Monitor
type HealthConfig = health.Config
// Default configuration
func DefaultConfig() connection.Config {
return connection.Config{
ConnectionTimeout: 10 * time.Second,
ReconnectConfig: reconnection.DefaultConfig(),
HealthConfig: health.DefaultConfig(),
MaxMessageSize: 1024 * 1024, // 1MB
}
}
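Putting the layers together, typical initiator usage might look like the sketch below; the call sites follow the sections above, and the relay URL and filter are placeholders.
conn := connection.New("wss://relay.example.com", honeybee.DefaultConfig())
if err := connection.Connect(conn); err != nil {
    log.Fatal(err)
}
defer connection.Close(conn)

queue := write.NewQueue(conn, 1024*1024)
channels := router.CreateChannels()
router.StartRouting(conn, channels)

registry := subscription.NewRegistry()
_ = subscription.Subscribe(queue, registry, "sub1", [][]byte{[]byte(`{"kinds":[1]}`)})

// Consume events routed to the EVENT channel
for env := range channels["EVENT"] {
    if subID, event, err := roots.FindSubscriptionEvent(env); err == nil {
        fmt.Printf("sub %s: %s\n", subID, event)
    }
}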
Cross-Language Translation: Go to TypeScript with RxJS
Core Architectural Principles
Recognize the fundamental paradigm difference:
- Go is built around CSP (Communicating Sequential Processes) - goroutines passing data through channels
- TypeScript/RxJS is built around reactive streams - declarative transformations of event flows
- Don't translate patterns 1:1; translate intent into each language's natural expression
Design APIs that feel native:
- Go users expect constructors that return values and errors, plus explicit Start() and Close() methods
- TypeScript users expect constructors that return ready-to-use objects, with automatic activation on subscription
- The same feature should have different surface APIs in each language
TypeScript Multi-Environment Strategy
Shared foundation with environment-specific transport:
- roots-ws (TypeScript): Pure functions for envelope handling, identical across environments
- honeybee-browser: Reactive wrapper for the browser WebSocket API
- honeybee-node: Reactive wrapper for Node.js WebSocket libraries
- Both honeybee variants expose identical observable-based APIs while using appropriate underlying implementations
Environment-aware design patterns:
- Browser: Event-driven lifecycle, page visibility handling, CORS considerations
- Node: Process lifecycle, server patterns, granular connection control
- Shared: Message routing, subscription management, error handling patterns
Go-Specific Considerations
Embrace structured concurrency:
- Use goroutines liberally - they're cheap and the expected tool
- Always pair goroutines with cleanup using context.Context and sync.WaitGroup
- Channels are the primary communication mechanism between concurrent operations
- Make blocking behavior explicit and expected
Error handling:
- Return errors as values, don't hide them in callbacks
- Provide typed errors users can switch on: var ErrConnectionClosed = errors.New(...)
- Use fmt.Errorf with %w for error wrapping to preserve context
- Consider sentinel errors for common conditions
Lifecycle management:
- Explicit Start() and Close() methods are idiomatic
- Close() should be safe to call multiple times
- Use defer for cleanup - teach users this pattern
- Consider implementing the io.Closer interface
Type system:
- Prefer interfaces for dependencies (like Dialer, Conn)
- Keep interfaces small and focused (interface segregation)
- Use struct embedding for composition
- Use type assertions with the value, ok := x.(Type) pattern for optional capabilities
Testing:
- Provide interface-based abstractions specifically for testability
- Table-driven tests are the norm
- Use testing.T helpers and subtests
- Mock by implementing interfaces, not with frameworks
TypeScript/RxJS-Specific Considerations
Embrace reactive composition:
- Expose observables as the primary API, not methods that trigger side effects
- Let users compose streams with operators rather than providing pre-composed solutions
- Hot vs cold observables matter - document which is which
- Use subjects internally only when you need to push values into a stream
Declarative over imperative:
- Prefer defining what streams represent over how to create them
- Use operators like map, merge, and switchMap to transform streams
- Avoid manual subscription management - use takeUntil for cleanup
- Let the framework handle execution timing
Error handling:
- Errors flow through the observable pipeline
- Provide separate error streams rather than mixing errors with data
- Use catchError to handle and recover from errors within streams
- Consider retry strategies with retry and retryWhen
Lifecycle management:
- Construction should be side-effect free
- Activation happens on subscription, deactivation on unsubscription
- Use the finalize operator for cleanup logic
- Provide a destroy() or close() method only for resource cleanup, not stream management
Type system:
- Use TypeScript's structural typing - interfaces describe shapes
- Use generics for reusable stream types: Observable<T>
- Use discriminated unions for state modeling
- Leverage type inference where possible
Testing:
- Use marble testing for complex stream interactions
- Mock at the observable level, not internal implementation
- Test by subscribing and asserting on emitted values
- Use TestScheduler for time-based scenarios
Environment-Specific Patterns
Browser WebSocket Handling:
// Browser: Event-driven API with limited control
class BrowserConnection {
private socket: WebSocket;
private statusSubject = new Subject<string>();
private messagesSubject = new Subject<unknown>();
constructor(url: string) {
this.socket = new WebSocket(url);
this.setupEventListeners();
}
private setupEventListeners() {
this.socket.onopen = () => this.statusSubject.next('connected');
this.socket.onmessage = (event) => this.messagesSubject.next(event.data);
this.socket.onclose = () => this.handleReconnection();
}
private handleReconnection() {
// Browser-side reconnection hook; actual control is limited
this.statusSubject.next('reconnecting');
}
// Limited connection parameter control
// CORS restrictions apply
// Browser manages some reconnection behavior
}
Node.js WebSocket Handling:
// Node: More granular control and server patterns
class NodeConnection {
private socket: WebSocket;
constructor(url: string, options: NodeWebSocketOptions) {
this.socket = new WebSocket(url, {
headers: options.headers,
rejectUnauthorized: options.ssl?.verify,
// Full control over connection parameters
});
this.setupEventListeners();
}
// Server-side acceptance patterns: wrap an already-accepted
// socket instead of dialing out through the constructor
static fromIncoming(socket: WebSocket): NodeConnection {
const conn = Object.create(NodeConnection.prototype) as NodeConnection;
conn.socket = socket;
conn.setupEventListeners();
return conn;
}
// Custom authentication flows
// Process lifecycle integration
// Resource cleanup responsibilities
}
Shared Observable Patterns:
// Both environments expose identical API patterns
interface HoneybeeConnection {
readonly messages$: Observable<Uint8Array>;
readonly connectionState$: Observable<ConnectionState>;
readonly errors$: Observable<Error>;
send(data: Uint8Array): void;
close(): void;
}
Specific Feature Comparisons
Connection establishment:
// Go: explicit, returns error
conn, err := websocket.Connect("ws://example.com")
if err != nil {
return err
}
defer conn.Close()
// TypeScript Browser: declarative, errors in stream
const conn = new BrowserConnection("ws://example.com");
conn.messages$.subscribe(msg => { /* ... */ });
conn.errors$.subscribe(err => { /* ... */ });
// TypeScript Node: declarative with more options
const conn = new NodeConnection("ws://example.com", {
headers: { 'Authorization': 'Bearer token' },
ssl: { verify: true }
});
conn.messages$.subscribe(msg => { /* ... */ });
Reading messages:
// Go: blocking channel read in goroutine
go func() {
for msg := range conn.InChan {
process(msg)
}
}()
// TypeScript: subscribe to observable (both environments)
conn.messages$.subscribe(msg => process(msg));
Environment-specific optimizations:
// Browser: Page visibility API integration
conn.messages$.pipe(
pausable(pageVisibility$),
// Pause subscriptions when page hidden
).subscribe(msg => process(msg));
// Node: Process lifecycle integration
process.on('SIGTERM', () => {
conn.close();
// Graceful shutdown
});
Reconnection logic:
// Go: explicit retry loop with backoff
for attempt := 0; attempt < maxRetries; attempt++ {
conn, err := dial()
if err == nil {
return conn, nil
}
time.Sleep(backoff(attempt))
}
return nil, ErrMaxReconnectAttemptsExceeded
// TypeScript Browser: Limited reconnection control
const conn$ = connect().pipe(
retryWhen(errors => errors.pipe(
delay(1000),
take(3)
))
);
// TypeScript Node: Full reconnection control
const conn$ = connect().pipe(
retryWhen(errors => errors.pipe(
switchMap((error, index) =>
customBackoffStrategy(error, index)
),
takeWhile(shouldRetry)
))
);
Library Structure Comparison
Go Structure:
honeybee/
├── connection/ // Core connection management
├── reconnection/ // Backoff and retry logic
├── health/ // Ping/pong monitoring
├── router/ // Message routing
└── pool/ // Multi-connection management
TypeScript Structure:
roots-ws/ // Shared pure functions
├── envelope.ts // Message parsing/creation
├── validation.ts // Structure validation
└── errors.ts // Error types
honeybee-browser/ // Browser-specific transport
├── connection.ts // Browser WebSocket wrapper
├── health.ts // Visibility-aware monitoring
└── pool.ts // Multi-relay management
honeybee-node/ // Node-specific transport
├── connection.ts // Node WebSocket wrapper
├── server.ts // Server-side patterns
└── pool.ts // Process-aware management
Documentation Differences
Go documentation:
- Focus on goroutine safety and channel semantics
- Show explicit error handling patterns
- Demonstrate lifecycle with defer
- Example code shows complete programs, not snippets
TypeScript documentation:
- Focus on observable composition and operators
- Show stream transformations and combinations
- Demonstrate subscription management
- Document environment-specific behaviors and limitations
- Example code shows reactive chains and patterns
- Clearly distinguish browser vs Node capabilities
Common Pitfalls to Avoid
Don't force Go patterns into TypeScript:
- No "Start()" methods that users must call
- Don't require manual subscription management
- Don't expose channels-as-observables literally
Don't force TypeScript patterns into Go:
- No "auto-starting" connections that happen in constructors
- Don't hide goroutines from users
- Don't try to make channels look like observables
Environment-specific pitfalls:
- Don't assume browser capabilities in Node implementation
- Don't assume Node.js process control in browser implementation
- Don't leak environment-specific types across the API boundary
- Don't ignore CORS implications in browser documentation
Respect memory models:
- Go: channel sends are synchronization points, document goroutine ownership
- TypeScript Browser: observables handle threading implicitly, respect event loop constraints
- TypeScript Node: observables handle threading implicitly, document process integration patterns
Handle cleanup appropriately:
- Go: explicit Close() with context cancellation
- TypeScript Browser: automatic cleanup via unsubscription, respect page lifecycle
- TypeScript Node: automatic cleanup via unsubscription and finalize, respect process lifecycle
The goal is that a Go developer looks at your Go library and thinks "this feels like Go," while TypeScript developers in both browser and Node environments think "this feels like reactive programming in my environment." They solve the same problems but use each platform's strengths rather than fighting against them.
Architecture Decision Records
ADR 1: Practical WebSocket Implementation
Context: Need to define the implementation approach for the mid-level library.
Decision: Implement honeybee as a practical, lean WebSocket transport with essential connection management, while deferring advanced features to higher levels.
Rationale:
- Provides a usable implementation that doesn't require additional libraries for basic operation
- Creates clear expectations about what functionality is included
- Ensures stability by focusing on well-understood, essential features
- Makes the library approachable for most common use cases
- Enables straightforward testing and maintenance
- Establishes a foundation that can be enhanced by mana-ws components
- Creates clear boundaries between essential and advanced functionality
Consequences:
- Must clearly define what constitutes "essential" vs "advanced" features
- Some consumers may need to add mana-ws for specialized requirements
- Implementation will be more complex than roots-ws but simpler than mana-ws
- Strikes a balance between functionality and simplicity
- Creates a natural upgrade path as application needs grow
- Interfaces designed to enable drop-in replacements from mana-ws
- Clear criteria established for what constitutes "essential" functionality
ADR 2: Event Loop Based Architecture
Context: Need to determine how to handle asynchronous operations.
Decision: Use event loops with channels (Go) or observables (TypeScript) for asynchronous message handling.
Rationale:
- Leverages the natural concurrency models of each language
- Provides consistent patterns for handling async events
- Enables composition with existing application event loops
- Makes asynchronous operations explicit and manageable
- Creates clear patterns for resource management and cleanup
Consequences:
- Requires careful channel/observable management to prevent leaks
- Creates more complex control flow than synchronous code
- Improves scalability for high-volume connections
- Requires education for developers unfamiliar with the pattern
- Enables consistent error handling patterns
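As a minimal sketch of the channel-based event loop this ADR describes (the function and channel names are illustrative):
// eventLoop drains inbound messages until the context is cancelled,
// keeping the asynchronous boundary explicit for the caller
func eventLoop(ctx context.Context, inbound <-chan []byte, handle func([]byte)) {
	for {
		select {
		case <-ctx.Done():
			return // caller-controlled shutdown and cleanup point
		case msg, ok := <-inbound:
			if !ok {
				return // producer closed the channel
			}
			handle(msg)
		}
	}
}
The caller owns the goroutine that runs eventLoop and shuts it down by cancelling the context, which keeps resource cleanup explicit.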
ADR 3: Role-Specific Connection Patterns
Context: Nostr protocol has asymmetric connection roles.
Decision: Implement distinct initiator and responder patterns while sharing core connection code.
Rationale:
- Creates clear API boundaries for each role
- Prevents incorrect usage of operations not permitted for a role
- Allows specialized optimizations for each role
- Makes role expectations explicit in code
- Follows the principle of least privilege
Consequences:
- Increases API surface area with role-specific functions
- Improves clarity about permitted operations for each role
- May require some code duplication for shared functionality
- Makes incorrect usage harder by design
- Simplifies application code by providing role-appropriate functions
ADR 4: Simple State Management
Context: Need to balance functionality with complexity.
Decision: Implement simple, explicit state management without sophisticated tracking or analytics.
Rationale:
- Keeps the library focused and understandable
- Provides essential functionality without complexity
- Makes state changes explicit and traceable
- Reduces potential for bugs in state transitions
- Aligns with the functional approach of the stack
Consequences:
- Advanced state management must be implemented by consumers or mana-ws
- Functionality is more limited but more reliable
- State transitions are more predictable
- Less risk of unexpected behavior
- May require more manual management for complex applications
ADR 5: Basic Resilience Features
Context: Need to determine what resilience features belong in this layer.
Decision: Include basic reconnection and health monitoring with simple strategies.
Rationale:
- Provides essential resilience without complex implementation
- Makes connections practical for real-world usage
- Handles common failure scenarios gracefully
- Avoids complex recovery strategies that belong in mana-ws
- Ensures basic reliability without overwhelming complexity
Consequences:
- Limited to simpler reconnection strategies
- May not handle all edge cases optimally
- Provides "good enough" reliability for most applications
- Creates clear extension points for more sophisticated strategies
- Makes expected behavior clear and predictable
ADR 6: Dependency Minimization
Context: Need to determine external dependency strategy.
Decision: Limit dependencies to roots-ws and standard libraries only.
Rationale:
- Reduces potential for dependency conflicts
- Simplifies maintenance and upgrades
- Creates predictable behavior across environments
- Improves security by limiting attack surface
- Makes the library more self-contained and portable
Consequences:
- May require implementing functionality available in external libraries
- Increases code that must be maintained within the project
- Reduces unexpected behavior from external dependencies
- Improves long-term stability
- May limit some advanced features that would require external libraries
ADR 7: Simplicity as a First-Class Feature
Context: Need to determine how to balance functionality with ease of understanding and maintenance.
Decision: Treat simplicity as a first-class feature by deliberately deferring advanced functionality to separate repositories (mana-ws) and maintaining clear, focused APIs.
Rationale:
- Creates a more approachable entry point for new developers
- Reduces cognitive load for common use cases
- Improves maintainability by constraining scope
- Enhances testability through focused functionality
- Provides clearer documentation and learning path
- Makes complexity an opt-in choice rather than a requirement
- Allows the library to excel at its core purpose
Consequences:
- Some advanced use cases will require additional libraries
- Clear criteria needed for what belongs in honeybee vs. mana-ws
- Requires discipline to maintain simplicity over time
- Documentation must clearly explain the layered approach
- May need to resist pressure to add advanced features directly
ADR 8: Interface-First Design for Extension
Context: Need to ensure honeybee components can be extended or replaced by more sophisticated implementations.
Decision: Design honeybee using explicit interfaces for all core components, with minimal surface area that future extensions can implement while adding capabilities.
Rationale:
- Enables drop-in replacements from advanced libraries
- Creates clear extension points for specialized behavior
- Focuses implementation on essential behaviors
- Makes dependencies explicit through interfaces
- Provides stable API contracts for both users and extenders
- Supports future evolution without breaking changes
- Aligns with Go's interface-based composition patterns
Consequences:
- Requires more initial design effort
- Interface definitions must be stable and forward-thinking
- Implementation details must remain behind interfaces
- May constrain some implementation choices
- Enables cleaner separation between layers
- Improves testability through interface-based mocks
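For illustration, a hypothetical shape of this interface-first approach (not honeybee's actual definitions):
// WriteQueue is the minimal contract honeybee would expose; a mana-ws
// AdvancedWriteQueue can implement it and become a drop-in replacement
type WriteQueue interface {
	Enqueue(msg []byte) error
	Flush(ctx context.Context) error
	Len() int
}

// Consumers depend only on the interface, never a concrete type
func publish(q WriteQueue, msg []byte) error {
	return q.Enqueue(msg)
}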
mana-ws
Functional Architecture
1. Advanced Write Queue
Purpose: Implements sophisticated back-pressure management, prioritization, and batching.
Dependencies: honeybee WriteQueue, roots-ws Error types
// Go implementation
// Core types extend honeybee types
type AdvancedWriteQueue struct {
honeybee.WriteQueue
HighWatermark int
LowWatermark int
UnderPressure bool
PriorityLevels int // Number of priority levels (default: 3)
PriorityQueues [][]Message // Separate queues for different priorities
BatchSize int // Maximum messages to batch (default: 10)
BatchDelay time.Duration // Maximum delay before sending batch (default: 20ms)
Stats WriteQueueStats
bufferPool *sync.Pool // Pool of reusable buffers
}
type WriteQueueStats struct {
MessagesQueued int64
MessagesSent int64
WriteErrors int64
DroppedMessages int64
DelayedMessages int64
BatchesSent int64
AverageBatchSize float64
PressureEvents int64
PressureDuration time.Duration
}
// Message with priority information
type Message struct {
Content []byte
Priority int // Higher number = higher priority
Created time.Time
}
// Create new advanced write queue with prioritization
func New(maxSize int, priorityLevels int) *AdvancedWriteQueue {
// Initialize queue with specified max size
// Set default watermarks (high=80%, low=50% of max size)
// Create priority queues array based on priorityLevels (default: 3)
// Initialize buffer pool for message batching
// Initialize statistics counters
// Set initial pressure state to false
// Return fully initialized queue
}
// Initialize buffer pool for efficient memory usage
func initBufferPool() *sync.Pool {
// Create sync.Pool for byte buffers
// Set New function to create buffers with reasonable initial size
// Return initialized pool
}
// Send message through queue with back pressure and priority
func SendWithPriority(queue *AdvancedWriteQueue, socket *websocket.Conn, msg []byte, priority int) error {
// Check if message exceeds max size
// Acquire mutex lock for thread safety
// Apply back pressure strategies if under pressure
// Add message to appropriate priority queue
// Process queue if batch ready or timer expired
// Update statistics
// Release mutex lock
// Return any errors encountered
}
// Process queued messages in priority order with batching
func ProcessQueue(queue *AdvancedWriteQueue, socket *websocket.Conn) {
// Start from highest priority queue
// Get buffer from pool
// Batch multiple messages into single WebSocket frame if possible
// For each priority level:
// - Collect up to BatchSize messages or until queue empty
// - Add to buffer with appropriate framing
// Send batched messages as single WebSocket frame when possible
// Return buffer to pool after use
// Update statistics
}
// Update queue statistics
func UpdateStats(queue *AdvancedWriteQueue, messageSize int, batchSize int, success bool) {
// Atomically update message counters
// Track bytes sent if successful
// Update batch statistics
// Increment error counter if failed
// Update queue size tracking
// Recalculate back pressure status
// Update exponential moving average of batch size
}
// Check if queue is under back pressure
func IsUnderPressure(queue *AdvancedWriteQueue) bool {
// Return current back pressure state
// Thread-safe access to pressure flag
}
// Update back pressure status
func UpdateBackPressure(queue *AdvancedWriteQueue) {
// Check current queue size against watermarks
// If under pressure and size drops below low watermark, release pressure
// If not under pressure and size exceeds high watermark, apply pressure
// Log pressure state changes
// Track pressure event duration
// Thread-safe update of pressure flag
}
// Apply back pressure strategies
func ApplyBackPressure(queue *AdvancedWriteQueue, msg []byte, priority int) (shouldSend bool, delay time.Duration) {
// If not under pressure, allow message with no delay
// If under pressure, apply priority-based filtering:
// - Always allow highest priority messages (control messages, CLOSE, EOSE)
// - Apply progressive dropping based on priority levels
// - Lower priorities have higher drop probability when under pressure
// - Calculate appropriate delay for lower priority messages
// - Update statistics for dropped/delayed messages
// Return decision on whether to send and how long to delay
}
// Create prepared message for efficient reuse
func PrepareMessage(queue *AdvancedWriteQueue, messageType int, data []byte) (*websocket.PreparedMessage, error) {
// Create WebSocket PreparedMessage (compressed if enabled)
// This allows the message to be prepared once but sent to multiple destinations
// Return prepared message or error
}
// Send prepared message through queue
func SendPrepared(queue *AdvancedWriteQueue, socket *websocket.Conn, prepared *websocket.PreparedMessage, priority int) error {
// Similar to SendWithPriority but for prepared messages
// Avoids repeated compression/preparation costs for broadcast scenarios
// Update statistics
// Return any errors
}
2. Connection Pool
Purpose: Manages multiple connections with advanced load distribution, health-based routing, and adaptive scaling.
Dependencies: honeybee Connection Pool
// Core types extend honeybee types
type AdvancedPool struct {
honeybee.Pool
LoadBalancingStrategy Strategy
HealthThreshold float64 // Minimum health score for active connections
MaxConnectionsPerHost int // Connection limit per host
ResourceMonitor *ResourceMonitor
Stats PoolStatistics
connectionCache *lru.Cache // Cache recently used connections
mutex sync.RWMutex
}
// Load balancing strategies
type Strategy int
const (
RoundRobin Strategy = iota
LeastConnections
ResponseTime
WeightedRandom
ConsistentHashing
)
// Resource allocation monitor
type ResourceMonitor struct {
TotalMemoryLimit int64
TotalConnectionLimit int
MemoryPerConnection int64
CurrentMemoryUsage int64
GoroutineLimit int
CurrentGoroutines int
}
// Pool statistics
type PoolStatistics struct {
ActiveConnections int
TotalMessages int64
MessagesPerSecond float64
ResponseTimeAverage time.Duration
ResponseTimeP95 time.Duration
ConnectionDistribution map[string]int // Count by host
FailoverEvents int
ResourceUtilization float64 // 0.0-1.0
lastCalculated time.Time
}
// Create new advanced connection pool
func NewAdvancedPool(config Config) *AdvancedPool {
// Initialize pool with honeybee base pool
// Set up load balancing strategy
// Initialize connection cache for connection reuse
// Set up resource monitor
// Initialize connection statistics tracking
// Return initialized pool
}
// Add connection with resource constraints
func (p *AdvancedPool) AddConnection(url string, config honeybee.ConnectionConfig) error {
// Check if adding connection would exceed resource limits
// Check if host already has maximum connections
// Add connection if resources available
// Update resource utilization
// Return error if resource constraints prevent addition
}
// Get connection using load balancing strategy
func (p *AdvancedPool) GetConnection(key string) (*honeybee.Connection, error) {
// Use load balancing strategy to select connection
// For consistent hashing, use key to determine connection
// For least connections, find connection with fewest active requests
// For response time, prefer faster connections
// Return selected connection or error if none available
}
// Monitor health across pool and rebalance as needed
func (p *AdvancedPool) MonitorHealth() {
// Periodically check health of all connections
// Remove unhealthy connections from active pool
// Rebalance load when connections change state
// Attempt recovery of unhealthy connections in background
}
// Manage resource allocation dynamically
func (p *AdvancedPool) ManageResources() {
// Track memory and goroutine usage
// Adjust connection limits based on current system load
// Implement graceful degradation when approaching resource limits
// Scale back non-essential operations under resource pressure
}
// Calculate and update pool statistics
func (p *AdvancedPool) UpdateStatistics() {
// Calculate messages per second rate
// Update response time metrics
// Track connection distribution
// Calculate resource utilization percentage
// Identify imbalances or potential problems
}
// Efficient connection lifecycle management
func (p *AdvancedPool) ManageLifecycle() {
// Implement connection recycling for frequently used endpoints
// Use connection cache to reduce connection overhead
// Gracefully phase out connections that need replacement
// Pre-warm connections for known high-traffic endpoints
}
3. Advanced Subscription Repository
Purpose: Implements sophisticated tracking and restoration of subscriptions with adaptive optimization.
Dependencies: honeybee Subscription Registry, roots-ws Subscription types
// Core types extend honeybee types
type AdvancedSubscriptionRepository struct {
honeybee.SubscriptionRegistry
CreationTimes map[string]time.Time // When subscriptions were created
UsageStats map[string]UsageStatistics
PriorityMap map[string]int // Subscription priorities
RestorationOrder []string // Ordered list for restoration
MaxSubscriptions int // Limit on active subscriptions
bufferPool *sync.Pool // For filter processing
}
type UsageStatistics struct {
MessageCount int64
LastActivity time.Time
ActivityFrequency float64 // Messages per minute
ResponseTimes []time.Duration // Recent response times
AvgResponseTime time.Duration
ImportanceScore float64 // Calculated from usage patterns
}
// Create new advanced repository
func NewAdvancedRepository(maxSubs int) *AdvancedSubscriptionRepository {
// Initialize base subscription registry
// Set up subscription statistics tracking
// Initialize buffer pool for filter operations
// Set maximum subscription limit
// Initialize ordering for intelligent restoration
// Return initialized repository
}
// Add subscription with priority
func (r *AdvancedSubscriptionRepository) AddWithPriority(id string, filters [][]byte, priority int) {
// Add to base registry
// Record creation time
// Set initial priority
// Initialize usage statistics
// Update restoration order based on priority
// Enforce subscription limits if reached
}
// Update subscription statistics on activity
func (r *AdvancedSubscriptionRepository) RecordActivity(id string, msgCount int, responseTime time.Duration) {
// Update message count
// Record last activity timestamp
// Update activity frequency calculation
// Add to response time history
// Recalculate importance score
// Potentially adjust priority based on usage patterns
}
// Intelligently restore subscriptions after reconnection
func (r *AdvancedSubscriptionRepository) RestoreWithPriority(queue *honeybee.WriteQueue, rateLimit int) []error {
// Sort subscriptions by priority and importance
// Restore highest priority/most important first
// Apply rate limiting to avoid flooding connection
// Track restoration outcomes
// Update statistics after restoration
// Return errors for failed restorations
}
// Optimize subscription set
func (r *AdvancedSubscriptionRepository) OptimizeSubscriptions() []string {
// Identify inactive or low-value subscriptions
// Suggest subscriptions to combine or eliminate
// Return list of subscription IDs to consider removing
}
// Manage subscription lifecycle
func (r *AdvancedSubscriptionRepository) ManageLifecycle() {
// Periodically review subscription activity
// Age out unused subscriptions
// Consolidate similar subscriptions when possible
// Balance subscription load across connections
}
4. Advanced Reconnection
Purpose: Implements sophisticated reconnection with circuit breaker pattern and predictive strategies.
Dependencies: honeybee Reconnection, roots-ws Error types
// Core types extend honeybee types
type AdvancedReconnectionConfig struct {
honeybee.ReconnectionConfig
Jitter float64 // Random jitter factor (0.0-1.0)
CircuitBreakerConfig CircuitBreakerConfig
FailureCategories map[string]FailureStrategy // Different strategies for error types
ProbeTimeout time.Duration // Timeout for probe attempts in half-open state
AdaptiveStrategy bool // Whether to use adaptive backoff based on history
}
type CircuitBreakerConfig struct {
Enabled bool
FailureThreshold int // Failures before opening circuit
ResetTimeout time.Duration // Time before attempting reset
HalfOpenMaxCalls int // Max calls in half-open state
}
type CircuitState int
const (
CircuitClosed CircuitState = iota // Normal operation
CircuitOpen // Failing, no attempts allowed
CircuitHalfOpen // Testing if recovered
)
type AdvancedReconnectionState struct {
honeybee.ReconnectionState
FailureCount int
CircuitState CircuitState
CircuitOpenUntil time.Time
FailureHistory []FailureRecord // Recent failures for pattern detection
SuccessfulConnects int // Consecutive successful connections
ReconnectionTimes []time.Duration // Historical reconnection times
}
type FailureRecord struct {
Time time.Time
ErrorType string
Message string
Duration time.Duration // How long the attempt took
}
type FailureStrategy struct {
MaxAttempts int
BaseDelay time.Duration
MaxDelay time.Duration
BackoffFactor float64
}
// Create default advanced reconnection config
func DefaultAdvancedConfig() AdvancedReconnectionConfig {
// Start with honeybee default config
baseConfig := honeybee.DefaultReconnectionConfig() // assumed name for honeybee's package-level reconnection defaults
return AdvancedReconnectionConfig{
ReconnectionConfig: baseConfig,
Jitter: 0.2,
CircuitBreakerConfig: CircuitBreakerConfig{
Enabled: true,
FailureThreshold: 10,
ResetTimeout: 5 * time.Minute,
HalfOpenMaxCalls: 3,
},
FailureCategories: map[string]FailureStrategy{
// Different strategies for different error types
"network": {MaxAttempts: 5, BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, BackoffFactor: 2.0},
"dns": {MaxAttempts: 3, BaseDelay: 5 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0},
"server": {MaxAttempts: 10, BaseDelay: 500 * time.Millisecond, MaxDelay: 10 * time.Second, BackoffFactor: 1.5},
"auth": {MaxAttempts: 2, BaseDelay: 10 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0},
},
ProbeTimeout: 10 * time.Second,
AdaptiveStrategy: true,
}
}
// Create new advanced reconnection state
func NewAdvancedState() *AdvancedReconnectionState {
// Initialize with base reconnection state
baseState := honeybee.NewReconnectionState() // assumed name for honeybee's base state constructor
return &AdvancedReconnectionState{
ReconnectionState: *baseState,
FailureCount: 0,
CircuitState: CircuitClosed,
CircuitOpenUntil: time.Time{},
FailureHistory: make([]FailureRecord, 0, 10),
SuccessfulConnects: 0,
ReconnectionTimes: make([]time.Duration, 0, 10),
}
}
// Calculate next backoff delay with jitter
func CalculateDelayWithJitter(state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) time.Duration {
// Get failure strategy for this error type
strategy, exists := config.FailureCategories[errorType]
if !exists {
strategy = config.FailureCategories["network"] // Default
}
// Calculate base delay based on strategy
attempt := state.FailureCount
baseDelay := strategy.BaseDelay * time.Duration(math.Pow(strategy.BackoffFactor, float64(attempt)))
// Cap at max delay
if baseDelay > strategy.MaxDelay {
baseDelay = strategy.MaxDelay
}
// Apply adaptive strategy if enabled
if config.AdaptiveStrategy {
// Analyze failure patterns to adjust delay
// If failures occur at regular intervals, avoid those times
// If time-of-day patterns exist, factor those in
}
// Apply jitter: delay * (1 ± jitterFactor * random)
jitterRange := float64(baseDelay) * config.Jitter
jitterAmount := (rand.Float64() * 2 - 1) * jitterRange // -jitterRange to +jitterRange
finalDelay := time.Duration(float64(baseDelay) + jitterAmount)
if finalDelay < 0 {
finalDelay = 0
}
return finalDelay
}
// Check circuit breaker state
func CheckCircuitState(state *AdvancedReconnectionState, config AdvancedReconnectionConfig) CircuitState {
// If circuit is closed, return closed
if state.CircuitState == CircuitClosed {
return CircuitClosed
}
// If circuit is open, check if reset timeout has passed
if state.CircuitState == CircuitOpen {
if time.Now().After(state.CircuitOpenUntil) {
// Transition to half-open
state.CircuitState = CircuitHalfOpen
return CircuitHalfOpen
}
return CircuitOpen
}
// If half-open, allow limited testing
return CircuitHalfOpen
}
// Start reconnection with circuit breaker
func StartReconnectionWithCircuitBreaker(conn *honeybee.Connection, state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) error {
// Check circuit breaker state
circuitState := CheckCircuitState(state, config)
// If circuit is open, return circuit breaker error
if circuitState == CircuitOpen {
return errors.New("circuit breaker open")
}
// For half-open state, only allow limited probe attempts
if circuitState == CircuitHalfOpen {
// Enforce timeout for probe attempts
ctx, cancel := context.WithTimeout(context.Background(), config.ProbeTimeout)
defer cancel()
_ = ctx // the probe connection attempt would receive this context
// Attempt connection with timeout
// On success, close circuit
// On failure, reopen circuit
}
// Increment failure count for closed circuit
state.FailureCount++
// Record failure details for pattern analysis
state.FailureHistory = append(state.FailureHistory, FailureRecord{
Time: time.Now(),
ErrorType: errorType,
Message: "Connection failed",
})
// Check if failure count exceeds circuit breaker threshold
if state.FailureCount >= config.CircuitBreakerConfig.FailureThreshold {
// Open circuit
state.CircuitState = CircuitOpen
state.CircuitOpenUntil = time.Now().Add(config.CircuitBreakerConfig.ResetTimeout)
return errors.New("circuit breaker opened after repeated failures")
}
// Calculate delay with jitter
delay := CalculateDelayWithJitter(state, config, errorType)
// Apply backoff delay
time.Sleep(delay)
// Use honeybee reconnection logic for actual reconnection attempt
return nil
}
// Reset circuit breaker after successful connection
func ResetCircuitBreaker(state *AdvancedReconnectionState) {
// Reset failure count to zero
state.FailureCount = 0
// Close circuit if open or half-open
state.CircuitState = CircuitClosed
// Reset circuit open until time
state.CircuitOpenUntil = time.Time{}
// Increment successful connects counter
state.SuccessfulConnects++
// Record success for pattern analysis
}
// Analyze failure patterns for optimized reconnection
func AnalyzeFailurePatterns(state *AdvancedReconnectionState) map[string]interface{} {
// Analyze timing patterns in failures
// Detect time-of-day patterns
// Identify error type frequencies
// Calculate success/failure ratio
// Return insights that can be used to optimize reconnection strategy
return nil // Placeholder
}
5. Advanced Connection Health
Purpose: Implements sophisticated health monitoring with predictive failure detection.
Dependencies: honeybee Health Monitor, roots-ws Error types
// Core types extend honeybee types
type AdvancedHealthConfig struct {
honeybee.HealthConfig
FailureThreshold int // Failures before connection marked unhealthy
DegradationThreshold int // Threshold for degraded status
LatencyThreshold time.Duration // Latency indicating performance issues
SamplingInterval time.Duration // How often to collect metrics
TrendWindowSize int // Number of samples for trend analysis
PredictionEnabled bool // Whether to use predictive modeling
}
type HealthStatus int
const (
StatusHealthy HealthStatus = iota
StatusDegraded // Working but showing signs of issues
StatusUnhealthy // Not working reliably
StatusFailing // Consistently failing
)
type AdvancedHealthState struct {
honeybee.HealthMonitor
Status HealthStatus
PingsSent int
PongsReceived int
MissedPongs int
PongLatencies []time.Duration // Recent latency measurements
LatencyPercentiles map[string]time.Duration // p50, p90, p99
LatencyTrend float64 // Direction and magnitude of trend
ErrorRates map[string]float64 // Error rates by category
PredictiveScore float64 // 0.0-1.0, likelihood of failure
HealthHistory []HealthRecord // Time series of health measurements
LastUpdated time.Time
}
type HealthRecord struct {
Timestamp time.Time
Status HealthStatus
Latency time.Duration
ErrorRate float64
ResourceUsage float64 // CPU/memory used by connection
}
// Create default advanced health config
func DefaultAdvancedConfig() AdvancedHealthConfig {
// Start with honeybee default config
baseConfig := honeybee.DefaultHealthConfig() // assumed name for honeybee's health defaults
return AdvancedHealthConfig{
HealthConfig: baseConfig,
FailureThreshold: 3,
DegradationThreshold: 2,
LatencyThreshold: 500 * time.Millisecond,
SamplingInterval: 10 * time.Second,
TrendWindowSize: 10,
PredictionEnabled: true,
}
}
// Create new advanced health state
func NewAdvancedState() *AdvancedHealthState {
// Initialize with base health state
baseMonitor := honeybee.NewHealthMonitor(nil, nil) // assumed name for honeybee's base monitor constructor
return &AdvancedHealthState{
HealthMonitor: *baseMonitor,
Status: StatusHealthy,
PingsSent: 0,
PongsReceived: 0,
MissedPongs: 0,
PongLatencies: make([]time.Duration, 0, 10),
LatencyPercentiles: make(map[string]time.Duration),
LatencyTrend: 0.0,
ErrorRates: make(map[string]float64),
PredictiveScore: 0.0,
HealthHistory: make([]HealthRecord, 0, 100),
LastUpdated: time.Now(),
}
}
// Start advanced health monitoring
func StartAdvancedMonitoring(conn *websocket.Conn, state *AdvancedHealthState, config AdvancedHealthConfig) {
// Use honeybee health monitoring as base
honeybee.StartHealthMonitoring(conn) // assumed entry point for honeybee's base monitoring
// Add handlers for tracking additional metrics
// Set up latency tracking
// Initialize health score calculation
// Start trend analysis
// Set up periodic health assessment
}
// Handle pong with latency tracking
func HandlePongWithLatency(state *AdvancedHealthState, sentTime time.Time, config AdvancedHealthConfig) *AdvancedHealthState {
// Calculate latency between sent time and now
latency := time.Since(sentTime)
// Update basic pong handling using honeybee
state.PongsReceived++
// Add latency to tracking array (limited size)
state.PongLatencies = append(state.PongLatencies, latency)
if len(state.PongLatencies) > 20 {
// Remove oldest entry
state.PongLatencies = state.PongLatencies[1:]
}
// Update health history
state.HealthHistory = append(state.HealthHistory, HealthRecord{
Timestamp: time.Now(),
Latency: latency,
Status: state.Status,
})
// Recalculate latency percentiles
calculateLatencyPercentiles(state)
// Update latency trend
calculateLatencyTrend(state)
// Update health score based on latency
calculateHealthScore(state, config)
// Reset missed pongs counter
state.MissedPongs = 0
return state
}
// Handle missed pong with health impact
func HandleMissedPong(state *AdvancedHealthState, config AdvancedHealthConfig) *AdvancedHealthState {
// Increment missed pongs counter
state.MissedPongs++
// Update health score based on missed pong
calculateHealthScore(state, config)
// Update health history
state.HealthHistory = append(state.HealthHistory, HealthRecord{
Timestamp: time.Now(),
Status: state.Status,
ErrorRate: float64(state.MissedPongs) / float64(max(1, state.PingsSent)),
})
// Check against degradation threshold
if state.MissedPongs >= config.DegradationThreshold && state.Status == StatusHealthy {
state.Status = StatusDegraded
}
// Check against failure threshold
if state.MissedPongs >= config.FailureThreshold {
state.Status = StatusUnhealthy
}
return state
}
// Calculate latency percentiles
func calculateLatencyPercentiles(state *AdvancedHealthState) {
// Need at least a few measurements
if len(state.PongLatencies) < 5 {
return
}
// Sort latencies for percentile calculation
sortedLatencies := make([]time.Duration, len(state.PongLatencies))
copy(sortedLatencies, state.PongLatencies)
sort.Slice(sortedLatencies, func(i, j int) bool {
return sortedLatencies[i] < sortedLatencies[j]
})
// Calculate p50 (median)
p50Index := len(sortedLatencies) / 2
state.LatencyPercentiles["p50"] = sortedLatencies[p50Index]
// Calculate p90
p90Index := int(float64(len(sortedLatencies)) * 0.9)
if p90Index >= len(sortedLatencies) {
p90Index = len(sortedLatencies) - 1
}
state.LatencyPercentiles["p90"] = sortedLatencies[p90Index]
// Calculate p99
p99Index := int(float64(len(sortedLatencies)) * 0.99)
if p99Index >= len(sortedLatencies) {
p99Index = len(sortedLatencies) - 1
}
state.LatencyPercentiles["p99"] = sortedLatencies[p99Index]
}
// Calculate latency trend
func calculateLatencyTrend(state *AdvancedHealthState) {
// Need enough history for trend
if len(state.PongLatencies) < 5 {
state.LatencyTrend = 0
return
}
// Simple linear regression on recent latencies
n := float64(len(state.PongLatencies))
sumX := 0.0
sumY := 0.0
sumXY := 0.0
sumXX := 0.0
for i, latency := range state.PongLatencies {
x := float64(i)
y := float64(latency.Milliseconds())
sumX += x
sumY += y
sumXY += x * y
sumXX += x * x
}
// Calculate slope of trendline
slope := (n*sumXY - sumX*sumY) / (n*sumXX - sumX*sumX)
// Normalize to -1.0 to 1.0 range for consistent reporting
// Positive means increasing latency (degradation)
// Negative means decreasing latency (improvement)
avgLatency := sumY / n
if avgLatency != 0 {
state.LatencyTrend = slope / avgLatency * 10 // Scale factor
} else {
state.LatencyTrend = 0
}
// Clamp to range
if state.LatencyTrend > 1.0 {
state.LatencyTrend = 1.0
} else if state.LatencyTrend < -1.0 {
state.LatencyTrend = -1.0
}
}
// Calculate health score based on all metrics
func calculateHealthScore(state *AdvancedHealthState, config AdvancedHealthConfig) {
// Start with perfect score (1.0)
score := 1.0
// Reduce score based on missed pongs
if state.PingsSent > 0 {
missRate := float64(state.MissedPongs) / float64(state.PingsSent)
score -= missRate * 0.5 // Up to 50% reduction for all pongs missed
}
// Reduce score based on latency relative to threshold
if len(state.PongLatencies) > 0 && config.LatencyThreshold > 0 {
avgLatency := average(state.PongLatencies)
if avgLatency > config.LatencyThreshold {
// Reduce score proportionally to how much threshold is exceeded
latencyFactor := float64(avgLatency) / float64(config.LatencyThreshold)
score -= math.Min(0.3, (latencyFactor-1.0)*0.1) // Up to 30% reduction
}
}
// Reduce score based on negative trend
if state.LatencyTrend > 0 {
score -= state.LatencyTrend * 0.2 // Up to 20% reduction for worsening trend
}
// Calculate predictive score based on patterns
if config.PredictionEnabled {
state.PredictiveScore = predictFailure(state)
// Incorporate predictive score into health score
score -= state.PredictiveScore * 0.2 // Up to 20% reduction for likely failure
}
// Update status based on score
if score < 0.5 {
state.Status = StatusUnhealthy
} else if score < 0.8 {
state.Status = StatusDegraded
} else {
state.Status = StatusHealthy
}
}
// Predict failure likelihood based on historical patterns
func predictFailure(state *AdvancedHealthState) float64 {
// Need enough history for prediction
if len(state.HealthHistory) < 10 {
return 0.0
}
// Detect patterns that historically preceded failures
// Look for increasing latency followed by errors
// Identify cyclic patterns in performance
// Check for correlation between resource usage and failures
// This would use more sophisticated algorithms in production
// For now, just base on trend and recent errors
// Simplified predictor: combine trend with recent error rate
recentErrorRate := 0.0
if state.PingsSent > 0 {
recentErrorRate = float64(state.MissedPongs) / float64(state.PingsSent)
}
// Count the trend only when it is positive (increasing latency = worsening)
trendContribution := math.Max(0, state.LatencyTrend) * 0.6
return trendContribution + recentErrorRate*0.4 // Combined score
}
// Calculate average of duration slice
func average(durations []time.Duration) time.Duration {
if len(durations) == 0 {
return 0
}
var sum time.Duration
for _, d := range durations {
sum += d
}
return sum / time.Duration(len(durations))
}
// Check if connection is degraded but still working
func IsDegraded(state *AdvancedHealthState) bool {
return state.Status == StatusDegraded
}
// Check connection health against threshold
func IsHealthyWithThreshold(state *AdvancedHealthState, threshold float64) bool {
// Calculate current health score
score := 1.0 - (float64(state.MissedPongs) / float64(max(1, state.PingsSent)))
// Account for latency
if len(state.PongLatencies) > 0 {
avgLatency := average(state.PongLatencies)
if avgLatency > 300*time.Millisecond {
// Reduce score for high latency
score -= 0.2
}
}
return score >= threshold
}
6. In-Flight Operations
Purpose: Tracks and manages ongoing operations with cancellation support.
Dependencies: roots-ws Error types
// Core types
type OperationType string
const (
OperationSubscription OperationType = "subscription"
OperationPublication OperationType = "publication"
OperationConnection OperationType = "connection"
OperationQuery OperationType = "query"
)
type OperationStatus int
const (
StatusPending OperationStatus = iota
StatusInProgress
StatusCompleted
StatusFailed
StatusCancelled
StatusTimedOut
)
type Operation struct {
ID string
Type OperationType
StartTime time.Time
EndTime time.Time
Status OperationStatus
Cancelled bool
Done chan struct{}
Result interface{}
Error error
Priority int // Priority level for resource allocation
Metadata map[string]interface{}
mutex sync.Mutex
}
type OperationRegistry struct {
Operations map[string]*Operation
Mutex sync.RWMutex
Stats OperationStats
}
type OperationStats struct {
TotalOperations int64
CompletedOperations int64
FailedOperations int64
CancelledOperations int64
TimedOutOperations int64
AvgCompletionTime time.Duration
OperationsByType map[OperationType]int64
}
// Create new operation registry
func NewRegistry() *OperationRegistry {
return &OperationRegistry{
Operations: make(map[string]*Operation),
Mutex: sync.RWMutex{},
Stats: OperationStats{
OperationsByType: make(map[OperationType]int64),
},
}
}
// Generate unique operation ID
func generateOperationID() string {
// Generate UUID or other unique identifier
// generateRandomString is an assumed helper returning n random characters
return fmt.Sprintf("op-%d-%s", time.Now().UnixNano(), generateRandomString(6))
}
// Create and register new operation
func RegisterOperation(registry *OperationRegistry, opType OperationType, priority int) *Operation {
// Generate unique operation ID
opID := generateOperationID()
// Create operation object
op := &Operation{
ID: opID,
Type: opType,
StartTime: time.Now(),
Status: StatusPending,
Done: make(chan struct{}),
Priority: priority,
Metadata: make(map[string]interface{}),
}
// Acquire write lock
registry.Mutex.Lock()
// Add operation to registry
registry.Operations[opID] = op
// Update stats
registry.Stats.TotalOperations++
registry.Stats.OperationsByType[opType]++
// Release lock
registry.Mutex.Unlock()
return op
}
// Mark operation as in progress
func StartOperation(registry *OperationRegistry, opID string) error {
// Acquire read lock to get operation
registry.Mutex.RLock()
op, exists := registry.Operations[opID]
registry.Mutex.RUnlock()
if !exists {
return errors.New("operation not found")
}
// Update operation status
op.mutex.Lock()
defer op.mutex.Unlock()
if op.Status != StatusPending {
return errors.New("operation not in pending state")
}
op.Status = StatusInProgress
return nil
}
// Complete operation
func CompleteOperation(registry *OperationRegistry, opID string, result interface{}, err error) error {
// Acquire read lock to get operation
registry.Mutex.RLock()
op, exists := registry.Operations[opID]
registry.Mutex.RUnlock()
if !exists {
return errors.New("operation not found")
}
// Update operation
op.mutex.Lock()
// Guard against double completion (e.g., racing a cancellation),
// which would close op.Done twice and panic
if op.Status == StatusCompleted || op.Status == StatusFailed || op.Status == StatusCancelled {
	op.mutex.Unlock()
	return errors.New("operation already finished")
}
// Set result and error
op.Result = result
op.Error = err
op.EndTime = time.Now()
// Update status based on error
if err != nil {
op.Status = StatusFailed
} else {
op.Status = StatusCompleted
}
// Signal completion via done channel
close(op.Done)
op.mutex.Unlock()
// Update registry stats with write lock
registry.Mutex.Lock()
defer registry.Mutex.Unlock()
// Update stats based on outcome
if err != nil {
registry.Stats.FailedOperations++
} else {
registry.Stats.CompletedOperations++
// Update average completion time
completionTime := op.EndTime.Sub(op.StartTime)
if registry.Stats.CompletedOperations > 1 {
// Calculate new average
oldWeight := float64(registry.Stats.CompletedOperations-1) / float64(registry.Stats.CompletedOperations)
newWeight := 1.0 / float64(registry.Stats.CompletedOperations)
oldAvg := float64(registry.Stats.AvgCompletionTime.Nanoseconds())
newTime := float64(completionTime.Nanoseconds())
newAvg := oldAvg*oldWeight + newTime*newWeight
registry.Stats.AvgCompletionTime = time.Duration(int64(newAvg))
} else {
// First completion, just set the value
registry.Stats.AvgCompletionTime = completionTime
}
}
return nil
}
// Cancel operation
func CancelOperation(registry *OperationRegistry, opID string) error {
// Acquire read lock to get operation
registry.Mutex.RLock()
op, exists := registry.Operations[opID]
registry.Mutex.RUnlock()
if !exists {
return errors.New("operation not found")
}
// Update operation
op.mutex.Lock()
// Check if already completed or cancelled
if op.Status == StatusCompleted || op.Status == StatusFailed || op.Status == StatusCancelled {
op.mutex.Unlock()
return errors.New("operation already completed or cancelled")
}
// Mark as cancelled
op.Cancelled = true
op.Status = StatusCancelled
op.EndTime = time.Now()
op.Error = errors.New("operation cancelled")
// Signal cancellation via done channel
close(op.Done)
op.mutex.Unlock()
// Update registry stats
registry.Mutex.Lock()
registry.Stats.CancelledOperations++
registry.Mutex.Unlock()
return nil
}
// Wait for operation completion with timeout
func WaitForCompletion(op *Operation, timeout time.Duration) (interface{}, error) {
// Set up timer for timeout
timer := time.NewTimer(timeout)
defer timer.Stop()
// Wait for completion or timeout
select {
case <-op.Done:
// Operation completed or cancelled
op.mutex.Lock()
defer op.mutex.Unlock()
if op.Cancelled {
return nil, errors.New("operation cancelled")
}
return op.Result, op.Error
case <-timer.C:
// Timeout occurred
op.mutex.Lock()
if op.Status == StatusPending || op.Status == StatusInProgress {
op.Status = StatusTimedOut
}
op.mutex.Unlock()
return nil, errors.New("operation timeout")
}
}
// Cancel all operations of specific type
func CancelOperationsByType(registry *OperationRegistry, opType OperationType) int {
// Acquire read lock to get operations
registry.Mutex.RLock()
// Find all operations of specified type
opsToCancel := make([]string, 0)
for id, op := range registry.Operations {
if op.Type == opType {
opsToCancel = append(opsToCancel, id)
}
}
registry.Mutex.RUnlock()
// Cancel each operation
cancelCount := 0
for _, id := range opsToCancel {
err := CancelOperation(registry, id)
if err == nil {
cancelCount++
}
}
return cancelCount
}
// Cancel all operations
func CancelAllOperations(registry *OperationRegistry) int {
// Acquire read lock to get all operation IDs
registry.Mutex.RLock()
opsToCancel := make([]string, 0, len(registry.Operations))
for id := range registry.Operations {
opsToCancel = append(opsToCancel, id)
}
registry.Mutex.RUnlock()
// Cancel each operation
cancelCount := 0
for _, id := range opsToCancel {
err := CancelOperation(registry, id)
if err == nil {
cancelCount++
}
}
return cancelCount
}
// Get operation statistics
func GetOperationStats(registry *OperationRegistry) OperationStats {
registry.Mutex.RLock()
defer registry.Mutex.RUnlock()
// Return copy of stats to prevent race conditions
statsCopy := registry.Stats
// Copy map to prevent race conditions
statsCopy.OperationsByType = make(map[OperationType]int64)
for k, v := range registry.Stats.OperationsByType {
statsCopy.OperationsByType[k] = v
}
return statsCopy
}
// Clean up completed operations
func CleanupCompletedOperations(registry *OperationRegistry, maxAge time.Duration) int {
now := time.Now()
cutoffTime := now.Add(-maxAge)
registry.Mutex.Lock()
defer registry.Mutex.Unlock()
removeCount := 0
for id, op := range registry.Operations {
if op.EndTime.Before(cutoffTime) &&
(op.Status == StatusCompleted || op.Status == StatusFailed ||
op.Status == StatusCancelled || op.Status == StatusTimedOut) {
delete(registry.Operations, id)
removeCount++
}
}
return removeCount
}
7. Compression
Purpose: Compress WebSocket messages to reduce bandwidth with memory optimization.
Dependencies: roots-ws Error types
// Core types
type CompressionConfig struct {
Enabled bool
Level int // 0-9, higher means more compression
ContextTakeover bool // Whether to reuse compression context
Threshold int // Messages smaller than this won't be compressed
MemoryLimit int // Max memory to use for compression
CacheTimeout time.Duration // How long to cache prepared messages
}
type CompressionStats struct {
BytesBeforeCompression int64
BytesAfterCompression int64
MessagesCached int64
CacheHits int64
CacheMisses int64
CompressionTime time.Duration
CompressionRatio float64 // Calculated ratio
}
type MessageCache struct {
items map[uint64]*CachedMessage
mutex sync.RWMutex
stats CompressionStats
bufferPool *sync.Pool
}
type CachedMessage struct {
Prepared *websocket.PreparedMessage
Hash uint64
Size int
LastAccess time.Time
Expiration time.Time
}
// Create default compression config
func DefaultConfig() CompressionConfig {
return CompressionConfig{
Enabled: true,
Level: 6, // Medium compression level
ContextTakeover: false, // Disable context takeover to reduce memory
Threshold: 256, // Don't compress small messages
MemoryLimit: 64 * 1024 * 1024, // 64MB memory limit
CacheTimeout: 5 * time.Second, // Cache for 5 seconds
}
}
// Create new message cache for compression efficiency
func NewMessageCache(config CompressionConfig) *MessageCache {
cache := &MessageCache{
items: make(map[uint64]*CachedMessage),
bufferPool: &sync.Pool{
New: func() interface{} {
// Create buffer with reasonable initial size
return bytes.NewBuffer(make([]byte, 0, 4096))
},
},
}
// Start cleanup goroutine
go cache.cleanup(config.CacheTimeout)
return cache
}
// Generate hash for message content
func hashMessage(data []byte) uint64 {
h := fnv.New64a()
h.Write(data)
return h.Sum64()
}
// Set up compression for WebSocket
func ConfigureConnection(conn *websocket.Conn, config CompressionConfig) error {
	// Skip if compression not enabled
	if !config.Enabled {
		return nil
	}
	// With gorilla/websocket, permessage-deflate is negotiated at the
	// Dialer/Upgrader (EnableCompression: true); per connection we can
	// only toggle write compression and choose the flate level. Context
	// takeover is not configurable there, so ContextTakeover is advisory.
	conn.EnableWriteCompression(true)
	// SetCompressionLevel accepts compress/flate levels
	return conn.SetCompressionLevel(config.Level)
}
// Get or create prepared message with memory optimizations
func GetOrCreatePrepared(cache *MessageCache, messageType int, data []byte, config CompressionConfig) (*websocket.PreparedMessage, error) {
// Don't compress small messages
if len(data) < config.Threshold || !config.Enabled {
// Still create PreparedMessage for consistent API
return websocket.NewPreparedMessage(messageType, data)
}
// Generate hash of message content
hash := hashMessage(data)
// Check cache first
cache.mutex.RLock()
if cachedMsg, exists := cache.items[hash]; exists {
// Update last access time
cachedMsg.LastAccess = time.Now()
cache.mutex.RUnlock()
// Update stats
atomic.AddInt64(&cache.stats.CacheHits, 1)
return cachedMsg.Prepared, nil
}
cache.mutex.RUnlock()
// Cache miss, need to compress
atomic.AddInt64(&cache.stats.CacheMisses, 1)
// Track compression time
startTime := time.Now()
// Get buffer from pool
buf := cache.bufferPool.Get().(*bytes.Buffer)
buf.Reset()
defer cache.bufferPool.Put(buf)
// Create flate writer with memory limit awareness
flateWriter, err := flate.NewWriter(buf, config.Level)
if err != nil {
return nil, err
}
// Write data to compressor
if _, err = flateWriter.Write(data); err != nil {
	return nil, err
}
// Close flate writer to flush data
err = flateWriter.Close()
if err != nil {
return nil, err
}
// Track compression stats
atomic.AddInt64(&cache.stats.BytesBeforeCompression, int64(len(data)))
atomic.AddInt64(&cache.stats.BytesAfterCompression, int64(buf.Len()))
// Check if compression actually helped
var msgData []byte
if buf.Len() < len(data) {
// Use compressed data
msgData = make([]byte, buf.Len())
copy(msgData, buf.Bytes())
} else {
// Compression didn't help, use original
msgData = data
}
// Create prepared message
prepared, err := websocket.NewPreparedMessage(messageType, msgData)
if err != nil {
return nil, err
}
// Record compression time
compressionTime := time.Since(startTime)
atomic.AddInt64((*int64)(&cache.stats.CompressionTime), int64(compressionTime))
// Update cache
expirationTime := time.Now().Add(config.CacheTimeout)
cachedMsg := &CachedMessage{
Prepared: prepared,
Hash: hash,
Size: len(data),
LastAccess: time.Now(),
Expiration: expirationTime,
}
// Add to cache
cache.mutex.Lock()
cache.items[hash] = cachedMsg
cache.mutex.Unlock()
// Track cache stats
atomic.AddInt64(&cache.stats.MessagesCached, 1)
// Schedule automatic cleanup
time.AfterFunc(config.CacheTimeout, func() {
cache.mutex.Lock()
defer cache.mutex.Unlock()
// Only remove if still the same item (hash collision possible)
if existing, exists := cache.items[hash]; exists && existing == cachedMsg {
delete(cache.items, hash)
}
})
// Update compression ratio (a float64 cannot be updated atomically
// without unsafe pointer tricks, so take the cache mutex instead)
totalBefore := atomic.LoadInt64(&cache.stats.BytesBeforeCompression)
totalAfter := atomic.LoadInt64(&cache.stats.BytesAfterCompression)
if totalBefore > 0 {
	cache.mutex.Lock()
	cache.stats.CompressionRatio = float64(totalAfter) / float64(totalBefore)
	cache.mutex.Unlock()
}
return prepared, nil
}
// Cleanup expired cache entries
func (cache *MessageCache) cleanup(interval time.Duration) {
ticker := time.NewTicker(interval / 2)
defer ticker.Stop()
for range ticker.C {
now := time.Now()
cache.mutex.Lock()
for hash, msg := range cache.items {
if now.After(msg.Expiration) {
delete(cache.items, hash)
}
}
cache.mutex.Unlock()
}
}
// Get compression statistics (coarse snapshot under the cache lock)
func (cache *MessageCache) GetStats() CompressionStats {
	cache.mutex.RLock()
	defer cache.mutex.RUnlock()
	return cache.stats
}
// Reset cache
func (cache *MessageCache) Reset() {
cache.mutex.Lock()
defer cache.mutex.Unlock()
// Clear all items
cache.items = make(map[uint64]*CachedMessage)
// Reset stats
cache.stats = CompressionStats{}
}
8. Rate Limiting
Purpose: Prevent flooding with token bucket algorithm and adaptive limits.
Dependencies: roots-ws Error types
// Core types
type RateLimitConfig struct {
Enabled bool
MessagesPerSecond float64 // Default rate limit
BurstSize int // Maximum burst allowed
AdaptiveMode bool // Whether to adjust limits dynamically
MinRate float64 // Minimum rate allowed
MaxRate float64 // Maximum rate allowed
CategoryLimits map[string]CategoryLimit // Different limits for message types
}
type CategoryLimit struct {
MessagesPerSecond float64
BurstSize int
Priority int // Priority level (higher = more important)
}
type RateLimiter struct {
config RateLimitConfig
defaultBucket *TokenBucket
categoryBuckets map[string]*TokenBucket
lastAdaptation time.Time
stats RateLimitStats
mutex sync.RWMutex
}
type TokenBucket struct {
Rate float64 // Tokens per second
MaxTokens int // Maximum capacity
AvailableTokens float64 // Current token count
LastRefill time.Time // Last time tokens were added
mutex sync.Mutex
}
type RateLimitStats struct {
AllowedMessages int64
DelayedMessages int64
RejectedMessages int64
CurrentRate float64
AdaptationEvents int64
ThrottleEvents map[string]int64
}
// Create default rate limit config
func DefaultConfig() RateLimitConfig {
return RateLimitConfig{
Enabled: true,
MessagesPerSecond: 10,
BurstSize: 30,
AdaptiveMode: true,
MinRate: 1.0,
MaxRate: 100.0,
CategoryLimits: map[string]CategoryLimit{
"control": {
MessagesPerSecond: 50,
BurstSize: 100,
Priority: 10,
},
"event": {
MessagesPerSecond: 5,
BurstSize: 20,
Priority: 5,
},
"subscription": {
MessagesPerSecond: 2,
BurstSize: 10,
Priority: 3,
},
},
}
}
// Create new rate limiter
func NewLimiter(config RateLimitConfig) *RateLimiter {
// Create default token bucket
defaultBucket := &TokenBucket{
Rate: config.MessagesPerSecond,
MaxTokens: config.BurstSize,
AvailableTokens: float64(config.BurstSize), // Start with full bucket
LastRefill: time.Now(),
}
// Create category buckets
categoryBuckets := make(map[string]*TokenBucket)
for category, limit := range config.CategoryLimits {
categoryBuckets[category] = &TokenBucket{
Rate: limit.MessagesPerSecond,
MaxTokens: limit.BurstSize,
AvailableTokens: float64(limit.BurstSize), // Start with full bucket
LastRefill: time.Now(),
}
}
return &RateLimiter{
config: config,
defaultBucket: defaultBucket,
categoryBuckets: categoryBuckets,
lastAdaptation: time.Now(),
stats: RateLimitStats{
ThrottleEvents: make(map[string]int64),
},
}
}
// Check if operation is allowed by rate limit
func (limiter *RateLimiter) Allow(category string) bool {
// Return true if rate limiting disabled
if !limiter.config.Enabled {
return true
}
// Get appropriate bucket
bucket := limiter.getBucket(category)
// Refill tokens and check availability
bucket.mutex.Lock()
defer bucket.mutex.Unlock()
// Refill tokens based on time elapsed
limiter.refillTokens(bucket)
// Check if bucket has at least one token
if bucket.AvailableTokens >= 1.0 {
// Consume token
bucket.AvailableTokens -= 1.0
// Update stats
atomic.AddInt64(&limiter.stats.AllowedMessages, 1)
return true
}
// No tokens available
atomic.AddInt64(&limiter.stats.RejectedMessages, 1)
// Track category-specific throttling
limiter.mutex.Lock()
limiter.stats.ThrottleEvents[category]++
limiter.mutex.Unlock()
return false
}
// Wait until operation is allowed
func (limiter *RateLimiter) Wait(ctx context.Context, category string) error {
// If rate limiting disabled, return immediately
if !limiter.config.Enabled {
return nil
}
// Get appropriate bucket
bucket := limiter.getBucket(category)
for {
// Check if context cancelled
if ctx.Err() != nil {
return ctx.Err()
}
// Try to allow operation
bucket.mutex.Lock()
// Refill tokens
limiter.refillTokens(bucket)
if bucket.AvailableTokens >= 1.0 {
// Consume token
bucket.AvailableTokens -= 1.0
// Update stats
atomic.AddInt64(&limiter.stats.AllowedMessages, 1)
bucket.mutex.Unlock()
return nil
}
// Calculate time until next token available
timeToNextToken := time.Duration(1.0/bucket.Rate * float64(time.Second))
bucket.mutex.Unlock()
// Update delayed stats
atomic.AddInt64(&limiter.stats.DelayedMessages, 1)
// Wait for token to become available
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(timeToNextToken):
// Try again
}
}
}
// Refill tokens based on elapsed time
func (limiter *RateLimiter) refillTokens(bucket *TokenBucket) {
now := time.Now()
elapsed := now.Sub(bucket.LastRefill).Seconds()
// Calculate tokens to add
tokensToAdd := elapsed * bucket.Rate
// Add tokens to bucket (up to max)
bucket.AvailableTokens = math.Min(float64(bucket.MaxTokens), bucket.AvailableTokens+tokensToAdd)
// Update last refill time
bucket.LastRefill = now
}
// Get appropriate token bucket for category
func (limiter *RateLimiter) getBucket(category string) *TokenBucket {
limiter.mutex.RLock()
defer limiter.mutex.RUnlock()
if bucket, exists := limiter.categoryBuckets[category]; exists {
return bucket
}
return limiter.defaultBucket
}
// Adapt rate limits based on system conditions
func (limiter *RateLimiter) AdaptRateLimits(queueUtilization float64) {
// Skip if adaptive mode disabled
if !limiter.config.Enabled || !limiter.config.AdaptiveMode {
return
}
// Minimum time between adaptations
if time.Since(limiter.lastAdaptation) < 5*time.Second {
return
}
limiter.mutex.Lock()
defer limiter.mutex.Unlock()
// Current rate
currentRate := limiter.defaultBucket.Rate
newRate := currentRate
// Adjust based on queue utilization
if queueUtilization > 0.8 {
// Queue getting full, reduce rate
newRate = math.Max(limiter.config.MinRate, currentRate*0.8)
} else if queueUtilization < 0.2 {
// Queue mostly empty, increase rate
newRate = math.Min(limiter.config.MaxRate, currentRate*1.2)
}
// Only update if significant change
if math.Abs(newRate-currentRate)/currentRate > 0.05 {
// Update default bucket rate
limiter.defaultBucket.mutex.Lock()
limiter.defaultBucket.Rate = newRate
limiter.defaultBucket.mutex.Unlock()
// Track adaptation
atomic.AddInt64(&limiter.stats.AdaptationEvents, 1)
limiter.stats.CurrentRate = newRate // limiter.mutex is already held, so a plain store is safe
// Update timestamp
limiter.lastAdaptation = time.Now()
}
}
// Get rate limit statistics
func (limiter *RateLimiter) GetStats() RateLimitStats {
limiter.mutex.RLock()
defer limiter.mutex.RUnlock()
// Create copy of stats to prevent race conditions
statsCopy := limiter.stats
// Copy map to prevent race conditions
statsCopy.ThrottleEvents = make(map[string]int64)
for k, v := range limiter.stats.ThrottleEvents {
statsCopy.ThrottleEvents[k] = v
}
return statsCopy
}
9. Message Replay
Purpose: Buffers messages for replay after reconnection with intelligent prioritization.
Dependencies: roots-ws Error types, honeybee Connection
// Core types
type ReplayConfig struct {
Enabled bool
MaxMessages int // Maximum messages to buffer
MaxAge time.Duration // Maximum message age to retain
MaxAttempts int // Maximum replay attempts per message
MaxBatchSize int // Maximum messages to replay at once
BatchDelay time.Duration // Delay between replay batches
}
type MessageBuffer struct {
Config ReplayConfig
Messages []BufferedMessage
Mutex sync.RWMutex
Stats ReplayStats
}
type BufferedMessage struct {
ID string
Content []byte
Timestamp time.Time
Attempts int
Priority int // Higher priority = replay first
MessageType string // Used for selective replay
}
type ReplayStats struct {
MessagesBuffered int64
MessagesReplayed int64
FailedReplays int64
DroppedMessages int64
OldestMessage time.Time
}
// Create default replay config
func DefaultConfig() ReplayConfig {
return ReplayConfig{
Enabled: true,
MaxMessages: 1000,
MaxAge: 5 * time.Minute,
MaxAttempts: 3,
MaxBatchSize: 50,
BatchDelay: 100 * time.Millisecond,
}
}
// Create new message buffer
func NewBuffer(config ReplayConfig) *MessageBuffer {
return &MessageBuffer{
Config: config,
Messages: make([]BufferedMessage, 0, config.MaxMessages/10), // Start smaller, grow as needed
Stats: ReplayStats{},
}
}
// Add message to buffer with priority
func BufferMessageWithPriority(buffer *MessageBuffer, msg []byte, msgType string, priority int) string {
// Skip if buffering disabled
if !buffer.Config.Enabled {
return ""
}
// Acquire write lock
buffer.Mutex.Lock()
defer buffer.Mutex.Unlock()
// Generate unique ID for message
id := generateMessageID()
// Create buffered message
bufferedMsg := BufferedMessage{
ID: id,
Content: cloneBytes(msg), // Clone bytes to avoid external modification
Timestamp: time.Now(),
Attempts: 0,
Priority: priority,
MessageType: msgType,
}
// Check if buffer is full
if len(buffer.Messages) >= buffer.Config.MaxMessages {
// Find lowest priority, oldest message to remove
oldestIdx := 0
lowestPriority := math.MaxInt32
oldestTime := time.Now()
for i, m := range buffer.Messages {
// Lower priority or same priority but older
if m.Priority < lowestPriority ||
(m.Priority == lowestPriority && m.Timestamp.Before(oldestTime)) {
lowestPriority = m.Priority
oldestTime = m.Timestamp
oldestIdx = i
}
}
// Remove lowest priority message
buffer.Messages = append(buffer.Messages[:oldestIdx], buffer.Messages[oldestIdx+1:]...)
// Update stats
buffer.Stats.DroppedMessages++
}
// Add new message to buffer
buffer.Messages = append(buffer.Messages, bufferedMsg)
// Update stats
buffer.Stats.MessagesBuffered++
if buffer.Stats.OldestMessage.IsZero() || bufferedMsg.Timestamp.Before(buffer.Stats.OldestMessage) {
buffer.Stats.OldestMessage = bufferedMsg.Timestamp
}
return id
}
// Clone byte slice to avoid external modification
func cloneBytes(data []byte) []byte {
clone := make([]byte, len(data))
copy(clone, data)
return clone
}
// Generate unique message ID
func generateMessageID() string {
return fmt.Sprintf("msg-%d-%s", time.Now().UnixNano(), generateRandomString(6))
}
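generateRandomString is assumed to be defined elsewhere in the package; if it is not, a minimal implementation backed by crypto/rand could look like the following (illustrative, not part of the documented API):
// generateRandomString returns n hex characters sourced from crypto/rand,
// falling back to a timestamp-derived suffix if the system RNG fails.
// Requires the crypto/rand and encoding/hex imports.
func generateRandomString(n int) string {
	buf := make([]byte, (n+1)/2)
	if _, err := rand.Read(buf); err != nil {
		return fmt.Sprintf("%x", time.Now().UnixNano())[:n]
	}
	return hex.EncodeToString(buf)[:n]
}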
// Mark message as sent
func MarkSent(buffer *MessageBuffer, id string) {
// Acquire write lock
buffer.Mutex.Lock()
defer buffer.Mutex.Unlock()
// Find message with matching ID
for i, msg := range buffer.Messages {
if msg.ID == id {
// Remove message from buffer (successful send)
buffer.Messages = append(buffer.Messages[:i], buffer.Messages[i+1:]...)
break
}
}
// Recompute the oldest-message timestamp for stats
if len(buffer.Messages) > 0 {
// Find new oldest
oldest := time.Now()
for _, msg := range buffer.Messages {
if msg.Timestamp.Before(oldest) {
oldest = msg.Timestamp
}
}
buffer.Stats.OldestMessage = oldest
} else {
buffer.Stats.OldestMessage = time.Time{}
}
}
// Get messages to replay, ordered by priority
func GetPendingMessages(buffer *MessageBuffer, msgType string) []BufferedMessage {
// Acquire read lock
buffer.Mutex.RLock()
defer buffer.Mutex.RUnlock()
now := time.Now()
cutoff := now.Add(-buffer.Config.MaxAge) // oldest timestamp still eligible for replay
// Filter messages by max attempts, age, and message type
pending := make([]BufferedMessage, 0, len(buffer.Messages))
for _, msg := range buffer.Messages {
if msg.Attempts < buffer.Config.MaxAttempts &&
msg.Timestamp.After(cutoff) &&
(msgType == "" || msg.MessageType == msgType) {
pending = append(pending, msg)
}
}
// Sort by priority (high to low) and then by age (oldest first)
sort.Slice(pending, func(i, j int) bool {
if pending[i].Priority != pending[j].Priority {
return pending[i].Priority > pending[j].Priority
}
return pending[i].Timestamp.Before(pending[j].Timestamp)
})
return pending
}
// Record replay attempt
func RecordAttempt(buffer *MessageBuffer, id string) {
// Acquire write lock
buffer.Mutex.Lock()
defer buffer.Mutex.Unlock()
// Find message with matching ID
for i := range buffer.Messages {
if buffer.Messages[i].ID == id {
// Increment attempt counter
buffer.Messages[i].Attempts++
break
}
}
}
// Clean expired messages
func CleanExpired(buffer *MessageBuffer) int {
// Acquire write lock
buffer.Mutex.Lock()
defer buffer.Mutex.Unlock()
// Calculate expiration cutoff
now := time.Now()
cutoff := now.Add(-buffer.Config.MaxAge)
// Filter out expired messages
newMessages := make([]BufferedMessage, 0, len(buffer.Messages))
removed := 0
for _, msg := range buffer.Messages {
// Keep message if not expired and not over max attempts
if msg.Timestamp.After(cutoff) && msg.Attempts < buffer.Config.MaxAttempts {
newMessages = append(newMessages, msg)
} else {
removed++
buffer.Stats.DroppedMessages++
}
}
// Update buffer
buffer.Messages = newMessages
// Update oldest message timestamp
if len(newMessages) > 0 {
oldest := now
for _, msg := range newMessages {
if msg.Timestamp.Before(oldest) {
oldest = msg.Timestamp
}
}
buffer.Stats.OldestMessage = oldest
} else {
buffer.Stats.OldestMessage = time.Time{}
}
return removed
}
// Replay pending messages to connection with batching and rate limiting
func ReplayMessages(buffer *MessageBuffer, conn *honeybee.Connection, msgType string) []error {
// Skip if buffering disabled
if !buffer.Config.Enabled {
return nil
}
// Get pending messages
pending := GetPendingMessages(buffer, msgType)
// No messages to replay
if len(pending) == 0 {
return nil
}
// Collect errors; named errs so the errors package stays accessible below
errs := make([]error, 0)
// Process in batches
for i := 0; i < len(pending); i += buffer.Config.MaxBatchSize {
// Calculate end of batch
end := i + buffer.Config.MaxBatchSize
if end > len(pending) {
end = len(pending)
}
// Process batch
batch := pending[i:end]
for _, msg := range batch {
// Send message to connection
select {
case conn.OutChan <- msg.Content:
// Record successful replay
atomic.AddInt64(&buffer.Stats.MessagesReplayed, 1)
// Mark as sent
MarkSent(buffer, msg.ID)
default:
// Channel full or closed
err := errors.New("replay failed: connection channel full or closed")
errs = append(errs, err)
// Record attempt
RecordAttempt(buffer, msg.ID)
atomic.AddInt64(&buffer.Stats.FailedReplays, 1)
}
}
// Delay between batches to prevent overwhelming the connection
if end < len(pending) {
time.Sleep(buffer.Config.BatchDelay)
}
}
return errs
}
// Get replay statistics
func GetReplayStats(buffer *MessageBuffer) ReplayStats {
// Acquire read lock
buffer.Mutex.RLock()
defer buffer.Mutex.RUnlock()
// Return copy of stats
return buffer.Stats
}
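A typical lifecycle ties these functions together around a reconnect event. The sketch below assumes the application holds a honeybee.Connection (conn) and an outbound payload (eventJSON); both names are placeholders:
// Buffer outbound events while connected; replay them after a reconnect.
buffer := NewBuffer(DefaultConfig())

// On each send, buffer first, then mark as sent once the write succeeds.
id := BufferMessageWithPriority(buffer, eventJSON, "EVENT", 10)
select {
case conn.OutChan <- eventJSON:
	MarkSent(buffer, id)
default:
	// Leave the message buffered; it will be replayed later.
}

// After the connection is re-established:
CleanExpired(buffer) // drop stale or exhausted messages first
if errs := ReplayMessages(buffer, conn, ""); len(errs) > 0 {
	log.Printf("replay: %d messages failed", len(errs))
}
st := GetReplayStats(buffer)
fmt.Printf("replayed=%d dropped=%d\n", st.MessagesReplayed, st.DroppedMessages)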
10. Connection Statistics
Purpose: Tracks connection usage metrics with predictive analytics.
Dependencies: honeybee Connection
// Core types
type ConnectionStats struct {
Connected time.Time
MessagesSent int64
MessagesReceived int64
BytesSent int64
BytesReceived int64
Errors int64
AvgLatency time.Duration
LastMessageSent time.Time
LastMessageReceived time.Time
// Advanced metrics
ErrorRates []float64 // Error rates over time periods (1m, 5m, 15m)
ThroughputRates []float64 // Messages per second over time periods
LatencyPercentiles []time.Duration // P50, P90, P99 latencies
ReconnectionCount int
HealthScoreHistory []float64 // Health scores over time
MessageTypeDistribution map[string]int64 // Count by message type
// Quality metrics
QualityScore float64 // 0.0-1.0 overall quality rating
Stability float64 // Connection stability (0.0-1.0)
Performance float64 // Performance rating (0.0-1.0)
Reliability float64 // Reliability rating (0.0-1.0)
// Resource utilization
MemoryUsage int64 // Bytes used by connection
GoroutineCount int // Goroutines associated with connection
// Private tracking
lastCalculation time.Time
lastTotalMessages int64 // message total at the previous rate calculation
lastErrors int64 // error total at the previous rate calculation
latencyHistory []time.Duration
mutex sync.RWMutex
}
// Create new connection stats
func New() *ConnectionStats {
return &ConnectionStats{
Connected: time.Now(),
ErrorRates: make([]float64, 3), // 1m, 5m, 15m rates
ThroughputRates: make([]float64, 3), // 1m, 5m, 15m rates
LatencyPercentiles: make([]time.Duration, 3), // P50, P90, P99
HealthScoreHistory: make([]float64, 60), // Last 60 measurements
MessageTypeDistribution: make(map[string]int64), // Counts by message type
latencyHistory: make([]time.Duration, 0, 100),
lastCalculation: time.Now(),
QualityScore: 1.0, // Start with perfect score
Stability: 1.0,
Performance: 1.0,
Reliability: 1.0,
}
}
// Record message sent with type tracking
func RecordMessageSent(stats *ConnectionStats, size int, messageType string) {
// Atomically increment messages sent counter
atomic.AddInt64(&stats.MessagesSent, 1)
// Atomically add size to bytes sent counter
atomic.AddInt64(&stats.BytesSent, int64(size))
// Update timestamp and message-type distribution under lock; read
// lastCalculation under the same lock to avoid racing calculateRates
stats.mutex.Lock()
stats.LastMessageSent = time.Now()
stats.MessageTypeDistribution[messageType]++
needRecalc := time.Since(stats.lastCalculation) > 10*time.Second
stats.mutex.Unlock()
// Trigger rate calculation if needed
if needRecalc {
	go stats.calculateRates()
}
}
// Record message received with type tracking
func RecordMessageReceived(stats *ConnectionStats, size int, messageType string) {
// Atomically increment messages received counter
atomic.AddInt64(&stats.MessagesReceived, 1)
// Atomically add size to bytes received counter
atomic.AddInt64(&stats.BytesReceived, int64(size))
// Update timestamp and message-type distribution under lock; read
// lastCalculation under the same lock to avoid racing calculateRates
stats.mutex.Lock()
stats.LastMessageReceived = time.Now()
stats.MessageTypeDistribution[messageType]++
needRecalc := time.Since(stats.lastCalculation) > 10*time.Second
stats.mutex.Unlock()
// Trigger rate calculation if needed
if needRecalc {
	go stats.calculateRates()
}
}
// Record error with categorization
func RecordError(stats *ConnectionStats, errorType string) {
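// NOTE: errorType is reserved for per-category tracking; only the
// aggregate counter is updated here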
// Atomically increment error counter
atomic.AddInt64(&stats.Errors, 1)
// Update error rate calculations
go stats.calculateRates()
// Update quality metrics
go stats.updateQualityMetrics()
}
// Record latency measurement
func RecordLatency(stats *ConnectionStats, latency time.Duration) {
stats.mutex.Lock()
// Add to latency history
stats.latencyHistory = append(stats.latencyHistory, latency)
if len(stats.latencyHistory) > 100 {
// Keep history bounded
stats.latencyHistory = stats.latencyHistory[1:]
}
// Calculate exponential moving average for average latency
if stats.AvgLatency == 0 {
stats.AvgLatency = latency
} else {
// 95% old value, 5% new value
oldWeight := 0.95
newWeight := 0.05
oldAvg := float64(stats.AvgLatency.Nanoseconds())
newLatency := float64(latency.Nanoseconds())
newAvg := oldAvg*oldWeight + newLatency*newWeight
stats.AvgLatency = time.Duration(int64(newAvg))
}
historyLen := len(stats.latencyHistory) // capture under lock
stats.mutex.Unlock()
// Recalculate percentiles if we have enough data
if historyLen >= 10 {
	go stats.calculateLatencyPercentiles()
}
// Update quality metrics
go stats.updateQualityMetrics()
}
// Record reconnection event
func RecordReconnection(stats *ConnectionStats) {
stats.mutex.Lock()
// Increment reconnection counter
stats.ReconnectionCount++
// Reset certain metrics after reconnection
stats.lastCalculation = time.Now()
stats.mutex.Unlock()
// Update quality metrics
go stats.updateQualityMetrics()
}
// Record health score
func RecordHealthScore(stats *ConnectionStats, score float64) {
stats.mutex.Lock()
// Shift history and add new score
copy(stats.HealthScoreHistory[1:], stats.HealthScoreHistory)
stats.HealthScoreHistory[0] = score
stats.mutex.Unlock()
// Update quality metrics
go stats.updateQualityMetrics()
}
// Calculate throughput and error rates
func (stats *ConnectionStats) calculateRates() {
stats.mutex.Lock()
defer stats.mutex.Unlock()
now := time.Now()
elapsed := now.Sub(stats.lastCalculation).Seconds()
if elapsed < 1.0 {
return // Avoid too frequent calculations
}
// Load counters atomically to pair with the atomic writers
totalMessages := atomic.LoadInt64(&stats.MessagesSent) + atomic.LoadInt64(&stats.MessagesReceived)
totalErrors := atomic.LoadInt64(&stats.Errors)
// Messages per second over the elapsed interval (delta, not cumulative total)
deltaMessages := totalMessages - stats.lastTotalMessages
rate := float64(deltaMessages) / elapsed
stats.ThroughputRates[0] = rate
// Error rate over the elapsed interval
if deltaMessages > 0 {
	errorRate := float64(totalErrors-stats.lastErrors) / float64(deltaMessages)
	stats.ErrorRates[0] = errorRate
}
stats.lastTotalMessages = totalMessages
stats.lastErrors = totalErrors
// Update 5m and 15m rates with exponential decay
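// (the weights appear to follow load-average-style exponential smoothing,
// with 0.63 ≈ 1-e^-1 and 0.28 ≈ 1-e^(-1/3), i.e. assuming roughly one
// update per 5-minute interval)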
// 5m rate: 63% new, 37% old
stats.ThroughputRates[1] = 0.63*rate + 0.37*stats.ThroughputRates[1]
stats.ErrorRates[1] = 0.63*stats.ErrorRates[0] + 0.37*stats.ErrorRates[1]
// 15m rate: 28% new, 72% old
stats.ThroughputRates[2] = 0.28*rate + 0.72*stats.ThroughputRates[2]
stats.ErrorRates[2] = 0.28*stats.ErrorRates[0] + 0.72*stats.ErrorRates[2]
// Update last calculation time
stats.lastCalculation = now
}
// Calculate latency percentiles
func (stats *ConnectionStats) calculateLatencyPercentiles() {
stats.mutex.Lock()
defer stats.mutex.Unlock()
// Need at least a few measurements
if len(stats.latencyHistory) < 5 {
return
}
// Create sorted copy of latency history
sortedLatencies := make([]time.Duration, len(stats.latencyHistory))
copy(sortedLatencies, stats.latencyHistory)
sort.Slice(sortedLatencies, func(i, j int) bool {
return sortedLatencies[i] < sortedLatencies[j]
})
// Calculate p50 (median)
p50Index := len(sortedLatencies) / 2
stats.LatencyPercentiles[0] = sortedLatencies[p50Index]
// Calculate p90
p90Index := int(float64(len(sortedLatencies)) * 0.9)
if p90Index >= len(sortedLatencies) {
p90Index = len(sortedLatencies) - 1
}
stats.LatencyPercentiles[1] = sortedLatencies[p90Index]
// Calculate p99
p99Index := int(float64(len(sortedLatencies)) * 0.99)
if p99Index >= len(sortedLatencies) {
p99Index = len(sortedLatencies) - 1
}
stats.LatencyPercentiles[2] = sortedLatencies[p99Index]
}
// Update quality metrics
func (stats *ConnectionStats) updateQualityMetrics() {
stats.mutex.Lock()
defer stats.mutex.Unlock()
// Calculate stability based on reconnections and error rates
uptime := time.Since(stats.Connected).Minutes()
reconnRate := 0.0
if uptime > 0 {
reconnRate = float64(stats.ReconnectionCount) / uptime
}
// More than 1 reconnection per minute is bad
reconnFactor := math.Min(1.0, reconnRate)
errorFactor := stats.ErrorRates[0] // 1m error rate
stats.Stability = 1.0 - (0.6*reconnFactor + 0.4*errorFactor)
if stats.Stability < 0 {
stats.Stability = 0
}
// Calculate performance based on latency
var perfScore float64 = 1.0
if stats.AvgLatency > 0 {
// Latency score: 1.0 for <50ms, 0.0 for >500ms
avgLatencyMs := float64(stats.AvgLatency.Milliseconds())
if avgLatencyMs < 50 {
perfScore = 1.0
} else if avgLatencyMs > 500 {
perfScore = 0.0
} else {
// Linear scale between 50ms and 500ms
perfScore = 1.0 - (avgLatencyMs-50.0)/450.0
}
}
stats.Performance = perfScore
// Calculate reliability based on health scores
var healthSum float64
healthCount := 0
for _, score := range stats.HealthScoreHistory {
if score > 0 {
healthSum += score
healthCount++
}
}
if healthCount > 0 {
stats.Reliability = healthSum / float64(healthCount)
} else {
stats.Reliability = 1.0
}
// Calculate overall quality score
stats.QualityScore = 0.4*stats.Stability + 0.3*stats.Performance + 0.3*stats.Reliability
// Clamp to 0.0-1.0 range
if stats.QualityScore < 0.0 {
stats.QualityScore = 0.0
} else if stats.QualityScore > 1.0 {
stats.QualityScore = 1.0
}
}
// Get uptime
func GetUptime(stats *ConnectionStats) time.Duration {
return time.Since(stats.Connected)
}
// Get messages per second
func GetMessagesPerSecond(stats *ConnectionStats) float64 {
stats.mutex.RLock()
defer stats.mutex.RUnlock()
return stats.ThroughputRates[0]
}
// Get health trend
func GetHealthTrend(stats *ConnectionStats) float64 {
stats.mutex.RLock()
defer stats.mutex.RUnlock()
// Need enough history
if len(stats.HealthScoreHistory) < 5 {
return 0.0
}
// Calculate linear regression slope
n := 0
sumX := 0.0
sumY := 0.0
sumXY := 0.0
sumXX := 0.0
for i, score := range stats.HealthScoreHistory {
if score <= 0 {
continue
}
x := float64(i)
y := score
sumX += x
sumY += y
sumXY += x * y
sumXX += x * x
n++
}
if n < 3 {
return 0.0
}
// Calculate slope
nf := float64(n)
slope := (nf*sumXY - sumX*sumY) / (nf*sumXX - sumX*sumX)
// Normalize to -1.0 to 1.0 range
// Negative means declining health
// Positive means improving health
avgHealth := sumY / nf
if avgHealth != 0 {
slope = slope / avgHealth * 10.0 // Scale factor
}
// Clamp to range
if slope > 1.0 {
slope = 1.0
} else if slope < -1.0 {
slope = -1.0
}
return slope
}
// Reset stats
func Reset(stats *ConnectionStats) {
	stats.mutex.Lock()
	defer stats.mutex.Unlock()
	// Reset fields in place, keeping Connected, ReconnectionCount, and the
	// mutex itself; assigning a whole new struct over *stats would
	// overwrite the held mutex and leave it unlocked beneath the deferred
	// Unlock
	fresh := New()
	stats.MessagesSent, stats.MessagesReceived = 0, 0
	stats.BytesSent, stats.BytesReceived, stats.Errors = 0, 0, 0
	stats.AvgLatency = 0
	stats.LastMessageSent, stats.LastMessageReceived = time.Time{}, time.Time{}
	stats.ErrorRates, stats.ThroughputRates = fresh.ErrorRates, fresh.ThroughputRates
	stats.LatencyPercentiles, stats.HealthScoreHistory = fresh.LatencyPercentiles, fresh.HealthScoreHistory
	stats.MessageTypeDistribution = fresh.MessageTypeDistribution
	stats.QualityScore, stats.Stability = fresh.QualityScore, fresh.Stability
	stats.Performance, stats.Reliability = fresh.Performance, fresh.Reliability
	stats.latencyHistory, stats.lastCalculation = fresh.latencyHistory, fresh.lastCalculation
	stats.lastTotalMessages, stats.lastErrors = 0, 0
	stats.MemoryUsage, stats.GoroutineCount = 0, 0
}
// Generate statistics report
func GenerateReport(stats *ConnectionStats) map[string]interface{} {
// Compute the health trend before taking the read lock: GetHealthTrend
// acquires the same RWMutex, and a nested RLock can deadlock when a
// writer is queued in between
healthTrend := GetHealthTrend(stats)
stats.mutex.RLock()
defer stats.mutex.RUnlock()
// Calculate uptime
uptime := time.Since(stats.Connected)
// Create report
report := map[string]interface{}{
"uptime_seconds": uptime.Seconds(),
"messages_sent": stats.MessagesSent,
"messages_received": stats.MessagesReceived,
"bytes_sent": stats.BytesSent,
"bytes_received": stats.BytesReceived,
"errors": stats.Errors,
"reconnections": stats.ReconnectionCount,
"average_latency_ms": stats.AvgLatency.Milliseconds(),
"throughput": map[string]float64{
"messages_per_second_1m": stats.ThroughputRates[0],
"messages_per_second_5m": stats.ThroughputRates[1],
"messages_per_second_15m": stats.ThroughputRates[2],
},
"error_rates": map[string]float64{
"error_rate_1m": stats.ErrorRates[0],
"error_rate_5m": stats.ErrorRates[1],
"error_rate_15m": stats.ErrorRates[2],
},
"latency": map[string]int64{
"p50_ms": stats.LatencyPercentiles[0].Milliseconds(),
"p90_ms": stats.LatencyPercentiles[1].Milliseconds(),
"p99_ms": stats.LatencyPercentiles[2].Milliseconds(),
},
"quality": map[string]float64{
"overall_score": stats.QualityScore,
"stability": stats.Stability,
"performance": stats.Performance,
"reliability": stats.Reliability,
},
"health_trend": GetHealthTrend(stats),
}
// Add message type distribution
msgTypes := make(map[string]int64)
for typ, count := range stats.MessageTypeDistribution {
msgTypes[typ] = count
}
report["message_types"] = msgTypes
return report
}
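Putting the pieces together, a caller records traffic as it flows and periodically serializes a report. The fragment below is a sketch; payload and incoming are placeholder variables, and encoding/json is assumed to be imported:
// Inside the connection's read/write paths:
stats := New()
RecordMessageSent(stats, len(payload), "EVENT")
RecordMessageReceived(stats, len(incoming), "OK")
RecordLatency(stats, 42*time.Millisecond)

// Periodically, e.g. from a monitoring ticker:
report := GenerateReport(stats)
if data, err := json.MarshalIndent(report, "", "  "); err == nil {
	fmt.Println(string(data))
}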
Architecture Decision Records
ADR 1: Specialized Extensions Model
Context: Need a way to provide advanced functionality without bloating the core.
Decision: Implement specialized, independent extensions that build on honeybee but can be used selectively.
Rationale:
- Allows applications to choose only the extensions they need
- Prevents complexity from leaking into core functionality
- Creates clear boundaries between essential and optional features
- Enables specialized optimizations for specific use cases
- Follows the principle of progressive enhancement
- Enables à la carte adoption of advanced features
- Preserves compatibility with honeybee-based applications
Consequences:
- Increases total API surface area across multiple packages
- Improves application performance by avoiding unnecessary code
- Creates clearer documentation of advanced vs. basic functionality
- May require some duplication between extensions
- Allows independent evolution of each extension
- All extensions implement corresponding honeybee interfaces
- Extensions can be adopted individually without requiring full migration
ADR 2: High-Performance Focus
Context: Advanced applications have different performance requirements than basic ones.
Decision: Optimize mana-ws extensions for high-performance and high-volume scenarios.
Rationale:
- Addresses needs of applications with demanding requirements
- Provides optimizations that would be overkill for simple applications
- Creates clear upgrade path as application needs grow
- Allows fine-tuning for specific deployment environments
- Targets use cases where performance is critical
Consequences:
- Increases implementation complexity
- Requires more sophisticated testing
- Creates more configuration options
- Enables applications to scale to higher loads
- May have higher resource usage in some cases
ADR 3: Advanced State Management
Context: Complex applications need sophisticated state tracking beyond basic connection management.
Decision: Implement comprehensive state tracking with analytics and prediction capabilities.
Rationale:
- Provides visibility into connection behavior for debugging
- Enables predictive adjustments to improve reliability
- Creates foundation for adaptive optimization
- Supports monitoring and alerting in production environments
- Helps identify patterns in connection behavior
Consequences:
- Increases memory and CPU usage for tracking
- Provides valuable insights for complex applications
- May require tuning to balance tracking overhead
- Creates more complex internal state
- Enables more sophisticated recovery strategies
ADR 4: Adaptive Algorithms
Context: Static configurations don't work well across diverse network conditions.
Decision: Implement adaptive algorithms that adjust based on observed conditions.
Rationale:
- Improves performance across varying network environments
- Reduces need for manual tuning
- Creates more resilient connections
- Adjusts automatically to changing conditions
- Prevents degradation under stress
Consequences:
- Increases algorithm complexity
- Makes behavior less predictable in some cases
- Improves overall reliability
- Requires careful testing across diverse conditions
- May cause unexpected adaptation in edge cases
ADR 5: Pluggable Architecture
Context: Different applications need different combinations of advanced features.
Decision: Design extensions to be independently usable rather than as a monolithic advanced layer.
Rationale:
- Allows applications to choose only what they need
- Prevents unnecessary code inclusion
- Simplifies testing of individual features
- Enables incremental adoption of advanced features
- Follows the principle of separation of concerns
Consequences:
- Requires clear documentation of dependencies between extensions
- Increases number of import statements in application code
- Reduces application size and improves performance
- May lead to some duplication between extensions
- Allows targeted updates to specific extensions
ADR 6: Detailed Analytics
Context: Understanding connection behavior is important for optimization.
Decision: Implement comprehensive statistics tracking and reporting.
Rationale:
- Provides visibility into connection performance
- Enables data-driven optimization
- Supports monitoring and alerting
- Helps identify problems before they become critical
- Creates foundation for automatic tuning
Consequences:
- Increases overhead of tracking and calculation
- Provides valuable insights for complex applications
- May require storage considerations for historical data
- Creates more observability in production
- Requires balancing detail with performance impact
ADR 7: Drop-in Component Compatibility
Context: Need to determine how mana-ws components relate to their honeybee counterparts.
Decision: Design mana-ws components to be drop-in replacements for honeybee components, implementing the same interfaces while providing additional capabilities through extended interfaces.
Rationale:
- Enables incremental adoption of advanced features
- Allows selective enhancement of specific components
- Preserves investment in existing application code
- Creates clear upgrade paths as requirements evolve
- Follows the principle of progressive enhancement
- Respects the user's choice of complexity level
- Supports targeted optimization of performance bottlenecks
Consequences:
- All mana-ws components must implement honeybee interfaces
- Need careful design of extended interfaces
- May require adapter patterns in some cases
- Enables feature detection through interface assertions (see the sketch after this list)
- Simplifies migration between implementations
- Requires comprehensive compatibility testing
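As an illustration of the interface-assertion point above, a caller holding a basic honeybee interface can probe for a mana-ws extension at runtime. The interface and method names here are hypothetical, chosen only to show the pattern (assumes fmt is imported):
// Hypothetical: honeybee exposes a minimal Sender; a mana-ws drop-in
// replacement additionally satisfies StatsReporter.
type Sender interface {
	Send(msg []byte) error
}

type StatsReporter interface {
	GenerateReport() map[string]interface{}
}

func maybeReport(s Sender) {
	// Feature detection: upgrade to the extended interface when available.
	if reporter, ok := s.(StatsReporter); ok {
		fmt.Println(reporter.GenerateReport())
	}
}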
ADR 8: Progressive Cognitive Load
Context: Need to consider how developers learn and adopt the library ecosystem as their applications evolve.
Decision: Design mana-ws to support progressive learning and implementation, allowing developers to adopt advanced features incrementally as their applications mature, without requiring upfront understanding of all capabilities.
Rationale:
- Aligns with natural learning progression
- Supports symbiotic evolution of applications and usage patterns
- Reduces initial barrier to entry
- Enables experimentation with advanced features in isolation
- Creates natural documentation hierarchy from basic to advanced
- Matches complexity to application maturity
- Respects developer time and cognitive resources
Consequences:
- Features must be designed for independent adoption
- Documentation should follow progressive disclosure principles
- Examples needed for common upgrade scenarios
- May increase number of configuration options
- Creates more natural onboarding experience
- Allows focused learning on immediately relevant features
- Supports both beginners and advanced users effectively