comparison env/lib/python3.9/site-packages/bioblend/cloudman/launch.py @ 0:4f3585e2f14b draft default tip

"planemo upload commit 60cee0fc7c0cda8592644e1aad72851dec82c959"
author shellac
date Mon, 22 Mar 2021 18:12:50 +0000
parents
children
comparison
equal deleted inserted replaced
-1:000000000000 0:4f3585e2f14b
1 """
2 Setup and launch a CloudMan instance.
3 """
4 import datetime
5 import socket
6 from http.client import HTTPConnection
7 from urllib.parse import urlparse
8
9 import boto
10 import yaml
11 from boto.compat import http_client
12 from boto.ec2.regioninfo import RegionInfo
13 from boto.exception import EC2ResponseError, S3ResponseError
14 from boto.s3.connection import OrdinaryCallingFormat, S3Connection, SubdomainCallingFormat
15
16 import bioblend
17 from bioblend.util import Bunch
18
19
20 # Uncomment the following line if no logging from boto is desired
21 # bioblend.logging.getLogger('boto').setLevel(bioblend.logging.CRITICAL)
22 # Uncomment the following line if logging at the prompt is desired
23 # bioblend.set_stream_logger(__name__)
def instance_types(cloud_name='generic'):
    """
    Return a list of dictionaries containing details about the available
    instance types for the given `cloud_name`.

    :type cloud_name: str
    :param cloud_name: A name of the cloud for which the list of instance
                       types will be returned. Valid values are: `aws`,
                       `nectar`, `generic`.

    :rtype: list
    :return: A list of dictionaries describing instance types. Each dict will
             contain the following keys: `name`, `model`, and `description`.
    """
    # Static catalogues of (model, name, description) triples per cloud family.
    aws_catalogue = (
        ("c3.large", "Compute optimized Large", "2 vCPU/4GB RAM"),
        ("c3.2xlarge", "Compute optimized 2xLarge", "8 vCPU/15GB RAM"),
        ("c3.8xlarge", "Compute optimized 8xLarge", "32 vCPU/60GB RAM"),
    )
    openstack_catalogue = (
        ("m1.small", "Small", "1 vCPU / 4GB RAM"),
        ("m1.medium", "Medium", "2 vCPU / 8GB RAM"),
        ("m1.large", "Large", "4 vCPU / 16GB RAM"),
        ("m1.xlarge", "Extra Large", "8 vCPU / 32GB RAM"),
        ("m1.xxlarge", "Extra-extra Large", "16 vCPU / 64GB RAM"),
    )
    normalized = cloud_name.lower()
    if normalized == 'aws':
        catalogue = aws_catalogue
    elif normalized in ('nectar', 'generic'):
        catalogue = openstack_catalogue
    else:
        # Unknown cloud names yield an empty list.
        catalogue = ()
    return [{"model": model, "name": name, "description": description}
            for model, name, description in catalogue]
66
67
68 class CloudManLauncher:
69
    def __init__(self, access_key, secret_key, cloud=None):
        """
        Define the environment in which this instance of CloudMan will be launched.

        Besides providing the credentials, optionally provide the ``cloud``
        object. This object must define the properties required to establish a
        `boto <https://github.com/boto/boto/>`_ connection to that cloud. See
        this method's implementation for an example of the required fields.
        Note that as long as the provided object defines the required fields,
        it can really be implemented as anything (e.g., a Bunch, a database
        object, a custom class). If no value for the ``cloud`` argument is
        provided, the default is to use the Amazon cloud.

        :type access_key: str
        :param access_key: Access key for the cloud account.

        :type secret_key: str
        :param secret_key: Secret key matching ``access_key``.

        :type cloud: object
        :param cloud: An object with the attributes shown in the default
                      ``Bunch`` below; ``None`` selects Amazon EC2/S3 defaults.
        """
        self.access_key = access_key
        self.secret_key = secret_key
        if cloud is None:
            # Default to an EC2-compatible object
            self.cloud = Bunch(id='1',  # for compatibility w/ DB representation
                               name="Amazon",
                               cloud_type="ec2",
                               bucket_default="cloudman",
                               region_name="us-east-1",
                               region_endpoint="ec2.amazonaws.com",
                               ec2_port="",
                               ec2_conn_path="/",
                               cidr_range="",
                               is_secure=True,
                               s3_host="s3.amazonaws.com",
                               s3_port="",
                               s3_conn_path='/')
        else:
            self.cloud = cloud
        # Connections are established eagerly here and reused by the other
        # methods on this class (NOTE: this performs network setup at
        # construction time).
        self.ec2_conn = self.connect_ec2(
            self.access_key,
            self.secret_key,
            self.cloud)
        self.vpc_conn = self.connect_vpc(
            self.access_key,
            self.secret_key,
            self.cloud)
        # Define exceptions from http_client that we want to catch and retry
        self.http_exceptions = (http_client.HTTPException, socket.error,
                                socket.gaierror, http_client.BadStatusLine)
113
114 def __repr__(self):
115 return "Cloud: {}; acct ID: {}".format(
116 self.cloud.name, self.access_key)
117
    def launch(self, cluster_name, image_id, instance_type, password,
               kernel_id=None, ramdisk_id=None, key_name='cloudman_key_pair',
               security_groups=None, placement='', subnet_id=None,
               ebs_optimized=False, **kwargs):
        """
        Check all the prerequisites (key pair and security groups) for
        launching a CloudMan instance, compose the user data based on the
        parameters specified in the arguments and the cloud properties as
        defined in the object's ``cloud`` field.

        For the current list of user data fields that can be provided via
        ``kwargs``, see `<https://galaxyproject.org/cloudman/userdata/>`_

        Return a dict containing the properties and info with which an instance
        was launched, namely: ``sg_names`` containing the names of the security
        groups, ``kp_name`` containing the name of the key pair, ``kp_material``
        containing the private portion of the key pair (*note* that this portion
        of the key is available and can be retrieved *only* at the time the key
        is created, which will happen only if no key with the name provided in
        the ``key_name`` argument exists), ``rs`` containing the
        `boto <https://github.com/boto/boto/>`_ ``ResultSet`` object,
        ``instance_id`` containing the ID of a started instance, and
        ``error`` containing an error message if there was one.
        """
        if security_groups is None:
            security_groups = ['CloudMan']
        # Result accumulator; 'error' stays None on full success.
        ret = {'sg_names': [],
               'sg_ids': [],
               'kp_name': '',
               'kp_material': '',
               'rs': None,
               'instance_id': '',
               'error': None}
        # First satisfy the prerequisites
        for sg in security_groups:
            # Get VPC ID in case we're launching into a VPC
            vpc_id = None
            if subnet_id:
                try:
                    sn = self.vpc_conn.get_all_subnets(subnet_id)[0]
                    vpc_id = sn.vpc_id
                except (EC2ResponseError, IndexError):
                    # Best effort: proceed without a VPC id if lookup fails.
                    bioblend.log.exception("Trouble fetching subnet %s", subnet_id)
            cmsg = self.create_cm_security_group(sg, vpc_id=vpc_id)
            ret['error'] = cmsg['error']
            if ret['error']:
                return ret
            if cmsg['name']:
                ret['sg_names'].append(cmsg['name'])
                ret['sg_ids'].append(cmsg['sg_id'])
        if subnet_id:
            # Must setup a network interface if launching into VPC
            security_groups = None
            interface = boto.ec2.networkinterface.NetworkInterfaceSpecification(
                subnet_id=subnet_id, groups=[cmsg['sg_id']],
                associate_public_ip_address=True)
            network_interfaces = (boto.ec2.networkinterface.
                                  NetworkInterfaceCollection(interface))
        else:
            network_interfaces = None
        kp_info = self.create_key_pair(key_name)
        ret['kp_name'] = kp_info['name']
        ret['kp_material'] = kp_info['material']
        ret['error'] = kp_info['error']
        if ret['error']:
            return ret
        # If not provided, try to find a placement
        # TODO: Should placement always be checked? To make sure it's correct
        # for existing clusters.
        if not placement:
            placement = self._find_placement(
                cluster_name).get('placement', None)
        # Compose user data for launching an instance, ensuring we have the
        # required fields
        kwargs['access_key'] = self.access_key
        kwargs['secret_key'] = self.secret_key
        kwargs['cluster_name'] = cluster_name
        kwargs['password'] = password
        kwargs['cloud_name'] = self.cloud.name
        ud = self._compose_user_data(kwargs)
        # Now launch an instance
        try:
            rs = None
            rs = self.ec2_conn.run_instances(image_id=image_id,
                                             instance_type=instance_type,
                                             key_name=key_name,
                                             security_groups=security_groups,
                                             # The following two arguments are
                                             # provided in the network_interface
                                             # instead of arguments:
                                             # security_group_ids=security_group_ids,
                                             # subnet_id=subnet_id,
                                             network_interfaces=network_interfaces,
                                             user_data=ud,
                                             kernel_id=kernel_id,
                                             ramdisk_id=ramdisk_id,
                                             placement=placement,
                                             ebs_optimized=ebs_optimized)
            ret['rs'] = rs
        except EC2ResponseError as e:
            err_msg = "Problem launching an instance: {} (code {}; status {})" \
                      .format(str(e), e.error_code, e.status)
            bioblend.log.exception(err_msg)
            ret['error'] = err_msg
            return ret
        else:
            if rs:
                try:
                    bioblend.log.info("Launched an instance with ID %s", rs.instances[0].id)
                    ret['instance_id'] = rs.instances[0].id
                    ret['instance_ip'] = rs.instances[0].ip_address
                except EC2ResponseError as e:
                    err_msg = "Problem with the launched instance object: {} " \
                              "(code {}; status {})" \
                              .format(str(e), e.error_code, e.status)
                    bioblend.log.exception(err_msg)
                    ret['error'] = err_msg
            else:
                ret['error'] = ("No response after launching an instance. Check "
                                "your account permissions and try again.")
        return ret
240
241 def create_cm_security_group(self, sg_name='CloudMan', vpc_id=None):
242 """
243 Create a security group with all authorizations required to run CloudMan.
244
245 If the group already exists, check its rules and add the missing ones.
246
247 :type sg_name: str
248 :param sg_name: A name for the security group to be created.
249
250 :type vpc_id: str
251 :param vpc_id: VPC ID under which to create the security group.
252
253 :rtype: dict
254 :return: A dictionary containing keys ``name`` (with the value being the
255 name of the security group that was created), ``error``
256 (with the value being the error message if there was an error
257 or ``None`` if no error was encountered), and ``ports``
258 (containing the list of tuples with port ranges that were
259 opened or attempted to be opened).
260
261 .. versionchanged:: 0.6.1
262 The return value changed from a string to a dict
263 """
264 ports = (('20', '21'), # FTP
265 ('22', '22'), # SSH
266 ('80', '80'), # Web UI
267 ('443', '443'), # SSL Web UI
268 ('8800', '8800'), # NodeJS Proxy for Galaxy IPython IE
269 ('9600', '9700'), # HTCondor
270 ('30000', '30100')) # FTP transfer
271 progress = {'name': None,
272 'sg_id': None,
273 'error': None,
274 'ports': ports}
275 cmsg = None
276 filters = None
277 if vpc_id:
278 filters = {'vpc-id': vpc_id}
279 # Check if this security group already exists
280 try:
281 sgs = self.ec2_conn.get_all_security_groups(filters=filters)
282 except EC2ResponseError as e:
283 err_msg = ("Problem getting security groups. This could indicate a "
284 "problem with your account credentials or permissions: "
285 "{} (code {}; status {})"
286 .format(str(e), e.error_code, e.status))
287 bioblend.log.exception(err_msg)
288 progress['error'] = err_msg
289 return progress
290 for sg in sgs:
291 if sg.name == sg_name:
292 cmsg = sg
293 bioblend.log.debug("Security group '%s' already exists; will add authorizations next.", sg_name)
294 break
295 # If it does not exist, create security group
296 if cmsg is None:
297 bioblend.log.debug("Creating Security Group %s", sg_name)
298 try:
299 cmsg = self.ec2_conn.create_security_group(sg_name, 'A security '
300 'group for CloudMan',
301 vpc_id=vpc_id)
302 except EC2ResponseError as e:
303 err_msg = "Problem creating security group '{}': {} (code {}; " \
304 "status {})" \
305 .format(sg_name, str(e), e.error_code, e.status)
306 bioblend.log.exception(err_msg)
307 progress['error'] = err_msg
308 if cmsg:
309 progress['name'] = cmsg.name
310 progress['sg_id'] = cmsg.id
311 # Add appropriate authorization rules
312 # If these rules already exist, nothing will be changed in the SG
313 for port in ports:
314 try:
315 if not self.rule_exists(
316 cmsg.rules, from_port=port[0], to_port=port[1]):
317 cmsg.authorize(
318 ip_protocol='tcp',
319 from_port=port[0],
320 to_port=port[1],
321 cidr_ip='0.0.0.0/0')
322 else:
323 bioblend.log.debug("Rule (%s:%s) already exists in the SG", port[0], port[1])
324 except EC2ResponseError as e:
325 err_msg = "A problem adding security group authorizations: {} " \
326 "(code {}; status {})" \
327 .format(str(e), e.error_code, e.status)
328 bioblend.log.exception(err_msg)
329 progress['error'] = err_msg
330 # Add ICMP (i.e., ping) rule required by HTCondor
331 try:
332 if not self.rule_exists(
333 cmsg.rules, from_port='-1', to_port='-1', ip_protocol='icmp'):
334 cmsg.authorize(
335 ip_protocol='icmp',
336 from_port=-1,
337 to_port=-1,
338 cidr_ip='0.0.0.0/0')
339 else:
340 bioblend.log.debug(
341 f"ICMP rule already exists in {sg_name} SG.")
342 except EC2ResponseError as e:
343 err_msg = "A problem with security ICMP rule authorization: {} " \
344 "(code {}; status {})" \
345 .format(str(e), e.error_code, e.status)
346 bioblend.log.exception(err_msg)
347 progress['err_msg'] = err_msg
348 # Add rule that allows communication between instances in the same
349 # SG
350 # A flag to indicate if group rule already exists
351 g_rule_exists = False
352 for rule in cmsg.rules:
353 for grant in rule.grants:
354 if grant.name == cmsg.name:
355 g_rule_exists = True
356 bioblend.log.debug(
357 "Group rule already exists in the SG.")
358 if g_rule_exists:
359 break
360 if not g_rule_exists:
361 try:
362 cmsg.authorize(
363 src_group=cmsg,
364 ip_protocol='tcp',
365 from_port=0,
366 to_port=65535)
367 except EC2ResponseError as e:
368 err_msg = "A problem with security group group " \
369 "authorization: {} (code {}; status {})" \
370 .format(str(e), e.error_code, e.status)
371 bioblend.log.exception(err_msg)
372 progress['err_msg'] = err_msg
373 bioblend.log.info("Done configuring '%s' security group", cmsg.name)
374 else:
375 bioblend.log.warning(
376 f"Did not create security group '{sg_name}'")
377 return progress
378
379 def rule_exists(
380 self, rules, from_port, to_port, ip_protocol='tcp', cidr_ip='0.0.0.0/0'):
381 """
382 A convenience method to check if an authorization rule in a security group
383 already exists.
384 """
385 for rule in rules:
386 if rule.ip_protocol == ip_protocol and rule.from_port == from_port and \
387 rule.to_port == to_port and cidr_ip in [ip.cidr_ip for ip in rule.grants]:
388 return True
389 return False
390
    def create_key_pair(self, key_name='cloudman_key_pair'):
        """
        If a key pair with the provided ``key_name`` does not exist, create it.

        :type key_name: str
        :param key_name: A name for the key pair to be created.

        :rtype: dict
        :return: A dictionary containing keys ``name`` (with the value being the
                 name of the key pair that was created), ``error``
                 (with the value being the error message if there was an error
                 or ``None`` if no error was encountered), and ``material``
                 (containing the unencrypted PEM encoded RSA private key if the
                 key was created or ``None`` if the key already existed).

        .. versionchanged:: 0.6.1
            The return value changed from a tuple to a dict
        """
        progress = {'name': None,
                    'material': None,
                    'error': None}
        kp = None
        # Check if a key pair under the given name already exists. If it does not,
        # create it, else return.
        try:
            kps = self.ec2_conn.get_all_key_pairs()
        except EC2ResponseError as e:
            err_msg = "Problem getting key pairs: {} (code {}; status {})" \
                      .format(str(e), e.error_code, e.status)
            bioblend.log.exception(err_msg)
            progress['error'] = err_msg
            return progress
        for akp in kps:
            if akp.name == key_name:
                bioblend.log.info("Key pair '%s' already exists; reusing it.", key_name)
                progress['name'] = akp.name
                # NOTE: 'material' stays None here — the private key is only
                # retrievable at creation time.
                return progress
        try:
            kp = self.ec2_conn.create_key_pair(key_name)
        except EC2ResponseError as e:
            err_msg = "Problem creating key pair '{}': {} (code {}; status {})" \
                      .format(key_name, str(e), e.error_code, e.status)
            bioblend.log.exception(err_msg)
            progress['error'] = err_msg
            return progress
        bioblend.log.info("Created key pair '%s'", kp.name)
        progress['name'] = kp.name
        progress['material'] = kp.material
        return progress
440
441 def assign_floating_ip(self, ec2_conn, instance):
442 try:
443 bioblend.log.debug("Allocating a new floating IP address.")
444 address = ec2_conn.allocate_address()
445 except EC2ResponseError:
446 bioblend.log.exception("Exception allocating a new floating IP address")
447 bioblend.log.info("Associating floating IP %s to instance %s", address.public_ip, instance.id)
448 ec2_conn.associate_address(instance_id=instance.id,
449 public_ip=address.public_ip)
450
    def get_status(self, instance_id):
        """
        Check on the status of an instance. ``instance_id`` needs to be a
        ``boto``-library compatible instance ID (e.g., ``i-8fehrdss``).

        The lookup is performed via the EC2 connection established for this
        object (``self.ec2_conn``), so the credentials supplied at
        construction time are used.

        :type instance_id: str
        :param instance_id: ID of the instance to check; ``None`` yields an
                            immediate error result.

        :rtype: dict
        :return: A ``state`` dict containing the following keys:
                 ``instance_state``, ``public_ip``, ``placement``, and
                 ``error``, which capture CloudMan's current state. For
                 ``instance_state``, expected values are: ``pending``,
                 ``booting``, ``running``, or ``error`` and represent the
                 state of the underlying instance. Other keys will return an
                 empty value until the ``instance_state`` enters ``running``
                 state.
        """
        ec2_conn = self.ec2_conn
        rs = None
        state = {'instance_state': "",
                 'public_ip': "",
                 'placement': "",
                 'error': ""}

        # Make sure we have an instance ID
        if instance_id is None:
            err = "Missing instance ID, cannot check the state."
            bioblend.log.error(err)
            state['error'] = err
            return state
        try:
            rs = ec2_conn.get_all_instances([instance_id])
            if rs is not None:
                inst_state = rs[0].instances[0].update()
                public_ip = rs[0].instances[0].ip_address
                state['public_ip'] = public_ip
                if inst_state == 'running':
                    # if there's a private ip, but no public ip
                    # attempt auto allocation of floating IP
                    if rs[0].instances[0].private_ip_address and not public_ip:
                        self.assign_floating_ip(ec2_conn, rs[0].instances[0])
                    cm_url = f"http://{public_ip}/cloud"
                    # Wait until the CloudMan URL is accessible to return the
                    # data; until then report the instance as still 'booting'.
                    if self._checkURL(cm_url) is True:
                        state['instance_state'] = inst_state
                        state['placement'] = rs[0].instances[0].placement
                    else:
                        state['instance_state'] = 'booting'
                else:
                    state['instance_state'] = inst_state
        except Exception as e:
            # Broad catch: any boto/network failure is reported via 'error'
            # rather than propagated to the caller.
            err = f"Problem updating instance '{instance_id}' state: {e}"
            bioblend.log.error(err)
            state['error'] = err
        return state
512
513 def get_clusters_pd(self, include_placement=True):
514 """
515 Return *persistent data* of all existing clusters for this account.
516
517 :type include_placement: bool
518 :param include_placement: Whether or not to include region placement for
519 the clusters. Setting this option will lead
520 to a longer function runtime.
521
522 :rtype: dict
523 :return: A dictionary containing keys ``clusters`` and ``error``. The
524 value of ``clusters`` will be a dictionary with the following keys
525 ``cluster_name``, ``persistent_data``, ``bucket_name`` and optionally
526 ``placement`` or an empty list if no clusters were found or an
527 error was encountered. ``persistent_data`` key value is yet
528 another dictionary containing given cluster's persistent data.
529 The value for the ``error`` key will contain a string with the
530 error message.
531
532 .. versionadded:: 0.3
533 .. versionchanged:: 0.7.0
534 The return value changed from a list to a dictionary.
535 """
536 clusters = []
537 response = {'clusters': clusters, 'error': None}
538 s3_conn = self.connect_s3(self.access_key, self.secret_key, self.cloud)
539 try:
540 buckets = s3_conn.get_all_buckets()
541 except S3ResponseError as e:
542 response['error'] = "S3ResponseError getting buckets: %s" % e
543 except self.http_exceptions as ex:
544 response['error'] = "Exception getting buckets: %s" % ex
545 if response['error']:
546 bioblend.log.exception(response['error'])
547 return response
548 for bucket in [b for b in buckets if b.name.startswith('cm-')]:
549 try:
550 # TODO: first lookup if persistent_data.yaml key exists
551 pd = bucket.get_key('persistent_data.yaml')
552 except S3ResponseError:
553 # This can fail for a number of reasons for non-us and/or
554 # CNAME'd buckets but it is not a terminal error
555 bioblend.log.warning("Problem fetching persistent_data.yaml from bucket %s", bucket)
556 continue
557 if pd:
558 # We are dealing with a CloudMan bucket
559 pd_contents = pd.get_contents_as_string()
560 pd = yaml.load(pd_contents)
561 if 'cluster_name' in pd:
562 cluster_name = pd['cluster_name']
563 else:
564 for key in bucket.list():
565 if key.name.endswith('.clusterName'):
566 cluster_name = key.name.split('.clusterName')[0]
567 cluster = {'cluster_name': cluster_name,
568 'persistent_data': pd,
569 'bucket_name': bucket.name}
570 # Look for cluster's placement too
571 if include_placement:
572 placement = self._find_placement(cluster_name, cluster)
573 cluster['placement'] = placement
574 clusters.append(cluster)
575 response['clusters'] = clusters
576 return response
577
578 def get_cluster_pd(self, cluster_name):
579 """
580 Return *persistent data* (as a dict) associated with a cluster with the
581 given ``cluster_name``. If a cluster with the given name is not found,
582 return an empty dict.
583
584 .. versionadded:: 0.3
585 """
586 cluster = {}
587 clusters = self.get_clusters_pd().get('clusters', [])
588 for c in clusters:
589 if c['cluster_name'] == cluster_name:
590 cluster = c
591 break
592 return cluster
593
594 def connect_ec2(self, a_key, s_key, cloud=None):
595 """
596 Create and return an EC2-compatible connection object for the given cloud.
597
598 See ``_get_cloud_info`` method for more details on the requirements for
599 the ``cloud`` parameter. If no value is provided, the class field is used.
600 """
601 if cloud is None:
602 cloud = self.cloud
603 ci = self._get_cloud_info(cloud)
604 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
605 ec2_conn = boto.connect_ec2(aws_access_key_id=a_key,
606 aws_secret_access_key=s_key,
607 is_secure=ci['is_secure'],
608 region=r,
609 port=ci['ec2_port'],
610 path=ci['ec2_conn_path'],
611 validate_certs=False)
612 return ec2_conn
613
614 def connect_s3(self, a_key, s_key, cloud=None):
615 """
616 Create and return an S3-compatible connection object for the given cloud.
617
618 See ``_get_cloud_info`` method for more details on the requirements for
619 the ``cloud`` parameter. If no value is provided, the class field is used.
620 """
621 if cloud is None:
622 cloud = self.cloud
623 ci = self._get_cloud_info(cloud)
624 if ci['cloud_type'] == 'amazon':
625 calling_format = SubdomainCallingFormat()
626 else:
627 calling_format = OrdinaryCallingFormat()
628 s3_conn = S3Connection(
629 aws_access_key_id=a_key, aws_secret_access_key=s_key,
630 is_secure=ci['is_secure'], port=ci['s3_port'], host=ci['s3_host'],
631 path=ci['s3_conn_path'], calling_format=calling_format)
632 return s3_conn
633
634 def connect_vpc(self, a_key, s_key, cloud=None):
635 """
636 Establish a connection to the VPC service.
637
638 TODO: Make this work with non-default clouds as well.
639 """
640 if cloud is None:
641 cloud = self.cloud
642 ci = self._get_cloud_info(cloud)
643 r = RegionInfo(name=ci['region_name'], endpoint=ci['region_endpoint'])
644 vpc_conn = boto.connect_vpc(
645 aws_access_key_id=a_key,
646 aws_secret_access_key=s_key,
647 is_secure=ci['is_secure'],
648 region=r,
649 port=ci['ec2_port'],
650 path=ci['ec2_conn_path'],
651 validate_certs=False)
652 return vpc_conn
653
654 def _compose_user_data(self, user_provided_data):
655 """
656 A convenience method used to compose and properly format the user data
657 required when requesting an instance.
658
659 ``user_provided_data`` is the data provided by a user required to identify
660 a cluster and user other user requirements.
661 """
662 form_data = {}
663 # Do not include the following fields in the user data but do include
664 # any 'advanced startup fields' that might be added in the future
665 excluded_fields = ['sg_name', 'image_id', 'instance_id', 'kp_name',
666 'cloud', 'cloud_type', 'public_dns', 'cidr_range',
667 'kp_material', 'placement', 'flavor_id']
668 for key, value in user_provided_data.items():
669 if key not in excluded_fields:
670 form_data[key] = value
671 # If the following user data keys are empty, do not include them in the
672 # request user data
673 udkeys = [
674 'post_start_script_url',
675 'worker_post_start_script_url',
676 'bucket_default',
677 'share_string']
678 for udkey in udkeys:
679 if udkey in form_data and form_data[udkey] == '':
680 del form_data[udkey]
681 # If bucket_default was not provided, add a default value to the user data
682 # (missing value does not play nicely with CloudMan's ec2autorun.py)
683 if not form_data.get(
684 'bucket_default', None) and self.cloud.bucket_default:
685 form_data['bucket_default'] = self.cloud.bucket_default
686 # Reuse the ``password`` for the ``freenxpass`` user data option
687 if 'freenxpass' not in form_data and 'password' in form_data:
688 form_data['freenxpass'] = form_data['password']
689 # Convert form_data into the YAML format
690 ud = yaml.dump(form_data, default_flow_style=False, allow_unicode=False)
691 # Also include connection info about the selected cloud
692 ci = self._get_cloud_info(self.cloud, as_str=True)
693 return ud + "\n" + ci
694
695 def _get_cloud_info(self, cloud, as_str=False):
696 """
697 Get connection information about a given cloud
698 """
699 ci = {}
700 ci['cloud_type'] = cloud.cloud_type
701 ci['region_name'] = cloud.region_name
702 ci['region_endpoint'] = cloud.region_endpoint
703 ci['is_secure'] = cloud.is_secure
704 ci['ec2_port'] = cloud.ec2_port if cloud.ec2_port != '' else None
705 ci['ec2_conn_path'] = cloud.ec2_conn_path
706 # Include cidr_range only if not empty
707 if cloud.cidr_range != '':
708 ci['cidr_range'] = cloud.cidr_range
709 ci['s3_host'] = cloud.s3_host
710 ci['s3_port'] = cloud.s3_port if cloud.s3_port != '' else None
711 ci['s3_conn_path'] = cloud.s3_conn_path
712 if as_str:
713 ci = yaml.dump(ci, default_flow_style=False, allow_unicode=False)
714 return ci
715
716 def _get_volume_placement(self, vol_id):
717 """
718 Returns the placement of a volume (or None, if it cannot be determined)
719 """
720 try:
721 vol = self.ec2_conn.get_all_volumes(volume_ids=[vol_id])
722 except EC2ResponseError as ec2e:
723 bioblend.log.error("EC2ResponseError querying for volume {}: {}"
724 .format(vol_id, ec2e))
725 vol = None
726 if vol:
727 return vol[0].zone
728 else:
729 bioblend.log.error("Requested placement of a volume '%s' that does not exist.", vol_id)
730 return None
731
    def _find_placement(self, cluster_name, cluster=None):
        """
        Find a placement zone for a cluster with the name ``cluster_name``.

        By default, this method will search for and fetch given cluster's
        *persistent data*; alternatively, *persistent data* can be provided via
        the ``cluster`` parameter. This dict needs to have ``persistent_data``
        key with the contents of cluster's *persistent data*.
        If the cluster or the volume associated with the cluster cannot be found,
        cluster placement is set to ``None``.

        :rtype: dict
        :return: A dictionary with ``placement`` and ``error`` keywords.

        .. versionchanged:: 0.7.0
            The return value changed from a list to a dictionary.
        """
        placement = None
        response = {'placement': placement, 'error': None}
        cluster = cluster or self.get_cluster_pd(cluster_name)
        if cluster and 'persistent_data' in cluster:
            pd = cluster['persistent_data']
            try:
                if 'placement' in pd:
                    # Placement is recorded directly in the persistent data
                    response['placement'] = pd['placement']
                elif 'data_filesystems' in pd:
                    # We have v1 format persistent data so get the volume first and
                    # then the placement zone
                    vol_id = pd['data_filesystems']['galaxyData'][0]['vol_id']
                    response['placement'] = self._get_volume_placement(vol_id)
                elif 'filesystems' in pd:
                    # V2 format.
                    for fs in [fs for fs in pd['filesystems'] if fs.get(
                            'kind', None) == 'volume' and 'ids' in fs]:
                        # All volumes must be in the same zone
                        vol_id = fs['ids'][0]
                        response['placement'] = self._get_volume_placement(
                            vol_id)
                        # No need to continue to iterate through
                        # filesystems, if we found one with a volume.
                        break
            except Exception as exc:
                # Broad catch: malformed persistent data of any shape should
                # produce an error result rather than propagate.
                response['error'] = ("Exception while finding placement for "
                                     "cluster '{}'. This can indicate malformed "
                                     "instance data. Or that this method is "
                                     "broken: {}".format(cluster_name, exc))
                bioblend.log.error(response['error'])
                response['placement'] = None
        else:
            bioblend.log.debug("Insufficient info about cluster {} to get placement."
                               .format(cluster_name))
        return response
784
785 def find_placements(
786 self, ec2_conn, instance_type, cloud_type, cluster_name=None):
787 """
788 Find a list of placement zones that support the specified instance type.
789
790 If ``cluster_name`` is given and a cluster with the given name exist,
791 return a list with only one entry where the given cluster lives.
792
793 Searching for available zones for a given instance type is done by
794 checking the spot prices in the potential availability zones for
795 support before deciding on a region:
796 http://blog.piefox.com/2011/07/ec2-availability-zones-and-instance.html
797
798 Note that, currently, instance-type based zone selection applies only to
799 AWS. For other clouds, all the available zones are returned (unless a
800 cluster is being recreated, in which case the cluster's placement zone is
801 returned sa stored in its persistent data.
802
803 :rtype: dict
804 :return: A dictionary with ``zones`` and ``error`` keywords.
805
806 .. versionchanged:: 0.3
807 Changed method name from ``_find_placements`` to ``find_placements``.
808 Also added ``cluster_name`` parameter.
809
810 .. versionchanged:: 0.7.0
811 The return value changed from a list to a dictionary.
812 """
813 # First look for a specific zone a given cluster is bound to
814 zones = []
815 response = {'zones': zones, 'error': None}
816 if cluster_name:
817 placement = self._find_placement(cluster_name)
818 if placement.get('error'):
819 response['error'] = placement['error']
820 return response
821 response['zones'] = placement.get('placement', [])
822 # If placement is not found, look for a list of available zones
823 if not response['zones']:
824 in_the_past = datetime.datetime.now() - datetime.timedelta(hours=1)
825 back_compatible_zone = "us-east-1e"
826 for zone in [
827 z for z in ec2_conn.get_all_zones() if z.state == 'available']:
828 # Non EC2 clouds may not support get_spot_price_history
829 if instance_type is None or cloud_type != 'ec2':
830 zones.append(zone.name)
831 elif ec2_conn.get_spot_price_history(instance_type=instance_type,
832 end_time=in_the_past.isoformat(),
833 availability_zone=zone.name):
834 zones.append(zone.name)
835 # Higher-lettered zones seem to have more availability currently
836 zones.sort(reverse=True)
837 if back_compatible_zone in zones:
838 zones = [back_compatible_zone] + \
839 [z for z in zones if z != back_compatible_zone]
840 if len(zones) == 0:
841 response['error'] = ("Did not find availabilty zone for {}"
842 .format(instance_type))
843 bioblend.log.error(response['error'])
844 zones.append(back_compatible_zone)
845 return response
846
847 def _checkURL(self, url):
848 """
849 Check if the ``url`` is *alive* (i.e., remote server returns code 200(OK)
850 or 401 (unauthorized)).
851 """
852 try:
853 p = urlparse(url)
854 h = HTTPConnection(p[1])
855 h.putrequest('HEAD', p[2])
856 h.endheaders()
857 r = h.getresponse()
858 # CloudMan UI is pwd protected so include 401
859 if r.status in (200, 401):
860 return True
861 except Exception:
862 # No response or no good response
863 pass
864 return False