diff --git a/README.md b/README.md index 0245d3a..0df41aa 100644 --- a/README.md +++ b/README.md @@ -24,7 +24,80 @@ replace git.wisehodl.dev/jay/go-heartwood => github.com/wisehodl/go-heartwood la ## Usage -*Usage examples will be added as the API stabilizes.* +*Further usage examples will be added as the API stabilizes.* + +### Writing Events + +```go +package main + +import ( + "context" + "encoding/json" + "log" + + "git.wisehodl.dev/jay/go-roots/events" + "git.wisehodl.dev/jay/go-roots/keys" + "git.wisehodl.dev/jay/go-heartwood" + + "github.com/boltdb/bolt" +) + +func main() { + ctx := context.Background() + + // Connect to Neo4j + driver, err := heartwood.ConnectNeo4j(ctx, "bolt://localhost:7687", "neo4j", "password") + if err != nil { + log.Fatal(err) + } + defer driver.Close(ctx) + + // Ensure the necessary indexes and constraints exist + if err := heartwood.SetNeo4jSchema(ctx, driver); err != nil { + log.Fatal(err) + } + + // Open BoltDB + boltdb, err := bolt.Open("events.db", 0600, nil) + if err != nil { + log.Fatal(err) + } + defer boltdb.Close() + + // Build events using go-roots + sk, _ := keys.GeneratePrivateKey() + pk, _ := keys.GetPublicKey(sk) + + event := events.Event{ + PubKey: pk, + CreatedAt: 1000, + Kind: 1, + Content: "hello from heartwood", + Tags: []events.Tag{}, + } + event.ID, _ = events.GenerateEventID(event) + event.Sig, _ = events.SignEvent(event, sk) + + eventJSON, _ := json.Marshal(event) + + // Write events + report, err := heartwood.WriteEvents( + [][]byte{eventJSON}, + driver, boltdb, + nil, // default WriteOptions + ) + if err != nil { + log.Fatal(err) + } + + log.Printf("created: %d, excluded: %d, duration: %s", + report.CreatedEventCount, + len(report.ExcludedEvents), + report.Duration, + ) +} +``` ## Testing diff --git a/boltdb.go b/boltdb.go index 807312d..2f62484 100644 --- a/boltdb.go +++ b/boltdb.go @@ -7,8 +7,8 @@ import ( const BucketName string = "events" type EventBlob struct { - ID string - JSON string + ID []byte + JSON []byte } func SetupBoltDB(boltdb *bolt.DB) error { @@ -39,9 +39,7 @@ func BatchWriteEvents(boltdb *bolt.DB, events []EventBlob) error { return boltdb.Update(func(tx *bolt.Tx) error { bucket := tx.Bucket([]byte(BucketName)) for _, event := range events { - if err := bucket.Put( - []byte(event.ID), []byte(event.JSON), - ); err != nil { + if err := bucket.Put(event.ID, event.JSON); err != nil { return err } } diff --git a/boltdb_test.go b/boltdb_test.go index f4df8d8..59637a5 100644 --- a/boltdb_test.go +++ b/boltdb_test.go @@ -58,7 +58,7 @@ func TestBatchCheckEventsExist(t *testing.T) { { name: "some exist", setupEvents: []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, }, checkIDs: []string{"id1", "id2"}, expectedMap: map[string]bool{ @@ -69,8 +69,8 @@ func TestBatchCheckEventsExist(t *testing.T) { { name: "all exist", setupEvents: []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, - {ID: "id2", JSON: `{"test":2}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, + {ID: []byte("id2"), JSON: []byte(`{"test":2}`)}, }, checkIDs: []string{"id1", "id2"}, expectedMap: map[string]bool{ @@ -121,7 +121,7 @@ func TestBatchWriteEvents(t *testing.T) { { name: "single event", events: []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, }, verifyID: "id1", expectedJSON: `{"test":1}`, @@ -129,9 +129,9 @@ func TestBatchWriteEvents(t *testing.T) { { name: "multiple events", events: []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, - {ID: "id2", JSON: `{"test":2}`}, - {ID: "id3", JSON: `{"test":3}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, + {ID: []byte("id2"), JSON: []byte(`{"test":2}`)}, + {ID: []byte("id3"), JSON: []byte(`{"test":3}`)}, }, verifyID: "id2", expectedJSON: `{"test":2}`, @@ -139,8 +139,8 @@ func TestBatchWriteEvents(t *testing.T) { { name: "duplicate id overwrites", events: []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, - {ID: "id1", JSON: `{"test":2}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, + {ID: []byte("id1"), JSON: []byte(`{"test":2}`)}, }, verifyID: "id1", expectedJSON: `{"test":2}`, @@ -177,9 +177,9 @@ func TestWriteThenCheck(t *testing.T) { require.NoError(t, err) events := []EventBlob{ - {ID: "id1", JSON: `{"test":1}`}, - {ID: "id2", JSON: `{"test":2}`}, - {ID: "id3", JSON: `{"test":3}`}, + {ID: []byte("id1"), JSON: []byte(`{"test":1}`)}, + {ID: []byte("id2"), JSON: []byte(`{"test":2}`)}, + {ID: []byte("id3"), JSON: []byte(`{"test":3}`)}, } err = BatchWriteEvents(db, events) diff --git a/write.go b/write.go index afebc73..5f8000c 100644 --- a/write.go +++ b/write.go @@ -18,7 +18,7 @@ type WriteOptions struct { type EventTraveller struct { ID string - JSON string + JSON []byte Event roots.Event Subgraph *EventSubgraph Error error @@ -30,8 +30,7 @@ type WriteResult struct { } type WriteReport struct { - InvalidEvents []EventTraveller - SkippedEvents []EventTraveller + ExcludedEvents []EventTraveller CreatedEventCount int Neo4jResultSummaries []neo4j.ResultSummary Duration time.Duration @@ -39,7 +38,7 @@ type WriteReport struct { } func WriteEvents( - events []string, + events [][]byte, driver neo4j.Driver, boltdb *bolt.DB, opts *WriteOptions, ) (WriteReport, error) { @@ -59,7 +58,7 @@ func WriteEvents( var wg sync.WaitGroup // Create Event Travellers - jsonChan := make(chan string) + jsonChan := make(chan []byte) eventChan := make(chan EventTraveller) wg.Add(1) @@ -67,30 +66,30 @@ func WriteEvents( // Parse Event JSON parsedChan := make(chan EventTraveller) - invalidChan := make(chan EventTraveller) + parseExcludedChan := make(chan EventTraveller) wg.Add(1) - go parseEventJSON(&wg, eventChan, parsedChan, invalidChan) + go parseEventJSON(&wg, eventChan, parsedChan, parseExcludedChan) - // Collect Invalid Events - collectedInvalidChan := make(chan []EventTraveller) + // Collect Rejected Events + collectedParseExcludedChan := make(chan []EventTraveller) wg.Add(1) - go collectTravellers(&wg, invalidChan, collectedInvalidChan) + go collectTravellers(&wg, parseExcludedChan, collectedParseExcludedChan) // Enforce Policy Rules queuedChan := make(chan EventTraveller) - skippedChan := make(chan EventTraveller) + policyExcludedChan := make(chan EventTraveller) wg.Add(1) go enforcePolicyRules(&wg, driver, boltdb, opts.BoltReadBatchSize, - parsedChan, queuedChan, skippedChan) + parsedChan, queuedChan, policyExcludedChan) // Collect Skipped Events - collectedSkippedChan := make(chan []EventTraveller) + collectedPolicyExcludedChan := make(chan []EventTraveller) wg.Add(1) - go collectTravellers(&wg, skippedChan, collectedSkippedChan) + go collectTravellers(&wg, policyExcludedChan, collectedPolicyExcludedChan) // Convert Events To Subgraphs convertedChan := make(chan EventTraveller) @@ -116,14 +115,15 @@ func WriteEvents( wg.Wait() // Collect results - invalid := <-collectedInvalidChan - skipped := <-collectedSkippedChan + parseExcluded := <-collectedParseExcludedChan + policyExcluded := <-collectedPolicyExcludedChan writeResult := <-writeResultChan + excluded := append(parseExcluded, policyExcluded...) + return WriteReport{ - InvalidEvents: invalid, - SkippedEvents: skipped, - CreatedEventCount: len(events) - len(invalid) - len(skipped), + ExcludedEvents: excluded, + CreatedEventCount: len(events) - len(excluded), Neo4jResultSummaries: writeResult.ResultSummaries, Duration: time.Since(start), Error: writeResult.Error, @@ -139,7 +139,7 @@ func setDefaultWriteOptions(opts *WriteOptions) { } } -func createEventTravellers(wg *sync.WaitGroup, jsonChan chan string, eventChan chan EventTraveller) { +func createEventTravellers(wg *sync.WaitGroup, jsonChan chan []byte, eventChan chan EventTraveller) { defer wg.Done() for json := range jsonChan { eventChan <- EventTraveller{JSON: json} @@ -147,15 +147,22 @@ func createEventTravellers(wg *sync.WaitGroup, jsonChan chan string, eventChan c close(eventChan) } -func parseEventJSON(wg *sync.WaitGroup, inChan, parsedChan, invalidChan chan EventTraveller) { +func parseEventJSON(wg *sync.WaitGroup, inChan, parsedChan, excludedChan chan EventTraveller) { defer wg.Done() for traveller := range inChan { var event roots.Event - jsonBytes := []byte(traveller.JSON) + jsonBytes := traveller.JSON err := json.Unmarshal(jsonBytes, &event) if err != nil { - traveller.Error = err - invalidChan <- traveller + traveller.Error = fmt.Errorf("rejected: unrecognized event format: %w", err) + excludedChan <- traveller + continue + } + + err = roots.Validate(event) + if err != nil { + traveller.Error = fmt.Errorf("rejected: invalid event: %w", err) + excludedChan <- traveller continue } @@ -165,14 +172,14 @@ func parseEventJSON(wg *sync.WaitGroup, inChan, parsedChan, invalidChan chan Eve } close(parsedChan) - close(invalidChan) + close(excludedChan) } func enforcePolicyRules( wg *sync.WaitGroup, driver neo4j.Driver, boltdb *bolt.DB, batchSize int, - inChan, queuedChan, skippedChan chan EventTraveller, + inChan, queuedChan, excludedChan chan EventTraveller, ) { defer wg.Done() var batch []EventTraveller @@ -181,17 +188,17 @@ func enforcePolicyRules( batch = append(batch, traveller) if len(batch) >= batchSize { - processPolicyRulesBatch(boltdb, batch, queuedChan, skippedChan) + processPolicyRulesBatch(boltdb, batch, queuedChan, excludedChan) batch = []EventTraveller{} } } if len(batch) > 0 { - processPolicyRulesBatch(boltdb, batch, queuedChan, skippedChan) + processPolicyRulesBatch(boltdb, batch, queuedChan, excludedChan) } close(queuedChan) - close(skippedChan) + close(excludedChan) } func processPolicyRulesBatch( @@ -209,6 +216,7 @@ func processPolicyRulesBatch( for _, traveller := range batch { if existsMap[traveller.ID] { + traveller.Error = fmt.Errorf("skipped: event already exists") skippedChan <- traveller } else { queuedChan <- traveller @@ -273,7 +281,7 @@ func writeEventsToBoltDB( for traveller := range inChan { events = append(events, - EventBlob{ID: traveller.ID, JSON: traveller.JSON}) + EventBlob{ID: []byte(traveller.ID), JSON: traveller.JSON}) } err := BatchWriteEvents(boltdb, events) diff --git a/write_test.go b/write_test.go index 6dba39a..b893baa 100644 --- a/write_test.go +++ b/write_test.go @@ -1,7 +1,6 @@ package heartwood import ( - "fmt" roots "git.wisehodl.dev/jay/go-roots/events" "github.com/stretchr/testify/assert" "sync" @@ -10,12 +9,16 @@ import ( // Test helpers -func validEventJSON(id, pubkey string) string { - return fmt.Sprintf(`{"id":"%s","pubkey":"%s","created_at":1000,"kind":1,"content":"test","tags":[],"sig":"abc"}`, id, pubkey) +func validEventJSON() []byte { + return []byte(`{"id":"c7a702e6158744ca03508bbb4c90f9dbb0d6e88fefbfaa511d5ab24b4e3c48ad","pubkey":"cfa87f35acbde29ba1ab3ee42de527b2cad33ac487e80cf2d6405ea0042c8fef","created_at":1760740551,"kind":1,"tags":[],"content":"hello world","sig":"83b71e15649c9e9da362c175f988c36404cabf357a976d869102a74451cfb8af486f6088b5631033b4927bd46cad7a0d90d7f624aefc0ac260364aa65c36071a"}`) } -func invalidEventJSON() string { - return `{invalid json` +func invalidEventJSON() []byte { + return []byte(`{"id":"abc123","pubkey":"xyz789","created_at":1000,"kind":1,"content":"test","tags":[],"sig":"abc"}`) +} + +func malformedEventJSON() []byte { + return []byte(`{malformed json`) } // Pipeline stage tests @@ -23,28 +26,28 @@ func invalidEventJSON() string { func TestCreateEventTravellers(t *testing.T) { cases := []struct { name string - input []string + input [][]byte expected []EventTraveller }{ { name: "empty input", - input: []string{}, + input: [][]byte{}, expected: []EventTraveller{}, }, { name: "single json", - input: []string{"test1"}, + input: [][]byte{[]byte("test1")}, expected: []EventTraveller{ - {JSON: "test1"}, + {JSON: []byte("test1")}, }, }, { name: "multiple jsons", - input: []string{"test1", "test2", "test3"}, + input: [][]byte{[]byte("test1"), []byte("test2"), []byte("test3")}, expected: []EventTraveller{ - {JSON: "test1"}, - {JSON: "test2"}, - {JSON: "test3"}, + {JSON: []byte("test1")}, + {JSON: []byte("test2")}, + {JSON: []byte("test3")}, }, }, } @@ -52,7 +55,7 @@ func TestCreateEventTravellers(t *testing.T) { for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { var wg sync.WaitGroup - jsonChan := make(chan string) + jsonChan := make(chan []byte) eventChan := make(chan EventTraveller) wg.Add(1) @@ -86,37 +89,48 @@ func TestParseEventJSON(t *testing.T) { name string input []EventTraveller wantParsed int - wantInvalid int + wantRejected int checkParsedID bool expectedID string + wantErrorText string }{ { name: "valid event", input: []EventTraveller{ - {JSON: validEventJSON("abc123", "pubkey1")}, + {JSON: validEventJSON()}, }, wantParsed: 1, - wantInvalid: 0, + wantRejected: 0, checkParsedID: true, - expectedID: "abc123", + expectedID: "c7a702e6158744ca03508bbb4c90f9dbb0d6e88fefbfaa511d5ab24b4e3c48ad", }, { - name: "invalid json", + name: "invalid event", input: []EventTraveller{ {JSON: invalidEventJSON()}, }, - wantParsed: 0, - wantInvalid: 1, + wantParsed: 0, + wantRejected: 1, + wantErrorText: "rejected: invalid event", + }, + { + name: "malformed json", + input: []EventTraveller{ + {JSON: malformedEventJSON()}, + }, + wantParsed: 0, + wantRejected: 1, + wantErrorText: "rejected: unrecognized event format", }, { name: "mixed batch", input: []EventTraveller{ - {JSON: validEventJSON("abc123", "pubkey1")}, {JSON: invalidEventJSON()}, - {JSON: validEventJSON("def456", "pubkey2")}, + {JSON: malformedEventJSON()}, + {JSON: validEventJSON()}, }, - wantParsed: 2, - wantInvalid: 1, + wantParsed: 1, + wantRejected: 2, }, } @@ -125,10 +139,10 @@ func TestParseEventJSON(t *testing.T) { var wg sync.WaitGroup inChan := make(chan EventTraveller) parsedChan := make(chan EventTraveller) - invalidChan := make(chan EventTraveller) + rejectedChan := make(chan EventTraveller) wg.Add(1) - go parseEventJSON(&wg, inChan, parsedChan, invalidChan) + go parseEventJSON(&wg, inChan, parsedChan, rejectedChan) go func() { for _, traveller := range tc.input { @@ -138,7 +152,7 @@ func TestParseEventJSON(t *testing.T) { }() var parsed []EventTraveller - var invalid []EventTraveller + var rejected []EventTraveller var collectWg sync.WaitGroup collectWg.Add(2) @@ -152,8 +166,8 @@ func TestParseEventJSON(t *testing.T) { go func() { defer collectWg.Done() - for f := range invalidChan { - invalid = append(invalid, f) + for f := range rejectedChan { + rejected = append(rejected, f) } }() @@ -161,7 +175,7 @@ func TestParseEventJSON(t *testing.T) { wg.Wait() assert.Equal(t, tc.wantParsed, len(parsed)) - assert.Equal(t, tc.wantInvalid, len(invalid)) + assert.Equal(t, tc.wantRejected, len(rejected)) // Smoke test first parsed id if tc.checkParsedID && len(parsed) > 0 { @@ -169,9 +183,14 @@ func TestParseEventJSON(t *testing.T) { assert.NotEmpty(t, parsed[0].Event.ID) } - for _, inv := range invalid { - assert.NotNil(t, inv.Error) - assert.Empty(t, inv.Event.ID) + // Check error text on first rejected event + if tc.wantErrorText != "" { + assert.ErrorContains(t, rejected[0].Error, tc.wantErrorText) + } + + for _, reject := range rejected { + assert.NotNil(t, reject.Error) + assert.Empty(t, reject.Event.ID) } }) }