# coolname, version 1.1.0 — coolname/impl.py
# (header reconstructed from mangled package-index page text)
"""
Do not import anything directly from this module.
"""
import hashlib
import itertools
import os
import os.path as op
import random
from random import randrange
import re
from .config import _CONF
from .exceptions import ConfigurationError, InitializationError
class AbstractNestedList(object):
    """Base node of the word/phrase list tree.

    Plain Python lists passed in are promoted to WordList instances;
    anything else is assumed to already be an AbstractNestedList.
    """

    def __init__(self, lists):
        super(AbstractNestedList, self).__init__()
        converted = []
        for item in lists:
            if item.__class__ is list:
                item = WordList(item)
            converted.append(item)
        self._lists = converted
        # True when any sublist yields word sequences instead of single words;
        # subclasses that always yield sequences override this with True.
        self.multiword = any(sub.multiword for sub in self._lists)

    def __str__(self):
        return '{}({}, len={})'.format(self.__class__.__name__, len(self._lists), self.length)

    def __repr__(self):
        return str(self)

    def squash(self, hard, cache):
        """Collapse a single-child node; otherwise squash children in place."""
        if len(self._lists) != 1:
            self._lists = [sub.squash(hard, cache) for sub in self._lists]
            return self
        return self._lists[0].squash(hard, cache)

    def _dump(self, stream, indent='', object_ids=False):
        """Write an indented tree representation of this node to *stream*."""
        suffix = ' [id={}]'.format(id(self)) if object_ids else ''
        stream.write(indent + _unicode(self) + suffix + '\n')
        for sub in self._lists:
            sub._dump(stream, indent + '  ', object_ids=object_ids)
# Poor man's `six`: resolve the text type once, for both Python 2 and 3.
try:
    _unicode = unicode  # noqa: F821  (exists on Python 2 only)
except NameError:
    # Python 3: str is already the unicode text type.
    _unicode = str

if _unicode is str:
    _str_types = str
else:
    _str_types = (str, _unicode)  # pragma: nocover
def _to_bytes(value):
    """Convert *value* to bytes for hashing.

    Used to calculate WordList or PhraseList hash: strings are UTF-8
    encoded, tuples are encoded via their str() form, and anything
    else (assumed to be bytes already) is returned unchanged.
    """
    if isinstance(value, tuple):
        return str(value).encode('utf-8')
    if isinstance(value, _unicode):
        return value.encode('utf-8')
    return value
class _BasicList(list, AbstractNestedList):
    """Flat leaf list (of words or phrases) with a lazily computed content hash."""

    def __init__(self, sequence=None):
        list.__init__(self, sequence)
        AbstractNestedList.__init__(self, [])
        # Cached length for fast repeated access during item lookup.
        self.length = len(self)
        self.__hash = None

    def __str__(self):
        preview = [repr(item) for item in self[:4]]
        if len(preview) == 4:
            preview[3] = '...'
        return '{}([{}], len={})'.format(self.__class__.__name__, ', '.join(preview), len(self))

    def __repr__(self):
        return str(self)

    def squash(self, hard, cache):
        # Leaf node: nothing to collapse.
        return self

    @property
    def _hash(self):
        """MD5 digest of the list length plus every element, computed once."""
        if self.__hash is None:
            md5 = hashlib.md5()
            md5.update(_to_bytes(str(len(self))))
            for item in self:
                md5.update(_to_bytes(item))
            self.__hash = md5.digest()
        return self.__hash
class WordList(_BasicList):
    """Leaf list whose items are single words (plain strings)."""
class PhraseList(_BasicList):
    """Leaf list whose items are phrases, stored as tuples of one or more words."""

    def __init__(self, sequence=None):
        # Normalize every item: a string like 'black box' becomes ('black', 'box').
        super(PhraseList, self).__init__(tuple(_split_phrase(item)) for item in sequence)
        self.multiword = True
class WordAsPhraseWrapper(object):
    """Adapter that makes a WordList yield 1-word tuples instead of bare words."""

    # Wrapped list always presents items as (single-word) sequences.
    multiword = True

    def __init__(self, wordlist):
        self._list = wordlist
        self.length = len(wordlist)

    def __len__(self):
        return self.length

    def __getitem__(self, i):
        # Wrap the underlying word in a 1-tuple.
        return (self._list[i], )

    def squash(self, hard, cache):
        # Thin wrapper: nothing to optimize.
        return self

    def _format(self, convert):
        return '{}({})'.format(type(self).__name__, convert(self._list))

    def __str__(self):
        return self._format(str)

    def __repr__(self):
        return self._format(repr)
class NestedList(AbstractNestedList):
    """Union of several sublists, addressable as one continuous list."""

    def __init__(self, lists):
        super(NestedList, self).__init__(lists)
        # If user mixes WordList and PhraseList in the same NestedList,
        # __getitem__ must always return a tuple; wrap the WordLists.
        has_wordlists = any(isinstance(sub, WordList) for sub in self._lists)
        has_multiword = any(sub.multiword for sub in self._lists)
        if has_wordlists and has_multiword:
            self._lists = [
                WordAsPhraseWrapper(sub) if isinstance(sub, WordList) else sub
                for sub in self._lists
            ]
        # Fattest lists first: __getitem__ scans linearly, so this
        # minimizes the average number of sublists visited.
        self._lists.sort(key=lambda sub: -sub.length)
        self.length = sum(sub.length for sub in self._lists)

    def __getitem__(self, i):
        # Walk the sublists, discounting i by each skipped sublist's length.
        for sublist in self._lists:
            if i < sublist.length:
                return sublist[i]
            i -= sublist.length
        raise IndexError('list index out of range')

    def squash(self, hard, cache):
        """Squash children; with hard=True also merge uniform leaf lists.

        The cache avoids data duplication: if several branches end up with
        the same combined WordList/PhraseList content, they all share one
        instance (keyed by content hash).
        """
        result = super(NestedList, self).squash(hard, cache)
        if result is not self or not hard:
            return result
        for leaf_cls in (WordList, PhraseList):
            if all(isinstance(sub, leaf_cls) for sub in self._lists):
                # Building the merged list before consulting the cache is a
                # little wasteful, but simple and without lasting cost.
                merged = leaf_cls(sorted(set(itertools.chain.from_iterable(self._lists))))
                cached = cache.get(merged._hash)
                if cached is not None:
                    result = cached
                else:
                    cache[merged._hash] = merged
                    result = merged
        return result
class CartesianList(AbstractNestedList):
    """Cartesian product of sublists: index i decodes to one word combination."""

    def __init__(self, lists):
        super(CartesianList, self).__init__(lists)
        # Total combinations = product of sublist lengths.
        self.length = 1
        for sublist in self._lists:
            self.length *= sublist.length
        # Precompute positional divisors for decoding an index.
        # E.g. for lengths 5, 7, 11, 13 -> divs = [7*11*13, 11*13, 13, 1].
        divs = [1]
        running = 1
        for sublist in reversed(self._lists[1:]):
            running *= sublist.length
            divs.append(running)
        divs.reverse()
        self._list_divs = tuple(zip(self._lists, divs))
        self.multiword = True

    def __getitem__(self, i):
        # Decode i position by position, concatenating words from each sublist.
        words = []
        for sublist, div in self._list_divs:
            item = sublist[i // div]
            i %= div
            if sublist.multiword:
                words.extend(item)
            else:
                words.append(item)
        return words
class Scalar(AbstractNestedList):
    """Leaf node wrapping a constant value; every lookup yields that value."""

    def __init__(self, value):
        super(Scalar, self).__init__([])
        self.value = value
        self.length = 1

    def __getitem__(self, i):
        # Index is irrelevant: there is exactly one value.
        return self.value

    def __str__(self):
        return '{}(value={!r})'.format(self.__class__.__name__, self.value)

    def random(self):
        return self.value
class RandomGenerator(object):
    """
    This class provides random name generation interface.

    Create an instance of this class if you want to create custom
    configuration.

    If default implementation is enough, just use `generate`,
    `generate_slug` and other exported functions.
    """

    def __init__(self, config, rand=None):
        """Build generator trees from *config*, optionally using RNG *rand*.

        :param config: config dict (validated here; not mutated — a copy is made)
        :param rand: object with a ``randrange`` method; falsy means the
                     standard ``random`` module
        :raises ConfigurationError: on invalid config or when generation
                                    would hang/retry excessively
        """
        self.random = rand  # property setter: sets _random and _randrange
        config = dict(config)
        _validate_config(config)
        lists = {}
        _create_lists(config, lists, 'all', [])
        self._lists = {}
        for key, listdef in config.items():
            # Other generators independent from 'all'
            if listdef.get(_CONF.FIELD.GENERATOR) and key not in lists:
                _create_lists(config, lists, key, [])
            if (key == 'all' or key.isdigit() or listdef.get(_CONF.FIELD.GENERATOR)):
                if key.isdigit():
                    pattern = int(key)   # numeric pattern, e.g. generate(4)
                elif key == 'all':
                    pattern = None       # default pattern
                else:
                    pattern = key
                self._lists[pattern] = lists[key]
        # Hard-squash the default tree to deduplicate identical leaf lists.
        self._lists[None] = self._lists[None].squash(True, {})
        # Should we avoid duplicates?
        try:
            ensure_unique = config['all'][_CONF.FIELD.ENSURE_UNIQUE]
            if not isinstance(ensure_unique, bool):
                raise ValueError('expected boolean, got {!r}'.format(ensure_unique))
            self._ensure_unique = ensure_unique
        except KeyError:
            self._ensure_unique = False
        except ValueError as ex:
            raise ConfigurationError('Invalid {} value: {}'
                                     .format(_CONF.FIELD.ENSURE_UNIQUE, ex))
        # Should we avoid duplicating prefixes?
        try:
            self._check_prefix = int(config['all'][_CONF.FIELD.ENSURE_UNIQUE_PREFIX])
            if self._check_prefix <= 0:
                raise ValueError('expected a positive integer, got {!r}'.format(self._check_prefix))
        except KeyError:
            self._check_prefix = None
        except ValueError as ex:
            raise ConfigurationError('Invalid {} value: {}'
                                     .format(_CONF.FIELD.ENSURE_UNIQUE_PREFIX, ex))
        # Get max slug length
        try:
            self._max_slug_length = int(config['all'][_CONF.FIELD.MAX_SLUG_LENGTH])
        except KeyError:
            self._max_slug_length = None
        except ValueError as ex:
            raise ConfigurationError('Invalid {} value: {}'
                                     .format(_CONF.FIELD.MAX_SLUG_LENGTH, ex))
        # Make sure that generate() does not go into long loop.
        # Default generator is a special case, we don't need check.
        # FIX: parenthesized the or-chain; previously '__nocheck' only
        # suppressed the ensure_unique check, because 'and' binds tighter
        # than 'or'.
        if (not config['all'].get('__nocheck') and
                (self._ensure_unique or self._check_prefix or self._max_slug_length)):
            self._check_not_hanging()
        # Fire it up: fail fast if generation is broken.
        assert self.generate_slug()

    @property
    def random(self):
        """The underlying RNG: the `random` module or a user-supplied object."""
        return self._random

    @random.setter
    def random(self, rand):
        # A falsy value restores the standard `random` module.
        if rand:
            self._random = rand
        else:
            self._random = random
        self._randrange = self._random.randrange

    def generate(self, pattern=None):
        """
        Generates and returns random name as a list of strings.
        """
        lst = self._lists[pattern]
        while True:
            result = lst[self._randrange(lst.length)]
            # Retry when the combination violates any configured constraint:
            # 1. Check that there are no duplicates
            # 2. Check that there are no duplicate prefixes
            # 3. Check max slug length
            n = len(result)
            if (self._ensure_unique and len(set(result)) != n or
                    self._check_prefix and len(set(x[:self._check_prefix] for x in result)) != n or
                    self._max_slug_length and sum(len(x) for x in result) + n - 1 > self._max_slug_length):
                continue
            return result

    def generate_slug(self, pattern=None):
        """
        Generates and returns random name as a slug (words joined by '-').
        """
        return '-'.join(self.generate(pattern))

    def get_combinations_count(self, pattern=None):
        """
        Returns total number of unique combinations
        for the given pattern.
        """
        lst = self._lists[pattern]
        return lst.length

    def _dump(self, stream, pattern=None, object_ids=False):
        """Dumps current tree into a text stream."""
        return self._lists[pattern]._dump(stream, '', object_ids=object_ids)

    def _check_not_hanging(self):
        """
        Rough check that generate() will not hang or be very slow.

        Raises ConfigurationError if generate() spends too much time in retry loop.
        Issues a warnings.warn() if there is a risk of slowdown.
        """
        # Each check is (field_name, field_value, predicate, warning_msg, exception_msg);
        # predicate(g) returns True if generated combination g must be rejected,
        # mirroring the checks in generate().
        checks = []
        # ensure_unique can lead to infinite loops for some tiny erroneous configs
        if self._ensure_unique:
            checks.append((
                _CONF.FIELD.ENSURE_UNIQUE,
                self._ensure_unique,
                lambda g: len(set(g)) != len(g),
                '{generate} may be slow because a significant fraction of combinations contain repeating words and {field_name} is set',  # noqa
                'Impossible to generate with {field_name}'
            ))
        # max_slug_length can easily slow down or block generation if set too small
        if self._max_slug_length:
            checks.append((
                _CONF.FIELD.MAX_SLUG_LENGTH,
                self._max_slug_length,
                lambda g: sum(len(x) for x in g) + len(g) - 1 > self._max_slug_length,
                '{generate} may be slow because a significant fraction of combinations exceed {field_name}={field_value}',  # noqa
                'Impossible to generate with {field_name}={field_value}'
            ))
        # Perform the relevant checks for all generators, starting from 'all'.
        # FIX: the sort key previously tested the (key, value) tuple against
        # None, which is never true; key it on the pattern (x[0]) so the
        # default generator (None) really sorts first.
        n = 100
        warning_treshold = 20  # fail probability: 0.04 for 2 attempts, 0.008 for 3 attempts, etc.
        for lst_id, lst in sorted(self._lists.items(),
                                  key=lambda x: '' if x[0] is None else str(x[0])):
            context = {'generate': 'coolname.generate({})'.format('' if lst_id is None else repr(lst_id))}
            # For each generator, perform checks
            for field_name, field_value, predicate, warning_msg, exception_msg in checks:
                context.update({'field_name': field_name, 'field_value': field_value})
                bad_count = 0
                # NOTE(review): samples with the module-level randrange, not
                # self._randrange, so a custom RNG is ignored here — presumably
                # intentional for a quick sanity check; confirm before changing.
                for _ in range(n):
                    g = lst[randrange(lst.length)]
                    if predicate(g):
                        bad_count += 1
                if bad_count >= n:
                    raise ConfigurationError(exception_msg.format(**context))
                elif bad_count >= warning_treshold:
                    import warnings
                    warnings.warn(warning_msg.format(**context))
def _is_str(value):
    """Return True if *value* is a text string (str, or unicode on Python 2)."""
    type_name = value.__class__.__name__
    return type_name == 'str' or type_name == 'unicode'
# Translate phrases defined as strings to tuples
def _split_phrase(x):
    """Split a string phrase on whitespace; pass non-strings through unchanged."""
    if not isinstance(x, _str_types):
        return x
    return re.split(_unicode(r'\s+'), x.strip())
def _validate_config(config):
    """
    A big and ugly method for config validation.
    It would be nice to use cerberus, but we don't
    want to introduce dependencies just for that.
    """
    # Any KeyError/ValueError raised inside is re-wrapped at the bottom
    # as ConfigurationError, so callers only catch one exception type.
    try:
        # Names of rules referenced by nested/cartesian lists; checked
        # against defined keys at the end.
        referenced_sublists = set()
        for key, listdef in list(config.items()):
            # Check if section is a list
            if not isinstance(listdef, dict):
                raise ValueError('Value at key {!r} is not a dict'
                                 .format(key))
            # Check if it has correct type
            if _CONF.FIELD.TYPE not in listdef:
                raise ValueError('Config at key {!r} has no {!r}'
                                 .format(key, _CONF.FIELD.TYPE))
            # Nested or Cartesian
            if listdef[_CONF.FIELD.TYPE] in (_CONF.TYPE.NESTED, _CONF.TYPE.CARTESIAN):
                sublists = listdef.get(_CONF.FIELD.LISTS)
                if sublists is None:
                    raise ValueError('Config at key {!r} has no {!r}'
                                     .format(key, _CONF.FIELD.LISTS))
                # Must be a non-empty list of rule-name strings
                if (not isinstance(sublists, list) or not sublists or
                        not all(_is_str(x) for x in sublists)):
                    raise ValueError('Config at key {!r} has invalid {!r}'
                                     .format(key, _CONF.FIELD.LISTS))
                referenced_sublists.update(sublists)
            # Const
            elif listdef[_CONF.FIELD.TYPE] == _CONF.TYPE.CONST:
                try:
                    value = listdef[_CONF.FIELD.VALUE]
                except KeyError:
                    raise ValueError('Config at key {!r} has no {!r}'
                                     .format(key, _CONF.FIELD.VALUE))
                if not _is_str(value):
                    raise ValueError('Config at key {!r} has invalid {!r}'
                                     .format(key, _CONF.FIELD.VALUE))
            # Words
            elif listdef[_CONF.FIELD.TYPE] == _CONF.TYPE.WORDS:
                try:
                    words = listdef[_CONF.FIELD.WORDS]
                except KeyError:
                    raise ValueError('Config at key {!r} has no {!r}'
                                     .format(key, _CONF.FIELD.WORDS))
                if not isinstance(words, list) or not words:
                    raise ValueError('Config at key {!r} has invalid {!r}'
                                     .format(key, _CONF.FIELD.WORDS))
                # Validate word length (max_length field is optional)
                try:
                    max_length = int(listdef[_CONF.FIELD.MAX_LENGTH])
                except KeyError:
                    max_length = None
                if max_length is not None:
                    for word in words:
                        if len(word) > max_length:
                            raise ValueError('Config at key {!r} has invalid word {!r} '
                                             '(longer than {} characters)'
                                             .format(key, word, max_length))
            # Phrases (sequences of one or more words)
            elif listdef[_CONF.FIELD.TYPE] == _CONF.TYPE.PHRASES:
                try:
                    phrases = listdef[_CONF.FIELD.PHRASES]
                except KeyError:
                    raise ValueError('Config at key {!r} has no {!r}'
                                     .format(key, _CONF.FIELD.PHRASES))
                if not isinstance(phrases, list) or not phrases:
                    raise ValueError('Config at key {!r} has invalid {!r}'
                                     .format(key, _CONF.FIELD.PHRASES))
                # Validate multi-word and max length (both fields optional)
                try:
                    number_of_words = int(listdef[_CONF.FIELD.NUMBER_OF_WORDS])
                except KeyError:
                    number_of_words = None
                try:
                    max_length = int(listdef[_CONF.FIELD.MAX_LENGTH])
                except KeyError:
                    max_length = None
                for phrase in phrases:
                    phrase = _split_phrase(phrase)  # str -> sequence, if necessary
                    if not isinstance(phrase, (tuple, list)) or not all(isinstance(x, _str_types) for x in phrase):
                        raise ValueError('Config at key {!r} has invalid {!r}: '
                                         'must be all string/tuple/list'
                                         .format(key, _CONF.FIELD.PHRASES))
                    if number_of_words is not None and len(phrase) != number_of_words:
                        raise ValueError('Config at key {!r} has invalid phrase {!r} '
                                         '({} word(s) but {}={})'
                                         .format(key, ' '.join(phrase),
                                                 len(phrase), _CONF.FIELD.NUMBER_OF_WORDS, number_of_words))
                    # max_length applies to the total letter count, ignoring separators
                    if max_length is not None and sum(len(word) for word in phrase) > max_length:
                        raise ValueError('Config at key {!r} has invalid phrase {!r} '
                                         '(longer than {} characters)'
                                         .format(key, ' '.join(phrase), max_length))
            else:
                raise ValueError('Config at key {!r} has invalid {!r}'
                                 .format(key, _CONF.FIELD.TYPE))
        # Check that all sublists are defined
        diff = referenced_sublists.difference(config.keys())
        if diff:
            # Only the first 10 names are reported, to keep the message short
            raise ValueError('Lists are referenced but not defined: {}'
                             .format(', '.join(sorted(diff)[:10])))
    except (KeyError, ValueError) as ex:
        raise ConfigurationError(str(ex))
def _create_lists(config, results, current, stack, inside_cartesian=None):
    """
    An ugly recursive method to transform config dict
    into a tree of AbstractNestedList.

    :param config: validated config dict
    :param results: memo dict {rule name: AbstractNestedList}, filled in place
    :param current: name of the rule to build
    :param stack: rule names on the current recursion path (loop detection)
    :param inside_cartesian: name of the enclosing Cartesian rule, if any
    :raises ConfigurationError: on recursive/too-deep rules or nested Cartesians
    :raises InitializationError: on an unknown list type
    """
    # Have we done it already?
    try:
        return results[current]
    except KeyError:
        pass
    # Check recursion depth and detect loops
    if current in stack:
        raise ConfigurationError('Rule {!r} is recursive: {!r}'.format(stack[0], stack))
    if len(stack) > 99:
        raise ConfigurationError('Rule {!r} is too deep'.format(stack[0]))
    # Track recursion depth
    stack.append(current)
    try:
        # Check what kind of list we have
        listdef = config[current]
        list_type = listdef[_CONF.FIELD.TYPE]
        # 1. List of words
        if list_type == _CONF.TYPE.WORDS:
            # Consistency fix: use the field constants instead of the string
            # literals 'words'/'phrases', matching _validate_config.
            results[current] = WordList(listdef[_CONF.FIELD.WORDS])
        # List of phrases
        elif list_type == _CONF.TYPE.PHRASES:
            results[current] = PhraseList(listdef[_CONF.FIELD.PHRASES])
        # 2. Simple list of lists
        elif list_type == _CONF.TYPE.NESTED:
            results[current] = NestedList([_create_lists(config, results, x, stack,
                                                         inside_cartesian=inside_cartesian)
                                           for x in listdef[_CONF.FIELD.LISTS]])
        # 3. Cartesian list of lists
        elif list_type == _CONF.TYPE.CARTESIAN:
            if inside_cartesian is not None:
                raise ConfigurationError("Cartesian list {!r} contains another Cartesian list "
                                         "{!r}. Nested Cartesian lists are not allowed."
                                         .format(inside_cartesian, current))
            results[current] = CartesianList([_create_lists(config, results, x, stack,
                                                            inside_cartesian=current)
                                              for x in listdef[_CONF.FIELD.LISTS]])
        # 4. Scalar
        elif list_type == _CONF.TYPE.CONST:
            results[current] = Scalar(listdef[_CONF.FIELD.VALUE])
        # Unknown type
        else:
            raise InitializationError("Unknown list type: {!r}".format(list_type))
        # Return the result
        return results[current]
    finally:
        # Always unwind the recursion stack, even on error.
        stack.pop()
def _create_default_generator():
    """Build the default RandomGenerator from bundled or env-configured data.

    Data location is taken from COOLNAME_DATA_DIR / COOLNAME_DATA_MODULE;
    when neither is set, the packaged 'data' directory is tried first,
    then the 'coolname.data' module.
    """
    data_dir = os.getenv('COOLNAME_DATA_DIR')
    data_module = os.getenv('COOLNAME_DATA_MODULE')
    if not (data_dir or data_module):
        data_dir = op.join(op.dirname(op.abspath(__file__)), 'data')
        data_module = 'coolname.data'  # used when imported from egg; consumes more memory
    if data_dir and op.isdir(data_dir):
        from coolname.loader import load_config
        config = load_config(data_dir)
    elif data_module:
        import importlib
        config = importlib.import_module(data_module).config
    else:
        raise ImportError('Configure valid COOLNAME_DATA_DIR and/or COOLNAME_DATA_MODULE')
    # Bundled config is trusted: skip the slow anti-hang sanity check.
    config['all']['__nocheck'] = True
    return RandomGenerator(config)
# Default generator is a global object, built once at import time
_default = _create_default_generator()

# Global functions are actually methods of the default generator.
# (most users don't care about creating generator instances)
generate = _default.generate
generate_slug = _default.generate_slug
get_combinations_count = _default.get_combinations_count
def replace_random(rand):
    """Swap in *rand* as the RNG of the default RandomGenerator instance.

    A falsy value restores the standard `random` module.
    """
    _default.random = rand