# Copyright (c) 2010 Chris Moyer http://coredumped.org/
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
from boto.exception import SDBResponseError
from boto.compat import six
class SequenceGenerator(object):
"""Generic Sequence Generator object, this takes a single
string as the "sequence" and uses that to figure out
what the next value in a string is. For example
if you give "ABC" and pass in "A" it will give you "B",
and if you give it "C" it will give you "AA".
If you set "rollover" to True in the above example, passing
in "C" would give you "A" again.
The Sequence string can be a string or any iterable
that has the "index" function and is indexable.
"""
__name__ = "SequenceGenerator"
def __init__(self, sequence_string, rollover=False):
"""Create a new SequenceGenerator using the sequence_string
as how to generate the next item.
:param sequence_string: The string or list that explains
how to generate the next item in the sequence
:type sequence_string: str,iterable
:param rollover: Rollover instead of incrementing when
we hit the end of the sequence
:type rollover: bool
"""
self.sequence_string = sequence_string
self.sequence_length = len(sequence_string[0])
self.rollover = rollover
self.last_item = sequence_string[-1]
self.__name__ = "%s('%s')" % (self.__class__.__name__, sequence_string)
def __call__(self, val, last=None):
"""Get the next value in the sequence"""
# If they pass us in a string that's not at least
# the lenght of our sequence, then return the
# first element in our sequence
if val is None or len(val) < self.sequence_length:
return self.sequence_string[0]
last_value = val[-self.sequence_length:]
if (not self.rollover) and (last_value == self.last_item):
val = "%s%s" % (self(val[:-self.sequence_length]), self._inc(last_value))
else:
val = "%s%s" % (val[:-self.sequence_length], self._inc(last_value))
return val
def _inc(self, val):
"""Increment a single value"""
assert(len(val) == self.sequence_length)
return self.sequence_string[(self.sequence_string.index(val) + 1) % len(self.sequence_string)]
#
# Simple Sequence Functions
#
def increment_by_one(cv=None, lv=None):
if cv is None:
return 0
return cv + 1
def double(cv=None, lv=None):
if cv is None:
return 1
return cv * 2
def fib(cv=1, lv=0):
"""The fibonacci sequence, this incrementer uses the
last value"""
if cv is None:
cv = 1
if lv is None:
lv = 0
return cv + lv
increment_string = SequenceGenerator("ABCDEFGHIJKLMNOPQRSTUVWXYZ")
class Sequence(object):
"""A simple Sequence using the new SDB "Consistent" features
Based largly off of the "Counter" example from mitch garnaat:
http://bitbucket.org/mitch/stupidbototricks/src/tip/counter.py"""
def __init__(self, id=None, domain_name=None, fnc=increment_by_one, init_val=None):
"""Create a new Sequence, using an optional function to
increment to the next number, by default we just increment by one.
Every parameter here is optional, if you don't specify any options
then you'll get a new SequenceGenerator with a random ID stored in the
default domain that increments by one and uses the default botoweb
environment
:param id: Optional ID (name) for this counter
:type id: str
:param domain_name: Optional domain name to use, by default we get this out of the
environment configuration
:type domain_name:str
:param fnc: Optional function to use for the incrementation, by default we just increment by one
There are several functions defined in this module.
Your function must accept "None" to get the initial value
:type fnc: function, str
:param init_val: Initial value, by default this is the first element in your sequence,
but you can pass in any value, even a string if you pass in a function that uses
strings instead of ints to increment
"""
self._db = None
self._value = None
self.last_value = None
self.domain_name = domain_name
self.id = id
if init_val is None:
init_val = fnc(init_val)
if self.id is None:
import uuid
self.id = str(uuid.uuid4())
self.item_type = type(fnc(None))
self.timestamp = None
# Allow us to pass in a full name to a function
if isinstance(fnc, six.string_types):
from boto.utils import find_class
fnc = find_class(fnc)
self.fnc = fnc
# Bootstrap the value last
if not self.val:
self.val = init_val
def set(self, val):
"""Set the value"""
import time
now = time.time()
expected_value = []
new_val = {}
new_val['timestamp'] = now
if self._value is not None:
new_val['last_value'] = self._value
expected_value = ['current_value', str(self._value)]
new_val['current_value'] = val
try:
self.db.put_attributes(self.id, new_val, expected_value=expected_value)
self.timestamp = new_val['timestamp']
except SDBResponseError as e:
if e.status == 409:
raise ValueError("Sequence out of sync")
else:
raise
def get(self):
"""Get the value"""
val = self.db.get_attributes(self.id, consistent_read=True)
if val:
if 'timestamp' in val:
self.timestamp = val['timestamp']
if 'current_value' in val:
self._value = self.item_type(val['current_value'])
if "last_value" in val and val['last_value'] is not None:
self.last_value = self.item_type(val['last_value'])
return self._value
val = property(get, set)
def __repr__(self):
return "%s('%s', '%s', '%s.%s', '%s')" % (
self.__class__.__name__,
self.id,
self.domain_name,
self.fnc.__module__, self.fnc.__name__,
self.val)
def _connect(self):
"""Connect to our domain"""
if not self._db:
import boto
sdb = boto.connect_sdb()
if not self.domain_name:
self.domain_name = boto.config.get("DB", "sequence_db", boto.config.get("DB", "db_name", "default"))
try:
self._db = sdb.get_domain(self.domain_name)
except SDBResponseError as e:
if e.status == 400:
self._db = sdb.create_domain(self.domain_name)
else:
raise
return self._db
db = property(_connect)
def next(self):
self.val = self.fnc(self.val, self.last_value)
return self.val
def delete(self):
"""Remove this sequence"""
self.db.delete_attributes(self.id)