Repository URL to install this package:
|
Version:
3.0.0.dev0 ▾
|
import json
import logging
import os
import shlex
import subprocess
import sys
from typing import Dict, List, Optional
from ray._private.services import get_ray_jars_dir
from ray._private.utils import update_envs
from ray.core.generated.common_pb2 import Language
from ray.util.annotations import DeveloperAPI
logger = logging.getLogger(__name__)
@DeveloperAPI
class RuntimeEnvContext:
"""A context used to describe the created runtime env."""
def __init__(
self,
command_prefix: List[str] = None,
env_vars: Dict[str, str] = None,
py_executable: Optional[str] = None,
override_worker_entrypoint: Optional[str] = None,
java_jars: List[str] = None,
):
self.command_prefix = command_prefix or []
self.env_vars = env_vars or {}
self.py_executable = py_executable or sys.executable
self.override_worker_entrypoint: Optional[str] = override_worker_entrypoint
self.java_jars = java_jars or []
def serialize(self) -> str:
return json.dumps(self.__dict__)
@staticmethod
def deserialize(json_string):
return RuntimeEnvContext(**json.loads(json_string))
def exec_worker(self, passthrough_args: List[str], language: Language):
update_envs(self.env_vars)
if language == Language.PYTHON and sys.platform == "win32":
executable = [self.py_executable]
elif language == Language.PYTHON:
executable = ["exec", self.py_executable]
elif language == Language.JAVA:
executable = ["java"]
ray_jars = os.path.join(get_ray_jars_dir(), "*")
local_java_jars = []
for java_jar in self.java_jars:
local_java_jars.append(f"{java_jar}/*")
local_java_jars.append(java_jar)
class_path_args = ["-cp", ray_jars + ":" + str(":".join(local_java_jars))]
passthrough_args = class_path_args + passthrough_args
elif sys.platform == "win32":
executable = []
else:
executable = ["exec"]
# By default, raylet uses the path to default_worker.py on host.
# However, the path to default_worker.py inside the container
# can be different. We need the user to specify the path to
# default_worker.py inside the container.
if self.override_worker_entrypoint:
logger.debug(
f"Changing the worker entrypoint from {passthrough_args[0]} to "
f"{self.override_worker_entrypoint}."
)
passthrough_args[0] = self.override_worker_entrypoint
if sys.platform == "win32":
def quote(s):
s = s.replace("&", "%26")
return s
passthrough_args = [quote(s) for s in passthrough_args]
cmd = [*self.command_prefix, *executable, *passthrough_args]
logger.debug(f"Exec'ing worker with command: {cmd}")
subprocess.Popen(cmd, shell=True).wait()
else:
# We use shlex to do the necessary shell escape
# of special characters in passthrough_args.
passthrough_args = [shlex.quote(s) for s in passthrough_args]
cmd = [*self.command_prefix, *executable, *passthrough_args]
# TODO(SongGuyang): We add this env to command for macOS because it doesn't
# work for the C++ process of `os.execvp`. We should find a better way to
# fix it.
MACOS_LIBRARY_PATH_ENV_NAME = "DYLD_LIBRARY_PATH"
if MACOS_LIBRARY_PATH_ENV_NAME in os.environ:
cmd.insert(
0,
f"{MACOS_LIBRARY_PATH_ENV_NAME}="
f"{os.environ[MACOS_LIBRARY_PATH_ENV_NAME]}",
)
logger.debug(f"Exec'ing worker with command: {cmd}")
# PyCharm will monkey patch the os.execvp at
# .pycharm_helpers/pydev/_pydev_bundle/pydev_monkey.py
# The monkey patched os.execvp function has a different
# signature. So, we use os.execvp("executable", args=[])
# instead of os.execvp(file="executable", args=[])
os.execvp("bash", args=["bash", "-c", " ".join(cmd)])