Mercurial repository: pmac / iterativepca

diff pedify.py @ 0:64e75e21466e draft default tip
Uploaded
| author | pmac |
|---|---|
| date | Wed, 01 Jun 2016 03:38:39 -0400 |
| parents | |
| children | |
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pedify.py	Wed Jun 01 03:38:39 2016 -0400
@@ -0,0 +1,395 @@
+import sys
+import csv
+import argparse
+
+DEBUG = 0
+
+REQ_KEYS = ['genotype_column', 'reference_column', 'alternate_column',
+            'sample_id_column', 'chromosome_column', 'position_column',
+            'variant_id_column']
+
+GENOTYPE_DICT = {
+    "'0/0": "hom_ref",
+    "'0/1": "het",
+    "'1/1": "hom_alt",
+    "'1/2": "tri_allelic"
+}
+
+GENOTYPE_TO_NUMERIC = {
+    "'0/0": 0,
+    "'0/1": 1,
+    "'1/1": 2,
+    "'1/2": 2
+}
+
+class PedConverter:
+    def __init__(self):
+        self.cfg = None
+        self.samples = {}
+        self.sites = {}
+        self.xsamples = []
+
+    def verify_column_names(self, datafile_header):
+        # check all the config column names actually exist in the data file
+        for col in self.cfg.col_names.values():
+            if col not in datafile_header:
+                print "The '{}' column was not found in the datafile! Double check your config file is correct. Exiting...".format(
+                    col)
+                sys.exit(1)
+
+    def verify_filters(self, datafile_header):
+        # print warning messages if filters invalid
+        all_filters = self.cfg.nfilters.copy()
+        all_filters.update(self.cfg.sfilters)
+        for key in all_filters:
+            col_name = all_filters[key]["col_name"]
+            if col_name not in datafile_header:
+                print "Warning! The '{}' filter was not applied as the datafile does not contain the column '{}'".format(
+                    key, col_name)
+
+    def read_config_file(self, cfname):
+        self.cfg = ConfigSettings()
+        rc = self.cfg.parse_config_file(cfname)
+        return rc
+
+    def read_data_file(self, dfname):
+        if (self.cfg == None) or (not self.cfg.is_valid()):
+            return 1
+
+        datafile = open(dfname, 'r')
+        dreader = csv.DictReader(datafile, delimiter='\t')
+        # verify datafile data matches config file
+        self.verify_column_names(dreader.fieldnames)
+        self.verify_filters(dreader.fieldnames)
+        all_sample_ids = set()
+        i = 0
+
+        for row in dreader:
+            failed_filters = self.filter_all(row)
+            sample_key = row[self.cfg.col_names['sample_id_column']]
+            all_sample_ids.add(sample_key)
+            if not failed_filters:
+                # add to sample dict
+                # key is a tuple made up of which chromosome the snp is found on
+                # and the position on the chromosome itself
+                SNP_key = (row[self.cfg.col_names['chromosome_column']], int(row[self.cfg.col_names['position_column']]))
+                genotype = row[self.cfg.col_names['genotype_column']]
+
+                # create a dictionary for each sample (person); each person is associated
+                # with another dictionary of all the SNPs found in that person
+                if sample_key not in self.samples:
+                    self.samples[sample_key] = {SNP_key: genotype}
+                else:
+                    self.samples[sample_key][SNP_key] = genotype
+
+                # create a dict of all the sites where SNPs exist
+                if SNP_key not in self.sites:
+                    # generate arbitrary ID's if there is no pre-existing ID for the SNP
+                    if row[self.cfg.col_names['variant_id_column']] == '.':
+                        SNP_id = "SNP_" + str(i)
+                    else:
+                        SNP_id = row[self.cfg.col_names['variant_id_column']]
+
+                    SNP_data = {'ref_col': row[self.cfg.col_names['reference_column']],
+                                'alt_col': row[self.cfg.col_names['alternate_column']],
+                                'SNP_id': SNP_id}
+                    self.sites[SNP_key] = SNP_data
+            i += 1
+
+        # make sure every sample contains a genotype for every SNP found
+        for sample_key in self.samples:
+            this_sample = self.samples[sample_key]
+            for SNP_key in self.sites:
+                if SNP_key not in this_sample:
+                    this_sample[SNP_key] = "'0/0"
+        datafile.close()
+
+        # get list of samples which were filtered out
+        self.xsamples = list(all_sample_ids.difference(set(self.samples.keys())))
+        return 0
+
+    # returns key of the specific filter/s that failed
+    def filter_numeric(self, row):
+        failed_filters = set()
+        for key in self.cfg.nfilters.keys():
+            nfilter = self.cfg.nfilters[key]
+            cutoff = float(nfilter["cutoff"])
+            op = nfilter["op"]
+            col_name = nfilter["col_name"]
+            if col_name in row.keys():
+                cv = float(row[col_name])
+                if not string_as_operator(cv, cutoff, op):
+                    failed_filters.add(key)
+
+        return failed_filters
+
+    # returns key of the specific filter/s that failed
+    def filter_string(self, row):
+        failed_filters = set()
+        for key in self.cfg.sfilters.keys():
+            sfilter = self.cfg.sfilters[key]
+            col_name = sfilter["col_name"]
+            if col_name in row.keys():
+                cs = row[col_name]
+                af = sfilter['accept_flag']
+                ef = sfilter['exact_flag']
+                patterns = sfilter['patterns']
+                if ef:
+                    if af:
+                        passed = False
+                        for p in patterns:
+                            if p == cs:
+                                passed = True
+                                break
+                        if passed == False:
+                            failed_filters.add(key)
+                    else:
+                        for p in patterns:
+                            if p == cs:
+                                failed_filters.add(key)
+                                break
+                else:
+                    if af:
+                        passed = False
+                        for p in patterns:
+                            if p in cs:
+                                passed = True
+                                break
+                        if passed == False:
+                            failed_filters.add(key)
+                    else:
+                        for p in patterns:
+                            if p in cs:
+                                failed_filters.add(key)
+                                break
+
+        return failed_filters
+
+    def filter_all(self, row):
+        return self.filter_numeric(row).union(self.filter_string(row))
+
+    def create_ped_file(self, filename, numeric=False):
+        output = ""
+
+        sorted_sample_keys = sorted(self.samples.keys())
+        for sample_key in sorted_sample_keys:
+            this_sample = self.samples[sample_key]
+            sorted_site_keys = sorted(this_sample.keys())
+            variants = []
+            if numeric:
+                pef = sample_key
+            else:
+                pef = self.create_ped_entry_front(sample_key)
+
+            for SNP_key in sorted_site_keys:
+                genotype = this_sample[SNP_key]
+                if numeric == True:
+                    variants.append(str(GENOTYPE_TO_NUMERIC[genotype]))
+                else:
+                    variants.append(genotype_to_bases(genotype, self.sites[SNP_key]))
+
+            output += "{}\t{}\n".format(pef, '\t'.join(variants))
+
+        pedfile = open(filename, 'w')
+        pedfile.write(output)
+        pedfile.close()
+
+    def create_ped_entry_front(self, sample_id):
+        if self.cfg.control_info["control_tag"]["tag"] in sample_id:
+            group = 2
+        elif self.cfg.control_info["cases_tag"]["tag"] in sample_id:
+            group = 1
+        else:
+            group = 1
+
+        entry = "{0}\t{1}\t{2}\t{3}\t{4}\t{5}".format(
+            sample_id,
+            sample_id,
+            sample_id + "_F",
+            sample_id + "_M",
+            2,
+            group)
+
+        return entry
+
+    def create_map_file(self, filename):
+        output = ""
+        for SNP_key in sorted(self.sites.keys()):
+            chrom = SNP_key[0]
+            SNP_id = self.sites[SNP_key]['SNP_id']
+            posn = SNP_key[1]
+            output += "{}\t{}\t{}\n".format(chrom, SNP_id, str(posn))
+
+        mapfile = open(filename, 'w')
+        mapfile.write(output)
+        mapfile.close()
+
+    def create_excluded_samples_file(self, filename):
+        xsfile = open(filename, 'w')
+        xsfile.write('\n'.join(self.xsamples))
+        xsfile.close()
+
+class ConfigSettings:
+
+    SECTIONS = [
+        "#control",
+        "#column_names",
+        "#numeric_filters",
+        "#string_filters"
+    ]
+
+    def __init__(self):
+        self.control_info = {}
+        self.col_names = {}
+        self.nfilters = {}
+        self.sfilters = {}
+
+    def parse_config_file(self, cfname):
+        cffile = open(cfname, 'r')
+        section = None
+        rc = 0
+
+        for line in cffile:
+            # clean trailing/leading whitespace/newlines
+            line = line.strip()
+            # set section flag
+            if line[0] == '#':
+                if line in ConfigSettings.SECTIONS:
+                    section = line
+                else:
+                    continue
+            else:
+                # fill up config dicts
+                if section == "#control":
+                    (key, col_name, tag) = line.split(',')
+                    self.control_info[key] = {'col_name': col_name, 'tag': tag}
+                elif section == "#column_names":
+                    (key, col_name) = line.split(',')
+                    self.col_names[key] = col_name
+                elif section == "#numeric_filters":
+                    (key, col_name, op, cutoff) = line.split(',')
+                    self.add_numeric_filter(key, col_name, op, float(cutoff))
+                elif section == "#string_filters":
+                    (key, col_name, exact_flag, accept_flag) = line.split(',')
+                    patterns = next(cffile).strip().split(',')
+                    self.add_string_filter(key, col_name, exact_flag, accept_flag, patterns)
+                else:
+                    rc = 2
+                    break
+
+        cffile.close()
+        if rc != 0:
+            return rc
+        if not self.is_valid():
+            rc = 3
+        return rc
+
+
+    def is_valid(self):
+        for k in REQ_KEYS:
+            if k not in self.col_names.keys():
+                return False
+        return True
+
+    def add_numeric_filter(self, key, col_name, op, cutoff):
+        self.nfilters[key] = {
+            'col_name': col_name,
+            'op': op,
+            'cutoff': cutoff
+        }
+
+    def add_string_filter(self, key, col_name, exact_flag, accept_flag, patterns):
+        if exact_flag == "exact":
+            ef = True
+        elif exact_flag == "not_exact":
+            ef = False
+        else:
+            return False
+
+        if accept_flag == "accept":
+            af = True
+        elif accept_flag == "reject":
+            af = False
+        else:
+            return False
+
+        self.sfilters[key] = {
+            'col_name': col_name,
+            'exact_flag': ef,
+            'accept_flag': af,
+            'patterns': patterns
+        }
+
+    def __str__(self):
+        rv = "is Valid: {} || control info: {} || col names: {} || numeric filters: {} || string filters: {}".format(
+            self.is_valid(), self.control_info, self.col_names, self.nfilters, self.sfilters)
+        return rv
+
+
+### Utility ###
+def string_as_operator(arg1, arg2, op):
+    if op == "==":
+        return arg1 == arg2
+    elif op == ">":
+        return arg1 > arg2
+    elif op == "<":
+        return arg1 < arg2
+    elif op == "<=":
+        return arg1 <= arg2
+    elif op == ">=":
+        return arg1 >= arg2
+
+def genotype_to_bases(genotype, SNPdata):
+    bases = ""
+    if genotype in GENOTYPE_DICT:
+        gtype = GENOTYPE_DICT[genotype]
+        if gtype == "hom_ref":
+            bases = "{} {}".format(SNPdata['ref_col'], SNPdata['ref_col'])
+        elif gtype == "hom_alt":
+            bases = "{} {}".format(SNPdata['alt_col'], SNPdata['alt_col'])
+        elif gtype == "het":
+            bases = "{} {}".format(SNPdata['ref_col'], SNPdata['alt_col'])
+        elif gtype == "tri_allelic":
+            aa_col = SNPdata['alt_col']
+            if len(aa_col) > 1:
+                # arbitrarily choose the first one
+                alt_allele = aa_col[0]
+            else:
+                alt_allele = aa_col
+            bases = "{} {}".format(alt_allele, alt_allele)
+    else:
+        print genotype
+        print "Unrecognized genotype!"
+        sys.exit(1)
+    return bases
+
+### Main ###
+def main():
+    # argument parsing
+    parser = argparse.ArgumentParser()
+    parser.add_argument("dfname", help="name of input data file")
+    parser.add_argument("cfname", help="name of input configuration file")
+    parser.add_argument("pfname", help="name of output ped file")
+    parser.add_argument("mfname", help="name of output map file")
+    parser.add_argument("xsname", help="name of output file containing exact IDs of samples who were excluded")
+    args = parser.parse_args()
+
+    pc = PedConverter()
+    # read in config file
+    rc = pc.read_config_file(args.cfname)
+    if rc == 0:
+        print 'config file read successfully'
+    else:
+        print 'failed to read in config file successfully. Error code: {}'.format(rc)
+    # read in data file
+    rc = pc.read_data_file(args.dfname)
+    if rc == 0:
+        print 'data file read successfully'
+    else:
+        print 'failed to read in data file successfully. Error code: {}'.format(rc)
+    pc.create_ped_file(args.pfname, numeric=True)
+    pc.create_map_file(args.mfname)
+    pc.create_excluded_samples_file(args.xsname)
+
+if __name__ == "__main__":
+    main()
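
For reference (not part of the changeset above): `ConfigSettings.parse_config_file()` reads a section-based CSV configuration with `#control`, `#column_names`, `#numeric_filters` and `#string_filters` headers, where each string filter line is followed by one line of comma-separated patterns. The sketch below shows one plausible way to drive the converter; all column names, tags and file names (`GT`, `CONTROL`, `variants.tsv`, and so on) are invented for illustration and must match your own tab-delimited data file. Note that pedify.py itself uses Python 2 `print` statements, so this would need to run under Python 2.

```python
# Illustrative driver sketch -- NOT part of the pedify.py changeset above.
# Column names, tags and file names are hypothetical and must be adapted.
from pedify import PedConverter

# Config layout expected by ConfigSettings.parse_config_file():
#   #control          key,col_name,tag
#   #column_names     key,col_name              (all REQ_KEYS must appear)
#   #numeric_filters  key,col_name,op,cutoff    (op: ==, >, <, <=, >=)
#   #string_filters   key,col_name,exact|not_exact,accept|reject
#                     followed by one line of comma-separated patterns
EXAMPLE_CONFIG = """\
#control
control_tag,sample,CONTROL
cases_tag,sample,CASE
#column_names
genotype_column,GT
reference_column,REF
alternate_column,ALT
sample_id_column,sample
chromosome_column,CHROM
position_column,POS
variant_id_column,ID
#numeric_filters
min_quality,QUAL,>=,30
#string_filters
pass_only,FILTER,exact,accept
PASS
"""

with open("example_config.txt", "w") as fh:
    fh.write(EXAMPLE_CONFIG)

pc = PedConverter()
rc = pc.read_config_file("example_config.txt")  # 0 on success, 3 if a REQ_KEYS column name is missing
assert rc == 0

# "variants.tsv" is a hypothetical tab-delimited file whose header contains the
# columns named above; genotype values are expected in the quoted form '0/0, '0/1, '1/1, '1/2.
rc = pc.read_data_file("variants.tsv")
assert rc == 0

pc.create_ped_file("out.ped", numeric=True)      # numeric genotypes per GENOTYPE_TO_NUMERIC
pc.create_map_file("out.map")                    # chromosome, SNP id, position
pc.create_excluded_samples_file("excluded_samples.txt")
```

Run directly from the command line, the equivalent invocation would be `python pedify.py variants.tsv example_config.txt out.ped out.map excluded_samples.txt`, matching the argparse order: data file, config file, PED output, MAP output, excluded-samples output.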