Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 49 additions & 4 deletions opengin/contracts/rest/read_api.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ paths:
/entities/search:
post:
summary: Get entities based on id or filter criteria
description:
This endpoint allows you to search for entities using either a specific `id` or a set of filter criteria.
description: This endpoint allows you to search for entities using either a specific `id` or a set of filter criteria.

requestBody:
required: true
Expand Down Expand Up @@ -125,7 +124,7 @@ paths:
Structure may vary based on entity type.

/entities/{entityId}/attributes/{attributeName}:
get:
post:
summary: Get entity attribute
parameters:
- name: entityId
Expand Down Expand Up @@ -159,6 +158,53 @@ paths:
items:
type: string
default: ["*"]
requestBody:
description: |
This request body allows you to filter the attribute values based on row-level criteria.
You can specify multiple filters in the `records` array to refine your search.
Each filter consists of:
- `field_name`: The name of the field to filter on.
- `operator`: The comparison operator to use.
- `value`: The value to compare against.

Accepted operators:
- `eq`: Equal to (default)
- `neq`: Not equal to
- `gt`: Greater than
- `lt`: Less than
- `gte`: Greater than or equal to
- `lte`: Less than or equal to
- `contains`: Contains the specified string
- `notcontains`: Does not contain the specified string
required: false
content:
application/json:
schema:
type: object
properties:
records:
type: array
items:
type: object
properties:
field_name:
type: string
operator:
type: string
description: "The comparison operator to use."
enum:
- "eq"
- "neq"
- "gt"
- "lt"
- "gte"
- "lte"
- "contains"
- "notcontains"
default: "eq"
value:
type: string
description: "List of record filters to apply row-based filtering."
responses:
200:
description: Attribute value(s)
Expand Down Expand Up @@ -273,4 +319,3 @@ paths:
format: date-time
direction:
type: string

3 changes: 2 additions & 1 deletion opengin/core-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ cp env.template .env
# after updating the required fields to be added to the environment
# (you can find the example env configurations here)
source .env
go test -v ./...
# make sure to clean the databases before running the tests
go test -v ./... -count=1 -p=1
./core-service


Expand Down
83 changes: 77 additions & 6 deletions opengin/core-api/cmd/server/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,11 +182,17 @@ func (s *Server) ReadEntity(ctx context.Context, req *pb.ReadEntityRequest) (*pb
// Use the EntityAttributeProcessor to read and process attributes
processor := engine.NewEntityAttributeProcessor()

// Extract fields from the request attributes based on storage type
fields := extractFieldsFromAttributes(req.Entity.Attributes)
// Extract fields and record filters from the request attributes based on storage type
fields, recordFilters := extractFieldsFromAttributes(req.Entity.Attributes)
log.Printf("Extracted fields from attributes: %v", fields)

readOptions := engine.NewReadOptions(make(map[string]interface{}), fields...)
// Convert record filters to the filters map for ReadOptions
filtersMap := make(map[string]interface{})
if len(recordFilters) > 0 {
filtersMap["record_filters"] = recordFilters
}

readOptions := engine.NewReadOptions(filtersMap, fields...)

// Process the entity with attributes to get the results map
attributeResults := processor.ProcessEntityAttributes(ctx, req.Entity, "read", readOptions)
Expand Down Expand Up @@ -399,8 +405,9 @@ func (s *Server) ReadEntities(ctx context.Context, req *pb.ReadEntityRequest) (*
// extractFieldsFromAttributes extracts field names from entity attributes based on storage type
// TODO: Limitation in multi-value attribute reads.
// FIXME: https://github.com/LDFLK/nexoan/issues/285
func extractFieldsFromAttributes(attributes map[string]*pb.TimeBasedValueList) []string {
func extractFieldsFromAttributes(attributes map[string]*pb.TimeBasedValueList) ([]string, []postgres.RecordFilter) {
var fields []string
var record_filters []postgres.RecordFilter

for attrName, attrValueList := range attributes {
if attrValueList == nil || len(attrValueList.Values) == 0 {
Expand Down Expand Up @@ -428,6 +435,12 @@ func extractFieldsFromAttributes(attributes map[string]*pb.TimeBasedValueList) [
} else {
log.Printf("Warning: could not extract columns from tabular attribute %s: %v", attrName, err)
}

if filters, err := extractRecordFiltersFromTabularAttributes(value.Value); err == nil {
record_filters = append(record_filters, filters...)
} else {
log.Printf("Warning: could not extract rows from tabular attribute %s: %v", attrName, err)
}
case "graph":
// TODO: Handle graph data fields
log.Printf("Graph data fields extraction not implemented yet for attribute %s", attrName)
Expand All @@ -438,8 +451,7 @@ func extractFieldsFromAttributes(attributes map[string]*pb.TimeBasedValueList) [
log.Printf("Unknown storage type %s for attribute %s", storageType, attrName)
}
}

return fields
return fields, record_filters
}

// determineStorageTypeFromValue determines the storage type from a protobuf Any value
Expand Down Expand Up @@ -509,6 +521,65 @@ func extractColumnsFromTabularAttribute(anyValue *anypb.Any) ([]string, error) {
return columns, nil
}

// extractRecordFiltersFromTabularAttributes extracts row-level filters from a
// tabular attribute value. The value is expected to unmarshal to a
// structpb.Struct containing a "rows" list, where each element is an object
// with "field_name", "operator", and "value" string fields (mirroring the
// request body defined in the REST contract, read_api.yaml).
//
// Entries missing "field_name" or "value" are skipped with a warning rather
// than failing the whole request. A missing or empty "operator" defaults to
// "eq", matching the OpenAPI contract's `default: "eq"`.
//
// Returns an error only when the value itself is not a struct with a "rows"
// list; per-row problems are logged and skipped.
func extractRecordFiltersFromTabularAttributes(anyValue *anypb.Any) ([]postgres.RecordFilter, error) {
	message, err := anyValue.UnmarshalNew()
	if err != nil {
		return nil, fmt.Errorf("failed to unmarshal Any value: %w", err)
	}

	structValue, ok := message.(*structpb.Struct)
	if !ok {
		return nil, fmt.Errorf("expected struct value")
	}

	rowsField, exists := structValue.Fields["rows"]
	if !exists {
		return nil, fmt.Errorf("no rows found")
	}

	rowList := rowsField.GetListValue()
	if rowList == nil {
		return nil, fmt.Errorf("rows field is not a list")
	}

	var filters []postgres.RecordFilter
	for _, val := range rowList.Values {
		datum := val.GetStructValue()
		if datum == nil {
			// Non-object entries in "rows" are ignored.
			continue
		}

		// Protobuf getters are nil-safe: a missing map key yields a nil
		// *structpb.Value whose GetStringValue() returns "".
		fieldName := datum.Fields["field_name"].GetStringValue()
		if fieldName == "" {
			log.Printf("Warning: skipping record filter, missing or empty 'field_name'")
			continue
		}

		value := datum.Fields["value"].GetStringValue()
		if value == "" {
			log.Printf("Warning: skipping record filter, missing or empty 'value'")
			continue
		}

		// Apply the contract's default comparison operator instead of
		// dropping the filter when the client omits "operator".
		operator := datum.Fields["operator"].GetStringValue()
		if operator == "" {
			operator = "eq"
		}

		filters = append(filters, postgres.RecordFilter{
			FieldName: fieldName,
			Operator:  operator,
			Value:     value,
		})
	}

	return filters, nil
}

// Start the gRPC server
func main() {
// Initialize MongoDB config
Expand Down
46 changes: 46 additions & 0 deletions opengin/core-api/db/repository/postgres/data_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,6 +638,13 @@ type TabularData struct {
Rows [][]interface{} `json:"rows"`
}

// RecordFilter represents a filter applied on the values in a particular column of the table.
// It mirrors one element of the "records" array in the REST read contract: each filter
// compares a single column against a string value using the given operator.
type RecordFilter struct {
	// FieldName is the table column the filter applies to.
	FieldName string `json:"field_name"`
	// Operator selects the comparison. Accepted operators: 'eq','neq','gt','lt','gte','lte','contains','notcontains'.
	Operator string `json:"operator"`
	// Value is the right-hand operand of the comparison, always carried as a string.
	Value string `json:"value"`
}

// GetData retrieves data from a table with optional field selection and filters, returns it as pb.Any with JSON-formatted tabular data.
func (repo *PostgresRepository) GetData(ctx context.Context, tableName string, filters map[string]interface{}, fields ...string) (*anypb.Any, error) {
log.Printf("DEBUG: GetData: tableName=%s, \t\nfilters=%v, \t\nfields=%v", tableName, filters, fields)
Expand Down Expand Up @@ -668,6 +675,44 @@ func (repo *PostgresRepository) GetData(ctx context.Context, tableName string, f

// Add filters to the query
for key, value := range filters {
// Handle RecordFilter slices (operator-based row filtering)
if key == "record_filters" {
if recordFilters, ok := value.([]RecordFilter); ok {
for _, rf := range recordFilters {
sqlOp := "="
switch rf.Operator {
case "eq":
sqlOp = "="
case "neq":
sqlOp = "!="
case "gt":
sqlOp = ">"
case "lt":
sqlOp = "<"
case "lte":
sqlOp = "<="
case "gte":
sqlOp = ">="
case "contains":
sqlOp = "ILIKE"
case "notcontains":
sqlOp = "NOT ILIKE"
}

if rf.Operator == "contains" || rf.Operator == "notcontains" {
whereClauses = append(whereClauses, fmt.Sprintf("%s %s $%d", commons.SanitizeIdentifier(rf.FieldName), sqlOp, argCount))
args = append(args, "%"+rf.Value+"%")
} else {
whereClauses = append(whereClauses, fmt.Sprintf("%s %s $%d", commons.SanitizeIdentifier(rf.FieldName), sqlOp, argCount))
args = append(args, rf.Value)
}
argCount++
}
}
continue
}

// Default: simple equality filter
whereClauses = append(whereClauses, fmt.Sprintf("%s = $%d", commons.SanitizeIdentifier(key), argCount))
args = append(args, value)
argCount++
Expand Down Expand Up @@ -726,6 +771,7 @@ func (repo *PostgresRepository) GetData(ctx context.Context, tableName string, f
for rows.Next() {
rowValues := make([]interface{}, len(resultColumns))
rowPointers := make([]interface{}, len(resultColumns))

for i := range rowValues {
rowPointers[i] = &rowValues[i]
}
Expand Down
Loading
Loading