view bwape_parallel_optimized @ 0:8f4d4b4e8b04 default tip

init commit
author Yusuf Ali <ali@yusuf.email>
date Wed, 25 Mar 2015 13:41:11 -0600
parents
children
line wrap: on
line source

#!/usr/bin/env perl

use strict;
use warnings;
use File::Basename;
use File::Temp qw(:POSIX);
use File::Spec qw(tmpdir);

@ARGV == 6 or die "Usage: $0 <# threads> <ref.fasta> <read1.fq[.gz]> <read2.fq[.gz]> <out bam prefix>\nMerges, then pair aligns reads using BWA 0.6.2\n";

my %config;
my $dirname = dirname(__FILE__);
my $tool_dir = shift @ARGV;
#read config file
if(not -e "$tool_dir/miseq_fastq_bam.loc"){
  system("cp $dirname/tool-data/miseq_fastq_bam.loc $tool_dir/miseq_fastq_bam.loc");
}
open CONFIG, '<', "$tool_dir/miseq_fastq_bam.loc";
while(<CONFIG>){
  (my $key, my $value) = split(/\s+/,$_);
  $config{$key} = $value;
}
close CONFIG;

my $threads = shift @ARGV;
my $ref = $config{"ref_genome_dir"} . (shift @ARGV);
my $read1 = shift @ARGV;
my $read2 = shift @ARGV;
my $outbam = shift @ARGV;
my $log = "$outbam.align.log";
$SIG{__DIE__} = $SIG{INT} = $SIG{HUP} = $SIG{TERM} = $SIG{ABRT} = \&cleanup;

open(STDERR, ">$log");
#my $tmpdir = File::Spec->tmpdir();
my $tmpdir = ".";
$ENV{"TMP_DIR"} = $tmpdir;
#my ($label, $dirs, $suffix) = fileparse($read1, ".fastq", ".fq");

if(not -e "$ref.sa"){
  # TODO: Use NFS lock to avoid concurrent indexing of the same file?
  system("bwa index $ref &>> $log") >> 8 and die "bwa index had non-zero ($?) exit status for $ref, aborting\n";
}

# Split the input into temporary chunks for parallel processing
my $tot_read1_size = -s $read1;
# Estimate expansion of compressed FASTQ files.  About 3.5 from sample data we have 
my $compressed_reads = 0;
if(`file $read1` =~ /gzip/i){
  $compressed_reads = 1;
  $tot_read1_size *= 3.5;
}
my $chunk_read_size = $tot_read1_size/$threads;

my (@read1_chunk_tmpfiles, @read2_chunk_tmpfiles);
# run initial alignment of each end in parallel
my $pid;
my $read_so_far = 0;
my $read_chunk_tmpfile_prefix = $$; #tmpnam();
my $tmpbam = basename($read_chunk_tmpfile_prefix);

# Check the average num of lines in a chunk to ensure chunks are lined up between forward and reverse
my $lines_per_chunk = 0;
open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
  or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
while(<READS1>){
  $read_so_far += length($_);
  # FASTQ records have 4 lines...only split if at the end of a record
  if($.%4 == 1 and $read_so_far > $chunk_read_size){
    last;
  }
  $lines_per_chunk++;
}
close(READS1);
my $chunk_count = 0;
my @align_cmds;
if($pid = fork) {
  # parent here
  my $read1_chunk_tmpfile;
  open(READS1, ($compressed_reads ? "gzip -cd $read1 |" : $read1))
    or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read1: $!\n";
  while(<READS1>){
    # FASTQ records have 4 lines...only split if at the end of a record
    if($. % $lines_per_chunk == 1){
      if(defined $read1_chunk_tmpfile){
        push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log"; 
      }
      $read1_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".forward";
      push @read1_chunk_tmpfiles, $read1_chunk_tmpfile;
      open(FASTQ_CHUNK, ">$read1_chunk_tmpfile")
        or die "Cannot open $read1_chunk_tmpfile for writing: $!\n";
    }
    print FASTQ_CHUNK $_;
  }
  close(FASTQ_CHUNK);
  push @align_cmds, "bwa aln -n 0.1 -f $read1_chunk_tmpfile.sai $ref $read1_chunk_tmpfile &>> $log";
  wait;
} 
elsif(defined $pid) { # $pid is zero in child process if defined
  my $read2_chunk_tmpfile;
  open(READS2, ($compressed_reads ? "gzip -cd $read2 |" : $read2))
    or die "Cannot ", ($compressed_reads ? "gunzip" : "read"), " $read2: $!\n";
  while(<READS2>){
    # FASTQ records have 4 lines...only split if at the end of a record
    if($. % $lines_per_chunk == 1){
      $read2_chunk_tmpfile = $read_chunk_tmpfile_prefix.".".int($chunk_count++).".reverse";
      open(FASTQ_CHUNK, ">$read2_chunk_tmpfile")
        or die "Cannot open $read2_chunk_tmpfile for writing: $!\n";
    }
    print FASTQ_CHUNK $_;
  }
  close(FASTQ_CHUNK);
  exit(0);
}
else {
  die "Can't fork: $!\n";
}

# repeat the commands for the reverse reads
my @final_align_cmds = (@align_cmds, map {my $r = $_; $r =~ s/\.forward/.reverse/g; $r} @align_cmds);
&run_cmds($threads, 5, @final_align_cmds);

my @pair_cmds;
my @bam_chunk_sorted_tmpfiles;
for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
  my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
  $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
  my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
  $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
  my $bam_chunk_sorted_tmpfile_prefix = $bam_chunk_tmpfile;
  $bam_chunk_sorted_tmpfile_prefix =~ s/\.bam$/.sorted/;
  push @bam_chunk_sorted_tmpfiles, "$bam_chunk_sorted_tmpfile_prefix.bam";
  #push @cmds, "/export/achri_data/programs/bwa-0.6.2/bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - | samtools sort - $bam_chunk_sorted_tmpfile_prefix 2>> $log";
  push @pair_cmds, "bwa sampe -r \"\@RG\tID:NA\tPL:illumina\tPU:NA\tLB:NA\tSM:$outbam\" $ref $read1_chunk_tmpfile.sai $read2_chunk_tmpfile.sai $read1_chunk_tmpfile $read2_chunk_tmpfile | samtools view -S -b - > $bam_chunk_tmpfile; samtools sort $bam_chunk_tmpfile $bam_chunk_sorted_tmpfile_prefix &>> $log";
}
&run_cmds($threads, 5, @pair_cmds);

system("samtools merge $tmpdir/$tmpbam.bam ".join(" ", @bam_chunk_sorted_tmpfiles)."&>> $log");
system("samtools index $tmpdir/$tmpbam.bam &>> $log") >> 8 and die "samtools index had non-zero ($?) exit status for $tmpdir/$tmpbam.bam, aborting\n";

# post-process the alignments
if($pid = fork){
  system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
  wait(); # for rmdup to finish
}
elsif(defined $pid) {
  system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
  exit(0);
}
else{ # Can't fork, do in serial
  system("samtools flagstat $tmpdir/$tmpbam.bam > $outbam.before_dedup.flagstat.txt 2>> $log");
  system("samtools rmdup $tmpdir/$tmpbam.bam $tmpdir/$tmpbam.rmdup.bam &>> $log")>>8 and die "Samtools rmdup failed with exit status ", ($?>>8), ", please check the log\n";
}

die "Samtools rmdup did not generate the expected output file" if not -e "$tmpdir/$tmpbam.rmdup.bam";
die "Samtools generated a blank output file" if -z "$tmpdir/$tmpbam.rmdup.bam";
system "samtools index $tmpdir/$tmpbam.rmdup.bam &>> $log";
# Peak into the BAM to see if it's got Illumina encoded quality values, which will require an extra flag for GATK
my $is_illumina_encoded_qual = 1;
my $num_mapped = 0;
if(open(SAMTOOLS, "samtools view $tmpdir/$tmpbam.rmdup.bam |")){
    while(<SAMTOOLS>){
        my @F = split /\t/, $_;
        next unless $F[2] ne "*"; # ignore unmapped reads
        my @quals = map {ord($_)} split(//, $F[10]);
        if(grep {$_ < 64} @quals){
            $is_illumina_encoded_qual = 0; last;
        }
        last if ++$num_mapped > 100; # only look at the first 100 mapped reads 
    }
} # take our chances if samtools call failed that it's not illumina encoded, which will fail quickly in GATK anyways
close(SAMTOOLS);

system "java -Xmx4G -jar $diname/GenomeAnalysisTK.jar -nt $threads -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T RealignerTargetCreator -o $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals &>> $log";
die "GATK did not generate the expected intervals file\n" if not -e "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
system "java -Xmx4G -jar $dirname/GenomeAnalysisTK.jar -I $tmpdir/$tmpbam.rmdup.bam -R $ref -T IndelRealigner -targetIntervals $tmpdir/$tmpbam.rmdup.gatk_realigner.intervals -o $outbam.bam ".
       ($is_illumina_encoded_qual?"--fix_misencoded_quality_scores":"")." &>> $log";
die "GATK did not generate the expected realigned BAM output file\n" if not -e "$outbam.bam";
die "GATK generated a blank output file\n" if -z "$outbam.bam";
my $outfile_bai = "$outbam.bai";
if(not -e $outfile_bai){
  if(not -e "$outbam.bam.bai"){
    system "samtools index $outbam.bam &>> $log";
  }
  system "cp $outbam.bam.bai $outfile_bai";
}
else{
  system "cp $outfile_bai $outbam.bam.bai"; # some tools expect name.bai, some expect name.bam.bai, so provide both
}
&cleanup;

sub cleanup{
  print STDERR "Cleaning up temp files for aborted bwa run: @_" if @_;
  for my $read1_chunk_tmpfile (@read1_chunk_tmpfiles){
    my $read2_chunk_tmpfile = $read1_chunk_tmpfile;
    $read2_chunk_tmpfile =~ s/\.forward$/.reverse/;
    my $bam_chunk_tmpfile = $read1_chunk_tmpfile;
    $bam_chunk_tmpfile =~ s/\.forward$/.pe.bam/;
    unlink $read1_chunk_tmpfile, "$read1_chunk_tmpfile.sai", $read2_chunk_tmpfile, "$read2_chunk_tmpfile.sai", $bam_chunk_tmpfile;
  }
  unlink @bam_chunk_sorted_tmpfiles;
  unlink "$tmpdir/$tmpbam.bam";
  unlink "$tmpdir/$tmpbam.bam.bai";
  unlink "$tmpdir/$tmpbam.rmdup.bam";
  unlink "$tmpdir/$tmpbam.rmdup.bam.bai";
  unlink "$tmpdir/$tmpbam.rmdup.gatk_realigner.intervals";
}

sub run_cmds{

  my ($max_cmds, $sleep_time, @cmds) = @_;

  my ($num_children, $pid);

        for($num_children = 1; $num_children < $max_cmds && @cmds; $num_children++){
                my $cmd = shift (@cmds);
                if($pid = fork) {
                  sleep $sleep_time; # seems to be a major system time spike if all commands launched in quick succession.
                } elsif(defined $pid) { 
                        #print STDERR "$cmd\n";
                        system $cmd;
                        #print STDERR "Finished $cmd\n";
                        exit;
                } else {
                        die "Can't fork: $!\n";
                }
        }

        while(@cmds) {
                undef $pid;
                FORK: {
                        my $cmd = shift (@cmds);
                        if($pid = fork) {
                                $num_children++;
                                wait;
                                $num_children--;
                                next;

                        } elsif(defined $pid) { 
                                system $cmd;
                                exit;

                        } else {
                                die "Can't fork: $!\n";
                        }
                }
        }
        wait while $num_children--;
}