comparison commons/tools/LaunchBlasterInParallel.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 #!/usr/bin/env python
2
3 # Copyright INRA (Institut National de la Recherche Agronomique)
4 # http://www.inra.fr
5 # http://urgi.versailles.inra.fr
6 #
7 # This software is governed by the CeCILL license under French law and
8 # abiding by the rules of distribution of free software. You can use,
9 # modify and/ or redistribute the software under the terms of the CeCILL
10 # license as circulated by CEA, CNRS and INRIA at the following URL
11 # "http://www.cecill.info".
12 #
13 # As a counterpart to the access to the source code and rights to copy,
14 # modify and redistribute granted by the license, users are provided only
15 # with a limited warranty and the software's author, the holder of the
16 # economic rights, and the successive licensors have only limited
17 # liability.
18 #
19 # In this respect, the user's attention is drawn to the risks associated
20 # with loading, using, modifying and/or developing or reproducing the
21 # software by the user in light of its specific status of free software,
22 # that may mean that it is complicated to manipulate, and that also
23 # therefore means that it is reserved for developers and experienced
24 # professionals having in-depth computer knowledge. Users are therefore
25 # encouraged to load and test the software's suitability as regards their
26 # requirements in conditions enabling the security of their systems and/or
27 # data to be ensured and, more generally, to use and operate it in the
28 # same conditions as regards security.
29 #
30 # The fact that you are presently reading this means that you have had
31 # knowledge of the CeCILL license and that you accept its terms.
32
33 import os
34 import shutil
35 from commons.core.LoggerFactory import LoggerFactory
36 from commons.core.sql.DbFactory import DbFactory
37 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
38 from commons.core.launcher.Launcher import Launcher
39 from commons.core.utils.FileUtils import FileUtils
40 from commons.core.utils.RepetOptionParser import RepetOptionParser
41 from commons.core.checker.ConfigChecker import ConfigRules, ConfigChecker
42 from commons.tools.MergeMatchsFiles import MergeMatchsFiles
43
# Prefix of the logger name; the class appends its own name to it when it
# creates its logger ("<LOG_DEPTH>.<ClassName>").
LOG_DEPTH = "repet.tools"
45
46 ##Launch BLASTER in parallel
47 #
class LaunchBlasterInParallel(object):
    """Run LaunchBlaster.py once per query file through the job Launcher.

    The workflow implemented by run() is:
      1. read and validate the configuration file and the options;
      2. prepare one LaunchBlaster.py job per file of the query directory
         whose name matches the query pattern, and run them all;
      3. run MergeMatchsFiles on the "align" files with the prefix
         "tmp_<pid>" (the code afterwards expects "tmp_<pid>.align");
      4. in all-by-all mode, run RmvPairAlignInChunkOverlaps.py on that file
         to produce the output file; otherwise simply rename it.
    """

    def __init__(self, queryDirectory = "", subjectFilePath = "", outFileName = "", configFileName = "", groupId = "", queryPattern = r".*\.fa",
                 doAllByall = False, nbCPU = 1, eValue = "1e-300", type = "ncbi", program = "blastn", extraParams = "", verbosity = 0):
        """Store the launch parameters and create the logger.

        An empty groupId is replaced by "Blaster_<pid>" (see setGroupId).
        The parameter named `type` shadows the builtin; it is kept because
        it is part of the public keyword interface.
        """
        self._queryDirectory = queryDirectory
        self._queryPattern = queryPattern
        self.setSubjectFilePath(subjectFilePath)
        self._outFileName = outFileName
        self._configFileName = configFileName
        self.setGroupId(groupId)
        self._doAllByall = doAllByall
        self._blastType = type
        self._program = program
        self._extraParams = extraParams
        # NOTE(review): _nbCPU is stored but not read anywhere in this class.
        self._nbCPU = nbCPU
        self._jobSectionName = "jobs"
        self._blasterSectionName = "alignment"
        self._prepareDataSectionName = "prepare_data"
        self._eValue = eValue

        # Attributes filled in by _setAttributesFromConfig(); initialised here
        # so that reading them before the configuration is loaded does not
        # raise AttributeError.
        self._iConfig = None
        self._chunkLength = None
        self._chunkOverlap = None
        self._resources = None
        self._tmpDir = None
        self._isCopyOnNode = None
        self._length = None
        self._identity = None

        self._doClean = None
        self._verbosity = verbosity
        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)

    def setAttributesFromCmdLine(self):
        """Parse the command line and copy every option onto this instance."""
        description = "Launch Blaster in parallel."
        epilog = "\nExample 1: launch without verbosity and keep temporary files.\n"
        epilog += "\t$ python LaunchBlasterInParallel.py -q query -o query.align -v 0"
        epilog += "\n\t"
        epilog += "\nExample 2: launch with verbosity to have errors (level 1) and basic information (level 2), and delete temporary files.\n"
        epilog += "\t$ python LaunchBlasterInParallel.py -q query -o query.align -s nr.fa -c -v 2"
        parser = RepetOptionParser(description = description, epilog = epilog)
        parser.add_option("-q", "--query", dest = "query", action = "store", type = "string", help = "query fasta directory absolute path [compulsory]", default = "")
        parser.add_option("-s", "--subject", dest = "subject", action = "store", type = "string", help = "subject fasta absolute path [compulsory] [format: fasta]", default = "")
        parser.add_option("-o", "--out", dest = "outFileName", action = "store", type = "string", help = "output align file name [compulsory] [format: align]", default = "")
        parser.add_option("-C", "--config", dest = "configFileName", action = "store", type = "string", help = "configuration file name [compulsory] [format: cfg]", default = "")
        parser.add_option("-g", "--groupId", dest = "groupId", action = "store", type = "string", help = "jobs groupId [default: Blaster_<pid>]", default = "")
        # Raw strings: "\." is not a valid escape sequence in a normal literal.
        parser.add_option("-p", "--queryPattern", dest = "queryPattern", action = "store", type = "string", help = r"query file pattern [default: .*\.fa]", default = r".*\.fa")
        parser.add_option("-a", "--aba", dest = "doAllByall", action = "store_true", help = "all-by-all Blast [default: False]", default = False)
        # The help text used to advertise "1e300" although the default is "1e-300".
        parser.add_option("-e", "--eValue", dest = "eValue", action = "store", type = "string", help = "Blast e-value [default: 1e-300]", default = "1e-300")
        parser.add_option("-t", "--type", dest = "type", action = "store", type = "string", help = "Blast type [ncbi, wu, blastplus] [default: ncbi]", default = "ncbi")
        # NOTE(review): the help lists "blastx" twice -- confirm the intended program list.
        parser.add_option("-u", "--program", dest = "program", action = "store", type = "string", help = "Blast program type [blastn, blastx, blastx] [default: blastn]", default = "blastn")
        parser.add_option("-x", "--extraParams", dest = "extraParams", action = "store", type = "string", help = "Additional blast program parameters[default: '']", default = "")
        parser.add_option("-n", "--ncpu", dest = "cpu", action = "store", type = "int", help = "Number of CPUs to use [default: 1]", default = 1)
        parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [default: 1]", default = 1)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    def _setAttributesFromOptions(self, options):
        """Copy the parsed command-line options onto this instance via the setters."""
        self.setQueryDirectory(options.query)
        self.setQueryPattern(options.queryPattern)
        self.setSubjectFilePath(options.subject)
        self.setOutFileName(options.outFileName)
        self.setConfigFileName(options.configFileName)
        self.setGroupId(options.groupId)
        self.setDoAllByall(options.doAllByall)
        self.setEValue(options.eValue)
        self.setType(options.type)
        self.setProgram(options.program)
        self.setExtraParams(options.extraParams)
        self.setCPU(options.cpu)
        self.setVerbosity(options.verbosity)

    def setQueryDirectory(self, queryDirectory):
        self._queryDirectory = queryDirectory

    def setQueryPattern(self, queryPattern):
        self._queryPattern = queryPattern

    def setSubjectFilePath(self, subjectFilePath):
        """Store the subject path and, separately, its base name."""
        self._subjectFilePath = subjectFilePath
        self._subjectFileName = os.path.basename(subjectFilePath)

    def setOutFileName(self, outFileName):
        self._outFileName = outFileName

    def setConfigFileName(self, configFileName):
        self._configFileName = configFileName

    def setGroupId(self, groupId):
        """Store the jobs group id; an empty value becomes "Blaster_<pid>"."""
        if groupId == "":
            self._groupId = "Blaster_%s" % os.getpid()
        else:
            self._groupId = groupId

    def setDoAllByall(self, doAllByall):
        self._doAllByall = doAllByall

    def setType(self, blastType):
        self._blastType = blastType

    def setProgram(self, program):
        self._program = program

    def setExtraParams(self, extraParams):
        self._extraParams = extraParams

    def setEValue(self, eValue):
        self._eValue = eValue

    def setCPU(self, cpu):
        self._nbCPU = cpu

    def setDoClean(self, doClean):
        self._doClean = doClean

    def setVerbosity(self, verbosity):
        self._verbosity = verbosity

    def _checkOptions(self):
        """Raise (through _logAndRaise) when a required option is empty.

        The query directory, subject file and output file are documented as
        compulsory on the command line. Checking them here reports the
        problem before any job is launched; an empty output name would
        otherwise only fail when the final result is moved into place.
        """
        if self._queryDirectory == "":
            self._logAndRaise("ERROR: Missing query fasta directory")
        if self._queryPattern == "":
            self._logAndRaise("ERROR: Missing input fasta file name")
        if self._subjectFilePath == "":
            self._logAndRaise("ERROR: Missing subject fasta file path")
        if self._outFileName == "":
            self._logAndRaise("ERROR: Missing output file name")

    def _logAndRaise(self, errorMsg):
        """Log errorMsg at error level, then raise it as an Exception."""
        self._log.error(errorMsg)
        raise Exception(errorMsg)

    def _checkConfig(self):
        """Declare the expected configuration entries, load the file and read it."""
        iConfigRules = ConfigRules()
        iConfigRules.addRuleSection(section = self._jobSectionName, mandatory = True)
        iConfigRules.addRuleOption(section = self._jobSectionName, option = "resources", mandatory = True, type = "string")
        iConfigRules.addRuleOption(section = self._jobSectionName, option = "tmpDir", mandatory = True, type = "string")
        iConfigRules.addRuleOption(section = self._jobSectionName, option = "copy", mandatory = True, type = "bool")
        iConfigRules.addRuleOption(section = self._jobSectionName, option = "clean", mandatory = True, type = "bool")
        iConfigRules.addRuleOption(section = self._blasterSectionName, option = "blast", mandatory = True, type = "string", set = ("ncbi", "blastplus", "wu"))
        iConfigRules.addRuleOption(section = self._blasterSectionName, option = "Evalue", mandatory = True, type = "string")
        iConfigRules.addRuleOption(section = self._blasterSectionName, option = "length", mandatory = True, type = "string")
        iConfigRules.addRuleOption(section = self._blasterSectionName, option = "identity", mandatory = True, type = "string")
        # NOTE(review): no rule is declared for prepare_data/chunk_length and
        # prepare_data/chunk_overlap although _setAttributesFromConfig reads them.

        iConfigChecker = ConfigChecker(self._configFileName, iConfigRules)
        self._iConfig = iConfigChecker.getConfig()
        self._setAttributesFromConfig()

    def _setAttributesFromConfig(self):
        """Copy the values of the loaded configuration onto this instance.

        NOTE(review): the blast type and e-value read here overwrite the
        values received by the constructor or the -t / -e options -- confirm
        that the configuration file is meant to take precedence.
        """
        self._chunkLength = self._iConfig.get(self._prepareDataSectionName, "chunk_length")
        self._chunkOverlap = self._iConfig.get(self._prepareDataSectionName, "chunk_overlap")
        self._resources = self._iConfig.get(self._jobSectionName, "resources")
        self._tmpDir = self._iConfig.get(self._jobSectionName, "tmpDir")
        self._isCopyOnNode = self._iConfig.get(self._jobSectionName, "copy")
        self._doClean = self._iConfig.get(self._jobSectionName, "clean")
        self._blastType = self._iConfig.get(self._blasterSectionName, "blast")
        self._eValue = self._iConfig.get(self._blasterSectionName, "Evalue")
        self._length = self._iConfig.get(self._blasterSectionName, "length")
        self._identity = self._iConfig.get(self._blasterSectionName, "identity")

        # The copy mode is switched off when no temporary directory is configured.
        if self._isCopyOnNode and not self._tmpDir:
            self._isCopyOnNode = False
        self._log.debug("The copy option is: %s." % self._isCopyOnNode)

    def _getLaunchBlasterCmd(self, iLauncher, file):
        """Return the LaunchBlaster.py command for one query file.

        The command is built by iLauncher.getSystemCommand from the argument
        list assembled here. The child verbosity is one level lower than ours.
        """
        lArgs = [
            "-u %s" % self._program,
            "-q %s" % file,
            "-s %s" % self._subjectFileName,
        ]
        if self._doAllByall:
            lArgs.append("-a")
        lArgs.extend([
            "-e %s" % self._eValue,
            "-l %s" % self._length,
            "-d %s" % self._identity,
            "-t %s" % self._blastType,
            "-x '%s'" % self._extraParams,
        ])
        if self._doClean:
            lArgs.append("-c")
        lArgs.append("-v %i" % (self._verbosity - 1))
        return iLauncher.getSystemCommand("LaunchBlaster.py", lArgs)

    def _getRmvPairAlignInChunkOverlapsCmd(self, iLauncher, inFileName, outFileName):
        """Return the RmvPairAlignInChunkOverlaps.py command for inFileName -> outFileName."""
        lArgs = [
            "-i %s" % inFileName,
            "-l %s" % self._chunkLength,
            "-o %s" % self._chunkOverlap,
            "-m 10",
            "-O %s" % outFileName,
            "-v %d" % (self._verbosity - 1),
        ]
        return iLauncher.getSystemCommand("RmvPairAlignInChunkOverlaps.py", lArgs)

    def _prepareBlasterJobs(self, iLauncher, cDir):
        """Return the list of prepared commands, one entry per matching query file.

        cDir is the directory the job results (.align and .param files) are
        moved back to when a job finishes.
        """
        # Subject size in units of 1e9 bytes, with a 5e6-byte margin added.
        # Computed unconditionally, so a missing subject file fails here.
        fileSize = float(os.path.getsize(self._subjectFilePath) + 5000000) / 1000000000

        lCmdSize = []
        lCmdCopy = []
        if self._isCopyOnNode:
            lCmdSize.append("fileSize = %f" % fileSize)
            lCmdCopy.append('shutil.copy("%s", ".")' % self._subjectFilePath)

        lCmdsTuples = []
        for queryFileName in FileUtils.getFileNamesList(self._queryDirectory, self._queryPattern):
            lCmds = [self._getLaunchBlasterCmd(iLauncher, queryFileName)]

            if self._isCopyOnNode:
                # The subject was copied one directory up by lCmdCopy.
                lCmdStart = [
                    'os.symlink("../%s", "%s")' % (self._subjectFileName, self._subjectFileName),
                    'shutil.copy("%s/%s", ".")' % (self._queryDirectory, queryFileName),
                ]
            else:
                lCmdStart = [
                    'os.symlink("%s", "%s")' % (self._subjectFilePath, self._subjectFileName),
                    'os.symlink("%s/%s", "%s")' % (self._queryDirectory, queryFileName, queryFileName),
                ]

            # Only the .align move is guarded by an existence test.
            lCmdFinish = [
                'if os.path.exists("%s.align"):' % queryFileName,
                '\tshutil.move("%s.align", "%s/." )' % (queryFileName, cDir),
                'shutil.move("%s.param", "%s/." )' % (queryFileName, cDir),
            ]
            lCmdsTuples.append(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy))
        return lCmdsTuples

    def _runRmvPairAlignInChunkOverlaps(self, cDir, tmpFileName):
        """Run RmvPairAlignInChunkOverlaps.py as a single job on tmpFileName.

        The job writes self._outFileName and moves it back into cDir.
        """
        iDb = DbFactory.createInstance()
        jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(jobdb, os.getcwd(), "", "", cDir, self._tmpDir, "jobs", self._resources, "%s_RmvPairAlignInChunkOverlaps" % self._groupId)

        lCmds = [self._getRmvPairAlignInChunkOverlapsCmd(iLauncher, tmpFileName, self._outFileName)]
        lCmdStart = ['os.symlink("%s/%s", "%s")' % (cDir, tmpFileName, tmpFileName)]
        lCmdFinish = ['shutil.move("%s", "%s/.")' % (self._outFileName, cDir)]
        lCmdsTuples = [iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish)]

        iLauncher.runLauncherForMultipleJobs("RmvPairAlignInChunkOverlaps", lCmdsTuples, self._doClean)

    def run(self):
        """Launch every BLASTER job, merge the results and write the output file.

        Raises an Exception when a required option is empty; errors raised
        while loading the configuration or running the jobs are not caught.
        """
        LoggerFactory.setLevel(self._log, self._verbosity)
        self._checkConfig()
        self._checkOptions()
        self._log.info("START LaunchBlasterInParallel")
        self._log.debug("Query file name: %s" % self._queryPattern)
        self._log.debug("Subject file name: %s" % self._subjectFileName)

        cDir = os.getcwd()
        if not self._tmpDir:
            # No temporary directory configured: work in the current one.
            self._tmpDir = cDir

        acronym = "Blaster"
        iDb = DbFactory.createInstance()
        jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(jobdb, os.getcwd(), "", "", cDir, self._tmpDir, "jobs", self._resources, self._groupId, acronym, chooseTemplateWithCopy = self._isCopyOnNode)

        lCmdsTuples = self._prepareBlasterJobs(iLauncher, cDir)
        iLauncher.runLauncherForMultipleJobs("Blaster", lCmdsTuples, self._doClean, self._isCopyOnNode)

        tmpPrefix = "tmp_%s" % os.getpid()
        tmpFileName = "%s.align" % tmpPrefix
        iMMF = MergeMatchsFiles("align", tmpPrefix, allByAll = self._doAllByall, clean = self._doClean)
        iMMF.run()

        if self._doAllByall:
            self._runRmvPairAlignInChunkOverlaps(cDir, tmpFileName)
            if self._doClean:
                os.remove(tmpFileName)
        else:
            shutil.move(tmpFileName, self._outFileName)

        if self._doClean:
            FileUtils.removeFilesByPattern("*.param")

        self._log.info("END LaunchBlasterInParallel")
297
if __name__ == "__main__":
    # Script entry point: build a launcher with default values, fill it from
    # the command line, then run it.
    blasterLauncher = LaunchBlasterInParallel()
    blasterLauncher.setAttributesFromCmdLine()
    blasterLauncher.run()
301 iLaunch.run()