comparison commons/tools/LaunchMatcherInParallel.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 #!/usr/bin/env python
2
3 # Copyright INRA (Institut National de la Recherche Agronomique)
4 # http://www.inra.fr
5 # http://urgi.versailles.inra.fr
6 #
7 # This software is governed by the CeCILL license under French law and
8 # abiding by the rules of distribution of free software. You can use,
9 # modify and/ or redistribute the software under the terms of the CeCILL
10 # license as circulated by CEA, CNRS and INRIA at the following URL
11 # "http://www.cecill.info".
12 #
13 # As a counterpart to the access to the source code and rights to copy,
14 # modify and redistribute granted by the license, users are provided only
15 # with a limited warranty and the software's author, the holder of the
16 # economic rights, and the successive licensors have only limited
17 # liability.
18 #
19 # In this respect, the user's attention is drawn to the risks associated
20 # with loading, using, modifying and/or developing or reproducing the
21 # software by the user in light of its specific status of free software,
22 # that may mean that it is complicated to manipulate, and that also
23 # therefore means that it is reserved for developers and experienced
24 # professionals having in-depth computer knowledge. Users are therefore
25 # encouraged to load and test the software's suitability as regards their
26 # requirements in conditions enabling the security of their systems and/or
27 # data to be ensured and, more generally, to use and operate it in the
28 # same conditions as regards security.
29 #
30 # The fact that you are presently reading this means that you have had
31 # knowledge of the CeCILL license and that you accept its terms.
32
33 from commons.core.LoggerFactory import LoggerFactory
34 from commons.core.sql.DbFactory import DbFactory
35 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
36 from commons.core.launcher.Launcher import Launcher
37 from commons.core.launcher.LauncherUtils import LauncherUtils
38 from commons.core.utils.FileUtils import FileUtils
39 from commons.core.utils.RepetOptionParser import RepetOptionParser
40 from commons.core.checker.ConfigChecker import ConfigRules, ConfigChecker
41 from commons.core.coord.AlignUtils import AlignUtils
42 import shutil
43 import os
44
LOG_DEPTH = "repet.tools"


class LaunchMatcherInParallel(object):
    """Split an align file per query sequence and run LaunchMatcher.py on the pieces as jobs.

    Typical use is either the constructor followed by run(), or the default
    constructor followed by setAttributesFromCmdLine() and run().
    The job settings ("resources", "tmpDir", "copy", "clean") are read from the
    "jobs" section of the configuration file when run() is called.
    """

    def __init__(self, align="", queryFileName="", subjectFileName="", evalue="1e-10", doJoin=False, keepConflict=False, prefix="", alignPattern=r".*\.align",
                 config="", groupId="", maxFileSize=1000000, mergeResults=True, workingDir="tmpMatcher", doClean=False, verbosity=0):
        """Store the parameters; nothing is checked or executed before run().

        An empty prefix defaults to the align file name, and an empty groupId
        defaults to "Matcher_<pid>".
        """
        self._alignFileName = align
        self._queryFileName = queryFileName
        self.setSubjectFileName(subjectFileName)
        # must come after _alignFileName is set: an empty prefix falls back on it
        self.setOutPrefix(prefix)
        self._alignPattern = alignPattern
        self._doJoin = doJoin
        self._eValue = evalue
        self._keepConflict = keepConflict
        self._configFileName = config
        self.setGroupId(groupId)
        self._maxFileSize = maxFileSize
        self._mergeResults = mergeResults
        self._doClean = doClean
        self._workingDir = workingDir
        self._verbosity = verbosity
        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)
        self._jobSectionName = "jobs"

    def setAttributesFromCmdLine(self):
        """Parse the command line and overwrite the attributes with the given options."""
        description = "Launch Matcher in parallel."
        epilog = "\nExample 1: launch without verbosity and keep temporary files.\n"
        epilog += "\t$ python LaunchMatcherInParallel.py -a in.align -v 0"
        epilog += "\n\t"
        epilog += "\nExample 2: launch with verbosity to have errors (level 1) and basic information (level 2), and delete temporary files.\n"
        epilog += "\t$ python LaunchMatcherInParallel.py -a in.align -q query.fa -s subject.fa -o query -c -v 2"
        parser = RepetOptionParser(description = description, epilog = epilog)
        parser.add_option("-a", "--align", dest = "align", action = "store", type = "string", help = "input align file name [compulsory] [format: align]", default = "")
        parser.add_option("-q", "--query", dest = "query", action = "store", type = "string", help = "query fasta file name [optional] [format: fasta]", default = "")
        parser.add_option("-s", "--subject", dest = "subject", action = "store", type = "string", help = "subject fasta file name [optional] [format: fasta]", default = "")
        # help texts state the defaults that are actually applied
        parser.add_option("-e", "--evalue", dest = "evalue", action = "store", type = "string", help = "E-value filter [default: 1e-10]", default = "1e-10")
        parser.add_option("-j", "--join", dest = "doJoin", action = "store_true", help = "join matches [default: False]", default = False)
        parser.add_option("-k", "--keepConflict", dest = "keepConflict", action = "store_true", help = "keep conflicting subjects [default: False]", default = False)
        parser.add_option("-o", "--outPrefix", dest = "outPrefix", action = "store", type = "string", help = "output file prefix [default: align file name]", default = "")
        parser.add_option("-p", "--alignPattern", dest = "alignPattern", action = "store", type = "string", help = r"align file pattern [default: .*\.align]", default = r".*\.align")
        parser.add_option("-n", "--maxFileSize", dest = "maxFileSize", action = "store", type = "int", help = "max file size (1 file for 1 job) [default: 10000]", default = 10000)
        # despite its name, this destination holds the "merge" flag: True unless -m is given
        parser.add_option("-m", "--notMergeResults", dest = "notMergeResults", action = "store_false", help = "don't merge results files [default: True]", default = True)
        parser.add_option("-w", "--workingDir", dest = "workingDir", action = "store", type = "string", help = "working directory [default: tmpMatcher]", default = "tmpMatcher")
        parser.add_option("-c", "--clean", dest = "doClean", action = "store_true", help = "clean temporary files [default: False]", default = False)
        # run() needs a configuration file, which could previously only be given to the constructor
        parser.add_option("-C", "--config", dest = "config", action = "store", type = "string", help = "configuration file name (with a 'jobs' section) [default: value given to the constructor]", default = "")
        parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [default: 1]", default = 1)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    def _setAttributesFromOptions(self, options):
        """Copy the parsed options into the attributes."""
        self.setAlignFileName(options.align)
        self.setQueryFileName(options.query)
        self.setSubjectFileName(options.subject)
        self.setEvalue(options.evalue)
        self.setDoJoin(options.doJoin)
        self.setKeepConflicts(options.keepConflict)
        # after setAlignFileName: an empty prefix falls back on the align file name
        self.setOutPrefix(options.outPrefix)
        self.setAlignPattern(options.alignPattern)
        self.setMaxFileSize(options.maxFileSize)
        self.setMergeResults(options.notMergeResults)
        self.setWorkingDir(options.workingDir)
        self.setDoClean(options.doClean)
        self.setVerbosity(options.verbosity)
        # keep the constructor value when the option is absent or empty
        configFileName = getattr(options, "config", "")
        if configFileName:
            self.setConfigFileName(configFileName)

    def setAlignFileName(self, alignFileName):
        self._alignFileName = alignFileName

    def setQueryFileName(self, queryFileName):
        self._queryFileName = queryFileName

    def setSubjectFileName(self, subjectFileName):
        self._subjectFileName = subjectFileName

    def setEvalue(self, evalue):
        self._eValue = evalue

    def setDoJoin(self, doJoin):
        self._doJoin = doJoin

    def setKeepConflicts(self, keepConflict):
        self._keepConflict = keepConflict

    def setOutPrefix(self, outPrefix):
        """Set the output prefix; an empty value means 'use the current align file name'."""
        if outPrefix == "":
            self._outPrefix = self._alignFileName
        else:
            self._outPrefix = outPrefix

    def setAlignPattern(self, alignPattern):
        self._alignPattern = alignPattern

    def setConfigFileName(self, configFileName):
        self._configFileName = configFileName

    def setGroupId(self, groupId):
        """Set the job group identifier; an empty value yields "Matcher_<pid>"."""
        if groupId == "":
            self._groupId = "Matcher_%s" % os.getpid()
        else:
            self._groupId = groupId

    def setMaxFileSize(self, maxFileSize):
        self._maxFileSize = maxFileSize

    def setMergeResults(self, mergeResults):
        self._mergeResults = mergeResults

    def setWorkingDir(self, workingDir):
        self._workingDir = workingDir

    def setDoClean(self, doClean):
        self._doClean = doClean

    def setVerbosity(self, verbosity):
        self._verbosity = verbosity

    def _checkOptions(self):
        """Raise an Exception when the compulsory align file name is missing."""
        if self._alignFileName == "":
            self._logAndRaise("ERROR: Missing input align file name")

    def _logAndRaise(self, errorMsg):
        """Log errorMsg at error level, then raise it as an Exception."""
        self._log.error(errorMsg)
        raise Exception(errorMsg)

    def _checkConfig(self):
        """Check the "jobs" section of the configuration file and load its values."""
        iConfigRules = ConfigRules()
        iConfigRules.addRuleSection(section=self._jobSectionName, mandatory=True)
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="resources", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="tmpDir", mandatory=True, type="string")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="copy", mandatory=True, type="bool")
        iConfigRules.addRuleOption(section=self._jobSectionName, option ="clean", mandatory=True, type="bool")

        iConfigChecker = ConfigChecker(self._configFileName, iConfigRules)
        self._iConfig = iConfigChecker.getConfig()
        self._setAttributesFromConfig()

    def _setAttributesFromConfig(self):
        """Read the job settings from the checked configuration."""
        self._resources = self._iConfig.get(self._jobSectionName, "resources")
        self._tmpDir = self._iConfig.get(self._jobSectionName, "tmpDir")
        self._isCopyOnNode = self._iConfig.get(self._jobSectionName, "copy")
        # NOTE: the "clean" value of the configuration replaces the one given to the constructor or with -c
        self._doClean = self._iConfig.get(self._jobSectionName, "clean")

        # copying on a node is pointless without a temporary directory
        if self._isCopyOnNode and not self._tmpDir:
            self._isCopyOnNode = False
        self._log.debug("The copy option is: %s." % self._isCopyOnNode)

    def _getLaunchMatcherArgs(self, fileName):
        """Return the list of LaunchMatcher.py arguments used to process one align file."""
        lArgs = ["-a %s" % fileName]
        if self._queryFileName:
            lArgs.append("-q %s" % self._queryFileName)
        if self._subjectFileName:
            lArgs.append("-s %s" % self._subjectFileName)
        lArgs.append("-e %s" % self._eValue)
        # the align file name is also used as output prefix
        lArgs.append("-o %s" % fileName)
        if self._doJoin:
            lArgs.append("-j")
        if self._keepConflict:
            lArgs.append("-k")
        # the launched program is given one verbosity level less than this one
        lArgs.append("-v %i" % (self._verbosity - 1))
        return lArgs

    def _getLaunchMatcherCmd(self, iLauncher, file):
        """Return the LaunchMatcher.py command built by iLauncher for one align file."""
        return iLauncher.getSystemCommand("LaunchMatcher.py", self._getLaunchMatcherArgs(file))

    def _splitAlignFilePerSeq(self):
        """Create, in the working directory, the align files split by query name."""
        lAlign = AlignUtils.getAlignListFromFile(self._alignFileName)
        lAlignList = AlignUtils.splitAlignListByQueryName(lAlign)
        inputFileNameWithoutExtension = os.path.splitext(os.path.basename(self._alignFileName))[0]
        AlignUtils.createAlignFiles(lAlignList, inputFileNameWithoutExtension, self._workingDir)

    def _writeTabHeader(self, outTabFileName):
        """Create (or truncate) outTabFileName and write the header line of the tab format."""
        with open(outTabFileName, 'w') as f:
            f.write("query.name\tquery.start\tquery.end\tquery.length\tquery.length.%\tmatch.length.%\tsubject.name\tsubject.start\tsubject.end\tsubject.length\tsubject.length.%\tE.value\tScore\tIdentity\tpath\n")

    def _linkInputFile(self, sourceDir, fileName):
        """Symlink sourceDir/fileName as fileName in the current directory.

        Nothing is done when fileName is empty (optional input not given) or when
        an entry with that name already exists (re-used working directory).
        Return True if a link was created, False otherwise.
        """
        if not fileName or os.path.lexists(fileName):
            return False
        os.symlink(os.path.join(sourceDir, fileName), fileName)
        return True

    def _prepareJobCommands(self, iLauncher, cDir):
        """Return the list of prepared commands, one item per batch of align files.

        The align files of the current directory are batched according to their
        size and the maximum file size; cDir is the directory where the jobs find
        their input files and put back their results.
        """
        lFileSizeTuples = []
        for fileName in FileUtils.getFileNamesList(".", self._alignPattern):
            lFileSizeTuples.append((fileName, os.path.getsize(fileName)))
        lFileSizeList = LauncherUtils.createHomogeneousSizeList(lFileSizeTuples, self._maxFileSize)

        lCmdsTuples = []
        lCmdSize = []
        lCmdCopy = []
        for lBatchFiles in lFileSizeList:
            lCmds = []
            lCmdStart = []
            lCmdFinish = []
            # the strings below are Python statements inserted in the job scripts
            if self._queryFileName:
                lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, self._queryFileName, self._queryFileName))
            if self._subjectFileName and self._subjectFileName != self._queryFileName:
                lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, self._subjectFileName, self._subjectFileName))
            for alignFileName in lBatchFiles:
                lCmds.append(self._getLaunchMatcherCmd(iLauncher, alignFileName))
                lCmdStart.append("os.symlink(\"%s/%s\", \"%s\")" % (cDir, alignFileName, alignFileName))
                lCmdFinish.append("if os.path.exists(\"%s.match.path\"):" % alignFileName)
                lCmdFinish.append("\tshutil.move(\"%s.match.path\", \"%s/.\" )" % (alignFileName, cDir))
                lCmdFinish.append("if os.path.exists(\"%s.match.tab\"):" % alignFileName)
                lCmdFinish.append("\tshutil.move(\"%s.match.tab\", \"%s/.\" )" % (alignFileName, cDir))
            lCmdsTuples.append(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish, lCmdSize, lCmdCopy))
        return lCmdsTuples

    def _launchJobs(self):
        """Prepare and run the Matcher jobs; the current directory must be the working directory."""
        cDir = os.getcwd()
        if not self._tmpDir:
            self._tmpDir = cDir

        acronym = "Matcher"
        iDb = DbFactory.createInstance()
        jobdb = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(jobdb, cDir, "", "", cDir, self._tmpDir, "jobs", self._resources, self._groupId, acronym, chooseTemplateWithCopy = self._isCopyOnNode)
        lCmdsTuples = self._prepareJobCommands(iLauncher, cDir)
        iLauncher.runLauncherForMultipleJobs(acronym, lCmdsTuples, self._doClean, self._isCopyOnNode)

    def _mergeResultFiles(self, outDir):
        """Concatenate the result files of the current directory into outDir, using the output prefix."""
        FileUtils.catFilesByPattern("*.match.path", os.path.join(outDir, "%s.match.path" % self._outPrefix))
        if self._queryFileName or self._subjectFileName:
            outTabFileName = os.path.join(outDir, "%s.match.tab" % self._outPrefix)
            self._writeTabHeader(outTabFileName)
            FileUtils.catFilesByPattern("*.match.tab", outTabFileName, skipHeaders = True)

    def run(self):
        """Check the configuration and the options, launch the jobs, then merge and clean if requested.

        Raise an Exception if the align file name is missing.
        The current directory is restored before returning, even on error.
        """
        LoggerFactory.setLevel(self._log, self._verbosity)
        self._checkConfig()
        self._checkOptions()

        self._log.info("START LaunchMatcherInParallel")
        self._log.debug("Align file name: %s" % self._alignFileName)
        self._log.debug("Query file name: %s" % self._queryFileName)
        self._log.debug("Subject file name: %s" % self._subjectFileName)
        if not os.path.exists(self._workingDir):
            os.makedirs(self._workingDir)
        else:
            # a pre-existing directory is never removed
            self._doClean = False
        self._splitAlignFilePerSeq()

        # remembered explicitly: the working directory is not necessarily a direct child of this one
        startDir = os.getcwd()
        os.chdir(self._workingDir)
        try:
            self._linkInputFile(startDir, self._queryFileName)
            if self._subjectFileName != self._queryFileName:
                self._linkInputFile(startDir, self._subjectFileName)
            self._launchJobs()
            if self._mergeResults:
                self._mergeResultFiles(startDir)
        finally:
            os.chdir(startDir)

        # without merging, the results only exist in the working directory: keep it
        if self._doClean and self._mergeResults:
            self._log.warning("Working directory will be cleaned")
            shutil.rmtree(self._workingDir)
        self._log.info("END LaunchMatcherInParallel")
281
if __name__ == "__main__":
    # Script entry point: configure the launcher from the command line, then execute it.
    matcherLauncher = LaunchMatcherInParallel()
    matcherLauncher.setAttributesFromCmdLine()
    matcherLauncher.run()