Mercurial > repos > yufei-luo > s_mart
diff commons/core/sql/test/Test_TableSeqAdaptator.py @ 6:769e306b7933
Change the repository level.
author | yufei-luo |
---|---|
date | Fri, 18 Jan 2013 04:54:14 -0500 |
parents | |
children |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/commons/core/sql/test/Test_TableSeqAdaptator.py Fri Jan 18 04:54:14 2013 -0500 @@ -0,0 +1,321 @@ +# Copyright INRA (Institut National de la Recherche Agronomique) +# http://www.inra.fr +# http://urgi.versailles.inra.fr +# +# This software is governed by the CeCILL license under French law and +# abiding by the rules of distribution of free software. You can use, +# modify and/ or redistribute the software under the terms of the CeCILL +# license as circulated by CEA, CNRS and INRIA at the following URL +# "http://www.cecill.info". +# +# As a counterpart to the access to the source code and rights to copy, +# modify and redistribute granted by the license, users are provided only +# with a limited warranty and the software's author, the holder of the +# economic rights, and the successive licensors have only limited +# liability. +# +# In this respect, the user's attention is drawn to the risks associated +# with loading, using, modifying and/or developing or reproducing the +# software by the user in light of its specific status of free software, +# that may mean that it is complicated to manipulate, and that also +# therefore means that it is reserved for developers and experienced +# professionals having in-depth computer knowledge. Users are therefore +# encouraged to load and test the software's suitability as regards their +# requirements in conditions enabling the security of their systems and/or +# data to be ensured and, more generally, to use and operate it in the +# same conditions as regards security. +# +# The fact that you are presently reading this means that you have had +# knowledge of the CeCILL license and that you accept its terms. + + +import unittest +import os +import time +from commons.core.sql.DbMySql import DbMySql +from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator +from commons.core.seq.Bioseq import Bioseq +from commons.core.coord.Set import Set +from commons.core.utils.FileUtils import FileUtils + + +class Test_TableSeqAdaptator( unittest.TestCase ): + + def setUp( self ): + self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() ) + self.fileUtils = FileUtils() + self._configFileName = "dummyConfigFile_%s" % ( self._uniqId ) + configF = open(self._configFileName, "w" ) + configF.write( "[repet_env]\n" ) + configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) ) + configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) ) + configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) ) + configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) ) + configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) ) + configF.close() + self._db = DbMySql( cfgFileName=self._configFileName ) + self._table = "dummySeqTable_%s" % ( self._uniqId ) + self._tsA = TableSeqAdaptator( self._db, self._table ) + + + def tearDown( self ): + self._db.dropTable( self._table ) + self._db.close() + os.remove( self._configFileName ) + self._configFileName = "" + + +################################################################################## +################## Tests for methods in ITableSeqAdaptator ####################### +################################################################################## + + def test_insert( self ): + bs = Bioseq( "seq1", "AGCGATGACGATGCGAGT" ) + self._db.createTable( self._table, "fasta" ) + self._tsA.insert( bs ) + + expBioseqTuple = (("seq1", "AGCGATGACGATGCGAGT", "seq1", 18L), ) + + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._db.execute( sqlCmd ) + obsBioseqTuple = self._db.cursor.fetchall() + + self.assertEqual( expBioseqTuple, obsBioseqTuple ) + + + def test_insertList( self ): + bs1 = Bioseq( "seq1 desc", "AGCGATGACGATGCGAGT" ) + bs2 = Bioseq( "seq2", "AGCGATGACGATGCGAGT") + bs3 = Bioseq( "seq3", "GCGATGCAGATGACGGCGGATGC") + lBioseq = [ bs1, bs2, bs3 ] + self._db.createTable( self._table, "fasta" ) + self._tsA.insertList( lBioseq ) + + tuple1 = ("seq1", "AGCGATGACGATGCGAGT", "seq1 desc", 18L) + tuple2 = ("seq2", "AGCGATGACGATGCGAGT", "seq2", 18L) + tuple3 = ("seq3", "GCGATGCAGATGACGGCGGATGC", "seq3", 23L) + expBioseqTuple = ( tuple1, tuple2, tuple3 ) + + sqlCmd = "SELECT * FROM %s" % ( self._table ) + self._db.execute( sqlCmd ) + obsBioseqTuple = self._db.cursor.fetchall() + + self.assertEquals(expBioseqTuple, obsBioseqTuple) + + + def test_getAccessionsList(self): + faFileName = "dummyFaFile_%s" % ( self._uniqId ) + faF = open( faFileName, "w" ) + faF.write(">seq1\n") + faF.write("AGCGATGACGATGCGAGT\n") + faF.write(">seq2\n") + faF.write("GCGATGCAGATGACGGCGGATGC\n") + faF.close() + self._db.createTable( self._table, "fasta", faFileName ) + lExp = [ "seq1", "seq2" ] + lExp.sort() + lObs = self._tsA.getAccessionsList() + lObs.sort() + self.assertEqual( lObs, lExp ) + os.remove( faFileName ) + + + def test_saveAccessionsListInFastaFile(self): + expFileName = "dummyFaFile_%s" % ( self._uniqId ) + expF = open( expFileName, "w" ) + expF.write(">seq1\n") + expF.write("AGCGATGACGATGCGAGT\n") + expF.write(">seq2\n") + expF.write("GCGATGCAGATGACGGCGGATGC\n") + expF.close() + self._db.createTable( self._table, "fasta", expFileName ) + lAccessions = [ "seq1", "seq2" ] + obsFileName = "dummyObsFile_%s" % ( self._uniqId ) + self._tsA.saveAccessionsListInFastaFile( lAccessions, obsFileName ) + self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) ) + os.remove( expFileName ) + os.remove( obsFileName ) + + def test_exportInFastaFile(self): + expFileName = "dummyFaFile_%s" % ( self._uniqId ) + faF = open( expFileName, "w" ) + faF.write(">seq1\n") + faF.write("AGCGATGACGATGCGAGT\n") + faF.write(">seq2\n") + faF.write("GCGATGCAGATGACGGCGGATGC\n") + faF.close() + self._db.createTable( self._table, "fasta", expFileName ) + obsFileName = "dummyFaFileObs_%s" % ( self._uniqId ) + self._tsA.exportInFastaFile( obsFileName ) + self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) ) + os.remove( expFileName ) + os.remove( obsFileName ) + +################################################################################## +########################### Tests for other methods ############################## +################################################################################## + + def test_insertWithBioseqEmpty( self ): + bs = Bioseq( "", "" ) + self._db.createTable( self._table, "fasta" ) + exp = None + obs = self._tsA.insert(bs) + self.assertEqual( exp, obs ) + + + def test_getBioseqFromHeader( self ): + faFileName = "dummyFaFile_%s" % ( self._uniqId ) + faF = open( faFileName, "w" ) + faF.write(">seq1\n") + faF.write("AGCGATGACGATGCGAGT\n") + faF.write(">seq2\n") + faF.write("GCGATGCAGATGACGGCGGATGC\n") + faF.close() + self._db.createTable( self._table, "fasta", faFileName ) + exp = Bioseq( "seq1", "AGCGATGACGATGCGAGT" ) + obs = self._tsA.getBioseqFromHeader( "seq1" ) + self.assertEqual( obs, exp ) + exp = Bioseq( "seq2", "GCGATGCAGATGACGGCGGATGC" ) + obs = self._tsA.getBioseqFromHeader( "seq2" ) + self.assertEqual( obs, exp ) + os.remove( faFileName ) + + + def test_getSeqLengthFromAccession( self ): + inFileName = "dummyFaFile_%s" % ( self._uniqId ) + inF = open( inFileName, "w" ) + inF.write(">seq1\n") + inF.write("AGCGATGACGATGCGAGT\n") + inF.write(">seq2\n") + inF.write("GCGATGCAGATGACGGCGGATGC\n") + inF.close() + self._db.createTable( self._table, "fasta", inFileName ) + exp = 18 + obs = self._tsA.getSeqLengthFromAccession( "seq1" ) + self.assertEqual( obs, exp ) + os.remove( inFileName ) + + + def test_getSeqLengthFromDescription( self ): + inFileName = "dummyFaFile_%s" % ( self._uniqId ) + inF = open( inFileName, "w" ) + inF.write(">seq1 descriptionfield\n") + inF.write("AGCGATGACGATGCGAGT\n") + inF.write(">seq2 descriptionfield\n") + inF.write("GCGATGCAGATGACGGCGGATGC\n") + inF.close() + self._db.createTable( self._table, "fasta", inFileName ) + exp = 18 + obs = self._tsA.getSeqLengthFromDescription( "seq1 descriptionfield" ) + self.assertEqual( obs, exp ) + os.remove( inFileName ) + + + def test_getAccessionAndLengthList( self ): + inFileName = "dummyFaFile_%s" % ( self._uniqId ) + inF = open( inFileName, "w" ) + inF.write(">seq1\n") + inF.write("AGCGATGACGATGCGAGT\n") + inF.write(">seq2\n") + inF.write("GCGATGCAGATGACGGCGGATGC\n") + inF.close() + self._db.createTable( self._table, "fasta", inFileName ) + lSeq1 = ("seq1", 18) + lSeq2 = ("seq2", 23) + lExp = [lSeq1,lSeq2] + lObs = self._tsA.getAccessionAndLengthList() + self.assertEqual( lObs, lExp ) + os.remove( inFileName ) + + + def test_getSeqLengthFromAccessionWithSingleQuote( self ): + inFileName = "dummyFaFile_%s" % ( self._uniqId ) + inF = open( inFileName, "w" ) + inF.write(">seq1'\n") + inF.write("AGCGATGACGATGCGAGT\n") + inF.write(">seq2\n") + inF.write("GCGATGCAGATGACGGCGGATGC\n") + inF.close() + self._db.createTable( self._table, "fasta", inFileName ) + exp = 18 + obs = self._tsA.getSeqLengthFromAccession( "seq1'" ) + self.assertEqual( obs, exp ) + os.remove( inFileName ) + + + def test_getSubSequence_directStrand( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + exp = "TTTGGG" + obs = self._tsA.getSubSequence( "chr2", 13, 18 ) + self.assertEqual( exp, obs ) + + + def test_getSubSequence_reverseStrand( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + exp = "CCCAAA" + obs = self._tsA.getSubSequence( "chr2", 18, 13 ) + self.assertEqual( exp, obs ) + + + def test_getBioseqFromSetList_directStrand( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + lSets = [] + lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 1, 10 ) ) + lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 16, 25 ) ) + exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 1..10,16..25", "AAAAAAAAAAGGGGGGGGGG" ) + obs = self._tsA.getBioseqFromSetList( lSets ) + self.assertEqual( exp, obs ) + + + def test_getBioseqFromSetList_reverseStrand( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + lSets = [] + lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 10, 1 ) ) + lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 25, 16 ) ) + exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 25..16,10..1", "CCCCCCCCCCTTTTTTTTTT" ) + obs = self._tsA.getBioseqFromSetList( lSets ) + self.assertEqual( exp, obs ) + + + def test_isAccessionInTable_true( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + + obs = self._tsA.isAccessionInTable( "chr2" ) + self.assertTrue( obs ) + + + def test_isAccessionInTable_false( self ): + self._db.createTable( self._table, "seq" ) + chr = Bioseq() + chr.setHeader( "chr2" ) + chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" ) + self._tsA.insert( chr ) + + obs = self._tsA.isAccessionInTable( "chr1" ) + self.assertFalse( obs ) + + +test_suite = unittest.TestSuite() +test_suite.addTest( unittest.makeSuite( Test_TableSeqAdaptator ) ) +if __name__ == "__main__": + unittest.TextTestRunner(verbosity=2).run( test_suite )