class ResultSet(object):
    """
    A class used to lazily handle page-to-page navigation through a set of
    results.

    It presents a transparent iterator interface, so that all the user has
    to do is use it in a typical ``for`` loop (or list comprehension, etc.)
    to fetch results, even if they weren't present in the current page of
    results.

    This is used by the ``Table.query`` & ``Table.scan`` methods.

    Example::

        >>> users = Table('users')
        >>> results = ResultSet()
        >>> results.to_call(users.query, username__gte='johndoe')
        # Now iterate. When it runs out of results, it'll fetch the next page.
        >>> for res in results:
        ...     print(res['username'])

    """

    def __init__(self, max_page_size=None):
        """
        Creates an empty ``ResultSet``.

        ``max_page_size``, if given, is passed to the callable as its
        ``limit`` keyword on every fetch (capped at the overall limit set
        via ``to_call``).
        """
        super(ResultSet, self).__init__()
        self.the_callable = None
        self.call_args = []
        self.call_kwargs = {}
        self._results = []
        # Starts at ``-1`` because ``__next__`` increments before indexing.
        self._offset = -1
        self._results_left = True
        self._last_key_seen = None
        self._fetches = 0
        self._max_page_size = max_page_size
        # Number of items still allowed to be handed to the user, or ``None``
        # for "no overall limit".
        self._limit = None

    @property
    def first_key(self):
        """
        The keyword argument name used to hand the last seen key back to the
        callable when requesting the next page.
        """
        return 'exclusive_start_key'

    def _reset(self):
        """
        Resets the internal state of the ``ResultSet``.

        This prevents results from being cached long-term & consuming
        excess memory.

        Largely internal.
        """
        self._results = []
        self._offset = 0

    def __iter__(self):
        return self

    def __next__(self):
        """
        Returns the next result, fetching further pages as needed.

        Raises ``StopIteration`` once the pages are exhausted or the overall
        limit (if any) has been handed out.
        """
        self._offset += 1

        if self._offset >= len(self._results):
            if self._results_left is False:
                raise StopIteration()

            self.fetch_more()

            # It's possible that previous call to ``fetch_more`` may not return
            # anything useful but there may be more results. Loop until we get
            # something back, making sure we guard for no results left.
            while not len(self._results) and self._results_left:
                self.fetch_more()

        if self._offset < len(self._results):
            if self._limit is not None:
                self._limit -= 1

                if self._limit < 0:
                    raise StopIteration()

            return self._results[self._offset]
        else:
            raise StopIteration()

    # Python 2 iterator protocol compatibility.
    next = __next__

    def to_call(self, the_callable, *args, **kwargs):
        """
        Sets up the callable & any arguments to run it with.

        This is stored for subsequent calls so that those queries can be
        run without requiring user intervention.

        A ``limit`` keyword, if present, is removed from ``kwargs`` and used
        as the overall number of results to return; a negative value means
        no limit.

        Raises ``ValueError`` if ``the_callable`` is not callable.

        Example::

            # Just an example callable.
            >>> def squares_to(y):
            ...     for x in range(1, y):
            ...         yield x**2
            >>> rs = ResultSet()
            # Set up what to call & arguments.
            >>> rs.to_call(squares_to, y=3)

        """
        if not callable(the_callable):
            raise ValueError(
                'You must supply an object or function to be called.'
            )

        # We pop the ``limit``, if present, to track how many we should return
        # to the user. This isn't the same as the ``limit`` that the low-level
        # DDB api calls use (which limit page size, not the overall result set).
        self._limit = kwargs.pop('limit', None)

        if self._limit is not None and self._limit < 0:
            self._limit = None

        self.the_callable = the_callable
        self.call_args = args
        self.call_kwargs = kwargs

    def fetch_more(self):
        """
        When the iterator runs out of results, this method is run to re-execute
        the callable (& arguments) to fetch the next page.

        The callable must return a mapping; its ``results`` entry (optional)
        holds the page's items and its ``last_key`` entry (optional) holds the
        key to resume from. A missing/``None`` ``last_key`` marks the final
        page.

        Largely internal.
        """
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        if self._last_key_seen is not None:
            kwargs[self.first_key] = self._last_key_seen

        # If the page size is greater than limit set them
        # to the same value
        if self._limit and self._max_page_size and self._max_page_size > self._limit:
            self._max_page_size = self._limit

        # Put in the max page size.
        if self._max_page_size is not None:
            kwargs['limit'] = self._max_page_size
        elif self._limit is not None:
            # If max_page_size is not set and limit is available
            # use it as the page size
            kwargs['limit'] = self._limit

        results = self.the_callable(*args, **kwargs)
        self._fetches += 1
        # Read the page once, defensively; a page without a ``results`` entry
        # is treated as empty everywhere below (indexing ``results['results']``
        # directly would raise ``KeyError`` for such a page).
        new_results = results.get('results', [])
        self._last_key_seen = results.get('last_key', None)

        if len(new_results):
            self._results.extend(new_results)

        # Check the limit, if it's present.
        if self._limit is not None and self._limit >= 0:
            # If this page covers the remaining limit, we don't have any more
            # results to look for.
            if self._limit - len(new_results) <= 0:
                self._results_left = False

        if self._last_key_seen is None:
            self._results_left = False
class BatchGetResultSet(ResultSet):
    """
    A ``ResultSet`` variant that pages through a fixed list of keys in
    batches rather than following a "last key" cursor.

    Each fetch hands the callable the next slice of pending keys (at most
    ``max_batch_get`` of them) as its ``keys`` keyword argument.
    """

    def __init__(self, *args, **kwargs):
        """
        Accepts two extra keyword arguments, both removed before the
        remaining arguments are passed to ``ResultSet.__init__``:

        * ``keys`` - the keys to fetch (default: none).
        * ``max_batch_get`` - the most keys to request per call
          (default: ``100``).
        """
        # Copy into a list so slicing/re-queuing below works for any iterable
        # and never aliases the caller's object.
        self._keys_left = list(kwargs.pop('keys', []))
        self._max_batch_get = kwargs.pop('max_batch_get', 100)
        super(BatchGetResultSet, self).__init__(*args, **kwargs)

    def fetch_more(self):
        """
        Runs the callable for the next batch of pending keys.

        The callable must return a mapping; its ``results`` entry (optional)
        holds the fetched items and its ``unprocessed_keys`` entry (optional)
        holds keys that need to be requested again.

        Largely internal.
        """
        self._reset()

        args = self.call_args[:]
        kwargs = self.call_kwargs.copy()

        # Slice off the max we can fetch.
        kwargs['keys'] = self._keys_left[:self._max_batch_get]
        self._keys_left = self._keys_left[self._max_batch_get:]

        if len(self._keys_left) <= 0:
            self._results_left = False

        results = self.the_callable(*args, **kwargs)
        new_results = results.get('results', [])

        # We've got unprocessed keys. Reinsert them at the front of the list,
        # preserving their order. This must happen *before* the empty-results
        # early return below: a batch can come back with no items yet still
        # report unprocessed keys, and those would otherwise be dropped.
        # DynamoDB only returns valid keys, so there should be no risk of
        # missing keys ever making it here.
        self._keys_left[:0] = results.get('unprocessed_keys', [])

        if len(self._keys_left) > 0:
            self._results_left = True

        if not len(new_results):
            return

        self._results.extend(new_results)

        # Decrease the limit, if it's present.
        if self.call_kwargs.get('limit'):
            self.call_kwargs['limit'] -= len(new_results)