annotate change_o/DefineClones.py @ 0:8a5a2abbb870 draft default tip

Uploaded
author davidvanzessen
date Mon, 29 Aug 2016 05:36:10 -0400
parents
children
Ignore whitespace changes - Everywhere: Within whitespace: At end of lines:
rev   line source
0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1 #!/usr/bin/env python3
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
2 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
3 Assign Ig sequences into clones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
4 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
5 # Info
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
6 __author__ = 'Namita Gupta, Jason Anthony Vander Heiden, Gur Yaari, Mohamed Uduman'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
7 from changeo import __version__, __date__
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
8
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
9 # Imports
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
10 import os
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
11 import re
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
12 import sys
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
13 import numpy as np
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
14 from argparse import ArgumentParser
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
15 from collections import OrderedDict
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
16 from itertools import chain
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
17 from textwrap import dedent
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
18 from time import time
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
19 from Bio import pairwise2
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
20 from Bio.Seq import translate
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
21
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
22 # Presto and changeo imports
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
23 from presto.Defaults import default_out_args
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
24 from presto.IO import getFileType, getOutputHandle, printLog, printProgress
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
25 from presto.Multiprocessing import manageProcesses
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
26 from presto.Sequence import getDNAScoreDict
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
27 from changeo.Commandline import CommonHelpFormatter, getCommonArgParser, parseCommonArgs
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
28 from changeo.Distance import getDNADistMatrix, getAADistMatrix, \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
29 hs1f_model, m1n_model, hs5f_model, \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
30 calcDistances, formClusters
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
31 from changeo.IO import getDbWriter, readDbFile, countDbFile
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
32 from changeo.Multiprocessing import DbData, DbResult
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
33
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
34 # Defaults
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
35 default_translate = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
36 default_distance = 0.0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
37 default_bygroup_model = 'hs1f'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
38 default_hclust_model = 'chen2010'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
39 default_seq_field = 'JUNCTION'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
40 default_norm = 'len'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
41 default_sym = 'avg'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
42 default_linkage = 'single'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
43
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
44 # TODO: should be in Distance, but need to be after function definitions
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
45 # Amino acid Hamming distance
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
46 aa_model = getAADistMatrix(mask_dist=1, gap_dist=0)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
47
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
48 # DNA Hamming distance
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
49 ham_model = getDNADistMatrix(mask_dist=0, gap_dist=0)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
50
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
51
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
52 # TODO: this function is an abstraction to facilitate later cleanup
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
53 def getModelMatrix(model):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
54 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
55 Simple wrapper to get distance matrix from model name
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
56
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
57 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
58 model = model name
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
59
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
60 Return:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
61 a pandas.DataFrame containing the character distance matrix
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
62 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
63 if model == 'aa':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
64 return(aa_model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
65 elif model == 'ham':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
66 return(ham_model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
67 elif model == 'm1n':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
68 return(m1n_model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
69 elif model == 'hs1f':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
70 return(hs1f_model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
71 elif model == 'hs5f':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
72 return(hs5f_model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
73 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
74 sys.stderr.write('Unrecognized distance model: %s.\n' % model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
75
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
76
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
77 def indexJunctions(db_iter, fields=None, mode='gene', action='first'):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
78 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
79 Identifies preclonal groups by V, J and junction length
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
80
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
81 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
82 db_iter = an iterator of IgRecords defined by readDbFile
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
83 fields = additional annotation fields to use to group preclones;
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
84 if None use only V, J and junction length
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
85 mode = specificity of alignment call to use for assigning preclones;
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
86 one of ('allele', 'gene')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
87 action = how to handle multiple value fields when assigning preclones;
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
88 one of ('first', 'set')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
89
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
90 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
91 a dictionary of {(V, J, junction length):[IgRecords]}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
92 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
93 # Define functions for grouping keys
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
94 if mode == 'allele' and fields is None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
95 def _get_key(rec, act):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
96 return (rec.getVAllele(act), rec.getJAllele(act),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
97 None if rec.junction is None else len(rec.junction))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
98 elif mode == 'gene' and fields is None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
99 def _get_key(rec, act):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
100 return (rec.getVGene(act), rec.getJGene(act),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
101 None if rec.junction is None else len(rec.junction))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
102 elif mode == 'allele' and fields is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
103 def _get_key(rec, act):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
104 vdj = [rec.getVAllele(act), rec.getJAllele(act),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
105 None if rec.junction is None else len(rec.junction)]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
106 ann = [rec.toDict().get(k, None) for k in fields]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
107 return tuple(chain(vdj, ann))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
108 elif mode == 'gene' and fields is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
109 def _get_key(rec, act):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
110 vdj = [rec.getVGene(act), rec.getJGene(act),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
111 None if rec.junction is None else len(rec.junction)]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
112 ann = [rec.toDict().get(k, None) for k in fields]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
113 return tuple(chain(vdj, ann))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
114
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
115 start_time = time()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
116 clone_index = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
117 rec_count = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
118 for rec in db_iter:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
119 key = _get_key(rec, action)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
120
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
121 # Print progress
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
122 if rec_count == 0:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
123 print('PROGRESS> Grouping sequences')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
124
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
125 printProgress(rec_count, step=1000, start_time=start_time)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
126 rec_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
127
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
128 # Assigned passed preclone records to key and failed to index None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
129 if all([k is not None and k != '' for k in key]):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
130 #print key
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
131 # TODO: Has much slow. Should have less slow.
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
132 if action == 'set':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
133
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
134 f_range = list(range(2, 3 + (len(fields) if fields else 0)))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
135 vdj_range = list(range(2))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
136
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
137 # Check for any keys that have matching columns and junction length and overlapping genes/alleles
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
138 to_remove = []
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
139 if len(clone_index) > (1 if None in clone_index else 0) and key not in clone_index:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
140 key = list(key)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
141 for k in clone_index:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
142 if k is not None and all([key[i] == k[i] for i in f_range]):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
143 if all([not set(key[i]).isdisjoint(set(k[i])) for i in vdj_range]):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
144 for i in vdj_range: key[i] = tuple(set(key[i]).union(set(k[i])))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
145 to_remove.append(k)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
146
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
147 # Remove original keys, replace with union of all genes/alleles and append values to new key
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
148 val = [rec]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
149 val += list(chain(*(clone_index.pop(k) for k in to_remove)))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
150 clone_index[tuple(key)] = clone_index.get(tuple(key),[]) + val
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
151
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
152 elif action == 'first':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
153 clone_index.setdefault(key, []).append(rec)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
154 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
155 clone_index.setdefault(None, []).append(rec)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
156
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
157 printProgress(rec_count, step=1000, start_time=start_time, end=True)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
158
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
159 return clone_index
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
160
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
161
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
162 def distanceClones(records, model=default_bygroup_model, distance=default_distance,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
163 dist_mat=None, norm=default_norm, sym=default_sym,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
164 linkage=default_linkage, seq_field=default_seq_field):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
165 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
166 Separates a set of IgRecords into clones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
167
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
168 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
169 records = an iterator of IgRecords
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
170 model = substitution model used to calculate distance
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
171 distance = the distance threshold to assign clonal groups
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
172 dist_mat = pandas DataFrame of pairwise nucleotide or amino acid distances
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
173 norm = normalization method
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
174 sym = symmetry method
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
175 linkage = type of linkage
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
176 seq_field = sequence field used to calculate distance between records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
177
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
178 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
179 a dictionary of lists defining {clone number: [IgRecords clonal group]}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
180 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
181 # Get distance matrix if not provided
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
182 if dist_mat is None: dist_mat = getModelMatrix(model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
183
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
184 # Determine length of n-mers
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
185 if model in ['hs1f', 'm1n', 'aa', 'ham']:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
186 nmer_len = 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
187 elif model in ['hs5f']:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
188 nmer_len = 5
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
189 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
190 sys.stderr.write('Unrecognized distance model: %s.\n' % model)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
191
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
192 # Define unique junction mapping
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
193 seq_map = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
194 for ig in records:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
195 seq = ig.getSeqField(seq_field)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
196 # Check if sequence length is 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
197 if len(seq) == 0:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
198 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
199
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
200 seq = re.sub('[\.-]','N', str(seq))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
201 if model == 'aa': seq = translate(seq)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
202
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
203 seq_map.setdefault(seq, []).append(ig)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
204
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
205 # Process records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
206 if len(seq_map) == 1:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
207 return {1:records}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
208
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
209 # Define sequences
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
210 seqs = list(seq_map.keys())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
211
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
212 # Calculate pairwise distance matrix
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
213 dists = calcDistances(seqs, nmer_len, dist_mat, norm, sym)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
214
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
215 # Perform hierarchical clustering
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
216 clusters = formClusters(dists, linkage, distance)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
217
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
218 # Turn clusters into clone dictionary
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
219 clone_dict = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
220 for i, c in enumerate(clusters):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
221 clone_dict.setdefault(c, []).extend(seq_map[seqs[i]])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
222
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
223 return clone_dict
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
224
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
225
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
226 def distChen2010(records):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
227 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
228 Calculate pairwise distances as defined in Chen 2010
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
229
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
230 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
231 records = list of IgRecords where first is query to be compared to others in list
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
232
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
233 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
234 list of distances
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
235 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
236 # Pull out query sequence and V/J information
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
237 query = records.popitem(last=False)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
238 query_cdr3 = query.junction[3:-3]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
239 query_v_allele = query.getVAllele()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
240 query_v_gene = query.getVGene()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
241 query_v_family = query.getVFamily()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
242 query_j_allele = query.getJAllele()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
243 query_j_gene = query.getJGene()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
244 # Create alignment scoring dictionary
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
245 score_dict = getDNAScoreDict()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
246
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
247 scores = [0]*len(records)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
248 for i in range(len(records)):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
249 ld = pairwise2.align.globalds(query_cdr3, records[i].junction[3:-3],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
250 score_dict, -1, -1, one_alignment_only=True)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
251 # Check V similarity
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
252 if records[i].getVAllele() == query_v_allele: ld += 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
253 elif records[i].getVGene() == query_v_gene: ld += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
254 elif records[i].getVFamily() == query_v_family: ld += 3
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
255 else: ld += 5
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
256 # Check J similarity
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
257 if records[i].getJAllele() == query_j_allele: ld += 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
258 elif records[i].getJGene() == query_j_gene: ld += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
259 else: ld += 3
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
260 # Divide by length
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
261 scores[i] = ld/max(len(records[i].junction[3:-3]), query_cdr3)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
262
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
263 return scores
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
264
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
265
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
266 def distAdemokun2011(records):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
267 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
268 Calculate pairwise distances as defined in Ademokun 2011
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
269
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
270 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
271 records = list of IgRecords where first is query to be compared to others in list
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
272
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
273 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
274 list of distances
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
275 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
276 # Pull out query sequence and V family information
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
277 query = records.popitem(last=False)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
278 query_cdr3 = query.junction[3:-3]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
279 query_v_family = query.getVFamily()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
280 # Create alignment scoring dictionary
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
281 score_dict = getDNAScoreDict()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
282
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
283 scores = [0]*len(records)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
284 for i in range(len(records)):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
285
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
286 if abs(len(query_cdr3) - len(records[i].junction[3:-3])) > 10:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
287 scores[i] = 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
288 elif query_v_family != records[i].getVFamily():
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
289 scores[i] = 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
290 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
291 ld = pairwise2.align.globalds(query_cdr3, records[i].junction[3:-3],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
292 score_dict, -1, -1, one_alignment_only=True)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
293 scores[i] = ld/min(len(records[i].junction[3:-3]), query_cdr3)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
294
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
295 return scores
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
296
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
297
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
298 def hierClust(dist_mat, method='chen2010'):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
299 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
300 Calculate hierarchical clustering
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
301
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
302 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
303 dist_mat = square-formed distance matrix of pairwise CDR3 comparisons
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
304
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
305 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
306 list of cluster ids
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
307 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
308 if method == 'chen2010':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
309 clusters = formClusters(dist_mat, 'average', 0.32)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
310 elif method == 'ademokun2011':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
311 clusters = formClusters(dist_mat, 'complete', 0.25)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
312 else: clusters = np.ones(dist_mat.shape[0])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
313
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
314 return clusters
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
315
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
316 # TODO: Merge duplicate feed, process and collect functions.
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
317 def feedQueue(alive, data_queue, db_file, group_func, group_args={}):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
318 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
319 Feeds the data queue with Ig records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
320
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
321 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
322 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
323 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
324 data_queue = a multiprocessing.Queue to hold data for processing
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
325 db_file = the Ig record database file
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
326 group_func = the function to use for assigning preclones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
327 group_args = a dictionary of arguments to pass to group_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
328
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
329 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
330 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
331 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
332 # Open input file and perform grouping
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
333 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
334 # Iterate over Ig records and assign groups
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
335 db_iter = readDbFile(db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
336 clone_dict = group_func(db_iter, **group_args)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
337 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
338 #sys.stderr.write('Exception in feeder grouping step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
339 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
340 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
341
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
342 # Add groups to data queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
343 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
344 #print 'START FEED', alive.value
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
345 # Iterate over groups and feed data queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
346 clone_iter = iter(clone_dict.items())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
347 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
348 # Get data from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
349 if data_queue.full(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
350 else: data = next(clone_iter, None)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
351 # Exit upon reaching end of iterator
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
352 if data is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
353 #print "FEED", alive.value, k
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
354
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
355 # Feed queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
356 data_queue.put(DbData(*data))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
357 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
358 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
359 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
360 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
361 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
362 #sys.stderr.write('Exception in feeder queue feeding step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
363 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
364 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
365
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
366 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
367
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
368
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
369 def feedQueueClust(alive, data_queue, db_file, group_func=None, group_args={}):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
370 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
371 Feeds the data queue with Ig records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
372
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
373 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
374 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
375 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
376 data_queue = a multiprocessing.Queue to hold data for processing
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
377 db_file = the Ig record database file
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
378
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
379 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
380 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
381 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
382 # Open input file and perform grouping
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
383 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
384 # Iterate over Ig records and order by junction length
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
385 records = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
386 db_iter = readDbFile(db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
387 for rec in db_iter:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
388 records[rec.id] = rec
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
389 records = OrderedDict(sorted(list(records.items()), key=lambda i: i[1].junction_length))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
390 dist_dict = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
391 for __ in range(len(records)):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
392 k,v = records.popitem(last=False)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
393 dist_dict[k] = [v].append(list(records.values()))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
394 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
395 #sys.stderr.write('Exception in feeder grouping step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
396 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
397 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
398
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
399 # Add groups to data queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
400 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
401 # print 'START FEED', alive.value
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
402 # Iterate over groups and feed data queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
403 dist_iter = iter(dist_dict.items())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
404 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
405 # Get data from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
406 if data_queue.full(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
407 else: data = next(dist_iter, None)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
408 # Exit upon reaching end of iterator
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
409 if data is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
410 #print "FEED", alive.value, k
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
411
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
412 # Feed queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
413 data_queue.put(DbData(*data))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
414 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
415 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
416 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
417 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
418 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
419 #sys.stderr.write('Exception in feeder queue feeding step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
420 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
421 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
422
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
423 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
424
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
425
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
426 def processQueue(alive, data_queue, result_queue, clone_func, clone_args):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
427 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
428 Pulls from data queue, performs calculations, and feeds results queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
429
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
430 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
431 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
432 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
433 data_queue = a multiprocessing.Queue holding data to process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
434 result_queue = a multiprocessing.Queue to hold processed results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
435 clone_func = the function to call for clonal assignment
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
436 clone_args = a dictionary of arguments to pass to clone_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
437
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
438 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
439 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
440 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
441 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
442 # Iterator over data queue until sentinel object reached
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
443 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
444 # Get data from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
445 if data_queue.empty(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
446 else: data = data_queue.get()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
447 # Exit upon reaching sentinel
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
448 if data is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
449
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
450 # Define result object for iteration and get data records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
451 records = data.data
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
452 result = DbResult(data.id, records)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
453
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
454 # Check for invalid data (due to failed indexing) and add failed result
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
455 if not data:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
456 result_queue.put(result)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
457 continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
458
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
459 # Add V(D)J to log
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
460 result.log['ID'] = ','.join([str(x) for x in data.id])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
461 result.log['VALLELE'] = ','.join(set([(r.getVAllele() or '') for r in records]))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
462 result.log['DALLELE'] = ','.join(set([(r.getDAllele() or '') for r in records]))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
463 result.log['JALLELE'] = ','.join(set([(r.getJAllele() or '') for r in records]))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
464 result.log['JUNCLEN'] = ','.join(set([(str(len(r.junction)) or '0') for r in records]))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
465 result.log['SEQUENCES'] = len(records)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
466
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
467 # Checking for preclone failure and assign clones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
468 clones = clone_func(records, **clone_args) if data else None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
469
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
470 # import cProfile
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
471 # prof = cProfile.Profile()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
472 # clones = prof.runcall(clone_func, records, **clone_args)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
473 # prof.dump_stats('worker-%d.prof' % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
474
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
475 if clones is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
476 result.results = clones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
477 result.valid = True
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
478 result.log['CLONES'] = len(clones)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
479 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
480 result.log['CLONES'] = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
481
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
482 # Feed results to result queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
483 result_queue.put(result)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
484 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
485 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
486 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
487 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
488 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
489 #sys.stderr.write('Exception in worker\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
490 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
491 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
492
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
493 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
494
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
495
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
496 def processQueueClust(alive, data_queue, result_queue, clone_func, clone_args):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
497 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
498 Pulls from data queue, performs calculations, and feeds results queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
499
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
500 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
501 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
502 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
503 data_queue = a multiprocessing.Queue holding data to process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
504 result_queue = a multiprocessing.Queue to hold processed results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
505 clone_func = the function to call for calculating pairwise distances between sequences
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
506 clone_args = a dictionary of arguments to pass to clone_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
507
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
508 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
509 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
510 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
511
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
512 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
513 # print 'START WORK', alive.value
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
514 # Iterator over data queue until sentinel object reached
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
515 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
516 # Get data from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
517 if data_queue.empty(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
518 else: data = data_queue.get()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
519 # Exit upon reaching sentinel
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
520 if data is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
521 # print "WORK", alive.value, data['id']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
522
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
523 # Define result object for iteration and get data records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
524 records = data.data
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
525 result = DbResult(data.id, records)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
526
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
527 # Create row of distance matrix and check for error
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
528 dist_row = clone_func(records, **clone_args) if data else None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
529 if dist_row is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
530 result.results = dist_row
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
531 result.valid = True
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
532
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
533 # Feed results to result queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
534 result_queue.put(result)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
535 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
536 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
537 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
538 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
539 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
540 #sys.stderr.write('Exception in worker\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
541 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
542 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
543
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
544 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
545
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
546
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
547 def collectQueue(alive, result_queue, collect_queue, db_file, out_args, cluster_func=None, cluster_args={}):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
548 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
549 Assembles results from a queue of individual sequence results and manages log/file I/O
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
550
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
551 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
552 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
553 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
554 result_queue = a multiprocessing.Queue holding processQueue results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
555 collect_queue = a multiprocessing.Queue to store collector return values
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
556 db_file = the input database file name
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
557 out_args = common output argument dictionary from parseCommonArgs
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
558 cluster_func = the function to call for carrying out clustering on distance matrix
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
559 cluster_args = a dictionary of arguments to pass to cluster_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
560
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
561 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
562 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
563 (adds 'log' and 'out_files' to collect_dict)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
564 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
565 # Open output files
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
566 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
567 # Count records and define output format
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
568 out_type = getFileType(db_file) if out_args['out_type'] is None \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
569 else out_args['out_type']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
570 result_count = countDbFile(db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
571
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
572 # Defined successful output handle
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
573 pass_handle = getOutputHandle(db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
574 out_label='clone-pass',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
575 out_dir=out_args['out_dir'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
576 out_name=out_args['out_name'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
577 out_type=out_type)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
578 pass_writer = getDbWriter(pass_handle, db_file, add_fields='CLONE')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
579
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
580 # Defined failed alignment output handle
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
581 if out_args['failed']:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
582 fail_handle = getOutputHandle(db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
583 out_label='clone-fail',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
584 out_dir=out_args['out_dir'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
585 out_name=out_args['out_name'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
586 out_type=out_type)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
587 fail_writer = getDbWriter(fail_handle, db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
588 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
589 fail_handle = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
590 fail_writer = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
591
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
592 # Define log handle
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
593 if out_args['log_file'] is None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
594 log_handle = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
595 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
596 log_handle = open(out_args['log_file'], 'w')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
597 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
598 #sys.stderr.write('Exception in collector file opening step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
599 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
600 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
601
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
602 # Get results from queue and write to files
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
603 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
604 #print 'START COLLECT', alive.value
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
605 # Iterator over results queue until sentinel object reached
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
606 start_time = time()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
607 rec_count = clone_count = pass_count = fail_count = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
608 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
609 # Get result from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
610 if result_queue.empty(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
611 else: result = result_queue.get()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
612 # Exit upon reaching sentinel
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
613 if result is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
614 #print "COLLECT", alive.value, result['id']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
615
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
616 # Print progress for previous iteration and update record count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
617 if rec_count == 0:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
618 print('PROGRESS> Assigning clones')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
619 printProgress(rec_count, result_count, 0.05, start_time)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
620 rec_count += len(result.data)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
621
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
622 # Write passed and failed records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
623 if result:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
624 for clone in result.results.values():
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
625 clone_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
626 for i, rec in enumerate(clone):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
627 rec.annotations['CLONE'] = clone_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
628 pass_writer.writerow(rec.toDict())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
629 pass_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
630 result.log['CLONE%i-%i' % (clone_count, i + 1)] = str(rec.junction)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
631
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
632 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
633 for i, rec in enumerate(result.data):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
634 if fail_writer is not None: fail_writer.writerow(rec.toDict())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
635 fail_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
636 result.log['CLONE0-%i' % (i + 1)] = str(rec.junction)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
637
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
638 # Write log
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
639 printLog(result.log, handle=log_handle)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
640 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
641 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
642 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
643 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
644
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
645 # Print total counts
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
646 printProgress(rec_count, result_count, 0.05, start_time)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
647
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
648 # Close file handles
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
649 pass_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
650 if fail_handle is not None: fail_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
651 if log_handle is not None: log_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
652
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
653 # Update return list
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
654 log = OrderedDict()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
655 log['OUTPUT'] = os.path.basename(pass_handle.name)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
656 log['CLONES'] = clone_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
657 log['RECORDS'] = rec_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
658 log['PASS'] = pass_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
659 log['FAIL'] = fail_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
660 collect_dict = {'log':log, 'out_files': [pass_handle.name]}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
661 collect_queue.put(collect_dict)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
662 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
663 #sys.stderr.write('Exception in collector result processing step\n')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
664 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
665 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
666
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
667 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
668
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
669
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
670 def collectQueueClust(alive, result_queue, collect_queue, db_file, out_args, cluster_func, cluster_args):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
671 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
672 Assembles results from a queue of individual sequence results and manages log/file I/O
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
673
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
674 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
675 alive = a multiprocessing.Value boolean controlling whether processing continues
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
676 if False exit process
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
677 result_queue = a multiprocessing.Queue holding processQueue results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
678 collect_queue = a multiprocessing.Queue to store collector return values
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
679 db_file = the input database file name
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
680 out_args = common output argument dictionary from parseCommonArgs
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
681 cluster_func = the function to call for carrying out clustering on distance matrix
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
682 cluster_args = a dictionary of arguments to pass to cluster_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
683
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
684 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
685 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
686 (adds 'log' and 'out_files' to collect_dict)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
687 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
688 # Open output files
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
689 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
690
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
691 # Iterate over Ig records to count and order by junction length
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
692 result_count = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
693 records = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
694 # print 'Reading file...'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
695 db_iter = readDbFile(db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
696 for rec in db_iter:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
697 records[rec.id] = rec
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
698 result_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
699 records = OrderedDict(sorted(list(records.items()), key=lambda i: i[1].junction_length))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
700
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
701 # Define empty matrix to store assembled results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
702 dist_mat = np.zeros((result_count,result_count))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
703
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
704 # Count records and define output format
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
705 out_type = getFileType(db_file) if out_args['out_type'] is None \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
706 else out_args['out_type']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
707
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
708 # Defined successful output handle
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
709 pass_handle = getOutputHandle(db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
710 out_label='clone-pass',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
711 out_dir=out_args['out_dir'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
712 out_name=out_args['out_name'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
713 out_type=out_type)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
714 pass_writer = getDbWriter(pass_handle, db_file, add_fields='CLONE')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
715
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
716 # Defined failed cloning output handle
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
717 if out_args['failed']:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
718 fail_handle = getOutputHandle(db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
719 out_label='clone-fail',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
720 out_dir=out_args['out_dir'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
721 out_name=out_args['out_name'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
722 out_type=out_type)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
723 fail_writer = getDbWriter(fail_handle, db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
724 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
725 fail_handle = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
726 fail_writer = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
727
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
728 # Open log file
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
729 if out_args['log_file'] is None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
730 log_handle = None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
731 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
732 log_handle = open(out_args['log_file'], 'w')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
733 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
734 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
735 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
736
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
737 try:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
738 # Iterator over results queue until sentinel object reached
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
739 start_time = time()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
740 row_count = rec_count = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
741 while alive.value:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
742 # Get result from queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
743 if result_queue.empty(): continue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
744 else: result = result_queue.get()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
745 # Exit upon reaching sentinel
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
746 if result is None: break
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
747
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
748 # Print progress for previous iteration
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
749 if row_count == 0:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
750 print('PROGRESS> Assigning clones')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
751 printProgress(row_count, result_count, 0.05, start_time)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
752
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
753 # Update counts for iteration
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
754 row_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
755 rec_count += len(result)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
756
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
757 # Add result row to distance matrix
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
758 if result:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
759 dist_mat[list(range(result_count-len(result),result_count)),result_count-len(result)] = result.results
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
760
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
761 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
762 sys.stderr.write('PID %s: Error in sibling process detected. Cleaning up.\n' \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
763 % os.getpid())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
764 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
765
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
766 # Calculate linkage and carry out clustering
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
767 # print dist_mat
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
768 clusters = cluster_func(dist_mat, **cluster_args) if dist_mat is not None else None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
769 clones = {}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
770 # print clusters
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
771 for i, c in enumerate(clusters):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
772 clones.setdefault(c, []).append(records[list(records.keys())[i]])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
773
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
774 # Write passed and failed records
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
775 clone_count = pass_count = fail_count = 0
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
776 if clones:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
777 for clone in clones.values():
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
778 clone_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
779 for i, rec in enumerate(clone):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
780 rec.annotations['CLONE'] = clone_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
781 pass_writer.writerow(rec.toDict())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
782 pass_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
783 #result.log['CLONE%i-%i' % (clone_count, i + 1)] = str(rec.junction)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
784
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
785 else:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
786 for i, rec in enumerate(result.data):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
787 fail_writer.writerow(rec.toDict())
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
788 fail_count += 1
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
789 #result.log['CLONE0-%i' % (i + 1)] = str(rec.junction)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
790
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
791 # Print final progress
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
792 printProgress(row_count, result_count, 0.05, start_time)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
793
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
794 # Close file handles
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
795 pass_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
796 if fail_handle is not None: fail_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
797 if log_handle is not None: log_handle.close()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
798
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
799 # Update return list
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
800 log = OrderedDict()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
801 log['OUTPUT'] = os.path.basename(pass_handle.name)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
802 log['CLONES'] = clone_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
803 log['RECORDS'] = rec_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
804 log['PASS'] = pass_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
805 log['FAIL'] = fail_count
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
806 collect_dict = {'log':log, 'out_files': [pass_handle.name]}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
807 collect_queue.put(collect_dict)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
808 except:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
809 alive.value = False
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
810 raise
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
811
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
812 return None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
813
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
814
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
815 def defineClones(db_file, feed_func, work_func, collect_func, clone_func, cluster_func=None,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
816 group_func=None, group_args={}, clone_args={}, cluster_args={},
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
817 out_args=default_out_args, nproc=None, queue_size=None):
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
818 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
819 Define clonally related sequences
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
820
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
821 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
822 db_file = filename of input database
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
823 feed_func = the function that feeds the queue
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
824 work_func = the worker function that will run on each CPU
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
825 collect_func = the function that collects results from the workers
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
826 group_func = the function to use for assigning preclones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
827 clone_func = the function to use for determining clones within preclonal groups
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
828 group_args = a dictionary of arguments to pass to group_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
829 clone_args = a dictionary of arguments to pass to clone_func
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
830 out_args = common output argument dictionary from parseCommonArgs
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
831 nproc = the number of processQueue processes;
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
832 if None defaults to the number of CPUs
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
833 queue_size = maximum size of the argument queue;
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
834 if None defaults to 2*nproc
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
835
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
836 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
837 a list of successful output file names
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
838 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
839 # Print parameter info
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
840 log = OrderedDict()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
841 log['START'] = 'DefineClones'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
842 log['DB_FILE'] = os.path.basename(db_file)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
843 if group_func is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
844 log['GROUP_FUNC'] = group_func.__name__
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
845 log['GROUP_ARGS'] = group_args
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
846 log['CLONE_FUNC'] = clone_func.__name__
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
847
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
848 # TODO: this is yucky, but can be fixed by using a model class
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
849 clone_log = clone_args.copy()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
850 if 'dist_mat' in clone_log: del clone_log['dist_mat']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
851 log['CLONE_ARGS'] = clone_log
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
852
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
853 if cluster_func is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
854 log['CLUSTER_FUNC'] = cluster_func.__name__
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
855 log['CLUSTER_ARGS'] = cluster_args
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
856 log['NPROC'] = nproc
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
857 printLog(log)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
858
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
859 # Define feeder function and arguments
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
860 feed_args = {'db_file': db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
861 'group_func': group_func,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
862 'group_args': group_args}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
863 # Define worker function and arguments
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
864 work_args = {'clone_func': clone_func,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
865 'clone_args': clone_args}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
866 # Define collector function and arguments
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
867 collect_args = {'db_file': db_file,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
868 'out_args': out_args,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
869 'cluster_func': cluster_func,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
870 'cluster_args': cluster_args}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
871
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
872 # Call process manager
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
873 result = manageProcesses(feed_func, work_func, collect_func,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
874 feed_args, work_args, collect_args,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
875 nproc, queue_size)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
876
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
877 # Print log
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
878 result['log']['END'] = 'DefineClones'
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
879 printLog(result['log'])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
880
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
881 return result['out_files']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
882
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
883
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
884 def getArgParser():
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
885 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
886 Defines the ArgumentParser
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
887
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
888 Arguments:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
889 None
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
890
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
891 Returns:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
892 an ArgumentParser object
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
893 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
894 # Define input and output fields
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
895 fields = dedent(
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
896 '''
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
897 output files:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
898 clone-pass
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
899 database with assigned clonal group numbers.
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
900 clone-fail
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
901 database with records failing clonal grouping.
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
902
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
903 required fields:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
904 SEQUENCE_ID, V_CALL or V_CALL_GENOTYPED, D_CALL, J_CALL, JUNCTION_LENGTH
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
905
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
906 <field>
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
907 sequence field specified by the --sf parameter
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
908
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
909 output fields:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
910 CLONE
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
911 ''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
912
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
913 # Define ArgumentParser
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
914 parser = ArgumentParser(description=__doc__, epilog=fields,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
915 formatter_class=CommonHelpFormatter)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
916 parser.add_argument('--version', action='version',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
917 version='%(prog)s:' + ' %s-%s' %(__version__, __date__))
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
918 subparsers = parser.add_subparsers(title='subcommands', dest='command', metavar='',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
919 help='Cloning method')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
920 # TODO: This is a temporary fix for Python issue 9253
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
921 subparsers.required = True
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
922
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
923 # Parent parser
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
924 parser_parent = getCommonArgParser(seq_in=False, seq_out=False, db_in=True,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
925 multiproc=True)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
926
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
927 # Distance cloning method
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
928 parser_bygroup = subparsers.add_parser('bygroup', parents=[parser_parent],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
929 formatter_class=CommonHelpFormatter,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
930 help='''Defines clones as having same V assignment,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
931 J assignment, and junction length with
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
932 specified substitution distance model.''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
933 parser_bygroup.add_argument('-f', nargs='+', action='store', dest='fields', default=None,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
934 help='Additional fields to use for grouping clones (non VDJ)')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
935 parser_bygroup.add_argument('--mode', action='store', dest='mode',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
936 choices=('allele', 'gene'), default='gene',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
937 help='''Specifies whether to use the V(D)J allele or gene for
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
938 initial grouping.''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
939 parser_bygroup.add_argument('--act', action='store', dest='action', default='set',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
940 choices=('first', 'set'),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
941 help='''Specifies how to handle multiple V(D)J assignments
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
942 for initial grouping.''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
943 parser_bygroup.add_argument('--model', action='store', dest='model',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
944 choices=('aa', 'ham', 'm1n', 'hs1f', 'hs5f'),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
945 default=default_bygroup_model,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
946 help='''Specifies which substitution model to use for
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
947 calculating distance between sequences. Where m1n is the
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
948 mouse single nucleotide transition/trasversion model
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
949 of Smith et al, 1996; hs1f is the human single
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
950 nucleotide model derived from Yaari et al, 2013; hs5f
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
951 is the human S5F model of Yaari et al, 2013; ham is
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
952 nucleotide Hamming distance; and aa is amino acid
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
953 Hamming distance. The hs5f data should be
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
954 considered experimental.''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
955 parser_bygroup.add_argument('--dist', action='store', dest='distance', type=float,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
956 default=default_distance,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
957 help='The distance threshold for clonal grouping')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
958 parser_bygroup.add_argument('--norm', action='store', dest='norm',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
959 choices=('len', 'mut', 'none'), default=default_norm,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
960 help='''Specifies how to normalize distances. One of none
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
961 (do not normalize), len (normalize by length),
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
962 or mut (normalize by number of mutations between sequences).''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
963 parser_bygroup.add_argument('--sym', action='store', dest='sym',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
964 choices=('avg', 'min'), default=default_sym,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
965 help='''Specifies how to combine asymmetric distances. One of avg
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
966 (average of A->B and B->A) or min (minimum of A->B and B->A).''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
967 parser_bygroup.add_argument('--link', action='store', dest='linkage',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
968 choices=('single', 'average', 'complete'), default=default_linkage,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
969 help='''Type of linkage to use for hierarchical clustering.''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
970 parser_bygroup.add_argument('--sf', action='store', dest='seq_field',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
971 default=default_seq_field,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
972 help='''The name of the field to be used to calculate
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
973 distance between records''')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
974 parser_bygroup.set_defaults(feed_func=feedQueue)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
975 parser_bygroup.set_defaults(work_func=processQueue)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
976 parser_bygroup.set_defaults(collect_func=collectQueue)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
977 parser_bygroup.set_defaults(group_func=indexJunctions)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
978 parser_bygroup.set_defaults(clone_func=distanceClones)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
979
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
980
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
981 # Hierarchical clustering cloning method
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
982 parser_hclust = subparsers.add_parser('hclust', parents=[parser_parent],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
983 formatter_class=CommonHelpFormatter,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
984 help='Defines clones by specified distance metric on CDR3s and \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
985 cutting of hierarchical clustering tree')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
986 # parser_hclust.add_argument('-f', nargs='+', action='store', dest='fields', default=None,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
987 # help='Fields to use for grouping clones (non VDJ)')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
988 parser_hclust.add_argument('--method', action='store', dest='method',
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
989 choices=('chen2010', 'ademokun2011'), default=default_hclust_model,
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
990 help='Specifies which cloning method to use for calculating distance \
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
991 between CDR3s, computing linkage, and cutting clusters')
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
992 parser_hclust.set_defaults(feed_func=feedQueueClust)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
993 parser_hclust.set_defaults(work_func=processQueueClust)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
994 parser_hclust.set_defaults(collect_func=collectQueueClust)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
995 parser_hclust.set_defaults(cluster_func=hierClust)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
996
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
997 return parser
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
998
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
999
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1000 if __name__ == '__main__':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1001 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1002 Parses command line arguments and calls main function
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1003 """
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1004 # Parse arguments
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1005 parser = getArgParser()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1006 args = parser.parse_args()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1007 args_dict = parseCommonArgs(args)
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1008 # Convert case of fields
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1009 if 'seq_field' in args_dict:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1010 args_dict['seq_field'] = args_dict['seq_field'].upper()
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1011 if 'fields' in args_dict and args_dict['fields'] is not None:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1012 args_dict['fields'] = [f.upper() for f in args_dict['fields']]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1013
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1014 # Define clone_args
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1015 if args.command == 'bygroup':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1016 args_dict['group_args'] = {'fields': args_dict['fields'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1017 'action': args_dict['action'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1018 'mode':args_dict['mode']}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1019 args_dict['clone_args'] = {'model': args_dict['model'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1020 'distance': args_dict['distance'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1021 'norm': args_dict['norm'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1022 'sym': args_dict['sym'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1023 'linkage': args_dict['linkage'],
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1024 'seq_field': args_dict['seq_field']}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1025
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1026 # TODO: can be cleaned up with abstract model class
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1027 args_dict['clone_args']['dist_mat'] = getModelMatrix(args_dict['model'])
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1028
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1029 del args_dict['fields']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1030 del args_dict['action']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1031 del args_dict['mode']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1032 del args_dict['model']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1033 del args_dict['distance']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1034 del args_dict['norm']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1035 del args_dict['sym']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1036 del args_dict['linkage']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1037 del args_dict['seq_field']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1038
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1039 # Define clone_args
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1040 if args.command == 'hclust':
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1041 dist_funcs = {'chen2010':distChen2010, 'ademokun2011':distAdemokun2011}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1042 args_dict['clone_func'] = dist_funcs[args_dict['method']]
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1043 args_dict['cluster_args'] = {'method': args_dict['method']}
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1044 #del args_dict['fields']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1045 del args_dict['method']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1046
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1047 # Call defineClones
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1048 del args_dict['command']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1049 del args_dict['db_files']
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1050 for f in args.__dict__['db_files']:
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1051 args_dict['db_file'] = f
8a5a2abbb870 Uploaded
davidvanzessen
parents:
diff changeset
1052 defineClones(**args_dict)