view patrist.py @ 0:f8847f5a5491 draft default tip

"planemo upload for repository https://github.com/phac-nml/patrist commit f64cb2a8399f83d8c025c8efdc3c3eec72922a7d"
author nml
date Tue, 17 Dec 2019 09:53:52 -0500
parents
children
line wrap: on
line source

import argparse
import os
import sys

from Bio import Phylo


def walk_up(tips, curnode, pathlen, cutoff):
    """
    Recursive function for traversing up a tree.
    """
    pathlen += curnode.branch_length
    if pathlen < cutoff:
        if curnode.is_terminal():
            tips.append((curnode.name, pathlen))
        else:
            for c in curnode.clades:
                tips = walk_up(tips, c, pathlen, cutoff)
    return tips


def walk_trunk(curnode, cutoff, parents):
    """
    Find all tips in the tree that are within a threshold distance
    of a reference tip.
    """
    # first go down to parent and up other branch
    tips = []
    pathlen = curnode.branch_length  # 0.0184788
    p = parents[curnode]

    for c in p.clades:
        if c == curnode:
            continue
        if c.is_terminal():
            if pathlen + c.branch_length < cutoff:
                tips.append((c.name, pathlen+c.branch_length))
        else:
            tips.extend(walk_up([], c, pathlen, cutoff))

    # next walk down trunk until path length exceeds cutoff or hit root
    while p in parents:
        curnode = p
        pathlen += p.branch_length  # + 0.0104047
        p = parents[curnode]
        if pathlen >= cutoff:
            break
        for c in p.clades:
            if c == curnode:
                continue
            if c.is_terminal():
                if pathlen + c.branch_length < cutoff:  # + 0.0503079
                    tips.append((c.name, pathlen+c.branch_length))
            else:
                tips.extend(walk_up([], c, pathlen, cutoff))
    return tips


def find_short_edges(tree, cutoff, keep_ties=True,
                     minimize=False, returnlist=False):
    """
    Find the shortest edge from the earliest sequence of a patient to a
    any sequence from any other patient.
    minimize = keep only edge from earliest seq to the closest other seq
    keep_ties = [to be used in conjunction with minimize]
                report all edges with the same minimum distance
    """

    # generate dictionary of child->parent associations
    parents = {}
    for clade in tree.find_clades(order='level'):
        for child in clade:
            parents.update({child: clade})

    tips = tree.get_terminals()
    res = {}
    for tip1 in tips:
        # find the shortest distance in sequences that "cluster" with this one
        min_dist = 99999.
        tip2 = []
        for tipname, dist in walk_trunk(tip1, cutoff, parents):
            if minimize and dist < min_dist:
                min_dist = dist
                tip2 = [[tipname, dist]]
            else:
                tip2.append([tipname, dist])

        t1 = tip1.name
        for t2, dist in tip2:
            # sort tip names in lexico order
            key = (t1, t2) if t1 < t2 else (t2, t1)
            if key in res:
                continue
            res.update({key: dist})
            if minimize and keep_ties:
                # output only one edge
                break

    if returnlist:
        reslist = []
        for key, dist in res.iteritems():
            reslist.append((key[0], key[1], dist))
        return reslist

    return res


def main():
    parser = argparse.ArgumentParser(
        description='Generate clusters of tips from a tree that have'
        ' a path length within a maximum distance of each other.'
    )
    parser.add_argument('tree', help='<input> file '
                                     'containing Newick tree string.')
    parser.add_argument('cutoff', type=float, help='Maximum '
                                                   'patristic distance.')
    parser.add_argument('outfile', default=None, help='<output> file to '
                                                      'write results '
                                                      'in CSV format.')
    parser.add_argument('--minimize', help='Report no more than '
                                           'one nearest neighbour per tip.',
                                           action='store_true')
    parser.add_argument('--keep_ties', help='If more than one '
                        'tip has the same patristic distance, '
                        'report all as nearest neighbours.',
                        action='store_true')
    parser.add_argument('--overwrite', help='Overwrite existing '
                                            'output file.',
                                            action='store_true')
    args = parser.parse_args()

    assert args.cutoff > 0, 'Cutoff %f must be greater than 0.' % (args.cutoff)

    if os.path.exists(args.outfile) and not args.overwrite:
        print ('Output file', args.outfile, 'already exists, use --overwrite.')
        sys.exit()

    outfile = open(args.outfile, 'w')
    outfile.write('tree,tip1,tip2,dist,is.tie\n')

    trees = Phylo.parse(args.tree, 'newick')
    for treenum, tree in enumerate(trees):
        results = find_short_edges(tree, args.cutoff)
        for key, dist in results.items():
            outfile.write('%d,%s,%s,%f\n' % (treenum, key[0], key[1], dist))

    outfile.close()


if __name__ == "__main__":
    main()