Phase 3: serialize writeEventsToDatabases; remove inner goroutines and channels

This commit is contained in:
Jay
2026-05-22 16:16:03 -04:00
parent 94cfb35fb2
commit bce69a146b
2 changed files with 24 additions and 58 deletions
+4
View File
@@ -1,5 +1,9 @@
# go-heartwood # go-heartwood
## Agent Skills
- Skills are located in `~/.pi/agent/skills/`
- Skill scripts are located in their respective skill directories under `scripts/` -- look for them there, not in this repository.
## Testing ## Testing
- Baseline: `go test -count=1 ./...` - Baseline: `go test -count=1 ./...`
- Do not use `-race`: `boltdb/bolt@v1.3.1` triggers a `checkptr` panic under race instrumentation; this is an upstream bug, not a project defect. - Do not use `-race`: `boltdb/bolt@v1.3.1` triggers a `checkptr` panic under race instrumentation; this is an upstream bug, not a project defect.
+20 -58
View File
@@ -251,64 +251,41 @@ func writeEventsToDatabases(
resultChan chan WriteResult, resultChan chan WriteResult,
) { ) {
defer wg.Done() defer wg.Done()
var localWg sync.WaitGroup
boltEventChan := make(chan EventTraveller) var travellers []EventTraveller
graphEventChan := make(chan EventTraveller)
boltErrorChan := make(chan error, 1)
graphResultChan := make(chan WriteResult, 1)
localWg.Add(2)
go writeEventsToBoltDB(&localWg, boltdb, boltEventChan, boltErrorChan)
go writeEventsToGraphDB(&localWg, driver, graphEventChan, boltErrorChan, graphResultChan)
// Fan out events to both writers
for traveller := range inChan { for traveller := range inChan {
boltEventChan <- traveller travellers = append(travellers, traveller)
graphEventChan <- traveller
} }
close(boltEventChan)
close(graphEventChan)
localWg.Wait() boltErr := writeEventsToBoltDB(boltdb, travellers)
if boltErr != nil {
resultChan <- WriteResult{
Error: fmt.Errorf("boltdb write failed, aborting graph write: %w", boltErr),
}
return
}
graphResult := <-graphResultChan summaries, err := writeEventsToGraphDB(driver, travellers)
resultChan <- graphResult resultChan <- WriteResult{
ResultSummaries: summaries,
Error: err,
}
} }
func writeEventsToBoltDB( func writeEventsToBoltDB(boltdb *bolt.DB, travellers []EventTraveller) error {
wg *sync.WaitGroup,
boltdb *bolt.DB,
inChan chan EventTraveller,
errorChan chan error,
) {
defer wg.Done()
var events []EventBlob var events []EventBlob
for _, traveller := range travellers {
for traveller := range inChan {
events = append(events, events = append(events,
EventBlob{ID: []byte(traveller.ID), JSON: traveller.JSON}) EventBlob{ID: []byte(traveller.ID), JSON: traveller.JSON})
} }
return BatchWriteEvents(boltdb, events)
err := BatchWriteEvents(boltdb, events)
errorChan <- err
close(errorChan)
} }
func writeEventsToGraphDB( func writeEventsToGraphDB(driver neo4j.Driver, travellers []EventTraveller) ([]neo4j.ResultSummary, error) {
wg *sync.WaitGroup,
driver neo4j.Driver,
inChan chan EventTraveller,
boltErrorChan chan error,
resultChan chan WriteResult,
) {
defer wg.Done()
matchKeys := NewSimpleMatchKeys() matchKeys := NewSimpleMatchKeys()
batch := NewBatchSubgraph(matchKeys) batch := NewBatchSubgraph(matchKeys)
for traveller := range inChan { for _, traveller := range travellers {
for _, node := range traveller.Subgraph.Nodes() { for _, node := range traveller.Subgraph.Nodes() {
batch.AddNode(node) batch.AddNode(node)
} }
@@ -317,22 +294,7 @@ func writeEventsToGraphDB(
} }
} }
boltErr := <-boltErrorChan return MergeSubgraph(context.Background(), driver, batch)
if boltErr != nil {
resultChan <- WriteResult{
Error: fmt.Errorf(
"boltdb write failed, aborting graph write: %w", boltErr,
)}
close(resultChan)
return
}
summaries, err := MergeSubgraph(context.Background(), driver, batch)
resultChan <- WriteResult{
ResultSummaries: summaries,
Error: err,
}
close(resultChan)
} }
func collectTravellers(wg *sync.WaitGroup, inChan chan EventTraveller, resultChan chan []EventTraveller) { func collectTravellers(wg *sync.WaitGroup, inChan chan EventTraveller, resultChan chan []EventTraveller) {