Repository URL to install this package:
|
Version:
2.39.3 ▾
|
"""Generic utility functions that do not fit elsewhere."""
from colour import Color
from contextlib import contextmanager
from datetime import datetime
from functools import partial
from itertools import islice
from operator import mul
from typing import (
Any,
Callable,
Iterable,
Iterator,
Optional,
Sequence,
TypeVar,
)
__all__ = (
"clamp",
"color_to_rgb565",
"color_to_rgb8_triplet",
"consecutive_pairs",
"constant",
"datetime_to_unix_timestamp",
"divide_by",
"identity",
"is_timezone_aware",
"itersubclasses",
"longest_common_prefix",
"maybe_round",
"multiply_by",
"nop",
"once",
"optional_float",
"optional_int",
"overridden",
"rename_keys",
"to_uppercase_string",
"use",
)
T = TypeVar("T")
def chunks(it: Iterable[T], size: int) -> Iterator[tuple[T]]:
"""Takes an iterator or iterable and returns an iterator that yields chunks
of at most the given size from the input.
"""
it = iter(it)
return iter(lambda: tuple(islice(it, size)), ())
def clamp(value: T, lo: T, hi: T) -> T:
"""Clamps the given value between a minimum and a maximum allowed value
(both inclusive).
"""
return max(min(value, hi), lo) # type: ignore
def color_to_rgb565(color: Color) -> int:
"""Converts a color object into its RGB565 representation.
Parameters:
color: the color to convert
Returns:
the color in its RGB565 representation
"""
red, green, blue = color_to_rgb8_triplet(color)
return (
(((red >> 3) & 0x1F) << 11)
+ (((green >> 2) & 0x3F) << 5)
+ ((blue >> 3) & 0x1F)
)
def color_to_rgb8_triplet(color: Color) -> tuple[int, int, int]:
"""Converts a color object into its RGB8 triplet representation.
Parameters:
color: the color to convert
Returns:
the color in its RGB8 triplet representation
"""
return tuple(round(x * 255) for x in color.rgb) # type: ignore
T = TypeVar("T")
def consecutive_pairs(
iterable: Iterable[T], cyclic: bool = False
) -> Iterable[tuple[T, T]]:
"""Given an iterable, returns a generator that generates consecutive pairs
of objects from the iterable.
Parameters:
iterable: the iterable
Yields:
pairs of consecutive items from the iterable
cyclic (bool): whether the iterable should be considered "cyclic".
If this argument is ``True``, the function will yield a pair
consisting of the last element of the iterable paired with
the first one at the end.
"""
it = iter(iterable)
try:
prev = next(it)
except StopIteration:
return
first = prev if cyclic else None
try:
while True:
curr = next(it)
yield prev, curr
prev = curr
except StopIteration:
pass
if cyclic:
assert first is not None # to help the type inference
yield prev, first
def constant(x: Any) -> Callable[..., Any]:
"""Function factory that returns a function that accepts an arbitrary
number of arguments and always returns the same constant.
Parameters:
x (object): the constant to return
Returns:
callable: a function that always returns the given constant,
irrespectively of its input
"""
def func(*args, **kwds) -> Any:
return x
return func
def datetime_to_unix_timestamp(dt: datetime) -> float:
"""Converts a Python datetime object to a Unix timestamp, expressed in
the number of seconds since the Unix epoch.
The datetime object must be timezone-aware to avoid confusion with the
time zones.
Parameters:
dt: the Python datetime object
Returns:
the time elapsed since the Unix epoch, in seconds
Raises:
ValueError: if the given datetime is not timezone-aware
"""
if not is_timezone_aware(dt):
raise ValueError("datetime object must be timezone-aware")
return dt.timestamp()
def divide_by(value: float) -> Callable[[float], float]:
"""Returns a function that divides every number received as an input
with the given value.
"""
return partial(mul, 1.0 / value)
def identity(obj: Any) -> Any:
"""Identity function that returns its input argument."""
return obj
def is_timezone_aware(dt: datetime) -> bool:
"""Checks whether the given Python datetime object is timezone-aware
or not.
Parameters:
dt: the Python datetime object
Returns:
``True`` if the given object is timezone-aware, ``False`` otherwise
"""
return dt.tzinfo is not None and dt.tzinfo.utcoffset(dt) is not None
def itersubclasses(cls):
"""Iterates over all the subclasses of the given class in a depth-first
manner.
Parameters:
cls (type): the (new-style) Python class whose subclasses we are
iterating over
Yields:
type: the subclasses of the given class in DFS order, including
the class itself.
"""
queue = [cls]
while queue:
cls = queue.pop()
yield cls
queue.extend(cls.__subclasses__())
def longest_common_prefix(strings: Sequence[str]) -> str:
"""Finds the longest common prefix of a sequence of strings."""
if not strings:
return ""
shortest_string = min(strings, key=len)
for i, char in enumerate(shortest_string):
for other in strings:
if other[i] != char:
return shortest_string[:i]
return shortest_string
def maybe_round(value: Optional[float], ndigits: int = 0) -> Optional[float]:
"""Rounds the given value to the given number of digits if it is not
``None``; returns ``None`` otherwise.
"""
return round(value, ndigits) if value is not None else None
def multiply_by(term: float) -> Callable[[float], float]:
"""Returns a function that multiplies every number received as an input
with the given term.
"""
return partial(mul, term)
def nop(*args, **kwds):
"""Dummy function that can be called with any number of arguments and
does not return anything.
"""
pass
def once(func):
"""Decorator that decorates a function and allows it to be called only
once. Subsequent attempts to call the function will throw an exception.
"""
def wrapped(*args, **kwds):
if wrapped.called:
raise RuntimeError("{!r} can be called only once".format(func))
wrapped.called = True
return func(*args, **kwds)
wrapped.called = False
return wrapped
def optional_float(x: Any) -> Optional[float]:
"""Converts the given value into a float, unless it is `None`, in which
case it is returned intact.
Raises:
ValueError: if the given value cannot be converted into a float
"""
return float(x) if x is not None else None
def optional_int(x: Any) -> Optional[int]:
"""Converts the given value into an integer, unless it is `None`, in which
case it is returned intact.
Raises:
ValueError: if the given value cannot be converted into an integer
"""
return int(x) if x is not None else None
@contextmanager
def overridden(obj: Any, **kwds):
"""Context manager that updates an object or dictionary with some key-value
pairs, restoring the original values in the object or dictionary when the
context is exited.
When the input object is a dictionary, the given keyword arguments will be
registered as keys and values (obviously). When the input object is _not_
a dictionary, the given keyword arguments will be set on the object as
_attributes_. In both cases, the original values are restored when the
context exits.
"""
names = list(kwds.keys()) if kwds else []
originals = {}
is_dict = isinstance(obj, dict)
try:
if is_dict:
for name in names:
if name in obj:
originals[name] = obj[name]
obj[name] = kwds[name]
else:
for name in names:
if hasattr(obj, name):
originals[name] = getattr(obj, name)
setattr(obj, name, kwds[name])
yield
finally:
if is_dict:
for name in names:
if name in originals:
obj[name] = originals[name]
else:
del obj[name]
else:
for name in names:
if name in originals:
setattr(obj, name, originals[name])
else:
delattr(obj, name)
K = TypeVar("K")
V = TypeVar("V")
def rename_keys(
mapping: dict[K, K], *, copy: bool = False
) -> Callable[[dict[K, V]], dict[K, V]]:
"""Factory function that creates a mapper function that renames keys in
a dictionary.
"""
if not copy:
def in_place_mapper(input: dict[K, V]) -> dict[K, V]:
for old, new in mapping.items():
if old in input:
input[new] = input.pop(old)
return input
return in_place_mapper
else:
def mapper(input: dict[K, V]) -> dict[K, V]:
return {mapping.get(k, k): v for k, v in input.items()}
return mapper
def to_uppercase_string(value: Any) -> str:
"""Converts the given value into a string and casts the string into
uppercase.
"""
return str(value).upper()
@contextmanager
def use(disposer: Callable[[], None]):
"""Context manager that receives a disposer function and calls it when
exiting the context.
Typically it should be used with a function that returns a disposer to
ensure that the returned disposer is called when exiting the context.
"""
try:
yield
finally:
disposer()