Mercurial > repos > devteam > mutate_snp_codon
comparison mutate_snp_codon.py @ 0:8f0af7251167 draft default tip
Uploaded tool tarball.
| author | devteam | 
|---|---|
| date | Wed, 25 Sep 2013 11:27:51 -0400 | 
| parents | |
| children | 
   comparison
  equal
  deleted
  inserted
  replaced
| -1:000000000000 | 0:8f0af7251167 | 
|---|---|
| 1 #!/usr/bin/env python | |
| 2 """ | |
| 3 Script to mutate SNP codons. | |
| 4 Dan Blankenberg | |
| 5 """ | |
| 6 | |
| 7 import sys, string | |
| 8 | |
| 9 def strandify( fields, column ): | |
| 10 strand = '+' | |
| 11 if column >= 0 and column < len( fields ): | |
| 12 strand = fields[ column ] | |
| 13 if strand not in [ '+', '-' ]: | |
| 14 strand = '+' | |
| 15 return strand | |
| 16 | |
| 17 def main(): | |
| 18 # parse command line | |
| 19 input_file = sys.argv[1] | |
| 20 out = open( sys.argv[2], 'wb+' ) | |
| 21 codon_chrom_col = int( sys.argv[3] ) - 1 | |
| 22 codon_start_col = int( sys.argv[4] ) - 1 | |
| 23 codon_end_col = int( sys.argv[5] ) - 1 | |
| 24 codon_strand_col = int( sys.argv[6] ) - 1 | |
| 25 codon_seq_col = int( sys.argv[7] ) - 1 | |
| 26 | |
| 27 snp_chrom_col = int( sys.argv[8] ) - 1 | |
| 28 snp_start_col = int( sys.argv[9] ) - 1 | |
| 29 snp_end_col = int( sys.argv[10] ) - 1 | |
| 30 snp_strand_col = int( sys.argv[11] ) - 1 | |
| 31 snp_observed_col = int( sys.argv[12] ) - 1 | |
| 32 | |
| 33 max_field_index = max( codon_chrom_col, codon_start_col, codon_end_col, codon_strand_col, codon_seq_col, snp_chrom_col, snp_start_col, snp_end_col, snp_strand_col, snp_observed_col ) | |
| 34 | |
| 35 DNA_COMP = string.maketrans( "ACGTacgt", "TGCAtgca" ) | |
| 36 skipped_lines = 0 | |
| 37 errors = {} | |
| 38 for name, message in [ ('max_field_index','not enough fields'), ( 'codon_len', 'codon length must be 3' ), ( 'codon_seq', 'codon sequence must have length 3' ), ( 'snp_len', 'SNP length must be 3' ), ( 'snp_observed', 'SNP observed values must have length 3' ), ( 'empty_comment', 'empty or comment'), ( 'no_overlap', 'codon and SNP do not overlap' ) ]: | |
| 39 errors[ name ] = { 'count':0, 'message':message } | |
| 40 line_count = 0 | |
| 41 for line_count, line in enumerate( open( input_file ) ): | |
| 42 line = line.rstrip( '\n\r' ) | |
| 43 if line and not line.startswith( '#' ): | |
| 44 fields = line.split( '\t' ) | |
| 45 if max_field_index >= len( fields ): | |
| 46 skipped_lines += 1 | |
| 47 errors[ 'max_field_index' ]['count'] += 1 | |
| 48 continue | |
| 49 | |
| 50 #read codon info | |
| 51 codon_chrom = fields[codon_chrom_col] | |
| 52 codon_start = int( fields[codon_start_col] ) | |
| 53 codon_end = int( fields[codon_end_col] ) | |
| 54 if codon_end - codon_start != 3: | |
| 55 #codons must be length 3 | |
| 56 skipped_lines += 1 | |
| 57 errors[ 'codon_len' ]['count'] += 1 | |
| 58 continue | |
| 59 codon_strand = strandify( fields, codon_strand_col ) | |
| 60 codon_seq = fields[codon_seq_col].upper() | |
| 61 if len( codon_seq ) != 3: | |
| 62 #codon sequence must have length 3 | |
| 63 skipped_lines += 1 | |
| 64 errors[ 'codon_seq' ]['count'] += 1 | |
| 65 continue | |
| 66 | |
| 67 #read snp info | |
| 68 snp_chrom = fields[snp_chrom_col] | |
| 69 snp_start = int( fields[snp_start_col] ) | |
| 70 snp_end = int( fields[snp_end_col] ) | |
| 71 if snp_end - snp_start != 1: | |
| 72 #snps must be length 1 | |
| 73 skipped_lines += 1 | |
| 74 errors[ 'snp_len' ]['count'] += 1 | |
| 75 continue | |
| 76 snp_strand = strandify( fields, snp_strand_col ) | |
| 77 snp_observed = fields[snp_observed_col].split( '/' ) | |
| 78 snp_observed = [ observed for observed in snp_observed if len( observed ) == 1 ] | |
| 79 if not snp_observed: | |
| 80 #sequence replacements must be length 1 | |
| 81 skipped_lines += 1 | |
| 82 errors[ 'snp_observed' ]['count'] += 1 | |
| 83 continue | |
| 84 | |
| 85 #Determine index of replacement for observed values into codon | |
| 86 offset = snp_start - codon_start | |
| 87 #Extract DNA on neg strand codons will have positions reversed relative to interval positions; i.e. position 0 == position 2 | |
| 88 if codon_strand == '-': | |
| 89 offset = 2 - offset | |
| 90 if offset < 0 or offset > 2: #assert offset >= 0 and offset <= 2, ValueError( 'Impossible offset determined: %s' % offset ) | |
| 91 #codon and snp do not overlap | |
| 92 skipped_lines += 1 | |
| 93 errors[ 'no_overlap' ]['count'] += 1 | |
| 94 continue | |
| 95 | |
| 96 for observed in snp_observed: | |
| 97 if codon_strand != snp_strand: | |
| 98 #if our SNP is on a different strand than our codon, take complement of provided observed SNP base | |
| 99 observed = observed.translate( DNA_COMP ) | |
| 100 snp_codon = [ char for char in codon_seq ] | |
| 101 snp_codon[offset] = observed.upper() | |
| 102 snp_codon = ''.join( snp_codon ) | |
| 103 | |
| 104 if codon_seq != snp_codon: #only output when we actually have a different codon | |
| 105 out.write( "%s\t%s\n" % ( line, snp_codon ) ) | |
| 106 else: | |
| 107 skipped_lines += 1 | |
| 108 errors[ 'empty_comment' ]['count'] += 1 | |
| 109 if skipped_lines: | |
| 110 print "Skipped %i (%4.2f%%) of %i lines; reasons: %s" % ( skipped_lines, ( float( skipped_lines )/float( line_count ) ) * 100, line_count, ', '.join( [ "%s (%i)" % ( error['message'], error['count'] ) for error in errors.itervalues() if error['count'] ] ) ) | |
| 111 | |
| 112 if __name__ == "__main__": main() | 
