Mercurial > repos > guerler > springsuite
diff planemo/lib/python3.7/site-packages/boto/emr/step.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler | 
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 | 
| parents | |
| children | 
line wrap: on
 line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/planemo/lib/python3.7/site-packages/boto/emr/step.py Fri Jul 31 00:18:57 2020 -0400 @@ -0,0 +1,283 @@ +# Copyright (c) 2010 Spotify AB +# Copyright (c) 2010-2011 Yelp +# +# Permission is hereby granted, free of charge, to any person obtaining a +# copy of this software and associated documentation files (the +# "Software"), to deal in the Software without restriction, including +# without limitation the rights to use, copy, modify, merge, publish, dis- +# tribute, sublicense, and/or sell copies of the Software, and to permit +# persons to whom the Software is furnished to do so, subject to the fol- +# lowing conditions: +# +# The above copyright notice and this permission notice shall be included +# in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS +# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- +# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT +# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, +# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +from boto.compat import six + + +class Step(object): + """ + Jobflow Step base class + """ + def jar(self): + """ + :rtype: str + :return: URI to the jar + """ + raise NotImplemented() + + def args(self): + """ + :rtype: list(str) + :return: List of arguments for the step + """ + raise NotImplemented() + + def main_class(self): + """ + :rtype: str + :return: The main class name + """ + raise NotImplemented() + + +class JarStep(Step): + """ + Custom jar step + """ + def __init__(self, name, jar, main_class=None, + action_on_failure='TERMINATE_JOB_FLOW', step_args=None): + """ + A elastic mapreduce step that executes a jar + + :type name: str + :param name: The name of the step + :type jar: str + :param jar: S3 URI to the Jar file + :type main_class: str + :param main_class: The class to execute in the jar + :type action_on_failure: str + :param action_on_failure: An action, defined in the EMR docs to + take on failure. + :type step_args: list(str) + :param step_args: A list of arguments to pass to the step + """ + self.name = name + self._jar = jar + self._main_class = main_class + self.action_on_failure = action_on_failure + + if isinstance(step_args, six.string_types): + step_args = [step_args] + + self.step_args = step_args + + def jar(self): + return self._jar + + def args(self): + args = [] + + if self.step_args: + args.extend(self.step_args) + + return args + + def main_class(self): + return self._main_class + + +class StreamingStep(Step): + """ + Hadoop streaming step + """ + def __init__(self, name, mapper, reducer=None, combiner=None, + action_on_failure='TERMINATE_JOB_FLOW', + cache_files=None, cache_archives=None, + step_args=None, input=None, output=None, + jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'): + """ + A hadoop streaming elastic mapreduce step + + :type name: str + :param name: The name of the step + :type mapper: str + :param mapper: The mapper URI + :type reducer: str + :param reducer: The reducer URI + :type combiner: str + :param combiner: The combiner URI. Only works for Hadoop 0.20 + and later! + :type action_on_failure: str + :param action_on_failure: An action, defined in the EMR docs to + take on failure. + :type cache_files: list(str) + :param cache_files: A list of cache files to be bundled with the job + :type cache_archives: list(str) + :param cache_archives: A list of jar archives to be bundled with + the job + :type step_args: list(str) + :param step_args: A list of arguments to pass to the step + :type input: str or a list of str + :param input: The input uri + :type output: str + :param output: The output uri + :type jar: str + :param jar: The hadoop streaming jar. This can be either a local + path on the master node, or an s3:// URI. + """ + self.name = name + self.mapper = mapper + self.reducer = reducer + self.combiner = combiner + self.action_on_failure = action_on_failure + self.cache_files = cache_files + self.cache_archives = cache_archives + self.input = input + self.output = output + self._jar = jar + + if isinstance(step_args, six.string_types): + step_args = [step_args] + + self.step_args = step_args + + def jar(self): + return self._jar + + def main_class(self): + return None + + def args(self): + args = [] + + # put extra args BEFORE -mapper and -reducer so that e.g. -libjar + # will work + if self.step_args: + args.extend(self.step_args) + + args.extend(['-mapper', self.mapper]) + + if self.combiner: + args.extend(['-combiner', self.combiner]) + + if self.reducer: + args.extend(['-reducer', self.reducer]) + else: + args.extend(['-jobconf', 'mapred.reduce.tasks=0']) + + if self.input: + if isinstance(self.input, list): + for input in self.input: + args.extend(('-input', input)) + else: + args.extend(('-input', self.input)) + if self.output: + args.extend(('-output', self.output)) + + if self.cache_files: + for cache_file in self.cache_files: + args.extend(('-cacheFile', cache_file)) + + if self.cache_archives: + for cache_archive in self.cache_archives: + args.extend(('-cacheArchive', cache_archive)) + + return args + + def __repr__(self): + return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % ( + self.__class__.__module__, self.__class__.__name__, + self.name, self.mapper, self.reducer, self.action_on_failure, + self.cache_files, self.cache_archives, self.step_args, + self.input, self.output, self._jar) + + +class ScriptRunnerStep(JarStep): + + ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar' + + def __init__(self, name, **kw): + super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw) + + +class PigBase(ScriptRunnerStep): + + BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script', + '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/'] + + +class InstallPigStep(PigBase): + """ + Install pig on emr step + """ + + InstallPigName = 'Install Pig' + + def __init__(self, pig_versions='latest'): + step_args = [] + step_args.extend(self.BaseArgs) + step_args.extend(['--install-pig']) + step_args.extend(['--pig-versions', pig_versions]) + super(InstallPigStep, self).__init__(self.InstallPigName, step_args=step_args) + + +class PigStep(PigBase): + """ + Pig script step + """ + + def __init__(self, name, pig_file, pig_versions='latest', pig_args=[]): + step_args = [] + step_args.extend(self.BaseArgs) + step_args.extend(['--pig-versions', pig_versions]) + step_args.extend(['--run-pig-script', '--args', '-f', pig_file]) + step_args.extend(pig_args) + super(PigStep, self).__init__(name, step_args=step_args) + + +class HiveBase(ScriptRunnerStep): + + BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script', + '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/'] + + +class InstallHiveStep(HiveBase): + """ + Install Hive on EMR step + """ + InstallHiveName = 'Install Hive' + + def __init__(self, hive_versions='latest', hive_site=None): + step_args = [] + step_args.extend(self.BaseArgs) + step_args.extend(['--install-hive']) + step_args.extend(['--hive-versions', hive_versions]) + if hive_site is not None: + step_args.extend(['--hive-site=%s' % hive_site]) + super(InstallHiveStep, self).__init__(self.InstallHiveName, + step_args=step_args) + + +class HiveStep(HiveBase): + """ + Hive script step + """ + + def __init__(self, name, hive_file, hive_versions='latest', + hive_args=None): + step_args = [] + step_args.extend(self.BaseArgs) + step_args.extend(['--hive-versions', hive_versions]) + step_args.extend(['--run-hive-script', '--args', '-f', hive_file]) + if hive_args is not None: + step_args.extend(hive_args) + super(HiveStep, self).__init__(name, step_args=step_args)
