| 
18
 | 
     1 import unittest
 | 
| 
 | 
     2 import os
 | 
| 
 | 
     3 from SMART.Java.Python.FindOverlapsOptim import FindOverlapsOptim
 | 
| 
 | 
     4 from SMART.Java.Python.ncList.NCListCursor import NCListCursor
 | 
| 
 | 
     5 from SMART.Java.Python.ncList.test.MockFindOverlapsWithSeveralIntervals import *
 | 
| 
 | 
     6 
 | 
| 
 | 
     7 REFERENCE = 0
 | 
| 
 | 
     8 QUERY = 1
 | 
| 
 | 
     9 
 | 
| 
 | 
    10 class Test_FindOverlapsOptim(unittest.TestCase):
 | 
| 
 | 
    11 
 | 
| 
 | 
    12     def setUp(self):
 | 
| 
 | 
    13         self._inputRefGff3FileName = 'sorted_Ref.gff3'
 | 
| 
 | 
    14         self._inputQueryGff3FileName = 'sorted_Query.gff3'
 | 
| 
 | 
    15         self._writeQueryGff3File(self._inputQueryGff3FileName)
 | 
| 
 | 
    16         self._outputGff3FileName = 'overlaps.gff3'
 | 
| 
 | 
    17         iMock = MockFindOverlapsWithServeralIntervals_case1()
 | 
| 
 | 
    18         iMock.write(self._inputRefGff3FileName)
 | 
| 
 | 
    19         self._iFOO = FindOverlapsOptim(0)
 | 
| 
 | 
    20         self._iFOO.setRefFileName(self._inputRefGff3FileName, "gff3")
 | 
| 
 | 
    21         self._iFOO.setQueryFileName(self._inputQueryGff3FileName, "gff3")
 | 
| 
 | 
    22         self._iFOO.setOutputFileName(self._outputGff3FileName)
 | 
| 
 | 
    23         self._iFOO.prepareIntermediateFiles()
 | 
| 
 | 
    24         self._iFOO.createNCLists()
 | 
| 
 | 
    25         self._queryNcList = self._iFOO._ncLists[QUERY]["chr1"]
 | 
| 
 | 
    26         self._refNcList   = self._iFOO._ncLists[REFERENCE]["chr1"]
 | 
| 
 | 
    27         
 | 
| 
 | 
    28     def tearDown(self):
 | 
| 
 | 
    29         os.remove(self._inputRefGff3FileName)
 | 
| 
 | 
    30         os.remove(self._inputQueryGff3FileName)
 | 
| 
 | 
    31         os.remove(self._outputGff3FileName)
 | 
| 
 | 
    32     
 | 
| 
 | 
    33     def test_isOverlapping_true(self):
 | 
| 
 | 
    34         queryCursor = NCListCursor(None, self._queryNcList, 5, 0)
 | 
| 
 | 
    35         refCursor   = NCListCursor(None, self._refNcList,   4, 0)
 | 
| 
 | 
    36         obs = self._iFOO.isOverlapping(queryCursor, refCursor)
 | 
| 
 | 
    37         exp = 0
 | 
| 
 | 
    38         self.assertEquals(exp, obs)
 | 
| 
 | 
    39         
 | 
| 
 | 
    40     def test_isOverlapping_false_left(self):
 | 
| 
 | 
    41         queryCursor = NCListCursor(None, self._queryNcList, 5, 0)
 | 
| 
 | 
    42         refCursor   = NCListCursor(None, self._refNcList,   2, 0)
 | 
| 
 | 
    43         obs = self._iFOO.isOverlapping(queryCursor, refCursor)
 | 
| 
 | 
    44         exp = -1
 | 
| 
 | 
    45         self.assertEquals(exp, obs)
 | 
| 
 | 
    46     
 | 
| 
 | 
    47     def test_isOverlapping_false_right(self):
 | 
| 
 | 
    48         queryCursor = NCListCursor(None, self._queryNcList, 5, 0)
 | 
| 
 | 
    49         refCursor   = NCListCursor(None, self._refNcList,   1, 0)
 | 
| 
 | 
    50         obs = self._iFOO.isOverlapping(queryCursor, refCursor)
 | 
| 
 | 
    51         exp = 1
 | 
| 
 | 
    52         self.assertEquals(exp, obs) 
 | 
| 
 | 
    53            
 | 
| 
 | 
    54     def test_isLastElement_true(self):
 | 
| 
 | 
    55         refCursor = NCListCursor(None, self._refNcList, 4, 0)
 | 
| 
 | 
    56         obsBool   = refCursor.isLast()
 | 
| 
 | 
    57         expBool   = True
 | 
| 
 | 
    58         self.assertEquals(expBool, obsBool)
 | 
| 
 | 
    59     
 | 
| 
 | 
    60     def test_isLastElement_false(self):
 | 
| 
 | 
    61         refCursor = NCListCursor(None, self._refNcList, 3, 0)
 | 
| 
 | 
    62         obsBool   = refCursor.isLast()
 | 
| 
 | 
    63         expBool   = False
 | 
| 
 | 
    64         self.assertEquals(expBool, obsBool)  
 | 
| 
 | 
    65         
 | 
| 
 | 
    66     def test_isLastElement_highestLevel_true(self):
 | 
| 
 | 
    67         refCursor = NCListCursor(None, self._refNcList, 1, 0)
 | 
| 
 | 
    68         obsBool   = refCursor.isLast()
 | 
| 
 | 
    69         expBool   = True
 | 
| 
 | 
    70         self.assertEquals(expBool, obsBool)
 | 
| 
 | 
    71     
 | 
| 
 | 
    72     def test_isLastElement_highestLevel_false(self):
 | 
| 
 | 
    73         refCursor = NCListCursor(None, self._refNcList, 0, 0)
 | 
| 
 | 
    74         obsBool   = refCursor.isLast()
 | 
| 
 | 
    75         expBool   = False
 | 
| 
 | 
    76         self.assertEquals(expBool, obsBool)           
 | 
| 
 | 
    77 
 | 
| 
 | 
    78     def test_findOverlapIter(self):
 | 
| 
 | 
    79         queryCursor           = NCListCursor(None, self._queryNcList, 2, 0)
 | 
| 
 | 
    80         refCursor             = NCListCursor(None, self._refNcList,   0, 0)
 | 
| 
 | 
    81         queryTranscript       = queryCursor.getTranscript()
 | 
| 
 | 
    82         done                  = False
 | 
| 
 | 
    83         (cursor, done, empty) = self._iFOO.findOverlapIter(queryTranscript, refCursor, done)
 | 
| 
 | 
    84         obsFirstOverlapLAddr  = (cursor._lIndex, done, empty)
 | 
| 
 | 
    85         expFirstOverlapLAddr  = 4, True, False
 | 
| 
 | 
    86         self.assertEquals(expFirstOverlapLAddr, obsFirstOverlapLAddr)
 | 
| 
 | 
    87         
 | 
| 
 | 
    88     def test_not_findOverlapIter(self):
 | 
| 
 | 
    89         queryCursor           = NCListCursor(None, self._queryNcList, 4, 0)
 | 
| 
 | 
    90         refCursor             = NCListCursor(None, self._refNcList,   1, 0)
 | 
| 
 | 
    91         queryTranscript       = queryCursor.getTranscript()
 | 
| 
 | 
    92         done                  = False
 | 
| 
 | 
    93         (cursor, done, empty) = self._iFOO.findOverlapIter(queryTranscript, refCursor, done)
 | 
| 
 | 
    94         obsFirstOverlapLAddr  = (cursor._lIndex, done, empty)
 | 
| 
 | 
    95         expFirstOverlapLAddr  = -1, False, True
 | 
| 
 | 
    96         self.assertEquals(expFirstOverlapLAddr, obsFirstOverlapLAddr)
 | 
| 
 | 
    97         
 | 
| 
 | 
    98     def test_findOverlapIter_not_the_first_RefOverlap(self):
 | 
| 
 | 
    99         queryCursor           = NCListCursor(None, self._queryNcList, 3, 0)
 | 
| 
 | 
   100         refCursor             = NCListCursor(None, self._refNcList,   4, 0)
 | 
| 
 | 
   101         queryTranscript       = queryCursor.getTranscript()
 | 
| 
 | 
   102         done                  = True
 | 
| 
 | 
   103         (cursor, done, empty) = self._iFOO.findOverlapIter(queryTranscript, refCursor, done)
 | 
| 
 | 
   104         obsFirstOverlapLAddr  = (cursor._lIndex, done, empty)
 | 
| 
 | 
   105         expFirstOverlapLAddr  = 1, True, False
 | 
| 
 | 
   106         self.assertEquals(expFirstOverlapLAddr, obsFirstOverlapLAddr)
 | 
| 
 | 
   107         
 | 
| 
 | 
   108     def test_moveDown(self):
 | 
| 
 | 
   109         refCursor = NCListCursor(None, self._refNcList, 0, 0)
 | 
| 
 | 
   110         refCursor.moveDown()
 | 
| 
 | 
   111         expFirstChildLAddr = 2
 | 
| 
 | 
   112         self.assertEquals(expFirstChildLAddr, refCursor._lIndex) 
 | 
| 
 | 
   113     
 | 
| 
 | 
   114     def test_moveUp(self):
 | 
| 
 | 
   115         refCursor = NCListCursor(None, self._refNcList, 4, 0)
 | 
| 
 | 
   116         refCursor.moveUp()
 | 
| 
 | 
   117         expFirstChildLAddr = 0
 | 
| 
 | 
   118         self.assertEquals(expFirstChildLAddr, refCursor._lIndex) 
 | 
| 
 | 
   119     
 | 
| 
 | 
   120     def test_moveRight(self):
 | 
| 
 | 
   121         refCursor = NCListCursor(None, self._refNcList, 3, 0)
 | 
| 
 | 
   122         refCursor.moveRight()
 | 
| 
 | 
   123         expFirstChildLAddr = 4
 | 
| 
 | 
   124         self.assertEquals(expFirstChildLAddr, refCursor._lIndex) 
 | 
| 
 | 
   125        
 | 
| 
 | 
   126     def test_moveNext(self):
 | 
| 
 | 
   127         refCursor = NCListCursor(None, self._refNcList, 6, 0)
 | 
| 
 | 
   128         refCursor.moveNext()
 | 
| 
 | 
   129         expFirstChildLAddr = 1
 | 
| 
 | 
   130         self.assertEquals(expFirstChildLAddr, refCursor._lIndex) 
 | 
| 
 | 
   131 
 | 
| 
 | 
   132     def test_not_findOverlapIter_between2RefIntervals(self):
 | 
| 
 | 
   133         inputQueryGff3FileName = 'query2.gff3'
 | 
| 
 | 
   134         self._writeQueryGff3File2(inputQueryGff3FileName)
 | 
| 
 | 
   135         self._outputGff3FileName = 'overlaps.gff3'
 | 
| 
 | 
   136         iMock = MockFindOverlapsWithServeralIntervals_case1()
 | 
| 
 | 
   137         iMock.write(self._inputRefGff3FileName)
 | 
| 
 | 
   138         _iFOO = FindOverlapsOptim(0)
 | 
| 
 | 
   139         _iFOO.setRefFileName(self._inputRefGff3FileName, "gff3")
 | 
| 
 | 
   140         _iFOO.setQueryFileName(inputQueryGff3FileName, "gff3")
 | 
| 
 | 
   141         _iFOO.setOutputFileName(self._outputGff3FileName)
 | 
| 
 | 
   142         _iFOO.prepareIntermediateFiles()
 | 
| 
 | 
   143         _iFOO.createNCLists()
 | 
| 
 | 
   144         _queryNcList          = _iFOO._ncLists[QUERY]["chr1"]
 | 
| 
 | 
   145         _refNcList            = _iFOO._ncLists[REFERENCE]["chr1"]
 | 
| 
 | 
   146         queryCursor           = NCListCursor(None, _queryNcList, 0, 0)
 | 
| 
 | 
   147         refCursor             = NCListCursor(None, _refNcList,   0, 0)
 | 
| 
 | 
   148         queryTranscript       = queryCursor.getTranscript()
 | 
| 
 | 
   149         done                  = True
 | 
| 
 | 
   150         (cursor, done, empty) = _iFOO.findOverlapIter(queryTranscript, refCursor, done)
 | 
| 
 | 
   151         lIndex                = cursor._lIndex
 | 
| 
 | 
   152         obsFirstOverlapLAddr  = (lIndex, done, empty)
 | 
| 
 | 
   153         expFirstOverlapLAddr  = 1, False, True
 | 
| 
 | 
   154         self.assertEquals(expFirstOverlapLAddr, obsFirstOverlapLAddr)
 | 
| 
 | 
   155         os.remove(inputQueryGff3FileName) 
 | 
| 
 | 
   156 
 | 
| 
 | 
   157     def _writeQueryGff3File2(self, fileName):
 | 
| 
 | 
   158         f = open(fileName, 'w')
 | 
| 
 | 
   159         f.write("chr1\tquery\ttest1\t1100\t1150\t126\t+\t.\tID=test1.1;Name=test1.1\n")
 | 
| 
 | 
   160         f.write("chr1\tquery\ttest2\t1250\t1300\t781\t+\t.\tID=test1.2;Name=test1.2\n")
 | 
| 
 | 
   161         f.close()  
 | 
| 
 | 
   162         
 | 
| 
 | 
   163     def _writeQueryGff3File(self, fileName):
 | 
| 
 | 
   164         f = open(fileName, 'w')
 | 
| 
 | 
   165         f.write("chr1\tquery\ttest1.1\t25\t150\t126\t+\t.\tID=test1.1;Name=test1.1\n")
 | 
| 
 | 
   166         f.write("chr1\tquery\ttest1.2\t70\t850\t781\t+\t.\tID=test1.2;Name=test1.2\n")
 | 
| 
 | 
   167         f.write("chr1\tquery\ttest1.3\t550\t850\t201\t+\t.\tID=test1.3;Name=test1.3\n")
 | 
| 
 | 
   168         f.write("chr1\tquery\ttest1.4\t925\t1025\t101\t+\t.\tID=test1.4;Name=test1.4\n")
 | 
| 
 | 
   169         f.write("chr1\tquery\ttest1.5\t1201\t1210\t10\t+\t.\tID=test1.5;Name=test1.5\n")
 | 
| 
 | 
   170         f.write("chr1\tquery\ttest1.6\t1500\t1600\t101\t+\t.\tID=test1.6;Name=test1.6\n")
 | 
| 
 | 
   171         f.close()
 | 
| 
 | 
   172         
 | 
| 
 | 
   173 if __name__ == "__main__":
 | 
| 
 | 
   174     unittest.main()
 |