From bce69a146b3c6d50c717fdd047943fce2af190e8 Mon Sep 17 00:00:00 2001 From: Jay Date: Fri, 22 May 2026 16:16:03 -0400 Subject: [PATCH] Phase 3: serialize writeEventsToDatabases; remove inner goroutines and channels --- AGENTS.md | 4 +++ write.go | 78 ++++++++++++++----------------------------------------- 2 files changed, 24 insertions(+), 58 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index ef9bd09..80d13e8 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -1,5 +1,9 @@ # go-heartwood +## Agent Skills +- Skills are located in `~/.pi/agent/skills/` +- Skill scripts are located in their respective skill directories under `scripts/` -- look for them there, not in this repository. + ## Testing - Baseline: `go test -count=1 ./...` - Do not use `-race`: `boltdb/bolt@v1.3.1` triggers a `checkptr` panic under race instrumentation; this is an upstream bug, not a project defect. diff --git a/write.go b/write.go index b1921dc..6d275ee 100644 --- a/write.go +++ b/write.go @@ -251,64 +251,41 @@ func writeEventsToDatabases( resultChan chan WriteResult, ) { defer wg.Done() - var localWg sync.WaitGroup - boltEventChan := make(chan EventTraveller) - graphEventChan := make(chan EventTraveller) - - boltErrorChan := make(chan error, 1) - graphResultChan := make(chan WriteResult, 1) - - localWg.Add(2) - go writeEventsToBoltDB(&localWg, boltdb, boltEventChan, boltErrorChan) - go writeEventsToGraphDB(&localWg, driver, graphEventChan, boltErrorChan, graphResultChan) - - // Fan out events to both writers + var travellers []EventTraveller for traveller := range inChan { - boltEventChan <- traveller - graphEventChan <- traveller + travellers = append(travellers, traveller) } - close(boltEventChan) - close(graphEventChan) - localWg.Wait() + boltErr := writeEventsToBoltDB(boltdb, travellers) + if boltErr != nil { + resultChan <- WriteResult{ + Error: fmt.Errorf("boltdb write failed, aborting graph write: %w", boltErr), + } + return + } - graphResult := <-graphResultChan - resultChan <- graphResult + summaries, err := writeEventsToGraphDB(driver, travellers) + resultChan <- WriteResult{ + ResultSummaries: summaries, + Error: err, + } } -func writeEventsToBoltDB( - wg *sync.WaitGroup, - boltdb *bolt.DB, - inChan chan EventTraveller, - errorChan chan error, -) { - defer wg.Done() +func writeEventsToBoltDB(boltdb *bolt.DB, travellers []EventTraveller) error { var events []EventBlob - - for traveller := range inChan { + for _, traveller := range travellers { events = append(events, EventBlob{ID: []byte(traveller.ID), JSON: traveller.JSON}) } - - err := BatchWriteEvents(boltdb, events) - - errorChan <- err - close(errorChan) + return BatchWriteEvents(boltdb, events) } -func writeEventsToGraphDB( - wg *sync.WaitGroup, - driver neo4j.Driver, - inChan chan EventTraveller, - boltErrorChan chan error, - resultChan chan WriteResult, -) { - defer wg.Done() +func writeEventsToGraphDB(driver neo4j.Driver, travellers []EventTraveller) ([]neo4j.ResultSummary, error) { matchKeys := NewSimpleMatchKeys() batch := NewBatchSubgraph(matchKeys) - for traveller := range inChan { + for _, traveller := range travellers { for _, node := range traveller.Subgraph.Nodes() { batch.AddNode(node) } @@ -317,22 +294,7 @@ func writeEventsToGraphDB( } } - boltErr := <-boltErrorChan - if boltErr != nil { - resultChan <- WriteResult{ - Error: fmt.Errorf( - "boltdb write failed, aborting graph write: %w", boltErr, - )} - close(resultChan) - return - } - - summaries, err := MergeSubgraph(context.Background(), driver, batch) - resultChan <- WriteResult{ - ResultSummaries: summaries, - Error: err, - } - close(resultChan) + return MergeSubgraph(context.Background(), driver, batch) } func collectTravellers(wg *sync.WaitGroup, inChan chan EventTraveller, resultChan chan []EventTraveller) {