diff --git a/%5BRoadmap%5D-Nostr-Websocket-Library-Stack.md b/%5BRoadmap%5D-Nostr-Websocket-Library-Stack.md new file mode 100644 index 0000000..becc7d0 --- /dev/null +++ b/%5BRoadmap%5D-Nostr-Websocket-Library-Stack.md @@ -0,0 +1,4775 @@ +# Overall Nostr Websocket Data Transport Stack + +## Architecture Overview + +The Nostr WebSocket Data Transport Stack provides a layered approach to implementing the Nostr protocol over WebSockets. The stack consists of three complementary libraries that can be used independently or together, depending on application requirements: + +1. **`roots-ws`**: A foundation layer that provides pure functional primitives for working with Nostr protocol messages. This layer focuses on time-independent operations with no event loops or state management, implementing core message validation, serialization, and routing logic. + +2. **`honeybee`**: A practical implementation layer that builds on `roots-ws` to provide essential WebSocket transport functionality. This layer adds connection management, reconnection handling, and subscription management with appropriate event loops and state tracking. + +3. **`mana-ws`**: An extensions layer that provides specialized, optional functionality for advanced use cases. This layer implements sophisticated features like connection pooling, message compression, rate limiting, and detailed statistics tracking. + +The architecture follows these key principles: + +- **Pure Core**: The foundation is built on pure functions with minimal side effects +- **Composition Over Inheritance**: Higher layers compose functionality from lower layers rather than extending them +- **Progressive Enhancement**: Applications can start with just `roots-ws` and add higher layers as needs evolve +- **Functional Paradigm**: All layers use a functional approach with explicit state management +- **Clear Separation of Concerns**: Each layer has a distinct focus and responsibility + +This layered approach allows applications to choose the appropriate level of functionality while maintaining a consistent programming model across the stack. + +## Architecture Decision Records + +### ADR 1: Layered Architecture with Unidirectional Dependencies + +**Context**: Need to organize WebSocket transport functionality to support different levels of application complexity. + +**Decision**: Split functionality into three layers (`roots-ws`, `honeybee`, `mana-ws`) with unidirectional dependencies. + +**Rationale**: +- Allows applications to use only the functionality they need +- Creates natural boundaries between pure functionality and stateful behavior +- Promotes focused testing and simplified maintenance +- Enables incremental adoption for applications with evolving needs +- Follows the principle of separation of concerns + +**Consequences**: +- Requires careful API design to ensure layers compose well +- May lead to some duplication between layers for independence +- Documentation must clearly explain the role of each layer +- Testing becomes more modular and focused on each layer's responsibilities + +### ADR 2: Functional Programming Paradigm Across Layers + +**Context**: Need a consistent programming model that works well across all layers. + +**Decision**: Adopt functional programming patterns with namespaced functions operating on explicit data structures. 
+ +**Rationale**: +- Creates a consistent mental model across all layers +- Improves testability through pure functions and explicit state +- Reduces coupling between components +- Aligns with Go's design philosophy +- Makes state management explicit and traceable + +**Consequences**: +- May be less familiar to developers used to OOP +- Requires clear documentation on function usage patterns +- Improves composability of library components +- Makes state changes and side effects more visible + +### ADR 3: Transport-Only Focus with No Protocol Semantics + +**Context**: Need to define the scope of the transport stack relative to other Nostr libraries. + +**Decision**: Focus exclusively on transport concerns, deferring protocol semantics to other libraries. + +**Rationale**: +- Creates a clear separation between transport and protocol semantics +- Allows the transport to be used with different protocol implementations +- Follows single responsibility principle +- Reduces coupling between transport and application logic +- Enables testing of transport independently from protocol semantics + +**Consequences**: +- Applications need to compose transport with protocol libraries +- Requires clear API boundaries between transport and protocol concerns +- May require additional integration code in some applications +- Simplifies evolution of transport and protocol independently + +### ADR 4: Explicit Role-Based Connection Model + +**Context**: Nostr protocol has inherent asymmetry in connection patterns. + +**Decision**: Model connections explicitly as either initiator or responder roles rather than using client/relay terminology. + +**Rationale**: +- Accurately represents the connection control model +- Avoids confusion with higher-level client/relay concepts +- Focuses on the transport protocol's communication pattern +- Creates clear API boundaries for each connection role +- Allows specialized behavior appropriate to each role + +**Consequences**: +- API must make it explicit which operations are permitted for each role +- Requires role-specific implementations of some functionality +- May require additional validation to prevent role violations +- Improves clarity of connection behavior expectations + +### ADR 5: Tiered Configuration Strategy + +**Context**: Different components require different levels of configurability. + +**Decision**: Implement a tiered configuration approach with defaults, profiles, component-level options, and advanced parameters. + +**Rationale**: +- Makes simple use cases simple with zero configuration +- Provides appropriate control for advanced users +- Organizes configuration into logical groupings +- Prevents configuration complexity from leaking into basic usage +- Allows applications to tune specific aspects without understanding all options + +**Consequences**: +- Requires careful selection of default values +- Increases implementation complexity for configuration management +- Improves documentation by organizing options into logical tiers +- Enables incremental configuration as application needs grow +- May require configuration builders to maintain clean interfaces + +# `roots-ws` + +## Functional Architecture + +### 1. Core Types + +**Purpose**: Defines basic types for WebSocket connections and message handling. + +```go +// Core types +type ConnectionStatus int + +const ( + // StatusDisconnected indicates the connection is not active and no connection attempt is in progress. 
+ StatusDisconnected ConnectionStatus = iota + + // StatusConnecting indicates a connection attempt is currently in progress but not yet established. + StatusConnecting + + // StatusConnected indicates the connection is active and ready for message exchange. + StatusConnected + + // StatusClosing indicates the connection is in the process of shutting down gracefully. + StatusClosing + + // StatusClosed indicates the connection has been fully closed with a clean shutdown. + StatusClosed + + // StatusError indicates the connection has failed and is in an error state. + StatusError + + // StatusBackoff indicates a temporary disconnected state during a reconnection attempt. + StatusBackoff + + // StatusAwaiting indicates waiting for preconditions before attempting connection. + StatusAwaiting + + // StatusUpgrading indicates WebSocket protocol upgrade in progress. + StatusUpgrading +) +``` + +### 2. Envelope Handling + +**Purpose**: Provides a type and functions for working with Nostr protocol messages. + +```go +// Core envelope type +type Envelope []byte + +// Get message label +func GetLabel(env Envelope) (string, error) { + // Parse JSON to extract first element as string (label) + // Handle errors if data isn't valid JSON or doesn't contain a label +} + +// Get standard message labels +func GetStandardLabels() map[string]struct{} { + // Return map with standard Nostr message types: + // EVENT, REQ, CLOSE, CLOSED, EOSE, NOTICE, OK, AUTH +} + +// Check if label is standard +func IsStandardLabel(label string) bool { + // Check against list of standard Nostr message labels +} +``` + +### 3. Envelope Creation Functions + +**Purpose**: Creates standard Nostr protocol messages using pure functions. + +```go +// Create EVENT envelope for publishing events +func EncloseEvent(event []byte) Envelope { + // Create array with "EVENT" and event object + // Return as JSON bytes +} + +// Create EVENT envelope for subscription events +func EncloseSubscriptionEvent(subID string, event []byte) Envelope { + // Create array with "EVENT", subID, and event + // Return as JSON bytes +} + +// Create REQ envelope +func EncloseReq(subID string, filters [][]byte) Envelope { + // Create array with "REQ", subID, and filters + // Return as JSON bytes +} + +// Create CLOSE envelope +func EncloseClose(subID string) Envelope { + // Create array with "CLOSE" and subID + // Return as JSON bytes +} + +// Create CLOSED envelope +func EncloseClosed(subID string, message string) Envelope { + // Create array with "CLOSED", subID, and message + // Return as JSON bytes +} + +// Create NOTICE envelope +func EncloseNotice(message string) Envelope { + // Create array with "NOTICE" and message + // Return as JSON bytes +} + +// Create EOSE envelope +func EncloseEOSE(subID string) Envelope { + // Create array with "EOSE" and subID + // Return as JSON bytes +} + +// Create OK envelope +func EncloseOK(eventID string, status bool, message string) Envelope { + // Create array with "OK", eventID, status, and message + // Return as JSON bytes +} + +// Create AUTH challenge envelope +func EncloseAuthChallenge(challenge string) Envelope { + // Create array with "AUTH" and challenge + // Return as JSON bytes +} + +// Create AUTH response envelope +func EncloseAuthResponse(event []byte) Envelope { + // Create array with "AUTH" and event + // Return as JSON bytes +} +``` + +### 4. Envelope Parsing Functions + +**Purpose**: Extracts data from standard Nostr protocol messages. 
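To make the parsing pattern concrete before the skeleton below, here is one possible shape for `FindEOSE`, assuming the `Envelope` type from section 2 and the sentinel errors from section 5 (a sketch, not the final implementation):

```go
package roots

import (
	"encoding/json"
	"fmt"
)

// FindEOSE parses ["EOSE", "<subscription id>"] and returns the subscription ID.
func FindEOSE(env Envelope) (string, error) {
	var arr []json.RawMessage
	if err := json.Unmarshal(env, &arr); err != nil {
		return "", fmt.Errorf("%w: %v", ErrInvalidJSON, err)
	}
	if len(arr) < 2 {
		return "", fmt.Errorf("%w: EOSE requires 2 elements, got %d", ErrInvalidEnvelope, len(arr))
	}
	var label string
	if err := json.Unmarshal(arr[0], &label); err != nil || label != "EOSE" {
		return "", ErrWrongEnvelopeLabel
	}
	var subID string
	if err := json.Unmarshal(arr[1], &subID); err != nil {
		return "", fmt.Errorf("%w: subscription ID", ErrWrongFieldType)
	}
	return subID, nil
}
```

The same parse, check length, check label, extract sequence repeats across every `Find*` function specified below.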
+ +```go +// Helper functions for validation +func CheckArrayLength(arr []json.RawMessage, minLen int) error { + // Ensure array has at least the minimum required elements +} + +func CheckLabel(got, want string) error { + // Verify that envelope label matches expected value +} + +func ParseElement(element json.RawMessage, value interface{}, position string) error { + // Unmarshal array element into provided value with error handling +} + +// Extract event from EVENT envelope +func FindEvent(env Envelope) ([]byte, error) { + // Parse message as JSON array + // Validate array has correct structure for EVENT + // Extract event object + // Return event as JSON bytes +} + +// Extract subscription event +func FindSubscriptionEvent(env Envelope) (subID string, event []byte, err error) { + // Parse message as JSON array + // Validate array has correct structure for subscription EVENT + // Extract subscription ID and event + // Return both with appropriate error handling +} + +// Extract subscription request +func FindReq(env Envelope) (subID string, filters [][]byte, err error) { + // Parse message as JSON array + // Validate array has correct structure for REQ + // Extract subscription ID and filters + // Return both with appropriate error handling +} + +// Extract OK response +func FindOK(env Envelope) (eventID string, status bool, message string, err error) { + // Parse message as JSON array + // Validate array has correct structure for OK + // Extract eventID, status, and message + // Return all with appropriate error handling +} + +// Extract EOSE message +func FindEOSE(env Envelope) (subID string, err error) { + // Parse message as JSON array + // Validate array has correct structure for EOSE + // Extract subscription ID + // Return with appropriate error handling +} + +// Extract CLOSE message +func FindClose(env Envelope) (subID string, err error) { + // Parse message as JSON array + // Validate array has correct structure for CLOSE + // Extract subscription ID + // Return with appropriate error handling +} + +// Extract CLOSED message +func FindClosed(env Envelope) (subID string, message string, err error) { + // Parse message as JSON array + // Validate array has correct structure for CLOSED + // Extract subscription ID and message + // Return both with appropriate error handling +} + +// Extract NOTICE message +func FindNotice(env Envelope) (message string, err error) { + // Parse message as JSON array + // Validate array has correct structure for NOTICE + // Extract message + // Return with appropriate error handling +} + +// Extract AUTH challenge +func FindAuthChallenge(env Envelope) (challenge string, err error) { + // Parse message as JSON array + // Validate array has correct structure for AUTH challenge + // Extract challenge + // Return with appropriate error handling +} + +// Extract AUTH response +func FindAuthResponse(env Envelope) (event []byte, err error) { + // Parse message as JSON array + // Validate array has correct structure for AUTH response + // Extract event + // Return with appropriate error handling +} +``` + +### 5. Error Types + +**Purpose**: Define error types for the library. 
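Before the definitions, a sketch of how a consumer might branch on these sentinels once parse functions wrap them with `%w` (hypothetical calling code; the import path mirrors the one used later in this document):

```go
package app

import (
	"errors"
	"fmt"

	roots "github.com/nostr-protocol/go-roots-ws"
)

func handleEOSE(env roots.Envelope) {
	subID, err := roots.FindEOSE(env)
	switch {
	case err == nil:
		fmt.Println("end of stored events for", subID)
	case errors.Is(err, roots.ErrWrongEnvelopeLabel):
		// Not an EOSE envelope; try the other Find* parsers.
	case errors.Is(err, roots.ErrInvalidJSON):
		// Malformed frame; drop it.
	default:
		// Structurally invalid EOSE; treat as a protocol violation.
	}
}
```

The sentinel definitions follow.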
```go
// Data Structure Errors
var (
	ErrInvalidJSON    = errors.New("invalid JSON")
	ErrMissingField   = errors.New("missing required field")
	ErrWrongFieldType = errors.New("wrong field type")
)

// Envelope Errors
var (
	ErrInvalidEnvelope    = errors.New("invalid envelope format")
	ErrWrongEnvelopeLabel = errors.New("wrong envelope label")
)
```

## Architecture Decision Records

### ADR 1: Pure Functional Approach

**Context**: Need to define the programming paradigm for the foundation layer.

**Decision**: Implement `roots-ws` as a pure functional library with no side effects or state management.

**Rationale**:
- Creates a solid foundation for higher-level libraries to build upon
- Maximizes testability and predictability
- Eliminates concurrency issues at the foundation level
- Makes function composition more natural and explicit
- Aligns with functional programming best practices

**Consequences**:
- Cannot include any event loops or timers in this layer
- Must pass all state explicitly through function parameters
- All I/O operations must be handled by higher layers
- Library will feel incomplete when used standalone
- Function signatures will be more explicit about dependencies

### ADR 2: Raw JSON Handling

**Context**: Need to decide how message content should be handled.

**Decision**: Process messages as raw JSON at this layer, deferring semantic validation to consumers.

**Rationale**:
- Avoids unnecessary serialization/deserialization cycles
- Maintains flexibility for consumers in how they process content
- Prevents tight coupling to specific JSON libraries
- Aligns with the transport-only focus of the library
- Makes message transformation more explicit

**Consequences**:
- Cannot validate message content semantics, only structure
- Consumers need to implement their own content parsing
- Performance benefits from reduced parsing overhead
- More flexibility for consumers to use different JSON libraries
- Must clearly document format expectations

### ADR 3: Explicit Error Types

**Context**: Error handling approach needs to be defined.

**Decision**: Define explicit, exported sentinel errors rather than returning generic errors.

**Rationale**:
- Allows consumers to handle different error scenarios appropriately
- Creates clear documentation of possible failure modes
- Enables selective recovery strategies based on error type
- Improves debugging by providing specific error information
- Creates a consistent error handling pattern

**Consequences**:
- Increases code surface area with many error definitions
- Requires discipline to use the right error types
- Simplifies error handling for consumers
- Makes error handling expectations explicit in documentation
- May lead to error handling anti-patterns if not well documented

### ADR 4: No Dependencies

**Context**: Need to determine dependency strategy for the foundation layer.

**Decision**: `roots-ws` should have no dependencies outside the standard library.
+ +**Rationale**: +- Eliminates dependency conflicts for consumers +- Reduces maintenance burden from external API changes +- Creates a more stable foundation for higher layers +- Simplifies deployment and versioning +- Makes clear boundaries around functionality + +**Consequences**: +- May need to implement functionality available in external libraries +- Limited to standard library capabilities +- Reduces feature bloat by forcing simplicity +- Improves long-term stability +- May require more code than using external libraries + +### ADR 5: Language-Specific Implementations + +**Context**: Need to support multiple programming languages. + +**Decision**: Provide idiomatic implementations for each supported language rather than a universal API. + +**Rationale**: +- Allows each language implementation to follow best practices for that language +- Creates more natural APIs for consumers in each language +- Avoids awkward abstractions to force uniformity +- Enables leveraging language-specific features +- Improves adoption by feeling native to each language ecosystem + +**Consequences**: +- Implementations will diverge in details across languages +- Requires maintaining multiple codebases +- Better developer experience in each language +- More challenging to maintain consistency in behavior +- More flexibility to evolve each implementation independently + +### ADR 6: Exported Helper Functions + +**Context**: Need to decide which helper functions should be part of the public API to enable extensibility. + +**Decision**: Export core helper functions (`CheckArrayLength`, `CheckLabel`, `ParseElement`) that enable consumers to create their own envelope types consistent with the library's patterns. + +**Rationale**: + +- Allows consumers to extend the library with custom envelope types +- Ensures consistency in validation and error handling across custom and built-in envelopes +- Enables protocol evolution without requiring library updates +- Follows the principle of providing tools rather than just finished products +- Creates a flexible extension point that preserves architectural integrity + +**Consequences**: + +- Increases the public API surface area +- Requires careful documentation of helper function behaviors and expectations +- Makes breaking changes to helpers more impactful +- Creates shared validation patterns across the ecosystem +- Encourages protocol innovation while maintaining implementation consistency + +# `honeybee` + +## Functional Architecture + +### 1. Connection Management + +**Purpose**: Manages WebSocket connection lifecycle with event loops. 
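Before the skeletons, a hypothetical end-to-end sketch of the lifecycle this section specifies (type, function, and channel names follow the skeleton below; the import path is illustrative):

```go
package main

import (
	"log"
	"time"

	"github.com/nostr-protocol/go-honeybee/connection"
)

func main() {
	conn := connection.New("wss://relay.example.com", connection.Config{
		ConnectionTimeout: 10 * time.Second,
		MaxMessageSize:    1 << 20, // 1 MiB
	})
	if err := connection.Connect(conn); err != nil {
		log.Fatal(err)
	}
	defer connection.Close(conn)

	for {
		select {
		case raw := <-conn.InChan: // Socket → Application
			log.Printf("received %d bytes", len(raw))
		case err := <-conn.ErrChan:
			log.Println("transport error:", err)
		}
	}
}
```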
+ +**Dependencies**: `roots-ws` Connection types, Error types + +```go +package connection + +// Core connection type +type Connection struct { + Status ConnectionStatus + URI string + InChan chan []byte // Socket → Application + OutChan chan []byte // Application → Socket + ErrChan chan error // Internal errors + socket *websocket.Conn + closeOnce sync.Once // Ensures single close execution + + // Component references + ReconnectManager *reconnection.Manager // Optional reconnection handling + HealthMonitor *health.Monitor // Optional health monitoring + RateLimiter *ratelimit.Limiter // Optional rate limiting + CompressionEnabled bool // Whether compression is enabled + Metrics *metrics.Collector // Connection metrics tracking + + done chan struct{} // Signal to shut down goroutines +} + +// Configuration with sensible defaults +type Config struct { + // Connection timeout + ConnectionTimeout time.Duration + + // Component configurations + ReconnectConfig *reconnection.Config // Optional reconnection config + HealthConfig *health.Config // Optional health monitoring config + RateLimitConfig *ratelimit.Config // Optional rate limiting config + CompressionConfig *compression.Config // Optional compression config + MetricsEnabled bool // Whether to collect metrics + + // Message limits + MaxMessageSize int +} + +// Connection status values +type ConnectionStatus int + +const ( + StatusDisconnected ConnectionStatus = iota + StatusConnecting + StatusConnected + StatusClosing + StatusClosed + StatusError + StatusBackoff +) + +// Create new connection in disconnected state +func New(uri string, config Config) *Connection { + // Initialize connection with channels and disconnected status + // Initialize optional components if configs are provided + // Initialize metrics collector if enabled + // Return initialized connection +} + +// Connect to websocket endpoint (for initiator) +func Connect(conn *Connection) error { + // Set connection status to StatusConnecting + // Create dialer with connection timeout from config + // Set up compression options if enabled + // Establish websocket connection with timeout + // Start reader and writer goroutines + // Initialize health monitoring if configured + // Set connection status to StatusConnected on success +} + +// Accept existing websocket (for responder) +func FromSocket(socket *websocket.Conn, config Config) *Connection { + // Create connection from existing socket + // Start reader and writer goroutines + // Initialize health monitoring if configured + // Set up rate limiting if configured + // Enable compression if configured + // Initialize metrics if enabled +} + +// Close connection +func Close(conn *Connection) error { + // Safely close the websocket using closeOnce + // Clean up goroutines and channels + // Record close event in metrics if enabled +} + +// Get current connection state +func Status(conn *Connection) ConnectionStatus { + // Return current status +} + +// Internal reader goroutine +func startReader(conn *Connection) { + // Read envelopes from socket + // Apply rate limiting if enabled + // Record metrics if enabled + // Send to InChan + // Handle errors via ErrChan +} + +// Internal writer goroutine +func startWriter(conn *Connection) { + // Read from OutChan + // Apply compression if enabled + // Record metrics if enabled + // Write envelopes to socket + // Handle errors via ErrChan +} + +// Handle onclose event from websocket +func setupCloseHandler(conn *Connection) { + // Set close handler on websocket + // Send close error 
through ErrChan + // Update connection status + // Record disconnection in metrics if enabled + // Trigger reconnection if enabled +} +``` + +### 2. Reconnection Management + +**Purpose**: Handles automatic reconnection with exponential backoff. + +**Dependencies**: `roots-ws` Error types + +```go +package reconnection + +// Reconnection configuration with sensible defaults +type Config struct { + Enabled bool + MaxAttempts int // Maximum number of reconnection attempts (0 for unlimited) + InitialDelay time.Duration // Default: 1 second + MaxDelay time.Duration // Default: 30 seconds + BackoffFactor float64 // Default: 2.0 (doubles delay each attempt) + JitterFactor float64 // Default: 0.2 (adds ±20% randomization) +} + +// Default reconnection configuration +func DefaultConfig() *Config { + return &Config{ + Enabled: true, + MaxAttempts: 5, + InitialDelay: 1 * time.Second, + MaxDelay: 30 * time.Second, + BackoffFactor: 2.0, + JitterFactor: 0.2, + } +} + +// Reconnection manager handles retry logic +type Manager struct { + Config *Config + attempts int + lastAttempt time.Time + callbacks []func() // Functions to call on successful reconnect +} + +// Create new reconnection manager +func NewManager(config *Config) *Manager { + // Use default config if none provided + // Initialize manager with zero attempts + // Return initialized manager +} + +// Register reconnection callback +func (m *Manager) OnReconnect(callback func()) { + // Add callback to callbacks slice +} + +// Calculate backoff delay with jitter +func (m *Manager) CalculateDelay() time.Duration { + // Return zero if this is first attempt + // Calculate base delay: initialDelay * (backoffFactor ^ attempts) + // Add jitter: delay * (1 ± random * jitterFactor) + // Cap at maximum delay + // Return calculated delay +} + +// Check if should attempt reconnection +func (m *Manager) ShouldAttempt() bool { + // Return false if disabled + // Check if max attempts exceeded (unless unlimited) + // Return true if should attempt +} + +// Record reconnection attempt +func (m *Manager) RecordAttempt() { + // Increment attempts counter + // Update lastAttempt timestamp +} + +// Reset after successful connection +func (m *Manager) Reset() { + // Reset attempts to zero + // Clear lastAttempt timestamp +} + +// Handle successful reconnection +func (m *Manager) HandleReconnect() { + // Reset attempt counter + // Execute all registered callbacks +} +``` + +### 3. Connection Health Monitoring + +**Purpose**: Monitors connection health using ping/pong heartbeats. 
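The essence of the monitor below is a ticker plus a pong deadline. A minimal standalone sketch of that loop, assuming gorilla/websocket (whose `Conn`, `Dialer`, `SetPongHandler`, and `WriteControl` the skeletons in this document appear to target):

```go
package health

import (
	"sync/atomic"
	"time"

	"github.com/gorilla/websocket"
)

// heartbeat pings at interval and reports unhealthy once pongs stop arriving.
func heartbeat(conn *websocket.Conn, interval, timeout time.Duration, unhealthy chan<- struct{}) {
	var lastPong atomic.Int64
	lastPong.Store(time.Now().UnixNano())
	conn.SetPongHandler(func(string) error {
		lastPong.Store(time.Now().UnixNano())
		return nil
	})

	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for range ticker.C {
		// WriteControl may be called concurrently with the writer goroutine.
		if err := conn.WriteControl(websocket.PingMessage, nil, time.Now().Add(timeout)); err != nil {
			unhealthy <- struct{}{}
			return
		}
		if time.Since(time.Unix(0, lastPong.Load())) > interval+timeout {
			unhealthy <- struct{}{}
			return
		}
	}
}
```

The full `Monitor` adds a consecutive-failure threshold on top of this loop, as specified below.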
+ +**Dependencies**: `roots-ws` Error types + +```go +package health + +// Health monitoring configuration +type Config struct { + PingInterval time.Duration // Default: 25 seconds + PongTimeout time.Duration // Default: 15 seconds + PingFailureThreshold int // Default: 3 consecutive failures +} + +// Default health configuration +func DefaultConfig() *Config { + return &Config{ + PingInterval: 25 * time.Second, + PongTimeout: 15 * time.Second, + PingFailureThreshold: 3, + } +} + +// Health monitor tracks connection health +type Monitor struct { + Config *Config + conn *websocket.Conn + lastPingSent time.Time + lastPongReceived time.Time + consecutiveFailures int + isHealthy bool +} + +// Create new health monitor +func NewMonitor(conn *websocket.Conn, config *Config) *Monitor { + // Use default config if none provided + // Initialize monitor with healthy state + // Set up ping/pong handlers on connection + // Return initialized monitor +} + +// Start health monitoring +func (m *Monitor) Start() { + // Start ticker with PingInterval + // Send pings at regular intervals + // Monitor pong responses + // Update health status based on responses +} + +// Stop health monitoring +func (m *Monitor) Stop() { + // Stop ticker + // Clean up resources +} + +// Check connection health +func (m *Monitor) IsHealthy() bool { + // Return current health status +} + +// Get time since last successful pong +func (m *Monitor) TimeSinceLastPong() time.Duration { + // Return duration since lastPongReceived + // Return max duration if no pong received yet +} + +// Handle ping send +func (m *Monitor) handlePingSend() { + // Send ping frame + // Update lastPingSent timestamp + // Schedule timeout check +} + +// Handle pong received +func (m *Monitor) handlePongReceived() { + // Update lastPongReceived timestamp + // Reset consecutive failures counter + // Update health status to healthy +} + +// Check for ping timeout +func (m *Monitor) checkPingTimeout() { + // Calculate time since last ping sent + // If exceeded PongTimeout: + // - Increment consecutive failures + // - Check if failures exceed threshold + // - Update health status if needed +} +``` + +### 4. Rate Limiting + +**Purpose**: Provides simple rate limiting to prevent message flooding. 
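The limiter below is a classic token bucket. As a worked example of the refill arithmetic (tokens accrue at `rate` per second up to a burst cap), a compact standalone sketch:

```go
package ratelimit

import (
	"math"
	"sync"
	"time"
)

type bucket struct {
	mu     sync.Mutex
	tokens float64   // current token count
	last   time.Time // last refill time
	rate   float64   // tokens added per second
	max    float64   // burst capacity
}

// allow refills based on elapsed time, then tries to consume one token.
func (b *bucket) allow() bool {
	b.mu.Lock()
	defer b.mu.Unlock()
	now := time.Now()
	b.tokens = math.Min(b.max, b.tokens+now.Sub(b.last).Seconds()*b.rate)
	b.last = now
	if b.tokens >= 1 {
		b.tokens--
		return true
	}
	return false
}
```

With the defaults below (10 messages per second, burst 30), a client that has been idle for three seconds can send 30 messages back-to-back before throttling down to 10 per second.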
+ +**Dependencies**: `roots-ws` Error types + +```go +package ratelimit + +// Rate limiting configuration +type Config struct { + Enabled bool // Whether rate limiting is active (default: false) + MessagesPerSec int // Maximum messages per second (default: 10) + BurstSize int // Maximum burst allowed (default: 30) +} + +// Default rate limit configuration +func DefaultConfig() *Config { + return &Config{ + Enabled: false, // Off by default + MessagesPerSec: 10, // Reasonable default + BurstSize: 30, // Allow short bursts + } +} + +// Token bucket for rate limiting +type Limiter struct { + rate float64 // Tokens per second + maxTokens int // Maximum capacity + availableTokens float64 // Current token count + lastRefill time.Time // Last time tokens were added + mutex sync.Mutex // For thread safety +} + +// Create new rate limiter with configuration +func NewLimiter(config *Config) *Limiter { + // If disabled, return nil + // Create token bucket with initial tokens + // Set rate based on configuration + // Return initialized limiter +} + +// Check if operation is allowed by rate limit +func Allow(limiter *Limiter) bool { + // Return true if limiter is nil (disabled) + // Lock for thread safety + // Refill tokens based on time elapsed + // Check if bucket has at least one token + // Consume token if available and return true + // Return false if no tokens available +} + +// Wait until operation is allowed with timeout +func Wait(limiter *Limiter, timeout time.Duration) error { + // Return nil if limiter is nil (disabled) + // Set up context with timeout + // Loop until context cancelled or operation allowed + // Calculate time until next token available + // Wait for token to become available or context cancelled +} + +// Refill tokens based on elapsed time +func refillTokens(limiter *Limiter) { + // Calculate elapsed time since last refill + // Calculate tokens to add based on rate + // Add tokens up to max capacity + // Update last refill time +} +``` + +### 5. Compression + +**Purpose**: Implements WebSocket compression support for bandwidth efficiency. 
**Dependencies**: None

```go
package compression

// Compression configuration
type Config struct {
	Enabled bool // Whether compression is enabled (default: false)
	Level   int  // Compression level 0-9 (default: 6)
}

// Default compression configuration
func DefaultConfig() *Config {
	return &Config{
		Enabled: false, // Off by default
		Level:   6,     // Default is balanced compression
	}
}

// Enable compression on an established WebSocket connection
func EnableCompression(conn *websocket.Conn, config *Config) error {
	// Return early if compression not enabled
	// Enable write compression on the connection
	// Set the compression level from configuration
	// Return any errors from setting the level
}

// Configure dialer with compression support
func ConfigureDialer(dialer *websocket.Dialer, config *Config) {
	// Return early if compression not enabled
	// Enable compression negotiation on the dialer
	// Set compression level if supported by the implementation
}

// Configure upgrader for server-side compression
func ConfigureUpgrader(upgrader *websocket.Upgrader, config *Config) {
	// Return early if compression not enabled
	// Enable compression negotiation on the upgrader
	// Per-connection level is applied after the upgrade completes
}

// Check if message should be compressed
func ShouldCompress(size int, contentType string) bool {
	// Skip very small messages (under 256 bytes)
	// Skip already compressed content types
	// Return true for compressible content
}
```
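A hypothetical initiator-side use of the above, assuming gorilla/websocket, which negotiates permessage-deflate when `EnableCompression` is set on the dialer:

```go
package compression

import "github.com/gorilla/websocket"

func dialCompressed(url string) (*websocket.Conn, error) {
	dialer := websocket.Dialer{
		EnableCompression: true, // offer permessage-deflate during the handshake
	}
	conn, _, err := dialer.Dial(url, nil)
	if err != nil {
		return nil, err
	}
	conn.EnableWriteCompression(true)
	if err := conn.SetCompressionLevel(6); err != nil { // 6 is the balanced default
		conn.Close()
		return nil, err
	}
	return conn, nil
}
```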
### 6. Connection Metrics

**Purpose**: Collects basic connection statistics for monitoring and debugging.

**Dependencies**: None

```go
package metrics

// Connection metrics collector
type Collector struct {
	StartTime           time.Time    // When the connection was established
	MessagesSent        int64        // Total messages sent
	MessagesReceived    int64        // Total messages received
	Errors              int64        // Total errors encountered
	ReconnectionCount   int          // Number of reconnection attempts
	BytesSent           int64        // Total bytes sent
	BytesReceived       int64        // Total bytes received
	LastMessageSent     time.Time    // Timestamp of last outgoing message
	LastMessageReceived time.Time    // Timestamp of last incoming message
	mutex               sync.RWMutex // For thread-safe updates to non-atomic fields
}

// Create new metrics collector
func NewCollector() *Collector {
	// Initialize collector with current timestamp
	// Set initial counters to zero
	// Return initialized collector
}

// Record sent message
func RecordSent(collector *Collector, size int) {
	// Skip if collector is nil
	// Atomically increment sent counter
	// Atomically add size to bytes sent
	// Update last message sent timestamp
}

// Record received message
func RecordReceived(collector *Collector, size int) {
	// Skip if collector is nil
	// Atomically increment received counter
	// Atomically add size to bytes received
	// Update last message received timestamp
}

// Record error
func RecordError(collector *Collector) {
	// Skip if collector is nil
	// Atomically increment error counter
}

// Record reconnection attempt
func RecordReconnection(collector *Collector) {
	// Skip if collector is nil
	// Safely increment reconnection counter
}

// Get connection uptime
func GetUptime(collector *Collector) time.Duration {
	// Return zero if collector is nil
	// Calculate duration since start time
}

// Calculate messages per second (recent rate)
func GetMessagesPerSecond(collector *Collector) float64 {
	// Return zero if collector is nil or insufficient history
	// Calculate messages per second based on total messages and uptime
}

// Reset metrics (typically after reconnection)
func Reset(collector *Collector, resetStartTime bool) {
	// Skip if collector is nil
	// Reset message counters to zero
	// Reset byte counters to zero
	// Reset timestamps if requested
	// Keep reconnection count
}

// Get metrics snapshot as map
func GetSnapshot(collector *Collector) map[string]interface{} {
	// Return empty map if collector is nil
	// Create map with current metrics values
	// Include derived metrics (uptime, message rates)
	// Return complete snapshot
}
```

### 7. Message Router Implementation

**Purpose**: Implements event loops that route incoming envelopes to per-label channels.

**Dependencies**: `roots-ws` envelope label handling and validation

```go
package router

// Create routing channels
func CreateChannels(additionalLabels ...string) map[string]chan []byte {
	// Initialize channels for each standard envelope label:
	// EVENT, REQ, CLOSE, CLOSED, EOSE, NOTICE, OK, AUTH
	// Add channels for any additional labels provided
	// Set appropriate buffer sizes for each channel
	// based on expected message volume
}

// Route an envelope to the appropriate channel
func RouteEnvelope(env []byte, channels map[string]chan []byte) error {
	// Extract envelope label using roots-ws GetLabel()
	// Verify label is standard (IsStandardLabel) or registered as additional
	// Select the channel keyed by the label
	// Handle unknown labels gracefully
	// Send envelope to channel with non-blocking behavior
	// Return error if channel is full or label is invalid
}

// Process messages from connection
func StartRouting(conn *connection.Connection, channels map[string]chan []byte) {
	// Launch goroutine for message processing
	// Read from connection's InChan
	// Apply envelope validation and size checks using roots-ws functions
	// Route each envelope to the appropriate channel
	// Handle routing errors without crashing the router
	// Clean up when connection closes
}
```

### 8. Write Queue Implementation

**Purpose**: Provides thread-safe message transmission.

**Dependencies**: `roots-ws` Error types

```go
package write

// Thread-safe write queue
type Queue struct {
	MaxSize int
	conn    *connection.Connection
	mutex   sync.Mutex
}

// Create new write queue for a connection
func NewQueue(conn *connection.Connection, maxSize int) *Queue {
	// Initialize queue with specified max size
	// Return fully initialized queue
}

// Send envelope through queue
func Send(queue *Queue, env []byte) error {
	// Return error if envelope exceeds max size
	// Acquire mutex lock for thread safety
	// Write envelope to connection's OutChan
	// Release mutex lock
	// Return any errors encountered
}

// Start queue processing
func StartQueue(queue *Queue) {
	// Start goroutine to monitor connection's OutChan
	// Process messages in thread-safe manner
	// Handle errors appropriately
}
```

### 9. Initiator Functions

**Purpose**: Implements connection initiator patterns.
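A hypothetical calling sequence for the functions specified below (the write queue comes from section 8; `signedEvent` is assumed to be a serialized, signed Nostr event; import paths are illustrative):

```go
package app

import (
	"log"

	"github.com/nostr-protocol/go-honeybee/connection"
	"github.com/nostr-protocol/go-honeybee/initiator"
	"github.com/nostr-protocol/go-honeybee/write"
)

func publishAndSubscribe(conn *connection.Connection, signedEvent []byte) {
	queue := write.NewQueue(conn, 1<<20)

	// Publish an event, then open a subscription.
	if err := initiator.PublishEvent(queue, signedEvent); err != nil {
		log.Println("publish failed:", err)
	}
	filter := []byte(`{"kinds":[1],"limit":20}`)
	if err := initiator.SendReq(queue, "sub-1", [][]byte{filter}); err != nil {
		log.Println("subscribe failed:", err)
	}
}
```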
+ +**Dependencies**: `roots-ws` Protocol Messages + +```go +package initiator + +// Send EVENT envelope +func PublishEvent(queue *write.Queue, event []byte) error { + // Validate event format using roots-ws + // Create EVENT message using roots-ws + // Send through write queue + // Return any errors encountered +} + +// Send REQ envelope +func SendReq(queue *write.Queue, subID string, filters [][]byte) error { + // Validate subscription ID format and filters + // Create REQ message using roots-ws + // Send through write queue + // Return any errors encountered +} + +// Send CLOSE envelope +func SendClose(queue *write.Queue, subID string) error { + // Validate subscription ID format + // Create CLOSE message using roots-ws + // Send through write queue + // Return any errors encountered +} + +// Send AUTH response envelope +func SendAuthResponse(queue *write.Queue, event []byte) error { + // Validate event format using roots-ws + // Create AUTH message using roots-ws + // Send through write queue + // Return any errors encountered +} +``` + +### 10. Responder Functions + +**Purpose**: Implements connection responder patterns. + +**Dependencies**: `roots-ws` Protocol Messages + +```go +package responder + +// Send event within subscription context +func SendEvent(queue *write.Queue, subID string, event []byte) error { + // Validate subscription ID and event format + // Create EVENT message with subscription ID included + // Send through write queue + // Return any errors encountered +} + +// Send EOSE for subscription +func SendEOSE(queue *write.Queue, subID string) error { + // Validate subscription ID format + // Create EOSE message for subscription + // Send through write queue + // Return any errors encountered +} + +// Send NOTICE message +func SendNotice(queue *write.Queue, message string) error { + // Validate notice message content + // Create NOTICE message + // Send through write queue + // Return any errors encountered +} + +// Send OK message for event +func SendOK(queue *write.Queue, eventID string, success bool, message string) error { + // Validate event ID format + // Create OK message with event ID, success flag, and message + // Send through write queue + // Return any errors encountered +} + +// Send AUTH challenge +func SendAuthChallenge(queue *write.Queue, challenge string) error { + // Validate challenge format + // Create AUTH challenge message + // Send through write queue + // Return any errors encountered +} + +// Send CLOSED message +func SendClosed(queue *write.Queue, subID string, message string) error { + // Validate subscription ID format + // Create CLOSED message with subscription ID and reason + // Send through write queue + // Return any errors encountered +} +``` + +### 11. Subscription Management + +**Purpose**: Manages subscription state and handles reconnection. 
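A hypothetical flow showing why the registry exists: subscriptions opened through it can be replayed wholesale after a reconnect (API names follow the skeleton below; import paths are illustrative):

```go
package app

import (
	"log"

	"github.com/nostr-protocol/go-honeybee/subscription"
	"github.com/nostr-protocol/go-honeybee/write"
)

func trackAndRecover(queue *write.Queue, filters [][]byte) *subscription.Registry {
	registry := subscription.NewRegistry()
	if err := subscription.Subscribe(queue, registry, "timeline", filters); err != nil {
		log.Println("subscribe failed:", err)
	}

	// After the transport reconnects, replay everything still registered.
	for _, err := range subscription.ReestablishSubscriptions(queue, registry) {
		log.Println("resubscribe failed:", err)
	}
	return registry
}
```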
**Dependencies**: Write Queue, Initiator functions

```go
package subscription

// Registry tracks subscriptions for reconnection
type Registry struct {
	mu            sync.RWMutex
	subscriptions map[string]Entry
}

type Entry struct {
	ID      string
	Filters [][]byte
	Active  bool
}

// Create new registry
func NewRegistry() *Registry {
	// Initialize empty registry with map
	// Return initialized registry
}

// Add subscription to registry
func (r *Registry) Add(id string, filters [][]byte) {
	// Lock mutex for thread safety
	// Create entry with provided ID and filters
	// Set Active to true
	// Add to subscriptions map
	// Unlock mutex
}

// Remove subscription from registry
func (r *Registry) Remove(id string) {
	// Lock mutex for thread safety
	// Delete entry from subscriptions map
	// Unlock mutex
}

// Get active subscriptions
func (r *Registry) GetActive() map[string][][]byte {
	// Lock mutex for read access
	// Create map of active subscription IDs to filters
	// Unlock mutex
	// Return active subscriptions map
}

// Subscribe with registry tracking
func Subscribe(queue *write.Queue, registry *Registry, id string, filters [][]byte) error {
	// Send REQ message using initiator.SendReq
	// Register subscription in registry if successful
	// Return any errors from send operation
}

// Unsubscribe with registry tracking
func Unsubscribe(queue *write.Queue, registry *Registry, id string) error {
	// Send CLOSE message using initiator.SendClose
	// Remove from registry regardless of send result
	// Return any errors from send operation
}

// Register reconnect callback for a connection
func RegisterReconnectCallback(conn *connection.Connection, registry *Registry, queue *write.Queue) {
	// Create callback function that reestablishes subscriptions
	// Register callback via conn.ReconnectManager.OnReconnect
}

// Reestablish subscriptions after reconnection
func ReestablishSubscriptions(queue *write.Queue, registry *Registry) []error {
	// Get active subscriptions from registry
	// Attempt to resubscribe to each using initiator.SendReq
	// Collect any errors encountered
	// Return array of errors (empty if all succeeded)
}
```

### 12. Connection Pool

**Purpose**: Manages multiple connections with shared operations.
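A hypothetical multi-relay session built on the pool API specified below (relay URLs and import paths are placeholders):

```go
package app

import (
	"log"

	"github.com/nostr-protocol/go-honeybee/connection"
	"github.com/nostr-protocol/go-honeybee/pool"
	"github.com/nostr-protocol/go-honeybee/subscription"
)

func fanOut(config connection.Config, env []byte, filters [][]byte) (*pool.Pool, error) {
	p, err := pool.New([]string{
		"wss://relay-a.example.com",
		"wss://relay-b.example.com",
	}, config)
	if err != nil {
		return nil, err
	}

	registry := subscription.NewRegistry()
	p.EnableAutoReconnect(registry)

	// Same subscription on every relay; same event to every relay.
	for _, err := range p.Subscribe("feed", filters, registry) {
		log.Println("subscribe:", err)
	}
	for _, err := range p.Broadcast(env) {
		log.Println("broadcast:", err)
	}
	return p, nil
}
```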
**Dependencies**: Connection, Write Queue, Router types

```go
package pool

// Connection pool manages multiple relays
type Pool struct {
	mu          sync.RWMutex      // Protects connections and queues maps
	config      connection.Config // Shared configuration used when adding relays
	connections map[string]*connection.Connection
	queues      map[string]*write.Queue
	Channels    map[string]chan []byte
}

// Create new pool with optional initial relays
func New(urls []string, config connection.Config) (*Pool, error) {
	// Initialize pool with empty maps
	// Store config for later AddRelay calls
	// Set up shared routing channels
	// Add initial relays if provided
	// Return initialized pool
}

// Add relay to the pool
func (p *Pool) AddRelay(url string) error {
	// Lock mutex for thread safety
	// Check if relay already exists
	// Create new connection with the stored config
	// Create write queue for connection
	// Start routing to shared channels
	// Connect to relay
	// Add to connections and queues maps
	// Unlock mutex
	// Return any errors encountered
}

// Remove relay from pool
func (p *Pool) RemoveRelay(url string) error {
	// Lock mutex for thread safety
	// Check if relay exists in pool
	// Close connection
	// Remove from connections and queues maps
	// Unlock mutex
	// Return any errors encountered
}

// Send to specific relay
func (p *Pool) SendTo(url string, env []byte) error {
	// Lock mutex for read access
	// Find queue for relay
	// Unlock mutex
	// Send envelope through queue
	// Return any errors encountered
}

// Broadcast to all relays
func (p *Pool) Broadcast(env []byte) []error {
	// Lock mutex for read access
	// Create copy of queues map to prevent deadlock
	// Unlock mutex
	// Send to all relays in copy
	// Collect any errors
	// Return array of errors (empty if all succeeded)
}

// Subscribe to all relays
func (p *Pool) Subscribe(id string, filters [][]byte, registry *subscription.Registry) []error {
	// Lock mutex for read access
	// Create copy of queues map to prevent deadlock
	// Unlock mutex
	// Subscribe to all relays in copy
	// Register subscription if successful
	// Collect any errors
	// Return array of errors (empty if all succeeded)
}

// Unsubscribe from all relays
func (p *Pool) Unsubscribe(id string, registry *subscription.Registry) []error {
	// Lock mutex for read access
	// Create copy of queues map to prevent deadlock
	// Unlock mutex
	// Unsubscribe from all relays in copy
	// Remove from registry
	// Collect any errors
	// Return array of errors (empty if all succeeded)
}

// Enable automatic reconnection for all relays
func (p *Pool) EnableAutoReconnect(registry *subscription.Registry) {
	// Lock mutex for read access
	// Create copy of connections map to prevent deadlock
	// Unlock mutex
	// Register reconnect callbacks for all connections
}

// Get connection status
func (p *Pool) Status(url string) (connection.ConnectionStatus, error) {
	// Lock mutex for read access
	// Find connection for relay
	// Unlock mutex
	// Return status or error if not found
}

// Get all statuses
func (p *Pool) Statuses() map[string]connection.ConnectionStatus {
	// Lock mutex for read access
	// Create map of relay URLs to connection statuses
	// Unlock mutex
	// Return statuses map
}

// Close all connections
func (p *Pool) Close() []error {
	// Lock mutex for read access
	// Create copy of connections map to prevent deadlock
	// Unlock mutex
	// Close all connections in copy
	// Collect any errors
	// Return array of errors (empty if all succeeded)
}
```

### 13. 
Error Types + +**Purpose**: Defines error types for the library. + +```go +package errors + +import "errors" + +// Connection errors +var ( + ErrConnectionFailed = errors.New("connection failed") + ErrConnectionTimeout = errors.New("connection timeout") + ErrConnectionClosed = errors.New("connection closed unexpectedly") + ErrInvalidState = errors.New("invalid state for operation") + ErrMaxReconnectAttemptsExceeded = errors.New("maximum reconnection attempts exceeded") + ErrPongTimeout = errors.New("pong response timeout") + ErrConnectionUnhealthy = errors.New("connection deemed unhealthy") +) + +// Write errors +var ( + ErrEnvelopeTooLarge = errors.New("envelope exceeds maximum size") + ErrWriteFailed = errors.New("write failed") +) + +// Router errors +var ( + ErrUnknownEnvelopeType = errors.New("unknown envelope type") + ErrChannelFull = errors.New("channel buffer full") +) + +// Pool errors +var ( + ErrRelayNotFound = errors.New("relay not found in pool") + ErrRelayAlreadyExists = errors.New("relay already exists in pool") +) + +// Subscription errors +var ( + ErrSubscriptionNotFound = errors.New("subscription not found") + ErrSubscriptionDuplicate = errors.New("subscription ID already exists") +) +``` + +### 14. Main Package + +**Purpose**: Exports key types and functions with sensible defaults. + +**Dependencies**: All other packages + +```go +package honeybee + +import ( + "time" + + "github.com/nostr-protocol/go-roots-ws" + "github.com/nostr-protocol/go-honeybee/connection" + "github.com/nostr-protocol/go-honeybee/errors" + "github.com/nostr-protocol/go-honeybee/health" + "github.com/nostr-protocol/go-honeybee/initiator" + "github.com/nostr-protocol/go-honeybee/pool" + "github.com/nostr-protocol/go-honeybee/reconnection" + "github.com/nostr-protocol/go-honeybee/responder" + "github.com/nostr-protocol/go-honeybee/router" + "github.com/nostr-protocol/go-honeybee/subscription" + "github.com/nostr-protocol/go-honeybee/write" +) + +// Export key types and functions for convenient access +type Connection = connection.Connection +type ConnectionStatus = connection.ConnectionStatus +type ConnectionConfig = connection.Config +type WriteQueue = write.Queue +type Pool = pool.Pool +type SubscriptionRegistry = subscription.Registry +type ReconnectionManager = reconnection.Manager +type ReconnectionConfig = reconnection.Config +type HealthMonitor = health.Monitor +type HealthConfig = health.Config + +// Default configuration +func DefaultConfig() connection.Config { + return connection.Config{ + ConnectionTimeout: 10 * time.Second, + ReconnectConfig: reconnection.DefaultConfig(), + HealthConfig: health.DefaultConfig(), + MaxMessageSize: 1024 * 1024, // 1MB + } +} +``` + +## Cross-Language Translation: Go to TypeScript with RxJS + +#### Core Architectural Principles + +**Recognize the fundamental paradigm difference:** +- Go is built around CSP (Communicating Sequential Processes) - goroutines passing data through channels +- TypeScript/RxJS is built around reactive streams - declarative transformations of event flows +- Don't translate patterns 1:1; translate *intent* into each language's natural expression + +**Design APIs that feel native:** +- Go users expect constructors that return values and errors, explicit `Start()` and `Close()` methods +- TypeScript users expect constructors that return ready-to-use objects, automatic activation on subscription +- The same feature should have different surface APIs in each language + +#### TypeScript Multi-Environment Strategy + +**Shared foundation 
with environment-specific transport:**
- `roots-ws` (TypeScript): Pure functions for envelope handling, identical across environments
- `honeybee-browser`: Reactive wrapper for browser WebSocket APIs
- `honeybee-node`: Reactive wrapper for Node.js WebSocket libraries
- Both honeybee variants expose identical observable-based APIs while using appropriate underlying implementations

**Environment-aware design patterns:**
- Browser: Event-driven lifecycle, page visibility handling, CORS considerations
- Node: Process lifecycle, server patterns, granular connection control
- Shared: Message routing, subscription management, error handling patterns

#### Go-Specific Considerations

**Embrace structured concurrency:**
- Use goroutines liberally - they're cheap and the expected tool
- Always pair goroutines with cleanup using `context.Context` and `sync.WaitGroup`
- Channels are the primary communication mechanism between concurrent operations
- Make blocking behavior explicit and expected

**Error handling:**
- Return errors as values, don't hide them in callbacks
- Provide typed errors users can switch on: `var ErrConnectionClosed = errors.New(...)`
- Use `fmt.Errorf` with `%w` for error wrapping to preserve context
- Consider sentinel errors for common conditions

**Lifecycle management:**
- Explicit `Start()` and `Close()` methods are idiomatic
- `Close()` should be safe to call multiple times
- Use `defer` for cleanup - teach users this pattern
- Consider implementing the `io.Closer` interface

**Type system:**
- Prefer interfaces for dependencies (like `Dialer`, `Conn`)
- Keep interfaces small and focused (interface segregation)
- Use struct embedding for composition
- Type assertions with the `value, ok := x.(Type)` pattern for optional capabilities

**Testing:**
- Provide interface-based abstractions specifically for testability
- Table-driven tests are the norm
- Use `testing.T` helpers and subtests
- Mock by implementing interfaces, not with frameworks

#### TypeScript/RxJS-Specific Considerations

**Embrace reactive composition:**
- Expose observables as the primary API, not methods that trigger side effects
- Let users compose streams with operators rather than providing pre-composed solutions
- Hot vs cold observables matter - document which is which
- Use subjects internally only when you need to push values into a stream

**Declarative over imperative:**
- Prefer defining what streams represent over how to create them
- Use operators like `map`, `merge`, `switchMap` to transform streams
- Avoid manual subscription management - use `takeUntil` for cleanup
- Let the framework handle execution timing

**Error handling:**
- Errors flow through the observable pipeline
- Provide separate error streams rather than mixing errors with data
- Use `catchError` to handle and recover from errors within streams
- Consider retry strategies with `retry` and `retryWhen`

**Lifecycle management:**
- Construction should be side-effect free
- Activation happens on subscription, deactivation on unsubscription
- Use the `finalize` operator for cleanup logic
- Provide a `destroy()` or `close()` method only for resource cleanup, not stream management

**Type system:**
- Use TypeScript's structural typing - interfaces describe shapes
- Generics for reusable stream types: `Observable<T>`
- Discriminated unions for state modeling
- Leverage type inference where possible

**Testing:**
- Use marble testing for complex stream interactions
- Mock at the observable level, not internal implementation
- Test by subscribing and asserting on emitted values
- Use `TestScheduler` for time-based scenarios

#### Environment-Specific Patterns

**Browser WebSocket Handling:**
```typescript
import { Subject } from 'rxjs';

// Browser: Event-driven API with limited control
class BrowserConnection {
  private socket: WebSocket;
  private readonly messagesSubject = new Subject<string>();
  private readonly statusSubject = new Subject<string>();

  constructor(url: string) {
    this.socket = new WebSocket(url);
    this.setupEventListeners();
  }

  private setupEventListeners() {
    this.socket.onopen = () => this.statusSubject.next('connected');
    this.socket.onmessage = (event) => this.messagesSubject.next(event.data);
    this.socket.onclose = () => this.handleReconnection();
  }

  private handleReconnection() {
    // Reconnect with backoff; the browser offers limited control here.
  }

  // Limited connection parameter control
  // CORS restrictions apply
  // Browser manages some reconnection behavior
}
```

**Node.js WebSocket Handling:**
```typescript
// Node: More granular control and server patterns
class NodeConnection {
  private socket: WebSocket;

  private constructor(socket: WebSocket) {
    this.socket = socket;
    this.setupEventListeners();
  }

  // Initiator: full control over connection parameters
  static connect(url: string, options: NodeWebSocketOptions): NodeConnection {
    return new NodeConnection(new WebSocket(url, {
      headers: options.headers,
      rejectUnauthorized: options.ssl?.verify,
    }));
  }

  // Server-side acceptance patterns
  static fromIncoming(socket: WebSocket): NodeConnection {
    return new NodeConnection(socket);
  }

  private setupEventListeners() {
    // Wire socket events into the connection's observable streams.
  }

  // Custom authentication flows
  // Process lifecycle integration
  // Resource cleanup responsibilities
}
```

**Shared Observable Patterns:**
```typescript
// Both environments expose identical API patterns
type ConnectionState = 'connecting' | 'connected' | 'closing' | 'closed';

interface HoneybeeConnection {
  readonly messages$: Observable<Uint8Array>;
  readonly connectionState$: Observable<ConnectionState>;
  readonly errors$: Observable<Error>;

  send(data: Uint8Array): void;
  close(): void;
}
```

#### Specific Feature Comparisons

**Connection establishment:**
```go
// Go: explicit, returns error (gorilla/websocket-style API)
conn, _, err := websocket.DefaultDialer.Dial("ws://example.com", nil)
if err != nil {
	return err
}
defer conn.Close()
```

```typescript
// TypeScript Browser: declarative, errors in stream
const conn = new BrowserConnection("ws://example.com");
conn.messages$.subscribe(msg => /* ... */);
conn.errors$.subscribe(err => /* ... */);

// TypeScript Node: declarative with more options
const nodeConn = NodeConnection.connect("ws://example.com", {
  headers: { 'Authorization': 'Bearer token' },
  ssl: { verify: true }
});
nodeConn.messages$.subscribe(msg => /* ... 
*/); +``` + +**Reading messages:** +```go +// Go: blocking channel read in goroutine +go func() { + for msg := range conn.Incoming { + process(msg) + } +}() +``` + +```typescript +// TypeScript: subscribe to observable (both environments) +conn.messages$.subscribe(msg => process(msg)); +``` + +**Environment-specific optimizations:** +```typescript +// Browser: Page visibility API integration +conn.messages$.pipe( + pausable(pageVisibility$), + // Pause subscriptions when page hidden +).subscribe(msg => process(msg)); + +// Node: Process lifecycle integration +process.on('SIGTERM', () => { + conn.close(); + // Graceful shutdown +}); +``` + +**Reconnection logic:** +```go +// Go: explicit retry loop with backoff +for attempt := 0; attempt < maxRetries; attempt++ { + conn, err := dial() + if err == nil { + return conn, nil + } + time.Sleep(backoff(attempt)) +} +``` + +```typescript +// TypeScript Browser: Limited reconnection control +const conn$ = connect().pipe( + retryWhen(errors => errors.pipe( + delay(1000), + take(3) + )) +); + +// TypeScript Node: Full reconnection control +const conn$ = connect().pipe( + retryWhen(errors => errors.pipe( + switchMap((error, index) => + customBackoffStrategy(error, index) + ), + takeWhile(shouldRetry) + )) +); +``` + +#### Library Structure Comparison + +**Go Structure:** +``` +honeybee/ +├── connection/ // Core connection management +├── reconnection/ // Backoff and retry logic +├── health/ // Ping/pong monitoring +├── router/ // Message routing +└── pool/ // Multi-connection management +``` + +**TypeScript Structure:** +``` +roots-ws/ // Shared pure functions +├── envelope.ts // Message parsing/creation +├── validation.ts // Structure validation +└── errors.ts // Error types + +honeybee-browser/ // Browser-specific transport +├── connection.ts // Browser WebSocket wrapper +├── health.ts // Visibility-aware monitoring +└── pool.ts // Multi-relay management + +honeybee-node/ // Node-specific transport +├── connection.ts // Node WebSocket wrapper +├── server.ts // Server-side patterns +└── pool.ts // Process-aware management +``` + +#### Documentation Differences + +**Go documentation:** +- Focus on goroutine safety and channel semantics +- Show explicit error handling patterns +- Demonstrate lifecycle with defer +- Example code shows complete programs, not snippets + +**TypeScript documentation:** +- Focus on observable composition and operators +- Show stream transformations and combinations +- Demonstrate subscription management +- Document environment-specific behaviors and limitations +- Example code shows reactive chains and patterns +- Clearly distinguish browser vs Node capabilities + +#### Common Pitfalls to Avoid + +**Don't force Go patterns into TypeScript:** +- No "Start()" methods that users must call +- Don't require manual subscription management +- Don't expose channels-as-observables literally + +**Don't force TypeScript patterns into Go:** +- No "auto-starting" connections that happen in constructors +- Don't hide goroutines from users +- Don't try to make channels look like observables + +**Environment-specific pitfalls:** +- Don't assume browser capabilities in Node implementation +- Don't assume Node.js process control in browser implementation +- Don't leak environment-specific types across the API boundary +- Don't ignore CORS implications in browser documentation + +**Respect memory models:** +- Go: channel sends are synchronization points, document goroutine ownership +- TypeScript Browser: observables handle threading 
implicitly, respect event loop constraints +- TypeScript Node: observables handle threading implicitly, document process integration patterns + +**Handle cleanup appropriately:** +- Go: explicit Close() with context cancellation +- TypeScript Browser: automatic cleanup via unsubscription, respect page lifecycle +- TypeScript Node: automatic cleanup via unsubscription and finalize, respect process lifecycle + +The goal is that a Go developer looks at your Go library and thinks "this feels like Go," while TypeScript developers in both browser and Node environments think "this feels like reactive programming in my environment." They solve the same problems but use each platform's strengths rather than fighting against them. + +## Architecture Decision Records + +### ADR 1: Practical WebSocket Implementation + +**Context**: Need to define the implementation approach for the mid-level library. + +**Decision**: Implement `honeybee` as a practical, lean WebSocket transport with essential connection management, while deferring advanced features to higher levels. + +**Rationale**: +- Provides a usable implementation that doesn't require additional libraries for basic operation +- Creates clear expectations about what functionality is included +- Ensures stability by focusing on well-understood, essential features +- Makes the library approachable for most common use cases +- Enables straightforward testing and maintenance +- Establishes a foundation that can be enhanced by mana-ws components +- Creates clear boundaries between essential and advanced functionality + +**Consequences**: +- Must clearly define what constitutes "essential" vs "advanced" features +- Some consumers may need to add `mana-ws` for specialized requirements +- Implementation will be more complex than `roots-ws` but simpler than `mana-ws` +- Strikes a balance between functionality and simplicity +- Creates a natural upgrade path as application needs grow +- Interfaces designed to enable drop-in replacements from mana-ws +- Clear criteria established for what constitutes "essential" functionality + +### ADR 2: Event Loop Based Architecture + +**Context**: Need to determine how to handle asynchronous operations. + +**Decision**: Use event loops with channels (Go) or observables (TypeScript) for asynchronous message handling. + +**Rationale**: +- Leverages the natural concurrency models of each language +- Provides consistent patterns for handling async events +- Enables composition with existing application event loops +- Makes asynchronous operations explicit and manageable +- Creates clear patterns for resource management and cleanup + +**Consequences**: +- Requires careful channel/observable management to prevent leaks +- Creates more complex control flow than synchronous code +- Improves scalability for high-volume connections +- Requires education for developers unfamiliar with the pattern +- Enables consistent error handling patterns + +### ADR 3: Role-Specific Connection Patterns + +**Context**: Nostr protocol has asymmetric connection roles. + +**Decision**: Implement distinct initiator and responder patterns while sharing core connection code. 
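+
+To make the role split concrete, here is a brief sketch of how it might surface in Go. Every name in it (`Initiator`, `Responder`, `Dial`, `Accept`, `connectCore`, `acceptCore`) is an illustrative placeholder rather than the final honeybee API:
+
+```go
+// Illustrative only: role-specific wrappers over one shared connection core.
+package honeybee
+
+import "net/http"
+
+// Connection stands in for the shared connection type.
+type Connection struct{}
+
+func connectCore(url string) (*Connection, error) { return &Connection{}, nil }
+
+func acceptCore(w http.ResponseWriter, r *http.Request) (*Connection, error) {
+	return &Connection{}, nil
+}
+
+// Initiator opens outbound connections: it may send REQ/CLOSE and receive EVENT/EOSE.
+type Initiator struct{ conn *Connection }
+
+// Responder wraps accepted connections: it may send EVENT/EOSE/NOTICE.
+type Responder struct{ conn *Connection }
+
+// Dial is the only way to obtain an Initiator.
+func Dial(url string) (*Initiator, error) {
+	conn, err := connectCore(url) // shared connection code
+	if err != nil {
+		return nil, err
+	}
+	return &Initiator{conn: conn}, nil
+}
+
+// Accept is the only way to obtain a Responder.
+func Accept(w http.ResponseWriter, r *http.Request) (*Responder, error) {
+	conn, err := acceptCore(w, r) // shared connection code
+	if err != nil {
+		return nil, err
+	}
+	return &Responder{conn: conn}, nil
+}
+```
+
+Because each role is a distinct type, operations not permitted for a role become compile-time errors rather than runtime surprises.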
+ +**Rationale**: +- Creates clear API boundaries for each role +- Prevents incorrect usage of operations not permitted for a role +- Allows specialized optimizations for each role +- Makes role expectations explicit in code +- Follows the principle of least privilege + +**Consequences**: +- Increases API surface area with role-specific functions +- Improves clarity about permitted operations for each role +- May require some code duplication for shared functionality +- Makes incorrect usage harder by design +- Simplifies application code by providing role-appropriate functions + +### ADR 4: Simple State Management + +**Context**: Need to balance functionality with complexity. + +**Decision**: Implement simple, explicit state management without sophisticated tracking or analytics. + +**Rationale**: +- Keeps the library focused and understandable +- Provides essential functionality without complexity +- Makes state changes explicit and traceable +- Reduces potential for bugs in state transitions +- Aligns with the functional approach of the stack + +**Consequences**: +- Advanced state management must be implemented by consumers or `mana-ws` +- Functionality is more limited but more reliable +- State transitions are more predictable +- Less risk of unexpected behavior +- May require more manual management for complex applications + +### ADR 5: Basic Resilience Features + +**Context**: Need to determine what resilience features belong in this layer. + +**Decision**: Include basic reconnection and health monitoring with simple strategies. + +**Rationale**: +- Provides essential resilience without complex implementation +- Makes connections practical for real-world usage +- Handles common failure scenarios gracefully +- Avoids complex recovery strategies that belong in `mana-ws` +- Ensures basic reliability without overwhelming complexity + +**Consequences**: +- Limited to simpler reconnection strategies +- May not handle all edge cases optimally +- Provides "good enough" reliability for most applications +- Creates clear extension points for more sophisticated strategies +- Makes expected behavior clear and predictable + +### ADR 6: Dependency Minimization + +**Context**: Need to determine external dependency strategy. + +**Decision**: Limit dependencies to `roots-ws` and standard libraries only. + +**Rationale**: +- Reduces potential for dependency conflicts +- Simplifies maintenance and upgrades +- Creates predictable behavior across environments +- Improves security by limiting attack surface +- Makes the library more self-contained and portable + +**Consequences**: +- May require implementing functionality available in external libraries +- Increases code that must be maintained within the project +- Reduces unexpected behavior from external dependencies +- Improves long-term stability +- May limit some advanced features that would require external libraries + +### ADR 7: Simplicity as a First-Class Feature + +**Context**: Need to determine how to balance functionality with ease of understanding and maintenance. + +**Decision**: Treat simplicity as a first-class feature by deliberately deferring advanced functionality to separate repositories (mana-ws) and maintaining clear, focused APIs. 
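+
+As a hypothetical illustration of complexity as an opt-in choice (every name below is invented for this sketch, not a confirmed API), an application can start on honeybee's implementation and later swap in a mana-ws component behind the same minimal interface:
+
+```go
+// Hypothetical sketch: both layers satisfy one small contract, so the
+// upgrade is a one-line change at the composition root.
+package main
+
+// Sender is the minimal contract; the real interfaces would live in honeybee.
+type Sender interface {
+	Send(msg []byte) error
+}
+
+type basicQueue struct{}    // stands in for a honeybee write queue
+type advancedQueue struct{} // stands in for a mana-ws advanced write queue
+
+func (basicQueue) Send(msg []byte) error    { return nil }
+func (advancedQueue) Send(msg []byte) error { return nil }
+
+func main() {
+	var q Sender = basicQueue{} // simple by default
+	// Opt into advanced behavior only when the application needs it:
+	q = advancedQueue{}
+	_ = q.Send([]byte(`["REQ","sub-1",{}]`))
+}
+```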
+ +**Rationale**: +- Creates a more approachable entry point for new developers +- Reduces cognitive load for common use cases +- Improves maintainability by constraining scope +- Enhances testability through focused functionality +- Provides clearer documentation and learning path +- Makes complexity an opt-in choice rather than a requirement +- Allows the library to excel at its core purpose + +**Consequences**: +- Some advanced use cases will require additional libraries +- Clear criteria needed for what belongs in honeybee vs. mana-ws +- Requires discipline to maintain simplicity over time +- Documentation must clearly explain the layered approach +- May need to resist pressure to add advanced features directly + +### ADR 8: Interface-First Design for Extension + +**Context**: Need to ensure honeybee components can be extended or replaced by more sophisticated implementations. + +**Decision**: Design honeybee using explicit interfaces for all core components, with minimal surface area that future extensions can implement while adding capabilities. + +**Rationale**: +- Enables drop-in replacements from advanced libraries +- Creates clear extension points for specialized behavior +- Focuses implementation on essential behaviors +- Makes dependencies explicit through interfaces +- Provides stable API contracts for both users and extenders +- Supports future evolution without breaking changes +- Aligns with Go's interface-based composition patterns + +**Consequences**: +- Requires more initial design effort +- Interface definitions must be stable and forward-thinking +- Implementation details must remain behind interfaces +- May constrain some implementation choices +- Enables cleaner separation between layers +- Improves testability through interface-based mocks + +# `mana-ws` + +## Functional Architecture + +### 1. Advanced Write Queue + +**Purpose**: Implements sophisticated back-pressure management, prioritization, and batching. 
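+
+A provisional usage sketch, assuming the signatures outlined below; `socket` (an established `*websocket.Conn`), `closeMsg`, `eventMsg`, and the `log` import are assumed to exist in the caller's scope:
+
+```go
+// Provisional usage of the advanced write queue sketched in this section.
+queue := New(1024, 3) // max queue size, three priority levels
+
+// Control frames such as CLOSE go out at the highest priority even
+// when the queue is under back pressure.
+if err := SendWithPriority(queue, socket, closeMsg, 2); err != nil {
+	log.Printf("control send failed: %v", err)
+}
+
+// Bulk events use the lowest priority and may be delayed or dropped
+// while the high watermark is exceeded.
+if IsUnderPressure(queue) {
+	log.Println("back pressure active; deferring low-priority publishes")
+} else if err := SendWithPriority(queue, socket, eventMsg, 0); err != nil {
+	log.Printf("event send failed: %v", err)
+}
+```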
+ +**Dependencies**: `honeybee` WriteQueue, `roots-ws` Error types + +```go +// Go implementation +// Core types extend honeybee types +type AdvancedWriteQueue struct { + honeybee.WriteQueue + HighWatermark int + LowWatermark int + UnderPressure bool + PriorityLevels int // Number of priority levels (default: 3) + PriorityQueues [][]Message // Separate queues for different priorities + BatchSize int // Maximum messages to batch (default: 10) + BatchDelay time.Duration // Maximum delay before sending batch (default: 20ms) + Stats WriteQueueStats + bufferPool *sync.Pool // Pool of reusable buffers +} + +type WriteQueueStats struct { + MessagesQueued int64 + MessagesSent int64 + WriteErrors int64 + DroppedMessages int64 + DelayedMessages int64 + BatchesSent int64 + AverageBatchSize float64 + PressureEvents int64 + PressureDuration time.Duration +} + +// Message with priority information +type Message struct { + Content []byte + Priority int // Higher number = higher priority + Created time.Time +} + +// Create new advanced write queue with prioritization +func New(maxSize int, priorityLevels int) *AdvancedWriteQueue { + // Initialize queue with specified max size + // Set default watermarks (high=80%, low=50% of max size) + // Create priority queues array based on priorityLevels (default: 3) + // Initialize buffer pool for message batching + // Initialize statistics counters + // Set initial pressure state to false + // Return fully initialized queue +} + +// Initialize buffer pool for efficient memory usage +func initBufferPool() *sync.Pool { + // Create sync.Pool for byte buffers + // Set New function to create buffers with reasonable initial size + // Return initialized pool +} + +// Send message through queue with back pressure and priority +func SendWithPriority(queue *AdvancedWriteQueue, socket *websocket.Conn, msg []byte, priority int) error { + // Check if message exceeds max size + // Acquire mutex lock for thread safety + // Apply back pressure strategies if under pressure + // Add message to appropriate priority queue + // Process queue if batch ready or timer expired + // Update statistics + // Release mutex lock + // Return any errors encountered +} + +// Process queued messages in priority order with batching +func ProcessQueue(queue *AdvancedWriteQueue, socket *websocket.Conn) { + // Start from highest priority queue + // Get buffer from pool + // Batch multiple messages into single WebSocket frame if possible + // For each priority level: + // - Collect up to BatchSize messages or until queue empty + // - Add to buffer with appropriate framing + // Send batched messages as single WebSocket frame when possible + // Return buffer to pool after use + // Update statistics +} + +// Update queue statistics +func UpdateStats(queue *AdvancedWriteQueue, messageSize int, batchSize int, success bool) { + // Atomically update message counters + // Track bytes sent if successful + // Update batch statistics + // Increment error counter if failed + // Update queue size tracking + // Recalculate back pressure status + // Update exponential moving average of batch size +} + +// Check if queue is under back pressure +func IsUnderPressure(queue *AdvancedWriteQueue) bool { + // Return current back pressure state + // Thread-safe access to pressure flag +} + +// Update back pressure status +func UpdateBackPressure(queue *AdvancedWriteQueue) { + // Check current queue size against watermarks + // If under pressure and size drops below low watermark, release pressure + // If not under pressure and 
size exceeds high watermark, apply pressure + // Log pressure state changes + // Track pressure event duration + // Thread-safe update of pressure flag +} + +// Apply back pressure strategies +func ApplyBackPressure(queue *AdvancedWriteQueue, msg []byte, priority int) (shouldSend bool, delay time.Duration) { + // If not under pressure, allow message with no delay + // If under pressure, apply priority-based filtering: + // - Always allow highest priority messages (control messages, CLOSE, EOSE) + // - Apply progressive dropping based on priority levels + // - Lower priorities have higher drop probability when under pressure + // - Calculate appropriate delay for lower priority messages + // - Update statistics for dropped/delayed messages + // Return decision on whether to send and how long to delay +} + +// Create prepared message for efficient reuse +func PrepareMessage(queue *AdvancedWriteQueue, messageType int, data []byte) (*websocket.PreparedMessage, error) { + // Create WebSocket PreparedMessage (compressed if enabled) + // This allows the message to be prepared once but sent to multiple destinations + // Return prepared message or error +} + +// Send prepared message through queue +func SendPrepared(queue *AdvancedWriteQueue, socket *websocket.Conn, prepared *websocket.PreparedMessage, priority int) error { + // Similar to SendWithPriority but for prepared messages + // Avoids repeated compression/preparation costs for broadcast scenarios + // Update statistics + // Return any errors +} +``` + +### 2. Connection Pool + +**Purpose**: Manages multiple connections with advanced load distribution, health-based routing, and adaptive scaling. + +**Dependencies**: `honeybee` Connection Pool + +```go +// Core types extend honeybee types +type AdvancedPool struct { + honeybee.Pool + LoadBalancingStrategy Strategy + HealthThreshold float64 // Minimum health score for active connections + MaxConnectionsPerHost int // Connection limit per host + ResourceMonitor *ResourceMonitor + Stats PoolStatistics + connectionCache *lru.Cache // Cache recently used connections + mutex sync.RWMutex +} + +// Load balancing strategies +type Strategy int + +const ( + RoundRobin Strategy = iota + LeastConnections + ResponseTime + WeightedRandom + ConsistentHashing +) + +// Resource allocation monitor +type ResourceMonitor struct { + TotalMemoryLimit int64 + TotalConnectionLimit int + MemoryPerConnection int64 + CurrentMemoryUsage int64 + GoroutineLimit int + CurrentGoroutines int +} + +// Pool statistics +type PoolStatistics struct { + ActiveConnections int + TotalMessages int64 + MessagesPerSecond float64 + ResponseTimeAverage time.Duration + ResponseTimeP95 time.Duration + ConnectionDistribution map[string]int // Count by host + FailoverEvents int + ResourceUtilization float64 // 0.0-1.0 + lastCalculated time.Time +} + +// Create new advanced connection pool +func NewAdvancedPool(config Config) *AdvancedPool { + // Initialize pool with honeybee base pool + // Set up load balancing strategy + // Initialize connection cache for connection reuse + // Set up resource monitor + // Initialize connection statistics tracking + // Return initialized pool +} + +// Add connection with resource constraints +func (p *AdvancedPool) AddConnection(url string, config honeybee.ConnectionConfig) error { + // Check if adding connection would exceed resource limits + // Check if host already has maximum connections + // Add connection if resources available + // Update resource utilization + // Return error if resource constraints 
prevent addition +} + +// Get connection using load balancing strategy +func (p *AdvancedPool) GetConnection(key string) (*honeybee.Connection, error) { + // Use load balancing strategy to select connection + // For consistent hashing, use key to determine connection + // For least connections, find connection with fewest active requests + // For response time, prefer faster connections + // Return selected connection or error if none available +} + +// Monitor health across pool and rebalance as needed +func (p *AdvancedPool) MonitorHealth() { + // Periodically check health of all connections + // Remove unhealthy connections from active pool + // Rebalance load when connections change state + // Attempt recovery of unhealthy connections in background +} + +// Manage resource allocation dynamically +func (p *AdvancedPool) ManageResources() { + // Track memory and goroutine usage + // Adjust connection limits based on current system load + // Implement graceful degradation when approaching resource limits + // Scale back non-essential operations under resource pressure +} + +// Calculate and update pool statistics +func (p *AdvancedPool) UpdateStatistics() { + // Calculate messages per second rate + // Update response time metrics + // Track connection distribution + // Calculate resource utilization percentage + // Identify imbalances or potential problems +} + +// Efficient connection lifecycle management +func (p *AdvancedPool) ManageLifecycle() { + // Implement connection recycling for frequently used endpoints + // Use connection cache to reduce connection overhead + // Gracefully phase out connections that need replacement + // Pre-warm connections for known high-traffic endpoints +} +``` + +### 3. Advanced Subscription Repository + +**Purpose**: Implements sophisticated tracking and restoration of subscriptions with adaptive optimization. 
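+
+A provisional usage sketch against the methods outlined below; `filters` (raw filter payloads), `writeQueue` (a `*honeybee.WriteQueue`), and the `log`/`time` imports are assumed:
+
+```go
+// Provisional usage of the repository sketched in this section.
+repo := NewAdvancedRepository(500) // cap on tracked subscriptions
+
+// Register a subscription with a priority so restoration can order it.
+repo.AddWithPriority("sub-profile-feed", filters, 8)
+
+// Feed activity back in as events arrive so importance scores stay fresh.
+repo.RecordActivity("sub-profile-feed", 12, 40*time.Millisecond)
+
+// After a reconnect, replay the most valuable subscriptions first,
+// rate-limited (here, 10 REQs per second) to avoid flooding the new socket.
+if errs := repo.RestoreWithPriority(writeQueue, 10); len(errs) > 0 {
+	log.Printf("%d subscriptions failed to restore", len(errs))
+}
+```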
+ +**Dependencies**: `honeybee` Subscription Registry, `roots-ws` Subscription types + +```go +// Core types extend honeybee types +type AdvancedSubscriptionRepository struct { + honeybee.SubscriptionRegistry + CreationTimes map[string]time.Time // When subscriptions were created + UsageStats map[string]UsageStatistics + PriorityMap map[string]int // Subscription priorities + RestorationOrder []string // Ordered list for restoration + MaxSubscriptions int // Limit on active subscriptions + bufferPool *sync.Pool // For filter processing +} + +type UsageStatistics struct { + MessageCount int64 + LastActivity time.Time + ActivityFrequency float64 // Messages per minute + ResponseTimes []time.Duration // Recent response times + AvgResponseTime time.Duration + ImportanceScore float64 // Calculated from usage patterns +} + +// Create new advanced repository +func NewAdvancedRepository(maxSubs int) *AdvancedSubscriptionRepository { + // Initialize base subscription registry + // Set up subscription statistics tracking + // Initialize buffer pool for filter operations + // Set maximum subscription limit + // Initialize ordering for intelligent restoration + // Return initialized repository +} + +// Add subscription with priority +func (r *AdvancedSubscriptionRepository) AddWithPriority(id string, filters [][]byte, priority int) { + // Add to base registry + // Record creation time + // Set initial priority + // Initialize usage statistics + // Update restoration order based on priority + // Enforce subscription limits if reached +} + +// Update subscription statistics on activity +func (r *AdvancedSubscriptionRepository) RecordActivity(id string, msgCount int, responseTime time.Duration) { + // Update message count + // Record last activity timestamp + // Update activity frequency calculation + // Add to response time history + // Recalculate importance score + // Potentially adjust priority based on usage patterns +} + +// Intelligently restore subscriptions after reconnection +func (r *AdvancedSubscriptionRepository) RestoreWithPriority(queue *honeybee.WriteQueue, rateLimit int) []error { + // Sort subscriptions by priority and importance + // Restore highest priority/most important first + // Apply rate limiting to avoid flooding connection + // Track restoration outcomes + // Update statistics after restoration + // Return errors for failed restorations +} + +// Optimize subscription set +func (r *AdvancedSubscriptionRepository) OptimizeSubscriptions() []string { + // Identify inactive or low-value subscriptions + // Suggest subscriptions to combine or eliminate + // Return list of subscription IDs to consider removing +} + +// Manage subscription lifecycle +func (r *AdvancedSubscriptionRepository) ManageLifecycle() { + // Periodically review subscription activity + // Age out unused subscriptions + // Consolidate similar subscriptions when possible + // Balance subscription load across connections +} +``` + +### 4. Advanced Reconnection + +**Purpose**: Implements sophisticated reconnection with circuit breaker pattern and predictive strategies. 
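+
+A provisional sketch of the circuit-breaker flow using the functions outlined below; `conn` (an existing `*honeybee.Connection`) and the `time` import are assumed:
+
+```go
+// Provisional reconnect loop around the circuit breaker sketched below.
+state := NewAdvancedState()
+config := DefaultAdvancedConfig()
+
+for {
+	err := StartReconnectionWithCircuitBreaker(conn, state, config, "network")
+	if err == nil {
+		ResetCircuitBreaker(state) // success closes the circuit
+		break
+	}
+
+	// An open circuit means repeated failures: stop hammering the relay
+	// until the reset timeout elapses and the breaker goes half-open.
+	if CheckCircuitState(state, config) == CircuitOpen {
+		time.Sleep(config.CircuitBreakerConfig.ResetTimeout)
+	}
+}
+```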
+ +**Dependencies**: `honeybee` Reconnection, `roots-ws` Error types + +```go +// Core types extend honeybee types +type AdvancedReconnectionConfig struct { + honeybee.ReconnectionConfig + Jitter float64 // Random jitter factor (0.0-1.0) + CircuitBreakerConfig CircuitBreakerConfig + FailureCategories map[string]FailureStrategy // Different strategies for error types + ProbeTimeout time.Duration // Timeout for probe attempts in half-open state + AdaptiveStrategy bool // Whether to use adaptive backoff based on history +} + +type CircuitBreakerConfig struct { + Enabled bool + FailureThreshold int // Failures before opening circuit + ResetTimeout time.Duration // Time before attempting reset + HalfOpenMaxCalls int // Max calls in half-open state +} + +type CircuitState int + +const ( + CircuitClosed CircuitState = iota // Normal operation + CircuitOpen // Failing, no attempts allowed + CircuitHalfOpen // Testing if recovered +) + +type AdvancedReconnectionState struct { + honeybee.ReconnectionState + FailureCount int + CircuitState CircuitState + CircuitOpenUntil time.Time + FailureHistory []FailureRecord // Recent failures for pattern detection + SuccessfulConnects int // Consecutive successful connections + ReconnectionTimes []time.Duration // Historical reconnection times +} + +type FailureRecord struct { + Time time.Time + ErrorType string + Message string + Duration time.Duration // How long the attempt took +} + +type FailureStrategy struct { + MaxAttempts int + BaseDelay time.Duration + MaxDelay time.Duration + BackoffFactor float64 +} + +// Create default advanced reconnection config +func DefaultAdvancedConfig() AdvancedReconnectionConfig { + // Start with honeybee default config + baseConfig := honeybee.reconnection.DefaultConfig() + + return AdvancedReconnectionConfig{ + ReconnectionConfig: baseConfig, + Jitter: 0.2, + CircuitBreakerConfig: CircuitBreakerConfig{ + Enabled: true, + FailureThreshold: 10, + ResetTimeout: 5 * time.Minute, + HalfOpenMaxCalls: 3, + }, + FailureCategories: map[string]FailureStrategy{ + // Different strategies for different error types + "network": {MaxAttempts: 5, BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, BackoffFactor: 2.0}, + "dns": {MaxAttempts: 3, BaseDelay: 5 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0}, + "server": {MaxAttempts: 10, BaseDelay: 500 * time.Millisecond, MaxDelay: 10 * time.Second, BackoffFactor: 1.5}, + "auth": {MaxAttempts: 2, BaseDelay: 10 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0}, + }, + ProbeTimeout: 10 * time.Second, + AdaptiveStrategy: true, + } +} + +// Create new advanced reconnection state +func NewAdvancedState() *AdvancedReconnectionState { + // Initialize with base reconnection state + baseState := honeybee.reconnection.NewState() + + return &AdvancedReconnectionState{ + ReconnectionState: *baseState, + FailureCount: 0, + CircuitState: CircuitClosed, + CircuitOpenUntil: time.Time{}, + FailureHistory: make([]FailureRecord, 0, 10), + SuccessfulConnects: 0, + ReconnectionTimes: make([]time.Duration, 0, 10), + } +} + +// Calculate next backoff delay with jitter +func CalculateDelayWithJitter(state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) time.Duration { + // Get failure strategy for this error type + strategy, exists := config.FailureCategories[errorType] + if !exists { + strategy = config.FailureCategories["network"] // Default + } + + // Calculate base delay based on strategy + attempt := state.FailureCount + baseDelay := 
strategy.BaseDelay * time.Duration(math.Pow(float64(strategy.BackoffFactor), float64(attempt))) + + // Cap at max delay + if baseDelay > strategy.MaxDelay { + baseDelay = strategy.MaxDelay + } + + // Apply adaptive strategy if enabled + if config.AdaptiveStrategy { + // Analyze failure patterns to adjust delay + // If failures occur at regular intervals, avoid those times + // If time-of-day patterns exist, factor those in + } + + // Apply jitter: delay * (1 ± jitterFactor * random) + jitterRange := float64(baseDelay) * config.Jitter + jitterAmount := (rand.Float64() * 2 - 1) * jitterRange // -jitterRange to +jitterRange + + finalDelay := time.Duration(float64(baseDelay) + jitterAmount) + if finalDelay < 0 { + finalDelay = 0 + } + + return finalDelay +} + +// Check circuit breaker state +func CheckCircuitState(state *AdvancedReconnectionState, config AdvancedReconnectionConfig) CircuitState { + // If circuit is closed, return closed + if state.CircuitState == CircuitClosed { + return CircuitClosed + } + + // If circuit is open, check if reset timeout has passed + if state.CircuitState == CircuitOpen { + if time.Now().After(state.CircuitOpenUntil) { + // Transition to half-open + state.CircuitState = CircuitHalfOpen + return CircuitHalfOpen + } + return CircuitOpen + } + + // If half-open, allow limited testing + return CircuitHalfOpen +} + +// Start reconnection with circuit breaker +func StartReconnectionWithCircuitBreaker(conn *honeybee.Connection, state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) error { + // Check circuit breaker state + circuitState := CheckCircuitState(state, config) + + // If circuit is open, return circuit breaker error + if circuitState == CircuitOpen { + return errors.New("circuit breaker open") + } + + // For half-open state, only allow limited probe attempts + if circuitState == CircuitHalfOpen { + // Enforce timeout for probe attempts + ctx, cancel := context.WithTimeout(context.Background(), config.ProbeTimeout) + defer cancel() + + // Attempt connection with timeout + // On success, close circuit + // On failure, reopen circuit + } + + // Increment failure count for closed circuit + state.FailureCount++ + + // Record failure details for pattern analysis + state.FailureHistory = append(state.FailureHistory, FailureRecord{ + Time: time.Now(), + ErrorType: errorType, + Message: "Connection failed", + }) + + // Check if failure count exceeds circuit breaker threshold + if state.FailureCount >= config.CircuitBreakerConfig.FailureThreshold { + // Open circuit + state.CircuitState = CircuitOpen + state.CircuitOpenUntil = time.Now().Add(config.CircuitBreakerConfig.ResetTimeout) + return errors.New("circuit breaker opened after repeated failures") + } + + // Calculate delay with jitter + delay := CalculateDelayWithJitter(state, config, errorType) + + // Apply backoff delay + time.Sleep(delay) + + // Use honeybee reconnection logic for actual reconnection attempt + return nil +} + +// Reset circuit breaker after successful connection +func ResetCircuitBreaker(state *AdvancedReconnectionState) { + // Reset failure count to zero + state.FailureCount = 0 + + // Close circuit if open or half-open + state.CircuitState = CircuitClosed + + // Reset circuit open until time + state.CircuitOpenUntil = time.Time{} + + // Increment successful connects counter + state.SuccessfulConnects++ + + // Record success for pattern analysis +} + +// Analyze failure patterns for optimized reconnection +func AnalyzeFailurePatterns(state 
*AdvancedReconnectionState) map[string]interface{} {
+    // Analyze timing patterns in failures
+    // Detect time-of-day patterns
+    // Identify error type frequencies
+    // Calculate success/failure ratio
+    // Return insights that can be used to optimize reconnection strategy
+    return nil // Placeholder
+}
+```
+
+### 5. Advanced Connection Health
+
+**Purpose**: Implements sophisticated health monitoring with predictive failure detection.
+
+**Dependencies**: `honeybee` Health Monitor, `roots-ws` Error types
+
+```go
+// Core types extend honeybee types
+type AdvancedHealthConfig struct {
+    honeybee.HealthConfig
+    FailureThreshold     int           // Failures before connection marked unhealthy
+    DegradationThreshold int           // Threshold for degraded status
+    LatencyThreshold     time.Duration // Latency indicating performance issues
+    SamplingInterval     time.Duration // How often to collect metrics
+    TrendWindowSize      int           // Number of samples for trend analysis
+    PredictionEnabled    bool          // Whether to use predictive modeling
+}
+
+type HealthStatus int
+
+const (
+    StatusHealthy   HealthStatus = iota
+    StatusDegraded  // Working but showing signs of issues
+    StatusUnhealthy // Not working reliably
+    StatusFailing   // Consistently failing
+)
+
+type AdvancedHealthState struct {
+    honeybee.HealthMonitor
+    Status             HealthStatus
+    PingsSent          int
+    PongsReceived      int
+    MissedPongs        int
+    PongLatencies      []time.Duration          // Recent latency measurements
+    LatencyPercentiles map[string]time.Duration // p50, p90, p99
+    LatencyTrend       float64                  // Direction and magnitude of trend
+    ErrorRates         map[string]float64       // Error rates by category
+    PredictiveScore    float64                  // 0.0-1.0, likelihood of failure
+    HealthHistory      []HealthRecord           // Time series of health measurements
+    LastUpdated        time.Time
+}
+
+type HealthRecord struct {
+    Timestamp     time.Time
+    Status        HealthStatus
+    Latency       time.Duration
+    ErrorRate     float64
+    ResourceUsage float64 // CPU/memory used by connection
+}
+
+// Create default advanced health config
+func DefaultAdvancedConfig() AdvancedHealthConfig {
+    // Start with honeybee default config
+    baseConfig := honeybee.health.DefaultConfig()
+
+    return AdvancedHealthConfig{
+        HealthConfig:         baseConfig,
+        FailureThreshold:     3,
+        DegradationThreshold: 2,
+        LatencyThreshold:     500 * time.Millisecond,
+        SamplingInterval:     10 * time.Second,
+        TrendWindowSize:      10,
+        PredictionEnabled:    true,
+    }
+}
+
+// Create new advanced health state
+func NewAdvancedState() *AdvancedHealthState {
+    // Initialize with base health state
+    baseMonitor := honeybee.health.NewMonitor(nil, nil)
+
+    return &AdvancedHealthState{
+        HealthMonitor:      *baseMonitor,
+        Status:             StatusHealthy,
+        PingsSent:          0,
+        PongsReceived:      0,
+        MissedPongs:        0,
+        PongLatencies:      make([]time.Duration, 0, 10),
+        LatencyPercentiles: make(map[string]time.Duration),
+        LatencyTrend:       0.0,
+        ErrorRates:         make(map[string]float64),
+        PredictiveScore:    0.0,
+        HealthHistory:      make([]HealthRecord, 0, 100),
+        LastUpdated:        time.Now(),
+    }
+}
+
+// Start advanced health monitoring
+func StartAdvancedMonitoring(conn *websocket.Conn, state *AdvancedHealthState, config AdvancedHealthConfig) {
+    // Use honeybee health monitoring as base
+    honeybee.health.StartMonitoring(conn)
+
+    // Add handlers for tracking additional metrics
+    // Set up latency tracking
+    // Initialize health score calculation
+    // Start trend analysis
+    // Set up periodic health assessment
+}
+
+// Handle pong with latency tracking
+// (config is a required parameter here because the body recalculates the health score)
+func HandlePongWithLatency(state *AdvancedHealthState, config AdvancedHealthConfig, sentTime time.Time) *AdvancedHealthState {
+    // Calculate latency
between sent time and now + latency := time.Since(sentTime) + + // Update basic pong handling using honeybee + state.PongsReceived++ + + // Add latency to tracking array (limited size) + state.PongLatencies = append(state.PongLatencies, latency) + if len(state.PongLatencies) > 20 { + // Remove oldest entry + state.PongLatencies = state.PongLatencies[1:] + } + + // Update health history + state.HealthHistory = append(state.HealthHistory, HealthRecord{ + Timestamp: time.Now(), + Latency: latency, + Status: state.Status, + }) + + // Recalculate latency percentiles + calculateLatencyPercentiles(state) + + // Update latency trend + calculateLatencyTrend(state) + + // Update health score based on latency + calculateHealthScore(state, config) + + // Reset missed pongs counter + state.MissedPongs = 0 + + return state +} + +// Handle missed pong with health impact +func HandleMissedPong(state *AdvancedHealthState, config AdvancedHealthConfig) *AdvancedHealthState { + // Increment missed pongs counter + state.MissedPongs++ + + // Update health score based on missed pong + calculateHealthScore(state, config) + + // Update health history + state.HealthHistory = append(state.HealthHistory, HealthRecord{ + Timestamp: time.Now(), + Status: state.Status, + ErrorRate: float64(state.MissedPongs) / float64(state.PingsSent), + }) + + // Check against degradation threshold + if state.MissedPongs >= config.DegradationThreshold && state.Status == StatusHealthy { + state.Status = StatusDegraded + } + + // Check against failure threshold + if state.MissedPongs >= config.FailureThreshold { + state.Status = StatusUnhealthy + } + + return state +} + +// Calculate latency percentiles +func calculateLatencyPercentiles(state *AdvancedHealthState) { + // Need at least a few measurements + if len(state.PongLatencies) < 5 { + return + } + + // Sort latencies for percentile calculation + sortedLatencies := make([]time.Duration, len(state.PongLatencies)) + copy(sortedLatencies, state.PongLatencies) + sort.Slice(sortedLatencies, func(i, j int) bool { + return sortedLatencies[i] < sortedLatencies[j] + }) + + // Calculate p50 (median) + p50Index := len(sortedLatencies) / 2 + state.LatencyPercentiles["p50"] = sortedLatencies[p50Index] + + // Calculate p90 + p90Index := int(float64(len(sortedLatencies)) * 0.9) + if p90Index >= len(sortedLatencies) { + p90Index = len(sortedLatencies) - 1 + } + state.LatencyPercentiles["p90"] = sortedLatencies[p90Index] + + // Calculate p99 + p99Index := int(float64(len(sortedLatencies)) * 0.99) + if p99Index >= len(sortedLatencies) { + p99Index = len(sortedLatencies) - 1 + } + state.LatencyPercentiles["p99"] = sortedLatencies[p99Index] +} + +// Calculate latency trend +func calculateLatencyTrend(state *AdvancedHealthState) { + // Need enough history for trend + if len(state.PongLatencies) < 5 { + state.LatencyTrend = 0 + return + } + + // Simple linear regression on recent latencies + n := float64(len(state.PongLatencies)) + sumX := 0.0 + sumY := 0.0 + sumXY := 0.0 + sumXX := 0.0 + + for i, latency := range state.PongLatencies { + x := float64(i) + y := float64(latency.Milliseconds()) + + sumX += x + sumY += y + sumXY += x * y + sumXX += x * x + } + + // Calculate slope of trendline + slope := (n*sumXY - sumX*sumY) / (n*sumXX - sumX*sumX) + + // Normalize to -1.0 to 1.0 range for consistent reporting + // Positive means increasing latency (degradation) + // Negative means decreasing latency (improvement) + avgLatency := sumY / n + if avgLatency != 0 { + state.LatencyTrend = slope / avgLatency * 10 
// Scale factor + } else { + state.LatencyTrend = 0 + } + + // Clamp to range + if state.LatencyTrend > 1.0 { + state.LatencyTrend = 1.0 + } else if state.LatencyTrend < -1.0 { + state.LatencyTrend = -1.0 + } +} + +// Calculate health score based on all metrics +func calculateHealthScore(state *AdvancedHealthState, config AdvancedHealthConfig) { + // Start with perfect score (1.0) + score := 1.0 + + // Reduce score based on missed pongs + if state.PingsSent > 0 { + missRate := float64(state.MissedPongs) / float64(state.PingsSent) + score -= missRate * 0.5 // Up to 50% reduction for all pongs missed + } + + // Reduce score based on latency relative to threshold + if len(state.PongLatencies) > 0 && config.LatencyThreshold > 0 { + avgLatency := average(state.PongLatencies) + if avgLatency > config.LatencyThreshold { + // Reduce score proportionally to how much threshold is exceeded + latencyFactor := float64(avgLatency) / float64(config.LatencyThreshold) + score -= math.Min(0.3, (latencyFactor-1.0)*0.1) // Up to 30% reduction + } + } + + // Reduce score based on negative trend + if state.LatencyTrend > 0 { + score -= state.LatencyTrend * 0.2 // Up to 20% reduction for worsening trend + } + + // Calculate predictive score based on patterns + if config.PredictionEnabled { + state.PredictiveScore = predictFailure(state) + // Incorporate predictive score into health score + score -= state.PredictiveScore * 0.2 // Up to 20% reduction for likely failure + } + + // Update status based on score + if score < 0.5 { + state.Status = StatusUnhealthy + } else if score < 0.8 { + state.Status = StatusDegraded + } else { + state.Status = StatusHealthy + } +} + +// Predict failure likelihood based on historical patterns +func predictFailure(state *AdvancedHealthState) float64 { + // Need enough history for prediction + if len(state.HealthHistory) < 10 { + return 0.0 + } + + // Detect patterns that historically preceded failures + // Look for increasing latency followed by errors + // Identify cyclic patterns in performance + // Check for correlation between resource usage and failures + + // This would use more sophisticated algorithms in production + // For now, just base on trend and recent errors + + // Simplified predictor: combine trend with recent error rate + recentErrorRate := 0.0 + if state.PingsSent > 0 { + recentErrorRate = float64(state.MissedPongs) / float64(state.PingsSent) + } + + // Weight trend more heavily if it's negative (worsening) + trendContribution := math.Max(0, state.LatencyTrend) * 0.6 + + return trendContribution + recentErrorRate*0.4 // Combined score +} + +// Calculate average of duration slice +func average(durations []time.Duration) time.Duration { + if len(durations) == 0 { + return 0 + } + + var sum time.Duration + for _, d := range durations { + sum += d + } + + return sum / time.Duration(len(durations)) +} + +// Check if connection is degraded but still working +func IsDegraded(state *AdvancedHealthState) bool { + return state.Status == StatusDegraded +} + +// Check connection health against threshold +func IsHealthyWithThreshold(state *AdvancedHealthState, threshold float64) bool { + // Calculate current health score + score := 1.0 - (float64(state.MissedPongs) / float64(max(1, state.PingsSent))) + + // Account for latency + if len(state.PongLatencies) > 0 { + avgLatency := average(state.PongLatencies) + if avgLatency > 300*time.Millisecond { + // Reduce score for high latency + score -= 0.2 + } + } + + return score >= threshold +} +``` + +### 6. 
In-Flight Operations + +**Purpose**: Tracks and manages ongoing operations with cancellation support. + +**Dependencies**: `roots-ws` Error types + +```go +// Core types +type OperationType string + +const ( + OperationSubscription OperationType = "subscription" + OperationPublication OperationType = "publication" + OperationConnection OperationType = "connection" + OperationQuery OperationType = "query" +) + +type OperationStatus int + +const ( + StatusPending OperationStatus = iota + StatusInProgress + StatusCompleted + StatusFailed + StatusCancelled + StatusTimedOut +) + +type Operation struct { + ID string + Type OperationType + StartTime time.Time + EndTime time.Time + Status OperationStatus + Cancelled bool + Done chan struct{} + Result interface{} + Error error + Priority int // Priority level for resource allocation + Metadata map[string]interface{} + mutex sync.Mutex +} + +type OperationRegistry struct { + Operations map[string]*Operation + Mutex sync.RWMutex + Stats OperationStats +} + +type OperationStats struct { + TotalOperations int64 + CompletedOperations int64 + FailedOperations int64 + CancelledOperations int64 + TimedOutOperations int64 + AvgCompletionTime time.Duration + OperationsByType map[OperationType]int64 +} + +// Create new operation registry +func NewRegistry() *OperationRegistry { + return &OperationRegistry{ + Operations: make(map[string]*Operation), + Mutex: sync.RWMutex{}, + Stats: OperationStats{ + OperationsByType: make(map[OperationType]int64), + }, + } +} + +// Generate unique operation ID +func generateOperationID() string { + // Generate UUID or other unique identifier + return fmt.Sprintf("op-%d-%s", time.Now().UnixNano(), generateRandomString(6)) +} + +// Create and register new operation +func RegisterOperation(registry *OperationRegistry, opType OperationType, priority int) *Operation { + // Generate unique operation ID + opID := generateOperationID() + + // Create operation object + op := &Operation{ + ID: opID, + Type: opType, + StartTime: time.Now(), + Status: StatusPending, + Done: make(chan struct{}), + Priority: priority, + Metadata: make(map[string]interface{}), + } + + // Acquire write lock + registry.Mutex.Lock() + + // Add operation to registry + registry.Operations[opID] = op + + // Update stats + registry.Stats.TotalOperations++ + registry.Stats.OperationsByType[opType]++ + + // Release lock + registry.Mutex.Unlock() + + return op +} + +// Mark operation as in progress +func StartOperation(registry *OperationRegistry, opID string) error { + // Acquire read lock to get operation + registry.Mutex.RLock() + op, exists := registry.Operations[opID] + registry.Mutex.RUnlock() + + if !exists { + return errors.New("operation not found") + } + + // Update operation status + op.mutex.Lock() + defer op.mutex.Unlock() + + if op.Status != StatusPending { + return errors.New("operation not in pending state") + } + + op.Status = StatusInProgress + return nil +} + +// Complete operation +func CompleteOperation(registry *OperationRegistry, opID string, result interface{}, err error) error { + // Acquire read lock to get operation + registry.Mutex.RLock() + op, exists := registry.Operations[opID] + registry.Mutex.RUnlock() + + if !exists { + return errors.New("operation not found") + } + + // Update operation + op.mutex.Lock() + + // Set result and error + op.Result = result + op.Error = err + op.EndTime = time.Now() + + // Update status based on error + if err != nil { + op.Status = StatusFailed + } else { + op.Status = StatusCompleted + } + + // Signal 
completion via done channel + close(op.Done) + + op.mutex.Unlock() + + // Update registry stats with write lock + registry.Mutex.Lock() + defer registry.Mutex.Unlock() + + // Update stats based on outcome + if err != nil { + registry.Stats.FailedOperations++ + } else { + registry.Stats.CompletedOperations++ + + // Update average completion time + completionTime := op.EndTime.Sub(op.StartTime) + + if registry.Stats.CompletedOperations > 1 { + // Calculate new average + oldWeight := float64(registry.Stats.CompletedOperations-1) / float64(registry.Stats.CompletedOperations) + newWeight := 1.0 / float64(registry.Stats.CompletedOperations) + + oldAvg := float64(registry.Stats.AvgCompletionTime.Nanoseconds()) + newTime := float64(completionTime.Nanoseconds()) + + newAvg := oldAvg*oldWeight + newTime*newWeight + registry.Stats.AvgCompletionTime = time.Duration(int64(newAvg)) + } else { + // First completion, just set the value + registry.Stats.AvgCompletionTime = completionTime + } + } + + return nil +} + +// Cancel operation +func CancelOperation(registry *OperationRegistry, opID string) error { + // Acquire read lock to get operation + registry.Mutex.RLock() + op, exists := registry.Operations[opID] + registry.Mutex.RUnlock() + + if !exists { + return errors.New("operation not found") + } + + // Update operation + op.mutex.Lock() + + // Check if already completed or cancelled + if op.Status == StatusCompleted || op.Status == StatusFailed || op.Status == StatusCancelled { + op.mutex.Unlock() + return errors.New("operation already completed or cancelled") + } + + // Mark as cancelled + op.Cancelled = true + op.Status = StatusCancelled + op.EndTime = time.Now() + op.Error = errors.New("operation cancelled") + + // Signal cancellation via done channel + close(op.Done) + + op.mutex.Unlock() + + // Update registry stats + registry.Mutex.Lock() + registry.Stats.CancelledOperations++ + registry.Mutex.Unlock() + + return nil +} + +// Wait for operation completion with timeout +func WaitForCompletion(op *Operation, timeout time.Duration) (interface{}, error) { + // Set up timer for timeout + timer := time.NewTimer(timeout) + defer timer.Stop() + + // Wait for completion or timeout + select { + case <-op.Done: + // Operation completed or cancelled + op.mutex.Lock() + defer op.mutex.Unlock() + + if op.Cancelled { + return nil, errors.New("operation cancelled") + } + + return op.Result, op.Error + + case <-timer.C: + // Timeout occurred + op.mutex.Lock() + if op.Status == StatusPending || op.Status == StatusInProgress { + op.Status = StatusTimedOut + } + op.mutex.Unlock() + + return nil, errors.New("operation timeout") + } +} + +// Cancel all operations of specific type +func CancelOperationsByType(registry *OperationRegistry, opType OperationType) int { + // Acquire read lock to get operations + registry.Mutex.RLock() + + // Find all operations of specified type + opsToCancel := make([]string, 0) + for id, op := range registry.Operations { + if op.Type == opType { + opsToCancel = append(opsToCancel, id) + } + } + + registry.Mutex.RUnlock() + + // Cancel each operation + cancelCount := 0 + for _, id := range opsToCancel { + err := CancelOperation(registry, id) + if err == nil { + cancelCount++ + } + } + + return cancelCount +} + +// Cancel all operations +func CancelAllOperations(registry *OperationRegistry) int { + // Acquire read lock to get all operation IDs + registry.Mutex.RLock() + + opsToCancel := make([]string, 0, len(registry.Operations)) + for id := range registry.Operations { + opsToCancel = 
append(opsToCancel, id) + } + + registry.Mutex.RUnlock() + + // Cancel each operation + cancelCount := 0 + for _, id := range opsToCancel { + err := CancelOperation(registry, id) + if err == nil { + cancelCount++ + } + } + + return cancelCount +} + +// Get operation statistics +func GetOperationStats(registry *OperationRegistry) OperationStats { + registry.Mutex.RLock() + defer registry.Mutex.RUnlock() + + // Return copy of stats to prevent race conditions + statsCopy := registry.Stats + + // Copy map to prevent race conditions + statsCopy.OperationsByType = make(map[OperationType]int64) + for k, v := range registry.Stats.OperationsByType { + statsCopy.OperationsByType[k] = v + } + + return statsCopy +} + +// Clean up completed operations +func CleanupCompletedOperations(registry *OperationRegistry, maxAge time.Duration) int { + now := time.Now() + cutoffTime := now.Add(-maxAge) + + registry.Mutex.Lock() + defer registry.Mutex.Unlock() + + removeCount := 0 + for id, op := range registry.Operations { + if op.EndTime.Before(cutoffTime) && + (op.Status == StatusCompleted || op.Status == StatusFailed || + op.Status == StatusCancelled || op.Status == StatusTimedOut) { + + delete(registry.Operations, id) + removeCount++ + } + } + + return removeCount +} +``` + +### 7. Compression + +**Purpose**: Compress WebSocket messages to reduce bandwidth with memory optimization. + +**Dependencies**: `roots-ws` Error types + +```go +// Core types +type CompressionConfig struct { + Enabled bool + Level int // 0-9, higher means more compression + ContextTakeover bool // Whether to reuse compression context + Threshold int // Messages smaller than this won't be compressed + MemoryLimit int // Max memory to use for compression + CacheTimeout time.Duration // How long to cache prepared messages +} + +type CompressionStats struct { + BytesBeforeCompression int64 + BytesAfterCompression int64 + MessagesCached int64 + CacheHits int64 + CacheMisses int64 + CompressionTime time.Duration + CompressionRatio float64 // Calculated ratio +} + +type MessageCache struct { + items map[uint64]*CachedMessage + mutex sync.RWMutex + stats CompressionStats + bufferPool *sync.Pool +} + +type CachedMessage struct { + Prepared *websocket.PreparedMessage + Hash uint64 + Size int + LastAccess time.Time + Expiration time.Time +} + +// Create default compression config +func DefaultConfig() CompressionConfig { + return CompressionConfig{ + Enabled: true, + Level: 6, // Medium compression level + ContextTakeover: false, // Disable context takeover to reduce memory + Threshold: 256, // Don't compress small messages + MemoryLimit: 64 * 1024 * 1024, // 64MB memory limit + CacheTimeout: 5 * time.Second, // Cache for 5 seconds + } +} + +// Create new message cache for compression efficiency +func NewMessageCache(config CompressionConfig) *MessageCache { + cache := &MessageCache{ + items: make(map[uint64]*CachedMessage), + bufferPool: &sync.Pool{ + New: func() interface{} { + // Create buffer with reasonable initial size + return bytes.NewBuffer(make([]byte, 0, 4096)) + }, + }, + } + + // Start cleanup goroutine + go cache.cleanup(config.CacheTimeout) + + return cache +} + +// Generate hash for message content +func hashMessage(data []byte) uint64 { + h := fnv.New64a() + h.Write(data) + return h.Sum64() +} + +// Set up compression for WebSocket +func ConfigureConnection(conn *websocket.Conn, config CompressionConfig) error { + // Skip if compression not enabled + if !config.Enabled { + return nil + } + + // Configure WebSocket compression 
parameters + parameters := websocket.CompressionOptions{ + Level: config.Level, + CompressionLevel: config.Level, // Same value used for both options + ClientNoContextTakeover: !config.ContextTakeover, + ServerNoContextTakeover: !config.ContextTakeover, + } + + // Apply compression configuration + return conn.EnableWriteCompression(parameters) +} + +// Get or create prepared message with memory optimizations +func GetOrCreatePrepared(cache *MessageCache, messageType int, data []byte, config CompressionConfig) (*websocket.PreparedMessage, error) { + // Don't compress small messages + if len(data) < config.Threshold || !config.Enabled { + // Still create PreparedMessage for consistent API + return websocket.NewPreparedMessage(messageType, data) + } + + // Generate hash of message content + hash := hashMessage(data) + + // Check cache first + cache.mutex.RLock() + if cachedMsg, exists := cache.items[hash]; exists { + // Update last access time + cachedMsg.LastAccess = time.Now() + cache.mutex.RUnlock() + + // Update stats + atomic.AddInt64(&cache.stats.CacheHits, 1) + + return cachedMsg.Prepared, nil + } + cache.mutex.RUnlock() + + // Cache miss, need to compress + atomic.AddInt64(&cache.stats.CacheMisses, 1) + + // Track compression time + startTime := time.Now() + + // Get buffer from pool + buf := cache.bufferPool.Get().(*bytes.Buffer) + buf.Reset() + defer cache.bufferPool.Put(buf) + + // Create flate writer with memory limit awareness + flateWriter, err := flate.NewWriter(buf, config.Level) + if err != nil { + return nil, err + } + + // Write data to compressor + bytesWritten, err := flateWriter.Write(data) + if err != nil { + return nil, err + } + + // Close flate writer to flush data + err = flateWriter.Close() + if err != nil { + return nil, err + } + + // Track compression stats + atomic.AddInt64(&cache.stats.BytesBeforeCompression, int64(len(data))) + atomic.AddInt64(&cache.stats.BytesAfterCompression, int64(buf.Len())) + + // Check if compression actually helped + var msgData []byte + if buf.Len() < len(data) { + // Use compressed data + msgData = make([]byte, buf.Len()) + copy(msgData, buf.Bytes()) + } else { + // Compression didn't help, use original + msgData = data + } + + // Create prepared message + prepared, err := websocket.NewPreparedMessage(messageType, msgData) + if err != nil { + return nil, err + } + + // Record compression time + compressionTime := time.Since(startTime) + atomic.AddInt64((*int64)(&cache.stats.CompressionTime), int64(compressionTime)) + + // Update cache + expirationTime := time.Now().Add(config.CacheTimeout) + cachedMsg := &CachedMessage{ + Prepared: prepared, + Hash: hash, + Size: len(data), + LastAccess: time.Now(), + Expiration: expirationTime, + } + + // Add to cache + cache.mutex.Lock() + cache.items[hash] = cachedMsg + cache.mutex.Unlock() + + // Track cache stats + atomic.AddInt64(&cache.stats.MessagesCached, 1) + + // Schedule automatic cleanup + time.AfterFunc(config.CacheTimeout, func() { + cache.mutex.Lock() + defer cache.mutex.Unlock() + + // Only remove if still the same item (hash collision possible) + if existing, exists := cache.items[hash]; exists && existing == cachedMsg { + delete(cache.items, hash) + } + }) + + // Update compression ratio + totalBefore := atomic.LoadInt64(&cache.stats.BytesBeforeCompression) + totalAfter := atomic.LoadInt64(&cache.stats.BytesAfterCompression) + if totalBefore > 0 { + ratio := float64(totalAfter) / float64(totalBefore) + atomic.StoreUint64((*uint64)(&cache.stats.CompressionRatio), 
math.Float64bits(ratio)) + } + + return prepared, nil +} + +// Cleanup expired cache entries +func (cache *MessageCache) cleanup(interval time.Duration) { + ticker := time.NewTicker(interval / 2) + defer ticker.Stop() + + for range ticker.C { + now := time.Now() + + cache.mutex.Lock() + for hash, msg := range cache.items { + if now.After(msg.Expiration) { + delete(cache.items, hash) + } + } + cache.mutex.Unlock() + } +} + +// Get compression statistics +func (cache *MessageCache) GetStats() CompressionStats { + return cache.stats +} + +// Reset cache +func (cache *MessageCache) Reset() { + cache.mutex.Lock() + defer cache.mutex.Unlock() + + // Clear all items + cache.items = make(map[uint64]*CachedMessage) + + // Reset stats + cache.stats = CompressionStats{} +} +``` + +### 8. Rate Limiting + +**Purpose**: Prevent flooding with token bucket algorithm and adaptive limits. + +**Dependencies**: `roots-ws` Error types + +```go +// Core types +type RateLimitConfig struct { + Enabled bool + MessagesPerSecond float64 // Default rate limit + BurstSize int // Maximum burst allowed + AdaptiveMode bool // Whether to adjust limits dynamically + MinRate float64 // Minimum rate allowed + MaxRate float64 // Maximum rate allowed + CategoryLimits map[string]CategoryLimit // Different limits for message types +} + +type CategoryLimit struct { + MessagesPerSecond float64 + BurstSize int + Priority int // Priority level (higher = more important) +} + +type RateLimiter struct { + config RateLimitConfig + defaultBucket *TokenBucket + categoryBuckets map[string]*TokenBucket + lastAdaptation time.Time + stats RateLimitStats + mutex sync.RWMutex +} + +type TokenBucket struct { + Rate float64 // Tokens per second + MaxTokens int // Maximum capacity + AvailableTokens float64 // Current token count + LastRefill time.Time // Last time tokens were added + mutex sync.Mutex +} + +type RateLimitStats struct { + AllowedMessages int64 + DelayedMessages int64 + RejectedMessages int64 + CurrentRate float64 + AdaptationEvents int64 + ThrottleEvents map[string]int64 +} + +// Create default rate limit config +func DefaultConfig() RateLimitConfig { + return RateLimitConfig{ + Enabled: true, + MessagesPerSecond: 10, + BurstSize: 30, + AdaptiveMode: true, + MinRate: 1.0, + MaxRate: 100.0, + CategoryLimits: map[string]CategoryLimit{ + "control": { + MessagesPerSecond: 50, + BurstSize: 100, + Priority: 10, + }, + "event": { + MessagesPerSecond: 5, + BurstSize: 20, + Priority: 5, + }, + "subscription": { + MessagesPerSecond: 2, + BurstSize: 10, + Priority: 3, + }, + }, + } +} + +// Create new rate limiter +func NewLimiter(config RateLimitConfig) *RateLimiter { + // Create default token bucket + defaultBucket := &TokenBucket{ + Rate: config.MessagesPerSecond, + MaxTokens: config.BurstSize, + AvailableTokens: float64(config.BurstSize), // Start with full bucket + LastRefill: time.Now(), + } + + // Create category buckets + categoryBuckets := make(map[string]*TokenBucket) + for category, limit := range config.CategoryLimits { + categoryBuckets[category] = &TokenBucket{ + Rate: limit.MessagesPerSecond, + MaxTokens: limit.BurstSize, + AvailableTokens: float64(limit.BurstSize), // Start with full bucket + LastRefill: time.Now(), + } + } + + return &RateLimiter{ + config: config, + defaultBucket: defaultBucket, + categoryBuckets: categoryBuckets, + lastAdaptation: time.Now(), + stats: RateLimitStats{ + ThrottleEvents: make(map[string]int64), + }, + } +} + +// Check if operation is allowed by rate limit +func (limiter *RateLimiter) 
Allow(category string) bool { + // Return true if rate limiting disabled + if !limiter.config.Enabled { + return true + } + + // Get appropriate bucket + bucket := limiter.getBucket(category) + + // Refill tokens and check availability + bucket.mutex.Lock() + defer bucket.mutex.Unlock() + + // Refill tokens based on time elapsed + limiter.refillTokens(bucket) + + // Check if bucket has at least one token + if bucket.AvailableTokens >= 1.0 { + // Consume token + bucket.AvailableTokens -= 1.0 + + // Update stats + atomic.AddInt64(&limiter.stats.AllowedMessages, 1) + + return true + } + + // No tokens available + atomic.AddInt64(&limiter.stats.RejectedMessages, 1) + + // Track category-specific throttling + limiter.mutex.Lock() + limiter.stats.ThrottleEvents[category]++ + limiter.mutex.Unlock() + + return false +} + +// Wait until operation is allowed +func (limiter *RateLimiter) Wait(ctx context.Context, category string) error { + // If rate limiting disabled, return immediately + if !limiter.config.Enabled { + return nil + } + + // Get appropriate bucket + bucket := limiter.getBucket(category) + + for { + // Check if context cancelled + if ctx.Err() != nil { + return ctx.Err() + } + + // Try to allow operation + bucket.mutex.Lock() + + // Refill tokens + limiter.refillTokens(bucket) + + if bucket.AvailableTokens >= 1.0 { + // Consume token + bucket.AvailableTokens -= 1.0 + + // Update stats + atomic.AddInt64(&limiter.stats.AllowedMessages, 1) + + bucket.mutex.Unlock() + return nil + } + + // Calculate time until next token available + timeToNextToken := time.Duration(1.0/bucket.Rate * float64(time.Second)) + + bucket.mutex.Unlock() + + // Update delayed stats + atomic.AddInt64(&limiter.stats.DelayedMessages, 1) + + // Wait for token to become available + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(timeToNextToken): + // Try again + } + } +} + +// Refill tokens based on elapsed time +func (limiter *RateLimiter) refillTokens(bucket *TokenBucket) { + now := time.Now() + elapsed := now.Sub(bucket.LastRefill).Seconds() + + // Calculate tokens to add + tokensToAdd := elapsed * bucket.Rate + + // Add tokens to bucket (up to max) + bucket.AvailableTokens = math.Min(float64(bucket.MaxTokens), bucket.AvailableTokens+tokensToAdd) + + // Update last refill time + bucket.LastRefill = now +} + +// Get appropriate token bucket for category +func (limiter *RateLimiter) getBucket(category string) *TokenBucket { + limiter.mutex.RLock() + defer limiter.mutex.RUnlock() + + if bucket, exists := limiter.categoryBuckets[category]; exists { + return bucket + } + + return limiter.defaultBucket +} + +// Adapt rate limits based on system conditions +func (limiter *RateLimiter) AdaptRateLimits(queueUtilization float64) { + // Skip if adaptive mode disabled + if !limiter.config.Enabled || !limiter.config.AdaptiveMode { + return + } + + // Minimum time between adaptations + if time.Since(limiter.lastAdaptation) < 5*time.Second { + return + } + + limiter.mutex.Lock() + defer limiter.mutex.Unlock() + + // Current rate + currentRate := limiter.defaultBucket.Rate + newRate := currentRate + + // Adjust based on queue utilization + if queueUtilization > 0.8 { + // Queue getting full, reduce rate + newRate = math.Max(limiter.config.MinRate, currentRate*0.8) + } else if queueUtilization < 0.2 { + // Queue mostly empty, increase rate + newRate = math.Min(limiter.config.MaxRate, currentRate*1.2) + } + + // Only update if significant change + if math.Abs(newRate-currentRate)/currentRate > 0.05 { + // 
Update default bucket rate
+		limiter.defaultBucket.mutex.Lock()
+		limiter.defaultBucket.Rate = newRate
+		limiter.defaultBucket.mutex.Unlock()
+
+		// Track adaptation. limiter.mutex is already held, so a plain
+		// assignment is safe here (the original atomic store through a
+		// *float64 to *uint64 cast does not compile without unsafe).
+		atomic.AddInt64(&limiter.stats.AdaptationEvents, 1)
+		limiter.stats.CurrentRate = newRate
+
+		// Update timestamp
+		limiter.lastAdaptation = time.Now()
+	}
+}
+
+// Get rate limit statistics
+func (limiter *RateLimiter) GetStats() RateLimitStats {
+	limiter.mutex.RLock()
+	defer limiter.mutex.RUnlock()
+
+	// Create copy of stats to prevent race conditions
+	statsCopy := limiter.stats
+
+	// Copy map to prevent race conditions
+	statsCopy.ThrottleEvents = make(map[string]int64)
+	for k, v := range limiter.stats.ThrottleEvents {
+		statsCopy.ThrottleEvents[k] = v
+	}
+
+	return statsCopy
+}
+```
+
+### 9. Message Replay
+
+**Purpose**: Buffers messages for replay after reconnection with intelligent prioritization.
+
+**Dependencies**: `roots-ws` Error types, `honeybee` Connection
+
+```go
+// Core types
+type ReplayConfig struct {
+	Enabled      bool
+	MaxMessages  int           // Maximum messages to buffer
+	MaxAge       time.Duration // Maximum message age to retain
+	MaxAttempts  int           // Maximum replay attempts per message
+	MaxBatchSize int           // Maximum messages to replay at once
+	BatchDelay   time.Duration // Delay between replay batches
+}
+
+type MessageBuffer struct {
+	Config   ReplayConfig
+	Messages []BufferedMessage
+	Mutex    sync.RWMutex
+	Stats    ReplayStats
+}
+
+type BufferedMessage struct {
+	ID          string
+	Content     []byte
+	Timestamp   time.Time
+	Attempts    int
+	Priority    int    // Higher priority = replay first
+	MessageType string // Used for selective replay
+}
+
+type ReplayStats struct {
+	MessagesBuffered int64
+	MessagesReplayed int64
+	FailedReplays    int64
+	DroppedMessages  int64
+	OldestMessage    time.Time
+}
+
+// Create default replay config
+func DefaultConfig() ReplayConfig {
+	return ReplayConfig{
+		Enabled:      true,
+		MaxMessages:  1000,
+		MaxAge:       5 * time.Minute,
+		MaxAttempts:  3,
+		MaxBatchSize: 50,
+		BatchDelay:   100 * time.Millisecond,
+	}
+}
+
+// Create new message buffer
+func NewBuffer(config ReplayConfig) *MessageBuffer {
+	return &MessageBuffer{
+		Config:   config,
+		Messages: make([]BufferedMessage, 0, config.MaxMessages/10), // Start smaller, grow as needed
+		Stats:    ReplayStats{},
+	}
+}
+
+// Add message to buffer with priority
+func BufferMessageWithPriority(buffer *MessageBuffer, msg []byte, msgType string, priority int) string {
+	// Skip if buffering disabled
+	if !buffer.Config.Enabled {
+		return ""
+	}
+
+	// Acquire write lock
+	buffer.Mutex.Lock()
+	defer buffer.Mutex.Unlock()
+
+	// Generate unique ID for message
+	id := generateMessageID()
+
+	// Create buffered message
+	bufferedMsg := BufferedMessage{
+		ID:          id,
+		Content:     cloneBytes(msg), // Clone bytes to avoid external modification
+		Timestamp:   time.Now(),
+		Attempts:    0,
+		Priority:    priority,
+		MessageType: msgType,
+	}
+
+	// Check if buffer is full
+	if len(buffer.Messages) >= buffer.Config.MaxMessages {
+		// Find lowest priority, oldest message to remove
+		oldestIdx := 0
+		lowestPriority := math.MaxInt32
+		oldestTime := time.Now()
+
+		for i, m := range buffer.Messages {
+			// Lower priority or same priority but older
+			if m.Priority < lowestPriority ||
+				(m.Priority == lowestPriority && m.Timestamp.Before(oldestTime)) {
+				lowestPriority = m.Priority
+				oldestTime = m.Timestamp
+				oldestIdx = i
+			}
+		}
+
+		// Remove lowest priority message
+		buffer.Messages = append(buffer.Messages[:oldestIdx],
buffer.Messages[oldestIdx+1:]...) + + // Update stats + buffer.Stats.DroppedMessages++ + } + + // Add new message to buffer + buffer.Messages = append(buffer.Messages, bufferedMsg) + + // Update stats + buffer.Stats.MessagesBuffered++ + if buffer.Stats.OldestMessage.IsZero() || bufferedMsg.Timestamp.Before(buffer.Stats.OldestMessage) { + buffer.Stats.OldestMessage = bufferedMsg.Timestamp + } + + return id +} + +// Clone byte slice to avoid external modification +func cloneBytes(data []byte) []byte { + clone := make([]byte, len(data)) + copy(clone, data) + return clone +} + +// Generate unique message ID +func generateMessageID() string { + return fmt.Sprintf("msg-%d-%s", time.Now().UnixNano(), generateRandomString(6)) +} + +// Mark message as sent +func MarkSent(buffer *MessageBuffer, id string) { + // Acquire write lock + buffer.Mutex.Lock() + defer buffer.Mutex.Unlock() + + // Find message with matching ID + for i, msg := range buffer.Messages { + if msg.ID == id { + // Remove message from buffer (successful send) + buffer.Messages = append(buffer.Messages[:i], buffer.Messages[i+1:]...) + break + } + } + + // Update stats if oldest message removed + if len(buffer.Messages) > 0 { + // Find new oldest + oldest := time.Now() + for _, msg := range buffer.Messages { + if msg.Timestamp.Before(oldest) { + oldest = msg.Timestamp + } + } + buffer.Stats.OldestMessage = oldest + } else { + buffer.Stats.OldestMessage = time.Time{} + } +} + +// Get messages to replay, ordered by priority +func GetPendingMessages(buffer *MessageBuffer, msgType string) []BufferedMessage { + // Acquire read lock + buffer.Mutex.RLock() + defer buffer.Mutex.RUnlock() + + now := time.Now() + maxAge := now.Add(-buffer.Config.MaxAge) + + // Filter messages by max attempts, age, and message type + pending := make([]BufferedMessage, 0, len(buffer.Messages)) + for _, msg := range buffer.Messages { + if msg.Attempts < buffer.Config.MaxAttempts && + msg.Timestamp.After(maxAge) && + (msgType == "" || msg.MessageType == msgType) { + pending = append(pending, msg) + } + } + + // Sort by priority (high to low) and then by age (oldest first) + sort.Slice(pending, func(i, j int) bool { + if pending[i].Priority != pending[j].Priority { + return pending[i].Priority > pending[j].Priority + } + return pending[i].Timestamp.Before(pending[j].Timestamp) + }) + + return pending +} + +// Record replay attempt +func RecordAttempt(buffer *MessageBuffer, id string) { + // Acquire write lock + buffer.Mutex.Lock() + defer buffer.Mutex.Unlock() + + // Find message with matching ID + for i := range buffer.Messages { + if buffer.Messages[i].ID == id { + // Increment attempt counter + buffer.Messages[i].Attempts++ + break + } + } +} + +// Clean expired messages +func CleanExpired(buffer *MessageBuffer) int { + // Acquire write lock + buffer.Mutex.Lock() + defer buffer.Mutex.Unlock() + + // Calculate expiration timestamp + now := time.Now() + maxAge := now.Add(-buffer.Config.MaxAge) + + // Filter out expired messages + newMessages := make([]BufferedMessage, 0, len(buffer.Messages)) + removed := 0 + + for _, msg := range buffer.Messages { + // Keep message if not expired and not over max attempts + if msg.Timestamp.After(maxAge) && msg.Attempts < buffer.Config.MaxAttempts { + newMessages = append(newMessages, msg) + } else { + removed++ + buffer.Stats.DroppedMessages++ + } + } + + // Update buffer + buffer.Messages = newMessages + + // Update oldest message timestamp + if len(newMessages) > 0 { + oldest := now + for _, msg := range newMessages { + if 
msg.Timestamp.Before(oldest) {
+				oldest = msg.Timestamp
+			}
+		}
+		buffer.Stats.OldestMessage = oldest
+	} else {
+		buffer.Stats.OldestMessage = time.Time{}
+	}
+
+	return removed
+}
+
+// Random alphanumeric suffix used by generateMessageID above (the
+// sketch references this helper, so it is defined here; math/rand
+// is assumed to be imported)
+func generateRandomString(n int) string {
+	const alphabet = "abcdefghijklmnopqrstuvwxyz0123456789"
+	b := make([]byte, n)
+	for i := range b {
+		b[i] = alphabet[rand.Intn(len(alphabet))]
+	}
+	return string(b)
+}
+
+// Replay pending messages to connection with batching and rate limiting
+func ReplayMessages(buffer *MessageBuffer, conn *honeybee.Connection, msgType string) []error {
+	// Skip if buffering disabled
+	if !buffer.Config.Enabled {
+		return nil
+	}
+
+	// Get pending messages
+	pending := GetPendingMessages(buffer, msgType)
+
+	// No messages to replay
+	if len(pending) == 0 {
+		return nil
+	}
+
+	// Collect errors (named errs so the slice does not shadow the
+	// standard errors package used below)
+	errs := make([]error, 0)
+
+	// Process in batches
+	for i := 0; i < len(pending); i += buffer.Config.MaxBatchSize {
+		// Calculate end of batch
+		end := i + buffer.Config.MaxBatchSize
+		if end > len(pending) {
+			end = len(pending)
+		}
+
+		// Process batch
+		batch := pending[i:end]
+		for _, msg := range batch {
+			// Send message to connection
+			select {
+			case conn.OutChan <- msg.Content:
+				// Record successful replay
+				atomic.AddInt64(&buffer.Stats.MessagesReplayed, 1)
+
+				// Mark as sent
+				MarkSent(buffer, msg.ID)
+
+			default:
+				// Channel full or closed
+				err := errors.New("replay failed: connection channel full or closed")
+				errs = append(errs, err)
+
+				// Record attempt
+				RecordAttempt(buffer, msg.ID)
+				atomic.AddInt64(&buffer.Stats.FailedReplays, 1)
+			}
+		}
+
+		// Delay between batches to prevent overwhelming the connection
+		if end < len(pending) {
+			time.Sleep(buffer.Config.BatchDelay)
+		}
+	}
+
+	return errs
+}
+
+// Get replay statistics
+func GetReplayStats(buffer *MessageBuffer) ReplayStats {
+	// Acquire read lock
+	buffer.Mutex.RLock()
+	defer buffer.Mutex.RUnlock()
+
+	// Return copy of stats
+	return buffer.Stats
+}
+```
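+
+A brief usage sketch (illustrative, not part of the specified API): the only assumptions are a live `*honeybee.Connection` from the layer above and the standard library `log` package.
+
+```go
+// Replay everything buffered while offline, then prune anything that
+// exceeded MaxAge or MaxAttempts. An empty msgType replays all types.
+func replayAfterReconnect(buffer *MessageBuffer, conn *honeybee.Connection) {
+	if errs := ReplayMessages(buffer, conn, ""); len(errs) > 0 {
+		log.Printf("replay finished with %d failures", len(errs))
+	}
+
+	if removed := CleanExpired(buffer); removed > 0 {
+		log.Printf("dropped %d expired messages", removed)
+	}
+}
+```
+
+### 10. Connection Statistics
+
+**Purpose**: Track connection usage metrics with predictive analytics.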
+ +**Dependencies**: `honeybee` Connection + +```go +// Core types +type ConnectionStats struct { + Connected time.Time + MessagesSent int64 + MessagesReceived int64 + BytesSent int64 + BytesReceived int64 + Errors int64 + AvgLatency time.Duration + LastMessageSent time.Time + LastMessageReceived time.Time + + // Advanced metrics + ErrorRates []float64 // Error rates over time periods (1m, 5m, 15m) + ThroughputRates []float64 // Messages per second over time periods + LatencyPercentiles []time.Duration // P50, P90, P99 latencies + ReconnectionCount int + HealthScoreHistory []float64 // Health scores over time + MessageTypeDistribution map[string]int64 // Count by message type + + // Quality metrics + QualityScore float64 // 0.0-1.0 overall quality rating + Stability float64 // Connection stability (0.0-1.0) + Performance float64 // Performance rating (0.0-1.0) + Reliability float64 // Reliability rating (0.0-1.0) + + // Resource utilization + MemoryUsage int64 // Bytes used by connection + GoroutineCount int // Goroutines associated with connection + + // Private tracking + lastCalculation time.Time + latencyHistory []time.Duration + mutex sync.RWMutex +} + +// Create new connection stats +func New() *ConnectionStats { + return &ConnectionStats{ + Connected: time.Now(), + ErrorRates: make([]float64, 3), // 1m, 5m, 15m rates + ThroughputRates: make([]float64, 3), // 1m, 5m, 15m rates + LatencyPercentiles: make([]time.Duration, 3), // P50, P90, P99 + HealthScoreHistory: make([]float64, 60), // Last 60 measurements + MessageTypeDistribution: make(map[string]int64), // Counts by message type + latencyHistory: make([]time.Duration, 0, 100), + lastCalculation: time.Now(), + QualityScore: 1.0, // Start with perfect score + Stability: 1.0, + Performance: 1.0, + Reliability: 1.0, + } +} + +// Record message sent with type tracking +func RecordMessageSent(stats *ConnectionStats, size int, messageType string) { + // Atomically increment messages sent counter + atomic.AddInt64(&stats.MessagesSent, 1) + + // Atomically add size to bytes sent counter + atomic.AddInt64(&stats.BytesSent, int64(size)) + + // Update last message sent timestamp + stats.mutex.Lock() + stats.LastMessageSent = time.Now() + + // Update message type distribution + stats.MessageTypeDistribution[messageType]++ + stats.mutex.Unlock() + + // Trigger rate calculation if needed + if time.Since(stats.lastCalculation) > 10*time.Second { + go stats.calculateRates() + } +} + +// Record message received with type tracking +func RecordMessageReceived(stats *ConnectionStats, size int, messageType string) { + // Atomically increment messages received counter + atomic.AddInt64(&stats.MessagesReceived, 1) + + // Atomically add size to bytes received counter + atomic.AddInt64(&stats.BytesReceived, int64(size)) + + // Update last message received timestamp + stats.mutex.Lock() + stats.LastMessageReceived = time.Now() + + // Update message type distribution + stats.MessageTypeDistribution[messageType]++ + stats.mutex.Unlock() + + // Trigger rate calculation if needed + if time.Since(stats.lastCalculation) > 10*time.Second { + go stats.calculateRates() + } +} + +// Record error with categorization +func RecordError(stats *ConnectionStats, errorType string) { + // Atomically increment error counter + atomic.AddInt64(&stats.Errors, 1) + + // Update error rate calculations + go stats.calculateRates() + + // Update quality metrics + go stats.updateQualityMetrics() +} + +// Record latency measurement +func RecordLatency(stats *ConnectionStats, latency 
time.Duration) {
+	stats.mutex.Lock()
+
+	// Add to latency history
+	stats.latencyHistory = append(stats.latencyHistory, latency)
+	if len(stats.latencyHistory) > 100 {
+		// Keep history bounded
+		stats.latencyHistory = stats.latencyHistory[1:]
+	}
+
+	// Calculate exponential moving average for average latency
+	if stats.AvgLatency == 0 {
+		stats.AvgLatency = latency
+	} else {
+		// 95% old value, 5% new value
+		oldWeight := 0.95
+		newWeight := 0.05
+
+		oldAvg := float64(stats.AvgLatency.Nanoseconds())
+		newLatency := float64(latency.Nanoseconds())
+
+		newAvg := oldAvg*oldWeight + newLatency*newWeight
+		stats.AvgLatency = time.Duration(int64(newAvg))
+	}
+
+	// Capture the length while the lock is held to avoid a racy read below
+	historyLen := len(stats.latencyHistory)
+
+	stats.mutex.Unlock()
+
+	// Recalculate percentiles if we have enough data
+	if historyLen >= 10 {
+		go stats.calculateLatencyPercentiles()
+	}
+
+	// Update quality metrics
+	go stats.updateQualityMetrics()
+}
+
+// Record reconnection event
+func RecordReconnection(stats *ConnectionStats) {
+	stats.mutex.Lock()
+
+	// Increment reconnection counter
+	stats.ReconnectionCount++
+
+	// Reset certain metrics after reconnection
+	stats.lastCalculation = time.Now()
+
+	stats.mutex.Unlock()
+
+	// Update quality metrics
+	go stats.updateQualityMetrics()
+}
+
+// Record health score
+func RecordHealthScore(stats *ConnectionStats, score float64) {
+	stats.mutex.Lock()
+
+	// Shift history and add new score (copy handles the overlap)
+	copy(stats.HealthScoreHistory[1:], stats.HealthScoreHistory)
+	stats.HealthScoreHistory[0] = score
+
+	stats.mutex.Unlock()
+
+	// Update quality metrics
+	go stats.updateQualityMetrics()
+}
+
+// Calculate throughput and error rates
+func (stats *ConnectionStats) calculateRates() {
+	stats.mutex.Lock()
+	defer stats.mutex.Unlock()
+
+	now := time.Now()
+	elapsed := now.Sub(stats.lastCalculation).Seconds()
+
+	if elapsed < 1.0 {
+		return // Avoid too frequent calculations
+	}
+
+	// Calculate total messages
+	totalMessages := stats.MessagesSent + stats.MessagesReceived
+
+	// Estimate the short-term rate from the lifetime average. The
+	// counters are cumulative, so dividing by uptime yields a correct
+	// (if smoothed) messages-per-second figure; a production version
+	// would snapshot the counters per window instead.
+	rate := 0.0
+	if uptime := now.Sub(stats.Connected).Seconds(); uptime > 0 {
+		rate = float64(totalMessages) / uptime
+	}
+	stats.ThroughputRates[0] = rate
+
+	// Error rate as the fraction of all messages that produced errors
+	if totalMessages > 0 {
+		errorRate := float64(stats.Errors) / float64(totalMessages)
+		stats.ErrorRates[0] = errorRate
+	}
+
+	// Update 5m and 15m rates with exponential decay
+	// 5m rate: 63% new, 37% old
+	stats.ThroughputRates[1] = 0.63*rate + 0.37*stats.ThroughputRates[1]
+	stats.ErrorRates[1] = 0.63*stats.ErrorRates[0] + 0.37*stats.ErrorRates[1]
+
+	// 15m rate: 28% new, 72% old
+	stats.ThroughputRates[2] = 0.28*rate + 0.72*stats.ThroughputRates[2]
+	stats.ErrorRates[2] = 0.28*stats.ErrorRates[0] + 0.72*stats.ErrorRates[2]
+
+	// Update last calculation time
+	stats.lastCalculation = now
+}
+
+// Calculate latency percentiles
+func (stats *ConnectionStats) calculateLatencyPercentiles() {
+	stats.mutex.Lock()
+	defer stats.mutex.Unlock()
+
+	// Need at least a few measurements
+	if len(stats.latencyHistory) < 5 {
+		return
+	}
+
+	// Create sorted copy of latency history
+	sortedLatencies := make([]time.Duration, len(stats.latencyHistory))
+	copy(sortedLatencies, stats.latencyHistory)
+	sort.Slice(sortedLatencies, func(i, j int) bool {
+		return sortedLatencies[i] < sortedLatencies[j]
+	})
+
+	// Calculate p50 (median)
+	p50Index := len(sortedLatencies) / 2
+	stats.LatencyPercentiles[0] = sortedLatencies[p50Index]
+
+	// Calculate p90
+	p90Index := int(float64(len(sortedLatencies)) * 0.9)
+	if p90Index >= len(sortedLatencies) {
+		p90Index = len(sortedLatencies) - 1
+	}
+	
stats.LatencyPercentiles[1] = sortedLatencies[p90Index] + + // Calculate p99 + p99Index := int(float64(len(sortedLatencies)) * 0.99) + if p99Index >= len(sortedLatencies) { + p99Index = len(sortedLatencies) - 1 + } + stats.LatencyPercentiles[2] = sortedLatencies[p99Index] +} + +// Update quality metrics +func (stats *ConnectionStats) updateQualityMetrics() { + stats.mutex.Lock() + defer stats.mutex.Unlock() + + // Calculate stability based on reconnections and error rates + uptime := time.Since(stats.Connected).Minutes() + reconnRate := 0.0 + if uptime > 0 { + reconnRate = float64(stats.ReconnectionCount) / uptime + } + + // More than 1 reconnection per minute is bad + reconnFactor := math.Min(1.0, reconnRate) + errorFactor := stats.ErrorRates[0] // 1m error rate + + stats.Stability = 1.0 - (0.6*reconnFactor + 0.4*errorFactor) + if stats.Stability < 0 { + stats.Stability = 0 + } + + // Calculate performance based on latency + var perfScore float64 = 1.0 + + if stats.AvgLatency > 0 { + // Latency score: 1.0 for <50ms, 0.0 for >500ms + avgLatencyMs := float64(stats.AvgLatency.Milliseconds()) + + if avgLatencyMs < 50 { + perfScore = 1.0 + } else if avgLatencyMs > 500 { + perfScore = 0.0 + } else { + // Linear scale between 50ms and 500ms + perfScore = 1.0 - (avgLatencyMs-50.0)/450.0 + } + } + + stats.Performance = perfScore + + // Calculate reliability based on health scores + var healthSum float64 + healthCount := 0 + + for _, score := range stats.HealthScoreHistory { + if score > 0 { + healthSum += score + healthCount++ + } + } + + if healthCount > 0 { + stats.Reliability = healthSum / float64(healthCount) + } else { + stats.Reliability = 1.0 + } + + // Calculate overall quality score + stats.QualityScore = 0.4*stats.Stability + 0.3*stats.Performance + 0.3*stats.Reliability + + // Clamp to 0.0-1.0 range + if stats.QualityScore < 0.0 { + stats.QualityScore = 0.0 + } else if stats.QualityScore > 1.0 { + stats.QualityScore = 1.0 + } +} + +// Get uptime +func GetUptime(stats *ConnectionStats) time.Duration { + return time.Since(stats.Connected) +} + +// Get messages per second +func GetMessagesPerSecond(stats *ConnectionStats) float64 { + stats.mutex.RLock() + defer stats.mutex.RUnlock() + + return stats.ThroughputRates[0] +} + +// Get health trend +func GetHealthTrend(stats *ConnectionStats) float64 { + stats.mutex.RLock() + defer stats.mutex.RUnlock() + + // Need enough history + if len(stats.HealthScoreHistory) < 5 { + return 0.0 + } + + // Calculate linear regression slope + n := 0 + sumX := 0.0 + sumY := 0.0 + sumXY := 0.0 + sumXX := 0.0 + + for i, score := range stats.HealthScoreHistory { + if score <= 0 { + continue + } + + x := float64(i) + y := score + + sumX += x + sumY += y + sumXY += x * y + sumXX += x * x + n++ + } + + if n < 3 { + return 0.0 + } + + // Calculate slope + nf := float64(n) + slope := (nf*sumXY - sumX*sumY) / (nf*sumXX - sumX*sumX) + + // Normalize to -1.0 to 1.0 range + // Negative means declining health + // Positive means improving health + avgHealth := sumY / nf + if avgHealth != 0 { + slope = slope / avgHealth * 10.0 // Scale factor + } + + // Clamp to range + if slope > 1.0 { + slope = 1.0 + } else if slope < -1.0 { + slope = -1.0 + } + + return slope +} + +// Reset stats +func Reset(stats *ConnectionStats) { + stats.mutex.Lock() + defer stats.mutex.Unlock() + + // Keep connection time and reconnection count + connectedTime := stats.Connected + reconnCount := stats.ReconnectionCount + + // Create new stats with same connection time + newStats := New() + 
newStats.Connected = connectedTime
+	newStats.ReconnectionCount = reconnCount
+
+	// Copy the fresh values over field by field. A whole-struct
+	// assignment (*stats = *newStats) would overwrite the locked
+	// mutex and panic on the deferred Unlock.
+	stats.Connected = newStats.Connected
+	stats.MessagesSent = newStats.MessagesSent
+	stats.MessagesReceived = newStats.MessagesReceived
+	stats.BytesSent = newStats.BytesSent
+	stats.BytesReceived = newStats.BytesReceived
+	stats.Errors = newStats.Errors
+	stats.AvgLatency = newStats.AvgLatency
+	stats.LastMessageSent = newStats.LastMessageSent
+	stats.LastMessageReceived = newStats.LastMessageReceived
+	stats.ErrorRates = newStats.ErrorRates
+	stats.ThroughputRates = newStats.ThroughputRates
+	stats.LatencyPercentiles = newStats.LatencyPercentiles
+	stats.ReconnectionCount = newStats.ReconnectionCount
+	stats.HealthScoreHistory = newStats.HealthScoreHistory
+	stats.MessageTypeDistribution = newStats.MessageTypeDistribution
+	stats.QualityScore = newStats.QualityScore
+	stats.Stability = newStats.Stability
+	stats.Performance = newStats.Performance
+	stats.Reliability = newStats.Reliability
+	stats.MemoryUsage = newStats.MemoryUsage
+	stats.GoroutineCount = newStats.GoroutineCount
+	stats.lastCalculation = newStats.lastCalculation
+	stats.latencyHistory = newStats.latencyHistory
+}
+
+// Generate statistics report
+func GenerateReport(stats *ConnectionStats) map[string]interface{} {
+	// Compute the trend before taking the read lock: GetHealthTrend
+	// acquires the same RWMutex, and a recursive RLock can deadlock
+	// if a writer is queued in between.
+	healthTrend := GetHealthTrend(stats)
+
+	stats.mutex.RLock()
+	defer stats.mutex.RUnlock()
+
+	// Calculate uptime
+	uptime := time.Since(stats.Connected)
+
+	// Create report
+	report := map[string]interface{}{
+		"uptime_seconds":     uptime.Seconds(),
+		"messages_sent":      stats.MessagesSent,
+		"messages_received":  stats.MessagesReceived,
+		"bytes_sent":         stats.BytesSent,
+		"bytes_received":     stats.BytesReceived,
+		"errors":             stats.Errors,
+		"reconnections":      stats.ReconnectionCount,
+		"average_latency_ms": stats.AvgLatency.Milliseconds(),
+
+		"throughput": map[string]float64{
+			"messages_per_second_1m":  stats.ThroughputRates[0],
+			"messages_per_second_5m":  stats.ThroughputRates[1],
+			"messages_per_second_15m": stats.ThroughputRates[2],
+		},
+
+		"error_rates": map[string]float64{
+			"error_rate_1m":  stats.ErrorRates[0],
+			"error_rate_5m":  stats.ErrorRates[1],
+			"error_rate_15m": stats.ErrorRates[2],
+		},
+
+		"latency": map[string]int64{
+			"p50_ms": stats.LatencyPercentiles[0].Milliseconds(),
+			"p90_ms": stats.LatencyPercentiles[1].Milliseconds(),
+			"p99_ms": stats.LatencyPercentiles[2].Milliseconds(),
+		},
+
+		"quality": map[string]float64{
+			"overall_score": stats.QualityScore,
+			"stability":     stats.Stability,
+			"performance":   stats.Performance,
+			"reliability":   stats.Reliability,
+		},
+
+		"health_trend": healthTrend,
+	}
+
+	// Add message type distribution
+	msgTypes := make(map[string]int64)
+	for typ, count := range stats.MessageTypeDistribution {
+		msgTypes[typ] = count
+	}
+	report["message_types"] = msgTypes
+
+	return report
+}
+```
+
+## Architecture Decision Records
+
+### ADR 1: Specialized Extensions Model
+
+**Context**: Need a way to provide advanced functionality without bloating the core.
+
+**Decision**: Implement specialized, independent extensions that build on `honeybee` but can be used selectively.
+
+**Rationale**:
+- Allows applications to choose only the extensions they need
+- Prevents complexity from leaking into core functionality
+- Creates clear boundaries between essential and optional features
+- Enables specialized optimizations for specific use cases
+- Follows the principle of progressive enhancement
+- Enables à la carte adoption of advanced features
+- Preserves compatibility with honeybee-based applications
+
+**Consequences**:
+- Increases total API surface area across multiple packages
+- Improves application performance by avoiding unnecessary code
+- Creates clearer documentation of advanced vs. basic functionality
+- May require some duplication between extensions
+- Allows independent evolution of each extension
+- All extensions implement corresponding honeybee interfaces
+- Extensions can be adopted individually without requiring full migration
+
+### ADR 2: High-Performance Focus
+
+**Context**: Advanced applications have different performance requirements than basic ones.
+
+**Decision**: Optimize `mana-ws` extensions for high-performance and high-volume scenarios.
+
+**Rationale**:
+- Addresses needs of applications with demanding requirements
+- Provides optimizations that would be overkill for simple applications
+- Creates clear upgrade path as application needs grow
+- Allows fine-tuning for specific deployment environments
+- Targets use cases where performance is critical
+
+**Consequences**:
+- Increases implementation complexity
+- Requires more sophisticated testing
+- Creates more configuration options
+- Enables applications to scale to higher loads
+- May have higher resource usage in some cases
+
+### ADR 3: Advanced State Management
+
+**Context**: Complex applications need sophisticated state tracking beyond basic connection management.
+
+**Decision**: Implement comprehensive state tracking with analytics and prediction capabilities.
+
+**Rationale**:
+- Provides visibility into connection behavior for debugging
+- Enables predictive adjustments to improve reliability
+- Creates foundation for adaptive optimization
+- Supports monitoring and alerting in production environments
+- Helps identify patterns in connection behavior
+
+**Consequences**:
+- Increases memory and CPU usage for tracking
+- Provides valuable insights for complex applications
+- May require tuning to balance tracking overhead
+- Creates more complex internal state
+- Enables more sophisticated recovery strategies
+
+### ADR 4: Adaptive Algorithms
+
+**Context**: Static configurations don't work well across diverse network conditions.
+
+**Decision**: Implement adaptive algorithms that adjust based on observed conditions.
+
+**Rationale**:
+- Improves performance across varying network environments
+- Reduces need for manual tuning
+- Creates more resilient connections
+- Adjusts automatically to changing conditions
+- Prevents degradation under stress
+
+**Consequences**:
+- Increases algorithm complexity
+- Makes behavior less predictable in some cases
+- Improves overall reliability
+- Requires careful testing across diverse conditions
+- May cause unexpected adaptation in edge cases
+
+### ADR 5: Pluggable Architecture
+
+**Context**: Different applications need different combinations of advanced features.
+
+**Decision**: Design extensions to be independently usable rather than as a monolithic advanced layer.
+
+**Rationale**:
+- Allows applications to choose only what they need
+- Prevents unnecessary code inclusion
+- Simplifies testing of individual features
+- Enables incremental adoption of advanced features
+- Follows the principle of separation of concerns
+
+**Consequences**:
+- Requires clear documentation of dependencies between extensions
+- Increases number of import statements in application code
+- Reduces application size and improves performance
+- May lead to some duplication between extensions
+- Allows targeted updates to specific extensions
+
+### ADR 6: Detailed Analytics
+
+**Context**: Understanding connection behavior is important for optimization.
+
+**Decision**: Implement comprehensive statistics tracking and reporting.
+
+**Rationale**:
+- Provides visibility into connection performance
+- Enables data-driven optimization
+- Supports monitoring and alerting
+- Helps identify problems before they become critical
+- Creates foundation for automatic tuning
+
+**Consequences**:
+- Increases overhead of tracking and calculation
+- Provides valuable insights for complex applications
+- May require storage considerations for historical data
+- Creates more observability in production
+- Requires balancing detail with performance impact
+
+### ADR 7: Drop-in Component Compatibility
+
+**Context**: Need to determine how mana-ws components relate to their honeybee counterparts.
+
+**Decision**: Design mana-ws components to be drop-in replacements for honeybee components, implementing the same interfaces while providing additional capabilities through extended interfaces.
+
+**Rationale**:
+- Enables incremental adoption of advanced features
+- Allows selective enhancement of specific components
+- Preserves investment in existing application code
+- Creates clear upgrade paths as requirements evolve
+- Follows the principle of progressive enhancement
+- Respects the user's choice of complexity level
+- Supports targeted optimization of performance bottlenecks
+
+**Consequences**:
+- All mana-ws components must implement honeybee interfaces
+- Need careful design of extended interfaces
+- May require adapter patterns in some cases
+- Enables feature detection through interface assertions (see the sketch at the end of this document)
+- Simplifies migration between implementations
+- Requires comprehensive compatibility testing
+
+### ADR 8: Progressive Cognitive Load
+
+**Context**: Need to consider how developers learn and adopt the library ecosystem as their applications evolve.
+
+**Decision**: Design mana-ws to support progressive learning and implementation, allowing developers to adopt advanced features incrementally as their applications mature, without requiring upfront understanding of all capabilities.
+
+**Rationale**:
+- Aligns with natural learning progression
+- Supports symbiotic evolution of applications and usage patterns
+- Reduces initial barrier to entry
+- Enables experimentation with advanced features in isolation
+- Creates natural documentation hierarchy from basic to advanced
+- Matches complexity to application maturity
+- Respects developer time and cognitive resources
+
+**Consequences**:
+- Features must be designed for independent adoption
+- Documentation should follow progressive disclosure principles
+- Examples needed for common upgrade scenarios
+- May increase number of configuration options
+- Creates more natural onboarding experience
+- Allows focused learning on immediately relevant features
+- Supports both beginners and advanced users effectively
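+
+To make ADR 7 concrete, here is a minimal sketch of the drop-in pattern. The interface names (`Sender`, `CompressedSender`) and the `publish` helper are illustrative assumptions, not part of the actual `honeybee` or `mana-ws` APIs:
+
+```go
+// Hypothetical base capability as honeybee might define it
+type Sender interface {
+	Send(msg []byte) error
+}
+
+// Hypothetical mana-ws extension: embeds the base interface, so any
+// implementation is also a valid Sender
+type CompressedSender interface {
+	Sender
+	SendCompressed(msg []byte) error
+}
+
+// Application code depends only on the base interface, but can
+// feature-detect the extension through an interface assertion
+func publish(s Sender, msg []byte) error {
+	if cs, ok := s.(CompressedSender); ok {
+		return cs.SendCompressed(msg)
+	}
+	return s.Send(msg)
+}
+```
+
+Because the extended interface embeds the base one, a `mana-ws` component remains usable anywhere a `honeybee` component is expected, which is what makes the incremental adoption described in ADR 8 safe.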