comparison: planemo/lib/python3.7/site-packages/boto/emr/step.py @ 0:d30785e31577 (draft)

"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children | |
```python
# Copyright (c) 2010 Spotify AB
# Copyright (c) 2010-2011 Yelp
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

from boto.compat import six
class Step(object):
    """
    Jobflow Step base class
    """
    def jar(self):
        """
        :rtype: str
        :return: URI to the jar
        """
        # NotImplementedError (not the NotImplemented singleton, which is
        # not callable) is the correct exception for an abstract method.
        raise NotImplementedError()

    def args(self):
        """
        :rtype: list(str)
        :return: List of arguments for the step
        """
        raise NotImplementedError()

    def main_class(self):
        """
        :rtype: str
        :return: The main class name
        """
        raise NotImplementedError()
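
# Note (not part of the original file): the EMR connection serializes a
# step from its name and action_on_failure attributes plus the jar(),
# args() and main_class() accessors above, so a custom step only needs
# to fill in these three methods.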


class JarStep(Step):
    """
    Custom jar step
    """
    def __init__(self, name, jar, main_class=None,
                 action_on_failure='TERMINATE_JOB_FLOW', step_args=None):
        """
        An Elastic MapReduce step that executes a jar

        :type name: str
        :param name: The name of the step
        :type jar: str
        :param jar: S3 URI to the Jar file
        :type main_class: str
        :param main_class: The class to execute in the jar
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs,
            to take on failure.
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        """
        self.name = name
        self._jar = jar
        self._main_class = main_class
        self.action_on_failure = action_on_failure

        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def args(self):
        args = []

        if self.step_args:
            args.extend(self.step_args)

        return args

    def main_class(self):
        return self._main_class
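
# Illustrative only, not from the original file: a JarStep pointing at a
# hypothetical jar on S3; args() simply returns step_args.
#
#   step = JarStep(name='Example jar step',
#                  jar='s3n://example-bucket/wordcount.jar',
#                  main_class='com.example.WordCount',
#                  step_args=['s3n://example-bucket/input/',
#                             's3n://example-bucket/output/'])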


class StreamingStep(Step):
    """
    Hadoop streaming step
    """
    def __init__(self, name, mapper, reducer=None, combiner=None,
                 action_on_failure='TERMINATE_JOB_FLOW',
                 cache_files=None, cache_archives=None,
                 step_args=None, input=None, output=None,
                 jar='/home/hadoop/contrib/streaming/hadoop-streaming.jar'):
        """
        A Hadoop Streaming Elastic MapReduce step

        :type name: str
        :param name: The name of the step
        :type mapper: str
        :param mapper: The mapper URI
        :type reducer: str
        :param reducer: The reducer URI
        :type combiner: str
        :param combiner: The combiner URI. Only works for Hadoop 0.20
            and later!
        :type action_on_failure: str
        :param action_on_failure: An action, defined in the EMR docs,
            to take on failure.
        :type cache_files: list(str)
        :param cache_files: A list of cache files to be bundled with the job
        :type cache_archives: list(str)
        :param cache_archives: A list of jar archives to be bundled with
            the job
        :type step_args: list(str)
        :param step_args: A list of arguments to pass to the step
        :type input: str or a list of str
        :param input: The input URI
        :type output: str
        :param output: The output URI
        :type jar: str
        :param jar: The Hadoop streaming jar. This can be either a local
            path on the master node, or an s3:// URI.
        """
        self.name = name
        self.mapper = mapper
        self.reducer = reducer
        self.combiner = combiner
        self.action_on_failure = action_on_failure
        self.cache_files = cache_files
        self.cache_archives = cache_archives
        self.input = input
        self.output = output
        self._jar = jar

        if isinstance(step_args, six.string_types):
            step_args = [step_args]

        self.step_args = step_args

    def jar(self):
        return self._jar

    def main_class(self):
        return None

    def args(self):
        args = []

        # put extra args BEFORE -mapper and -reducer so that e.g. -libjar
        # will work
        if self.step_args:
            args.extend(self.step_args)

        args.extend(['-mapper', self.mapper])

        if self.combiner:
            args.extend(['-combiner', self.combiner])

        if self.reducer:
            args.extend(['-reducer', self.reducer])
        else:
            args.extend(['-jobconf', 'mapred.reduce.tasks=0'])

        if self.input:
            if isinstance(self.input, list):
                # avoid shadowing the builtin input() inside the loop
                for input_uri in self.input:
                    args.extend(('-input', input_uri))
            else:
                args.extend(('-input', self.input))
        if self.output:
            args.extend(('-output', self.output))

        if self.cache_files:
            for cache_file in self.cache_files:
                args.extend(('-cacheFile', cache_file))

        if self.cache_archives:
            for cache_archive in self.cache_archives:
                args.extend(('-cacheArchive', cache_archive))

        return args

    def __repr__(self):
        return '%s.%s(name=%r, mapper=%r, reducer=%r, action_on_failure=%r, cache_files=%r, cache_archives=%r, step_args=%r, input=%r, output=%r, jar=%r)' % (
            self.__class__.__module__, self.__class__.__name__,
            self.name, self.mapper, self.reducer, self.action_on_failure,
            self.cache_files, self.cache_archives, self.step_args,
            self.input, self.output, self._jar)
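
# Illustrative only (hypothetical S3 paths): with no reducer, args()
# appends '-jobconf mapred.reduce.tasks=0' per the branch above.
#
#   step = StreamingStep(name='Example streaming step',
#                        mapper='s3n://example-bucket/mapper.py',
#                        input='s3n://example-bucket/input/',
#                        output='s3n://example-bucket/output/')
#   step.args()
#   # ['-mapper', 's3n://example-bucket/mapper.py',
#   #  '-jobconf', 'mapred.reduce.tasks=0',
#   #  '-input', 's3n://example-bucket/input/',
#   #  '-output', 's3n://example-bucket/output/']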


# script-runner.jar executes the script named in its first argument
# (here the Pig/Hive helper scripts hosted on S3 in BaseArgs below).
class ScriptRunnerStep(JarStep):

    ScriptRunnerJar = 's3n://us-east-1.elasticmapreduce/libs/script-runner/script-runner.jar'

    def __init__(self, name, **kw):
        super(ScriptRunnerStep, self).__init__(name, self.ScriptRunnerJar, **kw)


class PigBase(ScriptRunnerStep):

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/pig/pig-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/pig/']


class InstallPigStep(PigBase):
    """
    Install Pig on EMR step
    """

    InstallPigName = 'Install Pig'

    def __init__(self, pig_versions='latest'):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-pig'])
        step_args.extend(['--pig-versions', pig_versions])
        super(InstallPigStep, self).__init__(self.InstallPigName, step_args=step_args)
class PigStep(PigBase):
    """
    Pig script step
    """

    def __init__(self, name, pig_file, pig_versions='latest', pig_args=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--pig-versions', pig_versions])
        step_args.extend(['--run-pig-script', '--args', '-f', pig_file])
        # avoid a mutable default argument; mirrors HiveStep below
        if pig_args is not None:
            step_args.extend(pig_args)
        super(PigStep, self).__init__(name, step_args=step_args)
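
# Illustrative only (hypothetical script path): Pig must be installed on
# the job flow first, so InstallPigStep is queued before the PigStep.
#
#   steps = [InstallPigStep(),
#            PigStep(name='Example pig step',
#                    pig_file='s3n://example-bucket/script.pig')]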


class HiveBase(ScriptRunnerStep):

    BaseArgs = ['s3n://us-east-1.elasticmapreduce/libs/hive/hive-script',
                '--base-path', 's3n://us-east-1.elasticmapreduce/libs/hive/']


class InstallHiveStep(HiveBase):
    """
    Install Hive on EMR step
    """
    InstallHiveName = 'Install Hive'

    def __init__(self, hive_versions='latest', hive_site=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--install-hive'])
        step_args.extend(['--hive-versions', hive_versions])
        if hive_site is not None:
            step_args.extend(['--hive-site=%s' % hive_site])
        super(InstallHiveStep, self).__init__(self.InstallHiveName,
                                              step_args=step_args)


class HiveStep(HiveBase):
    """
    Hive script step
    """

    def __init__(self, name, hive_file, hive_versions='latest',
                 hive_args=None):
        step_args = []
        step_args.extend(self.BaseArgs)
        step_args.extend(['--hive-versions', hive_versions])
        step_args.extend(['--run-hive-script', '--args', '-f', hive_file])
        if hive_args is not None:
            step_args.extend(hive_args)
        super(HiveStep, self).__init__(name, step_args=step_args)
```
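
The classes above only describe steps; they are submitted to a cluster through `boto.emr`'s connection API (`run_jobflow` and `add_jobflow_steps`). A minimal sketch, assuming AWS credentials are configured in the environment; the region choice, bucket names, and script paths are hypothetical:

```python
import boto.emr
from boto.emr.step import StreamingStep, InstallPigStep, PigStep

conn = boto.emr.connect_to_region('us-east-1')

wordcount = StreamingStep(
    name='Word count',
    mapper='s3n://example-bucket/wc-mapper.py',
    reducer='s3n://example-bucket/wc-reducer.py',
    input='s3n://example-bucket/input/',
    output='s3n://example-bucket/output/')

# run_jobflow launches a cluster and queues the initial steps;
# add_jobflow_steps appends further steps to the running job flow.
jobflow_id = conn.run_jobflow(name='Example job flow',
                              log_uri='s3n://example-bucket/logs/',
                              steps=[wordcount])
conn.add_jobflow_steps(jobflow_id,
                       [InstallPigStep(),
                        PigStep(name='Pig report',
                                pig_file='s3n://example-bucket/report.pig')])
```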
