Mercurial > repos > yufei-luo > s_mart
view commons/pyRepetUnit/blastnForClassifierStep1/tests/Test_RepbaseBLRnForClassifierStep1.py @ 18:94ab73e8a190
Uploaded
author | m-zytnicki |
---|---|
date | Mon, 29 Apr 2013 03:20:15 -0400 |
parents | |
children |
line wrap: on
line source
import unittest
import os
import shutil
import ConfigParser
import sys
from commons.pyRepetUnit.blastnForClassifierStep1.RepbaseBLRnForClassifierStep1 import RepbaseBLRnForClassifierStep1
from commons.core.utils.FileUtils import FileUtils
from commons.core.sql.DbFactory import DbFactory
from commons.core.sql.TablePathAdaptator import TablePathAdaptator
from commons.core.coord.Path import Path
import pyRepet.launcher.programLauncher

# The expected command lines are built from REPET_PATH, so the module cannot
# be used without it.
if "REPET_PATH" not in os.environ:
    # Parenthesised single-argument form prints the same text whether this is
    # parsed as a Python 2 print statement or as a function call.
    print("*** Error: no environment variable REPET_PATH")
    sys.exit(1)
sys.path.append(os.environ["REPET_PATH"])

NB_EXPECTED_LINES_IN_PATH_FILE = 10
CURRENT_DIR = os.getcwd()

# Sequence lines shared by both records of the dummy repbase bank, and by the
# expected "_cut" bank.
_REPBASE_SEQUENCE_LINES = [
    "ATGCGTGCGTAAATGCGTAATGCGTAAATGCGTAAATTTGCGCGTGGCGTATGCGTGCAT\n",
    "ATGCGTGCGTAAATGCGTAATGCGTAAATGCGTTTTTTTGCGCGTGGCGTATGCGTGCAT\n",
    "ATGCGTGCGTAAATGCGTAATGCGTAAATGCGTAAATTTGCGCGTAAAGTATGCGTGCAT\n",
]

# Path-file rows WITHOUT their leading identifier column; the identifier is
# prepended by _numberPathHits() so the same rows can be numbered per batch
# (1..5, 1..5) or globally (1..10).
_BATCH_1_HITS = (
    "QueryName1\t2\t250\tsubjectName1\t5\t255\t4.1e-39\t132\t88.2\n",
    "QueryName1\t255\t550\tsubjectName2\t5\t255\t0.0002\t32\t78.2\n",
    "QueryName2\t1\t150\tsubjectName1\t250\t400\t5.1e-59\t132\t98\n",
    "QueryName3\t2\t250\tsubjectName3\t5\t255\t4.1e-39\t132\t88.2\n",
    "QueryName1\t300\t450\tsubjectName1\t300\t450\t4.1e-39\t132\t80.2\n",
)
_BATCH_2_HITS = (
    "QueryName4\t2\t250\tsubjectName1\t5\t255\t4.1e-39\t132\t88.2\n",
    "QueryName4\t255\t550\tsubjectName2\t5\t255\t0.0002\t32\t78.2\n",
    "QueryName5\t1\t150\tsubjectName1\t250\t400\t5.1e-59\t132\t98\n",
    "QueryName6\t2\t250\tsubjectName3\t5\t255\t4.1e-39\t132\t88.2\n",
    "QueryName7\t300\t450\tsubjectName1\t300\t450\t4.1e-39\t132\t80.2\n",
)


def _numberPathHits(hits, firstId=1):
    """Return the rows of 'hits', each prefixed with a consecutive identifier.

    @param hits: rows without their identifier column
    @param firstId: identifier given to the first row
    @return: list of complete path-file lines
    """
    return ["%d\t%s" % (pathId, hit) for pathId, hit in enumerate(hits, firstId)]


def _writeLines(fileName, lines):
    """Create (or overwrite) 'fileName' with 'lines', always closing the handle."""
    with open(fileName, "w") as handle:
        handle.writelines(lines)


def _removeIfExists(fileName):
    """Delete 'fileName' when it is a regular file; do nothing otherwise."""
    if os.path.isfile(fileName):
        os.remove(fileName)


class Test_RepbaseBLRnForClassifierStep1(unittest.TestCase):
    """Tests of RepbaseBLRnForClassifierStep1.

    The fixtures read the environment variables REPET_PATH, REPET_HOST,
    REPET_USER, REPET_PW, REPET_DB and REPET_PORT, write their files in the
    directory the module was loaded from (CURRENT_DIR) and in a 'TE_BLRn'
    sub-directory, and use the database handle returned by
    DbFactory.createInstance().
    """

    def setUp(self):
        """Write the configuration file and build the object under test."""
        os.chdir(CURRENT_DIR)
        self._inFileName = "dummyFileName"
        self._launch_1 = "log = os.system( \""
        self._launch_2 = "\" )\n"
        self._launch_2 += "if log != 0:\n"
        self._launch_2 += "\tsys.exit(1)\n"
        self._cDir = "/home/user/dummy_cdir"
        self._tmpDir = "/home/user/dummy_tmpDir"
        self._configFileName = "configFile"
        self._repbaseName = "dummyRepbase_nt.fa"

        configLines = ["[repet_env]\n"]
        for key in ("host", "user", "pw", "db", "port"):
            configLines.append("repet_%s: %s\n" % (key, os.environ["REPET_%s" % key.upper()]))
        configLines.append("[detect_features]\n")
        configLines.append("TE_BLRn: yes\n")
        configLines.append("TE_nucl_bank: %s\n" % self._repbaseName)
        configLines.append("wublast: yes\n")
        _writeLines(self._configFileName, configLines)

        self._verbose = 0
        self._config = ConfigParser.ConfigParser()
        # Close the handle explicitly instead of leaking it to the garbage collector.
        with open(self._configFileName) as configHandle:
            self._config.readfp(configHandle)
        self._pL = pyRepet.launcher.programLauncher.programLauncher()
        self._project = "dummyProject"
        self._repbaseBLRn = self._newRepbaseBLRn()
        self._expFileName = "expFile"
        self._bank = self._config.get("detect_features", "TE_nucl_bank")
        self._db = DbFactory.createInstance()
        # NOTE(review): the database handle is never released here; check
        # whether the object returned by DbFactory needs an explicit close.

    def tearDown(self):
        """Drop the test table and delete every file a test may have left."""
        self._db.dropTable("%s_TE_BLRn_path" % (self._project))
        os.chdir(CURRENT_DIR)
        self._repbaseBLRn = None
        os.remove(self._configFileName)

        projectPathFile = "%s_BLRn_%s.align.clean_match.path" % (self._project, self._bank)
        # The names ending in ".*" are literal file names (they are created as
        # such by test_findAndRemoveUselessFiles), not glob patterns.
        leftovers = [
            projectPathFile,
            projectPathFile + ".tmp",
            "batch_1.fa_BLRn_%s.*" % (self._bank),
            "batch_2.fa_BLRn_%s.*" % (self._bank),
            "batch_1.fa_BLRn_%s.align.clean_match.path" % (self._bank),
            "batch_2.fa_BLRn_%s.align.clean_match.path" % (self._bank),
        ]
        for fileName in leftovers:
            _removeIfExists(fileName)

        if os.path.isfile(self._repbaseName):
            os.remove(self._repbaseName)
            # Derived files are only looked for when the bank itself was
            # created; each is removed only if present so that a missing one
            # does not raise here and hide the actual test failure.
            for suffix in ("_cut", "_cut.xnd", "_cut.xns", "_cut.xnt", ".Nstretch.map"):
                _removeIfExists(self._repbaseName + suffix)
            _removeIfExists("last_time_stamp.log")

        if os.path.isdir("TE_BLRn"):
            shutil.rmtree("TE_BLRn")
        _removeIfExists(self._expFileName)

    def test_formatRepbase_ntIfNecessaryWithoutLog(self):
        self._writeRepbaseBankAndExpectedCut()
        self._repbaseBLRn.formatRepbase_ntIfNecessary()
        obsFileName = self._repbaseName + "_cut"
        self.assertTrue(FileUtils.are2FilesIdentical(self._expFileName, obsFileName))

    def test_formatRepbase_ntIfNecessary_withLogFile(self):
        self._writeRepbaseBankAndExpectedCut()
        repbaseBLRnWithLog = self._newRepbaseBLRn()
        repbaseBLRnWithLog.formatRepbase_ntIfNecessary()
        obsFileName = self._repbaseName + "_cut"
        self.assertTrue(FileUtils.are2FilesIdentical(self._expFileName, obsFileName))

    def test_createCmdToLaunch(self):
        bank = self._config.get("detect_features", "TE_nucl_bank")
        obsCmd = self._repbaseBLRn.createCmdToLaunch()

        query = self._inFileName
        cDir = self._cDir
        prefix = "%s_BLRn_%s" % (query, bank)

        def moveUnlessPresent(name):
            # Generated snippet: move 'name' into cDir unless it is already there.
            snippet = "if not os.path.exists( \"%s/%s\" ):\n" % (cDir, name)
            snippet += "\tos.system( \"mv %s %s\" )\n" % (name, cDir)
            return snippet

        def removeIfPresent(name):
            # Generated snippet: delete 'name' when it exists.
            snippet = "if os.path.exists( \"%s\" ):\n" % (name)
            snippet += "\tos.remove( \"%s\" )\n" % (name)
            return snippet

        # blaster invocation and its clean-up
        expCmd = self._launch_1 + os.environ["REPET_PATH"] + "/bin/blaster"
        expCmd += " -q %s" % (query)
        expCmd += " -s %s/%s" % (cDir, bank)
        expCmd += " -B %s" % (prefix)
        expCmd += " -n blastn"
        # NOTE(review): only " -W" is taken to depend on the wublast option;
        # the fixture always sets "wublast: yes", so the expected string is
        # the same either way.
        if self._config.get("detect_features", "wublast") == "yes":
            expCmd += " -W"
        expCmd += " -r"
        expCmd += " -v 1"
        expCmd += self._launch_2
        expCmd += moveUnlessPresent(prefix + ".param")
        expCmd += "if os.path.exists( \"%s_cut\" ):\n" % (query)
        expCmd += "\tos.system( \"rm -f %s_cut*\" )\n" % (query)
        expCmd += removeIfPresent(query + ".Nstretch.map")
        expCmd += removeIfPresent(prefix + ".raw")
        expCmd += removeIfPresent(prefix + ".seq_treated")

        # matcher invocation and its clean-up
        expCmd += self._launch_1
        expCmd += os.environ["REPET_PATH"] + "/bin/matcher"
        expCmd += " -m %s.align" % (prefix)
        expCmd += " -q %s" % (query)
        expCmd += " -s %s/%s" % (cDir, bank)
        expCmd += " -j"
        expCmd += " -v 1"
        expCmd += self._launch_2
        expCmd += moveUnlessPresent(prefix + ".align.clean_match.path")
        expCmd += moveUnlessPresent(prefix + ".align.clean_match.param")
        expCmd += removeIfPresent(prefix + ".align")
        expCmd += removeIfPresent(prefix + ".align.clean_match.fa")
        expCmd += removeIfPresent(prefix + ".align.clean_match.map")
        expCmd += removeIfPresent(prefix + ".align.clean_match.tab")
        if self._tmpDir != self._cDir:
            expCmd += removeIfPresent(bank)

        self.assertEqual(expCmd, obsCmd)

    def test_concatPathFile(self):
        self._createBatchFilesAndEnterWorkDir()
        _writeLines(self._expFileName,
                    _numberPathHits(_BATCH_1_HITS) + _numberPathHits(_BATCH_2_HITS))
        FileUtils.sortFileContent(self._expFileName)
        obsFileName = "%s_BLRn_%s.align.clean_match.path.tmp" % (self._project, self._bank)
        self._repbaseBLRn._concatPathFile(self._bank)
        FileUtils.sortFileContent(obsFileName)
        self.assertTrue(FileUtils.are2FilesIdentical(self._expFileName, obsFileName))

    def test_adaptIDInPathFile(self):
        # Input: the two batches concatenated, identifiers restarting at 1.
        _writeLines("%s_BLRn_%s.align.clean_match.path.tmp" % (self._project, self._bank),
                    _numberPathHits(_BATCH_1_HITS) + _numberPathHits(_BATCH_2_HITS))
        self._repbaseBLRn._adaptIDInPathFile(self._bank)
        # Expected: the same rows with identifiers running from 1 to 10.
        _writeLines(self._expFileName, _numberPathHits(_BATCH_1_HITS + _BATCH_2_HITS))
        obsFileName = "%s_BLRn_%s.align.clean_match.path" % (self._project, self._bank)
        self.assertTrue(FileUtils.are2FilesIdentical(self._expFileName, obsFileName))

    def test_loadPathFileInTable(self):
        self._createBatchFilesAndEnterWorkDir()
        _writeLines("%s_BLRn_%s.align.clean_match.path" % (self._project, self._bank),
                    _numberPathHits(_BATCH_1_HITS + _BATCH_2_HITS))
        self._repbaseBLRn._loadPathFileInTable(self._bank)
        resultFromFilePathList = self._readPathResultsFromFileAndFillList()
        resultFromTablePathList = self._readPathResultsFromTableAndFillList()
        self.assertEqual(resultFromFilePathList, resultFromTablePathList)

    def test_findAndRemoveUselessFiles(self):
        uselessFiles = [
            "%s_BLRn_%s.align.clean_match.path.tmp" % (self._project, self._bank),
            "batch_1.fa_BLRn_%s.*" % (self._bank),
            "batch_2.fa_BLRn_%s.*" % (self._bank),
        ]
        for fileName in uselessFiles:
            self._createFile(fileName)
        self._repbaseBLRn._findAndRemoveUselessFiles(self._bank)
        for fileName in uselessFiles:
            self.assertFalse(FileUtils.isRessourceExists(fileName))

    def test_collectRepbaseBLRn(self):
        self._createBatchFilesAndEnterWorkDir()
        self._repbaseBLRn.collectRepbaseBLRn()
        pathFileName = "%s_BLRn_%s.align.clean_match.path" % (self._project, self._bank)
        tableName = "%s_TE_BLRn_path" % (self._project)
        self.assertTrue(FileUtils.isRessourceExists(pathFileName))
        self.assertEqual(NB_EXPECTED_LINES_IN_PATH_FILE,
                         FileUtils.getNbLinesInSingleFile(pathFileName))
        self.assertTrue(self._db.doesTableExist(tableName))
        self._db.execute('select * from %s_TE_BLRn_path' % (self._project))
        self.assertEqual(NB_EXPECTED_LINES_IN_PATH_FILE, len(self._db.fetchall()))
        self._db.dropTable(tableName)

    def _newRepbaseBLRn(self):
        """Build a RepbaseBLRnForClassifierStep1 from the fixture attributes."""
        return RepbaseBLRnForClassifierStep1(self._inFileName, self._launch_1, self._launch_2,
                                             self._cDir, self._tmpDir, self._configFileName,
                                             self._verbose, self._pL, self._project)

    def _writeRepbaseBankAndExpectedCut(self):
        """Write the two-sequence dummy bank and the expected '_cut' bank."""
        _writeLines(self._repbaseName,
                    [">seq1\n"] + _REPBASE_SEQUENCE_LINES
                    + [">seq2\n"] + _REPBASE_SEQUENCE_LINES)
        _writeLines(self._expFileName,
                    [">1 seq1 {Cut} 1..180\n"] + _REPBASE_SEQUENCE_LINES
                    + [">2 seq2 {Cut} 1..180\n"] + _REPBASE_SEQUENCE_LINES)

    def _createBatchFilesAndEnterWorkDir(self):
        """Write both batch path files in CURRENT_DIR, then move into 'TE_BLRn'.

        The 'TE_BLRn' directory is created when it does not exist yet.
        """
        os.chdir(CURRENT_DIR)
        if not FileUtils.isRessourceExists("TE_BLRn"):
            os.mkdir("TE_BLRn")
        self._createPathFiles(self._bank)
        os.chdir("TE_BLRn")

    def _createPathFiles(self, bankFull):
        """Write one five-row path file per batch in the current directory.

        @param bankFull: bank name, possibly with a leading directory which is
                         stripped before building the file names
        """
        bank = os.path.split(bankFull)[1]
        for batchName, hits in (("batch_1", _BATCH_1_HITS), ("batch_2", _BATCH_2_HITS)):
            fileName = batchName + ".fa_BLRn_" + bank + ".align.clean_match.path"
            _writeLines(fileName, _numberPathHits(hits))

    def _readPathResultsFromTableAndFillList(self):
        """Return every path stored in the project's '_TE_BLRn_path' table."""
        tablePathAdaptatorInstance = TablePathAdaptator(self._db, "%s_TE_BLRn_path" % (self._project))
        pathList = tablePathAdaptatorInstance.getListOfAllPaths()
        return pathList

    def _readPathResultsFromFileAndFillList(self):
        """Return the Path objects read from the project's path file."""
        pathList = []
        fileName = "%s_BLRn_%s.align.clean_match.path" % (self._project, self._bank)
        with open(fileName, "r") as pathHandle:
            pathInstance = Path()
            # A fresh Path is needed for every record because each one is
            # kept in the returned list.
            while pathInstance.read(pathHandle):
                pathList.append(pathInstance)
                pathInstance = Path()
        return pathList

    def _createFile(self, nameFile):
        """Create 'nameFile' as an empty file (truncating it if it exists)."""
        with open(nameFile, "w"):
            pass


test_suite = unittest.TestSuite()
test_suite.addTest(unittest.makeSuite(Test_RepbaseBLRnForClassifierStep1))

if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run(test_suite)