diff --git a/graphstore/batch.go b/batchMerge.go similarity index 85% rename from graphstore/batch.go rename to batchMerge.go index b2ea2fa..816318f 100644 --- a/graphstore/batch.go +++ b/batchMerge.go @@ -1,10 +1,8 @@ -package graphstore +package heartwood import ( "context" "fmt" - "git.wisehodl.dev/jay/go-heartwood/cypher" - "git.wisehodl.dev/jay/go-heartwood/graph" "github.com/neo4j/neo4j-go-driver/v6/neo4j" "sort" "strings" @@ -16,7 +14,7 @@ type NodeBatch struct { MatchLabel string Labels []string MatchKeys []string - Nodes []*graph.Node + Nodes []*Node } type RelBatch struct { @@ -25,24 +23,24 @@ type RelBatch struct { StartMatchKeys []string EndLabel string EndMatchKeys []string - Rels []*graph.Relationship + Rels []*Relationship } type BatchSubgraph struct { - nodes map[string][]*graph.Node - rels map[string][]*graph.Relationship - matchProvider graph.MatchKeysProvider + nodes map[string][]*Node + rels map[string][]*Relationship + matchProvider MatchKeysProvider } -func NewBatchSubgraph(matchProvider graph.MatchKeysProvider) *BatchSubgraph { +func NewBatchSubgraph(matchProvider MatchKeysProvider) *BatchSubgraph { return &BatchSubgraph{ - nodes: make(map[string][]*graph.Node), - rels: make(map[string][]*graph.Relationship), + nodes: make(map[string][]*Node), + rels: make(map[string][]*Relationship), matchProvider: matchProvider, } } -func (s *BatchSubgraph) AddNode(node *graph.Node) error { +func (s *BatchSubgraph) AddNode(node *Node) error { // Verify that the node has defined match property values. matchLabel, _, err := node.MatchProps(s.matchProvider) @@ -54,16 +52,16 @@ func (s *BatchSubgraph) AddNode(node *graph.Node) error { batchKey := createNodeBatchKey(matchLabel, node.Labels.ToArray()) if _, exists := s.nodes[batchKey]; !exists { - s.nodes[batchKey] = []*graph.Node{} + s.nodes[batchKey] = []*Node{} } - // Add the node to the subgraph. 
+ // Add the node to the subgraph. s.nodes[batchKey] = append(s.nodes[batchKey], node) return nil } -func (s *BatchSubgraph) AddRel(rel *graph.Relationship) error { +func (s *BatchSubgraph) AddRel(rel *Relationship) error { // Verify that the start node has defined match property values. startLabel, _, err := rel.Start.MatchProps(s.matchProvider) @@ -81,10 +79,10 @@ func (s *BatchSubgraph) AddRel(rel *graph.Relationship) error { batchKey := createRelBatchKey(rel.Type, startLabel, endLabel) if _, exists := s.rels[batchKey]; !exists { - s.rels[batchKey] = []*graph.Relationship{} + s.rels[batchKey] = []*Relationship{} } - // Add the relationship to the subgraph. + // Add the relationship to the subgraph. s.rels[batchKey] = append(s.rels[batchKey], rel) return nil @@ -287,10 +285,10 @@ func MergeNodes( tx neo4j.ManagedTransaction, batch NodeBatch, ) (*neo4j.ResultSummary, error) { - cypherLabels := cypher.ToCypherLabels(batch.Labels) - cypherProps := cypher.ToCypherProps(batch.MatchKeys, "node.") + cypherLabels := ToCypherLabels(batch.Labels) + cypherProps := ToCypherProps(batch.MatchKeys, "node.") - serializedNodes := []*graph.SerializedNode{} + serializedNodes := []*SerializedNode{} for _, node := range batch.Nodes { serializedNodes = append(serializedNodes, node.Serialize()) } @@ -326,13 +324,13 @@ func MergeRels( tx neo4j.ManagedTransaction, batch RelBatch, ) (*neo4j.ResultSummary, error) { - cypherType := cypher.ToCypherLabel(batch.Type) - startCypherLabel := cypher.ToCypherLabel(batch.StartLabel) - endCypherLabel := cypher.ToCypherLabel(batch.EndLabel) - startCypherProps := cypher.ToCypherProps(batch.StartMatchKeys, "rel.start.") - endCypherProps := cypher.ToCypherProps(batch.EndMatchKeys, "rel.end.") + cypherType := ToCypherLabel(batch.Type) + startCypherLabel := ToCypherLabel(batch.StartLabel) + endCypherLabel := ToCypherLabel(batch.EndLabel) + startCypherProps := ToCypherProps(batch.StartMatchKeys, "rel.start.") + endCypherProps := ToCypherProps(batch.EndMatchKeys, 
"rel.end.") - serializedRels := []*graph.SerializedRel{} + serializedRels := []*SerializedRel{} for _, rel := range batch.Rels { serializedRels = append(serializedRels, rel.Serialize()) } diff --git a/graphstore/batch_test.go b/batchMerge_test.go similarity index 73% rename from graphstore/batch_test.go rename to batchMerge_test.go index 9622c13..489d3ef 100644 --- a/graphstore/batch_test.go +++ b/batchMerge_test.go @@ -1,7 +1,6 @@ -package graphstore +package heartwood import ( - "git.wisehodl.dev/jay/go-heartwood/graph" "github.com/stretchr/testify/assert" "testing" ) @@ -41,21 +40,21 @@ func TestRelBatchKey(t *testing.T) { } func TestBatchSubgraphAddNode(t *testing.T) { - matchKeys := graph.NewSimpleMatchKeys() + matchKeys := NewSimpleMatchKeys() subgraph := NewBatchSubgraph(matchKeys) - node := graph.NewEventNode("abc123") + node := NewEventNode("abc123") err := subgraph.AddNode(node) assert.NoError(t, err) assert.Equal(t, 1, subgraph.NodeCount()) - assert.Equal(t, []*graph.Node{node}, subgraph.nodes["Event:Event"]) + assert.Equal(t, []*Node{node}, subgraph.nodes["Event:Event"]) } func TestBatchSubgraphAddNodeInvalid(t *testing.T) { - matchKeys := graph.NewSimpleMatchKeys() + matchKeys := NewSimpleMatchKeys() subgraph := NewBatchSubgraph(matchKeys) - node := graph.NewNode("Event", graph.Properties{}) + node := NewNode("Event", Properties{}) err := subgraph.AddNode(node) @@ -64,24 +63,24 @@ func TestBatchSubgraphAddNodeInvalid(t *testing.T) { } func TestBatchSubgraphAddRel(t *testing.T) { - matchKeys := graph.NewSimpleMatchKeys() + matchKeys := NewSimpleMatchKeys() subgraph := NewBatchSubgraph(matchKeys) - userNode := graph.NewUserNode("pubkey1") - eventNode := graph.NewEventNode("abc123") - rel := graph.NewSignedRel(userNode, eventNode, nil) + userNode := NewUserNode("pubkey1") + eventNode := NewEventNode("abc123") + rel := NewSignedRel(userNode, eventNode, nil) err := subgraph.AddRel(rel) assert.NoError(t, err) assert.Equal(t, 1, subgraph.RelCount()) - 
assert.Equal(t, []*graph.Relationship{rel}, subgraph.rels["SIGNED,User,Event"]) + assert.Equal(t, []*Relationship{rel}, subgraph.rels["SIGNED,User,Event"]) } func TestNodeBatches(t *testing.T) { - matchKeys := graph.NewSimpleMatchKeys() + matchKeys := NewSimpleMatchKeys() subgraph := NewBatchSubgraph(matchKeys) - node := graph.NewEventNode("abc123") + node := NewEventNode("abc123") subgraph.AddNode(node) batches, err := subgraph.NodeBatches() @@ -91,15 +90,15 @@ func TestNodeBatches(t *testing.T) { assert.Equal(t, "Event", batches[0].MatchLabel) assert.ElementsMatch(t, []string{"Event"}, batches[0].Labels) assert.ElementsMatch(t, []string{"id"}, batches[0].MatchKeys) - assert.Equal(t, []*graph.Node{node}, batches[0].Nodes) + assert.Equal(t, []*Node{node}, batches[0].Nodes) } func TestRelBatches(t *testing.T) { - matchKeys := graph.NewSimpleMatchKeys() + matchKeys := NewSimpleMatchKeys() subgraph := NewBatchSubgraph(matchKeys) - userNode := graph.NewUserNode("pubkey1") - eventNode := graph.NewEventNode("abc123") - rel := graph.NewSignedRel(userNode, eventNode, nil) + userNode := NewUserNode("pubkey1") + eventNode := NewEventNode("abc123") + rel := NewSignedRel(userNode, eventNode, nil) subgraph.AddRel(rel) batches, err := subgraph.RelBatches() @@ -111,5 +110,5 @@ func TestRelBatches(t *testing.T) { assert.ElementsMatch(t, []string{"pubkey"}, batches[0].StartMatchKeys) assert.Equal(t, "Event", batches[0].EndLabel) assert.ElementsMatch(t, []string{"id"}, batches[0].EndMatchKeys) - assert.Equal(t, []*graph.Relationship{rel}, batches[0].Rels) + assert.Equal(t, []*Relationship{rel}, batches[0].Rels) } diff --git a/boltdb.go b/boltdb.go new file mode 100644 index 0000000..9b29f29 --- /dev/null +++ b/boltdb.go @@ -0,0 +1,80 @@ +package heartwood + +import ( + "github.com/boltdb/bolt" +) + +// Interface + +type BoltDB interface { + Setup() error + BatchCheckEventsExist(eventIDs []string) map[string]bool + BatchWriteEvents(events []EventBlob) error +} + +func NewKVDB(boltdb 
*bolt.DB) BoltDB { + return &boltDB{db: boltdb} +} + +type boltDB struct { + db *bolt.DB +} + +func (b *boltDB) Setup() error { + return SetupBoltDB(b.db) +} + +func (b *boltDB) BatchCheckEventsExist(eventIDs []string) map[string]bool { + return BatchCheckEventsExist(b.db, eventIDs) +} + +func (b *boltDB) BatchWriteEvents(events []EventBlob) error { + return BatchWriteEvents(b.db, events) +} + +func SetupBoltDB(boltdb *bolt.DB) error { + return boltdb.Update(func(tx *bolt.Tx) error { + _, err := tx.CreateBucketIfNotExists([]byte(BucketName)) + return err + }) +} + +// Functions + +const BucketName string = "events" + +type EventBlob struct { + ID string + JSON string +} + +func BatchCheckEventsExist(boltdb *bolt.DB, eventIDs []string) map[string]bool { + existsMap := make(map[string]bool) + + boltdb.View(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + if bucket == nil { + return nil + } + for _, id := range eventIDs { + existsMap[id] = bucket.Get([]byte(id)) != nil + } + return nil + }) + + return existsMap +} + +func BatchWriteEvents(boltdb *bolt.DB, events []EventBlob) error { + return boltdb.Update(func(tx *bolt.Tx) error { + bucket := tx.Bucket([]byte(BucketName)) + for _, event := range events { + if err := bucket.Put( + []byte(event.ID), []byte(event.JSON), + ); err != nil { + return err + } + } + return nil + }) +} diff --git a/cypher/cypher.go b/cypher.go similarity index 98% rename from cypher/cypher.go rename to cypher.go index 10adabe..271db62 100644 --- a/cypher/cypher.go +++ b/cypher.go @@ -1,4 +1,4 @@ -package cypher +package heartwood import ( "fmt" diff --git a/cypher/cypher_test.go b/cypher_test.go similarity index 97% rename from cypher/cypher_test.go rename to cypher_test.go index 8ce83c5..a5402ea 100644 --- a/cypher/cypher_test.go +++ b/cypher_test.go @@ -1,4 +1,4 @@ -package cypher +package heartwood import ( "github.com/stretchr/testify/assert" diff --git a/expanders.go b/expanders.go index fa2fa9c..66a691a 100644 --- 
a/expanders.go +++ b/expanders.go @@ -1,7 +1,6 @@ package heartwood import ( - "git.wisehodl.dev/jay/go-heartwood/graph" roots "git.wisehodl.dev/jay/go-roots/events" ) @@ -39,10 +38,10 @@ func ExpandTaggedEvents(e roots.Event, s *EventSubgraph) { continue } - referencedEvent := graph.NewEventNode(value) + referencedEvent := NewEventNode(value) s.AddNode(referencedEvent) - s.AddRel(graph.NewReferencesEventRel(tagNode, referencedEvent, nil)) + s.AddRel(NewReferencesEventRel(tagNode, referencedEvent, nil)) } } @@ -64,16 +63,16 @@ func ExpandTaggedUsers(e roots.Event, s *EventSubgraph) { continue } - referencedEvent := graph.NewUserNode(value) + referencedEvent := NewUserNode(value) s.AddNode(referencedEvent) - s.AddRel(graph.NewReferencesUserRel(tagNode, referencedEvent, nil)) + s.AddRel(NewReferencesUserRel(tagNode, referencedEvent, nil)) } } // Helpers -func findTagNode(nodes []*graph.Node, name, value string) *graph.Node { +func findTagNode(nodes []*Node, name, value string) *Node { for _, node := range nodes { if node.Props["name"] == name && node.Props["value"] == value { return node diff --git a/graph/graph.go b/graph.go similarity index 97% rename from graph/graph.go rename to graph.go index 030a19d..d167d9e 100644 --- a/graph/graph.go +++ b/graph.go @@ -1,7 +1,4 @@ -// This module defines types and functions for working with Neo4j graph -// entities. 
- -package graph +package heartwood import ( "fmt" diff --git a/graph/graph_test.go b/graph_test.go similarity index 99% rename from graph/graph_test.go rename to graph_test.go index 987196a..92409c6 100644 --- a/graph/graph_test.go +++ b/graph_test.go @@ -1,4 +1,4 @@ -package graph +package heartwood import ( "github.com/stretchr/testify/assert" diff --git a/graphstore/connect.go b/graphstore/connect.go deleted file mode 100644 index c3b5720..0000000 --- a/graphstore/connect.go +++ /dev/null @@ -1,23 +0,0 @@ -package graphstore - -import ( - "context" - "github.com/neo4j/neo4j-go-driver/v6/neo4j" -) - -// ConnectNeo4j creates a new Neo4j driver and verifies its connectivity. -func ConnectNeo4j(ctx context.Context, uri, user, password string) (*neo4j.Driver, error) { - driver, err := neo4j.NewDriver( - uri, - neo4j.BasicAuth(user, password, "")) - if err != nil { - return nil, err - } - - err = driver.VerifyConnectivity(ctx) - if err != nil { - return nil, err - } - - return &driver, nil -} diff --git a/graphstore/schema.go b/graphstore/schema.go deleted file mode 100644 index 13ed0c3..0000000 --- a/graphstore/schema.go +++ /dev/null @@ -1,42 +0,0 @@ -package graphstore - -import ( - "context" - "github.com/neo4j/neo4j-go-driver/v6/neo4j" -) - -// SetNeo4jSchema ensures that the necessary indexes and constraints exist in -// the database -func SetNeo4jSchema(ctx context.Context, driver neo4j.Driver) error { - schemaQueries := []string{ - `CREATE CONSTRAINT user_pubkey IF NOT EXISTS - FOR (n:User) REQUIRE n.pubkey IS UNIQUE`, - - `CREATE INDEX user_pubkey IF NOT EXISTS - FOR (n:User) ON (n.pubkey)`, - - `CREATE INDEX event_id IF NOT EXISTS - FOR (n:Event) ON (n.id)`, - - `CREATE INDEX event_kind IF NOT EXISTS - FOR (n:Event) ON (n.kind)`, - - `CREATE INDEX tag_name_value IF NOT EXISTS - FOR (n:Tag) ON (n.name, n.value)`, - } - - // Create indexes and constraints - for _, query := range schemaQueries { - _, err := neo4j.ExecuteQuery(ctx, driver, - query, - nil, - 
neo4j.EagerResultTransformer, - neo4j.ExecuteQueryWithDatabase("neo4j")) - - if err != nil { - return err - } - } - - return nil -} diff --git a/neo4j.go b/neo4j.go new file mode 100644 index 0000000..be19d42 --- /dev/null +++ b/neo4j.go @@ -0,0 +1,42 @@ +package heartwood + +import ( + "context" + "github.com/neo4j/neo4j-go-driver/v6/neo4j" +) + +// Interface + +type GraphDB interface { + MergeSubgraph(ctx context.Context, subgraph *BatchSubgraph) ([]neo4j.ResultSummary, error) +} + +func NewGraphDriver(driver neo4j.Driver) GraphDB { + return &graphdb{driver: driver} +} + +type graphdb struct { + driver neo4j.Driver +} + +func (n *graphdb) MergeSubgraph(ctx context.Context, subgraph *BatchSubgraph) ([]neo4j.ResultSummary, error) { + return MergeSubgraph(ctx, n.driver, subgraph) +} + +// Functions + +func ConnectNeo4j(ctx context.Context, uri, user, password string) (neo4j.Driver, error) { + driver, err := neo4j.NewDriver( + uri, + neo4j.BasicAuth(user, password, "")) + if err != nil { + return nil, err + } + + err = driver.VerifyConnectivity(ctx) + if err != nil { + return nil, err + } + + return driver, nil +} diff --git a/graph/schema.go b/schema.go similarity index 68% rename from graph/schema.go rename to schema.go index 4a2393e..39017b3 100644 --- a/graph/schema.go +++ b/schema.go @@ -1,10 +1,9 @@ -// This module provides methods for creating nodes and relationships according -// to a defined schema. 
- -package graph +package heartwood import ( + "context" "fmt" + "github.com/neo4j/neo4j-go-driver/v6/neo4j" ) // ======================================== @@ -99,3 +98,43 @@ func NewRelationshipWithValidation( return NewRelationship(rtype, start, end, props) } + +// ======================================== +// Schema Indexes and Constraints +// ======================================== + +// SetNeo4jSchema ensures that the necessary indexes and constraints exist in +// the database +func SetNeo4jSchema(ctx context.Context, driver neo4j.Driver) error { + schemaQueries := []string{ + `CREATE CONSTRAINT user_pubkey IF NOT EXISTS + FOR (n:User) REQUIRE n.pubkey IS UNIQUE`, + + `CREATE INDEX user_pubkey IF NOT EXISTS + FOR (n:User) ON (n.pubkey)`, + + `CREATE INDEX event_id IF NOT EXISTS + FOR (n:Event) ON (n.id)`, + + `CREATE INDEX event_kind IF NOT EXISTS + FOR (n:Event) ON (n.kind)`, + + `CREATE INDEX tag_name_value IF NOT EXISTS + FOR (n:Tag) ON (n.name, n.value)`, + } + + // Create indexes and constraints + for _, query := range schemaQueries { + _, err := neo4j.ExecuteQuery(ctx, driver, + query, + nil, + neo4j.EagerResultTransformer, + neo4j.ExecuteQueryWithDatabase("neo4j")) + + if err != nil { + return err + } + } + + return nil +} diff --git a/graph/schema_test.go b/schema_test.go similarity index 98% rename from graph/schema_test.go rename to schema_test.go index ae5882d..0f19065 100644 --- a/graph/schema_test.go +++ b/schema_test.go @@ -1,4 +1,4 @@ -package graph +package heartwood import ( "github.com/stretchr/testify/assert" diff --git a/graph/set.go b/set.go similarity index 97% rename from graph/set.go rename to set.go index 27c9c07..72b0814 100644 --- a/graph/set.go +++ b/set.go @@ -1,4 +1,4 @@ -package graph +package heartwood // Sets diff --git a/subgraph.go b/subgraph.go index 28e4215..177ace5 100644 --- a/subgraph.go +++ b/subgraph.go @@ -1,42 +1,41 @@ package heartwood import ( - "git.wisehodl.dev/jay/go-heartwood/graph" roots 
"git.wisehodl.dev/jay/go-roots/events" ) // Event subgraph struct type EventSubgraph struct { - nodes []*graph.Node - rels []*graph.Relationship + nodes []*Node + rels []*Relationship } func NewEventSubgraph() *EventSubgraph { return &EventSubgraph{ - nodes: []*graph.Node{}, - rels: []*graph.Relationship{}, + nodes: []*Node{}, + rels: []*Relationship{}, } } -func (s *EventSubgraph) AddNode(node *graph.Node) { +func (s *EventSubgraph) AddNode(node *Node) { s.nodes = append(s.nodes, node) } -func (s *EventSubgraph) AddRel(rel *graph.Relationship) { +func (s *EventSubgraph) AddRel(rel *Relationship) { s.rels = append(s.rels, rel) } -func (s *EventSubgraph) Nodes() []*graph.Node { +func (s *EventSubgraph) Nodes() []*Node { return s.nodes } -func (s *EventSubgraph) Rels() []*graph.Relationship { +func (s *EventSubgraph) Rels() []*Relationship { return s.rels } -func (s *EventSubgraph) NodesByLabel(label string) []*graph.Node { - nodes := []*graph.Node{} +func (s *EventSubgraph) NodesByLabel(label string) []*Node { + nodes := []*Node{} for _, node := range s.nodes { if node.Labels.Contains(label) { nodes = append(nodes, node) @@ -90,37 +89,37 @@ func EventToSubgraph(e roots.Event, p ExpanderPipeline) *EventSubgraph { return s } -func newEventNode(eventID string, createdAt int, kind int, content string) *graph.Node { - eventNode := graph.NewEventNode(eventID) +func newEventNode(eventID string, createdAt int, kind int, content string) *Node { + eventNode := NewEventNode(eventID) eventNode.Props["created_at"] = createdAt eventNode.Props["kind"] = kind eventNode.Props["content"] = content return eventNode } -func newUserNode(pubkey string) *graph.Node { - return graph.NewUserNode(pubkey) +func newUserNode(pubkey string) *Node { + return NewUserNode(pubkey) } -func newSignedRel(user, event *graph.Node) *graph.Relationship { - return graph.NewSignedRel(user, event, nil) +func newSignedRel(user, event *Node) *Relationship { + return NewSignedRel(user, event, nil) } -func 
newTagNodes(tags []roots.Tag) []*graph.Node { - nodes := []*graph.Node{} +func newTagNodes(tags []roots.Tag) []*Node { + nodes := []*Node{} for _, tag := range tags { if !isValidTag(tag) { continue } - nodes = append(nodes, graph.NewTagNode(tag[0], tag[1])) + nodes = append(nodes, NewTagNode(tag[0], tag[1])) } return nodes } -func newTagRels(event *graph.Node, tags []*graph.Node) []*graph.Relationship { - rels := []*graph.Relationship{} +func newTagRels(event *Node, tags []*Node) []*Relationship { + rels := []*Relationship{} for _, tag := range tags { - rels = append(rels, graph.NewTaggedRel(event, tag, nil)) + rels = append(rels, NewTaggedRel(event, tag, nil)) } return rels } diff --git a/subgraph_test.go b/subgraph_test.go index 23a5f59..7cf25af 100644 --- a/subgraph_test.go +++ b/subgraph_test.go @@ -2,7 +2,6 @@ package heartwood import ( "fmt" - "git.wisehodl.dev/jay/go-heartwood/graph" roots "git.wisehodl.dev/jay/go-roots/events" "github.com/stretchr/testify/assert" "reflect" @@ -22,21 +21,21 @@ var static = roots.Event{ Content: "hello", } -func newFullEventNode(id string, createdAt, kind int, content string) *graph.Node { - n := graph.NewEventNode(id) +func newFullEventNode(id string, createdAt, kind int, content string) *Node { + n := NewEventNode(id) n.Props["created_at"] = createdAt n.Props["kind"] = kind n.Props["content"] = content return n } -func baseSubgraph(eventID, pubkey string) (*EventSubgraph, *graph.Node, *graph.Node) { +func baseSubgraph(eventID, pubkey string) (*EventSubgraph, *Node, *Node) { s := NewEventSubgraph() eventNode := newFullEventNode(eventID, static.CreatedAt, static.Kind, static.Content) - userNode := graph.NewUserNode(pubkey) + userNode := NewUserNode(pubkey) s.AddNode(eventNode) s.AddNode(userNode) - s.AddRel(graph.NewSignedRel(userNode, eventNode, nil)) + s.AddRel(NewSignedRel(userNode, eventNode, nil)) return s, eventNode, userNode } @@ -66,9 +65,9 @@ func TestEventToSubgraph(t *testing.T) { }, expected: func() *EventSubgraph 
{ s, eventNode, _ := baseSubgraph(ids["a"], ids["b"]) - tagNode := graph.NewTagNode("t", "bitcoin") + tagNode := NewTagNode("t", "bitcoin") s.AddNode(tagNode) - s.AddRel(graph.NewTaggedRel(eventNode, tagNode, nil)) + s.AddRel(NewTaggedRel(eventNode, tagNode, nil)) return s }(), }, @@ -93,12 +92,12 @@ func TestEventToSubgraph(t *testing.T) { }, expected: func() *EventSubgraph { s, eventNode, _ := baseSubgraph(ids["a"], ids["b"]) - tagNode := graph.NewTagNode("e", ids["c"]) - referencedEvent := graph.NewEventNode(ids["c"]) + tagNode := NewTagNode("e", ids["c"]) + referencedEvent := NewEventNode(ids["c"]) s.AddNode(tagNode) s.AddNode(referencedEvent) - s.AddRel(graph.NewTaggedRel(eventNode, tagNode, nil)) - s.AddRel(graph.NewReferencesEventRel(tagNode, referencedEvent, nil)) + s.AddRel(NewTaggedRel(eventNode, tagNode, nil)) + s.AddRel(NewReferencesEventRel(tagNode, referencedEvent, nil)) return s }(), }, @@ -111,9 +110,9 @@ func TestEventToSubgraph(t *testing.T) { }, expected: func() *EventSubgraph { s, eventNode, _ := baseSubgraph(ids["a"], ids["b"]) - tagNode := graph.NewTagNode("e", "notvalid") + tagNode := NewTagNode("e", "notvalid") s.AddNode(tagNode) - s.AddRel(graph.NewTaggedRel(eventNode, tagNode, nil)) + s.AddRel(NewTaggedRel(eventNode, tagNode, nil)) return s }(), }, @@ -126,12 +125,12 @@ func TestEventToSubgraph(t *testing.T) { }, expected: func() *EventSubgraph { s, eventNode, _ := baseSubgraph(ids["a"], ids["b"]) - tagNode := graph.NewTagNode("p", ids["d"]) - referencedUser := graph.NewUserNode(ids["d"]) + tagNode := NewTagNode("p", ids["d"]) + referencedUser := NewUserNode(ids["d"]) s.AddNode(tagNode) s.AddNode(referencedUser) - s.AddRel(graph.NewTaggedRel(eventNode, tagNode, nil)) - s.AddRel(graph.NewReferencesUserRel(tagNode, referencedUser, nil)) + s.AddRel(NewTaggedRel(eventNode, tagNode, nil)) + s.AddRel(NewReferencesUserRel(tagNode, referencedUser, nil)) return s }(), }, @@ -144,9 +143,9 @@ func TestEventToSubgraph(t *testing.T) { }, expected: 
func() *EventSubgraph { s, eventNode, _ := baseSubgraph(ids["a"], ids["b"]) - tagNode := graph.NewTagNode("p", "notvalid") + tagNode := NewTagNode("p", "notvalid") s.AddNode(tagNode) - s.AddRel(graph.NewTaggedRel(eventNode, tagNode, nil)) + s.AddRel(NewTaggedRel(eventNode, tagNode, nil)) return s }(), }, @@ -164,7 +163,7 @@ func TestEventToSubgraph(t *testing.T) { // helpers -func nodesEqual(expected, got *graph.Node) error { +func nodesEqual(expected, got *Node) error { // Compare label counts if expected.Labels.Length() != got.Labels.Length() { return fmt.Errorf( @@ -187,7 +186,7 @@ func nodesEqual(expected, got *graph.Node) error { return nil } -func relsEqual(expected, got *graph.Relationship) error { +func relsEqual(expected, got *Relationship) error { // Compare type if expected.Type != got.Type { return fmt.Errorf("type: expected %q, got %q", expected.Type, got.Type) @@ -209,7 +208,7 @@ func relsEqual(expected, got *graph.Relationship) error { return nil } -func propsEqual(expected, got graph.Properties) error { +func propsEqual(expected, got Properties) error { if len(expected) != len(got) { return fmt.Errorf( "number of props does not match. 
expected %d, got %d", @@ -231,10 +230,10 @@ func propsEqual(expected, got graph.Properties) error { func assertSubgraphsEqual(t *testing.T, expected, got *EventSubgraph) { t.Helper() - gotNodes := make([]*graph.Node, len(got.Nodes())) + gotNodes := make([]*Node, len(got.Nodes())) copy(gotNodes, got.Nodes()) - gotRels := make([]*graph.Relationship, len(got.Rels())) + gotRels := make([]*Relationship, len(got.Rels())) copy(gotRels, got.Rels()) for _, expectedNode := range expected.Nodes() { diff --git a/write.go b/write.go index 4325f83..e8f53bf 100644 --- a/write.go +++ b/write.go @@ -1,20 +1,25 @@ package heartwood import ( + "context" + "encoding/json" "fmt" roots "git.wisehodl.dev/jay/go-roots/events" - "github.com/boltdb/bolt" "github.com/neo4j/neo4j-go-driver/v6/neo4j" "sync" - // "git.wisehodl.dev/jay/go-heartwood/graph" "time" ) +type WriteOptions struct { + Expanders ExpanderPipeline + KVReadBatchSize int +} + type EventFollower struct { ID string JSON string Event roots.Event - Subgraph EventSubgraph + Subgraph *EventSubgraph Error error } @@ -34,11 +39,18 @@ type WriteReport struct { func WriteEvents( events []string, - driver *neo4j.Driver, boltdb *bolt.DB, + graphdb GraphDB, boltdb BoltDB, + opts *WriteOptions, ) (WriteReport, error) { start := time.Now() - err := setupBoltDB(boltdb) + if opts == nil { + opts = &WriteOptions{} + } + + setDefaultWriteOptions(opts) + + err := boltdb.Setup() if err != nil { return WriteReport{}, fmt.Errorf("error setting up bolt db: %w", err) } @@ -81,7 +93,10 @@ func WriteEvents( wg.Add(1) go func() { defer wg.Done() - enforcePolicyRules(driver, boltdb, parsedChan, queuedChan, skippedChan) + enforcePolicyRules( + graphdb, boltdb, + opts.KVReadBatchSize, + parsedChan, queuedChan, skippedChan) }() // Collect Skipped Events @@ -99,7 +114,7 @@ func WriteEvents( wg.Add(1) go func() { defer wg.Done() - convertEventsToSubgraphs(queuedChan, convertedChan) + convertEventsToSubgraphs(opts.Expanders, queuedChan, convertedChan) }() // 
Write Events To Databases @@ -109,7 +124,7 @@ func WriteEvents( go func() { defer wg.Done() writeEventsToDatabases( - driver, boltdb, + graphdb, boltdb, convertedChan, writeResultChan) }() @@ -139,20 +154,102 @@ func WriteEvents( }, writeResult.Error } -func setupBoltDB(boltdb *bolt.DB) error +func setDefaultWriteOptions(opts *WriteOptions) { + if opts.Expanders == nil { + opts.Expanders = NewExpanderPipeline(DefaultExpanders()...) + } + if opts.KVReadBatchSize == 0 { + opts.KVReadBatchSize = 100 + } +} -func createEventFollowers(jsonChan chan string, eventChan chan EventFollower) +func createEventFollowers(jsonChan chan string, eventChan chan EventFollower) { + for json := range jsonChan { + eventChan <- EventFollower{JSON: json} + } + close(eventChan) +} -func parseEventJSON(inChan, parsedChan, invalidChan chan EventFollower) +func parseEventJSON(inChan, parsedChan, invalidChan chan EventFollower) { + for follower := range inChan { + var event roots.Event + jsonBytes := []byte(follower.JSON) + err := json.Unmarshal(jsonBytes, &event) + if err != nil { + follower.Error = err + invalidChan <- follower + continue + } + + follower.ID = event.ID + follower.Event = event + parsedChan <- follower + } + + close(parsedChan) + close(invalidChan) +} func enforcePolicyRules( - driver *neo4j.Driver, boltdb *bolt.DB, - inChan, queuedChan, skippedChan chan EventFollower) + graphdb GraphDB, boltdb BoltDB, + batchSize int, + inChan, queuedChan, skippedChan chan EventFollower, +) { + batch := []EventFollower{} -func convertEventsToSubgraphs(inChan, convertedChan chan EventFollower) + for follower := range inChan { + batch = append(batch, follower) + + if len(batch) >= batchSize { + processPolicyRulesBatch(boltdb, batch, queuedChan, skippedChan) + batch = []EventFollower{} + } + } + + if len(batch) > 0 { + processPolicyRulesBatch(boltdb, batch, queuedChan, skippedChan) + } + + close(queuedChan) + close(skippedChan) +} + +func processPolicyRulesBatch( + boltdb BoltDB, + batch 
[]EventFollower, + queuedChan, skippedChan chan EventFollower, +) { + eventIDs := []string{} + + for _, follower := range batch { + eventIDs = append(eventIDs, follower.ID) + } + + existsMap := boltdb.BatchCheckEventsExist(eventIDs) + + for _, follower := range batch { + if existsMap[follower.ID] { + skippedChan <- follower + } else { + queuedChan <- follower + } + } +} + +func convertEventsToSubgraphs( + expanders ExpanderPipeline, + inChan, convertedChan chan EventFollower, +) { + for follower := range inChan { + subgraph := EventToSubgraph(follower.Event, expanders) + follower.Subgraph = subgraph + convertedChan <- follower + } + close(convertedChan) +} func writeEventsToDatabases( - driver *neo4j.Driver, boltdb *bolt.DB, + graphdb GraphDB, boltdb BoltDB, inChan chan EventFollower, resultChan chan WriteResult, ) { @@ -171,12 +268,12 @@ func writeEventsToDatabases( defer wg.Done() writeEventsToKVStore( boltdb, - kvEventChan, kvErrorChan) + kvEventChan, kvWriteDone, kvErrorChan) }() go func() { defer wg.Done() - writeEventsToGraphStore( - driver, + writeEventsToGraphDriver( + graphdb, graphEventChan, kvWriteDone, graphResultChan) }() @@ -191,37 +288,86 @@ func writeEventsToDatabases( wg.Wait() kvError := <-kvErrorChan - if kvError != nil { - close(kvWriteDone) // signal abort - resultChan <- WriteResult{Error: kvError} - return - } - - // Signal graph writer to proceed - kvWriteDone <- struct{}{} - close(kvWriteDone) - graphResult := <-graphResultChan - if graphResult.Error != nil { - resultChan <- WriteResult{Error: graphResult.Error} - return + + var finalErr error + if kvError != nil && graphResult.Error != nil { + finalErr = fmt.Errorf("kvstore: %w; graphstore: %v", kvError, graphResult.Error) + } else if kvError != nil { + finalErr = fmt.Errorf("kvstore: %w", kvError) + } else if graphResult.Error != nil { + finalErr = fmt.Errorf("graphstore: %w", graphResult.Error) } - resultChan <- graphResult - + resultChan <- WriteResult{ + ResultSummaries: 
graphResult.ResultSummaries, + Error: finalErr, + } } func writeEventsToKVStore( - boltdb *bolt.DB, + boltdb BoltDB, inChan chan EventFollower, + done chan struct{}, resultChan chan error, -) +) { + events := []EventBlob{} -func writeEventsToGraphStore( - driver *neo4j.Driver, + for follower := range inChan { + events = append(events, + EventBlob{ID: follower.ID, JSON: follower.JSON}) + } + + err := boltdb.BatchWriteEvents(events) + if err != nil { + close(done) + } else { + done <- struct{}{} + close(done) + } + + resultChan <- err + close(resultChan) +} + +func writeEventsToGraphDriver( + graphdb GraphDB, inChan chan EventFollower, start chan struct{}, resultChan chan WriteResult, -) +) { + matchKeys := NewSimpleMatchKeys() + batch := NewBatchSubgraph(matchKeys) -func collectEvents(inChan chan EventFollower, resultChan chan []EventFollower) + for follower := range inChan { + for _, node := range follower.Subgraph.Nodes() { + batch.AddNode(node) + } + for _, rel := range follower.Subgraph.Rels() { + batch.AddRel(rel) + } + } + + _, ok := <-start + if !ok { + resultChan <- WriteResult{Error: fmt.Errorf("kv write failed, aborting graph write")} + close(resultChan) + return + } + + summaries, err := graphdb.MergeSubgraph(context.Background(), batch) + resultChan <- WriteResult{ + ResultSummaries: summaries, + Error: err, + } + close(resultChan) +} + +func collectEvents(inChan chan EventFollower, resultChan chan []EventFollower) { + collected := []EventFollower{} + for follower := range inChan { + collected = append(collected, follower) + } + resultChan <- collected + close(resultChan) +}