Repository 'ppp_vcfphase'
hg clone https://toolshed.g2.bx.psu.edu/repos/jaredgk/ppp_vcfphase

Changeset 3:d1e3db7f6521 (2018-10-17)
Previous changeset 2:54c84f7dcb2c (2018-10-17) Next changeset 4:901857c9b24f (2018-10-17)
Commit message:
Uploaded
added:
vcftools.py
b
diff -r 54c84f7dcb2c -r d1e3db7f6521 vcftools.py
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/vcftools.py Wed Oct 17 17:28:38 2018 -0400
[
b"@@ -0,0 +1,775 @@\n+import os\n+import sys\n+import logging\n+import subprocess\n+\n+sys.path.insert(0, os.path.abspath(os.path.join(os.pardir,'jared')))\n+\n+from vcf_reader_func import checkFormat\n+from bcftools import check_bcftools_for_errors\n+\n+def check_bgzip_for_errors (bgzip_stderr):\n+    '''\n+        Checks the bgzip stderr for errors\n+\n+        Parameters\n+        ----------\n+        bgzip_stderr : str\n+            bgzip stderr\n+\n+        Raises\n+        ------\n+        IOError\n+            If bgzip stderr returns an error\n+    '''\n+\n+    if bgzip_stderr:\n+        raise IOError('Error occured while compressing the vcf file')\n+\n+def bgzip_decompress_vcfgz (vcfgz_filename, out_prefix = '', keep_original = False):\n+        '''\n+            Converts a vcf.gz to vcf\n+\n+            The function automates bgzip to decompress a vcf.gz file into a vcf\n+\n+            Parameters\n+            ----------\n+            vcfgz_filename : str\n+                The file name of the vcf.gz file to be decompressed\n+            out_prefix : str\n+                Output file prefix (i.e. filename without extension)\n+            keep_original : bool\n+                Specifies if the original file should be kept\n+\n+            Raises\n+            ------\n+            IOError\n+                Error in creating the compressed file\n+        '''\n+\n+        # Run bgzip with stdout piped to file\n+        if keep_original or out_prefix:\n+\n+            if out_prefix:\n+\n+                # Assign the bgzip filename\n+                vcf_filename = out_prefix + '.vcf'\n+\n+            else:\n+\n+                # Seperate into path and filename\n+                split_path, split_filename = os.path.split(vcfgz_filename)\n+\n+                # Remove any file extensions\n+                vcf_basename = split_filename.split(os.extsep)[0] + '.vcf'\n+\n+                # Join path and filename\n+                vcf_filename = os.path.join(split_path, vcf_basename)\n+\n+            # Create the output file\n+            vcf_file = open(vcf_filename, 'w')\n+\n+            # bgzip subprocess call\n+            bgzip_call = subprocess.Popen(['bgzip', '-dc', vcfgz_filename], stdout = vcf_file, stderr = subprocess.PIPE)\n+\n+        # Run bgzip normally\n+        else:\n+\n+            # bgzip subprocess call\n+            bgzip_call = subprocess.Popen(['bgzip', '-d', vcfgz_filename], stdout = subprocess.PIPE, stderr = subprocess.PIPE)\n+\n+        # Save the stdout and stderr from bgzip\n+        bgzip_out, bgzip_err = bgzip_call.communicate()\n+\n+        # Check that output file was compressed correctly\n+        check_bgzip_for_errors(bgzip_err)\n+\n+        # Delete input when also using an output prefix\n+        if out_prefix and not keep_original:\n+            os.remove(vcfgz_filename)\n+\n+def bgzip_compress_vcf (vcf_filename, out_prefix = '', keep_original = False):\n+        '''\n+            Converts a vcf to vcf.gz\n+\n+            The function automates bgzip to compress a vcf file into a vcf.gz\n+\n+            Parameters\n+            ----------\n+            vcf_filename : str\n+                The file name of the vcf file to be compressed\n+            keep_original : bool\n+                Specifies if the original file should be kept\n+\n+            Raises\n+            ------\n+            IOError\n+                Error in creating the compressed file\n+        '''\n+\n+        # Compress and keep the original file\n+        if keep_original or out_prefix:\n+\n+            if out_prefix:\n+\n+                # Assign the filename\n+                vcfgz_filename = out_prefix + '.vcf.gz'\n+\n+            else:\n+\n+                # Seperate into path and filename\n+                split_path, split_filename = os.path.split(vcfgz_filename)\n+\n+                # Remove any file extensions\n+                vcfgz_basename = split_filename.split(os.extsep)[0] + '.vcf.gz'\n+\n+                # Join path and filename\n+                vcfgz_filename = os.path.join(split_path, vcfgz_basename)\n+\n+\n+            # Create the "..b"+        # Splits log into list of lines\n+        vcftools_stderr_lines = vcftools_stderr.splitlines()\n+        # Prints the error(s)\n+        raise Exception('\\n'.join((output_line for output_line in vcftools_stderr_lines if output_line.startswith('Error'))))\n+\n+    # Print output if not completed and no error found. Unlikely to be used, but included.\n+    else:\n+        raise Exception(vcftools_stderr)\n+\n+def produce_vcftools_output (output, filename, append_mode = False, strip_header = False):\n+    '''\n+        Creates the vcftools output file\n+\n+        This function will create an output file from the vcftools stdout.\n+        Please run `check_vcftools_for_errors` prior to check that vcftools\n+        finished without error.\n+\n+        Parameters\n+        ----------\n+        output : str\n+            vcftools stdout\n+        filename : str\n+            Specifies the filename for the output file\n+        append_mode : bool\n+            Used to create a single output file from multiple calls\n+        strip_header : bool\n+            Used to remove the header if not needed\n+\n+        Returns\n+        -------\n+        output : file\n+            vcftools output file\n+\n+    '''\n+\n+    # Check if the header should be stripped\n+    if strip_header:\n+        output = ''.join(output.splitlines(True)[1:])\n+\n+    # Check if single log file is required from multiple calls\n+    if append_mode:\n+        vcftools_log_file = open(filename,'a')\n+    else:\n+        vcftools_log_file = open(filename,'w')\n+\n+    vcftools_log_file.write(str(output))\n+    vcftools_log_file.close()\n+\n+def produce_vcftools_log (output, filename, append_mode = False):\n+    '''\n+        Creates the vcftools log file\n+\n+        This function will create a log file from the vcftools stderr. Please\n+        run `check_vcftools_for_errors` prior to check that vcftools finished\n+        without error.\n+\n+        Parameters\n+        ----------\n+        output : str\n+            vcftools stderr\n+        filename : str\n+            Specifies the filename for the log file\n+        append_mode : bool\n+            Used to create a single log file from multiple calls\n+\n+        Returns\n+        -------\n+        output : file\n+            vcftools log file\n+\n+    '''\n+    # Check if single log file is required from multiple calls\n+    if append_mode:\n+        vcftools_log_file = open(filename + '.log','a')\n+    else:\n+        vcftools_log_file = open(filename + '.log','w')\n+\n+    vcftools_log_file.write(str(output))\n+    vcftools_log_file.close()\n+\n+def assign_vcftools_input_arg (filename):\n+    '''\n+        Confirms file format for vcftools\n+\n+        Parameters\n+        ----------\n+        filename : str\n+            Specifies the input filename of unknown format\n+\n+        Returns\n+        -------\n+        list\n+            Returns vcftools input command for `filename`\n+\n+        Raises\n+        ------\n+        IOError\n+            If filename is an unknown file format\n+    '''\n+\n+    # True if file extensions is recognized by vcftools\n+    if filename.endswith('.vcf') or filename.endswith('.vcf.gz') or filename.endswith('.bcf'):\n+        # Assign the associated input command\n+        if filename.endswith('.vcf'):\n+            return ['--vcf', filename]\n+        elif filename.endswith('.vcf.gz'):\n+            return ['--gzvcf', filename]\n+        elif filename.endswith('.bcf'):\n+            return ['--bcf', filename]\n+\n+    # True if file extension is unknown or not recognized\n+    else:\n+\n+        # Checks if the file is unzipped, bgzipped, or gzipped\n+        vcfname_format = checkFormat(filename)\n+\n+        # Assign the associated input command, or return an error.\n+        if vcfname_format == 'vcf':\n+            return ['--vcf', filename]\n+        elif vcfname_format == 'bgzip':\n+            return ['--gzvcf', filename]\n+        elif vcfname_format == 'bcf':\n+            return ['--bcf', filename]\n+        else:\n+            raise Exception('Unknown VCF file format')\n"