diff change_o/DefineClones.py @ 0:8a5a2abbb870 draft default tip

Uploaded
author davidvanzessen
date Mon, 29 Aug 2016 05:36:10 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/change_o/DefineClones.py	Mon Aug 29 05:36:10 2016 -0400
@@ -0,0 +1,1052 @@
+#!/usr/bin/env python3
+"""
+Assign Ig sequences into clones
+"""
+# Info
+__author__ = 'Namita Gupta, Jason Anthony Vander Heiden, Gur Yaari, Mohamed Uduman'
+from changeo import __version__, __date__
+
+# Imports
+import os
+import re
+import sys
+import numpy as np
+from argparse import ArgumentParser
+from collections import OrderedDict
+from itertools import chain
+from textwrap import dedent
+from time import time
+from Bio import pairwise2
+from Bio.Seq import translate
+
+# Presto and changeo imports
+from presto.Defaults import default_out_args
+from presto.IO import getFileType, getOutputHandle, printLog, printProgress
+from presto.Multiprocessing import manageProcesses
+from presto.Sequence import getDNAScoreDict
+from changeo.Commandline import CommonHelpFormatter, getCommonArgParser, parseCommonArgs
+from changeo.Distance import getDNADistMatrix, getAADistMatrix, \
+                             hs1f_model, m1n_model, hs5f_model, \
+                             calcDistances, formClusters
+from changeo.IO import getDbWriter, readDbFile, countDbFile
+from changeo.Multiprocessing import DbData, DbResult
+
+# Defaults
+default_translate = False
+default_distance = 0.0
+default_bygroup_model = 'hs1f'
+default_hclust_model = 'chen2010'
+default_seq_field = 'JUNCTION'
+default_norm = 'len'
+default_sym = 'avg'
+default_linkage = 'single'
+
+# TODO:  should be in Distance, but need to be after function definitions
+# Amino acid Hamming distance
+aa_model = getAADistMatrix(mask_dist=1, gap_dist=0)
+
+# DNA Hamming distance
+ham_model = getDNADistMatrix(mask_dist=0, gap_dist=0)
+
+
+# TODO:  this function is an abstraction to facilitate later cleanup
+def getModelMatrix(model):
+    """
+    Simple wrapper to get distance matrix from model name
+
+    Arguments:
+    model = model name
+
+    Return:
+    a pandas.DataFrame containing the character distance matrix
+    """
+    if model == 'aa':
+        return(aa_model)
+    elif model == 'ham':
+        return(ham_model)
+    elif model == 'm1n':
+        return(m1n_model)
+    elif model == 'hs1f':
+        return(hs1f_model)
+    elif model == 'hs5f':
+        return(hs5f_model)
+    else:
+        sys.stderr.write('Unrecognized distance model: %s.\n' % model)
+
+
+def indexJunctions(db_iter, fields=None, mode='gene', action='first'):
+    """
+    Identifies preclonal groups by V, J and junction length
+
+    Arguments: 
+    db_iter = an iterator of IgRecords defined by readDbFile
+    fields = additional annotation fields to use to group preclones;
+             if None use only V, J and junction length
+    mode = specificity of alignment call to use for assigning preclones;
+           one of ('allele', 'gene')
+    action = how to handle multiple value fields when assigning preclones;
+             one of ('first', 'set')
+    
+    Returns: 
+    a dictionary of {(V, J, junction length):[IgRecords]}
+    """
+    # Define functions for grouping keys
+    if mode == 'allele' and fields is None:
+        def _get_key(rec, act):
+            return (rec.getVAllele(act), rec.getJAllele(act),
+                    None if rec.junction is None else len(rec.junction))
+    elif mode == 'gene' and fields is None:
+        def _get_key(rec, act):  
+            return (rec.getVGene(act), rec.getJGene(act),
+                    None if rec.junction is None else len(rec.junction))
+    elif mode == 'allele' and fields is not None:
+        def _get_key(rec, act):
+            vdj = [rec.getVAllele(act), rec.getJAllele(act),
+                    None if rec.junction is None else len(rec.junction)]
+            ann = [rec.toDict().get(k, None) for k in fields]
+            return tuple(chain(vdj, ann))
+    elif mode == 'gene' and fields is not None:
+        def _get_key(rec, act):
+            vdj = [rec.getVGene(act), rec.getJGene(act),
+                    None if rec.junction is None else len(rec.junction)]
+            ann = [rec.toDict().get(k, None) for k in fields]
+            return tuple(chain(vdj, ann))
+
+    start_time = time()
+    clone_index = {}
+    rec_count = 0
+    for rec in db_iter:
+        key = _get_key(rec, action)
+
+        # Print progress
+        if rec_count == 0:
+            print('PROGRESS> Grouping sequences')
+
+        printProgress(rec_count, step=1000, start_time=start_time)
+        rec_count += 1
+
+        # Assigned passed preclone records to key and failed to index None
+        if all([k is not None and k != '' for k in key]):
+            #print key
+            # TODO:  Has much slow. Should have less slow.
+            if action == 'set':
+                
+                f_range = list(range(2, 3 + (len(fields) if fields else 0)))
+                vdj_range = list(range(2))
+                
+                # Check for any keys that have matching columns and junction length and overlapping genes/alleles
+                to_remove = []
+                if len(clone_index) > (1 if None in clone_index else 0) and key not in clone_index:
+                    key = list(key)
+                    for k in clone_index:
+                        if k is not None and all([key[i] == k[i] for i in f_range]):
+                            if all([not set(key[i]).isdisjoint(set(k[i])) for i in vdj_range]):
+                                for i in vdj_range:  key[i] = tuple(set(key[i]).union(set(k[i])))
+                                to_remove.append(k)
+                
+                # Remove original keys, replace with union of all genes/alleles and append values to new key
+                val = [rec]
+                val += list(chain(*(clone_index.pop(k) for k in to_remove)))
+                clone_index[tuple(key)] = clone_index.get(tuple(key),[]) + val 
+
+            elif action == 'first':
+                clone_index.setdefault(key, []).append(rec)
+        else:
+            clone_index.setdefault(None, []).append(rec)
+
+    printProgress(rec_count, step=1000, start_time=start_time, end=True)
+
+    return clone_index
+
+
+def distanceClones(records, model=default_bygroup_model, distance=default_distance,
+                   dist_mat=None, norm=default_norm, sym=default_sym,
+                   linkage=default_linkage, seq_field=default_seq_field):
+    """
+    Separates a set of IgRecords into clones
+
+    Arguments: 
+    records = an iterator of IgRecords
+    model = substitution model used to calculate distance
+    distance = the distance threshold to assign clonal groups
+    dist_mat = pandas DataFrame of pairwise nucleotide or amino acid distances
+    norm = normalization method
+    sym = symmetry method
+    linkage = type of linkage
+    seq_field = sequence field used to calculate distance between records
+
+    Returns: 
+    a dictionary of lists defining {clone number: [IgRecords clonal group]}
+    """
+    # Get distance matrix if not provided
+    if dist_mat is None:  dist_mat = getModelMatrix(model)
+
+    # Determine length of n-mers
+    if model in ['hs1f', 'm1n', 'aa', 'ham']:
+        nmer_len = 1
+    elif model in ['hs5f']:
+        nmer_len = 5
+    else:
+        sys.stderr.write('Unrecognized distance model: %s.\n' % model)
+
+    # Define unique junction mapping
+    seq_map = {}
+    for ig in records:
+        seq = ig.getSeqField(seq_field)
+        # Check if sequence length is 0
+        if len(seq) == 0:
+            return None
+
+        seq = re.sub('[\.-]','N', str(seq))
+        if model == 'aa':  seq = translate(seq)
+
+        seq_map.setdefault(seq, []).append(ig)
+
+    # Process records
+    if len(seq_map) == 1:
+        return {1:records}
+
+    # Define sequences
+    seqs = list(seq_map.keys())
+
+    # Calculate pairwise distance matrix
+    dists = calcDistances(seqs, nmer_len, dist_mat, norm, sym)
+
+    # Perform hierarchical clustering
+    clusters = formClusters(dists, linkage, distance)
+
+    # Turn clusters into clone dictionary
+    clone_dict = {}
+    for i, c in enumerate(clusters):
+        clone_dict.setdefault(c, []).extend(seq_map[seqs[i]])
+
+    return clone_dict
+
+
+def distChen2010(records):
+    """
+    Calculate pairwise distances as defined in Chen 2010
+    
+    Arguments:
+    records = list of IgRecords where first is query to be compared to others in list
+    
+    Returns:
+    list of distances
+    """
+    # Pull out query sequence and V/J information
+    query = records.popitem(last=False)
+    query_cdr3 = query.junction[3:-3]
+    query_v_allele = query.getVAllele()
+    query_v_gene = query.getVGene()
+    query_v_family = query.getVFamily()
+    query_j_allele = query.getJAllele()
+    query_j_gene = query.getJGene()
+    # Create alignment scoring dictionary
+    score_dict = getDNAScoreDict()
+    
+    scores = [0]*len(records)    
+    for i in range(len(records)):
+        ld = pairwise2.align.globalds(query_cdr3, records[i].junction[3:-3],
+                                      score_dict, -1, -1, one_alignment_only=True)
+        # Check V similarity
+        if records[i].getVAllele() == query_v_allele: ld += 0
+        elif records[i].getVGene() == query_v_gene: ld += 1
+        elif records[i].getVFamily() == query_v_family: ld += 3
+        else: ld += 5
+        # Check J similarity
+        if records[i].getJAllele() == query_j_allele: ld += 0
+        elif records[i].getJGene() == query_j_gene: ld += 1
+        else: ld += 3
+        # Divide by length
+        scores[i] = ld/max(len(records[i].junction[3:-3]), query_cdr3)
+        
+    return scores
+
+
+def distAdemokun2011(records):
+    """
+    Calculate pairwise distances as defined in Ademokun 2011
+    
+    Arguments:
+    records = list of IgRecords where first is query to be compared to others in list
+    
+    Returns:
+    list of distances
+    """
+    # Pull out query sequence and V family information
+    query = records.popitem(last=False)
+    query_cdr3 = query.junction[3:-3]
+    query_v_family = query.getVFamily()
+    # Create alignment scoring dictionary
+    score_dict = getDNAScoreDict()
+    
+    scores = [0]*len(records)    
+    for i in range(len(records)):
+        
+        if abs(len(query_cdr3) - len(records[i].junction[3:-3])) > 10:
+            scores[i] = 1
+        elif query_v_family != records[i].getVFamily(): 
+            scores[i] = 1
+        else: 
+            ld = pairwise2.align.globalds(query_cdr3, records[i].junction[3:-3], 
+                                          score_dict, -1, -1, one_alignment_only=True)
+            scores[i] = ld/min(len(records[i].junction[3:-3]), query_cdr3)
+    
+    return scores
+
+
+def hierClust(dist_mat, method='chen2010'):
+    """
+    Calculate hierarchical clustering
+    
+    Arguments:
+    dist_mat = square-formed distance matrix of pairwise CDR3 comparisons
+    
+    Returns:
+    list of cluster ids
+    """
+    if method == 'chen2010':
+        clusters = formClusters(dist_mat, 'average', 0.32)
+    elif method == 'ademokun2011':
+        clusters = formClusters(dist_mat, 'complete', 0.25)
+    else: clusters = np.ones(dist_mat.shape[0])
+        
+    return clusters
+
+# TODO:  Merge duplicate feed, process and collect functions.
+def feedQueue(alive, data_queue, db_file, group_func, group_args={}):
+    """
+    Feeds the data queue with Ig records
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    data_queue = a multiprocessing.Queue to hold data for processing
+    db_file = the Ig record database file
+    group_func = the function to use for assigning preclones
+    group_args = a dictionary of arguments to pass to group_func
+    
+    Returns: 
+    None
+    """
+    # Open input file and perform grouping
+    try:
+        # Iterate over Ig records and assign groups
+        db_iter = readDbFile(db_file)
+        clone_dict = group_func(db_iter, **group_args)
+    except:
+        #sys.stderr.write('Exception in feeder grouping step\n')
+        alive.value = False
+        raise
+    
+    # Add groups to data queue
+    try:
+        #print 'START FEED', alive.value
+        # Iterate over groups and feed data queue
+        clone_iter = iter(clone_dict.items())
+        while alive.value:
+            # Get data from queue
+            if data_queue.full():  continue
+            else:  data = next(clone_iter, None)
+            # Exit upon reaching end of iterator
+            if data is None:  break
+            #print "FEED", alive.value, k
+            
+            # Feed queue
+            data_queue.put(DbData(*data))
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None
+    except:
+        #sys.stderr.write('Exception in feeder queue feeding step\n')
+        alive.value = False
+        raise
+
+    return None
+
+
+def feedQueueClust(alive, data_queue, db_file, group_func=None, group_args={}):
+    """
+    Feeds the data queue with Ig records
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    data_queue = a multiprocessing.Queue to hold data for processing
+    db_file = the Ig record database file
+    
+    Returns: 
+    None
+    """
+    # Open input file and perform grouping
+    try:
+        # Iterate over Ig records and order by junction length
+        records = {}
+        db_iter = readDbFile(db_file)
+        for rec in db_iter:
+            records[rec.id] = rec
+        records = OrderedDict(sorted(list(records.items()), key=lambda i: i[1].junction_length))
+        dist_dict = {}
+        for __ in range(len(records)):
+            k,v = records.popitem(last=False)
+            dist_dict[k] = [v].append(list(records.values()))
+    except:
+        #sys.stderr.write('Exception in feeder grouping step\n')
+        alive.value = False
+        raise
+    
+    # Add groups to data queue
+    try:
+        # print 'START FEED', alive.value
+        # Iterate over groups and feed data queue
+        dist_iter = iter(dist_dict.items())
+        while alive.value:
+            # Get data from queue
+            if data_queue.full():  continue
+            else:  data = next(dist_iter, None)
+            # Exit upon reaching end of iterator
+            if data is None:  break
+            #print "FEED", alive.value, k
+            
+            # Feed queue
+            data_queue.put(DbData(*data))
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None
+    except:
+        #sys.stderr.write('Exception in feeder queue feeding step\n')
+        alive.value = False
+        raise
+
+    return None
+
+
+def processQueue(alive, data_queue, result_queue, clone_func, clone_args):
+    """
+    Pulls from data queue, performs calculations, and feeds results queue
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    data_queue = a multiprocessing.Queue holding data to process
+    result_queue = a multiprocessing.Queue to hold processed results
+    clone_func = the function to call for clonal assignment
+    clone_args = a dictionary of arguments to pass to clone_func
+
+    Returns: 
+    None
+    """
+    try:
+        # Iterator over data queue until sentinel object reached
+        while alive.value:
+            # Get data from queue
+            if data_queue.empty():  continue
+            else:  data = data_queue.get()
+            # Exit upon reaching sentinel
+            if data is None:  break
+
+            # Define result object for iteration and get data records
+            records = data.data
+            result = DbResult(data.id, records)
+
+            # Check for invalid data (due to failed indexing) and add failed result
+            if not data:
+                result_queue.put(result)
+                continue
+
+            # Add V(D)J to log
+            result.log['ID'] = ','.join([str(x) for x in data.id])
+            result.log['VALLELE'] = ','.join(set([(r.getVAllele() or '') for r in records]))
+            result.log['DALLELE'] = ','.join(set([(r.getDAllele() or '') for r in records]))
+            result.log['JALLELE'] = ','.join(set([(r.getJAllele() or '') for r in records]))
+            result.log['JUNCLEN'] = ','.join(set([(str(len(r.junction)) or '0') for r in records]))
+            result.log['SEQUENCES'] = len(records)
+             
+            # Checking for preclone failure and assign clones
+            clones = clone_func(records, **clone_args) if data else None
+
+            # import cProfile
+            # prof = cProfile.Profile()
+            # clones = prof.runcall(clone_func, records, **clone_args)
+            # prof.dump_stats('worker-%d.prof' % os.getpid())
+
+            if clones is not None:
+                result.results = clones
+                result.valid = True
+                result.log['CLONES'] = len(clones)
+            else:
+                result.log['CLONES'] = 0
+  
+            # Feed results to result queue
+            result_queue.put(result)
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None
+    except:
+        #sys.stderr.write('Exception in worker\n')
+        alive.value = False
+        raise
+    
+    return None
+
+
+def processQueueClust(alive, data_queue, result_queue, clone_func, clone_args):
+    """
+    Pulls from data queue, performs calculations, and feeds results queue
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    data_queue = a multiprocessing.Queue holding data to process
+    result_queue = a multiprocessing.Queue to hold processed results
+    clone_func = the function to call for calculating pairwise distances between sequences
+    clone_args = a dictionary of arguments to pass to clone_func
+
+    Returns: 
+    None
+    """
+    
+    try:
+        # print 'START WORK', alive.value
+        # Iterator over data queue until sentinel object reached
+        while alive.value:
+            # Get data from queue
+            if data_queue.empty():  continue
+            else:  data = data_queue.get()
+            # Exit upon reaching sentinel
+            if data is None:  break
+            # print "WORK", alive.value, data['id']
+
+            # Define result object for iteration and get data records
+            records = data.data
+            result = DbResult(data.id, records)
+             
+            # Create row of distance matrix and check for error
+            dist_row = clone_func(records, **clone_args) if data else None
+            if dist_row is not None:
+                result.results = dist_row
+                result.valid = True
+  
+            # Feed results to result queue
+            result_queue.put(result)
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None
+    except:
+        #sys.stderr.write('Exception in worker\n')
+        alive.value = False
+        raise
+    
+    return None
+
+
+def collectQueue(alive, result_queue, collect_queue, db_file, out_args, cluster_func=None, cluster_args={}):
+    """
+    Assembles results from a queue of individual sequence results and manages log/file I/O
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    result_queue = a multiprocessing.Queue holding processQueue results
+    collect_queue = a multiprocessing.Queue to store collector return values
+    db_file = the input database file name
+    out_args = common output argument dictionary from parseCommonArgs
+    cluster_func = the function to call for carrying out clustering on distance matrix
+    cluster_args = a dictionary of arguments to pass to cluster_func
+    
+    Returns: 
+    None
+    (adds 'log' and 'out_files' to collect_dict)
+    """
+    # Open output files
+    try:
+        # Count records and define output format 
+        out_type = getFileType(db_file) if out_args['out_type'] is None \
+                   else out_args['out_type']
+        result_count = countDbFile(db_file)
+        
+        # Defined successful output handle
+        pass_handle = getOutputHandle(db_file, 
+                                      out_label='clone-pass', 
+                                      out_dir=out_args['out_dir'], 
+                                      out_name=out_args['out_name'], 
+                                      out_type=out_type)
+        pass_writer = getDbWriter(pass_handle, db_file, add_fields='CLONE')
+        
+        # Defined failed alignment output handle
+        if out_args['failed']:
+            fail_handle = getOutputHandle(db_file,
+                                          out_label='clone-fail', 
+                                          out_dir=out_args['out_dir'], 
+                                          out_name=out_args['out_name'], 
+                                          out_type=out_type)
+            fail_writer = getDbWriter(fail_handle, db_file)
+        else:
+            fail_handle = None
+            fail_writer = None
+
+        # Define log handle
+        if out_args['log_file'] is None:  
+            log_handle = None
+        else:  
+            log_handle = open(out_args['log_file'], 'w')
+    except:
+        #sys.stderr.write('Exception in collector file opening step\n')
+        alive.value = False
+        raise
+
+    # Get results from queue and write to files
+    try:
+        #print 'START COLLECT', alive.value
+        # Iterator over results queue until sentinel object reached
+        start_time = time()
+        rec_count = clone_count = pass_count = fail_count = 0
+        while alive.value:
+            # Get result from queue
+            if result_queue.empty():  continue
+            else:  result = result_queue.get()
+            # Exit upon reaching sentinel
+            if result is None:  break
+            #print "COLLECT", alive.value, result['id']
+            
+            # Print progress for previous iteration and update record count
+            if rec_count == 0:
+                print('PROGRESS> Assigning clones')
+            printProgress(rec_count, result_count, 0.05, start_time) 
+            rec_count += len(result.data)
+            
+            # Write passed and failed records
+            if result:
+                for clone in result.results.values():
+                    clone_count += 1
+                    for i, rec in enumerate(clone):
+                        rec.annotations['CLONE'] = clone_count
+                        pass_writer.writerow(rec.toDict())
+                        pass_count += 1
+                        result.log['CLONE%i-%i' % (clone_count, i + 1)] = str(rec.junction)
+    
+            else:
+                for i, rec in enumerate(result.data):
+                    if fail_writer is not None: fail_writer.writerow(rec.toDict())
+                    fail_count += 1
+                    result.log['CLONE0-%i' % (i + 1)] = str(rec.junction)
+                    
+            # Write log
+            printLog(result.log, handle=log_handle)
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None
+        
+        # Print total counts
+        printProgress(rec_count, result_count, 0.05, start_time)
+
+        # Close file handles
+        pass_handle.close()
+        if fail_handle is not None:  fail_handle.close()
+        if log_handle is not None:  log_handle.close()
+                
+        # Update return list
+        log = OrderedDict()
+        log['OUTPUT'] = os.path.basename(pass_handle.name)
+        log['CLONES'] = clone_count
+        log['RECORDS'] = rec_count
+        log['PASS'] = pass_count
+        log['FAIL'] = fail_count
+        collect_dict = {'log':log, 'out_files': [pass_handle.name]}
+        collect_queue.put(collect_dict)
+    except:
+        #sys.stderr.write('Exception in collector result processing step\n')
+        alive.value = False
+        raise
+
+    return None
+
+
+def collectQueueClust(alive, result_queue, collect_queue, db_file, out_args, cluster_func, cluster_args):
+    """
+    Assembles results from a queue of individual sequence results and manages log/file I/O
+
+    Arguments: 
+    alive = a multiprocessing.Value boolean controlling whether processing continues
+            if False exit process
+    result_queue = a multiprocessing.Queue holding processQueue results
+    collect_queue = a multiprocessing.Queue to store collector return values
+    db_file = the input database file name
+    out_args = common output argument dictionary from parseCommonArgs
+    cluster_func = the function to call for carrying out clustering on distance matrix
+    cluster_args = a dictionary of arguments to pass to cluster_func
+    
+    Returns: 
+    None
+    (adds 'log' and 'out_files' to collect_dict)
+    """
+    # Open output files
+    try:
+               
+        # Iterate over Ig records to count and order by junction length
+        result_count = 0
+        records = {}
+        # print 'Reading file...'
+        db_iter = readDbFile(db_file)
+        for rec in db_iter:
+            records[rec.id] = rec
+            result_count += 1
+        records = OrderedDict(sorted(list(records.items()), key=lambda i: i[1].junction_length))
+                
+        # Define empty matrix to store assembled results
+        dist_mat = np.zeros((result_count,result_count))
+        
+        # Count records and define output format 
+        out_type = getFileType(db_file) if out_args['out_type'] is None \
+                   else out_args['out_type']
+                   
+        # Defined successful output handle
+        pass_handle = getOutputHandle(db_file, 
+                                      out_label='clone-pass', 
+                                      out_dir=out_args['out_dir'], 
+                                      out_name=out_args['out_name'], 
+                                      out_type=out_type)
+        pass_writer = getDbWriter(pass_handle, db_file, add_fields='CLONE')
+        
+        # Defined failed cloning output handle
+        if out_args['failed']:
+            fail_handle = getOutputHandle(db_file,
+                                          out_label='clone-fail', 
+                                          out_dir=out_args['out_dir'], 
+                                          out_name=out_args['out_name'], 
+                                          out_type=out_type)
+            fail_writer = getDbWriter(fail_handle, db_file)
+        else:
+            fail_handle = None
+            fail_writer = None
+
+        # Open log file
+        if out_args['log_file'] is None:
+            log_handle = None
+        else:
+            log_handle = open(out_args['log_file'], 'w')
+    except:
+        alive.value = False
+        raise
+    
+    try:
+        # Iterator over results queue until sentinel object reached
+        start_time = time()
+        row_count = rec_count = 0
+        while alive.value:
+            # Get result from queue
+            if result_queue.empty():  continue
+            else:  result = result_queue.get()
+            # Exit upon reaching sentinel
+            if result is None:  break
+
+            # Print progress for previous iteration
+            if row_count == 0:
+                print('PROGRESS> Assigning clones')
+            printProgress(row_count, result_count, 0.05, start_time)
+            
+            # Update counts for iteration
+            row_count += 1
+            rec_count += len(result)
+            
+            # Add result row to distance matrix
+            if result:
+                dist_mat[list(range(result_count-len(result),result_count)),result_count-len(result)] = result.results
+                
+        else:
+            sys.stderr.write('PID %s:  Error in sibling process detected. Cleaning up.\n' \
+                             % os.getpid())
+            return None    
+        
+        # Calculate linkage and carry out clustering
+        # print dist_mat
+        clusters = cluster_func(dist_mat, **cluster_args) if dist_mat is not None else None
+        clones = {}
+        # print clusters
+        for i, c in enumerate(clusters):
+            clones.setdefault(c, []).append(records[list(records.keys())[i]])
+        
+        # Write passed and failed records
+        clone_count = pass_count = fail_count = 0
+        if clones:
+            for clone in clones.values():
+                clone_count += 1
+                for i, rec in enumerate(clone):
+                    rec.annotations['CLONE'] = clone_count
+                    pass_writer.writerow(rec.toDict())
+                    pass_count += 1
+                    #result.log['CLONE%i-%i' % (clone_count, i + 1)] = str(rec.junction)
+
+        else:
+            for i, rec in enumerate(result.data):
+                fail_writer.writerow(rec.toDict())
+                fail_count += 1
+                #result.log['CLONE0-%i' % (i + 1)] = str(rec.junction)
+        
+        # Print final progress
+        printProgress(row_count, result_count, 0.05, start_time)
+    
+        # Close file handles
+        pass_handle.close()
+        if fail_handle is not None:  fail_handle.close()
+        if log_handle is not None:  log_handle.close()
+                
+        # Update return list
+        log = OrderedDict()
+        log['OUTPUT'] = os.path.basename(pass_handle.name)
+        log['CLONES'] = clone_count
+        log['RECORDS'] = rec_count
+        log['PASS'] = pass_count
+        log['FAIL'] = fail_count
+        collect_dict = {'log':log, 'out_files': [pass_handle.name]}
+        collect_queue.put(collect_dict)
+    except:
+        alive.value = False
+        raise
+    
+    return None
+
+
+def defineClones(db_file, feed_func, work_func, collect_func, clone_func, cluster_func=None,
+                 group_func=None, group_args={}, clone_args={}, cluster_args={}, 
+                 out_args=default_out_args, nproc=None, queue_size=None):
+    """
+    Define clonally related sequences
+    
+    Arguments:
+    db_file = filename of input database
+    feed_func = the function that feeds the queue
+    work_func = the worker function that will run on each CPU
+    collect_func = the function that collects results from the workers
+    group_func = the function to use for assigning preclones
+    clone_func = the function to use for determining clones within preclonal groups
+    group_args = a dictionary of arguments to pass to group_func
+    clone_args = a dictionary of arguments to pass to clone_func
+    out_args = common output argument dictionary from parseCommonArgs
+    nproc = the number of processQueue processes;
+            if None defaults to the number of CPUs
+    queue_size = maximum size of the argument queue;
+                 if None defaults to 2*nproc    
+    
+    Returns:
+    a list of successful output file names
+    """
+    # Print parameter info
+    log = OrderedDict()
+    log['START'] = 'DefineClones'
+    log['DB_FILE'] = os.path.basename(db_file)
+    if group_func is not None:
+        log['GROUP_FUNC'] = group_func.__name__
+        log['GROUP_ARGS'] = group_args
+    log['CLONE_FUNC'] = clone_func.__name__
+
+    # TODO:  this is yucky, but can be fixed by using a model class
+    clone_log = clone_args.copy()
+    if 'dist_mat' in clone_log:  del clone_log['dist_mat']
+    log['CLONE_ARGS'] = clone_log
+
+    if cluster_func is not None:
+        log['CLUSTER_FUNC'] = cluster_func.__name__
+        log['CLUSTER_ARGS'] = cluster_args
+    log['NPROC'] = nproc
+    printLog(log)
+    
+    # Define feeder function and arguments
+    feed_args = {'db_file': db_file,
+                 'group_func': group_func, 
+                 'group_args': group_args}
+    # Define worker function and arguments
+    work_args = {'clone_func': clone_func, 
+                 'clone_args': clone_args}
+    # Define collector function and arguments
+    collect_args = {'db_file': db_file,
+                    'out_args': out_args,
+                    'cluster_func': cluster_func,
+                    'cluster_args': cluster_args}
+    
+    # Call process manager
+    result = manageProcesses(feed_func, work_func, collect_func, 
+                             feed_args, work_args, collect_args, 
+                             nproc, queue_size)
+        
+    # Print log
+    result['log']['END'] = 'DefineClones'
+    printLog(result['log'])
+    
+    return result['out_files']
+
+
+def getArgParser():
+    """
+    Defines the ArgumentParser
+
+    Arguments: 
+    None
+                      
+    Returns: 
+    an ArgumentParser object
+    """
+    # Define input and output fields
+    fields = dedent(
+             '''
+             output files:
+                 clone-pass
+                     database with assigned clonal group numbers.
+                 clone-fail
+                     database with records failing clonal grouping.
+
+             required fields:
+                 SEQUENCE_ID, V_CALL or V_CALL_GENOTYPED, D_CALL, J_CALL, JUNCTION_LENGTH
+
+                 <field>
+                     sequence field specified by the --sf parameter
+                
+             output fields:
+                 CLONE
+              ''')
+
+    # Define ArgumentParser
+    parser = ArgumentParser(description=__doc__, epilog=fields,
+                            formatter_class=CommonHelpFormatter)
+    parser.add_argument('--version', action='version',
+                        version='%(prog)s:' + ' %s-%s' %(__version__, __date__))
+    subparsers = parser.add_subparsers(title='subcommands', dest='command', metavar='',
+                                       help='Cloning method')
+    # TODO:  This is a temporary fix for Python issue 9253
+    subparsers.required = True
+    
+    # Parent parser    
+    parser_parent = getCommonArgParser(seq_in=False, seq_out=False, db_in=True, 
+                                       multiproc=True)
+    
+    # Distance cloning method
+    parser_bygroup = subparsers.add_parser('bygroup', parents=[parser_parent],
+                                        formatter_class=CommonHelpFormatter,
+                                        help='''Defines clones as having same V assignment,
+                                              J assignment, and junction length with
+                                              specified substitution distance model.''')
+    parser_bygroup.add_argument('-f', nargs='+', action='store', dest='fields', default=None,
+                             help='Additional fields to use for grouping clones (non VDJ)')
+    parser_bygroup.add_argument('--mode', action='store', dest='mode', 
+                             choices=('allele', 'gene'), default='gene', 
+                             help='''Specifies whether to use the V(D)J allele or gene for
+                                  initial grouping.''')
+    parser_bygroup.add_argument('--act', action='store', dest='action', default='set',
+                             choices=('first', 'set'),
+                             help='''Specifies how to handle multiple V(D)J assignments
+                                  for initial grouping.''')
+    parser_bygroup.add_argument('--model', action='store', dest='model', 
+                             choices=('aa', 'ham', 'm1n', 'hs1f', 'hs5f'),
+                             default=default_bygroup_model,
+                             help='''Specifies which substitution model to use for
+                                  calculating distance between sequences. Where m1n is the
+                                  mouse single nucleotide transition/trasversion model
+                                  of Smith et al, 1996; hs1f is the human single
+                                  nucleotide model derived from Yaari et al, 2013; hs5f
+                                  is the human S5F model of Yaari et al, 2013; ham is
+                                  nucleotide Hamming distance; and aa is amino acid
+                                  Hamming distance. The hs5f data should be
+                                  considered experimental.''')
+    parser_bygroup.add_argument('--dist', action='store', dest='distance', type=float, 
+                             default=default_distance,
+                             help='The distance threshold for clonal grouping')
+    parser_bygroup.add_argument('--norm', action='store', dest='norm',
+                             choices=('len', 'mut', 'none'), default=default_norm,
+                             help='''Specifies how to normalize distances. One of none
+                                  (do not normalize), len (normalize by length),
+                                  or mut (normalize by number of mutations between sequences).''')
+    parser_bygroup.add_argument('--sym', action='store', dest='sym',
+                             choices=('avg', 'min'), default=default_sym,
+                             help='''Specifies how to combine asymmetric distances. One of avg
+                                  (average of A->B and B->A) or min (minimum of A->B and B->A).''')
+    parser_bygroup.add_argument('--link', action='store', dest='linkage',
+                             choices=('single', 'average', 'complete'), default=default_linkage,
+                             help='''Type of linkage to use for hierarchical clustering.''')
+    parser_bygroup.add_argument('--sf', action='store', dest='seq_field',
+                                default=default_seq_field,
+                                help='''The name of the field to be used to calculate
+                                     distance between records''')
+    parser_bygroup.set_defaults(feed_func=feedQueue)
+    parser_bygroup.set_defaults(work_func=processQueue)
+    parser_bygroup.set_defaults(collect_func=collectQueue)  
+    parser_bygroup.set_defaults(group_func=indexJunctions)  
+    parser_bygroup.set_defaults(clone_func=distanceClones)
+    
+    
+    # Hierarchical clustering cloning method
+    parser_hclust = subparsers.add_parser('hclust', parents=[parser_parent],
+                                        formatter_class=CommonHelpFormatter,
+                                        help='Defines clones by specified distance metric on CDR3s and \
+                                              cutting of hierarchical clustering tree')
+#     parser_hclust.add_argument('-f', nargs='+', action='store', dest='fields', default=None,
+#                              help='Fields to use for grouping clones (non VDJ)')
+    parser_hclust.add_argument('--method', action='store', dest='method', 
+                             choices=('chen2010', 'ademokun2011'), default=default_hclust_model, 
+                             help='Specifies which cloning method to use for calculating distance \
+                                   between CDR3s, computing linkage, and cutting clusters')
+    parser_hclust.set_defaults(feed_func=feedQueueClust)
+    parser_hclust.set_defaults(work_func=processQueueClust)
+    parser_hclust.set_defaults(collect_func=collectQueueClust)
+    parser_hclust.set_defaults(cluster_func=hierClust)
+        
+    return parser
+
+
+if __name__ == '__main__':
+    """
+    Parses command line arguments and calls main function
+    """
+    # Parse arguments
+    parser = getArgParser()
+    args = parser.parse_args()
+    args_dict = parseCommonArgs(args)
+    # Convert case of fields
+    if 'seq_field' in args_dict:
+        args_dict['seq_field'] = args_dict['seq_field'].upper()
+    if 'fields' in args_dict and args_dict['fields'] is not None:  
+        args_dict['fields'] = [f.upper() for f in args_dict['fields']]
+    
+    # Define clone_args
+    if args.command == 'bygroup':
+        args_dict['group_args'] = {'fields': args_dict['fields'],
+                                   'action': args_dict['action'], 
+                                   'mode':args_dict['mode']}
+        args_dict['clone_args'] = {'model':  args_dict['model'],
+                                   'distance':  args_dict['distance'],
+                                   'norm': args_dict['norm'],
+                                   'sym': args_dict['sym'],
+                                   'linkage': args_dict['linkage'],
+                                   'seq_field': args_dict['seq_field']}
+
+        # TODO:  can be cleaned up with abstract model class
+        args_dict['clone_args']['dist_mat'] = getModelMatrix(args_dict['model'])
+
+        del args_dict['fields']
+        del args_dict['action']
+        del args_dict['mode']
+        del args_dict['model']
+        del args_dict['distance']
+        del args_dict['norm']
+        del args_dict['sym']
+        del args_dict['linkage']
+        del args_dict['seq_field']
+
+    # Define clone_args
+    if args.command == 'hclust':
+        dist_funcs = {'chen2010':distChen2010, 'ademokun2011':distAdemokun2011}
+        args_dict['clone_func'] = dist_funcs[args_dict['method']]
+        args_dict['cluster_args'] = {'method':  args_dict['method']}
+        #del args_dict['fields']
+        del args_dict['method']
+    
+    # Call defineClones
+    del args_dict['command']
+    del args_dict['db_files']
+    for f in args.__dict__['db_files']:
+        args_dict['db_file'] = f
+        defineClones(**args_dict)
\ No newline at end of file