Mercurial > repos > fubar > egapx_runner
view nf/subworkflows/ncbi/gnomon/chainer_wnode/main.nf @ 8:1680e72e27be draft default tip
planemo upload for repository https://github.com/ncbi/egapx commit bdbe05027c2c40e217a2ff0c9e0556450c443e54
author | fubar |
---|---|
date | Mon, 05 Aug 2024 03:56:41 +0000 |
parents | d9c5c5b87fec |
children |
line wrap: on
line source
#!/usr/bin/env nextflow nextflow.enable.dsl=2 include { merge_params } from '../../utilities' include { run_align_sort } from '../../default/align_sort_sa/main.nf' split_count=16 workflow chainer_wnode { take: alignments hmm_params evidence_denylist gap_fill_allowlist scaffolds trusted_genes genome proteins parameters // Map : extra parameter and parameter update main: String input_sorting = parameters.get('input_aligns_sort', '') def sort_aligns = alignments if (!input_sorting.contains("presorted")) { String align_sort_params = "" if (input_sorting.contains("merge_only")) { align_sort_params = "-merge" } align_sort_params += " -ifmt seq-align -compression none -k subject,subject_start,-subject_end " // print(align_sort_params) sort_aligns = run_align_sort([], [], alignments, align_sort_params).collect() //sort_aligns = align_sort(alignments, align_sort_params) } String submit_chainer_params = merge_params("-minimum-abut-margin 20 -separate-within-introns", parameters, 'submit_chainer') String chainer_wnode_params = merge_params("", parameters, 'chainer_wnode') String gpx_make_outputs_params = merge_params("-default-output-name chains -slices-for affinity -sort-by affinity", parameters, 'gpx_make_outputs') def (jobs, lines_per_file) = generate_jobs(sort_aligns, submit_chainer_params) def collected = run_chainer(jobs.flatten(), sort_aligns, hmm_params, evidence_denylist, gap_fill_allowlist, scaffolds, trusted_genes, genome, proteins, lines_per_file, chainer_wnode_params) | collect run_gpx_make_outputs(collected, gpx_make_outputs_params) emit: chains = run_gpx_make_outputs.out.chains chains_slices = run_gpx_make_outputs.out.chains_slices evidence = run_gpx_make_outputs.out.evidence evidence_slices = run_gpx_make_outputs.out.evidence_slices } process generate_jobs { input: path sort_aligns val params output: path "job.*" env lines_per_file script: njobs=split_count """ #!/usr/bin/env bash # generate_jobs $sort_aligns $params -output chains -output-slices chains_slices -output-evidence evidence -output-evidence-slices evidence_slices submit_chainer $params -asn $sort_aligns -o jobs total_lines=\$(wc -l <jobs) (( lines_per_file = (total_lines + ${njobs} - 1) / ${njobs} )) echo total_lines=\$total_lines, lines_per_file=\$lines_per_file ####split -l\$lines_per_file jobs job. -da 3 # Use round robin to distribute jobs across nodes more evenly if [ \$total_lines -lt $njobs ]; then effective_njobs=\$total_lines else effective_njobs=$njobs fi split -nr/\$effective_njobs jobs job. -da 3 """ stub: """ for i in {1..$split_count}; do echo "<job query =\\\"lcl|${sort_aligns}:\${i}-\${i}\\\"></job>" >> jobs done split -nr/$split_count jobs job. -da 3 lines_per_file=10 """ } process run_chainer { input: path job path alignments path hmm_params path evidence_denylist path gap_fill_allowlist path scaffolds path trusted_genes path genome, stageAs: 'indexed/*' path proteins_asn, stageAs: 'indexed/*' val lines_per_file val params output: path "output/*" script: job_num = job.toString().tokenize('.').last().toInteger() """ echo "${evidence_denylist.join('\n')}" > evidence_denylist.mft echo "${gap_fill_allowlist.join('\n')}" > gap_fill_allowlist.mft echo "${scaffolds.join('\n')}" > scaffolds.mft echo "${trusted_genes.join('\n')}" > trusted_genes.mft # HACK: derive start_job_id from job file extension filename=\$(basename -- "$job") extension="\${filename##*.}" (( start_job_id = ((10#\$extension) * $lines_per_file) + 1 )) # make the local LDS of the genomic and protein (if present) sequences lds2_indexer -source indexed -db LDS2 # When running multiple jobs on the cluster there is a chance that # several jobs will run on the same node and thus generate files # with the same filename. We need to avoid that to be able to stage # the output files for gpx_make_outputs. We add the job file numeric # extension as a prefix to the filename. mkdir interim chainer_wnode $params -start-job-id \$start_job_id -workers 32 -input-jobs ${job} -O interim -nogenbank -lds2 LDS2 -evidence-denylist-manifest evidence_denylist.mft -gap-fill-allowlist-manifest gap_fill_allowlist.mft -param ${hmm_params} -scaffolds-manifest scaffolds.mft -trusted-genes-manifest trusted_genes.mft mkdir output for f in interim/*; do if [ -f \$f ]; then mv \$f output/\${extension}_\$(basename \$f) fi done """ stub: job_num = job.toString().tokenize('.').last().toInteger() """ mkdir -p output touch output/sample_chainer_wnode.${job_num}.out """ } process run_gpx_make_outputs { input: path files, stageAs: "gpx_inputs/*" val params output: path "output/chains.*.out.gz", emit: 'chains' path "output/chains.*.out.gz.slices", emit: 'chains_slices' path "output/evidence.*.out.gz", emit: 'evidence', optional: true path "output/evidence.*.out.gz.slices", emit: 'evidence_slices', optional: true script: """ ls -1 gpx_inputs/* > gpx_inputs.mft mkdir -p output gpx_make_outputs $params -input-manifest gpx_inputs.mft -output output/@.#.out.gz -output-manifest output/@.mft -slices-manifest output/@_slices.mft -num-partitions $split_count """ stub: """ mkdir -p output echo ${files} for i in {1..$split_count}; do touch output/chains.\$i.out.gz touch output/chains.\$i.out.gz.slices touch output/evidence.\$i.out.gz touch output/evidence.\$i.out.gz.slices done """ }