| 
0
 | 
     1 #!/usr/bin/env python3
 | 
| 
 | 
     2 import sys
 | 
| 
 | 
     3 import logging
 | 
| 
 | 
     4 import subprocess
 | 
| 
 | 
     5 import os
 | 
| 
 | 
     6 import tempfile
 | 
| 
 | 
     7 import config
 | 
| 
 | 
     8 import shutil
 | 
| 
 | 
     9 import itertools
 | 
| 
 | 
    10 import pickle
 | 
| 
 | 
    11 import shlex
 | 
| 
 | 
    12 from lib.parallel.parallel import parallel2 as parallel
 | 
| 
 | 
    13 from lib.parallel.parallel import get_max_proc
 | 
| 
 | 
    14 
 | 
| 
 | 
    15 REQUIRED_VERSION = (3, 4)
 | 
| 
 | 
    16 MAX_BUFFER_SIZE = 100000
 | 
| 
 | 
    17 if sys.version_info < REQUIRED_VERSION:
 | 
| 
 | 
    18     raise Exception("\n\npython 3.4 or higher is required!\n")
 | 
| 
 | 
    19 LOGGER = logging.getLogger(__name__)
 | 
| 
 | 
    20 MAX_FASTA_IN_DIRECTORY = 1000
 | 
| 
 | 
    21 
 | 
| 
 | 
    22 def assembly(sequences, hitsort, clusters_info, assembly_dir,
 | 
| 
 | 
    23              contigs_file, min_size_of_cluster_for_assembly = 0):
 | 
| 
 | 
    24     '''
 | 
| 
 | 
    25     Runs assembly on sequences (SequenceSet). Assembly is
 | 
| 
 | 
    26     performed on each cluster separatelly, clusters are taken
 | 
| 
 | 
    27     from hitsort(Graph)
 | 
| 
 | 
    28     Cluster_listing - list of Clusters
 | 
| 
 | 
    29     if cluster.tandem_rank is 1 or 2 - no assembly is performed!!
 | 
| 
 | 
    30     '''
 | 
| 
 | 
    31 
 | 
| 
 | 
    32     # iterate over large clusters, assembly is performed on sequences stored in
 | 
| 
 | 
    33     # cluster_little[index].fasta_file_full - for annotated clusters
 | 
| 
 | 
    34     # sequences of small clusters are retrieved from database
 | 
| 
 | 
    35     fasta_seqs = [
 | 
| 
 | 
    36         i.fasta_file_full
 | 
| 
 | 
    37         for i in clusters_info
 | 
| 
 | 
    38         if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS
 | 
| 
 | 
    39     ]
 | 
| 
 | 
    40     prefixes = ["CL{}".format(i.index)
 | 
| 
 | 
    41                 for i in clusters_info
 | 
| 
 | 
    42                 if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS]
 | 
| 
 | 
    43     LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters))
 | 
| 
 | 
    44     LOGGER.info("Assembling large clusters")
 | 
| 
 | 
    45     assembly_files = parallel(cap3worker, fasta_seqs, prefixes)
 | 
| 
 | 
    46     LOGGER.info("Large clusters assembled")
 | 
| 
 | 
    47     # some clusters - tanem rank 1 assembled
 | 
| 
 | 
    48     j = 0
 | 
| 
 | 
    49     for cl in clusters_info:
 | 
| 
 | 
    50         if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS:
 | 
| 
 | 
    51             cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING}
 | 
| 
 | 
    52             consensus_file = cl.dir + "/tarean_contigs.fasta"
 | 
| 
 | 
    53             cl.assembly_files["{}.{}.contigs"] = consensus_file
 | 
| 
 | 
    54             with open(cl.dir_tarean + "/tarean_contigs.fasta",
 | 
| 
 | 
    55                       'r') as fin, open(consensus_file, 'w') as fout:
 | 
| 
 | 
    56                 for line in fin:
 | 
| 
 | 
    57                     if line[0] == ">":
 | 
| 
 | 
    58                         line = ">CL{}Contig{}".format(cl.index, line[1:])
 | 
| 
 | 
    59                     fout.write(line)
 | 
| 
 | 
    60         else:
 | 
| 
 | 
    61             cl.assembly_files = assembly_files[j]
 | 
| 
 | 
    62             j += 1
 | 
| 
 | 
    63     # assembly of small clusters:
 | 
| 
 | 
    64     # connection to files were results will be concatenated
 | 
| 
 | 
    65     LOGGER.info("Assembly of small cluster - making tmp files")
 | 
| 
 | 
    66 
 | 
| 
 | 
    67     tmp_dir_root = tempfile.mkdtemp()
 | 
| 
 | 
    68     tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
 | 
| 
 | 
    69     nproc = get_max_proc()
 | 
| 
 | 
    70     prefixes = [[] for i in range(nproc)]
 | 
| 
 | 
    71     tmp_seq_files = [[] for i in range(nproc)]
 | 
| 
 | 
    72 
 | 
| 
 | 
    73     LOGGER.info("Assembly of small clusters - saving small cluster to tmp files")
 | 
| 
 | 
    74     seq_dictionary = sequences.toDict()
 | 
| 
 | 
    75     fasta_count = 0
 | 
| 
 | 
    76     chunk_counter = itertools.cycle(range(nproc))
 | 
| 
 | 
    77     for index in range(len(clusters_info) + 1, hitsort.number_of_clusters):
 | 
| 
 | 
    78         chunk = next(chunk_counter)
 | 
| 
 | 
    79         ids = hitsort.get_cluster_reads(index)
 | 
| 
 | 
    80         if len(ids) < min_size_of_cluster_for_assembly:
 | 
| 
 | 
    81             break
 | 
| 
 | 
    82         prefixes[chunk].append("CL{}".format(index))
 | 
| 
 | 
    83         fasta_count += 1
 | 
| 
 | 
    84         if fasta_count > MAX_FASTA_IN_DIRECTORY:
 | 
| 
 | 
    85             # create new subdir to keep number of files in directory low
 | 
| 
 | 
    86             fasta_count = 1
 | 
| 
 | 
    87             tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
 | 
| 
 | 
    88         fasta_file_name = "{}/{}".format(tmp_subdir, index)
 | 
| 
 | 
    89         write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids)
 | 
| 
 | 
    90         tmp_seq_files[chunk].append(fasta_file_name)
 | 
| 
 | 
    91     del seq_dictionary
 | 
| 
 | 
    92     LOGGER.info("Assembly of small clusters running")
 | 
| 
 | 
    93     pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes)
 | 
| 
 | 
    94     LOGGER.info("Assembly of small clusters done")
 | 
| 
 | 
    95     # append to all results
 | 
| 
 | 
    96     LOGGER.info("Assembly of small clusters - appending results")
 | 
| 
 | 
    97     all_con = {}
 | 
| 
 | 
    98     for fkey in config.CAP3_FILES_MAPPING:
 | 
| 
 | 
    99         if config.CAP3_FILES_MAPPING[fkey]:
 | 
| 
 | 
   100             all_con[fkey] = open("{}/{}".format(
 | 
| 
 | 
   101                 assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w')
 | 
| 
 | 
   102         else:
 | 
| 
 | 
   103             all_con[fkey] = None
 | 
| 
 | 
   104     for pickled_fparts_multiple in pickled_fparts_small_contigs:
 | 
| 
 | 
   105         with open(pickled_fparts_multiple, 'rb') as fp:
 | 
| 
 | 
   106             fparts_multiple = pickle.load(fp)
 | 
| 
 | 
   107         os.unlink(pickled_fparts_multiple)
 | 
| 
 | 
   108         for fparts in fparts_multiple:
 | 
| 
 | 
   109             for fkey in fparts:
 | 
| 
 | 
   110                 if all_con[fkey]:
 | 
| 
 | 
   111                     with open(fparts[fkey]) as f:
 | 
| 
 | 
   112                         all_con[fkey].write(f.read())
 | 
| 
 | 
   113                     os.unlink(fparts[fkey])
 | 
| 
 | 
   114                 else:
 | 
| 
 | 
   115                     os.unlink(fparts[fkey])
 | 
| 
 | 
   116     
 | 
| 
 | 
   117     for fkey in all_con:
 | 
| 
 | 
   118         if all_con[fkey]:
 | 
| 
 | 
   119             all_con[fkey].close()
 | 
| 
 | 
   120     LOGGER.info("Assembling - copying files to final location")
 | 
| 
 | 
   121     # copy all contigs to one location: - contigs_file
 | 
| 
 | 
   122     with open(contigs_file, 'w') as fout:
 | 
| 
 | 
   123         for cl in clusters_info:
 | 
| 
 | 
   124             # append contig from large clusters
 | 
| 
 | 
   125             with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin:
 | 
| 
 | 
   126                 for i in fin:
 | 
| 
 | 
   127                     fout.write(i)
 | 
| 
 | 
   128         # append contigs from small clusters
 | 
| 
 | 
   129         small_aln_file = "{}/{}".format(
 | 
| 
 | 
   130             assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln'])
 | 
| 
 | 
   131         # calculate profile on aln file of small clusters
 | 
| 
 | 
   132         file_base_name = small_aln_file[:-4]
 | 
| 
 | 
   133         os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name))
 | 
| 
 | 
   134         os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name))
 | 
| 
 | 
   135         small_contig_file = file_base_name + ".info.fasta"
 | 
| 
 | 
   136         with open(small_contig_file, 'r') as fin:
 | 
| 
 | 
   137             for i in fin:
 | 
| 
 | 
   138                 fout.write(i)
 | 
| 
 | 
   139     shutil.rmtree(tmp_dir_root)
 | 
| 
 | 
   140 
 | 
| 
 | 
   141 def write_seqDict_to_fasta(file_name, sequences, subset):
 | 
| 
 | 
   142     with open(file_name, 'w') as f:
 | 
| 
 | 
   143         for i in subset:
 | 
| 
 | 
   144             f.write(">{}\n{}\n".format(i, sequences[i]))
 | 
| 
 | 
   145 
 | 
| 
 | 
   146 
 | 
| 
 | 
   147 
 | 
| 
 | 
   148 
 | 
| 
 | 
   149 def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "):
 | 
| 
 | 
   150     prefix2 = "cap"
 | 
| 
 | 
   151     cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2
 | 
| 
 | 
   152     with open(seqfile + "." + prefix2 + ".aln", "w") as aln:
 | 
| 
 | 
   153         subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln)
 | 
| 
 | 
   154     # this generate three files
 | 
| 
 | 
   155     files_dict = {}
 | 
| 
 | 
   156     for fkey in config.CAP3_FILENAMES:
 | 
| 
 | 
   157         fn = fkey.format(seqfile, prefix2)
 | 
| 
 | 
   158         fn_tmp = "{}.tmp".format(fn)
 | 
| 
 | 
   159         files_dict[fkey] = fn
 | 
| 
 | 
   160         if config.CAP3_PATTERNS_REPLACE[fkey]:
 | 
| 
 | 
   161             pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
 | 
| 
 | 
   162             with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
 | 
| 
 | 
   163                 for line in con_in:
 | 
| 
 | 
   164                     con_out.write(line.replace(pattern, replacement.format(
 | 
| 
 | 
   165                         prefix)))
 | 
| 
 | 
   166             os.rename(fn_tmp, fn)
 | 
| 
 | 
   167     
 | 
| 
 | 
   168     # make new meaningful names here
 | 
| 
 | 
   169 
 | 
| 
 | 
   170     for fkey in config.CAP3_FILES_GOODNAMES:
 | 
| 
 | 
   171         config.CAP3_FILES_GOODNAMES[fkey]
 | 
| 
 | 
   172         fn_goodname = os.path.dirname(files_dict[fkey]) + "/" + config.CAP3_FILES_GOODNAMES[fkey]
 | 
| 
 | 
   173         os.rename(files_dict[fkey], fn_goodname)
 | 
| 
 | 
   174         files_dict[fkey] = fn_goodname
 | 
| 
 | 
   175 
 | 
| 
 | 
   176     aln_file = files_dict["{}.{}.aln"]
 | 
| 
 | 
   177     file_base_name = aln_file[:-4]
 | 
| 
 | 
   178     os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name))
 | 
| 
 | 
   179     os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name))
 | 
| 
 | 
   180     # TODO -add new file to files_dict
 | 
| 
 | 
   181     # replace simple fasta with info.fasta
 | 
| 
 | 
   182     files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta"
 | 
| 
 | 
   183     return files_dict
 | 
| 
 | 
   184 
 | 
| 
 | 
   185 def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "):
 | 
| 
 | 
   186     '''
 | 
| 
 | 
   187     purpose of this script is to run multiple assemblies within single process
 | 
| 
 | 
   188     avoiding running high number of short parallel subprocesses
 | 
| 
 | 
   189     As starting subprocess for each cap3 assembly was very ineffective,
 | 
| 
 | 
   190     all ap3 commands are written to single file and run using single subprocess
 | 
| 
 | 
   191     command -
 | 
| 
 | 
   192     '''
 | 
| 
 | 
   193     cmd_file = tempfile.NamedTemporaryFile(mode="w",delete=False).name
 | 
| 
 | 
   194     with open(cmd_file,'w') as cmdf:
 | 
| 
 | 
   195         for seqfile, prefix in zip(many_seqfile, many_prefixes):
 | 
| 
 | 
   196             cmd = "cap3 " + seqfile + cap3args + " -x " + prefix + " > " + seqfile + "." + prefix + ".aln\n"
 | 
| 
 | 
   197             cmdf.write(cmd)
 | 
| 
 | 
   198     os.system("sh "+ cmd_file)
 | 
| 
 | 
   199     # collect results:
 | 
| 
 | 
   200     files_dict_many = []
 | 
| 
 | 
   201     for seqfile, prefix in zip(many_seqfile, many_prefixes):
 | 
| 
 | 
   202         files_dict = {}
 | 
| 
 | 
   203         for fkey in config.CAP3_FILENAMES:
 | 
| 
 | 
   204             fn = fkey.format(seqfile, prefix)
 | 
| 
 | 
   205             fn_tmp = "{}.tmp".format(fn)
 | 
| 
 | 
   206             files_dict[fkey] = fn
 | 
| 
 | 
   207             if config.CAP3_PATTERNS_REPLACE[fkey]:
 | 
| 
 | 
   208                 pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
 | 
| 
 | 
   209                 with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
 | 
| 
 | 
   210                     for line in con_in:
 | 
| 
 | 
   211                         con_out.write(line.replace(pattern, replacement.format(
 | 
| 
 | 
   212                             prefix)))
 | 
| 
 | 
   213                 os.rename(fn_tmp, fn)
 | 
| 
 | 
   214         files_dict_many.append(files_dict)
 | 
| 
 | 
   215     # this is too large to be return directly - use picking
 | 
| 
 | 
   216     f = tempfile.NamedTemporaryFile(delete=False)
 | 
| 
 | 
   217     os.unlink(cmd_file)
 | 
| 
 | 
   218     pickle.dump(files_dict_many,file=f)
 | 
| 
 | 
   219     return f.name
 | 
| 
 | 
   220     
 | 
| 
 | 
   221 
 |