Source code for gcloud_bigtable.row_data

# Copyright 2015 Google Inc. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Container for Google Cloud Bigtable Cells and Streaming Row Contents."""


import copy

from gcloud_bigtable._helpers import _microseconds_to_timestamp


class Cell(object):
    """A single timestamped value stored in a Google Cloud Bigtable cell.

    :type value: bytes
    :param value: The raw bytes held by the cell.

    :type timestamp: :class:`datetime.datetime`
    :param timestamp: When the cell value was written.
    """

    def __init__(self, value, timestamp):
        self.value = value
        self.timestamp = timestamp

    @classmethod
    def from_pb(cls, cell_pb):
        """Build a :class:`Cell` from its protobuf representation.

        :type cell_pb: :class:`._generated.bigtable_data_pb2.Cell`
        :param cell_pb: The protobuf message to convert.

        :rtype: :class:`Cell`
        :returns: A new cell holding the protobuf's ``value`` together with
                  its ``timestamp_micros`` converted by
                  ``_microseconds_to_timestamp``.
        """
        written_at = _microseconds_to_timestamp(cell_pb.timestamp_micros)
        return cls(cell_pb.value, written_at)

    def __eq__(self, other):
        # Objects of an unrelated type never compare equal; otherwise the
        # value is compared first, then the timestamp.
        return (isinstance(other, self.__class__) and
                other.value == self.value and
                other.timestamp == self.timestamp)

    def __ne__(self, other):
        return not self.__eq__(other)
class PartialRowData(object):
    """A (possibly incomplete) row read from a Google Cloud Bigtable table.

    Instances are meant to be filled in directly from
    :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
    messages via :meth:`update_from_read_rows`.

    :type row_key: bytes
    :param row_key: The key of the row whose (partial) data is held.
    """

    def __init__(self, row_key):
        self._row_key = row_key
        # column family ID -> {column qualifier -> [Cell, ...]}
        self._cells = {}
        self._committed = False

    def __eq__(self, other):
        if isinstance(other, self.__class__):
            return (other._row_key == self._row_key and
                    other._committed == self._committed and
                    other._cells == self._cells)
        return False

    def __ne__(self, other):
        return not self.__eq__(other)

    @property
    def cells(self):
        """Deep copy of every cell gathered so far for this row.

        :rtype: dict
        :returns: A two-level mapping: column family ID, then column
                  name/qualifier, leading to a list of :class:`Cell`
                  objects for that column. Mutating the result does not
                  affect this row.
        """
        return copy.deepcopy(self._cells)

    @property
    def row_key(self):
        """Key of the row this object accumulates data for.

        :rtype: bytes
        :returns: The (partial) row's key.
        """
        return self._row_key

    @property
    def committed(self):
        """Whether a ``commit_row`` chunk has been accepted for this row.

        :rtype: bool
        :returns: The committed status of the (partial) row.
        """
        return self._committed

    def clear(self):
        """Clears all cells that have been added."""
        self._committed = False
        self._cells.clear()

    def _handle_commit_row(self, chunk, index, last_chunk_index):
        """Process a chunk whose ``ONEOF`` property is ``commit_row``.

        :type chunk: ``ReadRowsResponse.Chunk``
        :param chunk: The chunk to process.

        :type index: int
        :param index: Position of ``chunk`` within its response.

        :type last_chunk_index: int
        :param last_chunk_index: Position of the final chunk in the response.

        :raises: :class:`ValueError <exceptions.ValueError>` when
                 ``commit_row`` is :data:`False` or when ``chunk`` is not
                 the final chunk of the response.
        """
        # The caller is responsible for having checked the ONEOF property.
        if not chunk.commit_row:
            raise ValueError('Received commit_row that was False.')
        if index != last_chunk_index:
            raise ValueError('Commit row chunk was not the last chunk')
        self._committed = True

    def _handle_reset_row(self, chunk):
        """Process a chunk whose ``ONEOF`` property is ``reset_row``.

        :type chunk: ``ReadRowsResponse.Chunk``
        :param chunk: The chunk to process.

        :raises: :class:`ValueError <exceptions.ValueError>` when
                 ``reset_row`` is :data:`False`.
        """
        # The caller is responsible for having checked the ONEOF property.
        if chunk.reset_row:
            self.clear()
        else:
            raise ValueError('Received reset_row that was False.')

    def _handle_row_contents(self, chunk):
        """Process a chunk whose ``ONEOF`` property is ``row_contents``.

        Cells from every column in the chunk are appended to the cells
        already stored under the same family and column qualifier.

        :type chunk: ``ReadRowsResponse.Chunk``
        :param chunk: The chunk to process.
        """
        # The caller is responsible for having checked the ONEOF property.
        # chunk.row_contents is a ._generated.bigtable_data_pb2.Family
        family_cells = self._cells.setdefault(chunk.row_contents.name, {})
        for column in chunk.row_contents.columns:
            # Convert every cell before touching the stored mapping.
            converted = [Cell.from_pb(cell_pb) for cell_pb in column.cells]
            stored = family_cells.setdefault(column.qualifier, [])
            stored.extend(converted)

    def update_from_read_rows(self, read_rows_response_pb):
        """Apply every chunk of a ``ReadRows`` response to this row.

        :type read_rows_response_pb:
            :class:`._generated.bigtable_service_messages_pb2.ReadRowsResponse`
        :param read_rows_response_pb: One response streamed back from a
                                      ``ReadRows`` request.

        :raises: :class:`ValueError <exceptions.ValueError>` when this row
                 is already committed, when the response's row key differs
                 from this row's key, or when a chunk carries an unexpected
                 (or unset) ``ONEOF`` property.
        """
        if self._committed:
            raise ValueError('The row has been committed')

        if read_rows_response_pb.row_key != self.row_key:
            raise ValueError('Response row key (%r) does not match current '
                             'one (%r).' % (read_rows_response_pb.row_key,
                                            self.row_key))

        chunks = read_rows_response_pb.chunks
        final_index = len(chunks) - 1
        for position, chunk in enumerate(chunks):
            kind = chunk.WhichOneof('chunk')
            if kind == 'commit_row':
                self._handle_commit_row(chunk, position, final_index)
            elif kind == 'reset_row':
                self._handle_reset_row(chunk)
            elif kind == 'row_contents':
                self._handle_row_contents(chunk)
            else:
                # An unset ONEOF (kind is None) is rejected as well.
                raise ValueError('Unexpected chunk property: %s' % (kind,))
class PartialRowsData(object):
    """Convenience wrapper for consuming a ``ReadRows`` streaming response.

    :type response_iterator:
        :class:`grpc.framework.alpha._reexport._CancellableIterator`
    :param response_iterator: A streaming iterator returned from a
                              ``ReadRows`` request. It must support the
                              builtin ``next()``; :meth:`cancel` additionally
                              requires a ``cancel()`` method.
    """

    def __init__(self, response_iterator):
        # We expect an iterator of `data_messages_pb2.ReadRowsResponse`
        self._response_iterator = response_iterator
        # row key -> PartialRowData accumulated from the stream.
        self._rows = {}

    def __eq__(self, other):
        if not isinstance(other, self.__class__):
            return False
        return other._response_iterator == self._response_iterator

    def __ne__(self, other):
        return not self.__eq__(other)

    @property
    def rows(self):
        """Property returning all rows accumulated from the stream.

        :rtype: dict
        :returns: Dictionary of :class:`PartialRowData`, keyed by row key.
        """
        # NOTE: To avoid duplicating large objects, this is just the
        #       mutable private data.
        return self._rows

    def cancel(self):
        """Cancels the iterator, closing the stream."""
        self._response_iterator.cancel()

    def consume_next(self):
        """Consumes the next ``ReadRowsResponse`` from the stream.

        Parses the response and stores it as a :class:`PartialRowData`
        in a dictionary owned by this object. A response whose row key
        has been seen before updates the existing entry.

        :raises: :class:`StopIteration <exceptions.StopIteration>` if the
                 response iterator has no more responses to stream.
        """
        # Use the ``next()`` builtin rather than the Python 2-only
        # ``.next()`` method, so any iterator works, including plain
        # generators on Python 3, which have no ``.next`` attribute.
        read_rows_response = next(self._response_iterator)
        row_key = read_rows_response.row_key
        partial_row = self._rows.get(row_key)
        if partial_row is None:
            partial_row = self._rows[row_key] = PartialRowData(row_key)
        # NOTE: This is not atomic in the case of failures.
        partial_row.update_from_read_rows(read_rows_response)

    def consume_all(self, max_loops=None):
        """Consume the streamed responses until there are no more.

        This simply calls :meth:`consume_next` until there are no more to
        consume.

        :type max_loops: int
        :param max_loops: (Optional) Maximum number of times to try to consume
                          an additional ``ReadRowsResponse``. You can use this
                          to avoid long wait times. :data:`None` means no
                          limit.
        """
        curr_loop = 0
        if max_loops is None:
            max_loops = float('inf')
        while curr_loop < max_loops:
            curr_loop += 1
            try:
                self.consume_next()
            except StopIteration:
                break