Mercurial > repos > guerler > springsuite
comparison planemo/lib/python3.7/site-packages/boto/emr/connection.py @ 0:d30785e31577 draft
"planemo upload commit 6eee67778febed82ddd413c3ca40b3183a3898f1"
| author | guerler |
|---|---|
| date | Fri, 31 Jul 2020 00:18:57 -0400 |
| parents | |
| children |
comparison
equal
deleted
inserted
replaced
| -1:000000000000 | 0:d30785e31577 |
|---|---|
| 1 # Copyright (c) 2010 Spotify AB | |
| 2 # Copyright (c) 2010-2011 Yelp | |
| 3 # | |
| 4 # Permission is hereby granted, free of charge, to any person obtaining a | |
| 5 # copy of this software and associated documentation files (the | |
| 6 # "Software"), to deal in the Software without restriction, including | |
| 7 # without limitation the rights to use, copy, modify, merge, publish, dis- | |
| 8 # tribute, sublicense, and/or sell copies of the Software, and to permit | |
| 9 # persons to whom the Software is furnished to do so, subject to the fol- | |
| 10 # lowing conditions: | |
| 11 # | |
| 12 # The above copyright notice and this permission notice shall be included | |
| 13 # in all copies or substantial portions of the Software. | |
| 14 # | |
| 15 # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS | |
| 16 # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- | |
| 17 # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT | |
| 18 # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, | |
| 19 # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, | |
| 20 # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS | |
| 21 # IN THE SOFTWARE. | |
| 22 | |
| 23 """ | |
| 24 Represents a connection to the EMR service | |
| 25 """ | |
| 26 import types | |
| 27 | |
| 28 import boto | |
| 29 import boto.utils | |
| 30 from boto.ec2.regioninfo import RegionInfo | |
| 31 from boto.emr.emrobject import AddInstanceGroupsResponse, BootstrapActionList, \ | |
| 32 Cluster, ClusterSummaryList, HadoopStep, \ | |
| 33 InstanceGroupList, InstanceList, JobFlow, \ | |
| 34 JobFlowStepList, \ | |
| 35 ModifyInstanceGroupsResponse, \ | |
| 36 RunJobFlowResponse, StepSummaryList | |
| 37 from boto.emr.step import JarStep | |
| 38 from boto.connection import AWSQueryConnection | |
| 39 from boto.exception import EmrResponseError | |
| 40 from boto.compat import six | |
| 41 | |
| 42 | |
class EmrConnection(AWSQueryConnection):
    """
    Connection to the Amazon Elastic MapReduce (EMR) query API.

    Every public method builds a flat dict of query parameters and passes it
    to the ``get_object`` / ``get_list`` / ``get_status`` helpers inherited
    from :class:`boto.connection.AWSQueryConnection`.
    """

    APIVersion = boto.config.get('Boto', 'emr_version', '2009-03-31')
    DefaultRegionName = boto.config.get('Boto', 'emr_region_name', 'us-east-1')
    DefaultRegionEndpoint = boto.config.get('Boto', 'emr_region_endpoint',
                                            'elasticmapreduce.us-east-1.amazonaws.com')
    ResponseError = EmrResponseError

    # Constants for AWS Console debugging. ``{region_name}`` is filled in from
    # ``self.region.name`` when run_jobflow(enable_debugging=True) is used.
    DebuggingJar = 's3://{region_name}.elasticmapreduce/libs/script-runner/script-runner.jar'
    DebuggingArgs = 's3://{region_name}.elasticmapreduce/libs/state-pusher/0.1/fetch'

    def __init__(self, aws_access_key_id=None, aws_secret_access_key=None,
                 is_secure=True, port=None, proxy=None, proxy_port=None,
                 proxy_user=None, proxy_pass=None, debug=0,
                 https_connection_factory=None, region=None, path='/',
                 security_token=None, validate_certs=True, profile_name=None):
        """
        Create a connection to EMR.

        When ``region`` is not given, a RegionInfo is built from the class's
        DefaultRegionName / DefaultRegionEndpoint (both overridable via the
        boto config file). The remaining arguments are forwarded unchanged to
        AWSQueryConnection.
        """
        if not region:
            region = RegionInfo(self, self.DefaultRegionName,
                                self.DefaultRegionEndpoint)
        self.region = region
        super(EmrConnection, self).__init__(aws_access_key_id,
                                            aws_secret_access_key,
                                            is_secure, port, proxy, proxy_port,
                                            proxy_user, proxy_pass,
                                            self.region.endpoint, debug,
                                            https_connection_factory, path,
                                            security_token,
                                            validate_certs=validate_certs,
                                            profile_name=profile_name)
        # Many of the EMR hostnames are of the form:
        #     <region>.<service_name>.amazonaws.com
        # rather than the more common:
        #     <service_name>.<region>.amazonaws.com
        # so we need to explicitly set the region_name and service_name
        # for the SigV4 signing.
        self.auth_region_name = self.region.name
        self.auth_service_name = 'elasticmapreduce'

    def _required_auth_capability(self):
        """Requests to EMR are signed with AWS Signature Version 4."""
        return ['hmac-v4']

    def describe_cluster(self, cluster_id):
        """
        Describes an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :return: The response parsed into a ``Cluster`` via ``get_object``
        """
        params = {
            'ClusterId': cluster_id
        }
        return self.get_object('DescribeCluster', params, Cluster)

    def describe_jobflow(self, jobflow_id):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Describes a single Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: The job flow id of interest

        :return: The first job flow returned by describe_jobflows, or
            ``None`` when the service returns no job flows
        """
        jobflows = self.describe_jobflows(jobflow_ids=[jobflow_id])
        if jobflows:
            return jobflows[0]
        return None

    def describe_jobflows(self, states=None, jobflow_ids=None,
                          created_after=None, created_before=None):
        """
        This method is deprecated. We recommend you use list_clusters,
        describe_cluster, list_steps, list_instance_groups and
        list_bootstrap_actions instead.

        Retrieve all the Elastic MapReduce job flows on your account

        :type states: list
        :param states: A list of strings with job flow states wanted

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs

        :type created_after: datetime
        :param created_after: Bound on job flow creation time

        :type created_before: datetime
        :param created_before: Bound on job flow creation time

        :return: The ``member`` elements of the response, parsed as
            ``JobFlow`` objects via ``get_list``
        """
        params = {}

        if states:
            self.build_list_params(params, states, 'JobFlowStates.member')
        if jobflow_ids:
            self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        self._add_creation_time_params(params, created_after, created_before)

        return self.get_list('DescribeJobFlows', params, [('member', JobFlow)])

    def describe_step(self, cluster_id, step_id):
        """
        Describe an Elastic MapReduce step

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_id: str
        :param step_id: The step id of interest

        :return: The response parsed into a ``HadoopStep`` via ``get_object``
        """
        params = {
            'ClusterId': cluster_id,
            'StepId': step_id
        }

        return self.get_object('DescribeStep', params, HadoopStep)

    def list_bootstrap_actions(self, cluster_id, marker=None):
        """
        Get a list of bootstrap actions for an Elastic MapReduce cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker

        :return: The response parsed into a ``BootstrapActionList``
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListBootstrapActions', params, BootstrapActionList)

    def list_clusters(self, created_after=None, created_before=None,
                      cluster_states=None, marker=None):
        """
        List Elastic MapReduce clusters with optional filtering

        :type created_after: datetime
        :param created_after: Bound on cluster creation time

        :type created_before: datetime
        :param created_before: Bound on cluster creation time

        :type cluster_states: list
        :param cluster_states: Bound on cluster states

        :type marker: str
        :param marker: Pagination marker

        :return: The response parsed into a ``ClusterSummaryList``
        """
        params = {}
        self._add_creation_time_params(params, created_after, created_before)
        if marker:
            params['Marker'] = marker

        if cluster_states:
            self.build_list_params(params, cluster_states, 'ClusterStates.member')

        return self.get_object('ListClusters', params, ClusterSummaryList)

    def list_instance_groups(self, cluster_id, marker=None):
        """
        List EC2 instance groups in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type marker: str
        :param marker: Pagination marker

        :return: The response parsed into an ``InstanceGroupList``
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        return self.get_object('ListInstanceGroups', params, InstanceGroupList)

    def list_instances(self, cluster_id, instance_group_id=None,
                       instance_group_types=None, marker=None):
        """
        List EC2 instances in a cluster

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type instance_group_id: str
        :param instance_group_id: The EC2 instance group id of interest

        :type instance_group_types: list
        :param instance_group_types: Filter by EC2 instance group type

        :type marker: str
        :param marker: Pagination marker

        :return: The response parsed into an ``InstanceList``
        """
        params = {
            'ClusterId': cluster_id
        }

        if instance_group_id:
            params['InstanceGroupId'] = instance_group_id
        if marker:
            params['Marker'] = marker

        if instance_group_types:
            self.build_list_params(params, instance_group_types,
                                   'InstanceGroupTypes.member')

        return self.get_object('ListInstances', params, InstanceList)

    def list_steps(self, cluster_id, step_states=None, marker=None):
        """
        List cluster steps

        :type cluster_id: str
        :param cluster_id: The cluster id of interest

        :type step_states: list
        :param step_states: Filter by step states

        :type marker: str
        :param marker: Pagination marker

        :return: The response parsed into a ``StepSummaryList``
        """
        params = {
            'ClusterId': cluster_id
        }

        if marker:
            params['Marker'] = marker

        if step_states:
            self.build_list_params(params, step_states, 'StepStates.member')

        return self.get_object('ListSteps', params, StepSummaryList)

    def add_tags(self, resource_id, tags):
        """
        Create new metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: dict
        :param tags: A dictionary containing the name/value pairs.
                     If you want to create only a tag name, the
                     value for that tag should be the empty string
                     (e.g. '') or None.
        """
        assert isinstance(resource_id, six.string_types)
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_tag_list(tags))
        return self.get_status('AddTags', params, verb='POST')

    def remove_tags(self, resource_id, tags):
        """
        Remove metadata tags for the specified resource id.

        :type resource_id: str
        :param resource_id: The cluster id

        :type tags: list
        :param tags: A list of tag names to remove. A single tag name
            is also accepted.
        """
        params = {
            'ResourceId': resource_id,
        }
        params.update(self._build_string_list('TagKeys', tags))
        return self.get_status('RemoveTags', params, verb='POST')

    def terminate_jobflow(self, jobflow_id):
        """
        Terminate an Elastic MapReduce job flow

        :type jobflow_id: str
        :param jobflow_id: A jobflow id

        :return: The status returned by terminate_jobflows
        """
        return self.terminate_jobflows([jobflow_id])

    def terminate_jobflows(self, jobflow_ids):
        """
        Terminate one or more Elastic MapReduce job flows

        :type jobflow_ids: list
        :param jobflow_ids: A list of job flow IDs
        """
        params = {}
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')
        return self.get_status('TerminateJobFlows', params, verb='POST')

    def add_jobflow_steps(self, jobflow_id, steps):
        """
        Adds steps to a jobflow

        :type jobflow_id: str
        :param jobflow_id: The job flow id

        :type steps: list(boto.emr.Step)
        :param steps: A list of steps to add to the job. A single step
            is also accepted.

        :return: The response parsed into a ``JobFlowStepList``
        """
        if not isinstance(steps, list):
            steps = [steps]
        params = {}
        params['JobFlowId'] = jobflow_id

        # Step args
        step_args = [self._build_step_args(step) for step in steps]
        params.update(self._build_step_list(step_args))

        return self.get_object(
            'AddJobFlowSteps', params, JobFlowStepList, verb='POST')

    def add_instance_groups(self, jobflow_id, instance_groups):
        """
        Adds instance groups to a running cluster.

        :type jobflow_id: str
        :param jobflow_id: The id of the jobflow which will take the
            new instance groups

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: A list of instance groups to add to the job.
            A single instance group is also accepted.

        :return: The response parsed into an ``AddInstanceGroupsResponse``
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]
        params = {}
        params['JobFlowId'] = jobflow_id
        params.update(self._build_instance_group_list_args(instance_groups))

        return self.get_object('AddInstanceGroups', params,
                               AddInstanceGroupsResponse, verb='POST')

    def modify_instance_groups(self, instance_group_ids, new_sizes):
        """
        Modify the number of nodes and configuration settings in an
        instance group.

        :type instance_group_ids: list(str)
        :param instance_group_ids: A list of the ID's of the instance
            groups to be modified (a single ID is also accepted)

        :type new_sizes: list(int)
        :param new_sizes: A list of the new sizes for each instance group
            (a single size is also accepted); paired positionally with
            instance_group_ids

        :raises ValueError: if the two lists have different lengths

        :return: The response parsed into a ``ModifyInstanceGroupsResponse``
        """
        if not isinstance(instance_group_ids, list):
            instance_group_ids = [instance_group_ids]
        if not isinstance(new_sizes, list):
            new_sizes = [new_sizes]

        # zip() would silently drop unpaired entries and send a partial
        # request, so mismatched input is rejected up front.
        if len(instance_group_ids) != len(new_sizes):
            raise ValueError(
                'instance_group_ids and new_sizes must have the same length '
                '(got %d and %d)' % (len(instance_group_ids), len(new_sizes)))

        params = {}
        pairs = zip(instance_group_ids, new_sizes)
        for k, (group_id, size) in enumerate(pairs, start=1):
            # could be wrong - the example amazon gives uses
            # InstanceRequestCount, while the api documentation
            # says InstanceCount
            params['InstanceGroups.member.%d.InstanceGroupId' % k] = group_id
            params['InstanceGroups.member.%d.InstanceCount' % k] = size

        return self.get_object('ModifyInstanceGroups', params,
                               ModifyInstanceGroupsResponse, verb='POST')

    def run_jobflow(self, name, log_uri=None, ec2_keyname=None,
                    availability_zone=None,
                    master_instance_type='m1.small',
                    slave_instance_type='m1.small', num_instances=1,
                    action_on_failure='TERMINATE_JOB_FLOW', keep_alive=False,
                    enable_debugging=False,
                    hadoop_version=None,
                    steps=None,
                    bootstrap_actions=None,
                    instance_groups=None,
                    additional_info=None,
                    ami_version=None,
                    api_params=None,
                    visible_to_all_users=None,
                    job_flow_role=None,
                    service_role=None):
        """
        Runs a job flow

        :type name: str
        :param name: Name of the job flow

        :type log_uri: str
        :param log_uri: URI of the S3 bucket to place logs

        :type ec2_keyname: str
        :param ec2_keyname: EC2 key used for the instances

        :type availability_zone: str
        :param availability_zone: EC2 availability zone of the cluster

        :type master_instance_type: str
        :param master_instance_type: EC2 instance type of the master

        :type slave_instance_type: str
        :param slave_instance_type: EC2 instance type of the slave nodes

        :type num_instances: int
        :param num_instances: Number of instances in the Hadoop cluster

        :type action_on_failure: str
        :param action_on_failure: Action to take if a step terminates

        :type keep_alive: bool
        :param keep_alive: Denotes whether the cluster should stay
            alive upon completion

        :type enable_debugging: bool
        :param enable_debugging: Denotes whether AWS console debugging
            should be enabled. When set, a debugging step is prepended to
            the steps sent to EMR; the caller's ``steps`` list is not
            modified.

        :type hadoop_version: str
        :param hadoop_version: Version of Hadoop to use. This no longer
            defaults to '0.20' and now uses the AMI default.

        :type steps: list(boto.emr.Step)
        :param steps: List of steps to add with the job

        :type bootstrap_actions: list(boto.emr.BootstrapAction)
        :param bootstrap_actions: List of bootstrap actions that run
            before Hadoop starts. ``None`` (the default) means no
            bootstrap actions.

        :type instance_groups: list(boto.emr.InstanceGroup)
        :param instance_groups: Optional list of instance groups to
            use when creating this job.
            NB: When provided, this argument supersedes num_instances
            and master/slave_instance_type.

        :type ami_version: str
        :param ami_version: Amazon Machine Image (AMI) version to use
            for instances. Values accepted by EMR are '1.0', '2.0', and
            'latest'; EMR currently defaults to '1.0' if you don't set
            'ami_version'.

        :type additional_info: JSON str
        :param additional_info: A JSON string for selecting additional features

        :type api_params: dict
        :param api_params: a dictionary of additional parameters to pass
            directly to the EMR API (so you don't have to upgrade boto to
            use new EMR features). You can also delete an API parameter
            by setting it to None.

        :type visible_to_all_users: bool
        :param visible_to_all_users: Whether the job flow is visible to all IAM
            users of the AWS account associated with the job flow. If this
            value is set to ``True``, all IAM users of that AWS
            account can view and (if they have the proper policy permissions
            set) manage the job flow. If it is set to ``False``, only
            the IAM user that created the job flow can view and manage
            it.

        :type job_flow_role: str
        :param job_flow_role: An IAM role for the job flow. The EC2
            instances of the job flow assume this role. The default role is
            ``EMRJobflowDefault``. In order to use the default role,
            you must have already created it using the CLI.

        :type service_role: str
        :param service_role: The IAM role that will be assumed by the Amazon
            EMR service to access AWS resources on your behalf.

        :rtype: str
        :return: The jobflow id
        """
        # Work on a private copy: the debugging step is inserted below and
        # must never leak into a list owned by the caller.
        steps = list(steps) if steps else []
        bootstrap_actions = bootstrap_actions or []

        params = {}
        if action_on_failure:
            params['ActionOnFailure'] = action_on_failure
        if log_uri:
            params['LogUri'] = log_uri
        params['Name'] = name

        # Common instance args
        common_params = self._build_instance_common_args(ec2_keyname,
                                                         availability_zone,
                                                         keep_alive,
                                                         hadoop_version)
        params.update(common_params)

        # NB: according to the AWS API's error message, we must
        # "configure instances either using instance count, master and
        # slave instance type or instance groups but not both."
        #
        # Thus we switch here on the truthiness of instance_groups.
        if not instance_groups:
            # Instance args (the common case)
            instance_params = self._build_instance_count_and_type_args(
                master_instance_type,
                slave_instance_type,
                num_instances)
            params.update(instance_params)
        else:
            # Instance group args (for spot instances or a heterogeneous cluster)
            list_args = self._build_instance_group_list_args(instance_groups)
            instance_params = dict(
                ('Instances.%s' % k, v) for k, v in six.iteritems(list_args)
            )
            params.update(instance_params)

        # Debugging step from EMR API docs
        if enable_debugging:
            debugging_step = JarStep(name='Setup Hadoop Debugging',
                                     action_on_failure='TERMINATE_JOB_FLOW',
                                     main_class=None,
                                     jar=self.DebuggingJar.format(region_name=self.region.name),
                                     step_args=self.DebuggingArgs.format(region_name=self.region.name))
            steps.insert(0, debugging_step)

        # Step args
        if steps:
            step_args = [self._build_step_args(step) for step in steps]
            params.update(self._build_step_list(step_args))

        if bootstrap_actions:
            bootstrap_action_args = [self._build_bootstrap_action_args(bootstrap_action)
                                     for bootstrap_action in bootstrap_actions]
            params.update(self._build_bootstrap_action_list(bootstrap_action_args))

        if ami_version:
            params['AmiVersion'] = ami_version

        if additional_info is not None:
            params['AdditionalInfo'] = additional_info

        # Raw overrides are applied last so they win over everything above;
        # a None value removes the parameter entirely.
        if api_params:
            for key, value in six.iteritems(api_params):
                if value is None:
                    params.pop(key, None)
                else:
                    params[key] = value

        if visible_to_all_users is not None:
            params['VisibleToAllUsers'] = 'true' if visible_to_all_users else 'false'

        if job_flow_role is not None:
            params['JobFlowRole'] = job_flow_role

        if service_role is not None:
            params['ServiceRole'] = service_role

        response = self.get_object(
            'RunJobFlow', params, RunJobFlowResponse, verb='POST')
        return response.jobflowid

    def set_termination_protection(self, jobflow_id,
                                   termination_protection_status):
        """
        Set termination protection on specified Elastic MapReduce job flows

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID, or a list/tuple of job
            flow IDs

        :type termination_protection_status: bool
        :param termination_protection_status: Termination protection status
        """
        assert termination_protection_status in (True, False)

        jobflow_ids = self._as_id_list(jobflow_id)
        params = {}
        params['TerminationProtected'] = 'true' if termination_protection_status else 'false'
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')

        return self.get_status('SetTerminationProtection', params, verb='POST')

    def set_visible_to_all_users(self, jobflow_id, visibility):
        """
        Set whether specified Elastic Map Reduce job flows are visible to all IAM users

        :type jobflow_id: list or str
        :param jobflow_id: A single job flow ID, or a list/tuple of job
            flow IDs

        :type visibility: bool
        :param visibility: Visibility
        """
        assert visibility in (True, False)

        jobflow_ids = self._as_id_list(jobflow_id)
        params = {}
        params['VisibleToAllUsers'] = 'true' if visibility else 'false'
        self.build_list_params(params, jobflow_ids, 'JobFlowIds.member')

        return self.get_status('SetVisibleToAllUsers', params, verb='POST')

    @staticmethod
    def _as_id_list(ids):
        """Return *ids* as a list: a list/tuple is copied, anything else is wrapped."""
        if isinstance(ids, (list, tuple)):
            return list(ids)
        return [ids]

    def _add_creation_time_params(self, params, created_after, created_before):
        """
        Add ISO 8601 ``CreatedAfter`` / ``CreatedBefore`` bounds to *params*.

        Shared by describe_jobflows and list_clusters; falsy bounds are
        skipped. Mutates *params* in place.
        """
        if created_after:
            params['CreatedAfter'] = created_after.strftime(
                boto.utils.ISO8601)
        if created_before:
            params['CreatedBefore'] = created_before.strftime(
                boto.utils.ISO8601)

    def _build_bootstrap_action_args(self, bootstrap_action):
        """
        Build the per-action parameter dict (unprefixed keys) for one
        bootstrap action.

        ``Name`` is included only when the action has a ``name`` attribute;
        script arguments are added when ``bootstrap_action.args()`` returns
        a non-empty value.
        """
        bootstrap_action_params = {}
        bootstrap_action_params['ScriptBootstrapAction.Path'] = bootstrap_action.path

        try:
            bootstrap_action_params['Name'] = bootstrap_action.name
        except AttributeError:
            # ``name`` is optional on bootstrap actions; omit it when absent.
            pass

        args = bootstrap_action.args()
        if args:
            self.build_list_params(bootstrap_action_params, args,
                                   'ScriptBootstrapAction.Args.member')

        return bootstrap_action_params

    def _build_step_args(self, step):
        """
        Build the per-step parameter dict (unprefixed keys) for one step.

        ``HadoopJarStep.MainClass`` and the argument list are only included
        when the step provides them.
        """
        step_params = {}
        step_params['ActionOnFailure'] = step.action_on_failure
        step_params['HadoopJarStep.Jar'] = step.jar()

        main_class = step.main_class()
        if main_class:
            step_params['HadoopJarStep.MainClass'] = main_class

        args = step.args()
        if args:
            self.build_list_params(step_params, args, 'HadoopJarStep.Args.member')

        step_params['Name'] = step.name
        return step_params

    def _build_bootstrap_action_list(self, bootstrap_actions):
        """
        Prefix each per-action dict with ``BootstrapActions.member.<i>.``
        (1-based) and merge them into one parameter dict.
        """
        if not isinstance(bootstrap_actions, list):
            bootstrap_actions = [bootstrap_actions]

        params = {}
        for i, bootstrap_action in enumerate(bootstrap_actions, start=1):
            for key, value in six.iteritems(bootstrap_action):
                params['BootstrapActions.member.%s.%s' % (i, key)] = value
        return params

    def _build_step_list(self, steps):
        """
        Prefix each per-step dict with ``Steps.member.<i>.`` (1-based) and
        merge them into one parameter dict.
        """
        if not isinstance(steps, list):
            steps = [steps]

        params = {}
        for i, step in enumerate(steps, start=1):
            for key, value in six.iteritems(step):
                params['Steps.member.%s.%s' % (i, key)] = value
        return params

    def _build_string_list(self, field, items):
        """
        Encode *items* (a list, or a single item) as
        ``<field>.member.<i>`` parameters, 1-based.
        """
        if not isinstance(items, list):
            items = [items]

        params = {}
        for i, item in enumerate(items, start=1):
            params['%s.member.%s' % (field, i)] = item
        return params

    def _build_tag_list(self, tags):
        """
        Encode a tag dict as ``Tags.member.<i>.Key`` / ``.Value`` parameters.

        Tags are numbered in sorted order so the encoding is deterministic.
        ``Value`` is omitted for falsy values ('' or None), which creates a
        key-only tag.
        """
        assert isinstance(tags, dict)

        params = {}
        for i, (key, value) in enumerate(sorted(six.iteritems(tags)), start=1):
            current_prefix = 'Tags.member.%s' % i
            params['%s.Key' % current_prefix] = key
            if value:
                params['%s.Value' % current_prefix] = value
        return params

    def _build_instance_common_args(self, ec2_keyname, availability_zone,
                                    keep_alive, hadoop_version):
        """
        Takes a number of parameters used when starting a jobflow (as
        specified in run_jobflow() above). Returns a comparable dict for
        use in making a RunJobFlow request.
        """
        params = {
            'Instances.KeepJobFlowAliveWhenNoSteps': str(keep_alive).lower(),
        }

        if hadoop_version:
            params['Instances.HadoopVersion'] = hadoop_version
        if ec2_keyname:
            params['Instances.Ec2KeyName'] = ec2_keyname
        if availability_zone:
            params['Instances.Placement.AvailabilityZone'] = availability_zone

        return params

    def _build_instance_count_and_type_args(self, master_instance_type,
                                            slave_instance_type, num_instances):
        """
        Takes a master instance type (string), a slave instance type
        (string), and a number of instances. Returns a comparable dict
        for use in making a RunJobFlow request.
        """
        params = {'Instances.MasterInstanceType': master_instance_type,
                  'Instances.SlaveInstanceType': slave_instance_type,
                  'Instances.InstanceCount': num_instances}
        return params

    def _build_instance_group_args(self, instance_group):
        """
        Takes an InstanceGroup; returns a dict that, when its keys are
        properly prefixed, can be used for describing InstanceGroups in
        RunJobFlow or AddInstanceGroups requests.

        ``BidPrice`` is only included for ``SPOT`` market groups.
        """
        params = {'InstanceCount': instance_group.num_instances,
                  'InstanceRole': instance_group.role,
                  'InstanceType': instance_group.type,
                  'Name': instance_group.name,
                  'Market': instance_group.market}
        if instance_group.market == 'SPOT':
            params['BidPrice'] = instance_group.bidprice
        return params

    def _build_instance_group_list_args(self, instance_groups):
        """
        Takes a list of InstanceGroups, or a single InstanceGroup. Returns
        a comparable dict for use in making a RunJobFlow or AddInstanceGroups
        request.
        """
        if not isinstance(instance_groups, list):
            instance_groups = [instance_groups]

        params = {}
        for i, instance_group in enumerate(instance_groups, start=1):
            ig_dict = self._build_instance_group_args(instance_group)
            for key, value in six.iteritems(ig_dict):
                params['InstanceGroups.member.%d.%s' % (i, key)] = value
        return params
