Fix deadlock by buffering return values.

Remove standalone error return value.
This commit is contained in:
Jay
2026-03-05 16:54:53 -05:00
parent 0ed07a81e7
commit 496112e0e0

View File

@@ -46,7 +46,7 @@ func WriteEvents(
events [][]byte, events [][]byte,
driver neo4j.Driver, boltdb *bolt.DB, driver neo4j.Driver, boltdb *bolt.DB,
opts *WriteOptions, opts *WriteOptions,
) (WriteReport, error) { ) WriteReport {
start := time.Now() start := time.Now()
if opts == nil { if opts == nil {
@@ -57,7 +57,7 @@ func WriteEvents(
err := SetupBoltDB(boltdb) err := SetupBoltDB(boltdb)
if err != nil { if err != nil {
return WriteReport{}, fmt.Errorf("error setting up bolt db: %w", err) return WriteReport{Error: fmt.Errorf("error setting up bolt db: %w", err)}
} }
var wg sync.WaitGroup var wg sync.WaitGroup
@@ -77,7 +77,7 @@ func WriteEvents(
go parseEventJSON(&wg, eventChan, parsedChan, parseExcludedChan) go parseEventJSON(&wg, eventChan, parsedChan, parseExcludedChan)
// Collect Rejected Events // Collect Rejected Events
collectedParseExcludedChan := make(chan []EventTraveller) collectedParseExcludedChan := make(chan []EventTraveller, 1)
wg.Add(1) wg.Add(1)
go collectTravellers(&wg, parseExcludedChan, collectedParseExcludedChan) go collectTravellers(&wg, parseExcludedChan, collectedParseExcludedChan)
@@ -91,7 +91,7 @@ func WriteEvents(
parsedChan, queuedChan, policyExcludedChan) parsedChan, queuedChan, policyExcludedChan)
// Collect Skipped Events // Collect Skipped Events
collectedPolicyExcludedChan := make(chan []EventTraveller) collectedPolicyExcludedChan := make(chan []EventTraveller, 1)
wg.Add(1) wg.Add(1)
go collectTravellers(&wg, policyExcludedChan, collectedPolicyExcludedChan) go collectTravellers(&wg, policyExcludedChan, collectedPolicyExcludedChan)
@@ -103,7 +103,7 @@ func WriteEvents(
go convertEventsToSubgraphs(&wg, opts.Expanders, queuedChan, convertedChan) go convertEventsToSubgraphs(&wg, opts.Expanders, queuedChan, convertedChan)
// Write Events To Databases // Write Events To Databases
writeResultChan := make(chan WriteResult) writeResultChan := make(chan WriteResult, 1)
wg.Add(1) wg.Add(1)
go writeEventsToDatabases(&wg, driver, boltdb, convertedChan, writeResultChan) go writeEventsToDatabases(&wg, driver, boltdb, convertedChan, writeResultChan)
@@ -132,7 +132,7 @@ func WriteEvents(
Neo4jResultSummaries: writeResult.ResultSummaries, Neo4jResultSummaries: writeResult.ResultSummaries,
Duration: time.Since(start), Duration: time.Since(start),
Error: writeResult.Error, Error: writeResult.Error,
}, writeResult.Error }
} }
func setDefaultWriteOptions(opts *WriteOptions) { func setDefaultWriteOptions(opts *WriteOptions) {
@@ -254,8 +254,8 @@ func writeEventsToDatabases(
boltEventChan := make(chan EventTraveller) boltEventChan := make(chan EventTraveller)
graphEventChan := make(chan EventTraveller) graphEventChan := make(chan EventTraveller)
boltErrorChan := make(chan error) boltErrorChan := make(chan error, 1)
graphResultChan := make(chan WriteResult) graphResultChan := make(chan WriteResult, 1)
localWg.Add(2) localWg.Add(2)
go writeEventsToBoltDB(&localWg, boltdb, boltEventChan, boltErrorChan) go writeEventsToBoltDB(&localWg, boltdb, boltEventChan, boltErrorChan)