diff tests/compute-site.py @ 5:31361191d2d2

Uploaded tarball. Version 1.1: Stranded output, slightly different handling of minor allele ties and 0 coverage sites, revised help text, added test datasets.
author nick
date Thu, 12 Sep 2013 11:34:23 -0400
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/tests/compute-site.py	Thu Sep 12 11:34:23 2013 -0400
@@ -0,0 +1,161 @@
+import os
+import sys
+from optparse import OptionParser
+OPT_DEFAULTS = {'freq_thres':0, 'covg_thres':0}
+BASES = ['A', 'C', 'G', 'T']
+USAGE = ("Usage: %prog (options) 'variant_str' ('variant_str2' (etc))\n"
+    +"       cat variants.txt > %prog (options)")
+def main():
+  parser = OptionParser(usage=USAGE)
+  parser.add_option('-f', '--freq-thres', dest='freq_thres', type='float',
+    default=OPT_DEFAULTS.get('freq_thres'),
+    help=('Frequency threshold for counting alleles, given in percentage: -f 1 '
+      +'= 1% frequency. Default is %default%.'))
+  parser.add_option('-c', '--covg-thres', dest='covg_thres', type='int',
+    default=OPT_DEFAULTS.get('covg_thres'),
+    help=('Coverage threshold. Each site must be supported by at least this '
+      +'many reads on each strand. Otherwise the site will not be printed in '
+      +'the output. The default is %default reads per strand.'))
+  (options, args) = parser.parse_args()
+  freq_thres = options.freq_thres
+  covg_thres = options.covg_thres
+  if len(sys.argv) > 1 and '-h' in sys.argv[1][0:3]:
+    script_name = os.path.basename(sys.argv[0])
+    print """USAGE:
+  $ """+script_name+""" [sample column text]
+  $ """+script_name+""" '+A=10,+G=2,-A=20,-G=41,'
+  $ """+script_name+""" '0/1:10,1:0.25,0.025:-A=29,-AG=1,-G=10,'
+  $ """+script_name+""" '+A=10,+G=2,-A=20,-G=41,' '0/1:10,1:0.25,0.025:-A=29,-AG=1,-G=10,'
+Or invoke with no arguments to use interactively. It will read from stdin, so
+just paste one sample per line."""
+    sys.exit(0)
+  if len(args) > 0:
+    stdin = False
+    samples = args
+  else:
+    stdin = True
+    samples = sys.stdin
+    print "Reading from standard input.."
+  for sample in samples:
+    print ''
+    sample = sample.split(':')[-1]
+    if not sample:
+      continue
+    print sample
+    counts_dict = parse_counts(sample)
+    compute_stats(counts_dict, freq_thres, covg_thres)
+def parse_counts(sample_str):
+  counts_dict = {}
+  counts = sample_str.split(',')
+  for count in counts:
+    if '=' not in count:
+      continue
+    (var, reads) = count.split('=')
+    if var[1:] in BASES:
+      counts_dict[var] = int(reads)
+  return counts_dict
+def compute_stats(counts_dict, freq_thres, covg_thres):
+  # totals for A, C, G, T
+  counts_unstranded = {}
+  for base in BASES:
+    counts_unstranded[base] = 0
+    for strand in '+-':
+      counts_unstranded[base] += counts_dict.get(strand+base, 0)
+  print '+- '+str(counts_unstranded)
+  # get counts for each strand
+  plus_counts  = get_stranded_counts(counts_dict, '+')
+  minus_counts = get_stranded_counts(counts_dict, '-')
+  counts_lists = {'+':plus_counts, '-':minus_counts}
+  print ' + '+str(plus_counts)
+  print ' - '+str(minus_counts)
+  # stranded coverage threshold
+  coverages = {}
+  failing_strands = {}
+  for strand in '+-':
+    coverages[strand] = 0
+    for count in counts_lists[strand].values():
+      coverages[strand] += count
+    if coverages[strand] < covg_thres:
+      failing_strands[strand] = coverages[strand]
+    sys.stdout.write(strand+'coverage: '+str(coverages[strand])+"\t")
+  coverages['+-'] = coverages['+'] + coverages['-']
+  sys.stdout.write("+-coverage: "+str(coverages['+-'])+"\n")
+  if failing_strands:
+    for strand in failing_strands:
+      print ('coverage on '+strand+' strand too low ('
+        +str(failing_strands[strand])+' < '+str(covg_thres)+")")
+    return
+  # apply frequency threshold
+  for strand in counts_lists:
+    strand_counts = counts_lists[strand]
+    for variant in strand_counts.keys():
+      # print (variant+" freq: "+str(strand_counts[variant])+"/"
+      #   +str(coverages[strand])+" = "
+      #   +str(strand_counts[variant]/float(coverages[strand])))
+      if strand_counts[variant]/float(coverages[strand]) < freq_thres:
+        strand_counts.pop(variant)
+  plus_variants  = sorted(plus_counts.keys())
+  minus_variants = sorted(minus_counts.keys())
+  if plus_variants == minus_variants:
+    strand_bias = False
+    print "no strand bias: +"+str(plus_variants)+" == -"+str(minus_variants)
+    sys.stdout.write("alleles: "+str(len(plus_variants))+"\t")
+  else:
+    strand_bias = True
+    print "   strand bias: +"+str(plus_variants)+" != -"+str(minus_variants)
+    sys.stdout.write("alleles: 0\t")
+  variants_sorted = sort_variants(counts_unstranded)
+  if len(variants_sorted) >= 1:
+    sys.stdout.write("major: "+variants_sorted[0]+"\t")
+  minor = '.'
+  if len(variants_sorted) == 2:
+    minor = variants_sorted[1]
+  elif len(variants_sorted) > 2:
+    if (counts_unstranded.get(variants_sorted[1]) ==
+        counts_unstranded.get(variants_sorted[2])):
+      minor = 'N'
+    else:
+      minor = variants_sorted[1]
+  sys.stdout.write("minor: "+minor+"\tfreq: ")
+  print round(float(counts_unstranded.get(minor, 0))/coverages['+-'], 5)
+def get_stranded_counts(unstranded_counts, strand):
+  stranded_counts = {}
+  for variant in unstranded_counts:
+    if variant[0] == strand:
+      stranded_counts[variant[1:]] = unstranded_counts[variant]
+  return stranded_counts
+def sort_variants(variant_counts):
+  """Sort the list of variants based on their counts. Returns a list of just
+  the variants, no counts."""
+  variants = variant_counts.keys()
+  var_del = []
+  for variant in variants:
+    if variant_counts.get(variant) == 0:
+      var_del.append(variant)
+  for variant in var_del:
+    variants.remove(variant)
+  variants.sort(reverse=True, key=lambda variant: variant_counts.get(variant,0))
+  return variants
+if __name__ == "__main__":
+  main()