Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ see the [quick start](#quick-start).
- [Unit Tests](#unit-tests)
- [Compliance Tests](#compliance-tests)
- [Fuzzing Tests](#fuzzing-tests)
- [Generating Mock Files Using Mockery](./mocks/README.md)
- [Connection Methods](#connection-methods)
- [Plain TCP Connection](#plain-tcp-connection)
- [Secured TCP Connection (TLS)](#secured-tcp-connection-tls)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"encoding/hex"
"fmt"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"go.opentelemetry.io/otel/trace"
"strings"
"time"

Expand All @@ -31,7 +32,6 @@ import (

btpb "cloud.google.com/go/bigtable/apiv2/bigtablepb"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
"github.com/datastax/go-cassandra-native-protocol/message"
"go.uber.org/zap"
)
Expand All @@ -56,17 +56,17 @@ func NewBigtableClient(clients *types.BigtableClientManager, logger *zap.Logger,
func (btc *BigtableAdapter) Execute(ctx context.Context, query types.IExecutableQuery) (message.Message, error) {
switch q := query.(type) {
case *types.BoundDeleteQuery:
return btc.DeleteRow(ctx, q)
return btc.deleteRow(ctx, q)
case *types.BigtableWriteMutation:
return btc.mutateRow(ctx, q)
case *types.ExecutableSelectQuery:
return btc.ExecutePreparedStatement(ctx, q)
return btc.executePreparedStatement(ctx, q)
case *types.CreateTableStatementMap:
return btc.schemaManager.CreateTable(ctx, q)
case *types.AlterTableStatementMap:
return btc.schemaManager.AlterTable(ctx, q)
case *types.TruncateTableStatementMap:
err := btc.DropAllRows(ctx, q)
err := btc.dropAllRows(ctx, q)
return emptyRowsResult(), err
case *types.DropTableQuery:
return btc.schemaManager.DropTable(ctx, q)
Expand All @@ -88,11 +88,9 @@ func (btc *BigtableAdapter) Execute(ctx context.Context, query types.IExecutable
// Returns:
// - error: Error if the mutation fails.
func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
otelgo.AddAnnotation(ctx, applyingBigtableMutation)
span := trace.SpanFromContext(ctx)
mut := bigtable.NewMutation()

btc.Logger.Info("mutating row", zap.String("key", hex.EncodeToString([]byte(input.RowKey()))))

client, err := btc.clients.GetClient(input.Keyspace())
if err != nil {
return nil, err
Expand All @@ -119,7 +117,7 @@ func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.Bigtable
}

err := tbl.Apply(ctx, string(input.RowKey()), conditionalMutation, bigtable.GetCondMutationResult(&matched))
otelgo.AddAnnotation(ctx, bigtableMutationApplied)
span.AddEvent(bigtableMutationApplied)
if err != nil {
return nil, err
}
Expand All @@ -129,7 +127,7 @@ func (btc *BigtableAdapter) mutateRow(ctx context.Context, input *types.Bigtable

// If no conditions, apply the mutation directly
err = tbl.Apply(ctx, string(input.RowKey()), mut)
otelgo.AddAnnotation(ctx, bigtableMutationApplied)
span.AddEvent(bigtableMutationApplied)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -185,13 +183,13 @@ func (btc *BigtableAdapter) buildMutation(ctx context.Context, table *bigtable.T
return nil
}

func (btc *BigtableAdapter) DropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
func (btc *BigtableAdapter) dropAllRows(ctx context.Context, data *types.TruncateTableStatementMap) error {
_, err := btc.schemaManager.Schemas().GetTableSchema(data.Keyspace(), data.Table())
if err != nil {
return err
}

// performance optimization because DropAllRows can be slow
// performance optimization because dropAllRows can be slow
hasRows, err := btc.hasAnyRows(ctx, data.Keyspace(), data.Table())
if err != nil {
return err
Expand Down Expand Up @@ -263,20 +261,21 @@ func (btc *BigtableAdapter) InsertRow(ctx context.Context, input *types.Bigtable
return btc.mutateRow(ctx, input)
}

// UpdateRow - Updates a row in the specified bigtable table.
// updateRow - Updates a row in the specified bigtable table.
//
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - data: PreparedUpdateQuery object containing the table, row key, columns, values, and DeleteColumnFamilies.
//
// Returns:
// - error: Error if the update fails.
func (btc *BigtableAdapter) UpdateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
func (btc *BigtableAdapter) updateRow(ctx context.Context, input *types.BigtableWriteMutation) (message.Message, error) {
return btc.mutateRow(ctx, input)
}

func (btc *BigtableAdapter) DeleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
otelgo.AddAnnotation(ctx, applyingDeleteMutation)
func (btc *BigtableAdapter) deleteRow(ctx context.Context, deleteQueryData *types.BoundDeleteQuery) (message.Message, error) {
span := trace.SpanFromContext(ctx)
span.AddEvent(applyingDeleteMutation)
client, err := btc.clients.GetClient(deleteQueryData.Keyspace())
if err != nil {
return nil, err
Expand Down Expand Up @@ -308,7 +307,7 @@ func (btc *BigtableAdapter) DeleteRow(ctx context.Context, deleteQueryData *type
return nil, err
}
}
otelgo.AddAnnotation(ctx, deleteMutationApplied)
span.AddEvent(deleteMutationApplied)
return &message.VoidResult{}, nil
}

Expand Down Expand Up @@ -345,6 +344,7 @@ func (btc *BigtableAdapter) buildDeleteMutation(ctx context.Context, table *bigt
// - BulkOperationResponse: Response indicating the result of the bulk operation.
// - error: Error if the bulk mutation fails.
func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace types.Keyspace, tableName types.TableName, mutationData []types.IBigtableMutation) (BulkOperationResponse, error) {
span := trace.SpanFromContext(ctx)
client, err := btc.clients.GetClient(keyspace)
if err != nil {
return BulkOperationResponse{
Expand Down Expand Up @@ -385,9 +385,10 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
}, err
}
default:
err := fmt.Errorf("unhandled bulk mutation type %T", md)
return BulkOperationResponse{
FailedRows: fmt.Sprintf("All Rows are failed because: unsupported bulk operation: %T", v),
}, fmt.Errorf("unhandled bulk mutation type %T", md)
}, err
}
}
// create mutations from mutation data
Expand All @@ -398,7 +399,7 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
mutations = append(mutations, mutation)
rowKeys = append(rowKeys, string(key))
}
otelgo.AddAnnotation(ctx, applyingBulkMutation)
span.AddEvent(applyingBulkMutation)

errs, err := table.ApplyBulk(ctx, rowKeys, mutations)
if err != nil {
Expand All @@ -423,7 +424,7 @@ func (btc *BigtableAdapter) ApplyBulkMutation(ctx context.Context, keyspace type
FailedRows: "",
}
}
otelgo.AddAnnotation(ctx, bulkMutationApplied)
span.AddEvent(bulkMutationApplied)
return res, nil
}

Expand Down Expand Up @@ -510,8 +511,8 @@ func (btc *BigtableAdapter) PrepareStatement(ctx context.Context, query types.IP
return nil, nil
}

selectQuery, ok := query.(*types.PreparedSelectQuery)
if !ok {
selectQuery, isType := query.(*types.PreparedSelectQuery)
if !isType {
// only select queries can be prepared in Bigtable at this time
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/constants"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/responsehandler"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/third_party/datastax/proxycore"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/utilities"
Expand All @@ -32,7 +33,7 @@ import (
"time"
)

// ExecutePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// executePreparedStatement - Executes a prepared statement on Bigtable and returns the result.
// Parameters:
// - ctx: Context for the operation, used for cancellation and deadlines.
// - query: rh.QueryMetadata containing the query and parameters.
Expand All @@ -42,7 +43,7 @@ import (
// - *message.RowsResult: The result of the select statement.
// - time.Duration: The total elapsed time for the operation.
// - error: Error if the statement preparation or execution fails.
func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
func (btc *BigtableAdapter) executePreparedStatement(ctx context.Context, query *types.ExecutableSelectQuery) (*message.RowsResult, error) {
if query.CachedBTPrepare == nil {
return nil, fmt.Errorf("cannot execute select query because prepared bigtable query is nil")
}
Expand All @@ -59,14 +60,19 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return nil, fmt.Errorf("failed to bind parameters: %w", err)
}

table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

var processingErr error
var rows []types.GoRow
executeErr := boundStmt.Execute(ctx, func(resultRow bigtable.ResultRow) bool {
r, convertErr := btc.convertResultRow(resultRow, query) // Call the implemented helper
r, convertErr := btc.convertResultRow(resultRow, query, table)
if convertErr != nil {
btc.Logger.Error("Failed to convert result row", zap.Error(convertErr), zap.String("btql", query.TranslatedQuery))
processingErr = convertErr // Capture the error
return false // Stop execution
processingErr = convertErr
return false // Stop execution
}
rows = append(rows, r)
return true // Continue processing
Expand All @@ -82,12 +88,7 @@ func (btc *BigtableAdapter) ExecutePreparedStatement(ctx context.Context, query
return responsehandler.BuildRowsResultResponse(query, rows, query.ProtocolVersion)
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery) (types.GoRow, error) {
table, err := btc.schemaManager.Schemas().GetTableSchema(query.Keyspace(), query.Table())
if err != nil {
return nil, err
}

func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query *types.ExecutableSelectQuery, table *metadata.TableSchema) (types.GoRow, error) {
result := make(types.GoRow)
for i, colMeta := range resultRow.Metadata.Columns {
var val any
Expand Down Expand Up @@ -140,10 +141,6 @@ func (btc *BigtableAdapter) convertResultRow(resultRow bigtable.ResultRow, query
return nil, fmt.Errorf("result already set for column `%s`", key)
}

if key == "list_text" {
btc.Logger.Log(zap.InfoLevel, "list_text", zap.Any("value", val))
}

goValue, err := rowValueToGoValue(val, expectedType)
if err != nil {
return nil, fmt.Errorf("failed to convert result for '%s': %w", key, err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestDeleteRow(t *testing.T) {
require.NoError(t, err)

deleteQuery := types.NewBoundDeleteQuery(keyspace, tableName, "", rowKey, false, nil)
_, err = btc.DeleteRow(t.Context(), deleteQuery)
_, err = btc.deleteRow(t.Context(), deleteQuery)
require.NoError(t, err)

// Verify deletion
Expand Down Expand Up @@ -236,7 +236,7 @@ func TestMutateRowDeleteColumnFamily(t *testing.T) {
// Delete cf2
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteCellsOp("tags"))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand All @@ -259,7 +259,7 @@ func TestMutateRowDeleteQualifiers(t *testing.T) {
// Delete col1
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{}, types.QueryTypeUpdate, key)
updateData.AddMutations(types.NewDeleteColumnOp(types.BigtableColumn{Family: "cf1", Column: "col1"}))
_, err = btc.UpdateRow(t.Context(), updateData)
_, err = btc.updateRow(t.Context(), updateData)
require.NoError(t, err)

// Verify deletion by reading the row
Expand Down Expand Up @@ -289,7 +289,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Update the row when it exists
updateData := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key1)
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err := btc.UpdateRow(t.Context(), updateData)
res, err := btc.updateRow(t.Context(), updateData)
require.NoError(t, err)
assert.True(t, wasApplied(res))

Expand All @@ -305,7 +305,7 @@ func TestMutateRowIfExists(t *testing.T) {
// Attempt to update a non-existent row
updateDataNonExistent := types.NewBigtableWriteMutation(keyspace, tableName, "", types.IfSpec{IfExists: true}, types.QueryTypeUpdate, key2)
updateDataNonExistent.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("v2")))
res, err = btc.UpdateRow(t.Context(), updateDataNonExistent)
res, err = btc.updateRow(t.Context(), updateDataNonExistent)
require.NoError(t, err)
assert.False(t, wasApplied(res))

Expand Down Expand Up @@ -350,7 +350,7 @@ func TestMutateRowInvalidKeyspace(t *testing.T) {

updateData := types.NewBigtableWriteMutation("invalid-keyspace", "any-table", "", types.IfSpec{}, types.QueryTypeUpdate, "row1")
updateData.AddMutations(types.NewWriteCellOp("cf1", "col1", []byte("value")))
_, err := localBtc.UpdateRow(t.Context(), updateData)
_, err := localBtc.updateRow(t.Context(), updateData)
require.Error(t, err)
assert.Contains(t, err.Error(), "bigtable client not found for keyspace 'invalid-keyspace'")
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,11 @@ import (
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/global/types"
"github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/mem_table"
schemaMapping "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/metadata"
otelgo "github.com/GoogleCloudPlatform/cloud-bigtable-ecosystem/cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/otel"
"github.com/datastax/go-cassandra-native-protocol/message"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"strings"
)
Expand All @@ -20,9 +24,10 @@ type IQueryExecutor interface {
type QueryExecutorManager struct {
logger *zap.Logger
executors []IQueryExecutor
trace trace.Tracer
}

func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata, bt *bigtableModule.BigtableAdapter, systemTables *mem_table.InMemEngine) *QueryExecutorManager {
func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata, bt *bigtableModule.BigtableAdapter, systemTables *mem_table.InMemEngine, otelInst *otelgo.OpenTelemetry) *QueryExecutorManager {
return &QueryExecutorManager{
logger: logger,
executors: []IQueryExecutor{
Expand All @@ -31,15 +36,41 @@ func NewQueryExecutorManager(logger *zap.Logger, s *schemaMapping.SchemaMetadata
newSelectSystemTableExecutor(s, systemTables),
newBigtableExecutor(bt),
},
trace: otel.GetTracerProvider().Tracer("executor"),
}
}

func (m *QueryExecutorManager) Execute(ctx context.Context, client types.ICassandraClient, q types.IExecutableQuery) (message.Message, error) {
func (m *QueryExecutorManager) getExecutor(q types.IExecutableQuery) (IQueryExecutor, error) {
for _, e := range m.executors {
if e.CanRun(q) {
m.logger.Debug("executing query", zap.String("cql", q.CqlQuery()), zap.String("btql", q.BigtableQuery()))
return e.Execute(ctx, client, q)
return e, nil
}
}
return nil, fmt.Errorf("no executor found for query %s on keyspace %s", strings.ToUpper(q.QueryType().String()), q.Keyspace())
}

func (m *QueryExecutorManager) Execute(ctx context.Context, client types.ICassandraClient, q types.IExecutableQuery) (message.Message, error) {
otelCtx, span := m.trace.Start(ctx, "execute")
defer span.End()

otelgo.AddQueryAnnotations(span, types.Attributes{
QueryType: q.QueryType(),
Keyspace: q.Keyspace(),
Table: q.Table(),
})

executor, err := m.getExecutor(q)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}

msg, err := executor.Execute(otelCtx, client, q)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
}
return msg, nil
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package types

type Attributes struct {
Method string
Err error
QueryType QueryType
Keyspace Keyspace
Table TableName
Status string
}

func (a *Attributes) setQueryInfo(q IQuery) {
a.Keyspace = q.Keyspace()
a.Table = q.Table()
a.QueryType = q.QueryType()
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ type OtelConfig struct {
Endpoint string
}
Traces struct {
ProjectId string
Endpoint string
SamplingRatio float64
}
Expand Down
Loading