3
|
1 from galaxy.jobs import JobDestination
|
|
2 import os
|
|
3 import sys
|
|
4 import json
|
|
5 import cStringIO
|
|
6 import logging
|
|
7
|
|
8 log = logging.getLogger( __name__ )
|
|
9
|
|
10
|
|
11 def dump(obj, nested_level=0, output=sys.stdout):
|
|
12 spacing = ' '
|
|
13 if type(obj) == dict:
|
|
14 print >> output, '%s{' % ((nested_level) * spacing)
|
|
15 for k, v in obj.items():
|
|
16 if hasattr(v, '__iter__'):
|
|
17 print >> output, '%s%s:' % ((nested_level + 1) * spacing, k)
|
|
18 dump(v, nested_level + 1, output)
|
|
19 else:
|
|
20 print >> output, '%s%s: %s' % ((nested_level + 1) * spacing, k, v)
|
|
21 print >> output, '%s}' % (nested_level * spacing)
|
|
22 elif type(obj) == list:
|
|
23 print >> output, '%s[' % ((nested_level) * spacing)
|
|
24 for v in obj:
|
|
25 if hasattr(v, '__iter__'):
|
|
26 dump(v, nested_level + 1, output)
|
|
27 else:
|
|
28 print >> output, '%s%s' % ((nested_level + 1) * spacing, v)
|
|
29 print >> output, '%s]' % ((nested_level) * spacing)
|
|
30 else:
|
|
31 print >> output, '%s%s' % (nested_level * spacing, obj)
|
|
32
|
|
33
|
|
34 def dynamic_slurm_cluster_gatk(job, tool_id):
|
|
35 # Allocate extra time
|
|
36 inp_data = dict( [ ( da.name, da.dataset ) for da in job.input_datasets ] )
|
|
37 inp_data.update( [ ( da.name, da.dataset ) for da in job.input_library_datasets ] )
|
|
38 inp_data.update( [ ( da.name, json.loads(da.value) ) for da in job.parameters ] )
|
|
39 out = cStringIO.StringIO()
|
|
40 dump(inp_data, 1, out)
|
|
41 log.debug(out.getvalue())
|
|
42
|
|
43 nativeSpecs = '--nodes=1 --ntasks=1'
|
|
44
|
|
45 # runner doesn't allow to specify --cpus-per-task
|
|
46 # thus the mem calculation gets messy with more than 1 node
|
|
47 # --> translate nt ==> nodes, nct ==> ntasks
|
|
48
|
|
49 if 'cond_threads' not in inp_data:
|
|
50 return JobDestination(runner="slurm")
|
|
51
|
|
52 if inp_data['cond_threads']['cond_threads_enabled'] == "True":
|
|
53 nNodes = int(inp_data['cond_threads']['nt'])
|
|
54 nCPU = int(inp_data['cond_threads']['nct'])
|
|
55 nMEM = int(inp_data['cond_threads']['mem'])
|
|
56 if nMEM > 0:
|
|
57 nativeSpecs = '--nodes=%d --ntasks=%d --mem=%d' % (nNodes, nCPU*nNodes, nMEM)
|
|
58 else:
|
|
59 nativeSpecs = '--nodes=%d --ntasks=%d' % (nNodes, nCPU*nNodes)
|
|
60
|
|
61 return JobDestination(runner="slurm", params={"nativeSpecification": nativeSpecs})
|
|
62
|