diff --git a/initiator/pool.go b/initiator/pool.go index d4e1d2f..b1a3aa1 100644 --- a/initiator/pool.go +++ b/initiator/pool.go @@ -147,58 +147,44 @@ func (p *Pool) Connect(id string) error { // Check for existing connection in pool p.mu.Lock() + defer p.mu.Unlock() + if p.closed { - p.mu.Unlock() return NewPoolError("pool is closed") } _, exists := p.peers[id] - p.mu.Unlock() if exists { return NewPoolError("connection already exists") } - // Create new connection + // Create new worker + stop := make(chan struct{}) + + worker, err := p.config.WorkerFactory(id, stop) + if err != nil { + close(stop) + return err + } + var logger *slog.Logger if p.logger != nil { logger = p.logger.With("id", id) } - conn, err := transport.NewConnection(id, p.config.ConnectionConfig, logger) - if err != nil { - return err - } - conn.SetDialer(p.dialer) - - // Attempt to connect - if err := conn.Connect(); err != nil { - return err + ctx := WorkerContext{ + Inbox: p.inbox, + Events: p.events, + Errors: p.errors, + PoolDone: p.done, + Logger: logger, + Dialer: p.dialer, + ConnectionConfig: p.config.ConnectionConfig, } - p.mu.Lock() - if p.closed { - // The pool closed while this connection was established. - p.mu.Unlock() - conn.Close() - return NewPoolError("pool is closed") - } + p.wg.Add(1) + go worker.Start(ctx, &p.wg) - // Add connection to pool - stop := make(chan struct{}) - if _, exists := p.peers[id]; exists { - // Another process connected to this url while this one was connecting - // Discard this connection and retain the existing one - p.mu.Unlock() - conn.Close() - return NewPoolError("connection already exists") - } - p.peers[id] = &Peer{id: id, worker: nil, stop: stop} // TODO: create worker - p.mu.Unlock() - - select { - case p.events <- PoolEvent{ID: id, Kind: EventConnected}: - case <-p.done: - return nil - } + p.peers[id] = &Peer{id: id, worker: worker, stop: stop} return nil }