Mercurial > repos > yufei-luo > s_mart
comparison commons/tools/LaunchBlasterInParallel.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
comparison
equal
deleted
inserted
replaced
17:b0e8584489e6 | 18:94ab73e8a190 |
---|---|
1 #!/usr/bin/env python | |
2 | |
3 # Copyright INRA (Institut National de la Recherche Agronomique) | |
4 # http://www.inra.fr | |
5 # http://urgi.versailles.inra.fr | |
6 # | |
7 # This software is governed by the CeCILL license under French law and | |
8 # abiding by the rules of distribution of free software. You can use, | |
9 # modify and/ or redistribute the software under the terms of the CeCILL | |
10 # license as circulated by CEA, CNRS and INRIA at the following URL | |
11 # "http://www.cecill.info". | |
12 # | |
13 # As a counterpart to the access to the source code and rights to copy, | |
14 # modify and redistribute granted by the license, users are provided only | |
15 # with a limited warranty and the software's author, the holder of the | |
16 # economic rights, and the successive licensors have only limited | |
17 # liability. | |
18 # | |
19 # In this respect, the user's attention is drawn to the risks associated | |
20 # with loading, using, modifying and/or developing or reproducing the | |
21 # software by the user in light of its specific status of free software, | |
22 # that may mean that it is complicated to manipulate, and that also | |
23 # therefore means that it is reserved for developers and experienced | |
24 # professionals having in-depth computer knowledge. Users are therefore | |
25 # encouraged to load and test the software's suitability as regards their | |
26 # requirements in conditions enabling the security of their systems and/or | |
27 # data to be ensured and, more generally, to use and operate it in the | |
28 # same conditions as regards security. | |
29 # | |
30 # The fact that you are presently reading this means that you have had | |
31 # knowledge of the CeCILL license and that you accept its terms. | |
32 | |
33 import os | |
34 import shutil | |
35 from commons.core.LoggerFactory import LoggerFactory | |
36 from commons.core.sql.DbFactory import DbFactory | |
37 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory | |
38 from commons.core.launcher.Launcher import Launcher | |
39 from commons.core.utils.FileUtils import FileUtils | |
40 from commons.core.utils.RepetOptionParser import RepetOptionParser | |
41 from commons.core.checker.ConfigChecker import ConfigRules, ConfigChecker | |
42 from commons.tools.MergeMatchsFiles import MergeMatchsFiles | |
43 | |
# Prefix of the logger name; the class name is appended to it in __init__.
LOG_DEPTH = "repet.tools"

##Launch BLASTER in parallel
#
class LaunchBlasterInParallel(object):
    """Run one LaunchBlaster.py job per query file and merge the results.

    The query files are the files of ``queryDirectory`` whose name matches
    ``queryPattern``. Job-related settings (resources, temporary directory,
    copy on node, cleaning) and alignment settings (blast flavour, e-value,
    length, identity) are read from the configuration file when run() is
    called.
    """

    def __init__(self, queryDirectory = "", subjectFilePath = "", outFileName = "", configFileName = "", groupId = "", queryPattern = r".*\.fa",
                 doAllByall = False, nbCPU = 1, eValue="1e-300", type = "ncbi", program="blastn", extraParams="", verbosity = 0):
        """Store the parameters and create the logger.

        @param queryDirectory: directory holding the query fasta files
        @param subjectFilePath: path of the subject fasta file
        @param outFileName: name of the final align file
        @param configFileName: name of the configuration file
        @param groupId: jobs group identifier ("" means "Blaster_<pid>")
        @param queryPattern: regular expression selecting the query files
        @param doAllByall: True for an all-by-all Blast
        @param nbCPU: number of CPUs (stored only; not used in this class)
        @param eValue: Blast e-value (replaced by the configuration value in run())
        @param type: Blast flavour (replaced by the configuration value in run())
        @param program: Blast program, e.g. "blastn"
        @param extraParams: additional parameters passed to the Blast program
        @param verbosity: verbosity level
        """
        self._queryDirectory = queryDirectory
        self._queryPattern = queryPattern
        self.setSubjectFilePath(subjectFilePath)
        self._outFileName = outFileName
        self._configFileName = configFileName
        self.setGroupId(groupId)
        self._doAllByall = doAllByall
        self._blastType = type
        self._program = program
        self._extraParams = extraParams
        self._nbCPU = nbCPU
        # Names of the configuration file sections read by this class.
        self._jobSectionName = "jobs"
        self._blasterSectionName = "alignment"
        self._prepareDataSectionName = "prepare_data"
        self._eValue = eValue

        # Unknown until the configuration file is read (see _setAttributesFromConfig).
        self._doClean = None
        self._verbosity = verbosity
        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)

    def setAttributesFromCmdLine(self):
        """Parse the command line and set the attributes from its options."""
        description = "Launch Blaster in parallel."
        # Temporary files are removed according to the 'clean' option of the
        # configuration file: there is no command-line switch for it.
        epilog = "\nExample 1: launch without verbosity.\n"
        epilog += "\t$ python LaunchBlasterInParallel.py -q query -s nr.fa -o query.align -C config.cfg -v 0"
        epilog += "\n\t"
        epilog += "\nExample 2: launch with verbosity to have errors (level 1) and basic information (level 2).\n"
        epilog += "\t$ python LaunchBlasterInParallel.py -q query -s nr.fa -o query.align -C config.cfg -v 2"
        epilog += "\n\t"
        epilog += "\nTemporary files are deleted when option 'clean' of section 'jobs' is set in the configuration file.\n"
        parser = RepetOptionParser(description = description, epilog = epilog)
        parser.add_option("-q", "--query", dest = "query", action = "store", type = "string", help = "query fasta directory absolute path [compulsory]", default = "")
        parser.add_option("-s", "--subject", dest = "subject", action = "store", type = "string", help = "subject fasta absolute path [compulsory] [format: fasta]", default = "")
        parser.add_option("-o", "--out", dest = "outFileName", action = "store", type = "string", help = "output align file name [compulsory] [format: align]", default = "")
        parser.add_option("-C", "--config", dest = "configFileName", action = "store", type = "string", help = "configuration file name [compulsory] [format: cfg]", default = "")
        parser.add_option("-g", "--groupId", dest = "groupId", action = "store", type = "string", help = "jobs groupId [default: Blaster_<pid>]", default = "")
        parser.add_option("-p", "--queryPattern", dest = "queryPattern", action = "store", type = "string", help = r"query file pattern [default: .*\.fa]", default = r".*\.fa")
        parser.add_option("-a", "--aba", dest = "doAllByall", action = "store_true", help = "all-by-all Blast [default: False]", default = False)
        parser.add_option("-e", "--eValue", dest = "eValue", action = "store", type = "string", help = "Blast e-value [default: 1e-300]", default = "1e-300")
        parser.add_option("-t", "--type", dest = "type", action = "store", type = "string", help = "Blast type [ncbi, wu, blastplus] [default: ncbi]", default = "ncbi")
        parser.add_option("-u", "--program", dest = "program", action = "store", type = "string", help = "Blast program type [blastn, blastx, blastx] [default: blastn]", default = "blastn")
        parser.add_option("-x", "--extraParams", dest = "extraParams", action = "store", type = "string", help = "Additional blast program parameters[default: '']", default = "")
        parser.add_option("-n", "--ncpu", dest = "cpu", action = "store", type = "int", help = "Number of CPUs to use [default: 1]", default = 1)
        parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [default: 1]", default = 1)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    def _setAttributesFromOptions(self, options):
        """Copy every parsed command-line option into the matching attribute."""
        self.setQueryDirectory(options.query)
        self.setQueryPattern(options.queryPattern)
        self.setSubjectFilePath(options.subject)
        self.setOutFileName(options.outFileName)
        self.setConfigFileName(options.configFileName)
        self.setGroupId(options.groupId)
        self.setDoAllByall(options.doAllByall)
        self.setEValue(options.eValue)
        self.setType(options.type)
        self.setProgram(options.program)
        self.setExtraParams(options.extraParams)
        self.setCPU(options.cpu)
        self.setVerbosity(options.verbosity)

    def setQueryDirectory(self, queryDirectory):
        """Set the directory holding the query fasta files."""
        self._queryDirectory = queryDirectory

    def setQueryPattern(self, queryPattern):
        """Set the pattern selecting the query files."""
        self._queryPattern = queryPattern

    def setSubjectFilePath(self, subjectFilePath):
        """Set the subject file path and derive its base name from it."""
        self._subjectFilePath = subjectFilePath
        self._subjectFileName = os.path.basename(subjectFilePath)

    def setOutFileName(self, outFileName):
        """Set the name of the final align file."""
        self._outFileName = outFileName

    def setConfigFileName(self, configFileName):
        """Set the name of the configuration file."""
        self._configFileName = configFileName

    def setGroupId(self, groupId):
        """Set the jobs group identifier; "" selects "Blaster_<pid>"."""
        if groupId == "":
            self._groupId = "Blaster_%s" % os.getpid()
        else:
            self._groupId = groupId

    def setDoAllByall(self, doAllByall):
        """Enable or disable the all-by-all mode."""
        self._doAllByall = doAllByall

    def setType(self, blastType):
        """Set the Blast flavour (overwritten by the configuration in run())."""
        self._blastType = blastType

    def setProgram(self, program):
        """Set the Blast program."""
        self._program = program

    def setExtraParams(self, extraParams):
        """Set the additional parameters passed to the Blast program."""
        self._extraParams = extraParams

    def setEValue(self, eValue):
        """Set the Blast e-value (overwritten by the configuration in run())."""
        self._eValue = eValue

    def setCPU(self, cpu):
        """Set the number of CPUs."""
        self._nbCPU = cpu

    def setDoClean(self, doClean):
        """Set the cleaning flag (overwritten by the configuration in run())."""
        self._doClean = doClean

    def setVerbosity(self, verbosity):
        """Set the verbosity level; the logger level is updated in run()."""
        self._verbosity = verbosity

    def _checkOptions(self):
        """Raise an Exception when the query pattern is empty."""
        if self._queryPattern == "":
            self._logAndRaise("ERROR: Missing input fasta file name")

    def _logAndRaise(self, errorMsg):
        """Log errorMsg at error level, then raise an Exception carrying it."""
        self._log.error(errorMsg)
        raise Exception(errorMsg)

    def _checkConfig(self):
        """Declare the expected configuration options, then load the configuration.

        The checked configuration is kept in self._iConfig and the
        attributes depending on it are set right away.
        """
        iConfigRules = ConfigRules()
        iConfigRules.addRuleSection(section=self._jobSectionName, mandatory=True)
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="resources", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="tmpDir", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="copy", mandatory=True, type="bool")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="clean", mandatory=True, type="bool")
        iConfigRules.addRuleOption(section=self._blasterSectionName, option ="blast", mandatory=True, type="string", set = ("ncbi", "blastplus", "wu"))
        iConfigRules.addRuleOption(section=self._blasterSectionName, option ="Evalue", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._blasterSectionName, option ="length", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._blasterSectionName, option ="identity", mandatory=True, type="string")

        iConfigChecker = ConfigChecker(self._configFileName, iConfigRules)
        self._iConfig = iConfigChecker.getConfig()
        self._setAttributesFromConfig()

    def _setAttributesFromConfig(self):
        """Read the job, alignment and chunk settings from the configuration.

        The Blast flavour and e-value read here replace the values given to
        the constructor, the setters or the command line.
        """
        # NOTE(review): no rule is declared in _checkConfig for the two
        # "prepare_data" options below - confirm they are always present.
        self._chunkLength = self._iConfig.get(self._prepareDataSectionName, "chunk_length")
        self._chunkOverlap = self._iConfig.get(self._prepareDataSectionName, "chunk_overlap")
        self._resources = self._iConfig.get(self._jobSectionName, "resources")
        self._tmpDir = self._iConfig.get(self._jobSectionName, "tmpDir")
        self._isCopyOnNode = self._iConfig.get(self._jobSectionName, "copy")
        self._doClean = self._iConfig.get(self._jobSectionName, "clean")
        self._blastType = self._iConfig.get(self._blasterSectionName, "blast")
        self._eValue = self._iConfig.get(self._blasterSectionName, "Evalue")
        self._length = self._iConfig.get(self._blasterSectionName, "length")
        self._identity = self._iConfig.get(self._blasterSectionName, "identity")

        # Copying on node is switched off when no temporary directory is set.
        if self._isCopyOnNode and not self._tmpDir:
            self._isCopyOnNode = False
        self._log.debug("The copy option is: %s." % self._isCopyOnNode)

    def _getLaunchBlasterCmd(self, iLauncher, file):
        """Return the LaunchBlaster.py command for one query file.

        @param iLauncher: object whose getSystemCommand(program, args) builds the command
        @param file: name of the query file
        """
        lArgs = []
        lArgs.append("-u %s" % self._program)
        lArgs.append("-q %s" % file)
        lArgs.append("-s %s" % self._subjectFileName)
        if self._doAllByall:
            lArgs.append("-a")
        lArgs.append("-e %s" % self._eValue)
        lArgs.append("-l %s" % self._length)
        lArgs.append("-d %s" % self._identity)
        lArgs.append("-t %s" % self._blastType)
        lArgs.append("-x '%s'" % self._extraParams)
        if self._doClean:
            lArgs.append("-c")
        # The launched program runs one verbosity level below this one.
        lArgs.append("-v %i" % (self._verbosity - 1))
        return iLauncher.getSystemCommand("LaunchBlaster.py", lArgs)

    def _getRmvPairAlignInChunkOverlapsCmd(self, iLauncher, inFileName, outFileName):
        """Return the RmvPairAlignInChunkOverlaps.py command.

        @param iLauncher: object whose getSystemCommand(program, args) builds the command
        @param inFileName: name of the align file to process
        @param outFileName: name of the align file to write
        """
        lArgs = []
        lArgs.append("-i %s" % inFileName)
        lArgs.append("-l %s" % self._chunkLength)
        lArgs.append("-o %s" % self._chunkOverlap)
        lArgs.append("-m 10")
        lArgs.append("-O %s" % outFileName)
        # The launched program runs one verbosity level below this one.
        lArgs.append("-v %d" % (self._verbosity - 1))
        return iLauncher.getSystemCommand("RmvPairAlignInChunkOverlaps.py", lArgs)

    def _launchBlasterJobs(self, cDir):
        """Prepare and launch one LaunchBlaster.py job per query file.

        @param cDir: directory the jobs move their result files to
        """
        iDb = DbFactory.createInstance()
        jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(jobdb, cDir, "", "", cDir, self._tmpDir, "jobs", self._resources, self._groupId, "Blaster", chooseTemplateWithCopy = self._isCopyOnNode)

        # Subject size plus 5,000,000 bytes, divided by 1e9
        # (presumably gigabytes with a safety margin - TODO confirm).
        fileSize = float(os.path.getsize(self._subjectFilePath) + 5000000) / 1000000000

        # The size and copy commands are only given when copying on node.
        lCmdSize = []
        lCmdCopy = []
        if self._isCopyOnNode:
            lCmdSize.append("fileSize = %f" % fileSize)
            lCmdCopy.append('shutil.copy("%s", ".")' % self._subjectFilePath)

        lCmdsTuples = []
        for fileName in FileUtils.getFileNamesList(self._queryDirectory, self._queryPattern):
            lCmds = [self._getLaunchBlasterCmd(iLauncher, fileName)]

            # Start: make the subject and the query available to the job.
            if self._isCopyOnNode:
                lCmdStart = ['os.symlink("../%s", "%s")' % (self._subjectFileName, self._subjectFileName),
                             'shutil.copy("%s/%s", ".")' % (self._queryDirectory, fileName)]
            else:
                lCmdStart = ['os.symlink("%s", "%s")' % (self._subjectFilePath, self._subjectFileName),
                             'os.symlink("%s/%s", "%s")' % (self._queryDirectory, fileName, fileName)]

            # Finish: move the results to cDir. The align file is moved only
            # if it exists, the param file unconditionally.
            lCmdFinish = ['if os.path.exists("%s.align"):' % fileName,
                          '\tshutil.move("%s.align", "%s/." )' % (fileName, cDir),
                          'shutil.move("%s.param", "%s/." )' % (fileName, cDir)]

            lCmdsTuples.append(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy))

        iLauncher.runLauncherForMultipleJobs("Blaster", lCmdsTuples, self._doClean, self._isCopyOnNode)

    def _launchRmvPairAlignInChunkOverlaps(self, cDir, tmpFileName):
        """Launch the RmvPairAlignInChunkOverlaps.py job on the merged align file.

        @param cDir: directory holding tmpFileName and receiving the output file
        @param tmpFileName: name of the merged align file
        """
        iDb = DbFactory.createInstance()
        jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(jobdb, cDir, "", "", cDir, self._tmpDir, "jobs", self._resources, "%s_RmvPairAlignInChunkOverlaps" % self._groupId)

        lCmds = [self._getRmvPairAlignInChunkOverlapsCmd(iLauncher, tmpFileName, self._outFileName)]
        lCmdStart = ['os.symlink("%s/%s", "%s")' % (cDir, tmpFileName, tmpFileName)]
        lCmdFinish = ['shutil.move("%s", "%s/.")' % (self._outFileName, cDir)]
        lCmdsTuples = [iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish)]

        iLauncher.runLauncherForMultipleJobs("RmvPairAlignInChunkOverlaps", lCmdsTuples, self._doClean)

    def run(self):
        """Check the settings, launch the Blaster jobs and build the output file.

        In all-by-all mode the merged align file is post-processed by
        RmvPairAlignInChunkOverlaps.py, which writes the output file;
        otherwise the merged align file is renamed to the output file.
        Raises an Exception when the query pattern is empty.
        """
        LoggerFactory.setLevel(self._log, self._verbosity)
        self._checkConfig()
        self._checkOptions()
        self._log.info("START LaunchBlasterInParallel")
        self._log.debug("Query file name: %s" % self._queryPattern)
        self._log.debug("Subject file name: %s" % self._subjectFileName)

        # Without a configured temporary directory, work in the current one.
        cDir = os.getcwd()
        if not self._tmpDir:
            self._tmpDir = cDir

        self._launchBlasterJobs(cDir)

        # The merge is expected to produce "<tmpPrefix>.align", which is the
        # file consumed below.
        tmpPrefix = "tmp_%s" % os.getpid()
        tmpFileName = "%s.align" % tmpPrefix
        iMMF = MergeMatchsFiles("align", tmpPrefix, allByAll = self._doAllByall, clean = self._doClean)
        iMMF.run()

        if self._doAllByall:
            self._launchRmvPairAlignInChunkOverlaps(cDir, tmpFileName)
            if self._doClean:
                os.remove(tmpFileName)
        else:
            shutil.move(tmpFileName, self._outFileName)

        if self._doClean:
            FileUtils.removeFilesByPattern("*.param")

        self._log.info("END LaunchBlasterInParallel")
297 | |
# Script entry point: read the settings from the command line, then run.
if __name__ == "__main__":
    iLaunchBlasterInParallel = LaunchBlasterInParallel()
    iLaunchBlasterInParallel.setAttributesFromCmdLine()
    iLaunchBlasterInParallel.run()
301 iLaunch.run() |