Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
163 changes: 58 additions & 105 deletions google/cloud/bigtable/row.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,16 +14,13 @@

"""User-friendly container for Google Cloud Bigtable Row."""


import struct

from google.cloud._helpers import _datetime_from_microseconds # type: ignore
from google.cloud._helpers import _microseconds_from_datetime # type: ignore
from google.cloud._helpers import _to_bytes # type: ignore
from google.cloud.bigtable_v2.types import data as data_v2_pb2

from google.cloud.bigtable.data import mutations
from google.cloud.bigtable.data import read_modify_write_rules as rmw_rules

_PACK_I64 = struct.Struct(">q").pack

MAX_MUTATIONS = 100000
"""The maximum number of mutations that a row can accumulate."""
Expand Down Expand Up @@ -157,26 +154,21 @@ def _set_cell(self, column_family_id, column, value, timestamp=None, state=None)
:param state: (Optional) The state that is passed along to
:meth:`_get_mutations`.
"""
column = _to_bytes(column)
if isinstance(value, int):
value = _PACK_I64(value)
value = _to_bytes(value)
if timestamp is None:
# Use -1 for current Bigtable server time.
timestamp_micros = -1
# Use current Bigtable server time.
timestamp_micros = mutations._SERVER_SIDE_TIMESTAMP
else:
timestamp_micros = _microseconds_from_datetime(timestamp)
# Truncate to millisecond granularity.
timestamp_micros -= timestamp_micros % 1000

mutation_val = data_v2_pb2.Mutation.SetCell(
family_name=column_family_id,
column_qualifier=column,
mutation = mutations.SetCell(
family=column_family_id,
qualifier=column,
new_value=value,
timestamp_micros=timestamp_micros,
value=value,
)
mutation_pb = data_v2_pb2.Mutation(set_cell=mutation_val)
self._get_mutations(state).append(mutation_pb)
self._get_mutations(state).append(mutation)

def _delete(self, state=None):
"""Helper for :meth:`delete`
Expand All @@ -191,9 +183,7 @@ def _delete(self, state=None):
:param state: (Optional) The state that is passed along to
:meth:`_get_mutations`.
"""
mutation_val = data_v2_pb2.Mutation.DeleteFromRow()
mutation_pb = data_v2_pb2.Mutation(delete_from_row=mutation_val)
self._get_mutations(state).append(mutation_pb)
self._get_mutations(state).append(mutations.DeleteAllFromRow())

def _delete_cells(self, column_family_id, columns, time_range=None, state=None):
"""Helper for :meth:`delete_cell` and :meth:`delete_cells`.
Expand All @@ -220,33 +210,32 @@ def _delete_cells(self, column_family_id, columns, time_range=None, state=None):
:param state: (Optional) The state that is passed along to
:meth:`_get_mutations`.
"""
mutations_list = self._get_mutations(state)
if columns is self.ALL_COLUMNS:
mutation_val = data_v2_pb2.Mutation.DeleteFromFamily(
family_name=column_family_id
self._get_mutations(state).append(
mutations.DeleteAllFromFamily(family_to_delete=column_family_id)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it looks like you aren't using mutations_list, but it's still set at the top of the function (and then accessed at the bottom)

)
mutation_pb = data_v2_pb2.Mutation(delete_from_family=mutation_val)
mutations_list.append(mutation_pb)
else:
delete_kwargs = {}
if time_range is not None:
delete_kwargs["time_range"] = time_range._to_pb()
start_timestamp_micros = None
end_timestamp_micros = None
timestamps = time_range._to_dict() if time_range else {}
start_timestamp_micros = timestamps.get("start_timestamp_micros")
end_timestamp_micros = timestamps.get("end_timestamp_micros")

to_append = []
for column in columns:
column = _to_bytes(column)
# time_range will never change if present, but the rest of
# delete_kwargs will
delete_kwargs.update(
family_name=column_family_id, column_qualifier=column
to_append.append(
mutations.DeleteRangeFromColumn(
family=column_family_id,
qualifier=column,
start_timestamp_micros=start_timestamp_micros,
end_timestamp_micros=end_timestamp_micros,
)
)
mutation_val = data_v2_pb2.Mutation.DeleteFromColumn(**delete_kwargs)
mutation_pb = data_v2_pb2.Mutation(delete_from_column=mutation_val)
to_append.append(mutation_pb)

# We don't add the mutations until all columns have been
# processed without error.
mutations_list.extend(to_append)
self._get_mutations(state).extend(to_append)


class DirectRow(_SetDeleteRow):
Expand Down Expand Up @@ -284,7 +273,7 @@ class DirectRow(_SetDeleteRow):

def __init__(self, row_key, table=None):
super(DirectRow, self).__init__(row_key, table)
self._pb_mutations = []
self._mutations = []

def _get_mutations(self, state=None): # pylint: disable=unused-argument
"""Gets the list of mutations for a given state.
Expand All @@ -299,7 +288,12 @@ def _get_mutations(self, state=None): # pylint: disable=unused-argument
:rtype: list
:returns: The list to add new mutations to (for the current state).
"""
return self._pb_mutations
return self._mutations

def _get_mutation_pbs(self):
"""Gets the list of mutation protos."""

return [mut._to_pb() for mut in self._get_mutations()]

def get_mutations_size(self):
"""Gets the total mutations size for current row
Expand All @@ -313,7 +307,7 @@ def get_mutations_size(self):
"""

mutation_size = 0
for mutation in self._get_mutations():
for mutation in self._get_mutation_pbs():
mutation_size += mutation._pb.ByteSize()

return mutation_size
Expand Down Expand Up @@ -486,7 +480,7 @@ def clear(self):
:end-before: [END bigtable_api_row_clear]
:dedent: 4
"""
del self._pb_mutations[:]
del self._mutations[:]


class ConditionalRow(_SetDeleteRow):
Expand Down Expand Up @@ -597,17 +591,15 @@ def commit(self):
% (MAX_MUTATIONS, num_true_mutations, num_false_mutations)
)

data_client = self._table._instance._client.table_data_client
resp = data_client.check_and_mutate_row(
table_name=self._table.name,
table = self._table._table_impl
resp = table.check_and_mutate_row(
row_key=self._row_key,
predicate_filter=self._filter._to_pb(),
app_profile_id=self._table._app_profile_id,
true_mutations=true_mutations,
false_mutations=false_mutations,
predicate=self._filter,
true_case_mutations=true_mutations,
false_case_mutations=false_mutations,
)
self.clear()
return resp.predicate_matched
return resp

# pylint: disable=arguments-differ
def set_cell(self, column_family_id, column, value, timestamp=None, state=True):
Expand Down Expand Up @@ -797,7 +789,7 @@ class AppendRow(Row):

def __init__(self, row_key, table):
super(AppendRow, self).__init__(row_key, table)
self._rule_pb_list = []
self._rule_list = []

def clear(self):
"""Removes all currently accumulated modifications on current row.
Expand All @@ -809,7 +801,7 @@ def clear(self):
:end-before: [END bigtable_api_row_clear]
:dedent: 4
"""
del self._rule_pb_list[:]
del self._rule_list[:]

def append_cell_value(self, column_family_id, column, value):
"""Appends a value to an existing cell.
Expand Down Expand Up @@ -842,12 +834,11 @@ def append_cell_value(self, column_family_id, column, value):
the targeted cell is unset, it will be treated as
containing the empty string.
"""
column = _to_bytes(column)
value = _to_bytes(value)
rule_pb = data_v2_pb2.ReadModifyWriteRule(
family_name=column_family_id, column_qualifier=column, append_value=value
self._rule_list.append(
rmw_rules.AppendValueRule(
family=column_family_id, qualifier=column, append_value=value
)
)
self._rule_pb_list.append(rule_pb)

def increment_cell_value(self, column_family_id, column, int_value):
"""Increments a value in an existing cell.
Expand Down Expand Up @@ -886,13 +877,11 @@ def increment_cell_value(self, column_family_id, column, int_value):
big-endian signed integer), or the entire request
will fail.
"""
column = _to_bytes(column)
rule_pb = data_v2_pb2.ReadModifyWriteRule(
family_name=column_family_id,
column_qualifier=column,
increment_amount=int_value,
self._rule_list.append(
rmw_rules.IncrementRule(
family=column_family_id, qualifier=column, increment_amount=int_value
)
)
self._rule_pb_list.append(rule_pb)

def commit(self):
"""Makes a ``ReadModifyWriteRow`` API request.
Expand Down Expand Up @@ -925,7 +914,7 @@ def commit(self):
:raises: :class:`ValueError <exceptions.ValueError>` if the number of
mutations exceeds the :data:`MAX_MUTATIONS`.
"""
num_mutations = len(self._rule_pb_list)
num_mutations = len(self._rule_list)
if num_mutations == 0:
return {}
if num_mutations > MAX_MUTATIONS:
Expand All @@ -934,12 +923,10 @@ def commit(self):
"allowable %d." % (num_mutations, MAX_MUTATIONS)
)

data_client = self._table._instance._client.table_data_client
row_response = data_client.read_modify_write_row(
table_name=self._table.name,
table = self._table._table_impl
row_response = table.read_modify_write_row(
row_key=self._row_key,
rules=self._rule_pb_list,
app_profile_id=self._table._app_profile_id,
rules=self._rule_list,
)

# Reset modifications after commit-ing request.
Expand Down Expand Up @@ -983,47 +970,13 @@ def _parse_rmw_row_response(row_response):
}
"""
result = {}
for column_family in row_response.row.families:
column_family_id, curr_family = _parse_family_pb(column_family)
result[column_family_id] = curr_family
for cell in row_response.cells:
column_family = result.setdefault(cell.family, {})
column = column_family.setdefault(cell.qualifier, [])
column.append((cell.value, _datetime_from_microseconds(cell.timestamp_micros)))
return result


def _parse_family_pb(family_pb):
"""Parses a Family protobuf into a dictionary.

:type family_pb: :class:`._generated.data_pb2.Family`
:param family_pb: A protobuf

:rtype: tuple
:returns: A string and dictionary. The string is the name of the
column family and the dictionary has column names (within the
family) as keys and cell lists as values. Each cell is
represented with a two-tuple with the value (in bytes) and the
timestamp for the cell. For example:

.. code:: python

{
b'col-name1': [
(b'cell-val', datetime.datetime(...)),
(b'cell-val-newer', datetime.datetime(...)),
],
b'col-name2': [
(b'altcol-cell-val', datetime.datetime(...)),
],
}
"""
result = {}
for column in family_pb.columns:
result[column.qualifier] = cells = []
for cell in column.cells:
val_pair = (cell.value, _datetime_from_microseconds(cell.timestamp_micros))
cells.append(val_pair)

return family_pb.name, result


class PartialRowData(object):
"""Representation of partial row in a Google Cloud Bigtable Table.

Expand Down
2 changes: 1 addition & 1 deletion google/cloud/bigtable/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -1368,7 +1368,7 @@ def _compile_mutation_entries(table_name, rows):
for row in rows:
_check_row_table_name(table_name, row)
_check_row_type(row)
mutations = row._get_mutations()
mutations = row._get_mutation_pbs()
entries.append(entry_klass(row_key=row.row_key, mutations=mutations))
mutations_count += len(mutations)

Expand Down
Loading
Loading