/*
 * Decompiled with CFR 0.152.
 */
package org.broadinstitute.sting.gatk.executive;

import java.util.Collection;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import net.sf.picard.reference.IndexedFastaSequenceFile;
import org.broad.tribble.TribbleException;
import org.broadinstitute.sting.gatk.GenomeAnalysisEngine;
import org.broadinstitute.sting.gatk.datasources.reads.SAMDataSource;
import org.broadinstitute.sting.gatk.datasources.reads.Shard;
import org.broadinstitute.sting.gatk.datasources.rmd.ReferenceOrderedDataSource;
import org.broadinstitute.sting.gatk.executive.HierarchicalMicroSchedulerMBean;
import org.broadinstitute.sting.gatk.executive.MicroScheduler;
import org.broadinstitute.sting.gatk.executive.OutputMergeTask;
import org.broadinstitute.sting.gatk.executive.ReduceTree;
import org.broadinstitute.sting.gatk.executive.ShardTraverser;
import org.broadinstitute.sting.gatk.executive.TreeReducer;
import org.broadinstitute.sting.gatk.io.OutputTracker;
import org.broadinstitute.sting.gatk.io.ThreadLocalOutputTracker;
import org.broadinstitute.sting.gatk.walkers.TreeReducible;
import org.broadinstitute.sting.gatk.walkers.Walker;
import org.broadinstitute.sting.utils.exceptions.ReviewedStingException;
import org.broadinstitute.sting.utils.threading.ThreadPoolMonitor;

public class HierarchicalMicroScheduler
extends MicroScheduler
implements HierarchicalMicroSchedulerMBean,
ReduceTree.TreeReduceNotifier {
    private static final int MAX_OUTSTANDING_OUTPUT_MERGES = 50;
    private ExecutorService threadPool;
    private ThreadLocalOutputTracker outputTracker = new ThreadLocalOutputTracker();
    private final Queue<TreeReduceTask> reduceTasks = new LinkedList<TreeReduceTask>();
    private Iterator<Shard> traversalTasks;
    private final Queue<ShardTraverser> outputMergeTasks = new LinkedList<ShardTraverser>();
    private int totalCompletedTraversals = 0;
    private long totalShardTraverseTime = 0L;
    private long totalTreeReduceTime = 0L;
    private long totalCompletedTreeReduces = 0L;
    private long totalOutputMergeTime = 0L;

    protected HierarchicalMicroScheduler(GenomeAnalysisEngine engine, Walker walker, SAMDataSource reads, IndexedFastaSequenceFile reference, Collection<ReferenceOrderedDataSource> rods, int nThreadsToUse) {
        super(engine, walker, reads, reference, rods);
        this.threadPool = Executors.newFixedThreadPool(nThreadsToUse);
    }

    @Override
    public Object execute(Walker walker, Iterable<Shard> shardStrategy) {
        if (!(walker instanceof TreeReducible)) {
            throw new IllegalArgumentException("The GATK can currently run in parallel only with TreeReducible walkers");
        }
        this.traversalTasks = shardStrategy.iterator();
        ReduceTree reduceTree = new ReduceTree(this);
        this.initializeWalker(walker);
        while (this.isShardTraversePending() || this.isTreeReducePending()) {
            if (this.isMergeLimitExceeded()) {
                this.mergeExistingOutput(false);
            }
            this.waitForFreeQueueSlot();
            if (this.isTreeReduceReady()) {
                this.queueNextTreeReduce(walker);
                continue;
            }
            if (!this.isShardTraversePending()) continue;
            this.queueNextShardTraverse(walker, reduceTree);
        }
        this.threadPool.shutdown();
        this.mergeExistingOutput(true);
        Object result = null;
        try {
            result = reduceTree.getResult().get();
            this.notifyTraversalDone(walker, result);
        }
        catch (InterruptedException ex) {
            this.handleException(ex);
        }
        catch (ExecutionException ex) {
            this.handleException(ex);
        }
        this.outputTracker.close();
        this.cleanup();
        return result;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void initializeWalker(Walker walker) {
        this.outputTracker.bypassThreadLocalStorage(true);
        try {
            walker.initialize();
        }
        finally {
            this.outputTracker.bypassThreadLocalStorage(false);
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void notifyTraversalDone(Walker walker, Object result) {
        this.outputTracker.bypassThreadLocalStorage(true);
        try {
            walker.onTraversalDone(result);
            this.printOnTraversalDone(result);
        }
        finally {
            this.outputTracker.bypassThreadLocalStorage(false);
        }
    }

    @Override
    public OutputTracker getOutputTracker() {
        return this.outputTracker;
    }

    protected boolean isShardTraversePending() {
        return this.traversalTasks.hasNext();
    }

    protected boolean isTreeReduceReady() {
        if (this.reduceTasks.size() == 0) {
            return false;
        }
        return this.reduceTasks.peek().isReadyForReduce();
    }

    protected boolean isTreeReducePending() {
        return this.reduceTasks.size() > 0;
    }

    protected boolean isMergeLimitExceeded() {
        ShardTraverser shardTraverse;
        int pendingTasks = 0;
        Iterator i$ = this.outputMergeTasks.iterator();
        while (i$.hasNext() && (shardTraverse = (ShardTraverser)i$.next()).isComplete()) {
            ++pendingTasks;
        }
        return this.outputMergeTasks.size() >= 50;
    }

    protected void mergeExistingOutput(boolean wait) {
        ShardTraverser traverser;
        long startTime = System.currentTimeMillis();
        LinkedList<ShardTraverser> mergeTasksInSession = new LinkedList<ShardTraverser>();
        while (!this.outputMergeTasks.isEmpty() && ((traverser = this.outputMergeTasks.peek()).isComplete() || wait)) {
            this.outputMergeTasks.remove();
            mergeTasksInSession.add(traverser);
        }
        for (ShardTraverser traverser2 : mergeTasksInSession) {
            OutputMergeTask mergeTask;
            if (!traverser2.isComplete()) {
                traverser2.waitForComplete();
            }
            if ((mergeTask = traverser2.getOutputMergeTask()) == null) continue;
            try {
                mergeTask.merge();
            }
            catch (TribbleException ex) {
                throw new ReviewedStingException("Unable to merge temporary Tribble output file.", ex);
            }
        }
        long endTime = System.currentTimeMillis();
        this.totalOutputMergeTime += endTime - startTime;
    }

    protected void queueNextShardTraverse(Walker walker, ReduceTree reduceTree) {
        if (!this.traversalTasks.hasNext()) {
            throw new IllegalStateException("Cannot traverse; no pending traversals exist.");
        }
        Shard shard = this.traversalTasks.next();
        ShardTraverser traverser = new ShardTraverser(this, this.traversalEngine, walker, shard, this.outputTracker);
        Future traverseResult = this.threadPool.submit(traverser);
        reduceTree.addEntry(traverseResult);
        this.outputMergeTasks.add(traverser);
        if (!this.isShardTraversePending()) {
            reduceTree.complete();
        }
    }

    protected void queueNextTreeReduce(Walker walker) {
        if (this.reduceTasks.size() == 0) {
            throw new IllegalStateException("Cannot reduce; no pending reduces exist.");
        }
        TreeReduceTask reducer = this.reduceTasks.remove();
        reducer.setWalker((TreeReducible)((Object)walker));
        this.threadPool.submit(reducer);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected void waitForFreeQueueSlot() {
        ThreadPoolMonitor monitor;
        ThreadPoolMonitor threadPoolMonitor = monitor = new ThreadPoolMonitor();
        synchronized (threadPoolMonitor) {
            this.threadPool.submit(monitor);
            monitor.watch();
        }
    }

    @Override
    public Future notifyReduce(Future lhs, Future rhs) {
        TreeReduceTask reducer = new TreeReduceTask(new TreeReducer(this, lhs, rhs));
        this.reduceTasks.add(reducer);
        return reducer;
    }

    protected final void handleException(InterruptedException ex) {
        throw new ExecutionFailure("Hierarchical reduce interrupted", ex);
    }

    protected final void handleException(ExecutionException ex) {
        if (ex.getCause() instanceof RuntimeException) {
            throw (RuntimeException)ex.getCause();
        }
        throw new ExecutionFailure("Hierarchical reduce failed", ex);
    }

    synchronized void reportShardTraverseTime(long shardTraversalTime) {
        this.totalShardTraverseTime += shardTraversalTime;
        ++this.totalCompletedTraversals;
    }

    synchronized void reportTreeReduceTime(long treeReduceTime) {
        this.totalTreeReduceTime += treeReduceTime;
        ++this.totalCompletedTreeReduces;
    }

    @Override
    public int getNumberOfTasksInReduceQueue() {
        return this.reduceTasks.size();
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public int getNumberOfTasksInIOQueue() {
        Queue<ShardTraverser> queue = this.outputMergeTasks;
        synchronized (queue) {
            return this.outputMergeTasks.size();
        }
    }

    @Override
    public long getTotalShardTraverseTimeMillis() {
        return this.totalShardTraverseTime;
    }

    @Override
    public long getAvgShardTraverseTimeMillis() {
        if (this.totalCompletedTraversals == 0) {
            return 0L;
        }
        return this.totalShardTraverseTime / (long)this.totalCompletedTraversals;
    }

    @Override
    public long getTotalTreeReduceTimeMillis() {
        return this.totalTreeReduceTime;
    }

    @Override
    public long getAvgTreeReduceTimeMillis() {
        if (this.totalCompletedTreeReduces == 0L) {
            return 0L;
        }
        return this.totalTreeReduceTime / this.totalCompletedTreeReduces;
    }

    @Override
    public long getTotalOutputMergeTimeMillis() {
        return this.totalOutputMergeTime;
    }

    public static class ExecutionFailure
    extends ReviewedStingException {
        public ExecutionFailure(String s, Throwable throwable) {
            super(s, throwable);
        }
    }

    private class TreeReduceTask
    extends FutureTask {
        private TreeReducer treeReducer;

        public TreeReduceTask(TreeReducer treeReducer) {
            super(treeReducer);
            this.treeReducer = null;
            this.treeReducer = treeReducer;
        }

        public void setWalker(TreeReducible walker) {
            this.treeReducer.setWalker(walker);
        }

        public boolean isReadyForReduce() {
            return this.treeReducer.isReadyForReduce();
        }
    }
}

