view commons/tools/tests/Test_ChangeSequenceHeaders.py @ 31:0ab839023fe4

Uploaded
author m-zytnicki
date Tue, 30 Apr 2013 14:33:21 -0400
parents 94ab73e8a190
children
line wrap: on
line source

# Copyright INRA (Institut National de la Recherche Agronomique)
# http://www.inra.fr
# http://urgi.versailles.inra.fr
#
# This software is governed by the CeCILL license under French law and
# abiding by the rules of distribution of free software.  You can  use, 
# modify and/ or redistribute the software under the terms of the CeCILL
# license as circulated by CEA, CNRS and INRIA at the following URL
# "http://www.cecill.info". 
#
# As a counterpart to the access to the source code and  rights to copy,
# modify and redistribute granted by the license, users are provided only
# with a limited warranty  and the software's author,  the holder of the
# economic rights,  and the successive licensors  have only  limited
# liability. 
#
# In this respect, the user's attention is drawn to the risks associated
# with loading,  using,  modifying and/or developing or reproducing the
# software by the user in light of its specific status of free software,
# that may mean  that it is complicated to manipulate,  and  that  also
# therefore means  that it is reserved for developers  and  experienced
# professionals having in-depth computer knowledge. Users are therefore
# encouraged to load and test the software's suitability as regards their
# requirements in conditions enabling the security of their systems and/or 
# data to be ensured and,  more generally, to use and operate it in the 
# same conditions as regards security. 
#
# The fact that you are presently reading this means that you have had
# knowledge of the CeCILL license and that you accept its terms.


import unittest
import os
import time
from commons.core.utils.FileUtils import FileUtils
from commons.tools.ChangeSequenceHeaders import ChangeSequenceHeaders


class Test_ChangeSequenceHeaders( unittest.TestCase ):
    """
    Functional tests of the 'ChangeSequenceHeaders.py' command-line script.

    Each test writes small fixture files in the current working directory,
    launches the script through the shell and, when the run is expected to
    succeed, compares the file it produced with an expected file.

    The script is addressed as '../ChangeSequenceHeaders.py': the tests have
    to be launched from a directory located directly below the script.

    All the fixture files are registered when their name is built and are
    deleted in tearDown(), so nothing is left behind when an assertion fails.
    """

    # Beginning of every command line (path relative to the working directory).
    _SCRIPT_CMD = "python ../ChangeSequenceHeaders.py"


    def setUp( self ):
        """Prepare the list of files to delete, the tested object and a unique file-name suffix."""
        self._tmpFiles = []
        self._i = ChangeSequenceHeaders()
        self._uniqId = "%s_%s" % ( time.strftime("%Y%m%d%H%M%S") , os.getpid() )


    def tearDown( self ):
        """Delete the registered files which exist, whatever the outcome of the test."""
        for fileName in self._tmpFiles:
            if os.path.exists( fileName ):
                os.remove( fileName )
        self._tmpFiles = []
        self._i = None
        self._uniqId = None


    def _getTmpFileName( self, prefix ):
        """
        Return '<prefix>_<uniqId>' and register this name for deletion in tearDown().
        The file itself is not created.
        """
        fileName = "%s_%s" % ( prefix, self._uniqId )
        self._tmpFiles.append( fileName )
        return fileName


    def _writeTmpFile( self, prefix, content ):
        """
        Write 'content' into a registered temporary file and return its name.
        The handle is closed even if the write fails.
        """
        fileName = self._getTmpFileName( prefix )
        with open( fileName, "w" ) as handler:
            handler.write( content )
        return fileName


    def _launchScript( self, inFile, fileFormat, step, obsFile, prefix=None, linkFile=None, whichCluster=None ):
        """
        Launch the script and return the status given by os.system().

        @param inFile: value of option '-i'
        @param fileFormat: value of option '-f'
        @param step: value of option '-s'
        @param obsFile: value of option '-o'
        @param prefix: value of option '-p' (option omitted if None)
        @param linkFile: value of option '-l' (option omitted if None)
        @param whichCluster: value of option '-w' (option omitted if None)
        """
        cmd = self._SCRIPT_CMD
        cmd += " -i %s" % ( inFile )
        cmd += " -f %s" % ( fileFormat )
        cmd += " -s %s" % ( step )
        if prefix is not None:
            cmd += " -p %s" % ( prefix )
        if linkFile is not None:
            cmd += " -l %s" % ( linkFile )
        if whichCluster is not None:
            cmd += " -w %s" % ( whichCluster )
        cmd += " -o %s" % ( obsFile )
        return os.system( cmd )


    def _assertExpectedOutput( self, exitStatus, expFile, obsFile ):
        """Check that the script exited with status 0 and that 'obsFile' is identical to 'expFile'."""
        # assertEqual reports the actual status when the script fails
        self.assertEqual( exitStatus, 0 )
        self.assertTrue( FileUtils.are2FilesIdentical( expFile, obsFile ) )


    def _checkRetrieval( self, fileFormat, inContent, linkContent, expContent, whichCluster=None,
                         inPrefix="dummyInFaFile", expPrefix="dummyExpFile", obsPrefix="dummyObsFile" ):
        """
        Run the script with '-s 2' on the given input and link contents,
        then compare its output with 'expContent'.

        @param fileFormat: value of option '-f'
        @param inContent: content of the input file
        @param linkContent: content of the link file given via '-l'
        @param expContent: content expected in the output file
        @param whichCluster: value of option '-w' (option omitted if None)
        @param inPrefix: prefix of the name of the input file
        @param expPrefix: prefix of the name of the expected file
        @param obsPrefix: prefix of the name of the output file
        """
        inFile = self._writeTmpFile( inPrefix, inContent )
        linkFile = self._writeTmpFile( "dummyLinkFile", linkContent )
        expFile = self._writeTmpFile( expPrefix, expContent )
        obsFile = self._getTmpFileName( obsPrefix )

        exitStatus = self._launchScript( inFile, fileFormat, 2, obsFile,
                                         linkFile=linkFile, whichCluster=whichCluster )

        self._assertExpectedOutput( exitStatus, expFile, obsFile )


    def test_script_no_input_file( self ):
        # The input file is deliberately not created: the script has to
        # return a non-zero status.
        inFile = self._getTmpFileName( "dummyInFaFile" )
        obsFile = self._getTmpFileName( "dummyObsFile" )

        exitStatus = self._launchScript( inFile, "fasta", 1, obsFile, prefix="TE" )

        self.assertNotEqual( exitStatus, 0 )


    def test_shortenSequenceHeadersForFastaFile_fasta_script( self ):
        # With '-s 1' and '-p TE', the two headers are expected to become
        # 'TE1' and 'TE2'.
        inFile = self._writeTmpFile( "dummyInFaFile",
                                     ">DmelChr4-B-G387-MAP16\nATGTACGATGACGATCAG\n"
                                     ">consensus524\nGTGCGGATGGAACAGT\n" )
        # the link file is not written here: it is expected from the script
        linkFile = self._getTmpFileName( "dummyLinkFile" )
        expFile = self._writeTmpFile( "dummyExpFile",
                                      ">TE1\nATGTACGATGACGATCAG\n"
                                      ">TE2\nGTGCGGATGGAACAGT\n" )
        obsFile = self._getTmpFileName( "dummyObsFile" )

        exitStatus = self._launchScript( inFile, "fasta", 1, obsFile, prefix="TE", linkFile=linkFile )

        self._assertExpectedOutput( exitStatus, expFile, obsFile )
        # formerly checked implicitly: removing a missing link file raised an error
        self.assertTrue( os.path.exists( linkFile ) )


    def test_retrieveInitialSequenceHeaders_fasta_script( self ):
        # The link file also holds an entry ('seq3') absent from the input.
        self._checkRetrieval( "fasta",
                              ">seq2\nATGTACGATGACGATCAG\n"
                              ">seq1\nGTGCGGATGGAACAGT\n",
                              "seq1\tconsensus524\t1\t18\n"
                              "seq2\tDmelChr4-B-G387-MAP16\t1\t16\n"
                              "seq3\treference2\n",
                              ">DmelChr4-B-G387-MAP16\nATGTACGATGACGATCAG\n"
                              ">consensus524\nGTGCGGATGGAACAGT\n" )


    def test_retrieveInitialSequenceHeaders_fastaFromClustering_afterLTRHarvest_Blastclust( self ):
        # With '-w A', the 'BlastclustCluster<x>Mb<y>_' prefix is expected
        # to be kept in front of the retrieved header.
        self._checkRetrieval( "fasta",
                              ">BlastclustCluster1Mb3_seq2\nATGTACGATGACGATCAG\n"
                              ">BlastclustCluster8Mb4_seq1\nGTGCGGATGGAACAGT\n",
                              "seq1\tchunk1 (dbseq-nr 1) [41806,41825]\t1\t18\n"
                              "seq2\tchunk2 (dbseq-nr 6) [41006,41023]\t1\t16\n"
                              "seq3\treference2\n",
                              ">BlastclustCluster1Mb3_chunk2 (dbseq-nr 6) [41006,41023]\nATGTACGATGACGATCAG\n"
                              ">BlastclustCluster8Mb4_chunk1 (dbseq-nr 1) [41806,41825]\nGTGCGGATGGAACAGT\n",
                              whichCluster="A" )


    def test_retrieveInitialSequenceHeaders_fastaFromClustering_forClusterConsensus_Blastclust( self ):
        # With '-w B', a 'Blc<x>' tag is expected inside the retrieved header.
        self._checkRetrieval( "fasta",
                              ">BlastclustCluster8Mb4_seq1\nGTGCGGATGGAACAGT\n"
                              ">BlastclustCluster1Mb3_seq2\nATGTACGATGACGATCAG\n",
                              "seq1\tDHX-incomp_DmelChr4-B-R1-Map4\t1\t18\n"
                              "seq2\tRLX-incomp_DmelChr4-B-R12-Map3_reversed\t1\t16\n"
                              "seq3\treference2\n",
                              ">DHX-incomp_Blc8_DmelChr4-B-R1-Map4\nGTGCGGATGGAACAGT\n"
                              ">RLX-incomp_Blc1_DmelChr4-B-R12-Map3_reversed\nATGTACGATGACGATCAG\n",
                              whichCluster="B" )


    def test_retrieveInitialSequenceHeaders_fastaFromClustering_afterLTRHarvest_MCL( self ):
        # Same as the Blastclust '-w A' case, with 'MCLCluster<x>Mb<y>_' prefixes.
        self._checkRetrieval( "fasta",
                              ">MCLCluster1Mb3_seq2\nATGTACGATGACGATCAG\n"
                              ">MCLCluster8Mb4_seq1\nGTGCGGATGGAACAGT\n",
                              "seq1\tchunk1 (dbseq-nr 1) [41806,41825]\t1\t18\n"
                              "seq2\tchunk2 (dbseq-nr 6) [41006,41023]\t1\t16\n"
                              "seq3\treference2\n",
                              ">MCLCluster1Mb3_chunk2 (dbseq-nr 6) [41006,41023]\nATGTACGATGACGATCAG\n"
                              ">MCLCluster8Mb4_chunk1 (dbseq-nr 1) [41806,41825]\nGTGCGGATGGAACAGT\n",
                              whichCluster="A" )


    def test_retrieveInitialSequenceHeaders_fastaFromClustering_forClusterConsensus_MCL( self ):
        # Same as the Blastclust '-w B' case, with a 'MCL<x>' tag expected.
        self._checkRetrieval( "fasta",
                              ">MCLCluster8Mb4_seq1\nGTGCGGATGGAACAGT\n"
                              ">MCLCluster1Mb3_seq2\nATGTACGATGACGATCAG\n",
                              "seq1\tDHX-incomp_DmelChr4-B-R1-Map4\t1\t18\n"
                              "seq2\tRLX-incomp_DmelChr4-B-R12-Map3_reversed\t1\t16\n"
                              "seq3\treference2\n",
                              ">DHX-incomp_MCL8_DmelChr4-B-R1-Map4\nGTGCGGATGGAACAGT\n"
                              ">RLX-incomp_MCL1_DmelChr4-B-R12-Map3_reversed\nATGTACGATGACGATCAG\n",
                              whichCluster="B" )


    def test_retrieveInitialSequenceHeaders_newick_script( self ):
        # Single-line tree, written without final newline; 'seq1' is in the
        # link file but not in the tree.
        self._checkRetrieval( "newick",
                              "(seq4:0.012511,(seq3:0.005340,seq2:0.002201))",
                              "seq1\t1360\n"
                              "seq2\tDmelChr4-B-P2.0-MAP3_classII-TIR-comp|1cl-1gr|26copies\n"
                              "seq3\tDmelChr4-B-G20-MAP3_classII-TIR-comp|1cl-1gr|53copies\n"
                              "seq4\tDmelChr4-B-G14-MAP17_classII-TIR-comp|1cl-1gr|41copies\n",
                              "(DmelChr4-B-G14-MAP17_classII-TIR-comp|1cl-1gr|41copies:0.012511,(DmelChr4-B-G20-MAP3_classII-TIR-comp|1cl-1gr|53copies:0.005340,DmelChr4-B-P2.0-MAP3_classII-TIR-comp|1cl-1gr|26copies:0.002201))",
                              inPrefix="dummyInFile" )


    def test_retrieveInitialSequenceHeadersForAlignFile( self ):
        # The last column is written '98.5' in the input and expected as
        # '98.500000' in the output.
        self._checkRetrieval( "align",
                              "seq1\t1\t100\tseq2\t110\t11\t1e-38\t254\t98.5\n"
                              "seq2\t11\t110\tseq1\t100\t1\t1e-38\t254\t98.5\n",
                              "seq1\tname=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t1\t1000\n"
                              "seq2\tname=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t1\t800\n",
                              "name=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t1\t100\tname=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t110\t11\t1e-38\t254\t98.500000\n"
                              "name=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t11\t110\tname=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t100\t1\t1e-38\t254\t98.500000\n",
                              inPrefix="dummyAlignFile",
                              expPrefix="dummyExpAlignFile",
                              obsPrefix="dummyObsAlignFile" )


    def test_retrieveInitialSequenceHeadersForPathFile( self ):
        # Same data as the 'align' test, with an additional leading
        # identifier column which is expected to be kept unchanged.
        self._checkRetrieval( "path",
                              "11\tseq1\t1\t100\tseq2\t110\t11\t1e-38\t254\t98.5\n"
                              "2\tseq2\t11\t110\tseq1\t100\t1\t1e-38\t254\t98.5\n",
                              "seq1\tname=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t1\t1000\n"
                              "seq2\tname=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t1\t800\n",
                              "11\tname=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t1\t100\tname=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t110\t11\t1e-38\t254\t98.500000\n"
                              "2\tname=Dm_Blaster_Recon_34_Map_20|category=classI|order=LTR|completeness=comp\t11\t110\tname=Dm_Blaster_Piler_30.38_Map_8|category=classI|order=LTR|completeness=comp\t100\t1\t1e-38\t254\t98.500000\n",
                              inPrefix="dummyAlignFile",
                              expPrefix="dummyExpAlignFile",
                              obsPrefix="dummyObsAlignFile" )
        
# Run the whole test case only when this file is executed as a script.
if __name__ == "__main__":
    unittest.main()