comparison commons/core/sql/test/Test_TableSeqAdaptator.py @ 6:769e306b7933

Change the repository level.
author yufei-luo
date Fri, 18 Jan 2013 04:54:14 -0500
parents
children
comparison
equal deleted inserted replaced
5:ea3082881bf8 6:769e306b7933
1 # Copyright INRA (Institut National de la Recherche Agronomique)
2 # http://www.inra.fr
3 # http://urgi.versailles.inra.fr
4 #
5 # This software is governed by the CeCILL license under French law and
6 # abiding by the rules of distribution of free software. You can use,
7 # modify and/ or redistribute the software under the terms of the CeCILL
8 # license as circulated by CEA, CNRS and INRIA at the following URL
9 # "http://www.cecill.info".
10 #
11 # As a counterpart to the access to the source code and rights to copy,
12 # modify and redistribute granted by the license, users are provided only
13 # with a limited warranty and the software's author, the holder of the
14 # economic rights, and the successive licensors have only limited
15 # liability.
16 #
17 # In this respect, the user's attention is drawn to the risks associated
18 # with loading, using, modifying and/or developing or reproducing the
19 # software by the user in light of its specific status of free software,
20 # that may mean that it is complicated to manipulate, and that also
21 # therefore means that it is reserved for developers and experienced
22 # professionals having in-depth computer knowledge. Users are therefore
23 # encouraged to load and test the software's suitability as regards their
24 # requirements in conditions enabling the security of their systems and/or
25 # data to be ensured and, more generally, to use and operate it in the
26 # same conditions as regards security.
27 #
28 # The fact that you are presently reading this means that you have had
29 # knowledge of the CeCILL license and that you accept its terms.
30
31
32 import unittest
33 import os
34 import time
35 from commons.core.sql.DbMySql import DbMySql
36 from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator
37 from commons.core.seq.Bioseq import Bioseq
38 from commons.core.coord.Set import Set
39 from commons.core.utils.FileUtils import FileUtils
40
41
class Test_TableSeqAdaptator( unittest.TestCase ):
    """Integration tests for TableSeqAdaptator against a live MySQL database.

    Connection settings are taken from the REPET_HOST, REPET_USER, REPET_PW,
    REPET_DB and REPET_PORT environment variables. Each test works on its own
    uniquely named table and temporary files, all of which are removed in
    tearDown, whether the test passed or failed.
    """

    # (option name in the config file, environment variable providing its value)
    _CONFIG_OPTIONS = (
        ( "repet_host", "REPET_HOST" ),
        ( "repet_user", "REPET_USER" ),
        ( "repet_pw", "REPET_PW" ),
        ( "repet_db", "REPET_DB" ),
        ( "repet_port", "REPET_PORT" ),
    )

    # Sequences shared by the fixtures below (18, 23 and 25 nucleotides long).
    _SEQ1 = "AGCGATGACGATGCGAGT"
    _SEQ2 = "GCGATGCAGATGACGGCGGATGC"
    _CHR2_SEQ = "AAAAAAAAAATTTTTGGGGGGGGGG"

    def setUp( self ):
        """Create the config file, the database connection and the adaptator.

        Raises KeyError if one of the REPET_* environment variables is not set.
        """
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
        self.fileUtils = FileUtils()
        # Files registered here are deleted in tearDown.
        self._lTmpFiles = []

        # Read the whole environment before touching the disk: tearDown is not
        # run when setUp fails, so a missing variable must not leave a file.
        lConfigLines = [ "[repet_env]\n" ]
        for option, envVar in self._CONFIG_OPTIONS:
            lConfigLines.append( "%s: %s\n" % ( option, os.environ[envVar] ) )

        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )
        with open( self._configFileName, "w" ) as configF:
            configF.writelines( lConfigLines )

        try:
            self._db = DbMySql( cfgFileName=self._configFileName )
        except Exception:
            # Same reason as above: nobody else will clean up after setUp.
            os.remove( self._configFileName )
            raise
        self._table = "dummySeqTable_%s" % ( self._uniqId )
        self._tsA = TableSeqAdaptator( self._db, self._table )


    def tearDown( self ):
        """Drop the test table, close the connection, delete temporary files."""
        try:
            try:
                self._db.dropTable( self._table )
            finally:
                self._db.close()
        finally:
            for fileName in [ self._configFileName ] + self._lTmpFiles:
                # Output files may be missing if the tested method failed.
                if os.path.exists( fileName ):
                    os.remove( fileName )
            self._configFileName = ""


    ##################################################################################
    ############################## Fixture helpers ###################################
    ##################################################################################

    def _writeFastaFile( self, fileName, lRecords ):
        """Write (header, sequence) pairs as FASTA and register the file for removal.

        Each record is written as '>header' followed by the sequence on one line.
        Returns fileName.
        """
        self._lTmpFiles.append( fileName )
        with open( fileName, "w" ) as faF:
            for header, sequence in lRecords:
                faF.write( ">%s\n%s\n" % ( header, sequence ) )
        return fileName


    def _createFastaTableFromFile( self, lRecords=None ):
        """Create the test table as a 'fasta' table from a temporary FASTA file.

        lRecords defaults to the two records 'seq1' and 'seq2'.
        Returns the name of the FASTA file.
        """
        if lRecords is None:
            lRecords = [ ( "seq1", self._SEQ1 ), ( "seq2", self._SEQ2 ) ]
        faFileName = self._writeFastaFile( "dummyFaFile_%s" % ( self._uniqId ), lRecords )
        self._db.createTable( self._table, "fasta", faFileName )
        return faFileName


    def _createSeqTableWithChr2( self ):
        """Create the test table as a 'seq' table and insert the 'chr2' sequence."""
        self._db.createTable( self._table, "seq" )
        bs = Bioseq()
        bs.setHeader( "chr2" )
        bs.setSequence( self._CHR2_SEQ )
        self._tsA.insert( bs )


    def _fetchAllRows( self ):
        """Return every row of the test table as fetched from the cursor."""
        sqlCmd = "SELECT * FROM %s" % ( self._table )
        self._db.execute( sqlCmd )
        return self._db.cursor.fetchall()


    ##################################################################################
    ################## Tests for methods in ITableSeqAdaptator #######################
    ##################################################################################

    def test_insert( self ):
        """A single inserted Bioseq ends up as one row in the table."""
        bs = Bioseq( "seq1", self._SEQ1 )
        self._db.createTable( self._table, "fasta" )
        self._tsA.insert( bs )

        # Plain int instead of a Python-2-only long literal: equal on Python 2,
        # and still valid syntax on Python 3.
        expBioseqTuple = ( ( "seq1", self._SEQ1, "seq1", 18 ), )
        obsBioseqTuple = self._fetchAllRows()

        self.assertEqual( expBioseqTuple, obsBioseqTuple )


    def test_insertList( self ):
        """A list of Bioseq is inserted as one row per sequence.

        The expected rows show the first column holding the first word of the
        header and the third column holding the full header.
        """
        bs1 = Bioseq( "seq1 desc", self._SEQ1 )
        bs2 = Bioseq( "seq2", self._SEQ1 )
        bs3 = Bioseq( "seq3", self._SEQ2 )
        self._db.createTable( self._table, "fasta" )
        self._tsA.insertList( [ bs1, bs2, bs3 ] )

        expBioseqTuple = (
            ( "seq1", self._SEQ1, "seq1 desc", 18 ),
            ( "seq2", self._SEQ1, "seq2", 18 ),
            ( "seq3", self._SEQ2, "seq3", 23 ),
        )
        obsBioseqTuple = self._fetchAllRows()

        self.assertEqual( expBioseqTuple, obsBioseqTuple )


    def test_getAccessionsList( self ):
        """All accessions of the table are returned (order is not checked)."""
        self._createFastaTableFromFile()
        lExp = [ "seq1", "seq2" ]
        lExp.sort()
        lObs = self._tsA.getAccessionsList()
        lObs.sort()
        self.assertEqual( lObs, lExp )


    def test_saveAccessionsListInFastaFile( self ):
        """Saving the listed accessions reproduces the FASTA file that was loaded."""
        expFileName = self._createFastaTableFromFile()
        lAccessions = [ "seq1", "seq2" ]
        obsFileName = "dummyObsFile_%s" % ( self._uniqId )
        self._lTmpFiles.append( obsFileName )
        self._tsA.saveAccessionsListInFastaFile( lAccessions, obsFileName )
        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )


    def test_exportInFastaFile( self ):
        """Exporting the table reproduces the FASTA file that was loaded."""
        expFileName = self._createFastaTableFromFile()
        obsFileName = "dummyFaFileObs_%s" % ( self._uniqId )
        self._lTmpFiles.append( obsFileName )
        self._tsA.exportInFastaFile( obsFileName )
        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )


    ##################################################################################
    ########################### Tests for other methods ##############################
    ##################################################################################

    def test_insertWithBioseqEmpty( self ):
        """Inserting a Bioseq with empty header and sequence returns None."""
        bs = Bioseq( "", "" )
        self._db.createTable( self._table, "fasta" )
        exp = None
        obs = self._tsA.insert( bs )
        self.assertEqual( exp, obs )


    def test_getBioseqFromHeader( self ):
        """Each loaded sequence can be retrieved as a Bioseq from its header."""
        self._createFastaTableFromFile()
        for header, sequence in ( ( "seq1", self._SEQ1 ), ( "seq2", self._SEQ2 ) ):
            exp = Bioseq( header, sequence )
            obs = self._tsA.getBioseqFromHeader( header )
            self.assertEqual( obs, exp )


    def test_getSeqLengthFromAccession( self ):
        """The length of 'seq1' is looked up by accession."""
        self._createFastaTableFromFile()
        exp = 18
        obs = self._tsA.getSeqLengthFromAccession( "seq1" )
        self.assertEqual( obs, exp )


    def test_getSeqLengthFromDescription( self ):
        """The length of 'seq1' is looked up by its full description."""
        self._createFastaTableFromFile( [
            ( "seq1 descriptionfield", self._SEQ1 ),
            ( "seq2 descriptionfield", self._SEQ2 ),
        ] )
        exp = 18
        obs = self._tsA.getSeqLengthFromDescription( "seq1 descriptionfield" )
        self.assertEqual( obs, exp )


    def test_getAccessionAndLengthList( self ):
        """Accessions are returned together with their sequence lengths."""
        self._createFastaTableFromFile()
        lExp = [ ( "seq1", 18 ), ( "seq2", 23 ) ]
        lObs = self._tsA.getAccessionAndLengthList()
        self.assertEqual( lObs, lExp )


    def test_getSeqLengthFromAccessionWithSingleQuote( self ):
        """An accession containing a single quote can still be looked up."""
        self._createFastaTableFromFile( [
            ( "seq1'", self._SEQ1 ),
            ( "seq2", self._SEQ2 ),
        ] )
        exp = 18
        obs = self._tsA.getSeqLengthFromAccession( "seq1'" )
        self.assertEqual( obs, exp )


    def test_getSubSequence_directStrand( self ):
        """start < end: positions 13 to 18 of chr2 are returned as stored."""
        self._createSeqTableWithChr2()
        exp = "TTTGGG"
        obs = self._tsA.getSubSequence( "chr2", 13, 18 )
        self.assertEqual( exp, obs )


    def test_getSubSequence_reverseStrand( self ):
        """start > end: the reverse complement of positions 13 to 18 is returned."""
        self._createSeqTableWithChr2()
        exp = "CCCAAA"
        obs = self._tsA.getSubSequence( "chr2", 18, 13 )
        self.assertEqual( exp, obs )


    def test_getBioseqFromSetList_directStrand( self ):
        """Two direct-strand sets are concatenated into a single Bioseq."""
        self._createSeqTableWithChr2()
        lSets = [
            Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 1, 10 ),
            Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 16, 25 ),
        ]
        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 1..10,16..25", "AAAAAAAAAAGGGGGGGGGG" )
        obs = self._tsA.getBioseqFromSetList( lSets )
        self.assertEqual( exp, obs )


    def test_getBioseqFromSetList_reverseStrand( self ):
        """Two reverse-strand sets are reverse-complemented, rightmost set first."""
        self._createSeqTableWithChr2()
        lSets = [
            Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 10, 1 ),
            Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 25, 16 ),
        ]
        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 25..16,10..1", "CCCCCCCCCCTTTTTTTTTT" )
        obs = self._tsA.getBioseqFromSetList( lSets )
        self.assertEqual( exp, obs )


    def test_isAccessionInTable_true( self ):
        """An accession present in the table is reported as found."""
        self._createSeqTableWithChr2()
        obs = self._tsA.isAccessionInTable( "chr2" )
        self.assertTrue( obs )


    def test_isAccessionInTable_false( self ):
        """An accession absent from the table is reported as not found."""
        self._createSeqTableWithChr2()
        obs = self._tsA.isAccessionInTable( "chr1" )
        self.assertFalse( obs )
316
317
# Module-level suite holding every test of Test_TableSeqAdaptator.
# TestLoader.loadTestsFromTestCase replaces unittest.makeSuite, which is
# deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.TestLoader().loadTestsFromTestCase( Test_TableSeqAdaptator ) )
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run( test_suite )