Repository URL to install this package:
|
Version:
0.4.1.1 ▾
|
import base64
from .exceptions import InvalidArgumentError
def compose_new_shard_iterator(stream_name, shard, shard_iterator_type, starting_sequence_number):
if shard_iterator_type == "AT_SEQUENCE_NUMBER":
last_sequence_id = int(starting_sequence_number) - 1
elif shard_iterator_type == "AFTER_SEQUENCE_NUMBER":
last_sequence_id = int(starting_sequence_number)
elif shard_iterator_type == "TRIM_HORIZON":
last_sequence_id = 0
elif shard_iterator_type == "LATEST":
last_sequence_id = shard.get_max_sequence_number()
else:
raise InvalidArgumentError("Invalid ShardIteratorType: {0}".format(shard_iterator_type))
return compose_shard_iterator(stream_name, shard, last_sequence_id)
def compose_shard_iterator(stream_name, shard, last_sequence_id):
return base64.encodestring(
"{0}:{1}:{2}".format(
stream_name,
shard.shard_id,
last_sequence_id,
).encode("utf-8")
).decode("utf-8")
def decompose_shard_iterator(shard_iterator):
return base64.decodestring(shard_iterator.encode("utf-8")).decode("utf-8").split(":")