Source code for gcloud_bigtable.row

# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""User friendly container for Google Cloud Bigtable Row."""


import six
import struct

from gcloud_bigtable._generated import bigtable_data_pb2 as data_pb2
from gcloud_bigtable._generated import (
    bigtable_service_messages_pb2 as messages_pb2)
from gcloud_bigtable._helpers import _parse_family_pb
from gcloud_bigtable._helpers import _timestamp_to_microseconds
from gcloud_bigtable._helpers import _to_bytes


# Upper bound on mutations per request; _commit_mutate / _commit_check_and_mutate
# raise ValueError locally (before any API call) when this is exceeded.
_MAX_MUTATIONS = 100000


class Row(object):
    """Representation of a Google Cloud Bigtable Row.

    .. note::

        A :class:`Row` accumulates mutations locally via the :meth:`set_cell`,
        :meth:`delete`, :meth:`delete_cell` and :meth:`delete_cells` methods.
        To actually send these mutations to the Google Cloud Bigtable API, you
        must call :meth:`commit`. If a ``filter`` is set on the :class:`Row`,
        the mutations must have an associated state: :data:`True` or
        :data:`False`. The mutations will be applied conditionally, based on
        whether the filter matches any cells in the :class:`Row` or not.

    :type row_key: bytes
    :param row_key: The key for the current row.

    :type table: :class:`.table.Table`
    :param table: The table that owns the row.

    :type filter: :class:`RowFilter`, :class:`RowFilterChain`,
                  :class:`RowFilterUnion` or :class:`ConditionalRowFilter`
    :param filter: (Optional) Filter to be used for conditional mutations.
                   If a filter is set, then the :class:`Row` will accumulate
                   mutations for either a :data:`True` or :data:`False` state.
                   When :meth:`commit`-ed, the mutations for the :data:`True`
                   state will be applied if the filter matches any cells in
                   the row, otherwise the :data:`False` state will be.
    """

    ALL_COLUMNS = object()
    """Sentinel value used to indicate all columns in a column family."""

    def __init__(self, row_key, table, filter=None):
        self._row_key = _to_bytes(row_key)
        self._table = table
        self._filter = filter
        # Read-modify-write rules accumulated by append_cell_value /
        # increment_cell_value and sent by commit_modifications.
        self._rule_pb_list = []
        if self._filter is None:
            # Unconditional: a single accumulator for mutations.
            self._pb_mutations = []
            self._true_pb_mutations = None
            self._false_pb_mutations = None
        else:
            # Conditional: separate accumulators for the True / False
            # filter outcomes.
            self._pb_mutations = None
            self._true_pb_mutations = []
            self._false_pb_mutations = []

    @property
    def table(self):
        """Getter for row's table.

        :rtype: :class:`.table.Table`
        :returns: The table stored on the row.
        """
        return self._table

    @property
    def row_key(self):
        """Getter for row's key.

        :rtype: bytes
        :returns: The key for the row.
        """
        return self._row_key

    @property
    def filter(self):
        """Getter for row's filter.

        :rtype: :class:`RowFilter`, :class:`RowFilterChain`,
                :class:`RowFilterUnion`, :class:`ConditionalRowFilter` or
                :data:`NoneType <types.NoneType>`
        :returns: The filter for the row.
        """
        return self._filter

    @property
    def client(self):
        """Getter for row's client.

        :rtype: :class:`.client.Client`
        :returns: The client that owns this row.
        """
        return self.table.client

    @property
    def timeout_seconds(self):
        """Getter for row's default timeout seconds.

        :rtype: int
        :returns: The timeout seconds default.
        """
        return self.table.timeout_seconds

    def _get_mutations(self, state=None):
        """Gets the list of mutations for a given state.

        If the state is :data:`None` but there is a filter set, then we've
        reached an invalid state. Similarly if no filter is set but the
        state is not :data:`None`.

        :type state: bool
        :param state: (Optional) The state that the mutation should be
                      applied in. Unset if the mutation is not conditional,
                      otherwise :data:`True` or :data:`False`.

        :rtype: list
        :returns: The list to add new mutations to (for the current state).
        :raises: :class:`ValueError <exceptions.ValueError>`
        """
        if state is None:
            if self.filter is not None:
                raise ValueError('A filter is set on the current row, but no '
                                 'state given for the mutation')
            return self._pb_mutations
        else:
            if self.filter is None:
                raise ValueError('No filter was set on the current row, but a '
                                 'state was given for the mutation')
            if state:
                return self._true_pb_mutations
            else:
                return self._false_pb_mutations
    def set_cell(self, column_family_id, column, value, timestamp=None,
                 state=None):
        """Sets a value in this row.

        The cell is determined by the ``row_key`` of the :class:`Row` and the
        ``column``. The ``column`` must be in an existing
        :class:`.column_family.ColumnFamily` (as determined by
        ``column_family_id``).

        .. note::

            This method adds a mutation to the accumulated mutations on this
            :class:`Row`, but does not make an API request. To actually
            send an API request (with the mutations) to the Google Cloud
            Bigtable API, call :meth:`commit`.

        :type column_family_id: str
        :param column_family_id: The column family that contains the column.
                                 Must be of the form
                                 ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.

        :type column: bytes
        :param column: The column within the column family where the cell
                       is located.

        :type value: bytes or :class:`int`
        :param value: The value to set in the cell. If an integer is used,
                      will be interpreted as a 64-bit big-endian signed
                      integer (8 bytes).

        :type timestamp: :class:`datetime.datetime`
        :param timestamp: (Optional) The timestamp of the operation.

        :type state: bool
        :param state: (Optional) The state that the mutation should be
                      applied in. Unset if the mutation is not conditional,
                      otherwise :data:`True` or :data:`False`.
        """
        column = _to_bytes(column)
        if isinstance(value, six.integer_types):
            # Pack integers as 64-bit big-endian signed (8 bytes).
            value = struct.pack('>q', value)
        value = _to_bytes(value)
        if timestamp is None:
            # Use -1 for current Bigtable server time.
            timestamp_micros = -1
        else:
            timestamp_micros = _timestamp_to_microseconds(timestamp)

        mutation_val = data_pb2.Mutation.SetCell(
            family_name=column_family_id,
            column_qualifier=column,
            timestamp_micros=timestamp_micros,
            value=value,
        )
        mutation_pb = data_pb2.Mutation(set_cell=mutation_val)
        self._get_mutations(state).append(mutation_pb)
    def append_cell_value(self, column_family_id, column, value):
        """Appends a value to an existing cell.

        .. note::

            This method adds a read-modify rule protobuf to the accumulated
            read-modify rules on this :class:`Row`, but does not make an API
            request. To actually send an API request (with the rules) to the
            Google Cloud Bigtable API, call :meth:`commit_modifications`.

        :type column_family_id: str
        :param column_family_id: The column family that contains the column.
                                 Must be of the form
                                 ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.

        :type column: bytes
        :param column: The column within the column family where the cell
                       is located.

        :type value: bytes
        :param value: The value to append to the existing value in the cell.
                      If the targeted cell is unset, it will be treated as
                      containing the empty string.
        """
        column = _to_bytes(column)
        value = _to_bytes(value)
        rule_pb = data_pb2.ReadModifyWriteRule(family_name=column_family_id,
                                               column_qualifier=column,
                                               append_value=value)
        self._rule_pb_list.append(rule_pb)
    def increment_cell_value(self, column_family_id, column, int_value):
        """Increments a value in an existing cell.

        Assumes the value in the cell is stored as a 64 bit integer
        serialized to bytes.

        .. note::

            This method adds a read-modify rule protobuf to the accumulated
            read-modify rules on this :class:`Row`, but does not make an API
            request. To actually send an API request (with the rules) to the
            Google Cloud Bigtable API, call :meth:`commit_modifications`.

        :type column_family_id: str
        :param column_family_id: The column family that contains the column.
                                 Must be of the form
                                 ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.

        :type column: bytes
        :param column: The column within the column family where the cell
                       is located.

        :type int_value: int
        :param int_value: The value to increment the existing value in the
                          cell by. If the targeted cell is unset, it will be
                          treated as containing a zero. Otherwise, the
                          targeted cell must contain an 8-byte value
                          (interpreted as a 64-bit big-endian signed
                          integer), or the entire request will fail.
        """
        column = _to_bytes(column)
        rule_pb = data_pb2.ReadModifyWriteRule(family_name=column_family_id,
                                               column_qualifier=column,
                                               increment_amount=int_value)
        self._rule_pb_list.append(rule_pb)
[docs] def delete(self, state=None): """Deletes this row from the table. .. note:: This method adds a mutation to the accumulated mutations on this :class:`Row`, but does not make an API request. To actually send an API request (with the mutations) to the Google Cloud Bigtable API, call :meth:`commit`. :type state: bool :param state: (Optional) The state that the mutation should be applied in. Unset if the mutation is not conditional, otherwise :data:`True` or :data:`False`. """ mutation_val = data_pb2.Mutation.DeleteFromRow() mutation_pb = data_pb2.Mutation(delete_from_row=mutation_val) self._get_mutations(state).append(mutation_pb)
[docs] def delete_cell(self, column_family_id, column, time_range=None, state=None): """Deletes cell in this row. .. note:: This method adds a mutation to the accumulated mutations on this :class:`Row`, but does not make an API request. To actually send an API request (with the mutations) to the Google Cloud Bigtable API, call :meth:`commit`. :type column_family_id: str :param column_family_id: The column family that contains the column or columns with cells being deleted. Must be of the form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``. :type column: bytes :param column: The column within the column family that will have a cell deleted. :type time_range: :class:`TimestampRange` :param time_range: (Optional) The range of time within which cells should be deleted. :type state: bool :param state: (Optional) The state that the mutation should be applied in. Unset if the mutation is not conditional, otherwise :data:`True` or :data:`False`. """ self.delete_cells(column_family_id, [column], time_range=time_range, state=state)
    def delete_cells(self, column_family_id, columns, time_range=None,
                     state=None):
        """Deletes cells in this row.

        .. note::

            This method adds a mutation to the accumulated mutations on this
            :class:`Row`, but does not make an API request. To actually
            send an API request (with the mutations) to the Google Cloud
            Bigtable API, call :meth:`commit`.

        :type column_family_id: str
        :param column_family_id: The column family that contains the column
                                 or columns with cells being deleted. Must be
                                 of the form ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.

        :type columns: :class:`list` of :class:`str` /
                       :func:`unicode <unicode>`, or :class:`object`
        :param columns: The columns within the column family that will have
                        cells deleted. If :attr:`Row.ALL_COLUMNS` is used
                        then the entire column family will be deleted from
                        the row.

        :type time_range: :class:`TimestampRange`
        :param time_range: (Optional) The range of time within which cells
                           should be deleted.

        :type state: bool
        :param state: (Optional) The state that the mutation should be
                      applied in. Unset if the mutation is not conditional,
                      otherwise :data:`True` or :data:`False`.
        """
        mutations_list = self._get_mutations(state)
        if columns is self.ALL_COLUMNS:
            # Identity check against the sentinel: drop the whole family.
            mutation_val = data_pb2.Mutation.DeleteFromFamily(
                family_name=column_family_id,
            )
            mutation_pb = data_pb2.Mutation(delete_from_family=mutation_val)
            mutations_list.append(mutation_pb)
        else:
            delete_kwargs = {}
            if time_range is not None:
                delete_kwargs['time_range'] = time_range.to_pb()

            to_append = []
            for column in columns:
                column = _to_bytes(column)
                # time_range will never change if present, but the rest of
                # delete_kwargs will
                delete_kwargs.update(
                    family_name=column_family_id,
                    column_qualifier=column,
                )
                mutation_val = data_pb2.Mutation.DeleteFromColumn(
                    **delete_kwargs)
                mutation_pb = data_pb2.Mutation(
                    delete_from_column=mutation_val)
                to_append.append(mutation_pb)

            # We don't add the mutations until all columns have been
            # processed without error.
            mutations_list.extend(to_append)
def _commit_mutate(self, timeout_seconds=None): """Makes a ``MutateRow`` API request. Assumes no filter is set on the :class:`Row` and is meant to be called by :meth:`commit`. :type timeout_seconds: int :param timeout_seconds: Number of seconds for request time-out. If not passed, defaults to value set on row. :raises: :class:`ValueError <exceptions.ValueError>` if the number of mutations exceeds the ``_MAX_MUTATIONS``. """ mutations_list = self._get_mutations(None) num_mutations = len(mutations_list) if num_mutations == 0: return if num_mutations > _MAX_MUTATIONS: raise ValueError('%d total mutations exceed the maximum allowable ' '%d.' % (num_mutations, _MAX_MUTATIONS)) request_pb = messages_pb2.MutateRowRequest( table_name=self.table.name, row_key=self.row_key, mutations=mutations_list, ) timeout_seconds = timeout_seconds or self.timeout_seconds response = self.client.data_stub.MutateRow.async(request_pb, timeout_seconds) # We expect a `._generated.empty_pb2.Empty`. response.result() def _commit_check_and_mutate(self, timeout_seconds=None): """Makes a ``CheckAndMutateRow`` API request. Assumes a filter is set on the :class:`Row` and is meant to be called by :meth:`commit`. :type timeout_seconds: int :param timeout_seconds: Number of seconds for request time-out. If not passed, defaults to value set on row. :rtype: bool :returns: Flag indicating if the filter was matched (which also indicates which set of mutations were applied by the server). :raises: :class:`ValueError <exceptions.ValueError>` if the number of mutations exceeds the ``_MAX_MUTATIONS``. """ true_mutations = self._get_mutations(True) false_mutations = self._get_mutations(False) num_true_mutations = len(true_mutations) num_false_mutations = len(false_mutations) if num_true_mutations == 0 and num_false_mutations == 0: return if (num_true_mutations > _MAX_MUTATIONS or num_false_mutations > _MAX_MUTATIONS): raise ValueError( 'Exceed the maximum allowable mutations (%d). 
Had %s true ' 'mutations and %d false mutations.' % ( _MAX_MUTATIONS, num_true_mutations, num_false_mutations)) request_pb = messages_pb2.CheckAndMutateRowRequest( table_name=self.table.name, row_key=self.row_key, predicate_filter=self.filter.to_pb(), true_mutations=true_mutations, false_mutations=false_mutations, ) timeout_seconds = timeout_seconds or self.timeout_seconds response = self.client.data_stub.CheckAndMutateRow.async( request_pb, timeout_seconds) # We expect a `.messages_pb2.CheckAndMutateRowResponse` check_and_mutate_row_response = response.result() return check_and_mutate_row_response.predicate_matched
[docs] def clear_mutations(self): """Removes all currently accumulated mutations on the current row.""" if self.filter is None: self._pb_mutations[:] = [] else: self._true_pb_mutations[:] = [] self._false_pb_mutations[:] = []
    def commit(self, timeout_seconds=None):
        """Makes a ``MutateRow`` or ``CheckAndMutateRow`` API request.

        If no mutations have been created in the row, no request is made.

        Mutations are applied atomically and in order, meaning that earlier
        mutations can be masked / negated by later ones. Cells already
        present in the row are left unchanged unless explicitly changed by a
        mutation.

        After committing the accumulated mutations, resets the local
        mutations to an empty list.

        In the case that a filter is set on the :class:`Row`, the mutations
        will be applied conditionally, based on whether the filter matches
        any cells in the :class:`Row` or not. (Each method which adds a
        mutation has a ``state`` parameter for this purpose.)

        :type timeout_seconds: int
        :param timeout_seconds: Number of seconds for request time-out. If
                                not passed, defaults to value set on row.

        :rtype: :class:`bool` or :data:`NoneType <types.NoneType>`
        :returns: :data:`None` if there is no filter, otherwise a flag
                  indicating if the filter was matched (which also
                  indicates which set of mutations were applied by the
                  server).
        :raises: :class:`ValueError <exceptions.ValueError>` if the number
                 of mutations exceeds the ``_MAX_MUTATIONS``.
        """
        if self.filter is None:
            result = self._commit_mutate(timeout_seconds=timeout_seconds)
        else:
            result = self._commit_check_and_mutate(
                timeout_seconds=timeout_seconds)

        # Reset mutations after commit-ing request.
        self.clear_mutations()

        return result
[docs] def clear_modification_rules(self): """Removes all currently accumulated modifications on current row.""" self._rule_pb_list[:] = []
    def commit_modifications(self, timeout_seconds=None):
        """Makes a ``ReadModifyWriteRow`` API request.

        This commits modifications made by :meth:`append_cell_value` and
        :meth:`increment_cell_value`. If no modifications were made, makes
        no API request and just returns ``{}``.

        Modifies a row atomically, reading the latest existing
        timestamp/value from the specified columns and writing a new value
        by appending / incrementing. The new cell created uses either the
        current server time or the highest timestamp of a cell in that
        column (if it exceeds the server time).

        :type timeout_seconds: int
        :param timeout_seconds: Number of seconds for request time-out. If
                                not passed, defaults to value set on row.

        :rtype: dict
        :returns: The new contents of all modified cells. Returned as a
                  dictionary of column families, each of which holds a
                  dictionary of columns. Each column contains a list of
                  cells modified. Each cell is represented with a two-tuple
                  with the value (in bytes) and the timestamp for the cell.
                  For example:

                  .. code:: python

                      {
                          u'col-fam-id': {
                              b'col-name1': [
                                  (b'cell-val', datetime.datetime(...)),
                                  (b'cell-val-newer', datetime.datetime(...)),
                              ],
                              b'col-name2': [
                                  (b'altcol-cell-val', datetime.datetime(...)),
                              ],
                          },
                          u'col-fam-id2': {
                              b'col-name3-but-other-fam': [
                                  (b'foo', datetime.datetime(...)),
                              ],
                          },
                      }
        """
        if len(self._rule_pb_list) == 0:
            return {}
        request_pb = messages_pb2.ReadModifyWriteRowRequest(
            table_name=self.table.name,
            row_key=self.row_key,
            rules=self._rule_pb_list,
        )
        timeout_seconds = timeout_seconds or self.timeout_seconds
        response = self.client.data_stub.ReadModifyWriteRow.async(
            request_pb, timeout_seconds)
        # We expect a `.data_pb2.Row`
        row_response = response.result()

        # Reset modifications after commit-ing request.
        self.clear_modification_rules()

        # NOTE: We expect row_response.key == self.row_key but don't check.
        return _parse_rmw_row_response(row_response)


# NOTE: For developers, this class may seem to be a bit verbose, i.e.
#       a list of property names and **kwargs may do the trick better
#       than actually listing every single argument. However, for the sake
#       of users and documentation, listing every single argument is more
#       useful.
class RowFilter(object):
    """Basic filter to apply to cells in a row.

    These values can be combined via :class:`RowFilterChain`,
    :class:`RowFilterUnion` and :class:`ConditionalRowFilter`.

    The regex filters must be valid RE2 patterns. See Google's
    `RE2 reference`_ for the accepted syntax.

    .. _RE2 reference: https://github.com/google/re2/wiki/Syntax

    .. note::

        At most one of the keyword arguments can be specified at once.

    .. note::

        For :class:`bytes` regex filters (``row_key``, ``column_qualifier``
        and ``value``), special care need be used with the expression used.
        Since each of these properties can contain arbitrary bytes, the
        ``\\C`` escape sequence must be used if a true wildcard is desired.
        The ``.`` character will not match the new line character ``\\n``,
        which may be present in a binary value.

    :type row_key_regex_filter: bytes
    :param row_key_regex_filter: A regular expression (RE2) to match cells
                                 from rows with row keys that satisfy this
                                 regex. For a ``CheckAndMutateRowRequest``,
                                 this filter is unnecessary since the row key
                                 is already specified.

    :type family_name_regex_filter: str
    :param family_name_regex_filter: A regular expression (RE2) to match
                                     cells from columns in a given column
                                     family. For technical reasons, the regex
                                     must not contain the ``':'`` character,
                                     even if it is not being used as a
                                     literal.

    :type column_qualifier_regex_filter: bytes
    :param column_qualifier_regex_filter: A regular expression (RE2) to match
                                          cells from column that match this
                                          regex (irrespective of column
                                          family).

    :type value_regex_filter: bytes
    :param value_regex_filter: A regular expression (RE2) to match cells with
                               values that match this regex.

    :type column_range_filter: :class:`ColumnRange`
    :param column_range_filter: Range of columns to limit cells to.

    :type timestamp_range_filter: :class:`TimestampRange`
    :param timestamp_range_filter: Range of time that cells should match
                                   against.

    :type value_range_filter: :class:`CellValueRange`
    :param value_range_filter: Range of cell values to filter for.

    :type cells_per_row_offset_filter: int
    :param cells_per_row_offset_filter: Skips the first N cells of the row.

    :type cells_per_row_limit_filter: int
    :param cells_per_row_limit_filter: Matches only the first N cells of the
                                       row.

    :type cells_per_column_limit_filter: int
    :param cells_per_column_limit_filter: Matches only the most recent N
                                          cells within each column. This
                                          filters a (family name, column)
                                          pair, based on timestamps of each
                                          cell.

    :type row_sample_filter: float
    :param row_sample_filter: Non-deterministic filter. Matches all cells
                              from a row with probability p, and matches no
                              cells from the row with probability 1-p. (Here,
                              the probability p is ``row_sample_filter``.)

    :type strip_value_transformer: bool
    :param strip_value_transformer: If :data:`True`, replaces each cell's
                                    value with the empty string. As the name
                                    indicates, this is more useful as a
                                    transformer than a generic query / filter.

    :raises: :class:`TypeError <exceptions.TypeError>` if not exactly one
             value set in the constructor.
    """

    # Attribute names grouped by how ``to_pb`` serializes each one.
    _TO_BYTES_ATTRS = ('row_key_regex_filter',
                       'column_qualifier_regex_filter',
                       'value_regex_filter')
    _TO_PB_ATTRS = ('column_range_filter',
                    'timestamp_range_filter',
                    'value_range_filter')
    _PLAIN_ATTRS = ('family_name_regex_filter',
                    'cells_per_row_offset_filter',
                    'cells_per_row_limit_filter',
                    'cells_per_column_limit_filter',
                    'row_sample_filter',
                    'strip_value_transformer')
    _ALL_ATTRS = _TO_BYTES_ATTRS + _TO_PB_ATTRS + _PLAIN_ATTRS

    def __init__(self, row_key_regex_filter=None,
                 family_name_regex_filter=None,
                 column_qualifier_regex_filter=None,
                 value_regex_filter=None,
                 column_range_filter=None,
                 timestamp_range_filter=None,
                 value_range_filter=None,
                 cells_per_row_offset_filter=None,
                 cells_per_row_limit_filter=None,
                 cells_per_column_limit_filter=None,
                 row_sample_filter=None,
                 strip_value_transformer=None):
        self.row_key_regex_filter = row_key_regex_filter
        self.family_name_regex_filter = family_name_regex_filter
        self.column_qualifier_regex_filter = column_qualifier_regex_filter
        self.value_regex_filter = value_regex_filter
        self.column_range_filter = column_range_filter
        self.timestamp_range_filter = timestamp_range_filter
        self.value_range_filter = value_range_filter
        self.cells_per_row_offset_filter = cells_per_row_offset_filter
        self.cells_per_row_limit_filter = cells_per_row_limit_filter
        self.cells_per_column_limit_filter = cells_per_column_limit_filter
        self.row_sample_filter = row_sample_filter
        self.strip_value_transformer = strip_value_transformer
        self._check_single_value()

    def _check_single_value(self):
        """Checks that exactly one value is set on the instance.

        :raises: :class:`TypeError <exceptions.TypeError>` if not exactly
                 one value set on the instance.
        """
        num_set = sum(getattr(self, name) is not None
                      for name in self._ALL_ATTRS)
        if num_set != 1:
            raise TypeError('Exactly one value must be set in a row filter')

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return all(getattr(other, name) == getattr(self, name)
                   for name in self._ALL_ATTRS)

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`RowFilter` to a protobuf.

        :rtype: :class:`.data_pb2.RowFilter`
        :returns: The converted current object.
        """
        self._check_single_value()
        pb_kwargs = {}
        # Regex filters are coerced to bytes before serialization
        # (family_name_regex_filter stays a str and is passed through).
        for name in self._TO_BYTES_ATTRS:
            value = getattr(self, name)
            if value is not None:
                pb_kwargs[name] = _to_bytes(value)
        # Range filters carry their own protobuf conversion.
        for name in self._TO_PB_ATTRS:
            value = getattr(self, name)
            if value is not None:
                pb_kwargs[name] = value.to_pb()
        # Remaining filters are plain scalars, passed through untouched.
        for name in self._PLAIN_ATTRS:
            value = getattr(self, name)
            if value is not None:
                pb_kwargs[name] = value
        return data_pb2.RowFilter(**pb_kwargs)
class TimestampRange(object):
    """Range of time with inclusive lower and exclusive upper bounds.

    :type start: :class:`datetime.datetime`
    :param start: (Optional) The (inclusive) lower bound of the timestamp
                  range. If omitted, defaults to Unix epoch.

    :type end: :class:`datetime.datetime`
    :param end: (Optional) The (exclusive) upper bound of the timestamp
                range. If omitted, defaults to "infinity" (no upper bound).
    """

    def __init__(self, start=None, end=None):
        self.start = start
        self.end = end

    def __eq__(self, other):
        same_type = isinstance(other, self.__class__)
        return (same_type and
                self.start == other.start and
                self.end == other.end)

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`TimestampRange` to a protobuf.

        :rtype: :class:`.data_pb2.TimestampRange`
        :returns: The converted current object.
        """
        pb_kwargs = {}
        # Unset bounds are simply omitted from the protobuf.
        if self.start is not None:
            pb_kwargs['start_timestamp_micros'] = (
                _timestamp_to_microseconds(self.start))
        if self.end is not None:
            pb_kwargs['end_timestamp_micros'] = (
                _timestamp_to_microseconds(self.end))
        return data_pb2.TimestampRange(**pb_kwargs)
class ColumnRange(object):
    """A range of columns to restrict to in a row filter.

    Both the start and end column can be included or excluded in the range.
    By default, we include them both, but this can be changed with optional
    flags.

    :type column_family_id: str
    :param column_family_id: The column family that contains the columns.
                             Must be of the form
                             ``[_a-zA-Z0-9][-_.a-zA-Z0-9]*``.

    :type start_column: bytes
    :param start_column: The start of the range of columns. If no value is
                         used, it is interpreted as the empty string
                         (inclusive) by the backend.

    :type end_column: bytes
    :param end_column: The end of the range of columns. If no value is used,
                       it is interpreted as the infinite string (exclusive)
                       by the backend.

    :type inclusive_start: bool
    :param inclusive_start: Boolean indicating if the start column should be
                            included in the range (or excluded).

    :type inclusive_end: bool
    :param inclusive_end: Boolean indicating if the end column should be
                          included in the range (or excluded).
    """

    def __init__(self, column_family_id, start_column=None, end_column=None,
                 inclusive_start=True, inclusive_end=True):
        self.column_family_id = column_family_id
        self.start_column = start_column
        self.end_column = end_column
        self.inclusive_start = inclusive_start
        self.inclusive_end = inclusive_end

    def _props(self):
        """Tuple of all comparable attributes, for equality checks."""
        return (self.column_family_id, self.start_column, self.end_column,
                self.inclusive_start, self.inclusive_end)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self._props() == other._props()

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`ColumnRange` to a protobuf.

        :rtype: :class:`.data_pb2.ColumnRange`
        :returns: The converted current object.
        """
        range_kwargs = {'family_name': self.column_family_id}
        if self.start_column is not None:
            suffix = 'inclusive' if self.inclusive_start else 'exclusive'
            range_kwargs['start_qualifier_' + suffix] = _to_bytes(
                self.start_column)
        if self.end_column is not None:
            suffix = 'inclusive' if self.inclusive_end else 'exclusive'
            range_kwargs['end_qualifier_' + suffix] = _to_bytes(
                self.end_column)
        return data_pb2.ColumnRange(**range_kwargs)
class CellValueRange(object):
    """A range of values to restrict to in a row filter.

    Will only match cells that have values in this range.

    Both the start and end value can be included or excluded in the range.
    By default, we include them both, but this can be changed with optional
    flags.

    :type start_value: bytes
    :param start_value: The start of the range of values. If no value is
                        used, it is interpreted as the empty string
                        (inclusive) by the backend.

    :type end_value: bytes
    :param end_value: The end of the range of values. If no value is used,
                      it is interpreted as the infinite string (exclusive)
                      by the backend.

    :type inclusive_start: bool
    :param inclusive_start: Boolean indicating if the start value should be
                            included in the range (or excluded).

    :type inclusive_end: bool
    :param inclusive_end: Boolean indicating if the end value should be
                          included in the range (or excluded).
    """

    def __init__(self, start_value=None, end_value=None,
                 inclusive_start=True, inclusive_end=True):
        self.start_value = start_value
        self.end_value = end_value
        self.inclusive_start = inclusive_start
        self.inclusive_end = inclusive_end

    def _props(self):
        """Tuple of all comparable attributes, for equality checks."""
        return (self.start_value, self.end_value,
                self.inclusive_start, self.inclusive_end)

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return self._props() == other._props()

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`CellValueRange` to a protobuf.

        :rtype: :class:`.data_pb2.ValueRange`
        :returns: The converted current object.
        """
        range_kwargs = {}
        if self.start_value is not None:
            suffix = 'inclusive' if self.inclusive_start else 'exclusive'
            range_kwargs['start_value_' + suffix] = _to_bytes(
                self.start_value)
        if self.end_value is not None:
            suffix = 'inclusive' if self.inclusive_end else 'exclusive'
            range_kwargs['end_value_' + suffix] = _to_bytes(self.end_value)
        return data_pb2.ValueRange(**range_kwargs)
class RowFilterChain(object):
    """Chain of row filters.

    Sends rows through several filters in sequence. The filters are
    "chained" together to process a row. After the first filter is applied,
    the second is applied to the filtered output and so on for subsequent
    filters.

    :type filters: list
    :param filters: List of :class:`RowFilter`, :class:`RowFilterChain`,
                    :class:`RowFilterUnion` and/or
                    :class:`ConditionalRowFilter`
    """

    def __init__(self, filters=None):
        self.filters = filters

    def __eq__(self, other):
        return (isinstance(other, self.__class__) and
                self.filters == other.filters)

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`RowFilterChain` to a protobuf.

        :rtype: :class:`.data_pb2.RowFilter`
        :returns: The converted current object.
        """
        sub_filter_pbs = [sub_filter.to_pb() for sub_filter in self.filters]
        chain_pb = data_pb2.RowFilter.Chain(filters=sub_filter_pbs)
        return data_pb2.RowFilter(chain=chain_pb)
class RowFilterUnion(object):
    """Union of row filters.

    Runs several filters over a row simultaneously and merges /
    interleaves the filtered results together. If multiple cells are
    produced with the same column and timestamp, they will all appear in
    the output row in an unspecified mutual order.

    :type filters: list
    :param filters: List of :class:`RowFilter`, :class:`RowFilterChain`,
                    :class:`RowFilterUnion` and/or
                    :class:`ConditionalRowFilter`
    """

    def __init__(self, filters=None):
        self.filters = filters

    def __eq__(self, other):
        # Only compare against other RowFilterUnion instances.
        if not isinstance(other, self.__class__):
            return False
        return self.filters == other.filters

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`RowFilterUnion` to a protobuf.

        :rtype: :class:`.data_pb2.RowFilter`
        :returns: The converted current object.
        """
        converted = [sub_filter.to_pb() for sub_filter in self.filters]
        return data_pb2.RowFilter(
            interleave=data_pb2.RowFilter.Interleave(filters=converted))
class ConditionalRowFilter(object):
    """Conditional row filter.

    Executes one of two filters based on another filter. If the
    ``base_filter`` returns any cells in the row, then ``true_filter``
    is executed. If not, then ``false_filter`` is executed.

    .. note::

        The ``base_filter`` does not execute atomically with the true and
        false filters, which may lead to inconsistent or unexpected
        results. Additionally, executing a :class:`ConditionalRowFilter`
        has poor performance on the server, especially when
        ``false_filter`` is set.

    :type base_filter: :class:`RowFilter`, :class:`RowFilterChain`,
                       :class:`RowFilterUnion` or
                       :class:`ConditionalRowFilter`
    :param base_filter: The filter to condition on before executing the
                        true/false filters.

    :type true_filter: :class:`RowFilter`, :class:`RowFilterChain`,
                       :class:`RowFilterUnion` or
                       :class:`ConditionalRowFilter`
    :param true_filter: (Optional) The filter to execute if there are any
                        cells matching ``base_filter``. If not provided,
                        no results will be returned in the true case.

    :type false_filter: :class:`RowFilter`, :class:`RowFilterChain`,
                        :class:`RowFilterUnion` or
                        :class:`ConditionalRowFilter`
    :param false_filter: (Optional) The filter to execute if there are no
                         cells matching ``base_filter``. If not provided,
                         no results will be returned in the false case.
    """

    def __init__(self, base_filter, true_filter=None, false_filter=None):
        self.base_filter = base_filter
        self.true_filter = true_filter
        self.false_filter = false_filter

    def __eq__(self, other):
        # Only compare against other ConditionalRowFilter instances.
        if not isinstance(other, self.__class__):
            return False
        return (self.base_filter == other.base_filter and
                self.true_filter == other.true_filter and
                self.false_filter == other.false_filter)

    def __ne__(self, other):
        return not self == other

    def to_pb(self):
        """Converts the :class:`ConditionalRowFilter` to a protobuf.

        :rtype: :class:`.data_pb2.RowFilter`
        :returns: The converted current object.
        """
        condition_kwargs = {'predicate_filter': self.base_filter.to_pb()}
        # The true/false branches are optional; only convert the ones set.
        for field_name, sub_filter in (('true_filter', self.true_filter),
                                       ('false_filter', self.false_filter)):
            if sub_filter is not None:
                condition_kwargs[field_name] = sub_filter.to_pb()
        return data_pb2.RowFilter(
            condition=data_pb2.RowFilter.Condition(**condition_kwargs))
def _parse_rmw_row_response(row_response):
    """Parses the response to a ``ReadModifyWriteRow`` request.

    :type row_response: :class:`.data_pb2.Row`
    :param row_response: The response row (with only modified cells) from a
                         ``ReadModifyWriteRow`` request.

    :rtype: dict
    :returns: The new contents of all modified cells. Returned as a
              dictionary of column families, each of which holds a
              dictionary of columns. Each column contains a list of cells
              modified. Each cell is represented with a two-tuple with the
              value (in bytes) and the timestamp for the cell. For example:

              .. code:: python

                  {
                      u'col-fam-id': {
                          b'col-name1': [
                              (b'cell-val', datetime.datetime(...)),
                              (b'cell-val-newer', datetime.datetime(...)),
                          ],
                          b'col-name2': [
                              (b'altcol-cell-val', datetime.datetime(...)),
                          ],
                      },
                      u'col-fam-id2': {
                          b'col-name3-but-other-fam': [
                              (b'foo', datetime.datetime(...)),
                          ],
                      },
                  }
    """
    # Each family protobuf parses into a (family_id, columns_dict) pair,
    # so the result dict can be built directly from the parsed pairs.
    return dict(_parse_family_pb(family_pb)
                for family_pb in row_response.families)