
"""
Loads Rook into pyspark workers
Usage: spark-submit --conf spark.python.daemon.module=rook.pyspark_daemon
"""
import functools
import sys

import pyspark.daemon
import six

from rook.config import ImportServiceConfig

# Keep a reference to the original worker entry point so the wrapper can delegate to it.
original_worker_main = pyspark.daemon.worker_main


def worker_main(*args, **kwargs):
    try:
        import rook
        rook.start(log_file="", log_to_stderr=True)
        from rook.logger import logger
        logger.debug("Started Rook in Spark worker")
        from rook.interface import _rook as singleton, _TRUE_VALUES
        from rook.services import ImportService
        import_service = singleton.get_trigger_services().get_service(ImportService.NAME)

        def pickle_load_hook(orig_func, *args, **kwargs):
            obj = orig_func(*args, **kwargs)
            try:
                # Re-evaluate the module list right away instead of waiting for the
                # periodic polling thread, which could otherwise miss a module that
                # was just imported during unpickling.
                import_service.evaluate_module_list()
            except:  # lgtm[py/catch-base-exception]
                logger.exception("Silenced exception during module list evaluation")
            return obj

        # Modules imported while unpickling could be missed if we rely solely on the
        # sys.modules polling thread, so wrap pickle's load functions directly.
        import pyspark.serializers
        if ImportServiceConfig.USE_IMPORT_HOOK is False:  # only do this if we're not using the import hook
            pyspark.serializers.pickle.loads = functools.partial(pickle_load_hook, pyspark.serializers.pickle.loads)
            pyspark.serializers.pickle.load = functools.partial(pickle_load_hook, pyspark.serializers.pickle.load)
    except:  # lgtm[py/catch-base-exception]
        six.print_("Starting Rook in worker_main failed", file=sys.stderr)

    # Always fall through to the original worker entry point, even if Rook failed to start.
    return original_worker_main(*args, **kwargs)


# Replace pyspark's worker entry point with the Rook-enabled wrapper.
pyspark.daemon.worker_main = worker_main

if __name__ == '__main__':
    # When set as spark.python.daemon.module, this module runs the standard pyspark daemon manager.
    pyspark.daemon.manager()