#!/usr/bin/env python
import sys
import os
import tempfile
import shutil
import optparse
import urllib2
import subprocess
from ftplib import FTP
import tarfile
import zipfile
import gzip
import bz2

from galaxy.util.json import from_json_string, to_json_string

CHUNK_SIZE = 2**20  # 1 MB read size for streaming downloads and copies

def cleanup_before_exit( tmp_dir ):
    if tmp_dir and os.path.exists( tmp_dir ):
        shutil.rmtree( tmp_dir )


def stop_err(msg):
    sys.stderr.write(msg)
    sys.exit(1)


def _get_files_in_ftp_path( ftp, path ):
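    """Return the entry names in an FTP directory. Each MLSD response line
    ends with '; <name>', so the name is the last semicolon-separated field."""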
    path_contents = []
    ftp.retrlines( 'MLSD %s' % ( path ), path_contents.append )
    return [ line.split( ';' )[ -1 ].lstrip() for line in path_contents ]


def _get_stream_readers_for_tar( file_obj, tmp_dir ):
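    """Return file-like readers for every regular file member of a tar archive."""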
    fasta_tar = tarfile.open( fileobj=file_obj, mode='r:*' )
    # extractfile() returns None for non-file members (e.g. directories), so skip them
    return [ fasta_tar.extractfile( member ) for member in fasta_tar.getmembers() if member.isfile() ]


def _get_stream_readers_for_zip( file_obj, tmp_dir ):
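    """Extract every zip member into tmp_dir and return open handles on the extracted files."""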
    fasta_zip = zipfile.ZipFile( file_obj, 'r' )
    rval = []
    for member in fasta_zip.namelist():
        fasta_zip.extract( member, tmp_dir )
        rval.append( open( os.path.join( tmp_dir, member ), 'rb' ) )
    return rval


def _get_stream_readers_for_gzip( file_obj, tmp_dir ):
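    """Return a single reader that decompresses the gzip stream on the fly."""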
    return [ gzip.GzipFile( fileobj=file_obj, mode='rb' ) ]


def _get_stream_readers_for_bz2( file_obj, tmp_dir ):
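    """Python 2's bz2.BZ2File cannot wrap an existing file object, so reopen the file by name."""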
    return [ bz2.BZ2File( file_obj.name, 'rb' ) ]


def download_from_ncbi( data_manager_dict, params, target_directory, database_id, database_name ):
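    """Download a compressed FASTA database from the NCBI BLAST FTP site,
    decompress it, and build a DIAMOND database from it."""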
    NCBI_FTP_SERVER = 'ftp.ncbi.nlm.nih.gov'
    NCBI_DOWNLOAD_PATH = '/blast/db/FASTA/'
    COMPRESSED_EXTENSIONS = [ ( '.tar.gz', _get_stream_readers_for_tar ), ( '.tar.bz2', _get_stream_readers_for_tar ), ( '.zip', _get_stream_readers_for_zip ), ( '.gz', _get_stream_readers_for_gzip ), ( '.bz2', _get_stream_readers_for_bz2 ) ]

    ncbi_identifier = params['param_dict']['reference_source']['requested_identifier']
    ftp = FTP( NCBI_FTP_SERVER )
    ftp.login()

    path_contents = _get_files_in_ftp_path( ftp, NCBI_DOWNLOAD_PATH )

    ncbi_file_name = None
    get_stream_reader = None
    ext = None
    for ext, get_stream_reader in COMPRESSED_EXTENSIONS:
        if "%s%s" % ( ncbi_identifier, ext ) in path_contents:
            ncbi_file_name = "%s%s%s" % ( NCBI_DOWNLOAD_PATH, ncbi_identifier, ext )
            break

    if not ncbi_file_name:
        raise Exception( 'Unable to determine filename for NCBI database for %s: %s' % ( ncbi_identifier, path_contents ) )

    tmp_dir = tempfile.mkdtemp( prefix='tmp-data-manager-ncbi-' )
    ncbi_fasta_filename = os.path.join( tmp_dir, "%s%s" % ( ncbi_identifier, ext ) )
    
    tmp_extract_dir = os.path.join ( tmp_dir, 'extracted_fasta' )
    os.mkdir( tmp_extract_dir )
    
    tmp_fasta = open( ncbi_fasta_filename, 'wb+' )
    
    ftp.retrbinary( 'RETR %s' % ncbi_file_name, tmp_fasta.write )
    
    tmp_fasta.flush()
    tmp_fasta.seek( 0 )
    
    fasta_readers = get_stream_reader( tmp_fasta, tmp_extract_dir )
    
    data_table_entry = _stream_fasta_to_file( fasta_readers, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )
    
    for fasta_reader in fasta_readers:
        fasta_reader.close()
    tmp_fasta.close()
    cleanup_before_exit( tmp_dir )


def download_from_url( data_manager_dict, params, target_directory, database_id, database_name ):
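    """Build the database from one or more user-supplied URLs, one per line."""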
    #TODO: we should automatically do decompression here
    urls = [ url.strip() for url in params['param_dict']['reference_source']['user_url'].split( '\n' ) if url.strip() ]
    fasta_readers = [ urllib2.urlopen( url ) for url in urls ]

    data_table_entry = _stream_fasta_to_file( fasta_readers, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )


def download_from_history( data_manager_dict, params, target_directory, database_id, database_name ):
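    """Build the database from a FASTA dataset selected from the user's history."""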
    #TODO: allow multiple FASTA input files
    input_filename = params['param_dict']['reference_source']['input_fasta']
    if isinstance( input_filename, list ):
        fasta_reader = [ open( filename, 'rb' ) for filename in input_filename ]
    else:
        fasta_reader = open( input_filename, 'rb' )
    
    data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )


def copy_from_directory( data_manager_dict, params, target_directory, database_id, database_name ):
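    """Build the database from a FASTA file on the server's filesystem, either by
    symlinking the file as-is or by streaming it through the DIAMOND build."""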
    input_filename = params['param_dict']['reference_source']['fasta_filename']
    create_symlink = params['param_dict']['reference_source']['create_symlink'] == 'create_symlink'
    if create_symlink:
        data_table_entry = _create_symlink( input_filename, target_directory, database_id, database_name )
    else:
        if isinstance( input_filename, list ):
            fasta_reader = [ open( filename, 'rb' ) for filename in input_filename ]
        else:
            fasta_reader = open( input_filename, 'rb' )
        data_table_entry = _stream_fasta_to_file( fasta_reader, target_directory, database_id, database_name, params )
    _add_data_table_entry( data_manager_dict, data_table_entry )


def _add_data_table_entry( data_manager_dict, data_table_entry ):
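    """Append a row to the 'diamond_database' data table in the data manager's output dictionary."""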
    data_manager_dict['data_tables'] = data_manager_dict.get( 'data_tables', {} )
    data_manager_dict['data_tables']['diamond_database'] = data_manager_dict['data_tables'].get( 'diamond_database', [] )
    data_manager_dict['data_tables']['diamond_database'].append( data_table_entry )
    return data_manager_dict


def _stream_fasta_to_file( fasta_stream, target_directory, database_id, database_name, params, close_stream=True ):
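    """Concatenate one or more FASTA streams into a temporary file, separating
    inputs with a newline when needed, then run 'diamond makedb' on it and
    return the corresponding data table entry."""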
    fasta_base_filename = "%s.fa" % database_id
    fasta_filename = os.path.join( target_directory, fasta_base_filename )

    temp_fasta = tempfile.NamedTemporaryFile( delete=False, suffix=".fasta" )
    temp_fasta.close()
    fasta_writer = open( temp_fasta.name, 'wb+' )

    if isinstance( fasta_stream, list ) and len( fasta_stream ) == 1:
        fasta_stream = fasta_stream[0]

    if isinstance( fasta_stream, list ):
        last_char = None
        for fh in fasta_stream:
            if last_char not in [ None, '\n', '\r' ]:
                fasta_writer.write( '\n' )
            while True:
                data = fh.read( CHUNK_SIZE )
                if data:
                    fasta_writer.write( data )
                    last_char = data[-1]
                else:
                    break
            if close_stream:
                fh.close()
    else:
        while True:
            data = fasta_stream.read( CHUNK_SIZE )
            if data:
                fasta_writer.write( data )
            else:
                break
        if close_stream:
            fasta_stream.close()

    fasta_writer.close()

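    # diamond makedb appends '.dmnd' to the --db argument, so the database is
    # written to "<database_id>.fa.dmnd" in target_directory, matching the
    # db_path returned below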
    args = [ 'diamond', 'makedb', '--in', temp_fasta.name, '--db', fasta_filename ]

    tmp_stderr = tempfile.NamedTemporaryFile( prefix = "tmp-data-manager-diamond-database-builder-stderr" )
    proc = subprocess.Popen( args=args, shell=False, cwd=target_directory, stderr=tmp_stderr.fileno() )
    return_code = proc.wait()
    if return_code:
        tmp_stderr.flush()
        tmp_stderr.seek(0)
        sys.stderr.write( "Error building diamond database:\n" )
        while True:
            chunk = tmp_stderr.read( CHUNK_SIZE )
            if not chunk:
                break
            sys.stderr.write( chunk )
        sys.exit( return_code )
    tmp_stderr.close()
    os.remove( temp_fasta.name )
    return dict( value=database_id, name=database_name, db_path="%s.dmnd" % fasta_base_filename )


def _create_symlink( input_filename, target_directory, database_id, database_name ):
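    """Symlink the source FASTA into the target directory and return a data table
    entry pointing at it; note that no DIAMOND database is built in this case."""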
    fasta_base_filename = "%s.fa" % database_id
    fasta_filename = os.path.join( target_directory, fasta_base_filename )
    os.symlink( input_filename, fasta_filename )
    return dict( value=database_id, name=database_name, db_path=fasta_base_filename )


REFERENCE_SOURCE_TO_DOWNLOAD = dict( ncbi=download_from_ncbi, url=download_from_url, history=download_from_history, directory=copy_from_directory )

def main():
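    """Read the JSON params file named on the command line, fetch or build the
    requested database, and write the resulting data table entries back to the
    same file."""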
    # parse command line
    parser = optparse.OptionParser()
    parser.add_option( '-d', '--dbkey_description', dest='dbkey_description', action='store', type="string", default=None, help='dbkey_description' )
    (options, args) = parser.parse_args()
    
    filename = args[0]
    
    params = from_json_string( open( filename ).read() )
    target_directory = params[ 'output_data' ][0]['extra_files_path']
    os.mkdir( target_directory )
    data_manager_dict = {}

    database_id = params['param_dict']['database_id']
    database_name = params['param_dict']['database_name']

    # fetch the FASTA and build the database
    REFERENCE_SOURCE_TO_DOWNLOAD[ params['param_dict']['reference_source']['reference_source_selector'] ]( data_manager_dict, params, target_directory, database_id, database_name )
    
    # save info to JSON file
    open( filename, 'wb' ).write( to_json_string( data_manager_dict ) )

if __name__ == "__main__":
    main()