# HG changeset patch # User kevyin # Date 1486186778 18000 # Node ID b3db1d9534143b5db98b6cb427f2cb3fd56dbd23 # Parent 66ca0d174cf20b6f470be12899dce75f3d2dc4db Uploaded diff -r 66ca0d174cf2 -r b3db1d953414 README --- a/README Sun Apr 27 02:17:09 2014 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,35 +0,0 @@ -Copyright (c) 2012, Kenneth Sabir -All rights reserved. - -Redistribution and use in source and binary forms, with or without modification, are permitted provided -that the following conditions are met: - - - Redistributions of source code must retain the above copyright notice, this list of conditions and - the following disclaimer. - - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and - the following disclaimer in the documentation and/or other materials provided with the distribution. - - Neither the name of the Garvan Institute nor the names of its contributors may be used to endorse - or promote products derived from this software without specific prior written permission. - -THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, -INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, -INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY -THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING -IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - -################################################## -INSTALLATION: -#================================================# - -Set the number of threads to use by editing: -fastq_groomer_parallel.xml: -Modify the last argument in the tag, Default is 8 - -Tested on galaxy-dist -r e6444e7a1685 -This currently depends on the following galaxy_utils python library which should come with the default galaxy install - -More info: - from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter -################################################## diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel.py --- a/fastq_groomer_parallel.py Sun Apr 27 02:17:09 2014 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,164 +0,0 @@ -# Kenneth Sabir -# Garvan Institute -import sys -import time -import os -import math -import subprocess -import string -import shutil -import pickle -import io -from multiprocessing import Process -from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter - - - -def main(): - split_program = "split" - cat_program = "cat" - input_filename = sys.argv[1] - output_filename = sys.argv[3] - number_of_processes = 1; - if (len(sys.argv) > 7): - number_of_processes = int(sys.argv[7]) - file_prefix = "temp_groomer_part_" - - t1 = time.time() - old_path = os.getcwd() - - lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes) - temp_dir_name = move_to_temp_dir() - sequences = number_of_lines/4; - args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix] -# print "The args are: " , args - subprocess.call(args) -# print "Finished" - file_count = 0; - keep_checking = True - processes = [] - output_filenames = [] - while keep_checking: - - # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars. - lastchar = string.letters[file_count % len(string.letters)] - firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)] - temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar) - - # print 'looking for ' + temp_input_filename - if os.path.exists(temp_input_filename): -# print 'found ' + temp_input_filename - temp_output_filename = temp_input_filename + "_output" - output_filenames.append(temp_output_filename) - p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count])) - p.start() - processes.append(p) - file_count = file_count + 1 - else: - break - for p in processes : - p.join() - cat_params = [cat_program] - cat_params.extend(output_filenames) - with open(output_filename, 'w') as catOutputFile: - subprocess.call(cat_params, stdout=catOutputFile) - summarize_input = sys.argv[6] == 'summarize_input' - input_type = sys.argv[2] - output_type = sys.argv[4] - print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type ) - - aggregators = [] - if summarize_input: - for temp_output_filename in output_filenames : - with open(temp_output_filename + "_summary", 'r') as summaryLogFile: - temp_aggregator = pickle.load(summaryLogFile) - aggregators.append(temp_aggregator) - - print_aggregators(aggregators) - os.chdir(old_path) - shutil.rmtree(temp_dir_name) - time2 = time.time() - print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes) - -def calculate_lines_per_process(input_filename, number_of_processes): - wc_program = "wc" - p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE) - out, err = p.communicate() - number_of_lines = int(string.split(string.lstrip(out), ' ', 1)[0]) - exact_lines_per_process = number_of_lines * 1.0 / number_of_processes - lines_per_process = int(math.ceil((exact_lines_per_process / 4.0))) * 4 - return lines_per_process,number_of_lines - -def move_to_temp_dir(): - dirExists = False; - dir_name = None - - while not dirExists: - dir_name = "temp_groomer_part_" + str(time.time()) - if not os.path.exists(dir_name): - os.makedirs(dir_name) - break; - os.chdir(dir_name) - return dir_name - -def print_aggregators(aggregators): - total_ascii_range = [None, None] - total_decimal_range = [None, None] - total_valid_formats = set() - for aggregator in aggregators: -# print "This aggregators valid formats are: " + str(aggregator.get_valid_formats()) - total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats())) - ascii_range = aggregator.get_ascii_range() - decimal_range = aggregator.get_decimal_range() - - if total_ascii_range[0] is None: - total_ascii_range[0] = ascii_range[0] - else: - total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0]) - - # max of None and a value is the value - total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1]) - if total_decimal_range[0] is None: - total_decimal_range[0] = decimal_range[0] - else: - total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0]) - # max of None and a value is the value - total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1]) - print "total_valid_formats= " + str(total_valid_formats) - print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats ) or "None" ) - print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed - print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] ) - - -def partition(input_filename, temp_output_filename, fileCount): -# print 'Starting Thread: ' + str(fileCount) -# input_filename = sys.argv[1] - input_type = sys.argv[2] - output_type = sys.argv[4] - force_quality_encoding = sys.argv[5] - summarize_input = sys.argv[6] == 'summarize_input' - if force_quality_encoding == 'None': - force_quality_encoding = None - aggregator = fastqAggregator() - temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding ) - read_count = None - if summarize_input: - reader = fastqVerboseErrorReader - else: - reader = fastqReader - for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ): - if summarize_input: - aggregator.consume_read( fastq_read ) - temp_process_file.write( fastq_read ) -# print "Just wrote (%d): " % read_count + str(fastq_read) - temp_process_file.close() - if read_count is not None: - if input_type != output_type and 'solexa' in [ input_type, output_type ]: - print "Converted between Solexa and PHRED scores." - if summarize_input: - with open(temp_output_filename + "_summary", 'w') as summaryLogFile : - pickle.dump(aggregator, summaryLogFile) - else: - print "No valid FASTQ reads were provided." - -if __name__ == "__main__": main() diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel.xml --- a/fastq_groomer_parallel.xml Sun Apr 27 02:17:09 2014 -0400 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,71 +0,0 @@ - - Parallel Implementation of FASTQ Groomer - fastq_groomer_parallel.py '$input_file' '$input_type' '$output_file' -#if str( $options_type['options_type_selector'] ) == 'basic': -#if str( $input_type ) == 'cssanger': -'cssanger' -#else: -'sanger' -#end if -'ascii' 'summarize_input' -#else: -'${options_type.output_type}' '${options_type.force_quality_encoding}' '${options_type.summarize_input}' -#end if -"\${GALAXY_SLOTS:-4}" - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -**What it does** - -This is a parallel implementation of FASTQ Groomer. It utilizes multiple CPUs thus runs much faster than the original implementation. - - diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel/README --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fastq_groomer_parallel/README Sat Feb 04 00:39:38 2017 -0500 @@ -0,0 +1,37 @@ +Copyright (c) 2012, Kenneth Sabir +Copyright (c) 2016, Kevin Ying +All rights reserved. + +Redistribution and use in source and binary forms, with or without modification, are permitted provided +that the following conditions are met: + + - Redistributions of source code must retain the above copyright notice, this list of conditions and + the following disclaimer. + - Redistributions in binary form must reproduce the above copyright notice, this list of conditions and + the following disclaimer in the documentation and/or other materials provided with the distribution. + - Neither the name of the Garvan Institute nor the names of its contributors may be used to endorse + or promote products derived from this software without specific prior written permission. + +THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, +INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, +INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING +IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +################################################## +INSTALLATION: +#================================================# + +Set the number of threads to use by editing: +fastq_groomer_parallel.xml: +Modify the last argument in the tag, Default is 8 + +Tested on galaxy-dist -r e6444e7a1685 +This currently depends on the following galaxy_utils python library which should come with the default galaxy install + +More info: + from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter +v0.3.2 Explicit dependency on ``galaxy_sequence_utils`` +################################################## diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel/fastq_groomer_parallel.py --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fastq_groomer_parallel/fastq_groomer_parallel.py Sat Feb 04 00:39:38 2017 -0500 @@ -0,0 +1,164 @@ +# Kenneth Sabir +# Garvan Institute +import sys +import time +import os +import math +import subprocess +import string +import shutil +import pickle +import io +from multiprocessing import Process +from galaxy_utils.sequence.fastq import fastqReader, fastqVerboseErrorReader, fastqAggregator, fastqWriter + + + +def main(): + split_program = "split" + cat_program = "cat" + input_filename = sys.argv[1] + output_filename = sys.argv[3] + number_of_processes = 1; + if (len(sys.argv) > 7): + number_of_processes = int(sys.argv[7]) + file_prefix = "temp_groomer_part_" + + t1 = time.time() + old_path = os.getcwd() + + lines_per_process,number_of_lines = calculate_lines_per_process(input_filename, number_of_processes) + temp_dir_name = move_to_temp_dir() + sequences = number_of_lines/4; + args = [split_program, "-l"+str(lines_per_process), input_filename, file_prefix] +# print "The args are: " , args + subprocess.call(args) +# print "Finished" + file_count = 0; + keep_checking = True + processes = [] + output_filenames = [] + while keep_checking: + + # only need to support 26x26 different processes, so do it brute force (ie not in a loop) for 2 chars. + lastchar = string.letters[file_count % len(string.letters)] + firstchar = string.letters[(file_count / len(string.letters)) % len(string.letters)] + temp_input_filename = "%s%c%c" % (file_prefix, firstchar, lastchar) + + # print 'looking for ' + temp_input_filename + if os.path.exists(temp_input_filename): +# print 'found ' + temp_input_filename + temp_output_filename = temp_input_filename + "_output" + output_filenames.append(temp_output_filename) + p = Process(target=partition, args=([temp_input_filename, temp_output_filename, file_count])) + p.start() + processes.append(p) + file_count = file_count + 1 + else: + break + for p in processes : + p.join() + cat_params = [cat_program] + cat_params.extend(output_filenames) + with open(output_filename, 'w') as catOutputFile: + subprocess.call(cat_params, stdout=catOutputFile) + summarize_input = sys.argv[6] == 'summarize_input' + input_type = sys.argv[2] + output_type = sys.argv[4] + print "Groomed %i %s reads into %s reads." % ( sequences, input_type, output_type ) + + aggregators = [] + if summarize_input: + for temp_output_filename in output_filenames : + with open(temp_output_filename + "_summary", 'r') as summaryLogFile: + temp_aggregator = pickle.load(summaryLogFile) + aggregators.append(temp_aggregator) + + print_aggregators(aggregators) + os.chdir(old_path) + shutil.rmtree(temp_dir_name) + time2 = time.time() + print 'Groomer took: %0.3f ms using %d processes' % (((time2 - t1)*1000.0), number_of_processes) + +def calculate_lines_per_process(input_filename, number_of_processes): + wc_program = "wc" + p = subprocess.Popen([wc_program, "-l", input_filename], stdout=subprocess.PIPE) + out, err = p.communicate() + number_of_lines = int(string.split(string.lstrip(out), ' ', 1)[0]) + exact_lines_per_process = number_of_lines * 1.0 / number_of_processes + lines_per_process = int(math.ceil((exact_lines_per_process / 4.0))) * 4 + return lines_per_process,number_of_lines + +def move_to_temp_dir(): + dirExists = False; + dir_name = None + + while not dirExists: + dir_name = "temp_groomer_part_" + str(time.time()) + if not os.path.exists(dir_name): + os.makedirs(dir_name) + break; + os.chdir(dir_name) + return dir_name + +def print_aggregators(aggregators): + total_ascii_range = [None, None] + total_decimal_range = [None, None] + total_valid_formats = set() + for aggregator in aggregators: +# print "This aggregators valid formats are: " + str(aggregator.get_valid_formats()) + total_valid_formats = total_valid_formats.union(set(aggregator.get_valid_formats())) + ascii_range = aggregator.get_ascii_range() + decimal_range = aggregator.get_decimal_range() + + if total_ascii_range[0] is None: + total_ascii_range[0] = ascii_range[0] + else: + total_ascii_range[0] = min (total_ascii_range[0], ascii_range[0]) + + # max of None and a value is the value + total_ascii_range[1] = max (total_ascii_range[1], ascii_range[1]) + if total_decimal_range[0] is None: + total_decimal_range[0] = decimal_range[0] + else: + total_decimal_range[0] = min (total_decimal_range[0], decimal_range[0]) + # max of None and a value is the value + total_decimal_range[1] = max (total_decimal_range[1], decimal_range[1]) + print "total_valid_formats= " + str(total_valid_formats) + print "Based upon quality and sequence, the input data is valid for: %s" % ( ", ".join( total_valid_formats ) or "None" ) + print "Input ASCII range: %s(%i) - %s(%i)" % ( repr( total_ascii_range[0] ), ord( total_ascii_range[0] ), repr( total_ascii_range[1] ), ord( total_ascii_range[1] ) ) #print using repr, since \x00 (null) causes info truncation in galaxy when printed + print "Input decimal range: %i - %i" % ( total_decimal_range[0], total_decimal_range[1] ) + + +def partition(input_filename, temp_output_filename, fileCount): +# print 'Starting Thread: ' + str(fileCount) +# input_filename = sys.argv[1] + input_type = sys.argv[2] + output_type = sys.argv[4] + force_quality_encoding = sys.argv[5] + summarize_input = sys.argv[6] == 'summarize_input' + if force_quality_encoding == 'None': + force_quality_encoding = None + aggregator = fastqAggregator() + temp_process_file = fastqWriter( open( temp_output_filename, 'wb'), format = output_type, force_quality_encoding = force_quality_encoding ) + read_count = None + if summarize_input: + reader = fastqVerboseErrorReader + else: + reader = fastqReader + for read_count, fastq_read in enumerate( reader( open(input_filename, 'rb'), format = input_type, apply_galaxy_conventions = True ) ): + if summarize_input: + aggregator.consume_read( fastq_read ) + temp_process_file.write( fastq_read ) +# print "Just wrote (%d): " % read_count + str(fastq_read) + temp_process_file.close() + if read_count is not None: + if input_type != output_type and 'solexa' in [ input_type, output_type ]: + print "Converted between Solexa and PHRED scores." + if summarize_input: + with open(temp_output_filename + "_summary", 'w') as summaryLogFile : + pickle.dump(aggregator, summaryLogFile) + else: + print "No valid FASTQ reads were provided." + +if __name__ == "__main__": main() diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel/fastq_groomer_parallel.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fastq_groomer_parallel/fastq_groomer_parallel.xml Sat Feb 04 00:39:38 2017 -0500 @@ -0,0 +1,74 @@ + + Parallel Implementation of FASTQ Groomer + + type="package" version="1.0.1">galaxy_sequence_utils + + fastq_groomer_parallel.py '$input_file' '$input_type' '$output_file' +#if str( $options_type['options_type_selector'] ) == 'basic': +#if str( $input_type ) == 'cssanger': +'cssanger' +#else: +'sanger' +#end if +'ascii' 'summarize_input' +#else: +'${options_type.output_type}' '${options_type.force_quality_encoding}' '${options_type.summarize_input}' +#end if +"\${GALAXY_SLOTS:-4}" + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + +**What it does** + +This is a parallel implementation of FASTQ Groomer. It utilizes multiple CPUs thus runs much faster than the original implementation. + + diff -r 66ca0d174cf2 -r b3db1d953414 fastq_groomer_parallel/tool_dependencies.xml --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/fastq_groomer_parallel/tool_dependencies.xml Sat Feb 04 00:39:38 2017 -0500 @@ -0,0 +1,6 @@ + + + + + +