1
[Roadmap] Nostr Websocket Library Stack
jay edited this page 2025-12-30 09:58:36 -05:00

Overall Nostr Websocket Data Transport Stack

Architecture Overview

The Nostr WebSocket Data Transport Stack provides a layered approach to implementing the Nostr protocol over WebSockets. The stack consists of three complementary libraries that can be used independently or together, depending on application requirements:

  1. roots-ws: A foundation layer that provides pure functional primitives for working with Nostr protocol messages. This layer focuses on time-independent operations with no event loops or state management, implementing core message validation, serialization, and routing logic.

  2. honeybee: A practical implementation layer that builds on roots-ws to provide essential WebSocket transport functionality. This layer adds connection management, reconnection handling, and subscription management with appropriate event loops and state tracking.

  3. mana-ws: An extensions layer that provides specialized, optional functionality for advanced use cases. This layer implements sophisticated features like connection pooling, message compression, rate limiting, and detailed statistics tracking.

The architecture follows these key principles:

  • Pure Core: The foundation is built on pure functions with minimal side effects
  • Composition Over Inheritance: Higher layers compose functionality from lower layers rather than extending them
  • Progressive Enhancement: Applications can start with just roots-ws and add higher layers as needs evolve
  • Functional Paradigm: All layers use a functional approach with explicit state management
  • Clear Separation of Concerns: Each layer has a distinct focus and responsibility

This layered approach allows applications to choose the appropriate level of functionality while maintaining a consistent programming model across the stack.

Architecture Decision Records

ADR 1: Layered Architecture with Unidirectional Dependencies

Context: Need to organize WebSocket transport functionality to support different levels of application complexity.

Decision: Split functionality into three layers (roots-ws, honeybee, mana-ws) with unidirectional dependencies.

Rationale:

  • Allows applications to use only the functionality they need
  • Creates natural boundaries between pure functionality and stateful behavior
  • Promotes focused testing and simplified maintenance
  • Enables incremental adoption for applications with evolving needs
  • Follows the principle of separation of concerns

Consequences:

  • Requires careful API design to ensure layers compose well
  • May lead to some duplication between layers for independence
  • Documentation must clearly explain the role of each layer
  • Testing becomes more modular and focused on each layer's responsibilities

ADR 2: Functional Programming Paradigm Across Layers

Context: Need a consistent programming model that works well across all layers.

Decision: Adopt functional programming patterns with namespaced functions operating on explicit data structures.

Rationale:

  • Creates a consistent mental model across all layers
  • Improves testability through pure functions and explicit state
  • Reduces coupling between components
  • Aligns with Go's design philosophy
  • Makes state management explicit and traceable

Consequences:

  • May be less familiar to developers used to OOP
  • Requires clear documentation on function usage patterns
  • Improves composability of library components
  • Makes state changes and side effects more visible

ADR 3: Transport-Only Focus with No Protocol Semantics

Context: Need to define the scope of the transport stack relative to other Nostr libraries.

Decision: Focus exclusively on transport concerns, deferring protocol semantics to other libraries.

Rationale:

  • Creates a clear separation between transport and protocol semantics
  • Allows the transport to be used with different protocol implementations
  • Follows single responsibility principle
  • Reduces coupling between transport and application logic
  • Enables testing of transport independently from protocol semantics

Consequences:

  • Applications need to compose transport with protocol libraries
  • Requires clear API boundaries between transport and protocol concerns
  • May require additional integration code in some applications
  • Simplifies evolution of transport and protocol independently

ADR 4: Explicit Role-Based Connection Model

Context: Nostr protocol has inherent asymmetry in connection patterns.

Decision: Model connections explicitly as either initiator or responder roles rather than using client/relay terminology.

Rationale:

  • Accurately represents the connection control model
  • Avoids confusion with higher-level client/relay concepts
  • Focuses on the transport protocol's communication pattern
  • Creates clear API boundaries for each connection role
  • Allows specialized behavior appropriate to each role

Consequences:

  • API must make it explicit which operations are permitted for each role
  • Requires role-specific implementations of some functionality
  • May require additional validation to prevent role violations
  • Improves clarity of connection behavior expectations

ADR 5: Tiered Configuration Strategy

Context: Different components require different levels of configurability.

Decision: Implement a tiered configuration approach with defaults, profiles, component-level options, and advanced parameters.

Rationale:

  • Makes simple use cases simple with zero configuration
  • Provides appropriate control for advanced users
  • Organizes configuration into logical groupings
  • Prevents configuration complexity from leaking into basic usage
  • Allows applications to tune specific aspects without understanding all options

Consequences:

  • Requires careful selection of default values
  • Increases implementation complexity for configuration management
  • Improves documentation by organizing options into logical tiers
  • Enables incremental configuration as application needs grow
  • May require configuration builders to maintain clean interfaces

roots-ws

Functional Architecture

1. Core Types

Purpose: Defines basic types for WebSocket connections and message handling.

// Core types
type ConnectionStatus int

const (
    // StatusDisconnected indicates the connection is not active and no connection attempt is in progress.
    StatusDisconnected ConnectionStatus = iota
    
    // StatusConnecting indicates a connection attempt is currently in progress but not yet established.
    StatusConnecting
    
    // StatusConnected indicates the connection is active and ready for message exchange.
    StatusConnected
    
    // StatusClosing indicates the connection is in the process of shutting down gracefully.
    StatusClosing
    
    // StatusClosed indicates the connection has been fully closed with a clean shutdown.
    StatusClosed
    
    // StatusError indicates the connection has failed and is in an error state.
    StatusError
    
    // StatusBackoff indicates a temporary disconnected state during a reconnection attempt.
    StatusBackoff
    
    // StatusAwaiting indicates waiting for preconditions before attempting connection.
    StatusAwaiting
    
    // StatusUpgrading indicates WebSocket protocol upgrade in progress.
    StatusUpgrading
)

2. Envelope Handling

Purpose: Provides a type and functions for working with Nostr protocol messages.

// Core envelope type
type Envelope []byte

// Get message label
func GetLabel(env Envelope) (string, error) {
    // Parse JSON to extract first element as string (label)
    // Handle errors if data isn't valid JSON or doesn't contain a label
}

// Get standard message labels
func GetStandardLabels() map[string]struct{} {
    // Return map with standard Nostr message types:
    // EVENT, REQ, CLOSE, CLOSED, EOSE, NOTICE, OK, AUTH
}

// Check if label is standard
func IsStandardLabel(label string) bool {
    // Check against list of standard Nostr message labels
}

3. Envelope Creation Functions

Purpose: Creates standard Nostr protocol messages using pure functions.

// Create EVENT envelope for publishing events
func EncloseEvent(event []byte) Envelope {
    // Create array with "EVENT" and event object
    // Return as JSON bytes
}

// Create EVENT envelope for subscription events
func EncloseSubscriptionEvent(subID string, event []byte) Envelope {
    // Create array with "EVENT", subID, and event
    // Return as JSON bytes
}

// Create REQ envelope
func EncloseReq(subID string, filters [][]byte) Envelope {
    // Create array with "REQ", subID, and filters
    // Return as JSON bytes
}

// Create CLOSE envelope
func EncloseClose(subID string) Envelope {
    // Create array with "CLOSE" and subID
    // Return as JSON bytes
}

// Create CLOSED envelope
func EncloseClosed(subID string, message string) Envelope {
    // Create array with "CLOSED", subID, and message
    // Return as JSON bytes
}

// Create NOTICE envelope
func EncloseNotice(message string) Envelope {
    // Create array with "NOTICE" and message
    // Return as JSON bytes
}

// Create EOSE envelope
func EncloseEOSE(subID string) Envelope {
    // Create array with "EOSE" and subID
    // Return as JSON bytes
}

// Create OK envelope
func EncloseOK(eventID string, status bool, message string) Envelope {
    // Create array with "OK", eventID, status, and message
    // Return as JSON bytes
}

// Create AUTH challenge envelope
func EncloseAuthChallenge(challenge string) Envelope {
    // Create array with "AUTH" and challenge
    // Return as JSON bytes
}

// Create AUTH response envelope
func EncloseAuthResponse(event []byte) Envelope {
    // Create array with "AUTH" and event
    // Return as JSON bytes
}

4. Envelope Parsing Functions

Purpose: Extracts data from standard Nostr protocol messages.

// Helper functions for validation
func CheckArrayLength(arr []json.RawMessage, minLen int) error {
    // Ensure array has at least the minimum required elements
}

func CheckLabel(got, want string) error {
    // Verify that envelope label matches expected value
}

func ParseElement(element json.RawMessage, value interface{}, position string) error {
    // Unmarshal array element into provided value with error handling
}

// Extract event from EVENT envelope
func FindEvent(env Envelope) ([]byte, error) {
    // Parse message as JSON array
    // Validate array has correct structure for EVENT
    // Extract event object
    // Return event as JSON bytes
}

// Extract subscription event
func FindSubscriptionEvent(env Envelope) (subID string, event []byte, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for subscription EVENT
    // Extract subscription ID and event
    // Return both with appropriate error handling
}

// Extract subscription request
func FindReq(env Envelope) (subID string, filters [][]byte, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for REQ
    // Extract subscription ID and filters
    // Return both with appropriate error handling
}

// Extract OK response
func FindOK(env Envelope) (eventID string, status bool, message string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for OK
    // Extract eventID, status, and message
    // Return all with appropriate error handling
}

// Extract EOSE message
func FindEOSE(env Envelope) (subID string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for EOSE
    // Extract subscription ID
    // Return with appropriate error handling
}

// Extract CLOSE message
func FindClose(env Envelope) (subID string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for CLOSE
    // Extract subscription ID
    // Return with appropriate error handling
}

// Extract CLOSED message
func FindClosed(env Envelope) (subID string, message string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for CLOSED
    // Extract subscription ID and message
    // Return both with appropriate error handling
}

// Extract NOTICE message
func FindNotice(env Envelope) (message string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for NOTICE
    // Extract message
    // Return with appropriate error handling
}

// Extract AUTH challenge
func FindAuthChallenge(env Envelope) (challenge string, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for AUTH challenge
    // Extract challenge
    // Return with appropriate error handling
}

// Extract AUTH response
func FindAuthResponse(env Envelope) (event []byte, err error) {
    // Parse message as JSON array
    // Validate array has correct structure for AUTH response
    // Extract event
    // Return with appropriate error handling
}

5. Error Types

Purpose: Define error types for the library.

// Data Structure Errors
var (
    InvalidJSON    = errors.New("invalid JSON")
    MissingField   = errors.New("missing required field")
    WrongFieldType = errors.New("wrong field type")
)

// Envelope Errors
var (
    InvalidEnvelope    = errors.New("invalid envelope format")
    WrongEnvelopeLabel = errors.New("wrong envelope label")
)

Architecture Decision Records

ADR 1: Pure Functional Approach

Context: Need to define the programming paradigm for the foundation layer.

Decision: Implement roots-ws as a pure functional library with no side effects or state management.

Rationale:

  • Creates a solid foundation for higher-level libraries to build upon
  • Maximizes testability and predictability
  • Eliminates concurrency issues at the foundation level
  • Makes function composition more natural and explicit
  • Aligns with functional programming best practices

Consequences:

  • Cannot include any event loops or timers in this layer
  • Must pass all state explicitly through function parameters
  • All I/O operations must be handled by higher layers
  • Library will feel incomplete when used standalone
  • Function signatures will be more explicit about dependencies

ADR 2: Raw JSON Handling

Context: Need to decide how message content should be handled.

Decision: Process messages as raw JSON at this layer, deferring semantic validation to consumers.

Rationale:

  • Avoids unnecessary serialization/deserialization cycles
  • Maintains flexibility for consumers in how they process content
  • Prevents tight coupling to specific JSON libraries
  • Aligns with the transport-only focus of the library
  • Makes message transformation more explicit

Consequences:

  • Cannot validate message content semantics, only structure
  • Consumers need to implement their own content parsing
  • Performance benefits from reduced parsing overhead
  • More flexibility for consumers to use different JSON libraries
  • Must clearly document format expectations

ADR 3: Explicit Error Types

Context: Error handling approach needs to be defined.

Decision: Define explicit error types rather than using generic errors.

Rationale:

  • Allows consumers to handle different error scenarios appropriately
  • Creates clear documentation of possible failure modes
  • Enables selective recovery strategies based on error type
  • Improves debugging by providing specific error information
  • Creates a consistent error handling pattern

Consequences:

  • Increases code surface area with many error definitions
  • Requires discipline to use the right error types
  • Simplifies error handling for consumers
  • Makes error handling expectations explicit in documentation
  • May lead to error handling anti-patterns if not well documented

ADR 4: No Dependencies

Context: Need to determine dependency strategy for the foundation layer.

Decision: roots-ws should have no dependencies outside the standard library.

Rationale:

  • Eliminates dependency conflicts for consumers
  • Reduces maintenance burden from external API changes
  • Creates a more stable foundation for higher layers
  • Simplifies deployment and versioning
  • Makes clear boundaries around functionality

Consequences:

  • May need to implement functionality available in external libraries
  • Limited to standard library capabilities
  • Reduces feature bloat by forcing simplicity
  • Improves long-term stability
  • May require more code than using external libraries

ADR 5: Language-Specific Implementations

Context: Need to support multiple programming languages.

Decision: Provide idiomatic implementations for each supported language rather than a universal API.

Rationale:

  • Allows each language implementation to follow best practices for that language
  • Creates more natural APIs for consumers in each language
  • Avoids awkward abstractions to force uniformity
  • Enables leveraging language-specific features
  • Improves adoption by feeling native to each language ecosystem

Consequences:

  • Implementations will diverge in details across languages
  • Requires maintaining multiple codebases
  • Better developer experience in each language
  • More challenging to maintain consistency in behavior
  • More flexibility to evolve each implementation independently

ADR 6: Exported Helper Functions

Context: Need to decide which helper functions should be part of the public API to enable extensibility.

Decision: Export core helper functions (CheckArrayLength, CheckLabel, ParseElement) that enable consumers to create their own envelope types consistent with the library's patterns.

Rationale:

  • Allows consumers to extend the library with custom envelope types
  • Ensures consistency in validation and error handling across custom and built-in envelopes
  • Enables protocol evolution without requiring library updates
  • Follows the principle of providing tools rather than just finished products
  • Creates a flexible extension point that preserves architectural integrity

Consequences:

  • Increases the public API surface area
  • Requires careful documentation of helper function behaviors and expectations
  • Makes breaking changes to helpers more impactful
  • Creates shared validation patterns across the ecosystem
  • Encourages protocol innovation while maintaining implementation consistency

honeybee

Functional Architecture

1. Connection Management

Purpose: Manages WebSocket connection lifecycle with event loops.

Dependencies: roots-ws Connection types, Error types

package connection

// Core connection type
type Connection struct {
    Status                ConnectionStatus
    URI                   string
    InChan                chan []byte        // Socket → Application
    OutChan               chan []byte        // Application → Socket
    ErrChan               chan error         // Internal errors
    socket                *websocket.Conn
    closeOnce             sync.Once          // Ensures single close execution
    
    // Component references
    ReconnectManager      *reconnection.Manager  // Optional reconnection handling
    HealthMonitor         *health.Monitor        // Optional health monitoring
    RateLimiter           *ratelimit.Limiter     // Optional rate limiting
    CompressionEnabled    bool                   // Whether compression is enabled
    Metrics               *metrics.Collector     // Connection metrics tracking
    
    done                  chan struct{}      // Signal to shut down goroutines
}

// Configuration with sensible defaults
type Config struct {
    // Connection timeout
    ConnectionTimeout     time.Duration
    
    // Component configurations
    ReconnectConfig       *reconnection.Config  // Optional reconnection config
    HealthConfig          *health.Config        // Optional health monitoring config
    RateLimitConfig       *ratelimit.Config     // Optional rate limiting config
    CompressionConfig     *compression.Config   // Optional compression config
    MetricsEnabled        bool                  // Whether to collect metrics
    
    // Message limits
    MaxMessageSize        int
}

// Connection status values
type ConnectionStatus int

const (
    StatusDisconnected ConnectionStatus = iota
    StatusConnecting
    StatusConnected
    StatusClosing
    StatusClosed
    StatusError
    StatusBackoff
)

// Create new connection in disconnected state
func New(uri string, config Config) *Connection {
    // Initialize connection with channels and disconnected status
    // Initialize optional components if configs are provided
    // Initialize metrics collector if enabled
    // Return initialized connection
}

// Connect to websocket endpoint (for initiator)
func Connect(conn *Connection) error {
    // Set connection status to StatusConnecting
    // Create dialer with connection timeout from config
    // Set up compression options if enabled
    // Establish websocket connection with timeout
    // Start reader and writer goroutines
    // Initialize health monitoring if configured
    // Set connection status to StatusConnected on success
}

// Accept existing websocket (for responder)
func FromSocket(socket *websocket.Conn, config Config) *Connection {
    // Create connection from existing socket
    // Start reader and writer goroutines
    // Initialize health monitoring if configured
    // Set up rate limiting if configured
    // Enable compression if configured
    // Initialize metrics if enabled
}

// Close connection
func Close(conn *Connection) error {
    // Safely close the websocket using closeOnce
    // Clean up goroutines and channels
    // Record close event in metrics if enabled
}

// Get current connection state
func Status(conn *Connection) ConnectionStatus {
    // Return current status
}

// Internal reader goroutine
func startReader(conn *Connection) {
    // Read envelopes from socket
    // Apply rate limiting if enabled
    // Record metrics if enabled
    // Send to InChan
    // Handle errors via ErrChan
}

// Internal writer goroutine
func startWriter(conn *Connection) {
    // Read from OutChan
    // Apply compression if enabled
    // Record metrics if enabled
    // Write envelopes to socket
    // Handle errors via ErrChan
}

// Handle onclose event from websocket
func setupCloseHandler(conn *Connection) {
    // Set close handler on websocket
    // Send close error through ErrChan
    // Update connection status
    // Record disconnection in metrics if enabled
    // Trigger reconnection if enabled
}

2. Reconnection Management

Purpose: Handles automatic reconnection with exponential backoff.

Dependencies: roots-ws Error types

package reconnection

// Reconnection configuration with sensible defaults
type Config struct {
    Enabled           bool
    MaxAttempts       int         // Maximum number of reconnection attempts (0 for unlimited)
    InitialDelay      time.Duration // Default: 1 second
    MaxDelay          time.Duration // Default: 30 seconds  
    BackoffFactor     float64     // Default: 2.0 (doubles delay each attempt)
    JitterFactor      float64     // Default: 0.2 (adds ±20% randomization)
}

// Default reconnection configuration
func DefaultConfig() *Config {
    return &Config{
        Enabled:         true,
        MaxAttempts:     5,
        InitialDelay:    1 * time.Second,
        MaxDelay:        30 * time.Second,
        BackoffFactor:   2.0,
        JitterFactor:    0.2,
    }
}

// Reconnection manager handles retry logic
type Manager struct {
    Config            *Config
    attempts          int
    lastAttempt       time.Time
    callbacks         []func()    // Functions to call on successful reconnect
}

// Create new reconnection manager
func NewManager(config *Config) *Manager {
    // Use default config if none provided
    // Initialize manager with zero attempts
    // Return initialized manager
}

// Register reconnection callback
func (m *Manager) OnReconnect(callback func()) {
    // Add callback to callbacks slice
}

// Calculate backoff delay with jitter
func (m *Manager) CalculateDelay() time.Duration {
    // Return zero if this is first attempt
    // Calculate base delay: initialDelay * (backoffFactor ^ attempts)
    // Add jitter: delay * (1 ± random * jitterFactor)
    // Cap at maximum delay
    // Return calculated delay
}

// Check if should attempt reconnection
func (m *Manager) ShouldAttempt() bool {
    // Return false if disabled
    // Check if max attempts exceeded (unless unlimited)
    // Return true if should attempt
}

// Record reconnection attempt
func (m *Manager) RecordAttempt() {
    // Increment attempts counter
    // Update lastAttempt timestamp
}

// Reset after successful connection
func (m *Manager) Reset() {
    // Reset attempts to zero
    // Clear lastAttempt timestamp
}

// Handle successful reconnection
func (m *Manager) HandleReconnect() {
    // Reset attempt counter
    // Execute all registered callbacks
}

3. Connection Health Monitoring

Purpose: Monitors connection health using ping/pong heartbeats.

Dependencies: roots-ws Error types

package health

// Health monitoring configuration
type Config struct {
    PingInterval          time.Duration    // Default: 25 seconds
    PongTimeout           time.Duration    // Default: 15 seconds
    PingFailureThreshold  int              // Default: 3 consecutive failures
}

// Default health configuration
func DefaultConfig() *Config {
    return &Config{
        PingInterval:         25 * time.Second,
        PongTimeout:          15 * time.Second,
        PingFailureThreshold: 3,
    }
}

// Health monitor tracks connection health
type Monitor struct {
    Config               *Config
    conn                 *websocket.Conn
    lastPingSent         time.Time
    lastPongReceived     time.Time
    consecutiveFailures  int
    isHealthy            bool
}

// Create new health monitor
func NewMonitor(conn *websocket.Conn, config *Config) *Monitor {
    // Use default config if none provided
    // Initialize monitor with healthy state
    // Set up ping/pong handlers on connection
    // Return initialized monitor
}

// Start health monitoring
func (m *Monitor) Start() {
    // Start ticker with PingInterval
    // Send pings at regular intervals
    // Monitor pong responses
    // Update health status based on responses
}

// Stop health monitoring
func (m *Monitor) Stop() {
    // Stop ticker
    // Clean up resources
}

// Check connection health
func (m *Monitor) IsHealthy() bool {
    // Return current health status
}

// Get time since last successful pong
func (m *Monitor) TimeSinceLastPong() time.Duration {
    // Return duration since lastPongReceived
    // Return max duration if no pong received yet
}

// Handle ping send
func (m *Monitor) handlePingSend() {
    // Send ping frame
    // Update lastPingSent timestamp
    // Schedule timeout check
}

// Handle pong received
func (m *Monitor) handlePongReceived() {
    // Update lastPongReceived timestamp
    // Reset consecutive failures counter
    // Update health status to healthy
}

// Check for ping timeout
func (m *Monitor) checkPingTimeout() {
    // Calculate time since last ping sent
    // If exceeded PongTimeout:
    //   - Increment consecutive failures
    //   - Check if failures exceed threshold
    //   - Update health status if needed
}

4. Rate Limiting

Purpose: Provides simple rate limiting to prevent message flooding.

Dependencies: roots-ws Error types

package ratelimit

// Rate limiting configuration
type Config struct {
    Enabled         bool    // Whether rate limiting is active (default: false)
    MessagesPerSec  int     // Maximum messages per second (default: 10)
    BurstSize       int     // Maximum burst allowed (default: 30)
}

// Default rate limit configuration
func DefaultConfig() *Config {
    return &Config{
        Enabled:        false,      // Off by default
        MessagesPerSec: 10,         // Reasonable default
        BurstSize:      30,         // Allow short bursts
    }
}

// Token bucket for rate limiting
type Limiter struct {
    rate            float64     // Tokens per second
    maxTokens       int         // Maximum capacity
    availableTokens float64     // Current token count
    lastRefill      time.Time   // Last time tokens were added
    mutex           sync.Mutex  // For thread safety
}

// Create new rate limiter with configuration
func NewLimiter(config *Config) *Limiter {
    // If disabled, return nil
    // Create token bucket with initial tokens
    // Set rate based on configuration
    // Return initialized limiter
}

// Check if operation is allowed by rate limit
func Allow(limiter *Limiter) bool {
    // Return true if limiter is nil (disabled)
    // Lock for thread safety
    // Refill tokens based on time elapsed
    // Check if bucket has at least one token
    // Consume token if available and return true
    // Return false if no tokens available
}

// Wait until operation is allowed with timeout
func Wait(limiter *Limiter, timeout time.Duration) error {
    // Return nil if limiter is nil (disabled)
    // Set up context with timeout
    // Loop until context cancelled or operation allowed
    // Calculate time until next token available
    // Wait for token to become available or context cancelled
}

// Refill tokens based on elapsed time
func refillTokens(limiter *Limiter) {
    // Calculate elapsed time since last refill
    // Calculate tokens to add based on rate
    // Add tokens up to max capacity
    // Update last refill time
}

5. Compression

Purpose: Implements WebSocket compression support for bandwidth efficiency.

Dependencies: None

package compression

// Compression configuration
type Config struct {
    Enabled     bool    // Whether compression is enabled (default: false)
    Level       int     // Compression level 0-9 (default: 6)
}

// Default compression configuration
func DefaultConfig() *Config {
    return &Config{
        Enabled: false,  // Off by default
        Level:   6,      // Default is balanced compression
    }
}

// Enable compression on WebSocket connection
func EnableCompression(conn *websocket.Conn, config *Config) error {
    // Return early if compression not enabled
    // Set compression parameters based on configuration
    // Enable write compression on connection
    // Return any errors from enabling compression
}

// Create dialer with compression support
func ConfigureDialer(dialer *websocket.Dialer, config *Config) {
    // Return early if compression not enabled
    // Enable compression on dialer
    // Set compression level if supported by implementation
}

// Create compression options for server
func GetServerCompressionOptions(config *Config) *websocket.CompressionOptions {
    // Return nil if compression not enabled
    // Create compression options with configured level
    // Set appropriate parameters for server-side compression
    // Return compression options
}

// Check if message should be compressed
func ShouldCompress(size int, contentType string) bool {
    // Skip very small messages (under 256 bytes)
    // Skip already compressed content types
    // Return true for compressible content
}

6. Connection Metrics

Purpose: Collects basic connection statistics for monitoring and debugging.

Dependencies: None

package metrics

// Connection metrics collector
type Collector struct {
    StartTime           time.Time       // When the connection was established
    MessagesSent        int64           // Total messages sent
    MessagesReceived    int64           // Total messages received
    Errors              int64           // Total errors encountered
    ReconnectionCount   int             // Number of reconnection attempts
    BytesSent           int64           // Total bytes sent
    BytesReceived       int64           // Total bytes received
    LastMessageSent     time.Time       // Timestamp of last outgoing message
    LastMessageReceived time.Time       // Timestamp of last incoming message
    mutex               sync.RWMutex    // For thread-safe updates to non-atomic fields
}

// Create new metrics collector
func NewCollector() *Collector {
    // Initialize collector with current timestamp
    // Set initial counters to zero
    // Return initialized collector
}

// Record sent message
func RecordSent(collector *Collector, size int) {
    // Skip if collector is nil
    // Atomically increment sent counter
    // Atomically add size to bytes sent
    // Update last message sent timestamp
}

// Record received message
func RecordReceived(collector *Collector, size int) {
    // Skip if collector is nil
    // Atomically increment received counter
    // Atomically add size to bytes received
    // Update last message received timestamp
}

// Record error
func RecordError(collector *Collector) {
    // Skip if collector is nil
    // Atomically increment error counter
}

// Record reconnection attempt
func RecordReconnection(collector *Collector) {
    // Skip if collector is nil
    // Safely increment reconnection counter
}

// Get connection uptime
func GetUptime(collector *Collector) time.Duration {
    // Return zero if collector is nil
    // Calculate duration since start time
}

// Calculate messages per second (recent rate)
func GetMessagesPerSecond(collector *Collector) float64 {
    // Return zero if collector is nil or insufficient history
    // Calculate messages per second based on total messages and uptime
}

// Reset metrics (typically after reconnection)
func Reset(collector *Collector, resetStartTime bool) {
    // Skip if collector is nil
    // Reset message counters to zero
    // Reset byte counters to zero
    // Reset timestamps if requested
    // Keep reconnection count
}

// Get metrics snapshot as map
func GetSnapshot(collector *Collector) map[string]interface{} {
    // Return empty map if collector is nil
    // Create map with current metrics values
    // Include derived metrics (uptime, message rates)
    // Return complete snapshot
}

7. Message Router Implementation

Purpose: Implements event loops for message routing based on type.

Dependencies: roots-ws Message routing logic, Message validation

package router

// Create routing channels
func CreateChannels(additionalLabels ...string) map[string]chan []byte {
    // Initialize channels for each standard message type:
    // EVENT, REQ, CLOSE, NOTICE, OK, EOSE, AUTH
    // Set appropriate buffer sizes for each channel type
    // based on expected message volume
}

// Route a message to appropriate channel
func RouteEnvelope(msg []byte, channels map[string]chan []byte) error {
    // Extract message type using roots.message.GetType()
    // Verify message type is valid
    // Use roots.router.DetermineRoute() to select appropriate channel
    // Handle unknown message types gracefully
    // Send message to channel with non-blocking behavior
    // Return error if channel is full or message type is invalid
}

// Process messages from connection
func StartRouting(conn *connection.Connection, channels map[string]chan []byte) {
    // Launch goroutine for message processing
    // Read from connection's raw message source
    // Apply message validation and size checks using roots-ws functions
    // Route each message to appropriate channel
    // Handle routing errors without crashing the router
    // Clean up when connection closes
}

8. Write Queue Implementation

Purpose: Provides thread-safe message transmission.

Dependencies: roots-ws Error types

package write

// Thread-safe write queue
type Queue struct {
    MaxSize int
    conn    *connection.Connection
    mutex   sync.Mutex
}

// Create new write queue for a connection
func NewQueue(conn *connection.Connection, maxSize int) *Queue {
    // Initialize queue with specified max size
    // Return fully initialized queue
}

// Send message through queue
func Send(queue *Queue, msg []byte) error {
    // Check if message exceeds max size
    // Acquire mutex lock for thread safety
    // Write message to connection's OutChan
    // Release mutex lock
    // Return any write errors encountered
}

// Start queue processing
func StartQueue(queue *Queue) {
    // Start goroutine to monitor connection's OutChan
    // Process messages in thread-safe manner
    // Handle errors appropriately
}

9. Initiator Functions

Purpose: Implements connection initiator patterns.

Dependencies: roots-ws Protocol Messages

package initiator

// Send EVENT envelope
func PublishEvent(queue *write.Queue, event []byte) error {
    // Validate event format using roots-ws
    // Create EVENT message using roots-ws
    // Send through write queue
    // Return any errors encountered
}

// Send REQ envelope
func SendReq(queue *write.Queue, subID string, filters [][]byte) error {
    // Validate subscription ID format and filters
    // Create REQ message using roots-ws
    // Send through write queue
    // Return any errors encountered
}

// Send CLOSE envelope
func SendClose(queue *write.Queue, subID string) error {
    // Validate subscription ID format
    // Create CLOSE message using roots-ws
    // Send through write queue
    // Return any errors encountered
}

// Send AUTH response envelope
func SendAuthResponse(queue *write.Queue, event []byte) error {
    // Validate event format using roots-ws
    // Create AUTH message using roots-ws
    // Send through write queue
    // Return any errors encountered
}

10. Responder Functions

Purpose: Implements connection responder patterns.

Dependencies: roots-ws Protocol Messages

package responder

// Send event within subscription context
func SendEvent(queue *write.Queue, subID string, event []byte) error {
    // Validate subscription ID and event format
    // Create EVENT message with subscription ID included
    // Send through write queue
    // Return any errors encountered
}

// Send EOSE for subscription
func SendEOSE(queue *write.Queue, subID string) error {
    // Validate subscription ID format
    // Create EOSE message for subscription
    // Send through write queue
    // Return any errors encountered
}

// Send NOTICE message
func SendNotice(queue *write.Queue, message string) error {
    // Validate notice message content
    // Create NOTICE message
    // Send through write queue
    // Return any errors encountered
}

// Send OK message for event
func SendOK(queue *write.Queue, eventID string, success bool, message string) error {
    // Validate event ID format
    // Create OK message with event ID, success flag, and message
    // Send through write queue
    // Return any errors encountered
}

// Send AUTH challenge
func SendAuthChallenge(queue *write.Queue, challenge string) error {
    // Validate challenge format
    // Create AUTH challenge message
    // Send through write queue
    // Return any errors encountered
}

// Send CLOSED message
func SendClosed(queue *write.Queue, subID string, message string) error {
    // Validate subscription ID format
    // Create CLOSED message with subscription ID and reason
    // Send through write queue
    // Return any errors encountered
}

11. Subscription Management

Purpose: Manages subscription state and handles reconnection.

Dependencies: roots-ws Subscription types

package subscription

// Registry tracks subscriptions for reconnection
type Registry struct {
    mu            sync.RWMutex
    subscriptions map[string]Entry
}

type Entry struct {
    ID      string
    Filters [][]byte
    Active  bool
}

// Create new registry
func NewRegistry() *Registry {
    // Initialize empty registry with map
    // Return initialized registry
}

// Add subscription to registry
func (r *Registry) Add(id string, filters [][]byte) {
    // Lock mutex for thread safety
    // Create entry with provided ID and filters
    // Set Active to true
    // Add to subscriptions map
    // Unlock mutex
}

// Remove subscription from registry
func (r *Registry) Remove(id string) {
    // Lock mutex for thread safety
    // Delete entry from subscriptions map
    // Unlock mutex
}

// Get active subscriptions
func (r *Registry) GetActive() map[string][][]byte {
    // Lock mutex for read access
    // Create map of active subscription IDs to filters
    // Unlock mutex
    // Return active subscriptions map
}

// Subscribe with registry tracking
func Subscribe(queue *write.Queue, registry *Registry, id string, filters [][]byte) error {
    // Create REQ message using initiator.SendReq
    // Register subscription in registry if successful
    // Return any errors from send operation
}

// Unsubscribe with registry tracking
func Unsubscribe(queue *write.Queue, registry *Registry, id string) error {
    // Create CLOSE message using initiator.SendClose
    // Remove from registry regardless of send result
    // Return any errors from send operation
}

// Register reconnect callback for a connection
func RegisterReconnectCallback(conn *connection.Connection, registry *Registry, queue *write.Queue) {
    // Create callback function that reestablishes subscriptions
    // Register callback with connection using OnReconnect
}

// Reestablish subscriptions after reconnection
func ReestablishSubscriptions(queue *write.Queue, registry *Registry) []error {
    // Get active subscriptions from registry
    // Attempt to resubscribe to each using initiator.SendReq
    // Collect any errors encountered
    // Return array of errors (empty if all succeeded)
}

12. Connection Pool

Purpose: Manages multiple connections with shared operations.

Dependencies: Connection, Write Queue, Router types

package pool

// Connection pool manages multiple relays
type Pool struct {
    mu          sync.RWMutex     // Protects connections and queues maps
    connections map[string]*connection.Connection
    queues      map[string]*write.Queue
    Channels    map[string]chan []byte
}

// Create new pool with optional initial relays
func New(urls []string, config connection.Config) (*Pool, error) {
    // Initialize pool with empty maps
    // Set up shared routing channels
    // Add initial relays if provided
    // Return initialized pool
}

// Add relay to the pool
func (p *Pool) AddRelay(url string) error {
    // Lock mutex for thread safety
    // Check if relay already exists
    // Create new connection with config
    // Create write queue for connection
    // Start routing to shared channels
    // Connect to relay
    // Add to connections and queues maps
    // Unlock mutex
    // Return any errors encountered
}

// Remove relay from pool
func (p *Pool) RemoveRelay(url string) error {
    // Lock mutex for thread safety
    // Check if relay exists in pool
    // Close connection
    // Remove from connections and queues maps
    // Unlock mutex
    // Return any errors encountered
}

// Send to specific relay
func (p *Pool) SendTo(url string, env []byte) error {
    // Lock mutex for read access
    // Find queue for relay
    // Unlock mutex
    // Send envelope through queue
    // Return any errors encountered
}

// Broadcast to all relays
func (p *Pool) Broadcast(env []byte) []error {
    // Lock mutex for read access
    // Create copy of queues map to prevent deadlock
    // Unlock mutex
    // Send to all relays in copy
    // Collect any errors
    // Return array of errors (empty if all succeeded)
}

// Subscribe to all relays
func (p *Pool) Subscribe(id string, filters [][]byte, registry *subscription.Registry) []error {
    // Lock mutex for read access
    // Create copy of queues map to prevent deadlock
    // Unlock mutex
    // Subscribe to all relays in copy
    // Register subscription if successful
    // Collect any errors
    // Return array of errors (empty if all succeeded)
}

// Unsubscribe from all relays
func (p *Pool) Unsubscribe(id string, registry *subscription.Registry) []error {
    // Lock mutex for read access
    // Create copy of queues map to prevent deadlock
    // Unlock mutex
    // Unsubscribe from all relays in copy
    // Remove from registry
    // Collect any errors
    // Return array of errors (empty if all succeeded)
}

// Enable automatic reconnection for all relays
func (p *Pool) EnableAutoReconnect(registry *subscription.Registry) {
    // Lock mutex for read access
    // Create copy of connections map to prevent deadlock
    // Unlock mutex
    // Register reconnect callbacks for all connections
}

// Get connection status
func (p *Pool) Status(url string) (connection.ConnectionStatus, error) {
    // Lock mutex for read access
    // Find connection for relay
    // Unlock mutex
    // Return status or error if not found
}

// Get all statuses
func (p *Pool) Statuses() map[string]connection.ConnectionStatus {
    // Lock mutex for read access
    // Create map of relay URLs to connection statuses
    // Unlock mutex
    // Return statuses map
}

// Close all connections
func (p *Pool) Close() []error {
    // Lock mutex for read access
    // Create copy of connections map to prevent deadlock
    // Unlock mutex
    // Close all connections in copy
    // Collect any errors
    // Return array of errors (empty if all succeeded)
}

13. Error Types

Purpose: Defines error types for the library.

package errors

import "errors"

// Connection errors
var (
    ErrConnectionFailed  = errors.New("connection failed")
    ErrConnectionTimeout = errors.New("connection timeout")
    ErrConnectionClosed  = errors.New("connection closed unexpectedly")
    ErrInvalidState      = errors.New("invalid state for operation")
    ErrMaxReconnectAttemptsExceeded = errors.New("maximum reconnection attempts exceeded")
    ErrPongTimeout       = errors.New("pong response timeout")
    ErrConnectionUnhealthy = errors.New("connection deemed unhealthy")
)

// Write errors
var (
    ErrEnvelopeTooLarge = errors.New("envelope exceeds maximum size")
    ErrWriteFailed      = errors.New("write failed")
)

// Router errors
var (
    ErrUnknownEnvelopeType = errors.New("unknown envelope type")
    ErrChannelFull         = errors.New("channel buffer full")
)

// Pool errors
var (
    ErrRelayNotFound      = errors.New("relay not found in pool")
    ErrRelayAlreadyExists = errors.New("relay already exists in pool")
)

// Subscription errors
var (
    ErrSubscriptionNotFound  = errors.New("subscription not found")
    ErrSubscriptionDuplicate = errors.New("subscription ID already exists") 
)

14. Main Package

Purpose: Exports key types and functions with sensible defaults.

Dependencies: All other packages

package honeybee

import (
    "time"
    
    "github.com/nostr-protocol/go-roots-ws"
    "github.com/nostr-protocol/go-honeybee/connection"
    "github.com/nostr-protocol/go-honeybee/errors"
    "github.com/nostr-protocol/go-honeybee/health"
    "github.com/nostr-protocol/go-honeybee/initiator"
    "github.com/nostr-protocol/go-honeybee/pool"
    "github.com/nostr-protocol/go-honeybee/reconnection"
    "github.com/nostr-protocol/go-honeybee/responder"
    "github.com/nostr-protocol/go-honeybee/router"
    "github.com/nostr-protocol/go-honeybee/subscription"
    "github.com/nostr-protocol/go-honeybee/write"
)

// Export key types and functions for convenient access
type Connection = connection.Connection
type ConnectionStatus = connection.ConnectionStatus
type ConnectionConfig = connection.Config
type WriteQueue = write.Queue
type Pool = pool.Pool
type SubscriptionRegistry = subscription.Registry
type ReconnectionManager = reconnection.Manager
type ReconnectionConfig = reconnection.Config
type HealthMonitor = health.Monitor
type HealthConfig = health.Config

// Default configuration
func DefaultConfig() connection.Config {
    return connection.Config{
        ConnectionTimeout: 10 * time.Second,
        ReconnectConfig:   reconnection.DefaultConfig(),
        HealthConfig:      health.DefaultConfig(),
        MaxMessageSize:    1024 * 1024, // 1MB
    }
}

Cross-Language Translation: Go to TypeScript with RxJS

Core Architectural Principles

Recognize the fundamental paradigm difference:

  • Go is built around CSP (Communicating Sequential Processes) - goroutines passing data through channels
  • TypeScript/RxJS is built around reactive streams - declarative transformations of event flows
  • Don't translate patterns 1:1; translate intent into each language's natural expression

Design APIs that feel native:

  • Go users expect constructors that return values and errors, explicit Start() and Close() methods
  • TypeScript users expect constructors that return ready-to-use objects, automatic activation on subscription
  • The same feature should have different surface APIs in each language

TypeScript Multi-Environment Strategy

Shared foundation with environment-specific transport:

  • roots-ws (TypeScript): Pure functions for envelope handling, identical across environments
  • honeybee-browser: Reactive wrapper for browser WebSocket APIs
  • honeybee-node: Reactive wrapper for Node.js WebSocket libraries
  • Both honeybee variants expose identical observable-based APIs while using appropriate underlying implementations

Environment-aware design patterns:

  • Browser: Event-driven lifecycle, page visibility handling, CORS considerations
  • Node: Process lifecycle, server patterns, granular connection control
  • Shared: Message routing, subscription management, error handling patterns

Go-Specific Considerations

Embrace structured concurrency:

  • Use goroutines liberally - they're cheap and the expected tool
  • Always pair goroutines with cleanup using context.Context and sync.WaitGroup
  • Channels are the primary communication mechanism between concurrent operations
  • Make blocking behavior explicit and expected

Error handling:

  • Return errors as values, don't hide them in callbacks
  • Provide typed errors users can switch on: var ErrConnectionClosed = errors.New(...)
  • Use fmt.Errorf with %w for error wrapping to preserve context
  • Consider sentinel errors for common conditions

Lifecycle management:

  • Explicit Start() and Close() methods are idiomatic
  • Close() should be safe to call multiple times
  • Use defer for cleanup - teach users this pattern
  • Consider implementing io.Closer interface

Type system:

  • Prefer interfaces for dependencies (like Dialer, Conn)
  • Keep interfaces small and focused (interface segregation)
  • Use struct embedding for composition
  • Type assertions with the value, ok := x.(Type) pattern for optional capabilities

Testing:

  • Provide interface-based abstractions specifically for testability
  • Table-driven tests are the norm
  • Use testing.T helpers and subtests
  • Mock by implementing interfaces, not with frameworks

TypeScript/RxJS-Specific Considerations

Embrace reactive composition:

  • Expose observables as the primary API, not methods that trigger side effects
  • Let users compose streams with operators rather than providing pre-composed solutions
  • Hot vs cold observables matter - document which is which
  • Use subjects internally only when you need to push values into a stream

Declarative over imperative:

  • Prefer defining what streams represent over how to create them
  • Use operators like map, merge, switchMap to transform streams
  • Avoid manual subscription management - use takeUntil for cleanup
  • Let the framework handle execution timing

Error handling:

  • Errors flow through the observable pipeline
  • Provide separate error streams rather than mixing errors with data
  • Use catchError to handle and recover from errors within streams
  • Consider retry strategies with retry, retryWhen

Lifecycle management:

  • Construction should be side-effect free
  • Activation happens on subscription, deactivation on unsubscription
  • Use finalize operator for cleanup logic
  • Provide a destroy() or close() method only for resource cleanup, not stream management

Type system:

  • Use TypeScript's structural typing - interfaces describe shapes
  • Generics for reusable stream types: Observable<T>
  • Discriminated unions for state modeling
  • Leverage type inference where possible

Testing:

  • Use marble testing for complex stream interactions
  • Mock at the observable level, not internal implementation
  • Test by subscribing and asserting on emitted values
  • Use TestScheduler for time-based scenarios

Environment-Specific Patterns

Browser WebSocket Handling:

// Browser: Event-driven API with limited control
class BrowserConnection {
    private socket: WebSocket;
    
    constructor(url: string) {
        this.socket = new WebSocket(url);
        this.setupEventListeners();
    }
    
    private setupEventListeners() {
        this.socket.onopen = () => this.statusSubject.next('connected');
        this.socket.onmessage = (event) => this.messagesSubject.next(event.data);
        this.socket.onclose = () => this.handleReconnection();
    }
    
    // Limited connection parameter control
    // CORS restrictions apply
    // Browser manages some reconnection behavior
}

Node.js WebSocket Handling:

// Node: More granular control and server patterns
class NodeConnection {
    private socket: WebSocket;
    
    constructor(url: string, options: NodeWebSocketOptions) {
        this.socket = new WebSocket(url, {
            headers: options.headers,
            rejectUnauthorized: options.ssl?.verify,
            // Full control over connection parameters
        });
        this.setupEventListeners();
    }
    
    // Server-side acceptance patterns
    static fromIncoming(socket: WebSocket): NodeConnection {
        return new NodeConnection(socket);
    }
    
    // Custom authentication flows
    // Process lifecycle integration
    // Resource cleanup responsibilities
}

Shared Observable Patterns:

// Both environments expose identical API patterns
interface HoneybeeConnection {
    readonly messages$: Observable<Uint8Array>;
    readonly connectionState$: Observable<ConnectionState>;
    readonly errors$: Observable<Error>;
    
    send(data: Uint8Array): void;
    close(): void;
}

Specific Feature Comparisons

Connection establishment:

// Go: explicit, returns error  
conn, err := websocket.Connect("ws://example.com")  
if err != nil {  
    return err  
}  
defer conn.Close()  
// TypeScript Browser: declarative, errors in stream  
const conn = new BrowserConnection("ws://example.com");  
conn.messages$.subscribe(msg => /* ... */);  
conn.errors$.subscribe(err => /* ... */);

// TypeScript Node: declarative with more options
const conn = new NodeConnection("ws://example.com", {
    headers: { 'Authorization': 'Bearer token' },
    ssl: { verify: true }
});
conn.messages$.subscribe(msg => /* ... */);

Reading messages:

// Go: blocking channel read in goroutine  
go func() {  
    for msg := range conn.Incoming {  
        process(msg)  
    }  
}()  
// TypeScript: subscribe to observable (both environments)
conn.messages$.subscribe(msg => process(msg));

Environment-specific optimizations:

// Browser: Page visibility API integration
conn.messages$.pipe(
    pausable(pageVisibility$),
    // Pause subscriptions when page hidden
).subscribe(msg => process(msg));

// Node: Process lifecycle integration  
process.on('SIGTERM', () => {
    conn.close();
    // Graceful shutdown
});

Reconnection logic:

// Go: explicit retry loop with backoff  
for attempt := 0; attempt < maxRetries; attempt++ {  
    conn, err := dial()  
    if err == nil {  
        return conn, nil  
    }  
    time.Sleep(backoff(attempt))  
}  
// TypeScript Browser: Limited reconnection control
const conn$ = connect().pipe(  
    retryWhen(errors => errors.pipe(  
        delay(1000),
        take(3)  
    ))  
);

// TypeScript Node: Full reconnection control
const conn$ = connect().pipe(
    retryWhen(errors => errors.pipe(
        switchMap((error, index) => 
            customBackoffStrategy(error, index)
        ),
        takeWhile(shouldRetry)
    ))
);

Library Structure Comparison

Go Structure:

honeybee/
├── connection/     // Core connection management
├── reconnection/   // Backoff and retry logic  
├── health/        // Ping/pong monitoring
├── router/        // Message routing
└── pool/          // Multi-connection management

TypeScript Structure:

roots-ws/          // Shared pure functions
├── envelope.ts    // Message parsing/creation
├── validation.ts  // Structure validation  
└── errors.ts      // Error types

honeybee-browser/  // Browser-specific transport
├── connection.ts  // Browser WebSocket wrapper
├── health.ts      // Visibility-aware monitoring
└── pool.ts        // Multi-relay management

honeybee-node/     // Node-specific transport  
├── connection.ts  // Node WebSocket wrapper
├── server.ts      // Server-side patterns
└── pool.ts        // Process-aware management

Documentation Differences

Go documentation:

  • Focus on goroutine safety and channel semantics
  • Show explicit error handling patterns
  • Demonstrate lifecycle with defer
  • Example code shows complete programs, not snippets

TypeScript documentation:

  • Focus on observable composition and operators
  • Show stream transformations and combinations
  • Demonstrate subscription management
  • Document environment-specific behaviors and limitations
  • Example code shows reactive chains and patterns
  • Clearly distinguish browser vs Node capabilities

Common Pitfalls to Avoid

Don't force Go patterns into TypeScript:

  • No "Start()" methods that users must call
  • Don't require manual subscription management
  • Don't expose channels-as-observables literally

Don't force TypeScript patterns into Go:

  • No "auto-starting" connections that happen in constructors
  • Don't hide goroutines from users
  • Don't try to make channels look like observables

Environment-specific pitfalls:

  • Don't assume browser capabilities in Node implementation
  • Don't assume Node.js process control in browser implementation
  • Don't leak environment-specific types across the API boundary
  • Don't ignore CORS implications in browser documentation

Respect memory models:

  • Go: channel sends are synchronization points, document goroutine ownership
  • TypeScript Browser: observables handle threading implicitly, respect event loop constraints
  • TypeScript Node: observables handle threading implicitly, document process integration patterns

Handle cleanup appropriately:

  • Go: explicit Close() with context cancellation
  • TypeScript Browser: automatic cleanup via unsubscription, respect page lifecycle
  • TypeScript Node: automatic cleanup via unsubscription and finalize, respect process lifecycle

The goal is that a Go developer looks at your Go library and thinks "this feels like Go," while TypeScript developers in both browser and Node environments think "this feels like reactive programming in my environment." They solve the same problems but use each platform's strengths rather than fighting against them.

Architecture Decision Records

ADR 1: Practical WebSocket Implementation

Context: Need to define the implementation approach for the mid-level library.

Decision: Implement honeybee as a practical, lean WebSocket transport with essential connection management, while deferring advanced features to higher levels.

Rationale:

  • Provides a usable implementation that doesn't require additional libraries for basic operation
  • Creates clear expectations about what functionality is included
  • Ensures stability by focusing on well-understood, essential features
  • Makes the library approachable for most common use cases
  • Enables straightforward testing and maintenance
  • Establishes a foundation that can be enhanced by mana-ws components
  • Creates clear boundaries between essential and advanced functionality

Consequences:

  • Must clearly define what constitutes "essential" vs "advanced" features
  • Some consumers may need to add mana-ws for specialized requirements
  • Implementation will be more complex than roots-ws but simpler than mana-ws
  • Strikes a balance between functionality and simplicity
  • Creates a natural upgrade path as application needs grow
  • Interfaces designed to enable drop-in replacements from mana-ws
  • Clear criteria established for what constitutes "essential" functionality

ADR 2: Event Loop Based Architecture

Context: Need to determine how to handle asynchronous operations.

Decision: Use event loops with channels (Go) or observables (TypeScript) for asynchronous message handling.

Rationale:

  • Leverages the natural concurrency models of each language
  • Provides consistent patterns for handling async events
  • Enables composition with existing application event loops
  • Makes asynchronous operations explicit and manageable
  • Creates clear patterns for resource management and cleanup

Consequences:

  • Requires careful channel/observable management to prevent leaks
  • Creates more complex control flow than synchronous code
  • Improves scalability for high-volume connections
  • Requires education for developers unfamiliar with the pattern
  • Enables consistent error handling patterns

ADR 3: Role-Specific Connection Patterns

Context: Nostr protocol has asymmetric connection roles.

Decision: Implement distinct initiator and responder patterns while sharing core connection code.

Rationale:

  • Creates clear API boundaries for each role
  • Prevents incorrect usage of operations not permitted for a role
  • Allows specialized optimizations for each role
  • Makes role expectations explicit in code
  • Follows the principle of least privilege

Consequences:

  • Increases API surface area with role-specific functions
  • Improves clarity about permitted operations for each role
  • May require some code duplication for shared functionality
  • Makes incorrect usage harder by design
  • Simplifies application code by providing role-appropriate functions

ADR 4: Simple State Management

Context: Need to balance functionality with complexity.

Decision: Implement simple, explicit state management without sophisticated tracking or analytics.

Rationale:

  • Keeps the library focused and understandable
  • Provides essential functionality without complexity
  • Makes state changes explicit and traceable
  • Reduces potential for bugs in state transitions
  • Aligns with the functional approach of the stack

Consequences:

  • Advanced state management must be implemented by consumers or mana-ws
  • Functionality is more limited but more reliable
  • State transitions are more predictable
  • Less risk of unexpected behavior
  • May require more manual management for complex applications

ADR 5: Basic Resilience Features

Context: Need to determine what resilience features belong in this layer.

Decision: Include basic reconnection and health monitoring with simple strategies.

Rationale:

  • Provides essential resilience without complex implementation
  • Makes connections practical for real-world usage
  • Handles common failure scenarios gracefully
  • Avoids complex recovery strategies that belong in mana-ws
  • Ensures basic reliability without overwhelming complexity

Consequences:

  • Limited to simpler reconnection strategies
  • May not handle all edge cases optimally
  • Provides "good enough" reliability for most applications
  • Creates clear extension points for more sophisticated strategies
  • Makes expected behavior clear and predictable

ADR 6: Dependency Minimization

Context: Need to determine external dependency strategy.

Decision: Limit dependencies to roots-ws and standard libraries only.

Rationale:

  • Reduces potential for dependency conflicts
  • Simplifies maintenance and upgrades
  • Creates predictable behavior across environments
  • Improves security by limiting attack surface
  • Makes the library more self-contained and portable

Consequences:

  • May require implementing functionality available in external libraries
  • Increases code that must be maintained within the project
  • Reduces unexpected behavior from external dependencies
  • Improves long-term stability
  • May limit some advanced features that would require external libraries

ADR 7: Simplicity as a First-Class Feature

Context: Need to determine how to balance functionality with ease of understanding and maintenance.

Decision: Treat simplicity as a first-class feature by deliberately deferring advanced functionality to separate repositories (mana-ws) and maintaining clear, focused APIs.

Rationale:

  • Creates a more approachable entry point for new developers
  • Reduces cognitive load for common use cases
  • Improves maintainability by constraining scope
  • Enhances testability through focused functionality
  • Provides clearer documentation and learning path
  • Makes complexity an opt-in choice rather than a requirement
  • Allows the library to excel at its core purpose

Consequences:

  • Some advanced use cases will require additional libraries
  • Clear criteria needed for what belongs in honeybee vs. mana-ws
  • Requires discipline to maintain simplicity over time
  • Documentation must clearly explain the layered approach
  • May need to resist pressure to add advanced features directly

ADR 8: Interface-First Design for Extension

Context: Need to ensure honeybee components can be extended or replaced by more sophisticated implementations.

Decision: Design honeybee using explicit interfaces for all core components, with minimal surface area that future extensions can implement while adding capabilities.

Rationale:

  • Enables drop-in replacements from advanced libraries
  • Creates clear extension points for specialized behavior
  • Focuses implementation on essential behaviors
  • Makes dependencies explicit through interfaces
  • Provides stable API contracts for both users and extenders
  • Supports future evolution without breaking changes
  • Aligns with Go's interface-based composition patterns

Consequences:

  • Requires more initial design effort
  • Interface definitions must be stable and forward-thinking
  • Implementation details must remain behind interfaces
  • May constrain some implementation choices
  • Enables cleaner separation between layers
  • Improves testability through interface-based mocks

mana-ws

Functional Architecture

1. Advanced Write Queue

Purpose: Implements sophisticated back-pressure management, prioritization, and batching.

Dependencies: honeybee WriteQueue, roots-ws Error types

// Go implementation
// Core types extend honeybee types
type AdvancedWriteQueue struct {
    honeybee.WriteQueue
    HighWatermark    int
    LowWatermark     int
    UnderPressure    bool
    PriorityLevels   int        // Number of priority levels (default: 3)
    PriorityQueues   [][]Message // Separate queues for different priorities
    BatchSize        int        // Maximum messages to batch (default: 10)
    BatchDelay       time.Duration // Maximum delay before sending batch (default: 20ms)
    Stats            WriteQueueStats
    bufferPool       *sync.Pool  // Pool of reusable buffers
}

type WriteQueueStats struct {
    MessagesQueued   int64
    MessagesSent     int64
    WriteErrors      int64
    DroppedMessages  int64
    DelayedMessages  int64
    BatchesSent      int64
    AverageBatchSize float64
    PressureEvents   int64
    PressureDuration time.Duration
}

// Message with priority information
type Message struct {
    Content  []byte
    Priority int    // Higher number = higher priority
    Created  time.Time
}

// Create new advanced write queue with prioritization
func New(maxSize int, priorityLevels int) *AdvancedWriteQueue {
    // Initialize queue with specified max size
    // Set default watermarks (high=80%, low=50% of max size)
    // Create priority queues array based on priorityLevels (default: 3)
    // Initialize buffer pool for message batching
    // Initialize statistics counters
    // Set initial pressure state to false
    // Return fully initialized queue
}

// Initialize buffer pool for efficient memory usage
func initBufferPool() *sync.Pool {
    // Create sync.Pool for byte buffers
    // Set New function to create buffers with reasonable initial size
    // Return initialized pool
}

// Send message through queue with back pressure and priority
func SendWithPriority(queue *AdvancedWriteQueue, socket *websocket.Conn, msg []byte, priority int) error {
    // Check if message exceeds max size
    // Acquire mutex lock for thread safety
    // Apply back pressure strategies if under pressure
    // Add message to appropriate priority queue
    // Process queue if batch ready or timer expired
    // Update statistics
    // Release mutex lock
    // Return any errors encountered
}

// Process queued messages in priority order with batching
func ProcessQueue(queue *AdvancedWriteQueue, socket *websocket.Conn) {
    // Start from highest priority queue
    // Get buffer from pool
    // Batch multiple messages into single WebSocket frame if possible
    // For each priority level:
    //   - Collect up to BatchSize messages or until queue empty
    //   - Add to buffer with appropriate framing
    // Send batched messages as single WebSocket frame when possible
    // Return buffer to pool after use
    // Update statistics
}

// Update queue statistics
func UpdateStats(queue *AdvancedWriteQueue, messageSize int, batchSize int, success bool) {
    // Atomically update message counters
    // Track bytes sent if successful
    // Update batch statistics
    // Increment error counter if failed
    // Update queue size tracking
    // Recalculate back pressure status
    // Update exponential moving average of batch size
}

// Check if queue is under back pressure
func IsUnderPressure(queue *AdvancedWriteQueue) bool {
    // Return current back pressure state
    // Thread-safe access to pressure flag
}

// Update back pressure status
func UpdateBackPressure(queue *AdvancedWriteQueue) {
    // Check current queue size against watermarks
    // If under pressure and size drops below low watermark, release pressure
    // If not under pressure and size exceeds high watermark, apply pressure
    // Log pressure state changes
    // Track pressure event duration
    // Thread-safe update of pressure flag
}

// Apply back pressure strategies
func ApplyBackPressure(queue *AdvancedWriteQueue, msg []byte, priority int) (shouldSend bool, delay time.Duration) {
    // If not under pressure, allow message with no delay
    // If under pressure, apply priority-based filtering:
    //   - Always allow highest priority messages (control messages, CLOSE, EOSE)
    //   - Apply progressive dropping based on priority levels
    //   - Lower priorities have higher drop probability when under pressure
    //   - Calculate appropriate delay for lower priority messages
    //   - Update statistics for dropped/delayed messages
    // Return decision on whether to send and how long to delay
}

// Create prepared message for efficient reuse
func PrepareMessage(queue *AdvancedWriteQueue, messageType int, data []byte) (*websocket.PreparedMessage, error) {
    // Create WebSocket PreparedMessage (compressed if enabled)
    // This allows the message to be prepared once but sent to multiple destinations
    // Return prepared message or error
}

// Send prepared message through queue
func SendPrepared(queue *AdvancedWriteQueue, socket *websocket.Conn, prepared *websocket.PreparedMessage, priority int) error {
    // Similar to SendWithPriority but for prepared messages
    // Avoids repeated compression/preparation costs for broadcast scenarios
    // Update statistics
    // Return any errors
}

2. Connection Pool

Purpose: Manages multiple connections with advanced load distribution, health-based routing, and adaptive scaling.

Dependencies: honeybee Connection Pool

// Core types extend honeybee types
type AdvancedPool struct {
    honeybee.Pool
    LoadBalancingStrategy Strategy
    HealthThreshold       float64       // Minimum health score for active connections
    MaxConnectionsPerHost int           // Connection limit per host
    ResourceMonitor       *ResourceMonitor
    Stats                 PoolStatistics
    connectionCache       *lru.Cache    // Cache recently used connections
    mutex                 sync.RWMutex
}

// Load balancing strategies
type Strategy int

const (
    RoundRobin Strategy = iota
    LeastConnections
    ResponseTime
    WeightedRandom
    ConsistentHashing
)

// Resource allocation monitor
type ResourceMonitor struct {
    TotalMemoryLimit   int64
    TotalConnectionLimit int
    MemoryPerConnection int64
    CurrentMemoryUsage  int64
    GoroutineLimit     int
    CurrentGoroutines  int
}

// Pool statistics
type PoolStatistics struct {
    ActiveConnections    int
    TotalMessages        int64
    MessagesPerSecond    float64
    ResponseTimeAverage  time.Duration
    ResponseTimeP95      time.Duration
    ConnectionDistribution map[string]int // Count by host
    FailoverEvents       int
    ResourceUtilization  float64 // 0.0-1.0
    lastCalculated       time.Time
}

// Create new advanced connection pool
func NewAdvancedPool(config Config) *AdvancedPool {
    // Initialize pool with honeybee base pool
    // Set up load balancing strategy
    // Initialize connection cache for connection reuse
    // Set up resource monitor
    // Initialize connection statistics tracking
    // Return initialized pool
}

// Add connection with resource constraints
func (p *AdvancedPool) AddConnection(url string, config honeybee.ConnectionConfig) error {
    // Check if adding connection would exceed resource limits
    // Check if host already has maximum connections
    // Add connection if resources available
    // Update resource utilization
    // Return error if resource constraints prevent addition
}

// Get connection using load balancing strategy
func (p *AdvancedPool) GetConnection(key string) (*honeybee.Connection, error) {
    // Use load balancing strategy to select connection
    // For consistent hashing, use key to determine connection
    // For least connections, find connection with fewest active requests
    // For response time, prefer faster connections
    // Return selected connection or error if none available
}

// Monitor health across pool and rebalance as needed
func (p *AdvancedPool) MonitorHealth() {
    // Periodically check health of all connections
    // Remove unhealthy connections from active pool
    // Rebalance load when connections change state
    // Attempt recovery of unhealthy connections in background
}

// Manage resource allocation dynamically
func (p *AdvancedPool) ManageResources() {
    // Track memory and goroutine usage
    // Adjust connection limits based on current system load
    // Implement graceful degradation when approaching resource limits
    // Scale back non-essential operations under resource pressure
}

// Calculate and update pool statistics
func (p *AdvancedPool) UpdateStatistics() {
    // Calculate messages per second rate
    // Update response time metrics
    // Track connection distribution
    // Calculate resource utilization percentage
    // Identify imbalances or potential problems
}

// Efficient connection lifecycle management
func (p *AdvancedPool) ManageLifecycle() {
    // Implement connection recycling for frequently used endpoints
    // Use connection cache to reduce connection overhead
    // Gracefully phase out connections that need replacement
    // Pre-warm connections for known high-traffic endpoints
}

3. Advanced Subscription Repository

Purpose: Implements sophisticated tracking and restoration of subscriptions with adaptive optimization.

Dependencies: honeybee Subscription Registry, roots-ws Subscription types

// Core types extend honeybee types
type AdvancedSubscriptionRepository struct {
    honeybee.SubscriptionRegistry
    CreationTimes    map[string]time.Time   // When subscriptions were created
    UsageStats       map[string]UsageStatistics
    PriorityMap      map[string]int         // Subscription priorities
    RestorationOrder []string               // Ordered list for restoration
    MaxSubscriptions int                    // Limit on active subscriptions
    bufferPool       *sync.Pool             // For filter processing
}

type UsageStatistics struct {
    MessageCount       int64
    LastActivity       time.Time
    ActivityFrequency  float64      // Messages per minute
    ResponseTimes      []time.Duration  // Recent response times
    AvgResponseTime    time.Duration
    ImportanceScore    float64      // Calculated from usage patterns
}

// Create new advanced repository
func NewAdvancedRepository(maxSubs int) *AdvancedSubscriptionRepository {
    // Initialize base subscription registry
    // Set up subscription statistics tracking
    // Initialize buffer pool for filter operations
    // Set maximum subscription limit
    // Initialize ordering for intelligent restoration
    // Return initialized repository
}

// Add subscription with priority
func (r *AdvancedSubscriptionRepository) AddWithPriority(id string, filters [][]byte, priority int) {
    // Add to base registry
    // Record creation time
    // Set initial priority
    // Initialize usage statistics
    // Update restoration order based on priority
    // Enforce subscription limits if reached
}

// Update subscription statistics on activity
func (r *AdvancedSubscriptionRepository) RecordActivity(id string, msgCount int, responseTime time.Duration) {
    // Update message count
    // Record last activity timestamp
    // Update activity frequency calculation
    // Add to response time history
    // Recalculate importance score
    // Potentially adjust priority based on usage patterns
}

// Intelligently restore subscriptions after reconnection
func (r *AdvancedSubscriptionRepository) RestoreWithPriority(queue *honeybee.WriteQueue, rateLimit int) []error {
    // Sort subscriptions by priority and importance
    // Restore highest priority/most important first
    // Apply rate limiting to avoid flooding connection
    // Track restoration outcomes
    // Update statistics after restoration
    // Return errors for failed restorations
}

// Optimize subscription set
func (r *AdvancedSubscriptionRepository) OptimizeSubscriptions() []string {
    // Identify inactive or low-value subscriptions
    // Suggest subscriptions to combine or eliminate
    // Return list of subscription IDs to consider removing
}

// Manage subscription lifecycle
func (r *AdvancedSubscriptionRepository) ManageLifecycle() {
    // Periodically review subscription activity
    // Age out unused subscriptions
    // Consolidate similar subscriptions when possible
    // Balance subscription load across connections
}

4. Advanced Reconnection

Purpose: Implements sophisticated reconnection with circuit breaker pattern and predictive strategies.

Dependencies: honeybee Reconnection, roots-ws Error types

// Core types extend honeybee types
type AdvancedReconnectionConfig struct {
    honeybee.ReconnectionConfig
    Jitter              float64         // Random jitter factor (0.0-1.0)
    CircuitBreakerConfig CircuitBreakerConfig
    FailureCategories   map[string]FailureStrategy  // Different strategies for error types
    ProbeTimeout        time.Duration   // Timeout for probe attempts in half-open state
    AdaptiveStrategy    bool            // Whether to use adaptive backoff based on history
}

type CircuitBreakerConfig struct {
    Enabled             bool
    FailureThreshold    int             // Failures before opening circuit
    ResetTimeout        time.Duration   // Time before attempting reset
    HalfOpenMaxCalls    int             // Max calls in half-open state
}

type CircuitState int

const (
    CircuitClosed CircuitState = iota    // Normal operation
    CircuitOpen                          // Failing, no attempts allowed
    CircuitHalfOpen                      // Testing if recovered
)

type AdvancedReconnectionState struct {
    honeybee.ReconnectionState
    FailureCount        int
    CircuitState        CircuitState
    CircuitOpenUntil    time.Time
    FailureHistory      []FailureRecord  // Recent failures for pattern detection
    SuccessfulConnects  int              // Consecutive successful connections
    ReconnectionTimes   []time.Duration  // Historical reconnection times
}

type FailureRecord struct {
    Time        time.Time
    ErrorType   string
    Message     string
    Duration    time.Duration  // How long the attempt took
}

type FailureStrategy struct {
    MaxAttempts     int
    BaseDelay       time.Duration
    MaxDelay        time.Duration
    BackoffFactor   float64
}

// Create default advanced reconnection config
func DefaultAdvancedConfig() AdvancedReconnectionConfig {
    // Start with honeybee default config
    baseConfig := honeybee.reconnection.DefaultConfig()
    
    return AdvancedReconnectionConfig{
        ReconnectionConfig:  baseConfig,
        Jitter:              0.2,
        CircuitBreakerConfig: CircuitBreakerConfig{
            Enabled:         true,
            FailureThreshold: 10,
            ResetTimeout:    5 * time.Minute,
            HalfOpenMaxCalls: 3,
        },
        FailureCategories: map[string]FailureStrategy{
            // Different strategies for different error types
            "network":     {MaxAttempts: 5, BaseDelay: 1 * time.Second, MaxDelay: 30 * time.Second, BackoffFactor: 2.0},
            "dns":         {MaxAttempts: 3, BaseDelay: 5 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0},
            "server":      {MaxAttempts: 10, BaseDelay: 500 * time.Millisecond, MaxDelay: 10 * time.Second, BackoffFactor: 1.5},
            "auth":        {MaxAttempts: 2, BaseDelay: 10 * time.Second, MaxDelay: 60 * time.Second, BackoffFactor: 2.0},
        },
        ProbeTimeout:       10 * time.Second,
        AdaptiveStrategy:   true,
    }
}

// Create new advanced reconnection state
func NewAdvancedState() *AdvancedReconnectionState {
    // Initialize with base reconnection state
    baseState := honeybee.reconnection.NewState()
    
    return &AdvancedReconnectionState{
        ReconnectionState: *baseState,
        FailureCount:      0,
        CircuitState:      CircuitClosed,
        CircuitOpenUntil:  time.Time{},
        FailureHistory:    make([]FailureRecord, 0, 10),
        SuccessfulConnects: 0,
        ReconnectionTimes: make([]time.Duration, 0, 10),
    }
}

// Calculate next backoff delay with jitter
func CalculateDelayWithJitter(state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) time.Duration {
    // Get failure strategy for this error type
    strategy, exists := config.FailureCategories[errorType]
    if !exists {
        strategy = config.FailureCategories["network"] // Default
    }
    
    // Calculate base delay based on strategy
    attempt := state.FailureCount
    baseDelay := strategy.BaseDelay * time.Duration(math.Pow(float64(strategy.BackoffFactor), float64(attempt)))
    
    // Cap at max delay
    if baseDelay > strategy.MaxDelay {
        baseDelay = strategy.MaxDelay
    }
    
    // Apply adaptive strategy if enabled
    if config.AdaptiveStrategy {
        // Analyze failure patterns to adjust delay
        // If failures occur at regular intervals, avoid those times
        // If time-of-day patterns exist, factor those in
    }
    
    // Apply jitter: delay * (1 ± jitterFactor * random)
    jitterRange := float64(baseDelay) * config.Jitter
    jitterAmount := (rand.Float64() * 2 - 1) * jitterRange // -jitterRange to +jitterRange
    
    finalDelay := time.Duration(float64(baseDelay) + jitterAmount)
    if finalDelay < 0 {
        finalDelay = 0
    }
    
    return finalDelay
}

// Check circuit breaker state
func CheckCircuitState(state *AdvancedReconnectionState, config AdvancedReconnectionConfig) CircuitState {
    // If circuit is closed, return closed
    if state.CircuitState == CircuitClosed {
        return CircuitClosed
    }
    
    // If circuit is open, check if reset timeout has passed
    if state.CircuitState == CircuitOpen {
        if time.Now().After(state.CircuitOpenUntil) {
            // Transition to half-open
            state.CircuitState = CircuitHalfOpen
            return CircuitHalfOpen
        }
        return CircuitOpen
    }
    
    // If half-open, allow limited testing
    return CircuitHalfOpen
}

// Start reconnection with circuit breaker
func StartReconnectionWithCircuitBreaker(conn *honeybee.Connection, state *AdvancedReconnectionState, config AdvancedReconnectionConfig, errorType string) error {
    // Check circuit breaker state
    circuitState := CheckCircuitState(state, config)
    
    // If circuit is open, return circuit breaker error
    if circuitState == CircuitOpen {
        return errors.New("circuit breaker open")
    }
    
    // For half-open state, only allow limited probe attempts
    if circuitState == CircuitHalfOpen {
        // Enforce timeout for probe attempts
        ctx, cancel := context.WithTimeout(context.Background(), config.ProbeTimeout)
        defer cancel()
        
        // Attempt connection with timeout
        // On success, close circuit
        // On failure, reopen circuit
    }
    
    // Increment failure count for closed circuit
    state.FailureCount++
    
    // Record failure details for pattern analysis
    state.FailureHistory = append(state.FailureHistory, FailureRecord{
        Time:      time.Now(),
        ErrorType: errorType,
        Message:   "Connection failed",
    })
    
    // Check if failure count exceeds circuit breaker threshold
    if state.FailureCount >= config.CircuitBreakerConfig.FailureThreshold {
        // Open circuit
        state.CircuitState = CircuitOpen
        state.CircuitOpenUntil = time.Now().Add(config.CircuitBreakerConfig.ResetTimeout)
        return errors.New("circuit breaker opened after repeated failures")
    }
    
    // Calculate delay with jitter
    delay := CalculateDelayWithJitter(state, config, errorType)
    
    // Apply backoff delay
    time.Sleep(delay)
    
    // Use honeybee reconnection logic for actual reconnection attempt
    return nil
}

// Reset circuit breaker after successful connection
func ResetCircuitBreaker(state *AdvancedReconnectionState) {
    // Reset failure count to zero
    state.FailureCount = 0
    
    // Close circuit if open or half-open
    state.CircuitState = CircuitClosed
    
    // Reset circuit open until time
    state.CircuitOpenUntil = time.Time{}
    
    // Increment successful connects counter
    state.SuccessfulConnects++
    
    // Record success for pattern analysis
}

// Analyze failure patterns for optimized reconnection
func AnalyzeFailurePatterns(state *AdvancedReconnectionState) map[string]interface{} {
    // Analyze timing patterns in failures
    // Detect time-of-day patterns
    // Identify error type frequencies
    // Calculate success/failure ratio
    // Return insights that can be used to optimize reconnection strategy
    return nil // Placeholder
}

5. Advanced Connection Health

Purpose: Implements sophisticated health monitoring with predictive failure detection.

Dependencies: honeybee Health Monitor, roots-ws Error types

// Core types extend honeybee types
type AdvancedHealthConfig struct {
    honeybee.HealthConfig
    FailureThreshold     int            // Failures before connection marked unhealthy
    DegradationThreshold int            // Threshold for degraded status
    LatencyThreshold     time.Duration  // Latency indicating performance issues
    SamplingInterval     time.Duration  // How often to collect metrics
    TrendWindowSize      int            // Number of samples for trend analysis
    PredictionEnabled    bool           // Whether to use predictive modeling
}

type HealthStatus int

const (
    StatusHealthy HealthStatus = iota
    StatusDegraded            // Working but showing signs of issues
    StatusUnhealthy           // Not working reliably
    StatusFailing             // Consistently failing
)

type AdvancedHealthState struct {
    honeybee.HealthMonitor
    Status               HealthStatus
    PingsSent            int
    PongsReceived        int
    MissedPongs          int
    PongLatencies        []time.Duration    // Recent latency measurements
    LatencyPercentiles   map[string]time.Duration  // p50, p90, p99
    LatencyTrend         float64            // Direction and magnitude of trend
    ErrorRates           map[string]float64 // Error rates by category
    PredictiveScore      float64            // 0.0-1.0, likelihood of failure
    HealthHistory        []HealthRecord     // Time series of health measurements
    LastUpdated          time.Time
}

type HealthRecord struct {
    Timestamp      time.Time
    Status         HealthStatus
    Latency        time.Duration
    ErrorRate      float64
    ResourceUsage  float64    // CPU/memory used by connection
}

// Create default advanced health config
func DefaultAdvancedConfig() AdvancedHealthConfig {
    // Start with honeybee default config
    baseConfig := honeybee.health.DefaultConfig()
    
    return AdvancedHealthConfig{
        HealthConfig:       baseConfig,
        FailureThreshold:   3,
        DegradationThreshold: 2,
        LatencyThreshold:   500 * time.Millisecond,
        SamplingInterval:   10 * time.Second,
        TrendWindowSize:    10,
        PredictionEnabled:  true,
    }
}

// Create new advanced health state
func NewAdvancedState() *AdvancedHealthState {
    // Initialize with base health state
    baseMonitor := honeybee.health.NewMonitor(nil, nil)
    
    return &AdvancedHealthState{
        HealthMonitor:     *baseMonitor,
        Status:            StatusHealthy,
        PingsSent:         0,
        PongsReceived:     0,
        MissedPongs:       0,
        PongLatencies:     make([]time.Duration, 0, 10),
        LatencyPercentiles: make(map[string]time.Duration),
        LatencyTrend:      0.0,
        ErrorRates:        make(map[string]float64),
        PredictiveScore:   0.0,
        HealthHistory:     make([]HealthRecord, 0, 100),
        LastUpdated:       time.Now(),
    }
}

// Start advanced health monitoring
func StartAdvancedMonitoring(conn *websocket.Conn, state *AdvancedHealthState, config AdvancedHealthConfig) {
    // Use honeybee health monitoring as base
    honeybee.health.StartMonitoring(conn)
    
    // Add handlers for tracking additional metrics
    // Set up latency tracking
    // Initialize health score calculation
    // Start trend analysis
    // Set up periodic health assessment
}

// Handle pong with latency tracking
func HandlePongWithLatency(state *AdvancedHealthState, sentTime time.Time) *AdvancedHealthState {
    // Calculate latency between sent time and now
    latency := time.Since(sentTime)
    
    // Update basic pong handling using honeybee
    state.PongsReceived++
    
    // Add latency to tracking array (limited size)
    state.PongLatencies = append(state.PongLatencies, latency)
    if len(state.PongLatencies) > 20 {
        // Remove oldest entry
        state.PongLatencies = state.PongLatencies[1:]
    }
    
    // Update health history
    state.HealthHistory = append(state.HealthHistory, HealthRecord{
        Timestamp: time.Now(),
        Latency:   latency,
        Status:    state.Status,
    })
    
    // Recalculate latency percentiles
    calculateLatencyPercentiles(state)
    
    // Update latency trend
    calculateLatencyTrend(state)
    
    // Update health score based on latency
    calculateHealthScore(state, config)
    
    // Reset missed pongs counter
    state.MissedPongs = 0
    
    return state
}

// Handle missed pong with health impact
func HandleMissedPong(state *AdvancedHealthState, config AdvancedHealthConfig) *AdvancedHealthState {
    // Increment missed pongs counter
    state.MissedPongs++
    
    // Update health score based on missed pong
    calculateHealthScore(state, config)
    
    // Update health history
    state.HealthHistory = append(state.HealthHistory, HealthRecord{
        Timestamp: time.Now(),
        Status:    state.Status,
        ErrorRate: float64(state.MissedPongs) / float64(state.PingsSent),
    })
    
    // Check against degradation threshold
    if state.MissedPongs >= config.DegradationThreshold && state.Status == StatusHealthy {
        state.Status = StatusDegraded
    }
    
    // Check against failure threshold
    if state.MissedPongs >= config.FailureThreshold {
        state.Status = StatusUnhealthy
    }
    
    return state
}

// Calculate latency percentiles
func calculateLatencyPercentiles(state *AdvancedHealthState) {
    // Need at least a few measurements
    if len(state.PongLatencies) < 5 {
        return
    }
    
    // Sort latencies for percentile calculation
    sortedLatencies := make([]time.Duration, len(state.PongLatencies))
    copy(sortedLatencies, state.PongLatencies)
    sort.Slice(sortedLatencies, func(i, j int) bool {
        return sortedLatencies[i] < sortedLatencies[j]
    })
    
    // Calculate p50 (median)
    p50Index := len(sortedLatencies) / 2
    state.LatencyPercentiles["p50"] = sortedLatencies[p50Index]
    
    // Calculate p90
    p90Index := int(float64(len(sortedLatencies)) * 0.9)
    if p90Index >= len(sortedLatencies) {
        p90Index = len(sortedLatencies) - 1
    }
    state.LatencyPercentiles["p90"] = sortedLatencies[p90Index]
    
    // Calculate p99
    p99Index := int(float64(len(sortedLatencies)) * 0.99)
    if p99Index >= len(sortedLatencies) {
        p99Index = len(sortedLatencies) - 1
    }
    state.LatencyPercentiles["p99"] = sortedLatencies[p99Index]
}

// Calculate latency trend
func calculateLatencyTrend(state *AdvancedHealthState) {
    // Need enough history for trend
    if len(state.PongLatencies) < 5 {
        state.LatencyTrend = 0
        return
    }
    
    // Simple linear regression on recent latencies
    n := float64(len(state.PongLatencies))
    sumX := 0.0
    sumY := 0.0
    sumXY := 0.0
    sumXX := 0.0
    
    for i, latency := range state.PongLatencies {
        x := float64(i)
        y := float64(latency.Milliseconds())
        
        sumX += x
        sumY += y
        sumXY += x * y
        sumXX += x * x
    }
    
    // Calculate slope of trendline
    slope := (n*sumXY - sumX*sumY) / (n*sumXX - sumX*sumX)
    
    // Normalize to -1.0 to 1.0 range for consistent reporting
    // Positive means increasing latency (degradation)
    // Negative means decreasing latency (improvement)
    avgLatency := sumY / n
    if avgLatency != 0 {
        state.LatencyTrend = slope / avgLatency * 10 // Scale factor
    } else {
        state.LatencyTrend = 0
    }
    
    // Clamp to range
    if state.LatencyTrend > 1.0 {
        state.LatencyTrend = 1.0
    } else if state.LatencyTrend < -1.0 {
        state.LatencyTrend = -1.0
    }
}

// Calculate health score based on all metrics
func calculateHealthScore(state *AdvancedHealthState, config AdvancedHealthConfig) {
    // Start with perfect score (1.0)
    score := 1.0
    
    // Reduce score based on missed pongs
    if state.PingsSent > 0 {
        missRate := float64(state.MissedPongs) / float64(state.PingsSent)
        score -= missRate * 0.5 // Up to 50% reduction for all pongs missed
    }
    
    // Reduce score based on latency relative to threshold
    if len(state.PongLatencies) > 0 && config.LatencyThreshold > 0 {
        avgLatency := average(state.PongLatencies)
        if avgLatency > config.LatencyThreshold {
            // Reduce score proportionally to how much threshold is exceeded
            latencyFactor := float64(avgLatency) / float64(config.LatencyThreshold)
            score -= math.Min(0.3, (latencyFactor-1.0)*0.1) // Up to 30% reduction
        }
    }
    
    // Reduce score based on negative trend
    if state.LatencyTrend > 0 {
        score -= state.LatencyTrend * 0.2 // Up to 20% reduction for worsening trend
    }
    
    // Calculate predictive score based on patterns
    if config.PredictionEnabled {
        state.PredictiveScore = predictFailure(state)
        // Incorporate predictive score into health score
        score -= state.PredictiveScore * 0.2 // Up to 20% reduction for likely failure
    }
    
    // Update status based on score
    if score < 0.5 {
        state.Status = StatusUnhealthy
    } else if score < 0.8 {
        state.Status = StatusDegraded
    } else {
        state.Status = StatusHealthy
    }
}

// Predict failure likelihood based on historical patterns
func predictFailure(state *AdvancedHealthState) float64 {
    // Need enough history for prediction
    if len(state.HealthHistory) < 10 {
        return 0.0
    }
    
    // Detect patterns that historically preceded failures
    // Look for increasing latency followed by errors
    // Identify cyclic patterns in performance
    // Check for correlation between resource usage and failures
    
    // This would use more sophisticated algorithms in production
    // For now, just base on trend and recent errors
    
    // Simplified predictor: combine trend with recent error rate
    recentErrorRate := 0.0
    if state.PingsSent > 0 {
        recentErrorRate = float64(state.MissedPongs) / float64(state.PingsSent)
    }
    
    // Weight trend more heavily if it's negative (worsening)
    trendContribution := math.Max(0, state.LatencyTrend) * 0.6
    
    return trendContribution + recentErrorRate*0.4 // Combined score
}

// Calculate average of duration slice
func average(durations []time.Duration) time.Duration {
    if len(durations) == 0 {
        return 0
    }
    
    var sum time.Duration
    for _, d := range durations {
        sum += d
    }
    
    return sum / time.Duration(len(durations))
}

// Check if connection is degraded but still working
func IsDegraded(state *AdvancedHealthState) bool {
    return state.Status == StatusDegraded
}

// Check connection health against threshold
func IsHealthyWithThreshold(state *AdvancedHealthState, threshold float64) bool {
    // Calculate current health score
    score := 1.0 - (float64(state.MissedPongs) / float64(max(1, state.PingsSent)))
    
    // Account for latency
    if len(state.PongLatencies) > 0 {
        avgLatency := average(state.PongLatencies)
        if avgLatency > 300*time.Millisecond {
            // Reduce score for high latency
            score -= 0.2
        }
    }
    
    return score >= threshold
}

6. In-Flight Operations

Purpose: Tracks and manages ongoing operations with cancellation support.

Dependencies: roots-ws Error types

// Core types
type OperationType string

const (
    OperationSubscription OperationType = "subscription"
    OperationPublication  OperationType = "publication"
    OperationConnection   OperationType = "connection"
    OperationQuery        OperationType = "query"
)

type OperationStatus int

const (
    StatusPending OperationStatus = iota
    StatusInProgress
    StatusCompleted
    StatusFailed
    StatusCancelled
    StatusTimedOut
)

type Operation struct {
    ID          string
    Type        OperationType
    StartTime   time.Time
    EndTime     time.Time
    Status      OperationStatus
    Cancelled   bool
    Done        chan struct{}
    Result      interface{}
    Error       error
    Priority    int            // Priority level for resource allocation
    Metadata    map[string]interface{}
    mutex       sync.Mutex
}

type OperationRegistry struct {
    Operations  map[string]*Operation
    Mutex       sync.RWMutex
    Stats       OperationStats
}

type OperationStats struct {
    TotalOperations      int64
    CompletedOperations  int64
    FailedOperations     int64
    CancelledOperations  int64
    TimedOutOperations   int64
    AvgCompletionTime    time.Duration
    OperationsByType     map[OperationType]int64
}

// Create new operation registry
func NewRegistry() *OperationRegistry {
    return &OperationRegistry{
        Operations: make(map[string]*Operation),
        Mutex:      sync.RWMutex{},
        Stats: OperationStats{
            OperationsByType: make(map[OperationType]int64),
        },
    }
}

// Generate unique operation ID
func generateOperationID() string {
    // Generate UUID or other unique identifier
    return fmt.Sprintf("op-%d-%s", time.Now().UnixNano(), generateRandomString(6))
}

// Create and register new operation
func RegisterOperation(registry *OperationRegistry, opType OperationType, priority int) *Operation {
    // Generate unique operation ID
    opID := generateOperationID()
    
    // Create operation object
    op := &Operation{
        ID:        opID,
        Type:      opType,
        StartTime: time.Now(),
        Status:    StatusPending,
        Done:      make(chan struct{}),
        Priority:  priority,
        Metadata:  make(map[string]interface{}),
    }
    
    // Acquire write lock
    registry.Mutex.Lock()
    
    // Add operation to registry
    registry.Operations[opID] = op
    
    // Update stats
    registry.Stats.TotalOperations++
    registry.Stats.OperationsByType[opType]++
    
    // Release lock
    registry.Mutex.Unlock()
    
    return op
}

// Mark operation as in progress
func StartOperation(registry *OperationRegistry, opID string) error {
    // Acquire read lock to get operation
    registry.Mutex.RLock()
    op, exists := registry.Operations[opID]
    registry.Mutex.RUnlock()
    
    if !exists {
        return errors.New("operation not found")
    }
    
    // Update operation status
    op.mutex.Lock()
    defer op.mutex.Unlock()
    
    if op.Status != StatusPending {
        return errors.New("operation not in pending state")
    }
    
    op.Status = StatusInProgress
    return nil
}

// Complete operation
func CompleteOperation(registry *OperationRegistry, opID string, result interface{}, err error) error {
    // Acquire read lock to get operation
    registry.Mutex.RLock()
    op, exists := registry.Operations[opID]
    registry.Mutex.RUnlock()
    
    if !exists {
        return errors.New("operation not found")
    }
    
    // Update operation
    op.mutex.Lock()
    
    // Set result and error
    op.Result = result
    op.Error = err
    op.EndTime = time.Now()
    
    // Update status based on error
    if err != nil {
        op.Status = StatusFailed
    } else {
        op.Status = StatusCompleted
    }
    
    // Signal completion via done channel
    close(op.Done)
    
    op.mutex.Unlock()
    
    // Update registry stats with write lock
    registry.Mutex.Lock()
    defer registry.Mutex.Unlock()
    
    // Update stats based on outcome
    if err != nil {
        registry.Stats.FailedOperations++
    } else {
        registry.Stats.CompletedOperations++
        
        // Update average completion time
        completionTime := op.EndTime.Sub(op.StartTime)
        
        if registry.Stats.CompletedOperations > 1 {
            // Calculate new average
            oldWeight := float64(registry.Stats.CompletedOperations-1) / float64(registry.Stats.CompletedOperations)
            newWeight := 1.0 / float64(registry.Stats.CompletedOperations)
            
            oldAvg := float64(registry.Stats.AvgCompletionTime.Nanoseconds())
            newTime := float64(completionTime.Nanoseconds())
            
            newAvg := oldAvg*oldWeight + newTime*newWeight
            registry.Stats.AvgCompletionTime = time.Duration(int64(newAvg))
        } else {
            // First completion, just set the value
            registry.Stats.AvgCompletionTime = completionTime
        }
    }
    
    return nil
}

// Cancel operation
func CancelOperation(registry *OperationRegistry, opID string) error {
    // Acquire read lock to get operation
    registry.Mutex.RLock()
    op, exists := registry.Operations[opID]
    registry.Mutex.RUnlock()
    
    if !exists {
        return errors.New("operation not found")
    }
    
    // Update operation
    op.mutex.Lock()
    
    // Check if already completed or cancelled
    if op.Status == StatusCompleted || op.Status == StatusFailed || op.Status == StatusCancelled {
        op.mutex.Unlock()
        return errors.New("operation already completed or cancelled")
    }
    
    // Mark as cancelled
    op.Cancelled = true
    op.Status = StatusCancelled
    op.EndTime = time.Now()
    op.Error = errors.New("operation cancelled")
    
    // Signal cancellation via done channel
    close(op.Done)
    
    op.mutex.Unlock()
    
    // Update registry stats
    registry.Mutex.Lock()
    registry.Stats.CancelledOperations++
    registry.Mutex.Unlock()
    
    return nil
}

// Wait for operation completion with timeout
func WaitForCompletion(op *Operation, timeout time.Duration) (interface{}, error) {
    // Set up timer for timeout
    timer := time.NewTimer(timeout)
    defer timer.Stop()
    
    // Wait for completion or timeout
    select {
    case <-op.Done:
        // Operation completed or cancelled
        op.mutex.Lock()
        defer op.mutex.Unlock()
        
        if op.Cancelled {
            return nil, errors.New("operation cancelled")
        }
        
        return op.Result, op.Error
        
    case <-timer.C:
        // Timeout occurred
        op.mutex.Lock()
        if op.Status == StatusPending || op.Status == StatusInProgress {
            op.Status = StatusTimedOut
        }
        op.mutex.Unlock()
        
        return nil, errors.New("operation timeout")
    }
}

// Cancel all operations of specific type
func CancelOperationsByType(registry *OperationRegistry, opType OperationType) int {
    // Acquire read lock to get operations
    registry.Mutex.RLock()
    
    // Find all operations of specified type
    opsToCancel := make([]string, 0)
    for id, op := range registry.Operations {
        if op.Type == opType {
            opsToCancel = append(opsToCancel, id)
        }
    }
    
    registry.Mutex.RUnlock()
    
    // Cancel each operation
    cancelCount := 0
    for _, id := range opsToCancel {
        err := CancelOperation(registry, id)
        if err == nil {
            cancelCount++
        }
    }
    
    return cancelCount
}

// Cancel all operations
func CancelAllOperations(registry *OperationRegistry) int {
    // Acquire read lock to get all operation IDs
    registry.Mutex.RLock()
    
    opsToCancel := make([]string, 0, len(registry.Operations))
    for id := range registry.Operations {
        opsToCancel = append(opsToCancel, id)
    }
    
    registry.Mutex.RUnlock()
    
    // Cancel each operation
    cancelCount := 0
    for _, id := range opsToCancel {
        err := CancelOperation(registry, id)
        if err == nil {
            cancelCount++
        }
    }
    
    return cancelCount
}

// Get operation statistics
func GetOperationStats(registry *OperationRegistry) OperationStats {
    registry.Mutex.RLock()
    defer registry.Mutex.RUnlock()
    
    // Return copy of stats to prevent race conditions
    statsCopy := registry.Stats
    
    // Copy map to prevent race conditions
    statsCopy.OperationsByType = make(map[OperationType]int64)
    for k, v := range registry.Stats.OperationsByType {
        statsCopy.OperationsByType[k] = v
    }
    
    return statsCopy
}

// Clean up completed operations
func CleanupCompletedOperations(registry *OperationRegistry, maxAge time.Duration) int {
    now := time.Now()
    cutoffTime := now.Add(-maxAge)
    
    registry.Mutex.Lock()
    defer registry.Mutex.Unlock()
    
    removeCount := 0
    for id, op := range registry.Operations {
        if op.EndTime.Before(cutoffTime) && 
           (op.Status == StatusCompleted || op.Status == StatusFailed || 
            op.Status == StatusCancelled || op.Status == StatusTimedOut) {
            
            delete(registry.Operations, id)
            removeCount++
        }
    }
    
    return removeCount
}

7. Compression

Purpose: Compress WebSocket messages to reduce bandwidth with memory optimization.

Dependencies: roots-ws Error types

// Core types
type CompressionConfig struct {
    Enabled             bool
    Level               int    // 0-9, higher means more compression
    ContextTakeover     bool   // Whether to reuse compression context
    Threshold           int    // Messages smaller than this won't be compressed
    MemoryLimit         int    // Max memory to use for compression
    CacheTimeout        time.Duration // How long to cache prepared messages
}

type CompressionStats struct {
    BytesBeforeCompression int64
    BytesAfterCompression  int64
    MessagesCached         int64
    CacheHits              int64
    CacheMisses            int64
    CompressionTime        time.Duration
    CompressionRatio       float64  // Calculated ratio
}

type MessageCache struct {
    items       map[uint64]*CachedMessage
    mutex       sync.RWMutex
    stats       CompressionStats
    bufferPool  *sync.Pool
}

type CachedMessage struct {
    Prepared    *websocket.PreparedMessage
    Hash        uint64
    Size        int
    LastAccess  time.Time
    Expiration  time.Time
}

// Create default compression config
func DefaultConfig() CompressionConfig {
    return CompressionConfig{
        Enabled:        true,
        Level:          6,          // Medium compression level
        ContextTakeover: false,     // Disable context takeover to reduce memory
        Threshold:      256,        // Don't compress small messages
        MemoryLimit:    64 * 1024 * 1024,  // 64MB memory limit
        CacheTimeout:   5 * time.Second,   // Cache for 5 seconds
    }
}

// Create new message cache for compression efficiency
func NewMessageCache(config CompressionConfig) *MessageCache {
    cache := &MessageCache{
        items:      make(map[uint64]*CachedMessage),
        bufferPool: &sync.Pool{
            New: func() interface{} {
                // Create buffer with reasonable initial size
                return bytes.NewBuffer(make([]byte, 0, 4096))
            },
        },
    }
    
    // Start cleanup goroutine
    go cache.cleanup(config.CacheTimeout)
    
    return cache
}

// Generate hash for message content
func hashMessage(data []byte) uint64 {
    h := fnv.New64a()
    h.Write(data)
    return h.Sum64()
}

// Set up compression for WebSocket
func ConfigureConnection(conn *websocket.Conn, config CompressionConfig) error {
    // Skip if compression not enabled
    if !config.Enabled {
        return nil
    }
    
    // Configure WebSocket compression parameters
    parameters := websocket.CompressionOptions{
        Level:            config.Level,
        CompressionLevel: config.Level,  // Same value used for both options
        ClientNoContextTakeover: !config.ContextTakeover,
        ServerNoContextTakeover: !config.ContextTakeover,
    }
    
    // Apply compression configuration
    return conn.EnableWriteCompression(parameters)
}

// Get or create prepared message with memory optimizations
func GetOrCreatePrepared(cache *MessageCache, messageType int, data []byte, config CompressionConfig) (*websocket.PreparedMessage, error) {
    // Don't compress small messages
    if len(data) < config.Threshold || !config.Enabled {
        // Still create PreparedMessage for consistent API
        return websocket.NewPreparedMessage(messageType, data)
    }
    
    // Generate hash of message content
    hash := hashMessage(data)
    
    // Check cache first
    cache.mutex.RLock()
    if cachedMsg, exists := cache.items[hash]; exists {
        // Update last access time
        cachedMsg.LastAccess = time.Now()
        cache.mutex.RUnlock()
        
        // Update stats
        atomic.AddInt64(&cache.stats.CacheHits, 1)
        
        return cachedMsg.Prepared, nil
    }
    cache.mutex.RUnlock()
    
    // Cache miss, need to compress
    atomic.AddInt64(&cache.stats.CacheMisses, 1)
    
    // Track compression time
    startTime := time.Now()
    
    // Get buffer from pool
    buf := cache.bufferPool.Get().(*bytes.Buffer)
    buf.Reset()
    defer cache.bufferPool.Put(buf)
    
    // Create flate writer with memory limit awareness
    flateWriter, err := flate.NewWriter(buf, config.Level)
    if err != nil {
        return nil, err
    }
    
    // Write data to compressor
    bytesWritten, err := flateWriter.Write(data)
    if err != nil {
        return nil, err
    }
    
    // Close flate writer to flush data
    err = flateWriter.Close()
    if err != nil {
        return nil, err
    }
    
    // Track compression stats
    atomic.AddInt64(&cache.stats.BytesBeforeCompression, int64(len(data)))
    atomic.AddInt64(&cache.stats.BytesAfterCompression, int64(buf.Len()))
    
    // Check if compression actually helped
    var msgData []byte
    if buf.Len() < len(data) {
        // Use compressed data
        msgData = make([]byte, buf.Len())
        copy(msgData, buf.Bytes())
    } else {
        // Compression didn't help, use original
        msgData = data
    }
    
    // Create prepared message
    prepared, err := websocket.NewPreparedMessage(messageType, msgData)
    if err != nil {
        return nil, err
    }
    
    // Record compression time
    compressionTime := time.Since(startTime)
    atomic.AddInt64((*int64)(&cache.stats.CompressionTime), int64(compressionTime))
    
    // Update cache
    expirationTime := time.Now().Add(config.CacheTimeout)
    cachedMsg := &CachedMessage{
        Prepared:   prepared,
        Hash:       hash,
        Size:       len(data),
        LastAccess: time.Now(),
        Expiration: expirationTime,
    }
    
    // Add to cache
    cache.mutex.Lock()
    cache.items[hash] = cachedMsg
    cache.mutex.Unlock()
    
    // Track cache stats
    atomic.AddInt64(&cache.stats.MessagesCached, 1)
    
    // Schedule automatic cleanup
    time.AfterFunc(config.CacheTimeout, func() {
        cache.mutex.Lock()
        defer cache.mutex.Unlock()
        
        // Only remove if still the same item (hash collision possible)
        if existing, exists := cache.items[hash]; exists && existing == cachedMsg {
            delete(cache.items, hash)
        }
    })
    
    // Update compression ratio
    totalBefore := atomic.LoadInt64(&cache.stats.BytesBeforeCompression)
    totalAfter := atomic.LoadInt64(&cache.stats.BytesAfterCompression)
    if totalBefore > 0 {
        ratio := float64(totalAfter) / float64(totalBefore)
        atomic.StoreUint64((*uint64)(&cache.stats.CompressionRatio), math.Float64bits(ratio))
    }
    
    return prepared, nil
}

// Cleanup expired cache entries
func (cache *MessageCache) cleanup(interval time.Duration) {
    ticker := time.NewTicker(interval / 2)
    defer ticker.Stop()
    
    for range ticker.C {
        now := time.Now()
        
        cache.mutex.Lock()
        for hash, msg := range cache.items {
            if now.After(msg.Expiration) {
                delete(cache.items, hash)
            }
        }
        cache.mutex.Unlock()
    }
}

// Get compression statistics
func (cache *MessageCache) GetStats() CompressionStats {
    return cache.stats
}

// Reset cache
func (cache *MessageCache) Reset() {
    cache.mutex.Lock()
    defer cache.mutex.Unlock()
    
    // Clear all items
    cache.items = make(map[uint64]*CachedMessage)
    
    // Reset stats
    cache.stats = CompressionStats{}
}

8. Rate Limiting

Purpose: Prevent flooding with token bucket algorithm and adaptive limits.

Dependencies: roots-ws Error types

// Core types
type RateLimitConfig struct {
    Enabled            bool
    MessagesPerSecond  float64   // Default rate limit
    BurstSize          int       // Maximum burst allowed
    AdaptiveMode       bool      // Whether to adjust limits dynamically
    MinRate            float64   // Minimum rate allowed
    MaxRate            float64   // Maximum rate allowed
    CategoryLimits     map[string]CategoryLimit  // Different limits for message types
}

type CategoryLimit struct {
    MessagesPerSecond  float64
    BurstSize          int
    Priority           int       // Priority level (higher = more important)
}

type RateLimiter struct {
    config             RateLimitConfig
    defaultBucket      *TokenBucket
    categoryBuckets    map[string]*TokenBucket
    lastAdaptation     time.Time
    stats              RateLimitStats
    mutex              sync.RWMutex
}

type TokenBucket struct {
    Rate               float64   // Tokens per second
    MaxTokens          int       // Maximum capacity
    AvailableTokens    float64   // Current token count
    LastRefill         time.Time // Last time tokens were added
    mutex              sync.Mutex
}

type RateLimitStats struct {
    AllowedMessages    int64
    DelayedMessages    int64
    RejectedMessages   int64
    CurrentRate        float64
    AdaptationEvents   int64
    ThrottleEvents     map[string]int64
}

// Create default rate limit config
func DefaultConfig() RateLimitConfig {
    return RateLimitConfig{
        Enabled:           true,
        MessagesPerSecond: 10,
        BurstSize:         30,
        AdaptiveMode:      true,
        MinRate:           1.0,
        MaxRate:           100.0,
        CategoryLimits: map[string]CategoryLimit{
            "control": {
                MessagesPerSecond: 50,
                BurstSize:         100,
                Priority:          10,
            },
            "event": {
                MessagesPerSecond: 5,
                BurstSize:         20,
                Priority:          5,
            },
            "subscription": {
                MessagesPerSecond: 2,
                BurstSize:         10,
                Priority:          3,
            },
        },
    }
}

// Create new rate limiter
func NewLimiter(config RateLimitConfig) *RateLimiter {
    // Create default token bucket
    defaultBucket := &TokenBucket{
        Rate:            config.MessagesPerSecond,
        MaxTokens:       config.BurstSize,
        AvailableTokens: float64(config.BurstSize), // Start with full bucket
        LastRefill:      time.Now(),
    }
    
    // Create category buckets
    categoryBuckets := make(map[string]*TokenBucket)
    for category, limit := range config.CategoryLimits {
        categoryBuckets[category] = &TokenBucket{
            Rate:            limit.MessagesPerSecond,
            MaxTokens:       limit.BurstSize,
            AvailableTokens: float64(limit.BurstSize), // Start with full bucket
            LastRefill:      time.Now(),
        }
    }
    
    return &RateLimiter{
        config:          config,
        defaultBucket:   defaultBucket,
        categoryBuckets: categoryBuckets,
        lastAdaptation:  time.Now(),
        stats: RateLimitStats{
            ThrottleEvents: make(map[string]int64),
        },
    }
}

// Check if operation is allowed by rate limit
func (limiter *RateLimiter) Allow(category string) bool {
    // Return true if rate limiting disabled
    if !limiter.config.Enabled {
        return true
    }
    
    // Get appropriate bucket
    bucket := limiter.getBucket(category)
    
    // Refill tokens and check availability
    bucket.mutex.Lock()
    defer bucket.mutex.Unlock()
    
    // Refill tokens based on time elapsed
    limiter.refillTokens(bucket)
    
    // Check if bucket has at least one token
    if bucket.AvailableTokens >= 1.0 {
        // Consume token
        bucket.AvailableTokens -= 1.0
        
        // Update stats
        atomic.AddInt64(&limiter.stats.AllowedMessages, 1)
        
        return true
    }
    
    // No tokens available
    atomic.AddInt64(&limiter.stats.RejectedMessages, 1)
    
    // Track category-specific throttling
    limiter.mutex.Lock()
    limiter.stats.ThrottleEvents[category]++
    limiter.mutex.Unlock()
    
    return false
}

// Wait until operation is allowed
func (limiter *RateLimiter) Wait(ctx context.Context, category string) error {
    // If rate limiting disabled, return immediately
    if !limiter.config.Enabled {
        return nil
    }
    
    // Get appropriate bucket
    bucket := limiter.getBucket(category)
    
    for {
        // Check if context cancelled
        if ctx.Err() != nil {
            return ctx.Err()
        }
        
        // Try to allow operation
        bucket.mutex.Lock()
        
        // Refill tokens
        limiter.refillTokens(bucket)
        
        if bucket.AvailableTokens >= 1.0 {
            // Consume token
            bucket.AvailableTokens -= 1.0
            
            // Update stats
            atomic.AddInt64(&limiter.stats.AllowedMessages, 1)
            
            bucket.mutex.Unlock()
            return nil
        }
        
        // Calculate time until next token available
        timeToNextToken := time.Duration(1.0/bucket.Rate * float64(time.Second))
        
        bucket.mutex.Unlock()
        
        // Update delayed stats
        atomic.AddInt64(&limiter.stats.DelayedMessages, 1)
        
        // Wait for token to become available
        select {
        case <-ctx.Done():
            return ctx.Err()
        case <-time.After(timeToNextToken):
            // Try again
        }
    }
}

// Refill tokens based on elapsed time
func (limiter *RateLimiter) refillTokens(bucket *TokenBucket) {
    now := time.Now()
    elapsed := now.Sub(bucket.LastRefill).Seconds()
    
    // Calculate tokens to add
    tokensToAdd := elapsed * bucket.Rate
    
    // Add tokens to bucket (up to max)
    bucket.AvailableTokens = math.Min(float64(bucket.MaxTokens), bucket.AvailableTokens+tokensToAdd)
    
    // Update last refill time
    bucket.LastRefill = now
}

// Get appropriate token bucket for category
func (limiter *RateLimiter) getBucket(category string) *TokenBucket {
    limiter.mutex.RLock()
    defer limiter.mutex.RUnlock()
    
    if bucket, exists := limiter.categoryBuckets[category]; exists {
        return bucket
    }
    
    return limiter.defaultBucket
}

// Adapt rate limits based on system conditions
func (limiter *RateLimiter) AdaptRateLimits(queueUtilization float64) {
    // Skip if adaptive mode disabled
    if !limiter.config.Enabled || !limiter.config.AdaptiveMode {
        return
    }
    
    // Minimum time between adaptations
    if time.Since(limiter.lastAdaptation) < 5*time.Second {
        return
    }
    
    limiter.mutex.Lock()
    defer limiter.mutex.Unlock()
    
    // Current rate
    currentRate := limiter.defaultBucket.Rate
    newRate := currentRate
    
    // Adjust based on queue utilization
    if queueUtilization > 0.8 {
        // Queue getting full, reduce rate
        newRate = math.Max(limiter.config.MinRate, currentRate*0.8)
    } else if queueUtilization < 0.2 {
        // Queue mostly empty, increase rate
        newRate = math.Min(limiter.config.MaxRate, currentRate*1.2)
    }
    
    // Only update if significant change
    if math.Abs(newRate-currentRate)/currentRate > 0.05 {
        // Update default bucket rate
        limiter.defaultBucket.mutex.Lock()
        limiter.defaultBucket.Rate = newRate
        limiter.defaultBucket.mutex.Unlock()
        
        // Track adaptation
        atomic.AddInt64(&limiter.stats.AdaptationEvents, 1)
        atomic.StoreUint64((*uint64)(&limiter.stats.CurrentRate), math.Float64bits(newRate))
        
        // Update timestamp
        limiter.lastAdaptation = time.Now()
    }
}

// Get rate limit statistics
func (limiter *RateLimiter) GetStats() RateLimitStats {
    limiter.mutex.RLock()
    defer limiter.mutex.RUnlock()
    
    // Create copy of stats to prevent race conditions
    statsCopy := limiter.stats
    
    // Copy map to prevent race conditions
    statsCopy.ThrottleEvents = make(map[string]int64)
    for k, v := range limiter.stats.ThrottleEvents {
        statsCopy.ThrottleEvents[k] = v
    }
    
    return statsCopy
}

9. Message Replay

Purpose: Buffers messages for replay after reconnection with intelligent prioritization.

Dependencies: roots-ws Error types, honeybee Connection

// Core types
type ReplayConfig struct {
    Enabled       bool
    MaxMessages   int           // Maximum messages to buffer
    MaxAge        time.Duration // Maximum message age to retain
    MaxAttempts   int           // Maximum replay attempts per message
    MaxBatchSize  int           // Maximum messages to replay at once
    BatchDelay    time.Duration // Delay between replay batches
}

type MessageBuffer struct {
    Config        ReplayConfig
    Messages      []BufferedMessage
    Mutex         sync.RWMutex
    Stats         ReplayStats
}

type BufferedMessage struct {
    ID            string
    Content       []byte
    Timestamp     time.Time
    Attempts      int
    Priority      int      // Higher priority = replay first
    MessageType   string   // Used for selective replay
}

type ReplayStats struct {
    MessagesBuffered   int64
    MessagesReplayed   int64
    FailedReplays      int64
    DroppedMessages    int64
    OldestMessage      time.Time
}

// Create default replay config
func DefaultConfig() ReplayConfig {
    return ReplayConfig{
        Enabled:      true,
        MaxMessages:  1000,
        MaxAge:       5 * time.Minute,
        MaxAttempts:  3,
        MaxBatchSize: 50,
        BatchDelay:   100 * time.Millisecond,
    }
}

// Create new message buffer
func NewBuffer(config ReplayConfig) *MessageBuffer {
    return &MessageBuffer{
        Config:   config,
        Messages: make([]BufferedMessage, 0, config.MaxMessages/10), // Start smaller, grow as needed
        Stats:    ReplayStats{},
    }
}

// Add message to buffer with priority
func BufferMessageWithPriority(buffer *MessageBuffer, msg []byte, msgType string, priority int) string {
    // Skip if buffering disabled
    if !buffer.Config.Enabled {
        return ""
    }
    
    // Acquire write lock
    buffer.Mutex.Lock()
    defer buffer.Mutex.Unlock()
    
    // Generate unique ID for message
    id := generateMessageID()
    
    // Create buffered message
    bufferedMsg := BufferedMessage{
        ID:          id,
        Content:     cloneBytes(msg), // Clone bytes to avoid external modification
        Timestamp:   time.Now(),
        Attempts:    0,
        Priority:    priority,
        MessageType: msgType,
    }
    
    // Check if buffer is full
    if len(buffer.Messages) >= buffer.Config.MaxMessages {
        // Find lowest priority, oldest message to remove
        oldestIdx := 0
        lowestPriority := math.MaxInt32
        oldestTime := time.Now()
        
        for i, m := range buffer.Messages {
            // Lower priority or same priority but older
            if m.Priority < lowestPriority || 
               (m.Priority == lowestPriority && m.Timestamp.Before(oldestTime)) {
                lowestPriority = m.Priority
                oldestTime = m.Timestamp
                oldestIdx = i
            }
        }
        
        // Remove lowest priority message
        buffer.Messages = append(buffer.Messages[:oldestIdx], buffer.Messages[oldestIdx+1:]...)
        
        // Update stats
        buffer.Stats.DroppedMessages++
    }
    
    // Add new message to buffer
    buffer.Messages = append(buffer.Messages, bufferedMsg)
    
    // Update stats
    buffer.Stats.MessagesBuffered++
    if buffer.Stats.OldestMessage.IsZero() || bufferedMsg.Timestamp.Before(buffer.Stats.OldestMessage) {
        buffer.Stats.OldestMessage = bufferedMsg.Timestamp
    }
    
    return id
}

// Clone byte slice to avoid external modification
func cloneBytes(data []byte) []byte {
    clone := make([]byte, len(data))
    copy(clone, data)
    return clone
}

// Generate unique message ID
func generateMessageID() string {
    return fmt.Sprintf("msg-%d-%s", time.Now().UnixNano(), generateRandomString(6))
}

// Mark message as sent
func MarkSent(buffer *MessageBuffer, id string) {
    // Acquire write lock
    buffer.Mutex.Lock()
    defer buffer.Mutex.Unlock()
    
    // Find message with matching ID
    for i, msg := range buffer.Messages {
        if msg.ID == id {
            // Remove message from buffer (successful send)
            buffer.Messages = append(buffer.Messages[:i], buffer.Messages[i+1:]...)
            break
        }
    }
    
    // Update stats if oldest message removed
    if len(buffer.Messages) > 0 {
        // Find new oldest
        oldest := time.Now()
        for _, msg := range buffer.Messages {
            if msg.Timestamp.Before(oldest) {
                oldest = msg.Timestamp
            }
        }
        buffer.Stats.OldestMessage = oldest
    } else {
        buffer.Stats.OldestMessage = time.Time{}
    }
}

// Get messages to replay, ordered by priority
func GetPendingMessages(buffer *MessageBuffer, msgType string) []BufferedMessage {
    // Acquire read lock
    buffer.Mutex.RLock()
    defer buffer.Mutex.RUnlock()
    
    now := time.Now()
    maxAge := now.Add(-buffer.Config.MaxAge)
    
    // Filter messages by max attempts, age, and message type
    pending := make([]BufferedMessage, 0, len(buffer.Messages))
    for _, msg := range buffer.Messages {
        if msg.Attempts < buffer.Config.MaxAttempts &&
           msg.Timestamp.After(maxAge) &&
           (msgType == "" || msg.MessageType == msgType) {
            pending = append(pending, msg)
        }
    }
    
    // Sort by priority (high to low) and then by age (oldest first)
    sort.Slice(pending, func(i, j int) bool {
        if pending[i].Priority != pending[j].Priority {
            return pending[i].Priority > pending[j].Priority
        }
        return pending[i].Timestamp.Before(pending[j].Timestamp)
    })
    
    return pending
}

// Record replay attempt
func RecordAttempt(buffer *MessageBuffer, id string) {
    // Acquire write lock
    buffer.Mutex.Lock()
    defer buffer.Mutex.Unlock()
    
    // Find message with matching ID
    for i := range buffer.Messages {
        if buffer.Messages[i].ID == id {
            // Increment attempt counter
            buffer.Messages[i].Attempts++
            break
        }
    }
}

// Clean expired messages
func CleanExpired(buffer *MessageBuffer) int {
    // Acquire write lock
    buffer.Mutex.Lock()
    defer buffer.Mutex.Unlock()
    
    // Calculate expiration timestamp
    now := time.Now()
    maxAge := now.Add(-buffer.Config.MaxAge)
    
    // Filter out expired messages
    newMessages := make([]BufferedMessage, 0, len(buffer.Messages))
    removed := 0
    
    for _, msg := range buffer.Messages {
        // Keep message if not expired and not over max attempts
        if msg.Timestamp.After(maxAge) && msg.Attempts < buffer.Config.MaxAttempts {
            newMessages = append(newMessages, msg)
        } else {
            removed++
            buffer.Stats.DroppedMessages++
        }
    }
    
    // Update buffer
    buffer.Messages = newMessages
    
    // Update oldest message timestamp
    if len(newMessages) > 0 {
        oldest := now
        for _, msg := range newMessages {
            if msg.Timestamp.Before(oldest) {
                oldest = msg.Timestamp
            }
        }
        buffer.Stats.OldestMessage = oldest
    } else {
        buffer.Stats.OldestMessage = time.Time{}
    }
    
    return removed
}

// Replay pending messages to connection with batching and rate limiting
func ReplayMessages(buffer *MessageBuffer, conn *honeybee.Connection, msgType string) []error {
    // Skip if buffering disabled
    if !buffer.Config.Enabled {
        return nil
    }
    
    // Get pending messages
    pending := GetPendingMessages(buffer, msgType)
    
    // No messages to replay
    if len(pending) == 0 {
        return nil
    }
    
    // Collect errors
    errors := make([]error, 0)
    
    // Process in batches
    for i := 0; i < len(pending); i += buffer.Config.MaxBatchSize {
        // Calculate end of batch
        end := i + buffer.Config.MaxBatchSize
        if end > len(pending) {
            end = len(pending)
        }
        
        // Process batch
        batch := pending[i:end]
        for _, msg := range batch {
            // Send message to connection
            select {
            case conn.OutChan <- msg.Content:
                // Record successful replay
                atomic.AddInt64(&buffer.Stats.MessagesReplayed, 1)
                
                // Mark as sent
                MarkSent(buffer, msg.ID)
                
            default:
                // Channel full or closed
                err := errors.New("replay failed: connection channel full or closed")
                errors = append(errors, err)
                
                // Record attempt
                RecordAttempt(buffer, msg.ID)
                atomic.AddInt64(&buffer.Stats.FailedReplays, 1)
            }
        }
        
        // Delay between batches to prevent overwhelming the connection
        if end < len(pending) {
            time.Sleep(buffer.Config.BatchDelay)
        }
    }
    
    return errors
}

// Get replay statistics
func GetReplayStats(buffer *MessageBuffer) ReplayStats {
    // Acquire read lock
    buffer.Mutex.RLock()
    defer buffer.Mutex.RUnlock()
    
    // Return copy of stats
    return buffer.Stats
}

10. Connection Statistics

Purpose: Track connection usage metrics with predictive analytics.

Dependencies: honeybee Connection

// Core types
type ConnectionStats struct {
    Connected             time.Time
    MessagesSent          int64
    MessagesReceived      int64
    BytesSent             int64
    BytesReceived         int64
    Errors                int64
    AvgLatency            time.Duration
    LastMessageSent       time.Time
    LastMessageReceived   time.Time
    
    // Advanced metrics
    ErrorRates            []float64        // Error rates over time periods (1m, 5m, 15m)
    ThroughputRates       []float64        // Messages per second over time periods
    LatencyPercentiles    []time.Duration  // P50, P90, P99 latencies
    ReconnectionCount     int
    HealthScoreHistory    []float64        // Health scores over time
    MessageTypeDistribution map[string]int64  // Count by message type
    
    // Quality metrics
    QualityScore         float64          // 0.0-1.0 overall quality rating
    Stability            float64          // Connection stability (0.0-1.0)
    Performance          float64          // Performance rating (0.0-1.0)
    Reliability          float64          // Reliability rating (0.0-1.0)
    
    // Resource utilization
    MemoryUsage          int64           // Bytes used by connection
    GoroutineCount       int             // Goroutines associated with connection
    
    // Private tracking
    lastCalculation      time.Time
    latencyHistory       []time.Duration
    mutex                sync.RWMutex
}

// Create new connection stats
func New() *ConnectionStats {
    return &ConnectionStats{
        Connected:             time.Now(),
        ErrorRates:            make([]float64, 3),        // 1m, 5m, 15m rates
        ThroughputRates:       make([]float64, 3),        // 1m, 5m, 15m rates
        LatencyPercentiles:    make([]time.Duration, 3),  // P50, P90, P99
        HealthScoreHistory:    make([]float64, 60),       // Last 60 measurements
        MessageTypeDistribution: make(map[string]int64),  // Counts by message type
        latencyHistory:        make([]time.Duration, 0, 100),
        lastCalculation:       time.Now(),
        QualityScore:          1.0,  // Start with perfect score
        Stability:             1.0,
        Performance:           1.0,
        Reliability:           1.0,
    }
}

// Record message sent with type tracking
func RecordMessageSent(stats *ConnectionStats, size int, messageType string) {
    // Atomically increment messages sent counter
    atomic.AddInt64(&stats.MessagesSent, 1)
    
    // Atomically add size to bytes sent counter
    atomic.AddInt64(&stats.BytesSent, int64(size))
    
    // Update last message sent timestamp
    stats.mutex.Lock()
    stats.LastMessageSent = time.Now()
    
    // Update message type distribution
    stats.MessageTypeDistribution[messageType]++
    stats.mutex.Unlock()
    
    // Trigger rate calculation if needed
    if time.Since(stats.lastCalculation) > 10*time.Second {
        go stats.calculateRates()
    }
}

// Record message received with type tracking
func RecordMessageReceived(stats *ConnectionStats, size int, messageType string) {
    // Atomically increment messages received counter
    atomic.AddInt64(&stats.MessagesReceived, 1)
    
    // Atomically add size to bytes received counter
    atomic.AddInt64(&stats.BytesReceived, int64(size))
    
    // Update last message received timestamp
    stats.mutex.Lock()
    stats.LastMessageReceived = time.Now()
    
    // Update message type distribution
    stats.MessageTypeDistribution[messageType]++
    stats.mutex.Unlock()
    
    // Trigger rate calculation if needed
    if time.Since(stats.lastCalculation) > 10*time.Second {
        go stats.calculateRates()
    }
}

// Record error with categorization
func RecordError(stats *ConnectionStats, errorType string) {
    // Atomically increment error counter
    atomic.AddInt64(&stats.Errors, 1)
    
    // Update error rate calculations
    go stats.calculateRates()
    
    // Update quality metrics
    go stats.updateQualityMetrics()
}

// Record latency measurement
func RecordLatency(stats *ConnectionStats, latency time.Duration) {
    stats.mutex.Lock()
    
    // Add to latency history
    stats.latencyHistory = append(stats.latencyHistory, latency)
    if len(stats.latencyHistory) > 100 {
        // Keep history bounded
        stats.latencyHistory = stats.latencyHistory[1:]
    }
    
    // Calculate exponential moving average for average latency
    if stats.AvgLatency == 0 {
        stats.AvgLatency = latency
    } else {
        // 95% old value, 5% new value
        oldWeight := 0.95
        newWeight := 0.05
        
        oldAvg := float64(stats.AvgLatency.Nanoseconds())
        newLatency := float64(latency.Nanoseconds())
        
        newAvg := oldAvg*oldWeight + newLatency*newWeight
        stats.AvgLatency = time.Duration(int64(newAvg))
    }
    
    stats.mutex.Unlock()
    
    // Recalculate percentiles if we have enough data
    if len(stats.latencyHistory) >= 10 {
        go stats.calculateLatencyPercentiles()
    }
    
    // Update quality metrics
    go stats.updateQualityMetrics()
}

// Record reconnection event
func RecordReconnection(stats *ConnectionStats) {
    stats.mutex.Lock()
    
    // Increment reconnection counter
    stats.ReconnectionCount++
    
    // Reset certain metrics after reconnection
    stats.lastCalculation = time.Now()
    
    stats.mutex.Unlock()
    
    // Update quality metrics
    go stats.updateQualityMetrics()
}

// Record health score
func RecordHealthScore(stats *ConnectionStats, score float64) {
    stats.mutex.Lock()
    
    // Shift history and add new score
    copy(stats.HealthScoreHistory[1:], stats.HealthScoreHistory)
    stats.HealthScoreHistory[0] = score
    
    stats.mutex.Unlock()
    
    // Update quality metrics
    go stats.updateQualityMetrics()
}

// Calculate throughput and error rates
func (stats *ConnectionStats) calculateRates() {
    stats.mutex.Lock()
    defer stats.mutex.Unlock()
    
    now := time.Now()
    elapsed := now.Sub(stats.lastCalculation).Seconds()
    
    if elapsed < 1.0 {
        return // Avoid too frequent calculations
    }
    
    // Calculate total messages
    totalMessages := stats.MessagesSent + stats.MessagesReceived
    
    // Calculate messages per second for 1m window
    rate := float64(totalMessages) / elapsed
    stats.ThroughputRates[0] = rate
    
    // Calculate error rate for 1m window
    if totalMessages > 0 {
        errorRate := float64(stats.Errors) / float64(totalMessages)
        stats.ErrorRates[0] = errorRate
    }
    
    // Update 5m and 15m rates with exponential decay
    // 5m rate: 63% new, 37% old
    stats.ThroughputRates[1] = 0.63*rate + 0.37*stats.ThroughputRates[1]
    stats.ErrorRates[1] = 0.63*stats.ErrorRates[0] + 0.37*stats.ErrorRates[1]
    
    // 15m rate: 28% new, 72% old
    stats.ThroughputRates[2] = 0.28*rate + 0.72*stats.ThroughputRates[2]
    stats.ErrorRates[2] = 0.28*stats.ErrorRates[0] + 0.72*stats.ErrorRates[2]
    
    // Update last calculation time
    stats.lastCalculation = now
}

// Calculate latency percentiles
func (stats *ConnectionStats) calculateLatencyPercentiles() {
    stats.mutex.Lock()
    defer stats.mutex.Unlock()
    
    // Need at least a few measurements
    if len(stats.latencyHistory) < 5 {
        return
    }
    
    // Create sorted copy of latency history
    sortedLatencies := make([]time.Duration, len(stats.latencyHistory))
    copy(sortedLatencies, stats.latencyHistory)
    sort.Slice(sortedLatencies, func(i, j int) bool {
        return sortedLatencies[i] < sortedLatencies[j]
    })
    
    // Calculate p50 (median)
    p50Index := len(sortedLatencies) / 2
    stats.LatencyPercentiles[0] = sortedLatencies[p50Index]
    
    // Calculate p90
    p90Index := int(float64(len(sortedLatencies)) * 0.9)
    if p90Index >= len(sortedLatencies) {
        p90Index = len(sortedLatencies) - 1
    }
    stats.LatencyPercentiles[1] = sortedLatencies[p90Index]
    
    // Calculate p99
    p99Index := int(float64(len(sortedLatencies)) * 0.99)
    if p99Index >= len(sortedLatencies) {
        p99Index = len(sortedLatencies) - 1
    }
    stats.LatencyPercentiles[2] = sortedLatencies[p99Index]
}

// Update quality metrics
func (stats *ConnectionStats) updateQualityMetrics() {
    stats.mutex.Lock()
    defer stats.mutex.Unlock()
    
    // Calculate stability based on reconnections and error rates
    uptime := time.Since(stats.Connected).Minutes()
    reconnRate := 0.0
    if uptime > 0 {
        reconnRate = float64(stats.ReconnectionCount) / uptime
    }
    
    // More than 1 reconnection per minute is bad
    reconnFactor := math.Min(1.0, reconnRate)
    errorFactor := stats.ErrorRates[0] // 1m error rate
    
    stats.Stability = 1.0 - (0.6*reconnFactor + 0.4*errorFactor)
    if stats.Stability < 0 {
        stats.Stability = 0
    }
    
    // Calculate performance based on latency
    var perfScore float64 = 1.0
    
    if stats.AvgLatency > 0 {
        // Latency score: 1.0 for <50ms, 0.0 for >500ms
        avgLatencyMs := float64(stats.AvgLatency.Milliseconds())
        
        if avgLatencyMs < 50 {
            perfScore = 1.0
        } else if avgLatencyMs > 500 {
            perfScore = 0.0
        } else {
            // Linear scale between 50ms and 500ms
            perfScore = 1.0 - (avgLatencyMs-50.0)/450.0
        }
    }
    
    stats.Performance = perfScore
    
    // Calculate reliability based on health scores
    var healthSum float64
    healthCount := 0
    
    for _, score := range stats.HealthScoreHistory {
        if score > 0 {
            healthSum += score
            healthCount++
        }
    }
    
    if healthCount > 0 {
        stats.Reliability = healthSum / float64(healthCount)
    } else {
        stats.Reliability = 1.0
    }
    
    // Calculate overall quality score
    stats.QualityScore = 0.4*stats.Stability + 0.3*stats.Performance + 0.3*stats.Reliability
    
    // Clamp to 0.0-1.0 range
    if stats.QualityScore < 0.0 {
        stats.QualityScore = 0.0
    } else if stats.QualityScore > 1.0 {
        stats.QualityScore = 1.0
    }
}

// Get uptime
func GetUptime(stats *ConnectionStats) time.Duration {
    return time.Since(stats.Connected)
}

// Get messages per second
func GetMessagesPerSecond(stats *ConnectionStats) float64 {
    stats.mutex.RLock()
    defer stats.mutex.RUnlock()
    
    return stats.ThroughputRates[0]
}

// Get health trend
func GetHealthTrend(stats *ConnectionStats) float64 {
    stats.mutex.RLock()
    defer stats.mutex.RUnlock()
    
    // Need enough history
    if len(stats.HealthScoreHistory) < 5 {
        return 0.0
    }
    
    // Calculate linear regression slope
    n := 0
    sumX := 0.0
    sumY := 0.0
    sumXY := 0.0
    sumXX := 0.0
    
    for i, score := range stats.HealthScoreHistory {
        if score <= 0 {
            continue
        }
        
        x := float64(i)
        y := score
        
        sumX += x
        sumY += y
        sumXY += x * y
        sumXX += x * x
        n++
    }
    
    if n < 3 {
        return 0.0
    }
    
    // Calculate slope
    nf := float64(n)
    slope := (nf*sumXY - sumX*sumY) / (nf*sumXX - sumX*sumX)
    
    // Normalize to -1.0 to 1.0 range
    // Negative means declining health
    // Positive means improving health
    avgHealth := sumY / nf
    if avgHealth != 0 {
        slope = slope / avgHealth * 10.0 // Scale factor
    }
    
    // Clamp to range
    if slope > 1.0 {
        slope = 1.0
    } else if slope < -1.0 {
        slope = -1.0
    }
    
    return slope
}

// Reset stats
func Reset(stats *ConnectionStats) {
    stats.mutex.Lock()
    defer stats.mutex.Unlock()
    
    // Keep connection time and reconnection count
    connectedTime := stats.Connected
    reconnCount := stats.ReconnectionCount
    
    // Create new stats with same connection time
    newStats := New()
    newStats.Connected = connectedTime
    newStats.ReconnectionCount = reconnCount
    
    // Copy all fields
    *stats = *newStats
}

// Generate statistics report
func GenerateReport(stats *ConnectionStats) map[string]interface{} {
    stats.mutex.RLock()
    defer stats.mutex.RUnlock()
    
    // Calculate uptime
    uptime := time.Since(stats.Connected)
    
    // Create report
    report := map[string]interface{}{
        "uptime_seconds":     uptime.Seconds(),
        "messages_sent":      stats.MessagesSent,
        "messages_received":  stats.MessagesReceived,
        "bytes_sent":         stats.BytesSent,
        "bytes_received":     stats.BytesReceived,
        "errors":             stats.Errors,
        "reconnections":      stats.ReconnectionCount,
        "average_latency_ms": stats.AvgLatency.Milliseconds(),
        
        "throughput": map[string]float64{
            "messages_per_second_1m":  stats.ThroughputRates[0],
            "messages_per_second_5m":  stats.ThroughputRates[1],
            "messages_per_second_15m": stats.ThroughputRates[2],
        },
        
        "error_rates": map[string]float64{
            "error_rate_1m":  stats.ErrorRates[0],
            "error_rate_5m":  stats.ErrorRates[1],
            "error_rate_15m": stats.ErrorRates[2],
        },
        
        "latency": map[string]int64{
            "p50_ms": stats.LatencyPercentiles[0].Milliseconds(),
            "p90_ms": stats.LatencyPercentiles[1].Milliseconds(),
            "p99_ms": stats.LatencyPercentiles[2].Milliseconds(),
        },
        
        "quality": map[string]float64{
            "overall_score": stats.QualityScore,
            "stability":     stats.Stability,
            "performance":   stats.Performance,
            "reliability":   stats.Reliability,
        },
        
        "health_trend": GetHealthTrend(stats),
    }
    
    // Add message type distribution
    msgTypes := make(map[string]int64)
    for typ, count := range stats.MessageTypeDistribution {
        msgTypes[typ] = count
    }
    report["message_types"] = msgTypes
    
    return report
}

Architecture Decision Records

ADR 1: Specialized Extensions Model

Context: Need a way to provide advanced functionality without bloating the core.

Decision: Implement specialized, independent extensions that build on honeybee but can be used selectively.

Rationale:

  • Allows applications to choose only the extensions they need
  • Prevents complexity from leaking into core functionality
  • Creates clear boundaries between essential and optional features
  • Enables specialized optimizations for specific use cases
  • Follows the principle of progressive enhancement
  • Enables à la carte adoption of advanced features
  • Preserves compatibility with honeybee-based applications

Consequences:

  • Increases total API surface area across multiple packages
  • Improves application performance by avoiding unnecessary code
  • Creates clearer documentation of advanced vs. basic functionality
  • May require some duplication between extensions
  • Allows independent evolution of each extension
  • All extensions implement corresponding honeybee interfaces
  • Extensions can be adopted individually without requiring full migration

ADR 2: High-Performance Focus

Context: Advanced applications have different performance requirements than basic ones.

Decision: Optimize mana-ws extensions for high-performance and high-volume scenarios.

Rationale:

  • Addresses needs of applications with demanding requirements
  • Provides optimizations that would be overkill for simple applications
  • Creates clear upgrade path as application needs grow
  • Allows fine-tuning for specific deployment environments
  • Targets use cases where performance is critical

Consequences:

  • Increases implementation complexity
  • Requires more sophisticated testing
  • Creates more configuration options
  • Enables applications to scale to higher loads
  • May have higher resource usage in some cases

ADR 3: Advanced State Management

Context: Complex applications need sophisticated state tracking beyond basic connection management.

Decision: Implement comprehensive state tracking with analytics and prediction capabilities.

Rationale:

  • Provides visibility into connection behavior for debugging
  • Enables predictive adjustments to improve reliability
  • Creates foundation for adaptive optimization
  • Supports monitoring and alerting in production environments
  • Helps identify patterns in connection behavior

Consequences:

  • Increases memory and CPU usage for tracking
  • Provides valuable insights for complex applications
  • May require tuning to balance tracking overhead
  • Creates more complex internal state
  • Enables more sophisticated recovery strategies

ADR 4: Adaptive Algorithms

Context: Static configurations don't work well across diverse network conditions.

Decision: Implement adaptive algorithms that adjust based on observed conditions.

Rationale:

  • Improves performance across varying network environments
  • Reduces need for manual tuning
  • Creates more resilient connections
  • Adjusts automatically to changing conditions
  • Prevents degradation under stress

Consequences:

  • Increases algorithm complexity
  • Makes behavior less predictable in some cases
  • Improves overall reliability
  • Requires careful testing across diverse conditions
  • May cause unexpected adaptation in edge cases

ADR 5: Pluggable Architecture

Context: Different applications need different combinations of advanced features.

Decision: Design extensions to be independently usable rather than as a monolithic advanced layer.

Rationale:

  • Allows applications to choose only what they need
  • Prevents unnecessary code inclusion
  • Simplifies testing of individual features
  • Enables incremental adoption of advanced features
  • Follows the principle of separation of concerns

Consequences:

  • Requires clear documentation of dependencies between extensions
  • Increases number of import statements in application code
  • Improves application size and performance
  • May lead to some duplication between extensions
  • Allows targeted updates to specific extensions

ADR 6: Detailed Analytics

Context: Understanding connection behavior is important for optimization.

Decision: Implement comprehensive statistics tracking and reporting.

Rationale:

  • Provides visibility into connection performance
  • Enables data-driven optimization
  • Supports monitoring and alerting
  • Helps identify problems before they become critical
  • Creates foundation for automatic tuning

Consequences:

  • Increases overhead of tracking and calculation
  • Provides valuable insights for complex applications
  • May require storage considerations for historical data
  • Creates more observability in production
  • Requires balancing detail with performance impact

ADR 7: Drop-in Component Compatibility

Context: Need to determine how mana-ws components relate to their honeybee counterparts.

Decision: Design mana-ws components to be drop-in replacements for honeybee components, implementing the same interfaces while providing additional capabilities through extended interfaces.

Rationale:

  • Enables incremental adoption of advanced features
  • Allows selective enhancement of specific components
  • Preserves investment in existing application code
  • Creates clear upgrade paths as requirements evolve
  • Follows the principle of progressive enhancement
  • Respects the user's choice of complexity level
  • Supports targeted optimization of performance bottlenecks

Consequences:

  • All mana-ws components must implement honeybee interfaces
  • Need careful design of extended interfaces
  • May require adapter patterns in some cases
  • Enables feature detection through interface assertions
  • Simplifies migration between implementations
  • Requires comprehensive compatibility testing

ADR 8: Progressive Cognitive Load

Context: Need to consider how developers learn and adopt the library ecosystem as their applications evolve.

Decision: Design mana-ws to support progressive learning and implementation, allowing developers to adopt advanced features incrementally as their applications mature, without requiring upfront understanding of all capabilities.

Rationale:

  • Aligns with natural learning progression
  • Supports symbiotic evolution of applications and usage patterns
  • Reduces initial barrier to entry
  • Enables experimentation with advanced features in isolation
  • Creates natural documentation hierarchy from basic to advanced
  • Matches complexity to application maturity
  • Respects developer time and cognitive resources

Consequences:

  • Features must be designed for independent adoption
  • Documentation should follow progressive disclosure principles
  • Examples needed for common upgrade scenarios
  • May increase number of configuration options
  • Creates more natural onboarding experience
  • Allows focused learning on immediately relevant features
  • Supports both beginners and advanced users effectively