diff commons/core/sql/test/Test_TableSetAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/commons/core/sql/test/Test_TableSetAdaptator.py	Fri Jan 18 04:54:14 2013 -0500
@@ -0,0 +1,330 @@
+# Copyright INRA (Institut National de la Recherche Agronomique)
+# http://www.inra.fr
+# http://urgi.versailles.inra.fr
+#
+# This software is governed by the CeCILL license under French law and
+# abiding by the rules of distribution of free software.  You can  use, 
+# modify and/ or redistribute the software under the terms of the CeCILL
+# license as circulated by CEA, CNRS and INRIA at the following URL
+# "http://www.cecill.info". 
+#
+# As a counterpart to the access to the source code and  rights to copy,
+# modify and redistribute granted by the license, users are provided only
+# with a limited warranty  and the software's author,  the holder of the
+# economic rights,  and the successive licensors  have only  limited
+# liability. 
+#
+# In this respect, the user's attention is drawn to the risks associated
+# with loading,  using,  modifying and/or developing or reproducing the
+# software by the user in light of its specific status of free software,
+# that may mean  that it is complicated to manipulate,  and  that  also
+# therefore means  that it is reserved for developers  and  experienced
+# professionals having in-depth computer knowledge. Users are therefore
+# encouraged to load and test the software's suitability as regards their
+# requirements in conditions enabling the security of their systems and/or 
+# data to be ensured and,  more generally, to use and operate it in the 
+# same conditions as regards security. 
+#
+# The fact that you are presently reading this means that you have had
+# knowledge of the CeCILL license and that you accept its terms.
+
+
+import unittest
+import time
+import os
+from commons.core.sql.TableSetAdaptator import TableSetAdaptator
+from commons.core.sql.DbMySql import DbMySql
+from commons.core.coord.Set import Set
+
+
+class Test_TableSetAdaptator( unittest.TestCase ):
+
+    def setUp( self ):
+        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
+        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )
+        configF = open(self._configFileName, "w" )
+        configF.write( "[repet_env]\n" )
+        configF.write( "repet_host: %s\n" % ( os.environ["REPET_HOST"] ) )
+        configF.write( "repet_user: %s\n" % ( os.environ["REPET_USER"] ) )
+        configF.write( "repet_pw: %s\n" % ( os.environ["REPET_PW"] ) )
+        configF.write( "repet_db: %s\n" % ( os.environ["REPET_DB"] ) )
+        configF.write( "repet_port: %s\n" % ( os.environ["REPET_PORT"] ) )
+        configF.close()
+        self._iDb = DbMySql( cfgFileName=self._configFileName )
+        self._table = "dummySetTable_%s" % ( self._uniqId )
+        self._tSetA = TableSetAdaptator( self._iDb, self._table )
+                
+    def tearDown( self ):
+        self._uniqId = None
+        self._iDb.dropTable( self._table )
+        self._iDb.close()
+        self._table = None
+        self._tSetA = None
+        os.remove( self._configFileName )
+        self._configFileName = ""
+
+    def test_insert(self):
+        set2Insert = Set()
+        set2Insert.id = 1
+        set2Insert.name = "name1"
+        set2Insert.seqname = "name2"
+        set2Insert.start = 1L
+        set2Insert.end = 50L
+        self._iDb.createTable( self._table, "set", "" )
+        self._tSetA.insert( set2Insert, False )
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        expTsetTuple = ((1, "name1", "name2", 1L, 50L),)
+        obsTsetTuples = self._iDb.cursor.fetchall()
+        self.assertEquals(expTsetTuple, obsTsetTuples )
+    
+    def test_insertList ( self ):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        expTsetTuple = ((1, "name1", "desc1", 1, 120), (2, "name2", "desc2", 1, 20))
+        obsTsetTuples = self._iDb.cursor.fetchall()
+        self.assertEqual( expTsetTuple, obsTsetTuples )
+
+    def test_getIdList(self):
+        set2Insert = Set()
+        set2Insert.id = 1
+        set2Insert.name = "name1"
+        set2Insert.seqname = "name2"
+        set2Insert.start = 1
+        set2Insert.end = 50
+        self._iDb.createTable( self._table, "set", "" )
+        self._tSetA.insert( set2Insert )
+        l = self._tSetA.getIdList()
+        self.assertEquals( set2Insert.id, l[0] )
+        
+    def test_getSeqNameList(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "2\tname2\tdesc2\t1\t50\n" )
+        for m in [ set1, set2, set3 ]: self._tSetA.insert( m )
+        lExp = ["desc1", "desc2"]
+        lObs = self._tSetA.getSeqNameList()
+        self.assertEqual( lObs, lExp )
+        
+    def test_getSetListFromSeqName(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "3\tname2\tdesc2\t1\t50\n" )
+        for m in [ set1, set2, set3 ]: self._tSetA.insert( m )
+        explSet = [Set( 2,"name2", "desc2", 1, 20), Set( 3,"name2", "desc2", 1, 50)]
+        obslSet = self._tSetA.getSetListFromSeqName("desc2")
+        self.assertEqual( explSet, obslSet )
+
+    def test_getSetListFromId(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        explSet = [Set( 2,"name2", "desc2", 1, 20)]
+        obslSet = self._tSetA.getSetListFromId(2)
+        self.assertEqual( explSet, obslSet )
+      
+    def test_getSetListFromIdList(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        explSet = [Set( 1, "name1", "desc1", 1, 120), Set( 2,"name2", "desc2", 1, 20)]
+        lId = [1, 2]
+        obslSet = self._tSetA.getSetListFromIdList(lId)
+        self.assertEqual( explSet, obslSet )
+     
+    def test_getSetListFromIdList_emptyList(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        explSet = []
+        lId = []
+        obslSet = self._tSetA.getSetListFromIdList(lId)
+        self.assertEqual( explSet, obslSet )
+     
+    def test_getSetListOverlappingCoord(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc2\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        explSet = [Set( 1,"name1", "desc2", 1, 120)]
+        obslSet = self._tSetA.getSetListOverlappingCoord("desc2", 30, 70)
+        self.assertEqual( explSet, obslSet )
+      
+    def test_deleteFromId(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "3\tname2\tdesc3\t1\t50\n" )
+        for m in [ set1, set2, set3 ]: self._tSetA.insert( m )
+        self._tSetA.deleteFromId(1)
+        expTSetTuples = (( 2,"name2", "desc2", 1, 20), ( 3,"name2", "desc3", 1, 50))
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        obsTsetTuples = self._iDb.cursor.fetchall()
+        
+        self.assertEqual( expTSetTuples, obsTsetTuples )
+  
+    def test_deleteFromIdList(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "3\tname2\tdesc3\t1\t50\n" )
+        for m in [ set1, set2, set3 ]: self._tSetA.insert( m )
+        lId2del = [1, 2]
+        self._tSetA.deleteFromIdList(lId2del)
+        expTSetTuples = (( 3,"name2", "desc3", 1, 50),)
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        obsTsetTuples = self._iDb.cursor.fetchall()
+        
+        self.assertEqual( expTSetTuples, obsTsetTuples )
+    
+    def test_deleteFromIdListWithEmptyList(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" )
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "3\tname2\tdesc3\t1\t50\n" )
+        for m in [ set1, set2, set3 ]: self._tSetA.insert( m )
+        lId2del = []
+        self._tSetA.deleteFromIdList(lId2del)
+        expTSetTuples = ((1L, 'name1', 'desc1', 1L, 120L), (2L, 'name2', 'desc2', 1L, 20L), (3L, 'name2', 'desc3', 1L, 50L))
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        obsTsetTuples = self._iDb.cursor.fetchall()
+        
+        self.assertEqual( expTSetTuples, obsTsetTuples )
+     
+    def test_joinTwoSets(self):
+        self._iDb.createTable( self._table, "set", "" )
+        idSet1 = 5
+        set1 = Set()
+        set1.setFromString( "5\tname1\tdesc1\t1\t120\n" ) 
+        idSet2 = 2
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+        self._tSetA.joinTwoSets(idSet1, idSet2)
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        
+        expTSetTuples = ((2L, "name1", "desc1", 1L, 120L ), (2L, "name2", "desc2", 1L, 20L ))
+        obsTSetTuples = self._iDb.cursor.fetchall()
+        
+        self.assertEqual( expTSetTuples, obsTSetTuples)
+        self._iDb.dropTable(self._table)
+     
+    def test_joinTwoSetsWhereId1InfId2(self):
+        self._iDb.createTable( self._table, "set", "" )
+        idSet1 = 2
+        set1 = Set()
+        set1.setFromString( "5\tname1\tdesc1\t1\t120\n" ) 
+        
+        idSet2 = 5
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        
+        lset = [ set1, set2 ]
+        self._tSetA.insertList( lset )
+
+        self._tSetA.joinTwoSets(idSet1, idSet2)
+        
+        sqlCmd = "SELECT * FROM %s" % ( self._table )
+        self._iDb.execute( sqlCmd )
+        
+        expTSetTuples = ((2L, "name1", "desc1", 1L, 120L ), (2L, "name2", "desc2", 1L, 20L ))
+        obsTSetTuples = self._iDb.cursor.fetchall()
+        
+        self.assertEqual( expTSetTuples, obsTSetTuples)
+        self._iDb.dropTable(self._table)
+     
+    def test_getNewId(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        set1.setFromString( "1\tname1\tdesc1\t1\t120\n" ) 
+        set2 = Set()
+        set2.setFromString( "2\tname2\tdesc2\t1\t20\n" )
+        set3 = Set()
+        set3.setFromString( "5\tname1\tdesc1\t1\t120\n" ) 
+        set4 = Set()
+        set4.setFromString( "8\tname2\tdesc2\t1\t20\n" )
+        lset = [ set1, set2, set3, set4 ]
+        self._tSetA.insertList( lset )
+        expId = 9
+        obsId = self._tSetA.getNewId()
+        self.assertEqual( expId, obsId)
+        self._iDb.dropTable(self._table)
+     
+    def test_getNewId_set_null(self):
+        self._iDb.createTable( self._table, "set", "" )
+        set1 = Set()
+        lset = [ set1 ]
+        self._tSetA.insertList( lset )
+        expId = 1
+        obsId = self._tSetA.getNewId()
+        self.assertEqual( expId, obsId)
+        self._iDb.dropTable(self._table)  
+        
+    def test_getListOfAllSets( self ):
+        self._iDb.createTable( self._table, "set" )
+        s1 = Set()
+        s1.setFromString( "1\tchr1\tTE3\t1\t10\n" )
+        s2a = Set()
+        s2a.setFromString( "2\tchr1\tTE2\t2\t9\n" )
+        s2b = Set()
+        s2b.setFromString( "2\tchr1\tTE2\t12\t19\n" )
+        lSets = [ s1, s2a, s2b ]
+        self._tSetA.insertList( lSets )
+        expLSets = [ s1, s2a, s2b ]
+        obsLSets = self._tSetA.getListOfAllSets()
+        self.assertEqual( expLSets, obsLSets )
+        
+    def test_getListOfAllSets_empty_table( self ):
+        self._iDb.createTable( self._table, "set" )
+        expList = []
+        obsList = self._tSetA.getListOfAllSets()
+        self.assertEqual( expList, obsList )     
+        
+test_suite = unittest.TestSuite()
+test_suite.addTest( unittest.makeSuite( Test_TableSetAdaptator ) )       
+if __name__ == "__main__":
+    unittest.TextTestRunner(verbosity=2).run( test_suite )