| 0 | 1 #!/usr/bin/env python3 | 
|  | 2 import sys | 
|  | 3 import logging | 
|  | 4 import subprocess | 
|  | 5 import os | 
|  | 6 import tempfile | 
|  | 7 import config | 
|  | 8 import shutil | 
|  | 9 import itertools | 
|  | 10 import pickle | 
|  | 11 import shlex | 
|  | 12 from lib.parallel.parallel import parallel2 as parallel | 
|  | 13 from lib.parallel.parallel import get_max_proc | 
|  | 14 | 
# minimum interpreter version this module supports
REQUIRED_VERSION = (3, 4)
# NOTE(review): not referenced in this file - presumably consumed by importers; confirm
MAX_BUFFER_SIZE = 100000
if sys.version_info < REQUIRED_VERSION:
    raise Exception("\n\npython 3.4 or higher is required!\n")
LOGGER = logging.getLogger(__name__)
# cap on FASTA files written per tmp subdirectory (used by assembly())
MAX_FASTA_IN_DIRECTORY = 1000
|  | 21 | 
def assembly(sequences, hitsort, clusters_info, assembly_dir,
             contigs_file, min_size_of_cluster_for_assembly = 0):
    '''
    Run cap3 assembly on sequences (SequenceSet), one cluster at a time.

    Large annotated clusters come from *clusters_info* and are assembled
    from their per-cluster fasta files; the remaining "small" clusters are
    taken from *hitsort* (Graph) and their reads are fetched from the
    sequence database.  Clusters whose ``tandem_rank`` is listed in
    ``config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS`` are NOT assembled - their
    precomputed tarean contigs are reused instead.

    Side effects: sets ``cl.assembly_files`` on every cluster in
    *clusters_info*, writes merged small-cluster output files into
    *assembly_dir*, and concatenates all contigs into *contigs_file*.
    '''

    # iterate over large clusters; assembly input for annotated clusters is
    # the per-cluster fasta file (cluster.fasta_file_full) - sequences of
    # small clusters are retrieved from the database later
    fasta_seqs = [
        i.fasta_file_full
        for i in clusters_info
        if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS
    ]
    prefixes = ["CL{}".format(i.index)
                for i in clusters_info
                if not i.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS]
    LOGGER.info("Number of clusters for assembly: {}".format(hitsort.number_of_clusters))
    LOGGER.info("Assembling large clusters")
    # one cap3worker call per cluster, distributed over worker processes
    assembly_files = parallel(cap3worker, fasta_seqs, prefixes)
    LOGGER.info("Large clusters assembled")
    # distribute results back; skipped (tandem-rank) clusters reuse their
    # precomputed tarean contigs instead of a cap3 assembly
    j = 0
    for cl in clusters_info:
        if cl.tandem_rank in config.SKIP_CAP3_ASSEMBLY_TANDEM_RANKS:
            # keys are the filename-template strings from config; only the
            # contigs entry is filled in for skipped clusters
            cl.assembly_files = {i: None for i in config.CAP3_FILES_MAPPING}
            consensus_file = cl.dir + "/tarean_contigs.fasta"
            cl.assembly_files["{}.{}.contigs"] = consensus_file
            # copy tarean contigs, prefixing each fasta header with the
            # cluster id so contig names stay globally unique
            with open(cl.dir_tarean + "/tarean_contigs.fasta",
                      'r') as fin, open(consensus_file, 'w') as fout:
                for line in fin:
                    if line[0] == ">":
                        line = ">CL{}Contig{}".format(cl.index, line[1:])
                    fout.write(line)
        else:
            # cap3 results are in the same order as the filtered input lists
            cl.assembly_files = assembly_files[j]
            j += 1
    # assembly of small clusters:
    # per-chunk results will later be concatenated into files opened below
    LOGGER.info("Assembly of small cluster - making tmp files")

    tmp_dir_root = tempfile.mkdtemp()
    tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
    nproc = get_max_proc()
    # one work list per worker process; clusters are dealt out round-robin
    prefixes = [[] for i in range(nproc)]
    tmp_seq_files = [[] for i in range(nproc)]

    LOGGER.info("Assembly of small clusters - saving small cluster to tmp files")
    seq_dictionary = sequences.toDict()
    fasta_count = 0
    chunk_counter = itertools.cycle(range(nproc))
    # NOTE(review): range end is exclusive, so the cluster whose index equals
    # hitsort.number_of_clusters is never processed - confirm whether cluster
    # indices are 0- or 1-based here
    for index in range(len(clusters_info) + 1, hitsort.number_of_clusters):
        chunk = next(chunk_counter)
        ids = hitsort.get_cluster_reads(index)
        # break (not continue) - presumably clusters are ordered by
        # decreasing size, so all later ones are too small as well; verify
        if len(ids) < min_size_of_cluster_for_assembly:
            break
        prefixes[chunk].append("CL{}".format(index))
        fasta_count += 1
        if fasta_count > MAX_FASTA_IN_DIRECTORY:
            # create new subdir to keep number of files per directory low
            fasta_count = 1
            tmp_subdir = tempfile.mkdtemp(dir=tmp_dir_root)
        fasta_file_name = "{}/{}".format(tmp_subdir, index)
        write_seqDict_to_fasta(file_name=fasta_file_name, sequences=seq_dictionary, subset=ids)
        tmp_seq_files[chunk].append(fasta_file_name)
    # free the (potentially large) id->sequence dict before running workers
    del seq_dictionary
    LOGGER.info("Assembly of small clusters running")
    # each worker returns the name of a pickle file holding its results
    pickled_fparts_small_contigs = parallel(cap3worker_multiple, tmp_seq_files, prefixes)
    LOGGER.info("Assembly of small clusters done")
    # append to all results
    LOGGER.info("Assembly of small clusters - appending results")
    # open one concatenation target per mapped cap3 output kind; kinds with a
    # falsy mapping are discarded (their per-cluster files are just deleted)
    all_con = {}
    for fkey in config.CAP3_FILES_MAPPING:
        if config.CAP3_FILES_MAPPING[fkey]:
            all_con[fkey] = open("{}/{}".format(
                assembly_dir, config.CAP3_FILES_MAPPING[fkey]), 'w')
        else:
            all_con[fkey] = None
    for pickled_fparts_multiple in pickled_fparts_small_contigs:
        with open(pickled_fparts_multiple, 'rb') as fp:
            fparts_multiple = pickle.load(fp)
        os.unlink(pickled_fparts_multiple)
        for fparts in fparts_multiple:
            for fkey in fparts:
                if all_con[fkey]:
                    # append this cluster's file to the merged output,
                    # then remove the per-cluster file
                    with open(fparts[fkey]) as f:
                        all_con[fkey].write(f.read())
                    os.unlink(fparts[fkey])
                else:
                    os.unlink(fparts[fkey])

    for fkey in all_con:
        if all_con[fkey]:
            all_con[fkey].close()
    LOGGER.info("Assembling - copying files to final location")
    # copy all contigs to one location: - contigs_file
    with open(contigs_file, 'w') as fout:
        for cl in clusters_info:
            # append contigs from large clusters
            with open(cl.assembly_files["{}.{}.contigs"], 'r') as fin:
                for i in fin:
                    fout.write(i)
        # append contigs from small clusters
        small_aln_file = "{}/{}".format(
            assembly_dir, config.CAP3_FILES_MAPPING['{}.{}.aln'])
        # calculate profile on aln file of small clusters
        # (external perl scripts; their exit status is not checked)
        file_base_name = small_aln_file[:-4]
        os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=small_aln_file, out=file_base_name))
        os.system("select_and_sort_contigs.pl {fn}.info.fasta 5 2>&1".format(fn=file_base_name))
        small_contig_file = file_base_name + ".info.fasta"
        with open(small_contig_file, 'r') as fin:
            for i in fin:
                fout.write(i)
    shutil.rmtree(tmp_dir_root)
|  | 140 | 
def write_seqDict_to_fasta(file_name, sequences, subset):
    """Write selected sequences to *file_name* in FASTA format.

    For each identifier in *subset* (in the given order) a two-line record
    is emitted: a ``>id`` header followed by the sequence looked up in the
    *sequences* mapping.
    """
    records = (">{}\n{}\n".format(seq_id, sequences[seq_id])
               for seq_id in subset)
    with open(file_name, 'w') as fasta_out:
        fasta_out.writelines(records)
|  | 145 | 
|  | 146 | 
|  | 147 | 
|  | 148 | 
def cap3worker(seqfile, prefix="cap", cap3args=" -p 80 -o 40 "):
    """Assemble one cluster's FASTA file with cap3 and post-process output.

    Runs ``cap3`` on *seqfile* capturing the alignment from stdout, rewrites
    contig names with the cluster-specific *prefix* (e.g. ``CL12``) using the
    substitution patterns in ``config.CAP3_PATTERNS_REPLACE``, renames the
    output files to the names from ``config.CAP3_FILES_GOODNAMES`` and
    finally derives profile/sorted-contig files via external perl scripts.

    Returns a dict mapping the filename-template keys of
    ``config.CAP3_FILENAMES`` to the final file paths.
    """
    # cap3 names its output files "<seqfile>.cap.*"; the cluster-specific
    # *prefix* is only used when rewriting contig names inside the files
    prefix2 = "cap"
    cmd = "cap3 " + seqfile + cap3args + " -x " + prefix2
    with open(seqfile + "." + prefix2 + ".aln", "w") as aln:
        subprocess.check_call(shlex.split(cmd), shell=False, stdout=aln)
    # cap3 wrote its result files next to seqfile; rewrite contig names
    files_dict = {}
    for fkey in config.CAP3_FILENAMES:
        fn = fkey.format(seqfile, prefix2)
        fn_tmp = "{}.tmp".format(fn)
        files_dict[fkey] = fn
        if config.CAP3_PATTERNS_REPLACE[fkey]:
            pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
            # stream-rewrite into a temp file, then replace the original
            with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                for line in con_in:
                    con_out.write(line.replace(pattern,
                                               replacement.format(prefix)))
            os.rename(fn_tmp, fn)

    # move outputs to meaningful file names
    # (a dead no-op statement that merely evaluated the mapping was removed;
    # os.path.join avoids producing a rooted "/name" path when dirname is "")
    for fkey in config.CAP3_FILES_GOODNAMES:
        fn_goodname = os.path.join(os.path.dirname(files_dict[fkey]),
                                   config.CAP3_FILES_GOODNAMES[fkey])
        os.rename(files_dict[fkey], fn_goodname)
        files_dict[fkey] = fn_goodname

    # derive profile and sorted contig files from the alignment
    # (external perl scripts; exit status is not checked)
    aln_file = files_dict["{}.{}.aln"]
    file_base_name = aln_file[:-4]
    os.system("align_parsing.pl -i {fn} -o {out}.info.fasta -p {out}.profile 2>&1".format(fn=aln_file, out=file_base_name))
    os.system("select_and_sort_contigs.pl {fn}.info.fasta 5".format(fn=file_base_name))
    # TODO -add new file to files_dict
    # the annotated info.fasta replaces the raw cap3 contig file
    files_dict["{}.{}.contigs"] = file_base_name + ".info.fasta"
    return files_dict
|  | 184 | 
def cap3worker_multiple(many_seqfile, many_prefixes, cap3args=" -p 80 -o 40 "):
    '''
    Run many small cap3 assemblies inside a single worker process.

    Starting one subprocess per tiny assembly proved very inefficient, so
    all cap3 commands are written into one shell script which is executed
    by a single ``sh`` invocation.  Each cluster's output files are then
    post-processed (contig renaming via ``config.CAP3_PATTERNS_REPLACE``)
    the same way as in ``cap3worker``.

    Returns the *name* of a pickle file containing the list of per-cluster
    file dicts - the result is too large to be returned directly.
    '''
    # write one "cap3 ... > ...aln" line per cluster into a shell script;
    # the context manager closes the handle so the script is fully flushed
    # before execution (the original leaked an open file object)
    with tempfile.NamedTemporaryFile(mode="w", delete=False) as cmdf:
        cmd_file = cmdf.name
        for seqfile, prefix in zip(many_seqfile, many_prefixes):
            cmdf.write("cap3 " + seqfile + cap3args + " -x " + prefix +
                       " > " + seqfile + "." + prefix + ".aln\n")
    os.system("sh " + cmd_file)
    os.unlink(cmd_file)
    # collect results:
    files_dict_many = []
    for seqfile, prefix in zip(many_seqfile, many_prefixes):
        files_dict = {}
        for fkey in config.CAP3_FILENAMES:
            fn = fkey.format(seqfile, prefix)
            fn_tmp = "{}.tmp".format(fn)
            files_dict[fkey] = fn
            if config.CAP3_PATTERNS_REPLACE[fkey]:
                pattern, replacement = config.CAP3_PATTERNS_REPLACE[fkey]
                # rewrite contig names to the cluster-specific prefix
                with open(fn, "r") as con_in, open(fn_tmp, 'w') as con_out:
                    for line in con_in:
                        con_out.write(line.replace(
                            pattern, replacement.format(prefix)))
                os.rename(fn_tmp, fn)
        files_dict_many.append(files_dict)
    # too large to be returned directly - pickle to a file; closing via the
    # context manager guarantees the data is flushed to disk before the
    # caller unpickles it (the original relied on GC to close the handle)
    with tempfile.NamedTemporaryFile(delete=False) as f:
        pickle.dump(files_dict_many, file=f)
        return f.name
|  | 220 | 
|  | 221 |