#!/usr/bin/perl # Copyright (c) 2014- The University of Notre Dame. # This software is distributed under the GNU General Public License. # See the file COPYING for details. # # This program runs BWA using Work Queue. It accepts the query and reference # sequence as arguments on the command line. use Error qw(:try); use Error::Simple; use Getopt::Long qw(:config no_ignore_case); use work_queue; my $num_reads = 200000; my $usage = " wq_bwa.pl [options] -reads Sets the number of reads per split to use for running the query (default:$num_reads). Work Queue Options: -port Sets the port for work_queue to listen on (default: arbitrary). -fa Sets the work_queue fast abort option with the given multiplier (default: off). -N Sets the project name to (default: none). -stats Prints WQ statistics to (default:off). -d Sets the debug flag for Work Queue. For all debugging output, try 'all' (default: off). -help Print this usage string. "; #---Process options on the command line my %OPT; try{ GetOptions("port=i" => \$OPT{port}, "fa=i" => \$OPT{fast_abort}, "N=s" => \$OPT{project}, "stats=s" => \$OPT{stats}, "d=s" => \$OPT{debug}, "help|?" => sub {print $usage; exit(0)} ); } catch Error::Simple with{ my $E = shift; print STDERR $E->{-text}; die "\nParsing command line options failed!\n" if(); }; if ($#ARGV < 1) { print $usage; exit 1; } main(); #------------------------------- FUNCTION DEFINITIONS------------------------------------- sub setup_workqueue { if(defined($OPT{"debug"})){ work_queue::cctools_debug_flags_set($OPT{"debug"}); print localtime()." Work Queue debug flags set.\n"; } my $port = 0; if(defined($OPT{"port"})) { $port = $OPT{"port"}; } my $wq = work_queue::work_queue_create($port); if(defined($wq)) { $port = work_queue_port($wq); print localtime()." Work Queue listening on port $port.\n"; } else { print STDERR "Failed to create Work Queue on port $port.\n"; exit(0); } if(defined($OPT{"fast_abort"})) { my $multiplier = $OPT{"fast_abort"}; my $fa = work_queue::work_queue_activate_fast_abort($wq,$multiplier); print localtime()." Work Queue fast abort set to $multiplier.\n"; } if(defined($OPT{"project"})) { work_queue::work_queue_specify_name($wq, $OPT{"project"}); print localtime()." Work Queue project name set to $OPT{\"project\"}.\n"; } if(defined($OPT{"stats"})) { work_queue::work_queue_specify_log($wq, $OPT{"stats"}); print localtime()." Work Queue stats file set to $OPT{\"stats\"}.\n"; } return $wq; } sub split_query { my($query_file) = @_; my $read_count = 0; open(infile, $query_file); my $num_outputs = 0; open(outfile,">$query_file.$num_outputs"); my $line_count=0; while (my $line = ) { chomp $line; #a new sequence begins with '@' if ($line =~ /^[@]/ and $line_count % 4 ==0){ if ($read_count == $num_reads){ close(outfile); $num_outputs++; $read_count = 0; open(outfile, ">$query_file.$num_outputs"); } else { $read_count++; } } print outfile $line; print outfile "\n"; $line_count++; } $num_outputs++; close(infile); print localtime(). " Number of splits of $query_file is $num_outputs.\n"; return $num_outputs; } sub partition_tasks { my($wq, $ref_file, $query_file) = @_; my $num_splits = split_query($query_file); for(my $i = 0; $i < $num_splits; $i++) { my $task_query_file = $query_file. ".$i"; my $task_outfile = $task_query_file. ".sam"; my $task_command = "./bwa mem $ref_file $task_query_file > $task_outfile"; my $task = work_queue_task_create($task_command); work_queue_task_specify_file($task, "bwa", "bwa", $WORK_QUEUE_INPUT, $WORK_QUEUE_CACHE); #add the ref file indexes using two for loops foreach my $ending ('', '.amb', '.ann', '.bwt', '.pac', '.sa') { my $task_ref_file = $ref_file . $ending ; work_queue_task_specify_file($task, $task_ref_file, $task_ref_file, $WORK_QUEUE_INPUT, $WORK_QUEUE_CACHE); } work_queue_task_specify_file($task, $task_query_file, $task_query_file, $WORK_QUEUE_INPUT, $WORK_QUEUE_NOCACHE); work_queue_task_specify_file($task, $task_outfile, $task_outfile, $WORK_QUEUE_OUTPUT, $WORK_QUEUE_NOCACHE); my $taskid = work_queue_submit($wq, $task); print localtime(). " Submitted task (id# $task->{taskid}): $task->{command_line}\n"; } return $num_splits; } sub retrieve_tasks { my ($wq, $num_tasks) = @_; my $retrieved_tasks = 0; print localtime(). " Waiting for $num_tasks tasks to complete...\n"; while ($retrieved_tasks < $num_tasks) { my $t = work_queue_wait($wq, 5); if(defined($t)) { print localtime(). " Task (id# $t->{taskid}) complete: $t->{command_line} (return code $t->{return_status})\n"; if($t->{return_status} != 0) { print localtime(). " Task (id# $t->{taskid}) failed\n"; } my $task_output = $t->{output}; open (task_outfile, '>>task_outputs.txt'); print task_outfile "$task_output\n"; print task_outfile "=====================================\n\n"; close (task_outfile); work_queue_task_delete($t); $retrieved_tasks++; print localtime(). " Retrieved $retrieved_tasks tasks.\n"; } } } sub merge_tasks { my($query_file, $num_splits) = @_; open(sam_outfile,">$query_file.sam"); open(seq_outfile,">$query_file.seq"); for(my $i = 0; $i < $num_splits; $i++) { my $task_outfile = $query_file. ".$i" . ".sam"; open(outfh,$task_outfile); while(my $line = ) { $line =~ s/^\s+//; $first_char = substr($line, 0, 1); if($first_char == '@') { print sam_outfile $line; } else { print seq_outfile $line; } } close(outfh); } close(seq_outfile); #open(seq_outfile, "$query_file.seq"); #while(my $line = ) { # print sam_outfile $line; #} close(sam_outfile); } sub delete_workqueue { my($wq) = @_; work_queue_delete($wq); } sub main { my $ref_file=$ARGV[0]; my $query_file=$ARGV[1]; my $wq = setup_workqueue(); my $num_tasks = partition_tasks($wq, $ref_file, $query_file); retrieve_tasks($wq, $num_tasks); merge_tasks($query_file, $num_tasks); delete_workqueue($wq); exit 0; }