Repository URL to install this package:
| 
          
        
        Version: 
           
    
          3.4.37  ▾
        
   | 
# Copyright (c) 2010 Spotify AB
# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
from boto.compat import six
class Step(object):
    """
    Jobflow Step base class.

    Subclasses describe one EMR job flow step by overriding :meth:`jar`,
    :meth:`args` and :meth:`main_class`; the base implementations only
    raise :class:`NotImplementedError`.
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        :raises NotImplementedError: always; subclasses must override.
        """
        # ``NotImplemented`` is a comparison sentinel, not an exception
        # class; calling it (``NotImplemented()``) raises TypeError.
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        :raises NotImplementedError: always; subclasses must override.
        """
        raise NotImplementedError()
class JarStep(Step):
    """
    Custom jar step: an Elastic MapReduce step that executes a jar.
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        An Elastic MapReduce step that executes a jar.

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type step_args: list(str) or str
        :param step_args: A list of arguments to pass to the step; a single
            string is treated as a one-element list.
        """
        # Promote a lone string argument to a one-element list.
        if isinstance(step_args, six.string_types):
            step_args = [step_args]
        self.name = name
        self.action_on_failure = action_on_failure
        self.step_args = step_args
        self._jar = jar
        self._main_class = main_class

    def jar(self):
        return self._jar

    def args(self):
        # Always return a fresh list, never the stored step_args object.
        return list(self.step_args) if self.step_args else []

    def main_class(self):
        return self._main_class
class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A hadoop streaming elastic mapreduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI. If omitted, the step is configured
            with ``mapred.reduce.tasks=0``.
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs, to
            take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str) or str
        :param step_args: A list of arguments to pass to the step; a single
            string is treated as a one-element list.
        :type input: str or a list/tuple of str
        :param input: The input uri(s); each one becomes its own ``-input``.
        :type output: str
        :param output: The output uri
        :type jar: str
        :param jar: The hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar
        if isinstance(step_args, six.string_types):
            step_args = [step_args]
        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        # Streaming steps run the streaming jar's own entry point.
        return None

    def args(self):
        """
        Build the hadoop-streaming command line.

        :rtype: list(str)
        :return: step_args first, then mapper/combiner/reducer, inputs,
            output and cache options.
        """
        args = []
        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
        # will work
        if self.step_args:
            args.extend(self.step_args)
        args.extend(['-mapper', self.mapper])
        if self.combiner:
            args.extend(['-combiner', self.combiner])
        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            # No reducer given: request zero reduce tasks.
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])
        if self.input:
            # Accept tuples as well as lists; a tuple used to be emitted as
            # a single (malformed) -input value.
            if isinstance(self.input, (list, tuple)):
                inputs = self.input
            else:
                inputs = [self.input]
            for input_uri in inputs:
                args.extend(('-input', input_uri))
        if self.output:
            args.extend(('-output', self.output))
        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))
        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))
        return args

    def __repr__(self):
        # Lists every constructor argument, including combiner (which was
        # previously missing).
        return ('%s.%s(name=%r, mapper=%r, reducer=%r, combiner=%r, '
                'action_on_failure=%r, cache_files=%r, cache_archives=%r, '
                'step_args=%r, input=%r, output=%r, jar=%r)' % (
                    self.__class__.__module__, self.__class__.__name__,
                    self.name, self.mapper, self.reducer, self.combiner,
                    self.action_on_failure, self.cache_files,
                    self.cache_archives, self.step_args,
                    self.input, self.output, self._jar))
class ScriptRunnerStep(JarStep):
    """
    Jar step whose jar is always ``ScriptRunnerJar`` (EMR's script-runner).

    Any extra keyword arguments are forwarded unchanged to JarStep.
    """
    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        runner_jar = self.ScriptRunnerJar
        super(ScriptRunnerStep, self).__init__(name, runner_jar, **kw)
class PigBase(ScriptRunnerStep):
    """
    Common base for Pig steps.

    ``BaseArgs`` is the leading argument list passed to script-runner: the
    pig-script location followed by ``--base-path`` and the Pig library
    prefix in S3.
    """
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/pig/',
    ]
class InstallPigStep(PigBase):
    """
    Install pig on emr step
    """
    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        """
        :type pig_versions: str
        :param pig_versions: Value passed to ``--pig-versions``.
        """
        install_args = list(self.BaseArgs) + [
            '--install-pig',
            '--pig-versions', pig_versions,
        ]
        super(InstallPigStep, self).__init__(self.InstallPigName,
                                             step_args=install_args)
class PigStep(PigBase):
    """
    Pig script step
    """
    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type pig_file: str
        :param pig_file: The Pig script to run (passed after ``-f``)
        :type pig_versions: str
        :param pig_versions: Value passed to ``--pig-versions``
        :type pig_args: list(str)
        :param pig_args: Extra arguments appended after the script;
            ``None`` (the default) means no extra arguments.
        """
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        # ``None`` replaces the former mutable ``[]`` default and lets
        # callers pass None explicitly without a TypeError.
        if pig_args is not None:
            step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)
class HiveBase(ScriptRunnerStep):
    """
    Common base for Hive steps.

    ``BaseArgs`` is the leading argument list passed to script-runner: the
    hive-script location followed by ``--base-path`` and the Hive library
    prefix in S3.
    """
    BaseArgs = [
        's3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
        '--base-path',
        's3n://us-east-1.elasticmapreduce/libs/hive/',
    ]
class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        """
        :type hive_versions: str
        :param hive_versions: Value passed to ``--hive-versions``.
        :type hive_site: str
        :param hive_site: Optional value for ``--hive-site=``; omitted when
            None.
        """
        install_args = list(self.BaseArgs)
        install_args += ['--install-hive', '--hive-versions', hive_versions]
        if hive_site is not None:
            install_args.append('--hive-site=%s' % hive_site)
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=install_args)
class HiveStep(HiveBase):
    """
    Hive script step
    """
    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        """
        :type name: str
        :param name: The name of the step
        :type hive_file: str
        :param hive_file: The Hive script to run (passed after ``-f``)
        :type hive_versions: str
        :param hive_versions: Value passed to ``--hive-versions``
        :type hive_args: list(str)
        :param hive_args: Extra arguments appended after the script, or
            None for none.
        """
        run_args = list(self.BaseArgs) + [
            '--hive-versions', hive_versions,
            '--run-hive-script', '--args', '-f', hive_file,
        ]
        if hive_args is not None:
            run_args.extend(hive_args)
        super(HiveStep, self).__init__(name, step_args=run_args)