From f09b2e1b7b6d8083aa5444af20f1c79e5f91d397 Mon Sep 17 00:00:00 2001 From: Radmehr Soleimanian Date: Sat, 27 Sep 2025 19:07:19 +0330 Subject: [PATCH] feat: a read-optimized FactStore --- factstore/benchmark_test.go | 135 ++++++++++++++++++ factstore/columnarstore.go | 265 ++++++++++++++++++++++++++++++++++++ factstore/factstore_test.go | 5 +- 3 files changed, 404 insertions(+), 1 deletion(-) create mode 100644 factstore/benchmark_test.go create mode 100644 factstore/columnarstore.go diff --git a/factstore/benchmark_test.go b/factstore/benchmark_test.go new file mode 100644 index 0000000..088a6a9 --- /dev/null +++ b/factstore/benchmark_test.go @@ -0,0 +1,135 @@ +package factstore + +import ( + "fmt" + "math/rand" + "testing" + + "github.com/google/mangle/ast" +) + +func BenchmarkAdd(b *testing.B) { + for _, store := range []FactStoreWithRemove{ + NewSimpleInMemoryStore(), + NewIndexedInMemoryStore(), + NewMultiIndexedInMemoryStore(), + NewMultiIndexedArrayInMemoryStore(), + NewConcurrentFactStore(NewSimpleInMemoryStore()), + NewColumnarStore(), + } { + b.Run(fmt.Sprintf("%T", store), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p := ast.PredicateSym{Symbol: fmt.Sprintf("p%d", rand.Intn(10)), Arity: 2} + c1 := ast.String(fmt.Sprintf("c%d", rand.Intn(100))) + c2 := ast.String(fmt.Sprintf("c%d", rand.Intn(100))) + store.Add(ast.Atom{p, []ast.BaseTerm{c1, c2}}) + } + }) + } +} + +func BenchmarkGetFacts(b *testing.B) { + for _, store := range []FactStoreWithRemove{ + NewSimpleInMemoryStore(), + NewIndexedInMemoryStore(), + NewMultiIndexedInMemoryStore(), + NewMultiIndexedArrayInMemoryStore(), + NewConcurrentFactStore(NewSimpleInMemoryStore()), + NewColumnarStore(), + } { + for i := 0; i < 1000000; i++ { + p := ast.PredicateSym{Symbol: fmt.Sprintf("p%d", i%10), Arity: 2} + c1 := ast.String(fmt.Sprintf("c%d", i%100)) + c2 := ast.String(fmt.Sprintf("c%d", i%100)) + store.Add(ast.Atom{p, []ast.BaseTerm{c1, c2}}) + } + + b.Run(fmt.Sprintf("%T", store), func(b *testing.B) { + b.ReportAllocs() + for i := 0; i < b.N; i++ { + p := ast.PredicateSym{Symbol: fmt.Sprintf("p%d", rand.Intn(10)), Arity: 2} + c1 := ast.String(fmt.Sprintf("c%d", rand.Intn(100))) + store.GetFacts(ast.Atom{p, []ast.BaseTerm{c1, ast.Variable{"_"}}}, func(a ast.Atom) error { + return nil + }) + } + }) + } +} + +func BenchmarkGetFacts_BigQuery(b *testing.B) { + for _, store := range []FactStoreWithRemove{ + NewSimpleInMemoryStore(), + NewIndexedInMemoryStore(), + NewMultiIndexedInMemoryStore(), + NewMultiIndexedArrayInMemoryStore(), + NewConcurrentFactStore(NewSimpleInMemoryStore()), + NewColumnarStore(), + } { + for i := 0; i < 1000000; i++ { + p := ast.PredicateSym{Symbol: "p", Arity: 3} + c1 := ast.String(fmt.Sprintf("c%d", i%2)) // 2 distinct values. + c2 := ast.String(fmt.Sprintf("c%d", i)) + c3 := ast.String(fmt.Sprintf("c%d", i)) + store.Add(ast.Atom{p, []ast.BaseTerm{c1, c2, c3}}) + } + + b.Run(fmt.Sprintf("%T", store), func(b *testing.B) { + b.ReportAllocs() + b.ResetTimer() + for i := 0; i < b.N; i++ { + p := ast.PredicateSym{Symbol: "p", Arity: 3} + c1 := ast.String("c0") + store.GetFacts(ast.Atom{p, []ast.BaseTerm{c1, ast.Variable{"_"}, ast.Variable{"_"}}}, func(a ast.Atom) error { + return nil + }) + } + }) + } +} + +func BenchmarkMerge(b *testing.B) { + sourceStore := NewSimpleInMemoryStore() + mg := NewColumnarStore() + for i := 0; i < 10000; i++ { + p := ast.PredicateSym{Symbol: fmt.Sprintf("p%d", i%20), Arity: 2} + c1 := ast.String(fmt.Sprintf("c%d", i%200)) + c2 := ast.String(fmt.Sprintf("c%d", i%200)) + sourceStore.Add(ast.Atom{p, []ast.BaseTerm{c1, c2}}) + } + + for i := 0; i < 10000; i++ { + p := ast.PredicateSym{Symbol: fmt.Sprintf("p%d", i%20), Arity: 2} + c1 := ast.String(fmt.Sprintf("c%d", i%200)) + c2 := ast.String(fmt.Sprintf("c%d", i%200)) + mg.Add(ast.Atom{p, []ast.BaseTerm{c1, c2}}) + } + + benchmarks := []struct { + name string + storeFunc func() FactStore + }{ + { + name: "SimpleInMemoryStore", + storeFunc: func() FactStore { return NewSimpleInMemoryStore() }, + }, + { + name: "ColumnarFactStore", + storeFunc: func() FactStore { return NewColumnarStore() }, + }, + } + for _, bm := range benchmarks { + b.Run(bm.name, func(b *testing.B) { + b.ReportAllocs() + // This inner loop is what's timed. + for i := 0; i < b.N; i++ { + // We must exclude the creation of the destination store from the benchmark time. + b.StopTimer() + destStore := bm.storeFunc() + b.StartTimer() + destStore.Merge(mg) + } + }) + } +} \ No newline at end of file diff --git a/factstore/columnarstore.go b/factstore/columnarstore.go new file mode 100644 index 0000000..7d75f10 --- /dev/null +++ b/factstore/columnarstore.go @@ -0,0 +1,265 @@ +// Copyright 2022 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package factstore + +import ( + "github.com/google/mangle/ast" +) + +type ColumnarStore struct { + tables map[ast.PredicateSym]*table +} + +func NewColumnarStore() *ColumnarStore { + return &ColumnarStore{ + tables: make(map[ast.PredicateSym]*table), + } +} + +// Add implements the FactStore interface by adding the fact to the backing map. +func (s *ColumnarStore) Add(a ast.Atom) bool { + t, ok := s.tables[a.Predicate] + if !ok { + t = newTable(a.Predicate.Arity) + s.tables[a.Predicate] = t + } + return t.add(a) +} + +// Remove removes the fact from the backing map. +func (s *ColumnarStore) Remove(a ast.Atom) bool { + t, ok := s.tables[a.Predicate] + if ok { + return t.remove(a) + } + return false +} + +// GetFacts retrieves facts that match a given query. +func (s *ColumnarStore) GetFacts(a ast.Atom, fn func(ast.Atom) error) error { + t, ok := s.tables[a.Predicate] + if ok { + return t.getFacts(a, fn) + } + return nil // should return error why coder doesn't return any error +} + +// Contains checks if a fact is in the store. +func (s *ColumnarStore) Contains(a ast.Atom) bool { + t, ok := s.tables[a.Predicate] + if ok { + return t.contains(a) + } + return false +} + +// EstimateFactCount returns the estimated number of facts in the store. +func (s *ColumnarStore) EstimateFactCount() int { + var count int + for _, t := range s.tables { + count += t.size() + } + return count +} + +// Merge merges another fact store into this one. +func (s *ColumnarStore) Merge(other ReadOnlyFactStore) { + // optimization if other = columnar? + // if other, ok := other.(*ColumnarStore); ok { + // for pred, table := range other.tables { + // s.tables[pred] = table + // } + // return + // } FIXME: THIS IS WRONG + for _, pred := range other.ListPredicates() { + other.GetFacts(ast.NewQuery(pred), func(fact ast.Atom) error { + s.Add(fact) + return nil + }) + } +} + +// ListPredicates lists all predicates in the store. +func (s *ColumnarStore) ListPredicates() []ast.PredicateSym { + preds := make([]ast.PredicateSym, 0, len(s.tables)) + for pred := range s.tables { + preds = append(preds, pred) + } + return preds +} + +// table stores all facts for a single predicate. +type table struct { + facts []ast.Atom + primary map[uint64][]int // Index for the entire atom hash. + indices []map[uint64][]int // Index for each argument hash. + free []int // A freelist of indices of removed facts. +} + +func newTable(arity int) *table { + indices := make([]map[uint64][]int, arity) + for i := range indices { + indices[i] = make(map[uint64][]int) + } + return &table{ + facts: make([]ast.Atom, 0, 1024), + primary: make(map[uint64][]int), + indices: indices, + free: make([]int, 0, 16), + } +} + +func (t *table) size() int { + return len(t.facts) - len(t.free) +} + +func (t *table) add(a ast.Atom) bool { + key := a.Hash() + if candidates, ok := t.primary[key]; ok { + for _, idx := range candidates { + if !isNil(t.facts[idx]) && t.facts[idx].Equals(a) { + return false // Already exists. + } + } + } + + // maybe let lines 131 - 149 run concurrently at all? + // a semaphore for table to control throughput? + var idx int + if len(t.free) > 0 { + idx = t.free[len(t.free)-1] + // add a lock & update free concurrently?? + // maybe try a pool of indices and update in intervals? + // overhead for writes tbh + t.free = t.free[:len(t.free)-1] + t.facts[idx] = a + } else { + idx = len(t.facts) + t.facts = append(t.facts, a) + } + + t.primary[key] = append(t.primary[key], idx) + for i, arg := range a.Args { + h := arg.Hash() + t.indices[i][h] = append(t.indices[i][h], idx) + } + return true +} + +func (t *table) remove(a ast.Atom) bool { + aHash := a.Hash() + candidates, ok := t.primary[aHash] + if !ok { + return false + } + + factIdx := -1 + candidateIdx := -1 + for i, idx := range candidates { + if !isNil(t.facts[idx]) && t.facts[idx].Equals(a) { + factIdx = idx + candidateIdx = i + break + } + } + + if factIdx == -1 { + return false + } + + candidates[candidateIdx] = candidates[len(candidates)-1] + newCandidates := candidates[:len(candidates)-1] + if len(newCandidates) == 0 { + delete(t.primary, aHash) + } else { + t.primary[aHash] = newCandidates + } + + // Remove from columnar indices. + for i, arg := range a.Args { + h := arg.Hash() + list := t.indices[i][h] + for j, idx := range list { + if idx == factIdx { + list[j] = list[len(list)-1] + t.indices[i][h] = list[:len(list)-1] + break + } + } + } + + // Mark the slot as free. + t.facts[factIdx] = ast.Atom{} // Zero out the atom. + t.free = append(t.free, factIdx) + return true +} + +func (t *table) getFacts(a ast.Atom, fn func(ast.Atom) error) error { + // Find the smallest set of candidate indices to check. + var candidates []int + var foundConst bool + for i, arg := range a.Args { + if _, isVar := arg.(ast.Variable); !isVar { + h := arg.Hash() + if list, ok := t.indices[i][h]; ok { + if !foundConst || len(list) < len(candidates) { + candidates = list + foundConst = true + } + } else { + // Hash not found, so no facts can match. + return nil + } + } + } + + if !foundConst { // No constants in query, scan all facts. + for _, fact := range t.facts { + if !isNil(fact) && Matches(a.Args, fact.Args) { + if err := fn(fact); err != nil { + return err + } + } + } + return nil + } + + for _, idx := range candidates { + fact := t.facts[idx] + if !isNil(fact) && Matches(a.Args, fact.Args) { + if err := fn(fact); err != nil { + return err + } + } + } + return nil +} + +func (t *table) contains(a ast.Atom) bool { + aHash := a.Hash() + if candidates, ok := t.primary[aHash]; ok { + for _, idx := range candidates { + if !isNil(t.facts[idx]) && t.facts[idx].Equals(a) { + return true + } + } + } + return false +} + +// isNil checks if an atom is the zero value. +func isNil(a ast.Atom) bool { + return a.Predicate.Symbol == "" +} diff --git a/factstore/factstore_test.go b/factstore/factstore_test.go index 4616691..5c233a3 100644 --- a/factstore/factstore_test.go +++ b/factstore/factstore_test.go @@ -51,7 +51,10 @@ func TestAddContainsRemove(t *testing.T) { NewMultiIndexedInMemoryStore(), NewMultiIndexedArrayInMemoryStore(), NewMergedStore([]FactStore{NewSimpleInMemoryStore()}, NewSimpleInMemoryStore()), - NewConcurrentFactStore(NewSimpleInMemoryStore())} { + NewConcurrentFactStore(NewSimpleInMemoryStore()), + NewColumnarStore(), + NewMergedStore([]FactStore{NewColumnarStore()}, NewColumnarStore()), + } { t.Run(fmt.Sprintf("%T", fs), func(t *testing.T) { tests := []ast.Atom{ atom("baz()"),