# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.
|
|
30
|
|
31
|
|
32 import unittest
|
|
33 import os
|
|
34 import time
|
|
35 from commons.core.sql.DbMySql import DbMySql
|
|
36 from commons.core.sql.TableSeqAdaptator import TableSeqAdaptator
|
|
37 from commons.core.seq.Bioseq import Bioseq
|
|
38 from commons.core.coord.Set import Set
|
|
39 from commons.core.utils.FileUtils import FileUtils
|
|
40
|
|
41
|
|
class Test_TableSeqAdaptator( unittest.TestCase ):
    """
    Tests of TableSeqAdaptator, run against a MySQL database.

    The connection settings are taken from the environment variables
    REPET_HOST, REPET_USER, REPET_PW, REPET_DB and REPET_PORT. Each test
    works on its own table, whose name is made unique with a timestamp
    and the process id, and which is dropped in tearDown().
    """

    # FASTA records shared by the tests loading a table from a file
    _L_FASTA_RECORDS = [ ( "seq1", "AGCGATGACGATGCGAGT" ),
                         ( "seq2", "GCGATGCAGATGACGGCGGATGC" ) ]

    # sequence inserted under the header "chr2" by the sub-sequence tests
    _CHR2_SEQUENCE = "AAAAAAAAAATTTTTGGGGGGGGGG"


    def setUp( self ):
        """
        Write a temporary configuration file, open the database
        connection and build the adaptator on a not-yet-created table.
        """
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )
        self.fileUtils = FileUtils()
        # files created by a test; they are removed in tearDown()
        self._lTmpFiles = []
        self._configFileName = "dummyConfigFile_%s" % ( self._uniqId )

        # Build the whole content before opening the file, so that a missing
        # REPET_* variable raises KeyError without leaving a file behind.
        lConfigLines = [ "[repet_env]\n" ]
        for suffix in ( "host", "user", "pw", "db", "port" ):
            envValue = os.environ[ "REPET_%s" % ( suffix.upper() ) ]
            lConfigLines.append( "repet_%s: %s\n" % ( suffix, envValue ) )
        with open( self._configFileName, "w" ) as configF:
            configF.writelines( lConfigLines )

        try:
            self._db = DbMySql( cfgFileName=self._configFileName )
        except Exception:
            # tearDown() is not run when setUp() fails: clean up here
            os.remove( self._configFileName )
            raise
        self._table = "dummySeqTable_%s" % ( self._uniqId )
        self._tsA = TableSeqAdaptator( self._db, self._table )


    def tearDown( self ):
        """
        Drop the test table, close the connection and remove every
        temporary file, even when one of the earlier steps fails.
        """
        try:
            try:
                self._db.dropTable( self._table )
            finally:
                self._db.close()
        finally:
            for fileName in self._lTmpFiles + [ self._configFileName ]:
                if os.path.exists( fileName ):
                    os.remove( fileName )
            self._configFileName = ""


    ##################################################################################
    ################################ Private helpers #################################
    ##################################################################################

    def _writeFastaFile( self, fileName, lRecords ):
        """
        Write the (header, sequence) pairs of 'lRecords' in FASTA format
        into 'fileName', and register the file for removal in tearDown().
        """
        # registered first, so that a partially written file is removed too
        self._lTmpFiles.append( fileName )
        with open( fileName, "w" ) as faF:
            for header, sequence in lRecords:
                faF.write( ">%s\n" % ( header ) )
                faF.write( "%s\n" % ( sequence ) )


    def _createFastaTableFromFile( self, lRecords=None ):
        """
        Write 'lRecords' (default: the two shared records) into a FASTA
        file, create the test table from it and return the file name.
        """
        if lRecords is None:
            lRecords = self._L_FASTA_RECORDS
        faFileName = "dummyFaFile_%s" % ( self._uniqId )
        self._writeFastaFile( faFileName, lRecords )
        self._db.createTable( self._table, "fasta", faFileName )
        return faFileName


    def _createSeqTableWithChr2( self ):
        """
        Create the test table with the "seq" type and insert into it
        one Bioseq whose header is "chr2".
        """
        self._db.createTable( self._table, "seq" )
        bsChr = Bioseq()
        bsChr.setHeader( "chr2" )
        bsChr.setSequence( self._CHR2_SEQUENCE )
        self._tsA.insert( bsChr )


    def _fetchAllRows( self ):
        """
        Return all the rows of the test table, as given by the cursor.
        """
        sqlCmd = "SELECT * FROM %s" % ( self._table )
        self._db.execute( sqlCmd )
        return self._db.cursor.fetchall()


    ##################################################################################
    ################## Tests for methods in ITableSeqAdaptator #######################
    ##################################################################################

    def test_insert( self ):
        # one inserted Bioseq is expected back as a single row
        bs = Bioseq( "seq1", "AGCGATGACGATGCGAGT" )
        self._db.createTable( self._table, "fasta" )
        self._tsA.insert( bs )

        # plain ints: they compare equal to the values read from the database
        expBioseqTuple = (("seq1", "AGCGATGACGATGCGAGT", "seq1", 18), )

        obsBioseqTuple = self._fetchAllRows()

        self.assertEqual( expBioseqTuple, obsBioseqTuple )


    def test_insertList( self ):
        # the first header has a description: the expected row holds "seq1"
        # in its first column and the full header in its third column
        bs1 = Bioseq( "seq1 desc", "AGCGATGACGATGCGAGT" )
        bs2 = Bioseq( "seq2", "AGCGATGACGATGCGAGT")
        bs3 = Bioseq( "seq3", "GCGATGCAGATGACGGCGGATGC")
        lBioseq = [ bs1, bs2, bs3 ]
        self._db.createTable( self._table, "fasta" )
        self._tsA.insertList( lBioseq )

        tuple1 = ("seq1", "AGCGATGACGATGCGAGT", "seq1 desc", 18)
        tuple2 = ("seq2", "AGCGATGACGATGCGAGT", "seq2", 18)
        tuple3 = ("seq3", "GCGATGCAGATGACGGCGGATGC", "seq3", 23)
        expBioseqTuple = ( tuple1, tuple2, tuple3 )

        obsBioseqTuple = self._fetchAllRows()

        self.assertEqual( expBioseqTuple, obsBioseqTuple )


    def test_getAccessionsList(self):
        self._createFastaTableFromFile()
        # both lists are sorted: the order of the result is not tested
        lExp = sorted( [ "seq1", "seq2" ] )
        lObs = self._tsA.getAccessionsList()
        lObs.sort()
        self.assertEqual( lObs, lExp )


    def test_saveAccessionsListInFastaFile(self):
        # the file used to load the table is also the expected output
        expFileName = self._createFastaTableFromFile()
        lAccessions = [ "seq1", "seq2" ]
        obsFileName = "dummyObsFile_%s" % ( self._uniqId )
        self._lTmpFiles.append( obsFileName )
        self._tsA.saveAccessionsListInFastaFile( lAccessions, obsFileName )
        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )


    def test_exportInFastaFile(self):
        # the file used to load the table is also the expected output
        expFileName = self._createFastaTableFromFile()
        obsFileName = "dummyFaFileObs_%s" % ( self._uniqId )
        self._lTmpFiles.append( obsFileName )
        self._tsA.exportInFastaFile( obsFileName )
        self.assertTrue( self.fileUtils.are2FilesIdentical( obsFileName, expFileName ) )


    ##################################################################################
    ########################### Tests for other methods ##############################
    ##################################################################################

    def test_insertWithBioseqEmpty( self ):
        # inserting a Bioseq with empty header and sequence must return None
        bs = Bioseq( "", "" )
        self._db.createTable( self._table, "fasta" )
        exp = None
        obs = self._tsA.insert(bs)
        self.assertEqual( exp, obs )


    def test_getBioseqFromHeader( self ):
        self._createFastaTableFromFile()
        for header, sequence in self._L_FASTA_RECORDS:
            exp = Bioseq( header, sequence )
            obs = self._tsA.getBioseqFromHeader( header )
            self.assertEqual( obs, exp )


    def test_getSeqLengthFromAccession( self ):
        self._createFastaTableFromFile()
        exp = 18
        obs = self._tsA.getSeqLengthFromAccession( "seq1" )
        self.assertEqual( obs, exp )


    def test_getSeqLengthFromDescription( self ):
        lRecords = [ ( "seq1 descriptionfield", "AGCGATGACGATGCGAGT" ),
                     ( "seq2 descriptionfield", "GCGATGCAGATGACGGCGGATGC" ) ]
        self._createFastaTableFromFile( lRecords )
        exp = 18
        obs = self._tsA.getSeqLengthFromDescription( "seq1 descriptionfield" )
        self.assertEqual( obs, exp )


    def test_getAccessionAndLengthList( self ):
        self._createFastaTableFromFile()
        lSeq1 = ("seq1", 18)
        lSeq2 = ("seq2", 23)
        lExp = [lSeq1,lSeq2]
        lObs = self._tsA.getAccessionAndLengthList()
        self.assertEqual( lObs, lExp )


    def test_getSeqLengthFromAccessionWithSingleQuote( self ):
        # the accession ends with a single quote
        lRecords = [ ( "seq1'", "AGCGATGACGATGCGAGT" ),
                     ( "seq2", "GCGATGCAGATGACGGCGGATGC" ) ]
        self._createFastaTableFromFile( lRecords )
        exp = 18
        obs = self._tsA.getSeqLengthFromAccession( "seq1'" )
        self.assertEqual( obs, exp )


    def test_getSubSequence_directStrand( self ):
        self._createSeqTableWithChr2()
        exp = "TTTGGG"
        obs = self._tsA.getSubSequence( "chr2", 13, 18 )
        self.assertEqual( exp, obs )


    def test_getSubSequence_reverseStrand( self ):
        # start > end: the reverse-complement of "TTTGGG" is expected
        self._createSeqTableWithChr2()
        exp = "CCCAAA"
        obs = self._tsA.getSubSequence( "chr2", 18, 13 )
        self.assertEqual( exp, obs )


    def test_getBioseqFromSetList_directStrand( self ):
        self._createSeqTableWithChr2()
        lSets = []
        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 1, 10 ) )
        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 16, 25 ) )
        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 1..10,16..25", "AAAAAAAAAAGGGGGGGGGG" )
        obs = self._tsA.getBioseqFromSetList( lSets )
        self.assertEqual( exp, obs )


    def test_getBioseqFromSetList_reverseStrand( self ):
        self._createSeqTableWithChr2()
        lSets = []
        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 10, 1 ) )
        lSets.append( Set( 3, "Dm-B-G600-Map3_classI-LTR-incomp", "chr2", 25, 16 ) )
        exp = Bioseq( "Dm-B-G600-Map3_classI-LTR-incomp::3 chr2 25..16,10..1", "CCCCCCCCCCTTTTTTTTTT" )
        obs = self._tsA.getBioseqFromSetList( lSets )
        self.assertEqual( exp, obs )


    def test_isAccessionInTable_true( self ):
        self._createSeqTableWithChr2()

        obs = self._tsA.isAccessionInTable( "chr2" )
        self.assertTrue( obs )


    def test_isAccessionInTable_false( self ):
        self._createSeqTableWithChr2()

        obs = self._tsA.isAccessionInTable( "chr1" )
        self.assertFalse( obs )
|
|
316
|
|
317
|
|
# Suite gathering every test_* method of Test_TableSeqAdaptator; it is kept
# at module level under the name 'test_suite' so that it can be imported.
# TestLoader.loadTestsFromTestCase() is used rather than unittest.makeSuite(),
# which is deprecated and no longer available in recent Python versions.
test_suite = unittest.TestSuite()
test_suite.addTest( unittest.TestLoader().loadTestsFromTestCase( Test_TableSeqAdaptator ) )
if __name__ == "__main__":
    unittest.TextTestRunner(verbosity=2).run( test_suite )
|