diff --git a/services/search/pkg/config/engine.go b/services/search/pkg/config/engine.go index 15cd29a378a..fc7623d8ac9 100644 --- a/services/search/pkg/config/engine.go +++ b/services/search/pkg/config/engine.go @@ -9,4 +9,5 @@ type Engine struct { // EngineBleve configures the bleve engine type EngineBleve struct { Datapath string `yaml:"data_path" env:"SEARCH_ENGINE_BLEVE_DATA_PATH" desc:"The directory where the filesystem will store search data. If not defined, the root directory derives from $OCIS_BASE_DATA_PATH/search." introductionVersion:"pre5.0"` + Scale bool `yaml:"scale" env:"SEARCH_ENGINE_BLEVE_SCALE" desc:"Enable scaling of the search index (bleve). If set to 'true', the instance of the search service will no longer have exclusive write access to the index. Note when scaling search, all instances of the search service must be set to true! For 'false', which is the default, the running search service has exclusive access to the index as long it is running. This locks out other search processes tying to access the index." introductionVersion:"%%NEXT%%"` } diff --git a/services/search/pkg/engine/bleve.go b/services/search/pkg/engine/bleve.go index c6f765b5db5..e5ebe3f51df 100644 --- a/services/search/pkg/engine/bleve.go +++ b/services/search/pkg/engine/bleve.go @@ -5,7 +5,6 @@ import ( "errors" "math" "path" - "path/filepath" "reflect" "strings" "time" @@ -31,44 +30,44 @@ import ( searchMessage "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/search/v0" searchService "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" "github.com/owncloud/ocis/v2/services/search/pkg/content" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" searchQuery "github.com/owncloud/ocis/v2/services/search/pkg/query" ) // Bleve represents a search engine which utilizes bleve to search and store resources. type Bleve struct { - index bleve.Index + indexGetter bleveEngine.IndexGetter queryCreator searchQuery.Creator[query.Query] } -// NewBleveIndex returns a new bleve index -// given path must exist. -func NewBleveIndex(root string) (bleve.Index, error) { - destination := filepath.Join(root, "bleve") - index, err := bleve.Open(destination) - if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { - m, err := BuildBleveMapping() - if err != nil { - return nil, err - } - index, err = bleve.New(destination, m) - if err != nil { - return nil, err - } - - return index, nil - } - - return index, err -} - // NewBleveEngine creates a new Bleve instance -func NewBleveEngine(index bleve.Index, queryCreator searchQuery.Creator[query.Query]) *Bleve { +// If scalable is set to true, one connection to the index is created and +// closed per operation, so multiple operations can be executed in parallel. +// If set to false, only one write connection is created for the whole +// service, which will lock the index for other processes. In this case, +// you must close the engine yourself. +func NewBleveEngine(indexGetter bleveEngine.IndexGetter, queryCreator searchQuery.Creator[query.Query]) *Bleve { return &Bleve{ - index: index, + indexGetter: indexGetter, queryCreator: queryCreator, } } +// Close will get the index and close it. If the indexGetter is returning +// new instances, this method will close just the new returned instance but +// not any other instances that might be in use. +// +// This method is useful if "memory" and "persistent" (not "persistentScale") +// index getters are used. +func (b *Bleve) Close() error { + // regardless of the implementation, we want to close the index + bleveIndex, _, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + return bleveIndex.Close() +} + // BuildBleveMapping builds a bleve index mapping which can be used for indexing func BuildBleveMapping() (mapping.IndexMapping, error) { nameMapping := bleve.NewTextFieldMapping() @@ -123,6 +122,12 @@ func BuildBleveMapping() (mapping.IndexMapping, error) { // Search executes a search request operation within the index. // Returns a SearchIndexResponse object or an error. func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexRequest) (*searchService.SearchIndexResponse, error) { + bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + if err != nil { + return nil, err + } + defer closeFn() + createdQuery, err := b.queryCreator.Create(sir.Query) if err != nil { if searchQuery.IsValidationError(err) { @@ -169,7 +174,7 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques } bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return nil, err } @@ -237,19 +242,31 @@ func (b *Bleve) Search(ctx context.Context, sir *searchService.SearchIndexReques // Upsert indexes or stores Resource data fields. func (b *Bleve) Upsert(id string, r Resource) error { - return b.index.Index(id, r) + bleveIndex, closeFn, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + defer closeFn() + + return bleveIndex.Index(id, r) } // Move updates the resource location and all of its necessary fields. func (b *Bleve) Move(id string, parentid string, target string) error { - r, err := b.getResource(id) + bleveIndex, closeFn, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + defer closeFn() + + r, err := b.getResource(bleveIndex, id) if err != nil { return err } currentPath := r.Path nextPath := utils.MakeRelativePath(target) - r, err = b.updateEntity(id, func(r *Resource) { + r, err = b.updateEntity(bleveIndex, id, func(r *Resource) { r.Path = nextPath r.Name = path.Base(nextPath) r.ParentID = parentid @@ -266,13 +283,13 @@ func (b *Bleve) Move(id string, parentid string, target string) error { bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return err } for _, h := range res.Hits { - _, err := b.updateEntity(h.ID, func(r *Resource) { + _, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) { r.Path = strings.Replace(r.Path, currentPath, nextPath, 1) }) if err != nil { @@ -289,29 +306,53 @@ func (b *Bleve) Move(id string, parentid string, target string) error { // instead of removing the resource it just marks it as deleted! // can be undone func (b *Bleve) Delete(id string) error { - return b.setDeleted(id, true) + bleveIndex, closeFn, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + defer closeFn() + + return b.setDeleted(bleveIndex, id, true) } // Restore is the counterpart to Delete. // It restores the resource which makes it available again. func (b *Bleve) Restore(id string) error { - return b.setDeleted(id, false) + bleveIndex, closeFn, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + defer closeFn() + + return b.setDeleted(bleveIndex, id, false) } // Purge removes a resource from the index, irreversible operation. func (b *Bleve) Purge(id string) error { - return b.index.Delete(id) + bleveIndex, closeFn, err := b.indexGetter.GetIndex() + if err != nil { + return err + } + defer closeFn() + + return bleveIndex.Delete(id) } // DocCount returns the number of resources in the index. func (b *Bleve) DocCount() (uint64, error) { - return b.index.DocCount() + bleveIndex, closeFn, err := b.indexGetter.GetIndex(bleveEngine.ReadOnly(true)) + if err != nil { + return 0, err + } + defer closeFn() + + return bleveIndex.DocCount() } -func (b *Bleve) getResource(id string) (*Resource, error) { +func (b *Bleve) getResource(bleveIndex bleve.Index, id string) (*Resource, error) { req := bleve.NewSearchRequest(bleve.NewDocIDQuery([]string{id})) req.Fields = []string{"*"} - res, err := b.index.Search(req) + res, err := bleveIndex.Search(req) if err != nil { return nil, err } @@ -446,19 +487,19 @@ func getPhotoValue[T any](fields map[string]interface{}) *T { return nil } -func (b *Bleve) updateEntity(id string, mutateFunc func(r *Resource)) (*Resource, error) { - it, err := b.getResource(id) +func (b *Bleve) updateEntity(bleveIndex bleve.Index, id string, mutateFunc func(r *Resource)) (*Resource, error) { + it, err := b.getResource(bleveIndex, id) if err != nil { return nil, err } mutateFunc(it) - return it, b.index.Index(it.ID, it) + return it, bleveIndex.Index(it.ID, it) } -func (b *Bleve) setDeleted(id string, deleted bool) error { - it, err := b.updateEntity(id, func(r *Resource) { +func (b *Bleve) setDeleted(bleveIndex bleve.Index, id string, deleted bool) error { + it, err := b.updateEntity(bleveIndex, id, func(r *Resource) { r.Deleted = deleted }) if err != nil { @@ -473,13 +514,13 @@ func (b *Bleve) setDeleted(id string, deleted bool) error { bleveReq := bleve.NewSearchRequest(q) bleveReq.Size = math.MaxInt bleveReq.Fields = []string{"*"} - res, err := b.index.Search(bleveReq) + res, err := bleveIndex.Search(bleveReq) if err != nil { return err } for _, h := range res.Hits { - _, err := b.updateEntity(h.ID, func(r *Resource) { + _, err := b.updateEntity(bleveIndex, h.ID, func(r *Resource) { r.Deleted = deleted }) if err != nil { diff --git a/services/search/pkg/engine/bleve/index.go b/services/search/pkg/engine/bleve/index.go new file mode 100644 index 00000000000..9685b7dd1ab --- /dev/null +++ b/services/search/pkg/engine/bleve/index.go @@ -0,0 +1,160 @@ +package bleve + +import ( + "errors" + "path/filepath" + + "github.com/blevesearch/bleve/v2" + "github.com/blevesearch/bleve/v2/mapping" +) + +// IndexGetter is an interface that provides a way to get an index. +// Implementations might differ in how the index is created and how the +// index is gotten (reused, created on the fly, etc). +// +// The GetIndex method returns a function that must be called to close the index. +// Some implementations might require the index to be kept opened, meaning +// the index should be closed only when the application is shutting down. In +// this case, the returned function to close the index should do nothing (not +// closing the index). If the index can be closed and reopened safely at any +// time, the returned function should close the index. +// Calling the returned function to close the index is fine regardless of the +// implementation, and it will act as a no-op if the index should be kept opened. +type IndexGetter interface { + GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) +} + +// IndexGetterMemory is an implementation of IndexGetter that uses an in-memory +// index. The implementation caches the index and returns the same index every +// time GetIndex is called. +// The data won't be persisted between runs, and closing the index will wipe +// the data. +// The close function returned by GetIndex won't do anything. The index should +// be kept opened until the application is shutting down. +// This is useful for testing and small datasets. +type IndexGetterMemory struct { + mapping mapping.IndexMapping + index bleve.Index +} + +// NewIndexGetterMemory creates a new IndexGetterMemory. This implementation +// creates a new in-memory index every time GetIndex is called. As such, the +// index must be kept opened. Closing the index will result in wiping the +// data. +func NewIndexGetterMemory(mapping mapping.IndexMapping) *IndexGetterMemory { + return &IndexGetterMemory{ + mapping: mapping, + } +} + +// GetIndex creates a new in-memory index every time it is called. +// The options are ignored in this implementation. +func (i *IndexGetterMemory) GetIndex(_ ...GetIndexOption) (bleve.Index, func(), error) { + closeFn := func() {} // no-op + if i.index != nil { + return i.index, closeFn, nil + } + + index, err := bleve.NewMemOnly(i.mapping) + if err != nil { + return nil, closeFn, err + } + + i.index = index + return i.index, closeFn, nil +} + +// IndexGetterPersistent is an implementation of IndexGetter that persists the +// index on the filesystem. The implementation caches the index and returns the +// same index every time GetIndex is called. +// The close function returned by GetIndex won't do anything. The index should +// be kept opened until the application is shutting down. +type IndexGetterPersistent struct { + rootDir string + mapping mapping.IndexMapping + index bleve.Index +} + +// NewIndexGetterPersistent creates a new IndexGetterPersistent. The index +// will be persisted on the FS. If the index does not exist, it will be +// created. If the index exists, it will be opened. +// +// The index will be cached and reused every time GetIndex is called. You +// should not close the index unless you are shutting down the application. +func NewIndexGetterPersistent(rootDir string, mapping mapping.IndexMapping) *IndexGetterPersistent { + return &IndexGetterPersistent{ + rootDir: rootDir, + mapping: mapping, + } +} + +// GetIndex returns the cached index. The options are ignored in this +// implementation. +func (i *IndexGetterPersistent) GetIndex(_ ...GetIndexOption) (bleve.Index, func(), error) { + closeFn := func() {} // no-op + if i.index != nil { + return i.index, closeFn, nil + } + + destination := filepath.Join(i.rootDir, "bleve") + index, err := bleve.Open(destination) + if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { + index, err = bleve.New(destination, i.mapping) + if err != nil { + return nil, closeFn, err + } + } else if err != nil { + return nil, closeFn, err + } + + i.index = index + return i.index, closeFn, nil +} + +// IndexGetterPersistentScale is an implementation of IndexGetter that persists +// the index on the filesystem. The implementation does not cache the index and +// creates a new connection to the index every time GetIndex is called. +// The close function returned by GetIndex must be called to close the index, as +// soon as you the operations on the index are done. +type IndexGetterPersistentScale struct { + rootDir string + mapping mapping.IndexMapping +} + +// NewIndexGetterPersistentScale creates a new IndexGetterPersistentScale. +// The index will be persisted on the FS. If the index does not exist, it will +// be created. If the index exists, it will be opened. +// The GetIndex method will create a new connection to the index every time +// it is called. That connection must be closed after use. +func NewIndexGetterPersistentScale(rootDir string, mapping mapping.IndexMapping) *IndexGetterPersistentScale { + return &IndexGetterPersistentScale{ + rootDir: rootDir, + mapping: mapping, + } +} + +// GetIndex creates a new connection to the index every time it is called. +// You can use the ReadOnly option to open the index in read-only mode. This +// allow read-only operations to be performed in parallel. +// In order to avoid blocking write operations, you should close the index +// as soon as you are done with it. +func (i *IndexGetterPersistentScale) GetIndex(opts ...GetIndexOption) (bleve.Index, func(), error) { + options := newGetIndexOptions(opts...) + destination := filepath.Join(i.rootDir, "bleve") + params := map[string]interface{}{ + "read_only": options.ReadOnly, + } + + closeFn := func() {} // no-op + index, err := bleve.OpenUsing(destination, params) + if errors.Is(bleve.ErrorIndexPathDoesNotExist, err) { + index, err = bleve.New(destination, i.mapping) + if err != nil { + return nil, closeFn, err + } + } else if err != nil { + return nil, closeFn, err + } + + return index, func() { index.Close() }, nil +} diff --git a/services/search/pkg/engine/bleve/option.go b/services/search/pkg/engine/bleve/option.go new file mode 100644 index 00000000000..b22302548f6 --- /dev/null +++ b/services/search/pkg/engine/bleve/option.go @@ -0,0 +1,27 @@ +package bleve + +// GetIndexOption is a function that sets some option for the GetIndex method. +type GetIndexOption func(o *GetIndexOptions) + +// GetIndexOptions contains the options for the GetIndex method. +type GetIndexOptions struct { + ReadOnly bool +} + +// ReadOnly is an option to opens the index in read-only mode. +// This option should allow running multiple read-only operations in parallel. +// The behavior of write operations is not defined when this option is used. +func ReadOnly(b bool) GetIndexOption { + return func(o *GetIndexOptions) { + o.ReadOnly = b + } +} + +// newGetIndexOptions creates a new GetIndexOptions with the given options. +func newGetIndexOptions(opts ...GetIndexOption) GetIndexOptions { + o := GetIndexOptions{} + for _, opt := range opts { + opt(&o) + } + return o +} diff --git a/services/search/pkg/engine/bleve_test.go b/services/search/pkg/engine/bleve_test.go index 7209487a5d0..937901a0f8b 100644 --- a/services/search/pkg/engine/bleve_test.go +++ b/services/search/pkg/engine/bleve_test.go @@ -15,6 +15,7 @@ import ( searchsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/search/v0" "github.com/owncloud/ocis/v2/services/search/pkg/content" "github.com/owncloud/ocis/v2/services/search/pkg/engine" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/query/bleve" ) @@ -59,10 +60,12 @@ var _ = Describe("Bleve", func() { mapping, err := engine.BuildBleveMapping() Expect(err).ToNot(HaveOccurred()) - idx, err = bleveSearch.NewMemOnly(mapping) + indexGetter := bleveEngine.NewIndexGetterMemory(mapping) + + idx, _, err = indexGetter.GetIndex() // IndexGetterMemory ignores closeFn Expect(err).ToNot(HaveOccurred()) - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) + eng = engine.NewBleveEngine(indexGetter, bleve.DefaultCreator) Expect(err).ToNot(HaveOccurred()) rootResource = engine.Resource{ @@ -91,13 +94,6 @@ var _ = Describe("Bleve", func() { } }) - Describe("New", func() { - It("returns a new index instance", func() { - b := engine.NewBleveEngine(idx, bleve.DefaultCreator) - Expect(b).ToNot(BeNil()) - }) - }) - Describe("Search", func() { Context("by other fields than filename", func() { It("finds files by tags", func() { diff --git a/services/search/pkg/service/grpc/v0/service.go b/services/search/pkg/service/grpc/v0/service.go index 82dfd3249dc..e2f901d1b93 100644 --- a/services/search/pkg/service/grpc/v0/service.go +++ b/services/search/pkg/service/grpc/v0/service.go @@ -30,6 +30,7 @@ import ( "github.com/owncloud/ocis/v2/services/search/pkg/config" "github.com/owncloud/ocis/v2/services/search/pkg/content" "github.com/owncloud/ocis/v2/services/search/pkg/engine" + bleveEngine "github.com/owncloud/ocis/v2/services/search/pkg/engine/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/query/bleve" "github.com/owncloud/ocis/v2/services/search/pkg/search" ) @@ -45,16 +46,24 @@ func NewHandler(opts ...Option) (searchsvc.SearchProviderHandler, func(), error) var eng engine.Engine switch cfg.Engine.Type { case "bleve": - idx, err := engine.NewBleveIndex(cfg.Engine.Bleve.Datapath) + bleveMapping, err := engine.BuildBleveMapping() if err != nil { return nil, teardown, err } + var indexGetter bleveEngine.IndexGetter + indexGetter = bleveEngine.NewIndexGetterPersistent(cfg.Engine.Bleve.Datapath, bleveMapping) + if cfg.Engine.Bleve.Scale { + indexGetter = bleveEngine.NewIndexGetterPersistentScale(cfg.Engine.Bleve.Datapath, bleveMapping) + } + + bleveEngine := engine.NewBleveEngine(indexGetter, bleve.DefaultCreator) + teardown = func() { - _ = idx.Close() + _ = bleveEngine.Close() } + eng = bleveEngine - eng = engine.NewBleveEngine(idx, bleve.DefaultCreator) default: return nil, teardown, fmt.Errorf("unknown search engine: %s", cfg.Engine.Type) }