diff pedify.py @ 0:64e75e21466e draft default tip

Uploaded
author pmac
date Wed, 01 Jun 2016 03:38:39 -0400
parents
children
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pedify.py	Wed Jun 01 03:38:39 2016 -0400
@@ -0,0 +1,395 @@
+import sys
+import csv
+import argparse
+
+DEBUG = 0
+
+REQ_KEYS = ['genotype_column', 'reference_column', 'alternate_column', 
+			'sample_id_column', 'chromosome_column', 'position_column',
+			'variant_id_column']
+
+GENOTYPE_DICT = {
+	"'0/0": "hom_ref",
+	"'0/1": "het",
+	"'1/1": "hom_alt",
+	"'1/2": "tri_allelic"
+}
+
+GENOTYPE_TO_NUMERIC = {
+	"'0/0": 0,
+	"'0/1": 1,
+	"'1/1": 2,
+	"'1/2": 2
+}
+
+class PedConverter:
+	def __init__(self):
+		self.cfg = None
+		self.samples = {}
+		self.sites = {}
+		self.xsamples = []
+
+	def verify_column_names(self, datafile_header):
+		# check all the config column names actually exist in the data file
+		for col in self.cfg.col_names.values():
+			if col not in datafile_header:
+				print "The '{}' column was not found in the datafile! Double check your config file is correct. Exiting...".format(
+					col)
+				sys.exit(1)
+
+	def verify_filters(self, datafile_header):
+		# print warning messages if filters invalid
+		all_filters = self.cfg.nfilters.copy()
+		all_filters.update(self.cfg.sfilters)
+		for key in all_filters:
+			col_name = all_filters[key]["col_name"]
+			if col_name not in datafile_header:
+				print "Warning! The '{}' filter was not applied as the datafile does not contain the column '{}'".format(
+					key, col_name)
+
+	def read_config_file(self, cfname):
+		self.cfg = ConfigSettings()
+		rc = self.cfg.parse_config_file(cfname)
+		return rc
+
+	def read_data_file(self, dfname):
+		if (self.cfg == None) or (not self.cfg.is_valid()):
+			return 1
+
+		datafile = open(dfname, 'r')
+		dreader = csv.DictReader(datafile, delimiter='\t')
+		# verify datafile data matches config file
+		self.verify_column_names(dreader.fieldnames)
+		self.verify_filters(dreader.fieldnames)
+		all_sample_ids = set()
+		i = 0
+
+		for row in dreader:
+			failed_filters = self.filter_all(row)
+			sample_key = row[self.cfg.col_names['sample_id_column']]
+			all_sample_ids.add(sample_key)
+			if not failed_filters:
+				# add to sample dict
+				# key is a tuple made up of which chromosome the snp is found on
+				# and the position on the chromosome itself
+				SNP_key = (row[self.cfg.col_names['chromosome_column']], int(row[self.cfg.col_names['position_column']]))
+				genotype = row[self.cfg.col_names['genotype_column']]
+
+				# create a dictionary for each sample (person); each person is associated
+				# with another dictionary of all the SNPs found in that person
+				if sample_key not in self.samples:
+					self.samples[sample_key] = {SNP_key: genotype} 
+				else:
+					self.samples[sample_key][SNP_key] = genotype
+
+				# create a dict of all the sites where SNPs exist
+				if SNP_key not in self.sites:
+					# generate arbitrary ID's if there is no pre-existing ID for the SNP
+					if row[self.cfg.col_names['variant_id_column']] == '.':
+						SNP_id = "SNP_" + str(i)
+					else:
+						SNP_id = row[self.cfg.col_names['variant_id_column']]
+
+					SNP_data = {'ref_col': row[self.cfg.col_names['reference_column']], 
+								'alt_col': row[self.cfg.col_names['alternate_column']], 
+								'SNP_id': SNP_id}
+					self.sites[SNP_key] = SNP_data
+			i += 1
+		
+		# make sure every sample contains a genotype for every SNP found
+		for sample_key in self.samples:
+			this_sample = self.samples[sample_key]
+			for SNP_key in self.sites:
+				if SNP_key not in this_sample:
+					this_sample[SNP_key] = "'0/0"
+		datafile.close()
+
+		# get list of samples which were filtered out
+		self.xsamples = list(all_sample_ids.difference(set(self.samples.keys())))
+		return 0
+
+	# returns key of the specific filter/s that failed
+	def filter_numeric(self, row):
+		failed_filters = set()
+		for key in self.cfg.nfilters.keys():
+			nfilter = self.cfg.nfilters[key]
+			cutoff = float(nfilter["cutoff"])
+			op = nfilter["op"]
+			col_name = nfilter["col_name"]
+			if col_name in row.keys():
+				cv = float(row[col_name])
+				if not string_as_operator(cv, cutoff, op):
+					failed_filters.add(key)
+
+		return failed_filters
+
+	# returns key of the specific filter/s that failed
+	def filter_string(self, row):
+		failed_filters = set()
+		for key in self.cfg.sfilters.keys():
+			sfilter = self.cfg.sfilters[key]
+			col_name = sfilter["col_name"]
+			if col_name in row.keys():
+				cs = row[col_name]
+				af = sfilter['accept_flag']
+				ef = sfilter['exact_flag']
+				patterns = sfilter['patterns']
+				if ef:
+					if af:
+						passed = False
+						for p in patterns:
+							if p == cs:
+								passed = True
+								break
+						if passed == False:
+							failed_filters.add(key)
+					else:
+						for p in patterns:
+							if p == cs:
+								failed_filters.add(key)
+								break
+				else:
+					if af:
+						passed = False
+						for p in patterns:
+							if p in cs:
+								passed = True
+								break
+						if passed == False:
+							failed_filters.add(key)
+					else:
+						for p in patterns:
+							if p in cs:
+								failed_filters.add(key)
+								break
+
+		return failed_filters
+
+	def filter_all(self, row):
+		return self.filter_numeric(row).union(self.filter_string(row))
+
+	def create_ped_file(self, filename, numeric=False):
+		output = ""
+		
+		sorted_sample_keys = sorted(self.samples.keys())
+		for sample_key in sorted_sample_keys:
+			this_sample = self.samples[sample_key]
+			sorted_site_keys = sorted(this_sample.keys())
+			variants = []
+			if numeric:
+				pef = sample_key
+			else:
+				pef = self.create_ped_entry_front(sample_key)
+
+			for SNP_key in sorted_site_keys:
+				genotype = this_sample[SNP_key]
+				if numeric == True:
+					variants.append(str(GENOTYPE_TO_NUMERIC[genotype]))
+				else:
+					variants.append(genotype_to_bases(genotype, self.sites[SNP_key]))
+			
+			output += "{}\t{}\n".format(pef, '\t'.join(variants))				
+
+		pedfile = open(filename, 'w')
+		pedfile.write(output)
+		pedfile.close()
+
+	def create_ped_entry_front(self, sample_id):
+		if self.cfg.control_info["control_tag"]["tag"] in sample_id:
+			group = 2
+		elif self.cfg.control_info["cases_tag"]["tag"] in sample_id:
+			group = 1
+		else:
+			group = 1
+
+		entry = "{0}\t{1}\t{2}\t{3}\t{4}\t{5}".format(
+					sample_id, 
+					sample_id,
+					sample_id + "_F", 
+					sample_id + "_M", 
+					2, 
+					group)
+
+		return entry
+
+	def create_map_file(self, filename):
+		output = ""
+		for SNP_key in sorted(self.sites.keys()):
+			chrom = SNP_key[0]
+			SNP_id = self.sites[SNP_key]['SNP_id']
+			posn = SNP_key[1]
+			output += "{}\t{}\t{}\n".format(chrom, SNP_id, str(posn))
+
+		mapfile = open(filename, 'w')
+		mapfile.write(output)
+		mapfile.close()
+
+	def create_excluded_samples_file(self, filename):
+		xsfile = open(filename, 'w')
+		xsfile.write('\n'.join(self.xsamples))
+		xsfile.close()
+
+class ConfigSettings:
+
+	SECTIONS = [
+		"#control",
+		"#column_names",
+		"#numeric_filters",
+		"#string_filters"
+		]
+
+	def __init__(self):
+		self.control_info = {}
+		self.col_names = {}
+		self.nfilters = {}
+		self.sfilters = {}
+
+	def parse_config_file(self, cfname):
+		cffile = open(cfname, 'r')
+		section = None
+		rc = 0
+		
+		for line in cffile:
+			# clean trailing/leading whitespace/newlines
+			line = line.strip()
+			# set section flag
+			if line[0] == '#':
+				if line in ConfigSettings.SECTIONS:
+					section = line
+				else:
+					continue
+			else:
+				# fill up config dicts
+				if section == "#control":
+					(key, col_name, tag) = line.split(',')
+					self.control_info[key] = {'col_name': col_name, 'tag': tag}
+				elif section == "#column_names":
+					(key, col_name) = line.split(',')
+					self.col_names[key] = col_name
+				elif section == "#numeric_filters":
+					(key, col_name, op, cutoff) = line.split(',')
+					self.add_numeric_filter(key, col_name, op, float(cutoff))
+				elif section == "#string_filters":
+					(key, col_name, exact_flag, accept_flag) = line.split(',')
+					patterns = next(cffile).strip().split(',')
+					self.add_string_filter(key, col_name, exact_flag, accept_flag, patterns)
+				else:
+					rc = 2
+					break
+
+		cffile.close()
+		if rc != 0:
+			return rc
+		if not self.is_valid():
+			rc = 3
+		return rc
+
+
+	def is_valid(self):
+		for k in REQ_KEYS:
+			if k not in self.col_names.keys():
+				return False
+		return True
+
+	def add_numeric_filter(self, key, col_name, op, cutoff):					
+		self.nfilters[key] = {
+			'col_name': col_name,
+			'op': op,
+			'cutoff': cutoff
+			}
+
+	def add_string_filter(self, key, col_name, exact_flag, accept_flag, patterns):
+		if exact_flag == "exact":
+			ef = True
+		elif exact_flag == "not_exact":
+			ef = False
+		else:
+			return False
+
+		if accept_flag == "accept":
+			af = True
+		elif accept_flag == "reject":
+			af = False
+		else:
+			return False
+
+		self.sfilters[key] = {
+			'col_name': col_name,
+			'exact_flag': ef,
+			'accept_flag': af,
+			'patterns': patterns
+			}
+
+	def __str__(self):
+		rv = "is Valid: {} || control info: {} || col names: {} || numeric filters: {} || string filters: {}".format(
+			self.is_valid(), self.control_info, self.col_names, self.nfilters, self.sfilters)
+		return rv 
+
+
+### Utility ###
+def string_as_operator(arg1, arg2, op):
+	if op == "==":
+		return arg1 == arg2
+	elif op == ">":
+		return arg1 > arg2
+	elif op == "<":
+		return arg1 < arg2
+	elif op == "<=":
+		return arg1 <= arg2
+	elif op == ">=":
+		return arg1 >= arg2 
+
+def genotype_to_bases(genotype, SNPdata):
+	bases = ""
+	if genotype in GENOTYPE_DICT:
+		gtype = GENOTYPE_DICT[genotype]
+		if gtype == "hom_ref":
+			bases = "{} {}".format(SNPdata['ref_col'], SNPdata['ref_col'])
+		elif gtype == "hom_alt":
+			bases = "{} {}".format(SNPdata['alt_col'], SNPdata['alt_col'])	
+		elif gtype == "het":
+			bases = "{} {}".format(SNPdata['ref_col'], SNPdata['alt_col'])	
+		elif gtype == "tri_allelic":
+			aa_col = SNPdata['alt_col']
+			if len(aa_col) > 1:
+				# arbitrarily choose the first one
+				alt_allele = aa_col[0]
+			else:
+				alt_allele = aa_col
+			bases = "{} {}".format(alt_allele, alt_allele)
+	else:
+		print genotype
+		print "Unrecognized genotype!"
+		sys.exit(1)
+	return bases
+
+### Main ###
+def main():
+	# argument parsing
+	parser = argparse.ArgumentParser()
+	parser.add_argument("dfname", help="name of input data file")
+	parser.add_argument("cfname", help="name of input configuration file")
+	parser.add_argument("pfname", help="name of output ped file")
+	parser.add_argument("mfname", help="name of output map file")
+	parser.add_argument("xsname", help="name of output file containing exact IDs of samples who were excluded")
+	args = parser.parse_args()
+
+	pc = PedConverter()
+	# read in config file
+	rc = pc.read_config_file(args.cfname)
+	if rc == 0:
+		print 'config file read successfully'
+	else:
+		print 'failed to read in config file successfully. Error code: {}'.format(rc)
+	# read in data file
+	rc = pc.read_data_file(args.dfname)
+	if rc == 0:
+		print 'data file read successfully'
+	else:
+		print 'failed to read in data file successfully. Error code: {}'.format(rc)	
+	pc.create_ped_file(args.pfname, numeric=True)
+	pc.create_map_file(args.mfname)
+	pc.create_excluded_samples_file(args.xsname)
+
+if __name__ == "__main__":
+	main()