Repository URL to install this package:
|
Version:
0.5.10.dev2+gb5d94239a ▾
|
# -*- coding: utf-8 -*-
# Copyright (c) 2021, University of Luxembourg / DHARPA project
#
# Mozilla Public License, version 2.0 (see LICENSE or https://www.mozilla.org/en-US/MPL/2.0/)
"""Implementation of interfaces for *Kiara*."""
import contextlib
import os
import sys
import uuid
from pathlib import Path
from typing import (
TYPE_CHECKING,
Any,
ClassVar,
Dict,
Iterable,
Iterator,
Literal,
Union,
)
from kiara.defaults import KIARA_CONFIG_FILE_NAME, KIARA_MAIN_CONFIG_FILE
from kiara.exceptions import KiaraException
from kiara.utils.cli import terminal_print
if TYPE_CHECKING:
from rich.console import Console
from kiara.context import Kiara
from kiara.context.config import KiaraConfig
from kiara.interfaces.python_api import KiaraAPI
# log = structlog.getLogger()
# Global console used by alternative print
_console: Union["Console", None] = None
def create_console(
width: Union[None, int],
color_system: Union[
None, Literal["auto", "standard", "256", "truecolor", "windows"]
] = None,
) -> "Console":
"""Create a console instance."""
from rich.console import COLOR_SYSTEMS
from rich.highlighter import RegexHighlighter
from rich.theme import Theme
STYLE_OPTION = "bold cyan"
STYLE_ARGUMENT = "bold cyan"
STYLE_SWITCH = "bold green"
STYLE_METAVAR = "bold yellow"
STYLE_METAVAR_SEPARATOR = "dim"
STYLE_USAGE = "yellow"
theme = Theme(
{
"option": STYLE_OPTION,
"argument": STYLE_ARGUMENT,
"switch": STYLE_SWITCH,
"metavar": STYLE_METAVAR,
"metavar_sep": STYLE_METAVAR_SEPARATOR,
"usage": STYLE_USAGE,
}
)
class OptionHighlighter(RegexHighlighter):
"""Highlights our special options."""
highlights: ClassVar = [ # type: ignore
r"(^|\W)(?P<switch>\-\w+)(?![a-zA-Z0-9])",
r"(^|\W)(?P<option>\-\-[\w\-]+)(?![a-zA-Z0-9])",
r"(^|\W)(?P<argument>[A-Z0-9\_]+)(?![_a-zA-Z0-9])",
r"(?P<metavar>\<[^\>]+\>)",
r"(?P<usage>Usage: )",
]
highlighter = OptionHighlighter()
FORCE_TERMINAL = (
True
if os.getenv("GITHUB_ACTIONS")
or os.getenv("FORCE_COLOR")
or os.getenv("PY_COLORS")
else None
)
console_width = None
if width is not None:
console_width = width
else:
_console_width = os.environ.get("CONSOLE_WIDTH", None)
if _console_width:
try:
console_width = int(_console_width)
except Exception:
pass
from rich.console import Console
if color_system is not None:
_color_system = color_system
else:
_color_system = "auto"
if _color_system != "auto" and _color_system not in COLOR_SYSTEMS.keys():
raise Exception(
f"Invalid color system '{_color_system}': available options: auto, {', '.join(COLOR_SYSTEMS.keys())}."
)
_console = Console(
width=console_width,
theme=theme,
highlighter=highlighter,
color_system=_color_system,
force_terminal=FORCE_TERMINAL,
)
return _console
def get_console() -> "Console":
"""
Get a global Console instance.
Returns:
-------
Console: A console instance.
"""
global _console
if _console is None:
_console = create_console(width=None, color_system=None)
return _console
@contextlib.contextmanager
def get_proxy_console(
width: Union[None, int] = None,
color_system: Union[
None, Literal["auto", "standard", "256", "truecolor", "windows"]
] = None,
restore_default_console: bool = True,
) -> Iterator["Console"]:
"""
Get a console that proxies a remote one.
This should only be used in a single-threaded way.
"""
global _console
default_console = get_console()
old_width = default_console.width
changed = False
changed_width = False
# TODO: maybe use thread-local?
if width is None and color_system is None:
result = default_console
else:
if width is None:
width = default_console.width
if color_system is None:
color_system = default_console.color_system # type: ignore
if color_system != default_console.color_system:
result = create_console(width=width, color_system=color_system)
_console = result
changed = True
elif width != default_console.width:
default_console.width = width
changed_width = True
else:
result = default_console
yield result
if changed and restore_default_console:
_console = default_console
if changed_width and restore_default_console:
default_console.width = old_width
def set_console_width(width: Union[int, None] = None, prefer_env: bool = True):
global _console
if prefer_env or not width:
_width: Union[None, int] = None
try:
_width = int(os.environ.get("CONSOLE_WIDTH", None)) # type: ignore
except Exception:
pass
if _width:
width = _width
if width:
try:
width = int(width)
except Exception as e:
import structlog
log = structlog.getLogger()
log.debug("invalid.console_width", error=str(e))
from rich.console import Console
_console = Console(width=width)
if not width:
if "google.colab" in sys.modules or "jupyter_client" in sys.modules:
width = 140
if width:
import rich
con = rich.get_console()
con.width = width
class KiaraAPIWrap(object):
def __init__(
self,
config: Union[str, None],
context: Union[str, None],
pipelines: Union[None, Iterable[str]] = None,
ensure_plugins: Union[str, Iterable[str], None] = None,
exit_process: bool = True,
):
if not context:
context = os.environ.get("KIARA_CONTEXT", None)
self._config: Union[str, None] = config
self._context: Union[str, None] = context
self._pipelines: Union[None, Iterable[str]] = pipelines
self._ensure_plugins: Union[str, Iterable[str], None] = ensure_plugins
self._kiara_config: Union["KiaraConfig", None] = None
self._api: Union[KiaraAPI, None] = None
self._reload_process_if_plugins_installed = True
self._items: Dict[str, Any] = {}
self._exit_process: bool = exit_process
@property
def kiara_context_name(self) -> str:
if not self._context:
self._context = self.kiara_config.default_context
return self._context
@property
def exit_process(self) -> bool:
return self._exit_process
@exit_process.setter
def exit_process(self, exit_process: bool):
self._exit_process = exit_process
def exit(self, msg: Union[None, Any] = None, exit_code: int = 1):
if self._exit_process:
if msg:
terminal_print(msg)
sys.exit(exit_code)
else:
if not msg:
msg = "An error occured."
raise KiaraException(str(msg))
@property
def current_kiara_context_id(self) -> uuid.UUID:
return self.kiara_api.context.id
@property
def kiara(self) -> "Kiara":
return self.kiara_api.context
@property
def kiara_config(self) -> "KiaraConfig":
if self._kiara_config is not None:
return self._kiara_config
from kiara.context.config import KiaraConfig
# kiara_config: Optional[KiaraConfig] = None
exists = False
create = False
if self._config:
config_path = Path(self._config)
if config_path.exists():
if config_path.is_file():
config_file_path = config_path
exists = True
else:
config_file_path = config_path / KIARA_CONFIG_FILE_NAME
if config_file_path.exists():
exists = True
else:
config_path.parent.mkdir(parents=True, exist_ok=True)
config_file_path = config_path
else:
config_file_path = Path(KIARA_MAIN_CONFIG_FILE)
if not config_file_path.exists():
create = True
exists = False
else:
exists = True
if not exists:
if not create:
from kiara.utils.cli import terminal_print
terminal_print()
terminal_print(
f"Can't create kiara context, specified config file does not exist: {self._config}."
)
sys.exit(1)
kiara_config = KiaraConfig()
kiara_config.save(config_file_path)
else:
kiara_config = KiaraConfig.load_from_file(config_file_path)
self._kiara_config = kiara_config
return self._kiara_config
def lock_file(self, context: str) -> str:
"""The path to the lock file for this context."""
return "asdf"
@property
def kiara_api(self) -> "KiaraAPI":
if self._api is not None:
return self._api
from kiara.utils import log_message
context = self._context
if not context:
context = self.kiara_config.default_context
from kiara.interfaces.python_api import KiaraAPI
api = KiaraAPI(kiara_config=self.kiara_config)
if self._ensure_plugins:
installed = api.ensure_plugin_packages(self._ensure_plugins, update=False)
if installed and self._reload_process_if_plugins_installed:
log_message(
"replacing.process",
reason="reloading this process, in order to pick up new plugin packages",
)
os.execvp(sys.executable, (sys.executable,) + tuple(sys.argv)) # noqa
import fasteners
fasteners.InterProcessReaderWriterLock(self.lock_file(context))
api.set_active_context(context, create=True)
if self._pipelines:
for pipeline in self._pipelines:
ops = api.context.operation_registry.register_pipelines(pipeline)
for op_id in ops.keys():
log_message("register.pipeline", operation_id=op_id)
self._api = api
return self._api
def add_item(self, key: str, item: Any):
self._items[key] = item
def get_item(self, key: str) -> Any:
if key not in self._items.keys():
raise ValueError(f"No item with key '{key}'")
return self._items[key]