0
|
1 #!/usr/bin/env python
|
|
2
|
|
3 # Copyright (C) 2011-2014 CRS4.
|
|
4 #
|
|
5 # This file is part of Seal.
|
|
6 #
|
|
7 # Seal is free software: you can redistribute it and/or modify it
|
|
8 # under the terms of the GNU General Public License as published by the Free
|
|
9 # Software Foundation, either version 3 of the License, or (at your option)
|
|
10 # any later version.
|
|
11 #
|
|
12 # Seal is distributed in the hope that it will be useful, but
|
|
13 # WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY
|
|
14 # or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License
|
|
15 # for more details.
|
|
16 #
|
|
17 # You should have received a copy of the GNU General Public License along
|
|
18 # with Seal. If not, see <http://www.gnu.org/licenses/>.
|
|
19
|
|
20
|
|
21
|
|
22 """
|
|
23 Calls the Seal RecabTable tool. Then, it calls recab_table_fetch to
|
|
24 concatenate all the partial tables and create a single csv file.
|
|
25 """
|
|
26
|
|
27
|
|
28 # parameters:
|
|
29 # INPUT_DATA
|
|
30 # OUTPUT
|
|
31 # VCF
|
|
32 # NUM_REDUCERS
|
|
33 # [OTHER]
|
|
34
|
|
35 import os
|
|
36 import sys
|
|
37
|
|
38 import hadoop_galaxy.pathset as pathset
|
|
39 import subprocess
|
|
40 import tempfile
|
|
41 import pydoop.hdfs as phdfs
|
|
42
|
|
43 # XXX: add --append-python-path to the possible arguments?
|
|
44
|
|
def usage_error(msg=None):
  """
  Print a usage message to standard error and terminate the script.

  msg: optional text written on its own line before the usage synopsis;
       nothing extra is printed when it is None or empty.

  This function does not return: it always ends with sys.exit(1), which
  raises SystemExit.
  """
  # sys.stderr.write replaces the Python-2-only "print >>" statement.  The
  # text written is the same, and the call also works on Python 3.
  if msg:
    sys.stderr.write("%s\n" % (msg,))
  sys.stderr.write("%s %s\n" % (
    os.path.basename(sys.argv[0]),
    "INPUT_DATA OUTPUT VCF NUM_REDUCERS [OTHER]",
  ))
  sys.exit(1)
|
|
50
|
|
51
|
|
def run_recab(input_path, output_path, vcf, num_red, other_args):
  """
  Run the Seal recab_table tool through the hadoop_galaxy launcher.

  The command executed is:

    hadoop_galaxy --input INPUT --output OUTPUT --executable seal
                  recab_table --vcf-file VCF --num-reducers N [OTHER ...]

  input_path:  value passed to --input
  output_path: value passed to --output
  vcf:         value passed to --vcf-file
  num_red:     value passed to --num-reducers; converted with str() so that
               an integer is accepted as well as a string
  other_args:  optional sequence of extra arguments appended verbatim to
               the command line; may be None or empty

  The call blocks until the command finishes.  subprocess.check_call raises
  subprocess.CalledProcessError if the command exits with a non-zero status.
  """
  cmd = [
    'hadoop_galaxy',
    '--input', input_path,
    '--output', output_path,
    '--executable', 'seal',
    'recab_table',
    '--vcf-file', vcf,
    # subprocess only accepts strings as arguments
    '--num-reducers', str(num_red),
  ]

  if other_args:
    cmd.extend(other_args)

  # now execute the hadoop job
  subprocess.check_call(cmd)
|
|
69
|
|
def collect_table(pset, output_path):
  """
  Fetch the recab_table results into the final output file.

  Runs `seal recab_table_fetch PATH [PATH ...] OUTPUT`, where the PATH
  arguments are the values returned by pset.get_paths().

  pset:        object providing get_paths(), which returns the paths to
               pass to recab_table_fetch
  output_path: file to be written by recab_table_fetch; any existing file at
               this location is removed first

  Raises OSError if the existing output file cannot be removed (a missing
  file is not an error).  subprocess.check_call raises
  subprocess.CalledProcessError if the command exits with a non-zero status.
  """
  import errno  # local import: only needed to recognise "no such file"

  # finally, fetch the result into the final output file
  cmd = ['seal', 'recab_table_fetch']
  cmd.extend(pset.get_paths())
  cmd.append(output_path)
  try:
    # remove the file that galaxy creates.  recab_table_fetch refuses to
    # overwrite it
    os.unlink(output_path)
  except OSError as e:
    # os.unlink reports failures as OSError; on Python 2 that is not an
    # IOError, so `except IOError` never caught it.  A file that is already
    # absent is fine; anything else is a real problem.
    if e.errno != errno.ENOENT:
      raise
  subprocess.check_call(cmd)
|
|
82
|
|
def cleanup(out_pathset):
  """
  Delete the job output paths, on a best-effort basis.

  out_pathset: iterable of paths; each one is passed to phdfs.rmr

  A failure to delete one path is reported on standard error and does not
  stop the remaining paths from being processed.  Returns None.
  """
  # clean-up job output
  for path in out_pathset:
    try:
      sys.stderr.write("Deleting output path %s\n" % (path,))
      phdfs.rmr(path)
    except Exception as e:
      # `Exception` replaces the Python-2-only StandardError, which does
      # not exist on Python 3 and made this handler itself fail there.
      sys.stderr.write("Error! %s\n" % (str(e),))
|
|
91
|
|
def main(args):
  """
  Script entry point: run recab_table, then collect its output.

  args: command-line arguments without the program name, in the order
        INPUT_DATA OUTPUT VCF NUM_REDUCERS [OTHER ...].  Anything after the
        fourth argument is forwarded untouched to run_recab.

  Steps:
    1. run_recab writes the pathset describing the job output to a
       temporary file;
    2. that pathset is read back and handed to collect_table, which
       produces the final output file;
    3. the job output paths are deleted with cleanup, whether or not
       collect_table succeeded.

  Calls usage_error (which exits with status 1) when fewer than four
  arguments are given.  Exceptions raised by the individual steps propagate
  to the caller.
  """
  # OTHER is optional, so four arguments are enough.
  if len(args) < 4:
    usage_error()

  input_data, final_output, vcf, num_reducers = args[:4]
  other = args[4:]

  # Stays None until the pathset has been read, so that the clean-up step
  # is skipped (instead of failing on an unbound name and hiding the real
  # error) when run_recab itself fails.
  out_paths = None

  # Create a temporary pathset to reference the recab_table
  # output directory.  'r+b' is the valid spelling of read/write binary;
  # the previous 'rwb' is rejected with ValueError on Python 3.
  with tempfile.NamedTemporaryFile(mode='r+b') as tmp_pathset_file:
    try:
      run_recab(input_data, tmp_pathset_file.name, vcf, num_reducers, other)
      tmp_pathset_file.seek(0)
      out_paths = pathset.FilePathset.from_file(tmp_pathset_file)
      collect_table(out_paths, final_output)
    finally:
      if out_paths is not None:
        cleanup(out_paths)
|
|
112
|
|
# Run only when executed as a script: hand the command-line arguments
# (program name excluded) to main().
if __name__ == "__main__":
  script_args = sys.argv[1:]
  main(script_args)
|
|
115
|
|
116 # vim: et ai ts=2 sw=2
|