diff commons/core/sql/test/Test_TableSeqAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/test/Test_TableSeqAdaptator.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,321 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import unittest
+import os
+import time
+from commons.core.sql.DbMySql import DbMySql
+from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator
+from commons.core.seq.Bioseq import Bioseq
+from commons.core.coord.Set import Set
+from commons.core.utils.FileUtils import FileUtils
+
+
+class Test_TableSeqAdaptator( unittest.TestCase ):
+    
+    def setUp( self ):
+        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
+        self.fileUtils = FileUtils()
+        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )
+        configF = open(self._configFileName, "w" )
+        configF.write( "[repet_env]\n" )
+        configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
+        configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
+        configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
+        configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
+        configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
+        configF.close()
+        self._db = DbMySql( cfgFileName=self._configFileName )
+        self._table = "dummySeqTable_%s" % ( self._uniqId )
+        self._tsA = TableSeqAdaptator( self._db, self._table )
+        
+        
+    def tearDown( self ):
+        self._db.dropTable( self._table )
+        self._db.close()
+        os.remove( self._configFileName )
+        self._configFileName = ""
+        
+        
+##################################################################################
+################## Tests for methods in ITableSeqAdaptator #######################
+##################################################################################
+        
+    def test_insert( self ):
+        bs = Bioseq( "seq1", "AGCGATGACGATGCGAGT" )
+        self._db.createTable( self._table, "fasta" )
+        self._tsA.insert( bs )
+        
+        expBioseqTuple = (("seq1", "AGCGATGACGATGCGAGT", "seq1", 18L), )
+        
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._db.execute( sqlCmd )
+        obsBioseqTuple = self._db.cursor.fetchall()
+        
+        self.assertEqual( expBioseqTuple, obsBioseqTuple )
+        
+        
+    def test_insertList( self ):
+        bs1 = Bioseq( "seq1 desc", "AGCGATGACGATGCGAGT" )
+        bs2 = Bioseq( "seq2", "AGCGATGACGATGCGAGT")
+        bs3 = Bioseq( "seq3", "GCGATGCAGATGACGGCGGATGC")
+        lBioseq = [ bs1, bs2, bs3 ]
+        self._db.createTable( self._table, "fasta" )
+        self._tsA.insertList( lBioseq )
+        
+        tuple1 = ("seq1", "AGCGATGACGATGCGAGT", "seq1 desc", 18L)
+        tuple2 = ("seq2", "AGCGATGACGATGCGAGT", "seq2", 18L)
+        tuple3 = ("seq3", "GCGATGCAGATGACGGCGGATGC", "seq3", 23L)
+        expBioseqTuple = ( tuple1, tuple2, tuple3 )
+        
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._db.execute( sqlCmd )
+        obsBioseqTuple = self._db.cursor.fetchall()
+        
+        self.assertEquals(expBioseqTuple, obsBioseqTuple)
+        
+        
+    def test_getAccessionsList(self):
+        faFileName = "dummyFaFile_%s" % ( self._uniqId )
+        faF = open( faFileName, "w" )
+        faF.write(">seq1\n")
+        faF.write("AGCGATGACGATGCGAGT\n")
+        faF.write(">seq2\n")
+        faF.write("GCGATGCAGATGACGGCGGATGC\n")
+        faF.close()
+        self._db.createTable( self._table, "fasta", faFileName )
+        lExp = [ "seq1", "seq2" ]
+        lExp.sort()
+        lObs = self._tsA.getAccessionsList()
+        lObs.sort()
+        self.assertEqual( lObs, lExp )
+        os.remove( faFileName )
+        
+        
+    def test_saveAccessionsListInFastaFile(self):
+        expFileName = "dummyFaFile_%s" % ( self._uniqId )
+        expF = open( expFileName, "w" )
+        expF.write(">seq1\n")
+        expF.write("AGCGATGACGATGCGAGT\n")
+        expF.write(">seq2\n")
+        expF.write("GCGATGCAGATGACGGCGGATGC\n")
+        expF.close()
+        self._db.createTable( self._table, "fasta", expFileName )
+        lAccessions = [ "seq1", "seq2" ]
+        obsFileName = "dummyObsFile_%s" % ( self._uniqId )
+        self._tsA.saveAccessionsListInFastaFile( lAccessions, obsFileName )
+        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )
+        os.remove( expFileName )
+        os.remove( obsFileName )
+        
+    def test_exportInFastaFile(self):
+        expFileName = "dummyFaFile_%s" % ( self._uniqId )
+        faF = open( expFileName, "w" )
+        faF.write(">seq1\n")
+        faF.write("AGCGATGACGATGCGAGT\n")
+        faF.write(">seq2\n")
+        faF.write("GCGATGCAGATGACGGCGGATGC\n")
+        faF.close()
+        self._db.createTable( self._table, "fasta", expFileName )
+        obsFileName = "dummyFaFileObs_%s" % ( self._uniqId )
+        self._tsA.exportInFastaFile( obsFileName )
+        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )
+        os.remove( expFileName )
+        os.remove( obsFileName )
+
+##################################################################################
+########################### Tests for other methods ##############################
+##################################################################################
+        
+    def test_insertWithBioseqEmpty( self ):
+        bs = Bioseq( "", "" )
+        self._db.createTable( self._table, "fasta" )
+        exp = None
+        obs = self._tsA.insert(bs)
+        self.assertEqual( exp, obs )
+        
+        
+    def test_getBioseqFromHeader( self ):
+        faFileName = "dummyFaFile_%s" % ( self._uniqId )
+        faF = open( faFileName, "w" )
+        faF.write(">seq1\n")
+        faF.write("AGCGATGACGATGCGAGT\n")
+        faF.write(">seq2\n")
+        faF.write("GCGATGCAGATGACGGCGGATGC\n")
+        faF.close()
+        self._db.createTable( self._table, "fasta", faFileName )
+        exp = Bioseq( "seq1", "AGCGATGACGATGCGAGT" )
+        obs = self._tsA.getBioseqFromHeader( "seq1" )
+        self.assertEqual( obs, exp )
+        exp = Bioseq( "seq2", "GCGATGCAGATGACGGCGGATGC" )
+        obs = self._tsA.getBioseqFromHeader( "seq2" )
+        self.assertEqual( obs, exp )
+        os.remove( faFileName )
+        
+        
+    def test_getSeqLengthFromAccession( self ):
+        inFileName = "dummyFaFile_%s" % ( self._uniqId )
+        inF = open( inFileName, "w" )
+        inF.write(">seq1\n")
+        inF.write("AGCGATGACGATGCGAGT\n")
+        inF.write(">seq2\n")
+        inF.write("GCGATGCAGATGACGGCGGATGC\n")
+        inF.close()
+        self._db.createTable( self._table, "fasta", inFileName )
+        exp = 18
+        obs = self._tsA.getSeqLengthFromAccession( "seq1" )
+        self.assertEqual( obs, exp )
+        os.remove( inFileName )
+
+
+    def test_getSeqLengthFromDescription( self ):
+        inFileName = "dummyFaFile_%s" % ( self._uniqId )
+        inF = open( inFileName, "w" )
+        inF.write(">seq1 descriptionfield\n")
+        inF.write("AGCGATGACGATGCGAGT\n")
+        inF.write(">seq2 descriptionfield\n")
+        inF.write("GCGATGCAGATGACGGCGGATGC\n")
+        inF.close()
+        self._db.createTable( self._table, "fasta", inFileName )
+        exp = 18
+        obs = self._tsA.getSeqLengthFromDescription( "seq1 descriptionfield" )
+        self.assertEqual( obs, exp )
+        os.remove( inFileName )
+        
+        
+    def test_getAccessionAndLengthList( self ):
+        inFileName = "dummyFaFile_%s" % ( self._uniqId )
+        inF = open( inFileName, "w" )
+        inF.write(">seq1\n")
+        inF.write("AGCGATGACGATGCGAGT\n")
+        inF.write(">seq2\n")
+        inF.write("GCGATGCAGATGACGGCGGATGC\n")
+        inF.close()
+        self._db.createTable( self._table, "fasta", inFileName )
+        lSeq1 = ("seq1", 18)
+        lSeq2 = ("seq2", 23)
+        lExp = [lSeq1,lSeq2]
+        lObs = self._tsA.getAccessionAndLengthList()
+        self.assertEqual( lObs, lExp )
+        os.remove( inFileName )
+        
+        
+    def test_getSeqLengthFromAccessionWithSingleQuote( self ):
+        inFileName = "dummyFaFile_%s" % ( self._uniqId )
+        inF = open( inFileName, "w" )
+        inF.write(">seq1'\n")
+        inF.write("AGCGATGACGATGCGAGT\n")
+        inF.write(">seq2\n")
+        inF.write("GCGATGCAGATGACGGCGGATGC\n")
+        inF.close()
+        self._db.createTable( self._table, "fasta", inFileName )
+        exp = 18
+        obs = self._tsA.getSeqLengthFromAccession( "seq1'" )
+        self.assertEqual( obs, exp )
+        os.remove( inFileName )
+        
+        
+    def test_getSubSequence_directStrand( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        exp = "TTTGGG"
+        obs = self._tsA.getSubSequence( "chr2", 13, 18 )
+        self.assertEqual( exp, obs )
+        
+        
+    def test_getSubSequence_reverseStrand( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        exp = "CCCAAA"
+        obs = self._tsA.getSubSequence( "chr2", 18, 13 )
+        self.assertEqual( exp, obs )
+        
+        
+    def test_getBioseqFromSetList_directStrand( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        lSets = []
+        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 1, 10 ) )
+        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 16, 25 ) )
+        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 1..10,16..25", "AAAAAAAAAAGGGGGGGGGG" )
+        obs = self._tsA.getBioseqFromSetList( lSets )
+        self.assertEqual( exp, obs )
+        
+        
+    def test_getBioseqFromSetList_reverseStrand( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        lSets = []
+        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 10, 1 ) )
+        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 25, 16 ) )
+        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 25..16,10..1", "CCCCCCCCCCTTTTTTTTTT" )
+        obs = self._tsA.getBioseqFromSetList( lSets )
+        self.assertEqual( exp, obs )
+        
+        
+    def test_isAccessionInTable_true( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        
+        obs = self._tsA.isAccessionInTable( "chr2" )
+        self.assertTrue( obs )
+        
+        
+    def test_isAccessionInTable_false( self ):
+        self._db.createTable( self._table, "seq" )
+        chr = Bioseq()
+        chr.setHeader( "chr2" )
+        chr.setSequence( "AAAAAAAAAATTTTTGGGGGGGGGG" )
+        self._tsA.insert( chr )
+        
+        obs = self._tsA.isAccessionInTable( "chr1" )
+        self.assertFalse( obs )
+        
+        
+test_suite = unittest.TestSuite()
+test_suite.addTest( unittest.makeSuite( Test_TableSeqAdaptator ) )
+if __name__ == "__main__":
+    unittest.TextTestRunner(verbosity=2).run( test_suite )