comparison commons/tools/TEclassifierPE_parallelized.py @ 18:94ab73e8a190

Uploaded
author m-zytnicki
date Mon, 29 Apr 2013 03:20:15 -0400
parents
children
comparison
equal deleted inserted replaced
17:b0e8584489e6 18:94ab73e8a190
1 #!/usr/bin/env python
2
3 # Copyright INRA (Institut National de la Recherche Agronomique)
4 # http://www.inra.fr
5 # http://urgi.versailles.inra.fr
6 #
7 # This software is governed by the CeCILL license under French law and
8 # abiding by the rules of distribution of free software. You can use,
9 # modify and/ or redistribute the software under the terms of the CeCILL
10 # license as circulated by CEA, CNRS and INRIA at the following URL
11 # "http://www.cecill.info".
12 #
13 # As a counterpart to the access to the source code and rights to copy,
14 # modify and redistribute granted by the license, users are provided only
15 # with a limited warranty and the software's author, the holder of the
16 # economic rights, and the successive licensors have only limited
17 # liability.
18 #
19 # In this respect, the user's attention is drawn to the risks associated
20 # with loading, using, modifying and/or developing or reproducing the
21 # software by the user in light of its specific status of free software,
22 # that may mean that it is complicated to manipulate, and that also
23 # therefore means that it is reserved for developers and experienced
24 # professionals having in-depth computer knowledge. Users are therefore
25 # encouraged to load and test the software's suitability as regards their
26 # requirements in conditions enabling the security of their systems and/or
27 # data to be ensured and, more generally, to use and operate it in the
28 # same conditions as regards security.
29 #
30 # The fact that you are presently reading this means that you have had
31 # knowledge of the CeCILL license and that you accept its terms.
32
33 import os
34 import sys
35 import shutil
36
# The REPET libraries imported below are located through REPET_PATH, so the
# script stops immediately when that environment variable is not defined.
if "REPET_PATH" not in os.environ:
    print("ERROR: no environment variable REPET_PATH")
    sys.exit(1)

_repetPath = os.environ["REPET_PATH"]
sys.path.append(_repetPath)

# Put REPET_PATH in front of any existing PYTHONPATH value of the environment.
if "PYTHONPATH" in os.environ:
    os.environ["PYTHONPATH"] = "%s:%s" % (_repetPath, os.environ["PYTHONPATH"])
else:
    os.environ["PYTHONPATH"] = _repetPath
45
46 from commons.core.LoggerFactory import LoggerFactory
47 from commons.core.utils.RepetOptionParser import RepetOptionParser
48 from commons.core.utils.FileUtils import FileUtils
49 from commons.core.checker.ConfigChecker import ConfigRules
50 from commons.core.checker.ConfigChecker import ConfigChecker
51 from commons.core.seq.FastaUtils import FastaUtils
52 from commons.core.sql.DbFactory import DbFactory
53 from commons.core.sql.TableJobAdaptatorFactory import TableJobAdaptatorFactory
54 from commons.core.launcher.Launcher import Launcher
55 from denovo_pipe.ReverseComplementAccordingToClassif import ReverseComplementAccordingToClassif
56 from denovo_pipe.DetectTEFeatures_parallelized import DetectTEFeatures_parallelized
57 from denovo_pipe.RenameHeaderClassif import RenameHeaderClassif
58 from denovo_pipe.LaunchPASTEC import LaunchPASTEC
59 from PASTEC.StatPastec import StatPastec
60
# Prefix of the logger name; TEclassifierPE_parallelized appends its own class
# name to it ("<LOG_DEPTH>.<class name>") when it creates its logger.
LOG_DEPTH = "repet.tools"
62 #LOG_FORMAT = "%(message)s"
63
64 ####TEclassifier PASTEC Edition - parallelized
65 #
class TEclassifierPE_parallelized(object):
    """TE classifier, PASTEC Edition, parallelized.

    Detects TE features on the consensus of a fasta file, classifies them by
    launching PASTEC on batches of sequences through a job launcher, computes
    statistics on the classification and, on request, reverse-complements the
    consensus and/or renames their headers.
    """

    def __init__(self, fastaFileName = "", configFileName = "", addWickerCode = False, reverseComp = False, doClean = False, verbosity = 0):
        """Store the options and create the logger.

        fastaFileName  -- input fasta file name
        configFileName -- configuration file name
        addWickerCode  -- if True, rename the headers (see run())
        reverseComp    -- if True, reverse-complement the consensus (see run())
        doClean        -- if True, remove the temporary files
        verbosity      -- verbosity level given to the logger and sub-programs
        """
        self._fastaFileName = fastaFileName
        self._addWickerCode = addWickerCode
        self._reverseComp = reverseComp
        self._configFileName = configFileName
        self._doClean = doClean
        self._verbosity = verbosity
        self._projectName = ""
        # Values normally read from the configuration file. They get defaults
        # here so that run() never fails with an AttributeError when no
        # configuration file name was given.
        self._maxJobNb = 0
        self._resources = ""
        self._tmpDir = ""
        self._log = LoggerFactory.createLogger("%s.%s" % (LOG_DEPTH, self.__class__.__name__), self._verbosity)

    def setAttributesFromCmdLine(self):
        """Parse the command line and set the attributes from its options."""
        description = (
            "TE classifier PASTEC Edition.\n"
            "Detect TE features on consensus and classify them. Give some classification statistics.\n"
            "Can rename headers with classification info and Wicker's code at the beginning.\n"
            "Can reverse-complement consensus if they are detected in reverse strand.\n"
        )
        # The examples name this script (they previously named TEclassifierPE.py).
        epilog = (
            "\n"
            "Example 1: launch and clean temporary files\n"
            "\t$ python TEclassifierPE_parallelized.py -i consensus.fa -C TEclassifier.cfg -c\n"
            "\n"
            "Example 2: launch with 'rename headers' and 'reverse-complement' options\n"
            "\t$ python TEclassifierPE_parallelized.py -i consensus.fa -C TEclassifier.cfg -c -w -r\n"
        )
        parser = RepetOptionParser(description = description, epilog = epilog)
        parser.add_option("-i", "--fasta", dest = "fastaFileName", action = "store", type = "string", help = "input fasta file name [compulsory] [format: fasta]", default = "")
        parser.add_option("-C", "--config", dest = "configFileName", action = "store", type = "string", help = "configuration file name (e.g. TEclassifier.cfg) [compulsory]", default = "")
        parser.add_option("-w", "--wicker", dest = "addWickerCode", action = "store_true", help = "add classification info and Wicker's code at the beginning of the headers [optional] [default: False]", default = False)
        parser.add_option("-r", "--reverse", dest = "reverseComp", action = "store_true", help = "reverse-complement consensus if they are detected in reverse strand [optional] [default: False]", default = False)
        parser.add_option("-c", "--clean", dest = "doClean", action = "store_true", help = "clean temporary files [optional] [default: False]", default = False)
        parser.add_option("-v", "--verbosity", dest = "verbosity", action = "store", type = "int", help = "verbosity [optional] [default: 3, from 1 to 4]", default = 3)
        options = parser.parse_args()[0]
        self._setAttributesFromOptions(options)

    def _setAttributesFromOptions(self, options):
        """Copy the parsed command-line options into the attributes."""
        self.setFastaFileName(options.fastaFileName)
        self.setAddWickerCode(options.addWickerCode)
        self.setReverseComp(options.reverseComp)
        self.setConfigFileName(options.configFileName)
        self.setDoClean(options.doClean)
        self.setVerbosity(options.verbosity)

    def _checkConfig(self):
        """Check the configuration file against the expected rules, then load it.

        'project_name' (section 'project') and 'clean' (section
        'classif_consensus') are declared mandatory; 'limit_job_nb',
        'resources' and 'tmpDir' are declared without the mandatory flag.
        """
        iConfigRules = ConfigRules()
        iConfigRules.addRuleOption(section = "project", option = "project_name", mandatory = True, type = "string")
        sectionName = "classif_consensus"
        iConfigRules.addRuleOption(section = sectionName, option = "clean", mandatory = True, type = "bool")
        iConfigRules.addRuleOption(section = sectionName, option = "limit_job_nb", type = "int")
        iConfigRules.addRuleOption(section = sectionName, option = "resources", type = "string")
        iConfigRules.addRuleOption(section = sectionName, option = "tmpDir", type = "string")
        iConfigChecker = ConfigChecker(self._configFileName, iConfigRules)
        iConfig = iConfigChecker.getConfig()
        self._setAttributesFromConfig(iConfig)

    def _setAttributesFromConfig(self, iConfig):
        """Set the attributes from the checked configuration.

        Note that the 'clean' value of the configuration replaces whatever
        was set before through the constructor or the '-c' option.
        """
        self.setProjectName(iConfig.get("project", "project_name"))
        sectionName = "classif_consensus"
        self.setDoClean(iConfig.get(sectionName, "clean"))
        # NOTE(review): presumably returned already typed (int / string) by the
        # checker; what is returned for an absent optional option is not
        # visible here -- confirm against ConfigChecker.
        self._maxJobNb = iConfig.get(sectionName, "limit_job_nb")
        self._resources = iConfig.get(sectionName, "resources")
        self._tmpDir = iConfig.get(sectionName, "tmpDir")

    def setFastaFileName(self, fastaFileName):
        """Set the input fasta file name."""
        self._fastaFileName = fastaFileName

    def setConfigFileName(self, configFileName):
        """Set the configuration file name."""
        self._configFileName = configFileName

    def setAddWickerCode(self, addWickerCode):
        """Enable or disable the renaming of the headers."""
        self._addWickerCode = addWickerCode

    def setReverseComp(self, reverseComp):
        """Enable or disable the reverse-complement step."""
        self._reverseComp = reverseComp

    def setDoClean(self, doClean):
        """Enable or disable the removal of temporary files."""
        self._doClean = doClean

    def setVerbosity(self, verbosity):
        """Set the verbosity level."""
        self._verbosity = verbosity

    def setProjectName(self, projectName):
        """Set the project name."""
        self._projectName = projectName

    def _checkOptions(self):
        """Raise an Exception when the input fasta file name is missing."""
        if self._fastaFileName == "":
            self._logAndRaise("ERROR: Missing input fasta file name")

    def _logAndRaise(self, errorMsg):
        """Log errorMsg at error level, then raise an Exception carrying it."""
        self._log.error(errorMsg)
        raise Exception(errorMsg)

    # Disabled helper, kept for reference:
    # def setup_env(config):
    #     os.environ["REPET_HOST"] = config.get("repet_env", "repet_host")
    #     os.environ["REPET_USER"] = config.get("repet_env", "repet_user")
    #     os.environ["REPET_PW"] = config.get("repet_env", "repet_pw")
    #     os.environ["REPET_DB"] = config.get("repet_env", "repet_db")
    #     os.environ["REPET_PORT"] = config.get("repet_env", "repet_port")
    #     os.environ["REPET_JOB_MANAGER"] = config.get("repet_env", "repet_job_manager")
    #     os.environ["REPET_QUEUE"] = config.get("repet_env", "repet_job_manager")
    #     os.environ["REPET_JOBS"] = "MySQL"

    def getPASTECcommand(self, iLauncher, fileName):
        """Return the system command launching 'LaunchPASTEC.py' on fileName.

        The command is built by iLauncher.getSystemCommand() from the
        configuration file name, the project name, the fixed option '-S 2'
        (presumably the PASTEC step -- confirm against LaunchPASTEC.py), the
        input file name and the verbosity.
        """
        lArgs = [
            "-C %s" % self._configFileName,
            "-P %s" % self._projectName,
            "-S 2",
            "-i %s" % fileName,
            "-v %s" % self._verbosity,
        ]
        return iLauncher.getSystemCommand("LaunchPASTEC.py", lArgs)

    @staticmethod
    def _computeNbSeqPerBatch(nbSeq, maxJobNb):
        """Return the number of sequences to put in each batch.

        nbSeq    -- total number of sequences
        maxJobNb -- maximum number of jobs; None, "" and 0 all mean "no limit"

        All sequences go in a single batch when there is no job limit or when
        the integer quotient nbSeq // maxJobNb is at most 1; otherwise a batch
        holds nbSeq // maxJobNb + 1 sequences. Floor division is explicit so
        that the result is an integer whatever the division semantics.
        """
        # Tolerate a limit given as a string (e.g. an untyped configuration value).
        maxJobNb = int(maxJobNb) if maxJobNb else 0
        if maxJobNb == 0 or nbSeq // maxJobNb <= 1:
            return nbSeq
        return nbSeq // maxJobNb + 1

    def _launchPASTECOnBatches(self, classifFileName):
        """Launch one PASTEC job per file of the 'batches' directory.

        Each job copies its batch and the configuration file into its working
        directory, runs the PASTEC command, then moves the resulting
        classification file back to the current directory under the name
        '<classifFileName>_<batch rank>'.

        Raises an Exception when no 'batch_*' file is found.
        """
        cDir = os.getcwd()
        # Jobs run in the configured temporary directory, or in the current
        # directory when none is configured.
        tmpDir = self._tmpDir or cDir

        #TODO: allow not to parallelize
        groupid = "%s_PASTEC" % self._projectName
        acronym = "PASTEC"
        # NOTE(review): the database handle is never closed explicitly; check
        # whether DbFactory instances need it.
        iDb = DbFactory.createInstance()
        iTJA = TableJobAdaptatorFactory.createInstance(iDb, "jobs")
        iLauncher = Launcher(iTJA, cDir, "", "", cDir, tmpDir, "jobs", self._resources, groupid)

        lFiles = FileUtils.getFileNamesList("%s/batches" % cDir, "batch_")
        if not lFiles:
            self._logAndRaise("ERROR: directory 'batches' is empty")

        lCmdsTuples = []
        for count, batchFileName in enumerate(lFiles, 1):
            lCmds = [self.getPASTECcommand(iLauncher, batchFileName)]
            lCmdStart = [
                "shutil.copy(\"%s/batches/%s\", \".\")" % (cDir, batchFileName),
                "shutil.copy(\"%s/%s\", \".\")" % (cDir, self._configFileName),
            ]
            lCmdFinish = [
                "shutil.move(\"%s\", \"%s/%s_%i\")" % (classifFileName, cDir, classifFileName, count),
            ]
            lCmdsTuples.append(iLauncher.prepareCommands_withoutIndentation(lCmds, lCmdStart, lCmdFinish))
        iLauncher.runLauncherForMultipleJobs(acronym, lCmdsTuples, self._doClean)

    def _mergeClassifFiles(self, classifFileName):
        """Concatenate the per-batch classification files into classifFileName.

        When cleaning is requested, the per-batch files and the 'batches'
        directory are removed afterwards.
        """
        FileUtils.catFilesByPattern("%s_*" % classifFileName, classifFileName)
        if self._doClean:
            FileUtils.removeFilesByPattern("%s_*" % classifFileName)
            shutil.rmtree("batches")

    def run(self):
        """Run the whole classification pipeline.

        Steps: check the configuration (when a configuration file name is set)
        and the options, detect TE features, insert the banks in the database,
        split the fasta file in batches, launch PASTEC on each batch, merge
        the classification files, compute statistics, then optionally
        reverse-complement the consensus and rename the headers.

        Raises an Exception when the fasta file name is missing or when the
        split produced no batch.
        """
        LoggerFactory.setLevel(self._log, self._verbosity)
        if self._configFileName:
            self._checkConfig()
        self._checkOptions()
        self._log.info("START TEclassifier PASTEC Edition")
        self._log.debug("Fasta file name: %s" % self._fastaFileName)
        nbSeq = FastaUtils.dbSize(self._fastaFileName)
        self._log.debug("Total number of sequences: %i" % nbSeq)

        self._log.debug("Launch DetectTEFeatures on each batch")
        iDF = DetectTEFeatures_parallelized(self._fastaFileName, self._projectName, self._configFileName, self._doClean, self._verbosity)
        iDF.run()

        self._log.debug("Insert banks in database")
        iLP = LaunchPASTEC(self._configFileName, "1", projectName = self._projectName, verbose = self._verbosity)
        iLP.run()

        self._log.info("Split fasta file")
        nbSeqPerBatch = self._computeNbSeqPerBatch(nbSeq, self._maxJobNb)
        FastaUtils.dbSplit(self._fastaFileName, nbSeqPerBatch, True, verbose = self._verbosity - 2)

        self._log.info("Launch PASTEC on each batch")
        classifFileName = "%s.classif" % self._projectName
        self._launchPASTECOnBatches(classifFileName)
        self._mergeClassifFiles(classifFileName)

        self._log.debug("Compute stats about classification")
        iSP = StatPastec(classifFileName)
        iSP.run()

        if self._reverseComp:
            self._log.debug("Reverse complement")
            iRevComplAccording2Classif = ReverseComplementAccordingToClassif()
            iRevComplAccording2Classif.setFastaFile(self._fastaFileName)
            iRevComplAccording2Classif.setClassifFile(classifFileName)
            iRevComplAccording2Classif.run()
            # The renaming step below then works on this file.
            # NOTE(review): assumed to be the name of the file written by
            # ReverseComplementAccordingToClassif -- confirm.
            newFastaFileName = "%s_negStrandReversed.fa" % os.path.splitext(self._fastaFileName)[0]
        else:
            newFastaFileName = self._fastaFileName

        if self._addWickerCode:
            self._log.debug("Rename headers according to Wicker's code")
            iRHC = RenameHeaderClassif(classifFileName, newFastaFileName, self._projectName)
            # NOTE(review): an empty name presumably selects the default
            # output file name of RenameHeaderClassif -- confirm.
            iRHC.setOutputFileName("")
            iRHC.run()

        self._log.info("END TEclassifier PASTEC Edition")
257
if __name__ == "__main__":
    # Script entry point: read the options from the command line, then run
    # the whole classification.
    iClassifier = TEclassifierPE_parallelized()
    iClassifier.setAttributesFromCmdLine()
    iClassifier.run()