/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators.sort;

import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.apache.flink.api.common.typeutils.TypeComparator;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.TypeSerializerFactory;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.runtime.io.disk.ChannelReaderInputViewIterator;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelReader;
import org.apache.flink.runtime.io.disk.iomanager.BlockChannelWriter;
import org.apache.flink.runtime.io.disk.iomanager.ChannelReaderInputView;
import org.apache.flink.runtime.io.disk.iomanager.ChannelWriterOutputView;
import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.memorymanager.MemoryAllocationException;
import org.apache.flink.runtime.memorymanager.MemoryManager;
import org.apache.flink.runtime.operators.sort.ExceptionHandler;
import org.apache.flink.runtime.operators.sort.FixedLengthRecordSorter;
import org.apache.flink.runtime.operators.sort.InMemorySorter;
import org.apache.flink.runtime.operators.sort.IndexedSorter;
import org.apache.flink.runtime.operators.sort.LargeRecordHandler;
import org.apache.flink.runtime.operators.sort.MergeIterator;
import org.apache.flink.runtime.operators.sort.NormalizedKeySorter;
import org.apache.flink.runtime.operators.sort.QuickSort;
import org.apache.flink.runtime.operators.sort.Sorter;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class UnilateralSortMerger<E>
implements Sorter<E> {
    private static final Logger LOG = LoggerFactory.getLogger(UnilateralSortMerger.class);
    private static final int THRESHOLD_FOR_IN_PLACE_SORTING = 32;
    protected static final int MIN_NUM_WRITE_BUFFERS = 2;
    protected static final int MAX_NUM_WRITE_BUFFERS = 4;
    protected static final int MIN_NUM_SORT_MEM_SEGMENTS = 10;
    private final ThreadBase<E> readThread;
    private final ThreadBase<E> sortThread;
    private final ThreadBase<E> spillThread;
    protected final List<MemorySegment> sortReadMemory;
    protected final List<MemorySegment> writeMemory;
    protected final MemoryManager memoryManager;
    private final LargeRecordHandler<E> largeRecordHandler;
    private final HashSet<FileIOChannel> openChannels;
    private final HashSet<FileIOChannel.ID> channelsToDeleteAtShutdown;
    protected final Object iteratorLock = new Object();
    protected volatile MutableObjectIterator<E> iterator;
    protected volatile IOException iteratorException;
    protected volatile boolean closed;
    private static final CircularElement<Object> EOF_MARKER = new CircularElement();
    private static final CircularElement<Object> SPILLING_MARKER = new CircularElement();

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int maxNumFileHandles, float startSpillingFraction) throws IOException, MemoryAllocationException {
        this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, -1, maxNumFileHandles, startSpillingFraction);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction) throws IOException, MemoryAllocationException {
        this(memoryManager, ioManager, input, parentTask, serializerFactory, comparator, memoryFraction, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, true);
    }

    public UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean handleLargeRecords) throws IOException {
        this(memoryManager, memory, ioManager, input, parentTask, serializerFactory, comparator, numSortBuffers, maxNumFileHandles, startSpillingFraction, false, handleLargeRecords);
    }

    protected UnilateralSortMerger(MemoryManager memoryManager, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, double memoryFraction, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) throws IOException, MemoryAllocationException {
        this(memoryManager, memoryManager.allocatePages(parentTask, memoryManager.computeNumberOfPages(memoryFraction)), ioManager, input, parentTask, serializerFactory, comparator, numSortBuffers, maxNumFileHandles, startSpillingFraction, noSpillingMemory, true);
    }

    protected UnilateralSortMerger(MemoryManager memoryManager, List<MemorySegment> memory, IOManager ioManager, MutableObjectIterator<E> input, AbstractInvokable parentTask, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, int numSortBuffers, int maxNumFileHandles, float startSpillingFraction, boolean noSpillingMemory, boolean handleLargeRecords) throws IOException {
        int numLargeRecordBuffers;
        int numWriteBuffers;
        if (memoryManager == null | (ioManager == null && !noSpillingMemory) | serializerFactory == null | comparator == null) {
            throw new NullPointerException();
        }
        if (parentTask == null) {
            throw new NullPointerException("Parent Task must not be null.");
        }
        if (maxNumFileHandles < 2) {
            throw new IllegalArgumentException("Merger cannot work with less than two file handles.");
        }
        this.memoryManager = memoryManager;
        int numPagesTotal = memory.size();
        if (numPagesTotal < 12) {
            throw new IllegalArgumentException("Too little memory provided to sorter to perform task. Required are at least 12 pages. Current page size is " + memoryManager.getPageSize() + " bytes.");
        }
        if (noSpillingMemory && !handleLargeRecords) {
            numWriteBuffers = 0;
            numLargeRecordBuffers = 0;
        } else {
            int numConsumers = (noSpillingMemory ? 0 : 1) + (handleLargeRecords ? 2 : 0);
            int minBuffersForMerging = maxNumFileHandles + numConsumers * 2;
            if (minBuffersForMerging > numPagesTotal) {
                numWriteBuffers = noSpillingMemory ? 0 : 2;
                numLargeRecordBuffers = handleLargeRecords ? 4 : 0;
                maxNumFileHandles = numPagesTotal - numConsumers * 2;
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Reducing maximal merge fan-in to " + maxNumFileHandles + " due to limited memory availability during merge");
                }
            } else {
                int fractionalAuxBuffers = numPagesTotal / (numConsumers * 100);
                if (fractionalAuxBuffers >= 4) {
                    numWriteBuffers = noSpillingMemory ? 0 : 4;
                    numLargeRecordBuffers = handleLargeRecords ? 8 : 0;
                } else {
                    numWriteBuffers = noSpillingMemory ? 0 : Math.max(2, fractionalAuxBuffers);
                    numLargeRecordBuffers = handleLargeRecords ? Math.max(4, fractionalAuxBuffers) : 0;
                }
            }
        }
        int sortMemPages = numPagesTotal - numWriteBuffers - numLargeRecordBuffers;
        long sortMemory = (long)sortMemPages * (long)memoryManager.getPageSize();
        if (numSortBuffers < 1) {
            numSortBuffers = sortMemory > 0x6400000L ? 2 : 1;
        }
        int numSegmentsPerSortBuffer = sortMemPages / numSortBuffers;
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Instantiating sorter with %d pages of sorting memory (=%d bytes total) divided over %d sort buffers (%d pages per buffer). Using %d buffers for writing sorted results and merging maximally %d streams at once. Using %d memory segments for large record spilling.", sortMemPages, sortMemory, numSortBuffers, numSegmentsPerSortBuffer, numWriteBuffers, maxNumFileHandles, numLargeRecordBuffers));
        }
        this.sortReadMemory = memory;
        this.writeMemory = new ArrayList<MemorySegment>(numWriteBuffers);
        TypeSerializer serializer = serializerFactory.getSerializer();
        if (numWriteBuffers > 0) {
            for (int i = 0; i < numWriteBuffers; ++i) {
                this.writeMemory.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
        }
        if (numLargeRecordBuffers > 0) {
            ArrayList<MemorySegment> mem = new ArrayList<MemorySegment>();
            for (int i = 0; i < numLargeRecordBuffers; ++i) {
                mem.add(this.sortReadMemory.remove(this.sortReadMemory.size() - 1));
            }
            this.largeRecordHandler = new LargeRecordHandler(serializer, comparator.duplicate(), ioManager, memoryManager, mem, parentTask, maxNumFileHandles);
        } else {
            this.largeRecordHandler = null;
        }
        CircularQueues circularQueues = new CircularQueues();
        Iterator<MemorySegment> segments = this.sortReadMemory.iterator();
        for (int i = 0; i < numSortBuffers; ++i) {
            int k;
            ArrayList<MemorySegment> sortSegments = new ArrayList<MemorySegment>(numSegmentsPerSortBuffer);
            int n = k = i == numSortBuffers - 1 ? Integer.MAX_VALUE : numSegmentsPerSortBuffer;
            while (k > 0 && segments.hasNext()) {
                sortSegments.add(segments.next());
                --k;
            }
            TypeComparator comp = comparator.duplicate();
            InMemorySorter buffer = comp.supportsSerializationWithKeyNormalization() && serializer.getLength() > 0 && serializer.getLength() <= 32 ? new FixedLengthRecordSorter(serializerFactory.getSerializer(), comp, sortSegments) : new NormalizedKeySorter(serializerFactory.getSerializer(), comp, sortSegments);
            CircularElement element = new CircularElement(i, buffer);
            circularQueues.empty.add(element);
        }
        ExceptionHandler<IOException> exceptionHandler = new ExceptionHandler<IOException>(){

            @Override
            public void handleException(IOException exception) {
                if (!UnilateralSortMerger.this.closed) {
                    UnilateralSortMerger.this.setResultIteratorException(exception);
                    UnilateralSortMerger.this.close();
                }
            }
        };
        this.channelsToDeleteAtShutdown = new HashSet(64);
        this.openChannels = new HashSet(64);
        this.readThread = this.getReadingThread(exceptionHandler, input, circularQueues, this.largeRecordHandler, parentTask, serializer, (long)(startSpillingFraction * (float)sortMemory));
        this.sortThread = this.getSortingThread(exceptionHandler, circularQueues, parentTask);
        this.spillThread = this.getSpillingThread(exceptionHandler, circularQueues, parentTask, memoryManager, ioManager, serializerFactory, comparator, this.sortReadMemory, this.writeMemory, maxNumFileHandles);
        ClassLoader contextLoader = Thread.currentThread().getContextClassLoader();
        if (contextLoader != null) {
            if (this.readThread != null) {
                this.readThread.setContextClassLoader(contextLoader);
            }
            if (this.sortThread != null) {
                this.sortThread.setContextClassLoader(contextLoader);
            }
            if (this.spillThread != null) {
                this.spillThread.setContextClassLoader(contextLoader);
            }
        }
        this.startThreads();
    }

    protected void startThreads() {
        if (this.readThread != null) {
            this.readThread.start();
        }
        if (this.sortThread != null) {
            this.sortThread.start();
        }
        if (this.spillThread != null) {
            this.spillThread.start();
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void close() {
        Object object = this;
        synchronized (object) {
            if (this.closed) {
                return;
            }
            this.closed = true;
        }
        try {
            object = this.iteratorLock;
            synchronized (object) {
                if (this.iteratorException == null) {
                    this.iteratorException = new IOException("The sorter has been closed.");
                    this.iteratorLock.notifyAll();
                }
            }
            if (this.readThread != null) {
                try {
                    this.readThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down reader thread: " + t.getMessage(), t);
                }
            }
            if (this.sortThread != null) {
                try {
                    this.sortThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down sorter thread: " + t.getMessage(), t);
                }
            }
            if (this.spillThread != null) {
                try {
                    this.spillThread.shutdown();
                }
                catch (Throwable t) {
                    LOG.error("Error shutting down spilling thread: " + t.getMessage(), t);
                }
            }
            try {
                if (this.readThread != null) {
                    this.readThread.join();
                }
                if (this.sortThread != null) {
                    this.sortThread.join();
                }
                if (this.spillThread != null) {
                    this.spillThread.join();
                }
            }
            catch (InterruptedException iex) {
                LOG.debug("Closing of sort/merger was interrupted. The reading/sorting/spilling threads may still be working.", (Throwable)iex);
            }
        }
        finally {
            Object channel;
            Iterator<Object> channels2;
            try {
                if (!this.writeMemory.isEmpty()) {
                    this.memoryManager.release(this.writeMemory);
                }
                this.writeMemory.clear();
            }
            catch (Throwable iex) {}
            try {
                if (!this.sortReadMemory.isEmpty()) {
                    this.memoryManager.release(this.sortReadMemory);
                }
                this.sortReadMemory.clear();
            }
            catch (Throwable iex) {}
            while (!this.openChannels.isEmpty()) {
                try {
                    channels2 = this.openChannels.iterator();
                    while (channels2.hasNext()) {
                        channel = channels2.next();
                        channels2.remove();
                        channel.closeAndDelete();
                    }
                }
                catch (Throwable channels2) {
                }
            }
            while (!this.channelsToDeleteAtShutdown.isEmpty()) {
                try {
                    channels2 = this.channelsToDeleteAtShutdown.iterator();
                    while (channels2.hasNext()) {
                        channel = (FileIOChannel.ID)channels2.next();
                        channels2.remove();
                        try {
                            File f = new File(((FileIOChannel.ID)channel).getPath());
                            if (!f.exists()) continue;
                            f.delete();
                        }
                        catch (Throwable throwable) {}
                    }
                }
                catch (Throwable throwable) {
                }
            }
            try {
                if (this.largeRecordHandler != null) {
                    this.largeRecordHandler.close();
                }
            }
            catch (Throwable throwable) {}
        }
    }

    protected ThreadBase<E> getReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, LargeRecordHandler<E> largeRecordHandler, AbstractInvokable parentTask, TypeSerializer<E> serializer, long startSpillingBytes) {
        return new ReadingThread<Object>(exceptionHandler, reader, queues, largeRecordHandler, serializer.createInstance(), parentTask, startSpillingBytes);
    }

    protected ThreadBase<E> getSortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
        return new SortingThread<E>(exceptionHandler, queues, parentTask);
    }

    protected ThreadBase<E> getSpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memoryManager, IOManager ioManager, TypeSerializerFactory<E> serializerFactory, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxFileHandles) {
        return new SpillingThread(exceptionHandler, queues, parentTask, memoryManager, ioManager, serializerFactory.getSerializer(), comparator, sortReadMemory, writeMemory, maxFileHandles);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public MutableObjectIterator<E> getIterator() throws InterruptedException {
        Object object = this.iteratorLock;
        synchronized (object) {
            while (this.iterator == null && this.iteratorException == null) {
                this.iteratorLock.wait();
            }
            if (this.iteratorException != null) {
                throw new RuntimeException("Error obtaining the sorted input: " + this.iteratorException.getMessage(), this.iteratorException);
            }
            return this.iterator;
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResultIterator(MutableObjectIterator<E> iterator) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.iteratorException == null) {
                this.iterator = iterator;
                this.iteratorLock.notifyAll();
            }
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    protected final void setResultIteratorException(IOException ioex) {
        Object object = this.iteratorLock;
        synchronized (object) {
            if (this.iteratorException == null) {
                this.iteratorException = ioex;
                this.iteratorLock.notifyAll();
            }
        }
    }

    protected static <T> CircularElement<T> endMarker() {
        CircularElement<Object> c = EOF_MARKER;
        return c;
    }

    protected static <T> CircularElement<T> spillingMarker() {
        CircularElement<Object> c = SPILLING_MARKER;
        return c;
    }

    static /* synthetic */ LargeRecordHandler access$300(UnilateralSortMerger x0) {
        return x0.largeRecordHandler;
    }

    protected static final class ChannelWithBlockCount {
        private final FileIOChannel.ID channel;
        private final int blockCount;

        public ChannelWithBlockCount(FileIOChannel.ID channel, int blockCount) {
            this.channel = channel;
            this.blockCount = blockCount;
        }

        public FileIOChannel.ID getChannel() {
            return this.channel;
        }

        public int getBlockCount() {
            return this.blockCount;
        }
    }

    protected class SpillingThread
    extends ThreadBase<E> {
        protected final MemoryManager memManager;
        protected final IOManager ioManager;
        protected final TypeSerializer<E> serializer;
        protected final TypeComparator<E> comparator;
        protected final List<MemorySegment> writeMemory;
        protected final List<MemorySegment> mergeReadMemory;
        protected final int maxFanIn;
        protected final int numWriteBuffersToCluster;

        public SpillingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask, MemoryManager memManager, IOManager ioManager, TypeSerializer<E> serializer, TypeComparator<E> comparator, List<MemorySegment> sortReadMemory, List<MemorySegment> writeMemory, int maxNumFileHandles) {
            super(exceptionHandler, "SortMerger spilling thread", queues, parentTask);
            this.memManager = memManager;
            this.ioManager = ioManager;
            this.serializer = serializer;
            this.comparator = comparator;
            this.mergeReadMemory = sortReadMemory;
            this.writeMemory = writeMemory;
            this.maxFanIn = maxNumFileHandles;
            this.numWriteBuffersToCluster = writeMemory.size() >= 4 ? writeMemory.size() / 2 : 1;
        }

        /*
         * Exception decompiling
         */
        @Override
        public void go() throws IOException {
            /*
             * This method has failed to decompile.  When submitting a bug report, please provide this stack trace, and (if you hold appropriate legal rights) the relevant class file.
             * 
             * java.lang.NullPointerException: Cannot invoke "org.benf.cfr.reader.bytecode.analysis.types.BindingSuperContainer.getBoundAssignable(org.benf.cfr.reader.bytecode.analysis.types.JavaGenericRefTypeInstance, org.benf.cfr.reader.bytecode.analysis.types.JavaGenericRefTypeInstance)" because "maybeBindingContainer" is null
             *     at org.benf.cfr.reader.bytecode.analysis.types.GenericTypeBinder.extractBaseBindings(GenericTypeBinder.java:125)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter$InnerExplicitTypeCallRewriter.rewriteFunctionInvokation(ExplicitTypeCallRewriter.java:37)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter$InnerExplicitTypeCallRewriter.rewriteExpression(ExplicitTypeCallRewriter.java:56)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExpressionRewriterHelper.applyForwards(ExpressionRewriterHelper.java:12)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.expression.AbstractMemberFunctionInvokation.applyExpressionRewriterToArgs(AbstractMemberFunctionInvokation.java:101)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.rewriters.ExplicitTypeCallRewriter.rewriteExpression(ExplicitTypeCallRewriter.java:71)
             *     at org.benf.cfr.reader.bytecode.analysis.parse.statement.ExpressionStatement.rewriteExpressions(ExpressionStatement.java:40)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.Op03SimpleStatement.rewrite(Op03SimpleStatement.java:479)
             *     at org.benf.cfr.reader.bytecode.analysis.opgraph.op3rewriters.Op03Rewriters.rewriteWith(Op03Rewriters.java:23)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisInner(CodeAnalyser.java:819)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysisOrWrapFail(CodeAnalyser.java:278)
             *     at org.benf.cfr.reader.bytecode.CodeAnalyser.getAnalysis(CodeAnalyser.java:201)
             *     at org.benf.cfr.reader.entities.attributes.AttributeCode.analyse(AttributeCode.java:94)
             *     at org.benf.cfr.reader.entities.Method.analyse(Method.java:531)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1055)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseInnerClassesPass1(ClassFile.java:923)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseMid(ClassFile.java:1035)
             *     at org.benf.cfr.reader.entities.ClassFile.analyseTop(ClassFile.java:942)
             *     at org.benf.cfr.reader.Driver.doJarVersionTypes(Driver.java:257)
             *     at org.benf.cfr.reader.Driver.doJar(Driver.java:139)
             *     at org.benf.cfr.reader.CfrDriverImpl.analyse(CfrDriverImpl.java:76)
             *     at org.benf.cfr.reader.Main.main(Main.java:54)
             */
            throw new IllegalStateException("Decompilation failed");
        }

        protected final void disposeSortBuffers(boolean releaseMemory) {
            while (!this.queues.empty.isEmpty()) {
                try {
                    InMemorySorter sorter = this.queues.empty.take().buffer;
                    List<MemorySegment> sorterMem = sorter.dispose();
                    if (!releaseMemory) continue;
                    this.memManager.release(sorterMem);
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        LOG.error("Spilling thread was interrupted (without being shut down) while collecting empty buffers to release them. Retrying to collect buffers...");
                        continue;
                    }
                    return;
                }
            }
        }

        protected final CircularElement<E> takeNext(BlockingQueue<CircularElement<E>> queue, Queue<CircularElement<E>> cache) throws InterruptedException {
            return cache.isEmpty() ? queue.take() : cache.poll();
        }

        protected final MergeIterator<E> getMergingIterator(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> inputSegments, List<FileIOChannel> readerList, MutableObjectIterator<E> largeRecords) throws IOException {
            if (LOG.isDebugEnabled()) {
                LOG.debug("Performing merge of " + channelIDs.size() + " sorted streams.");
            }
            ArrayList iterators = new ArrayList(channelIDs.size() + 1);
            for (int i = 0; i < channelIDs.size(); ++i) {
                ChannelWithBlockCount channel = channelIDs.get(i);
                List<MemorySegment> segsForChannel = inputSegments.get(i);
                BlockChannelReader<MemorySegment> reader = this.ioManager.createBlockChannelReader(channel.getChannel());
                readerList.add(reader);
                this.registerOpenChannelToBeRemovedAtShudown(reader);
                this.unregisterChannelToBeRemovedAtShudown(channel.getChannel());
                ChannelReaderInputView inView = new ChannelReaderInputView(reader, segsForChannel, channel.getBlockCount(), false);
                iterators.add(new ChannelReaderInputViewIterator(inView, null, this.serializer));
            }
            if (largeRecords != null) {
                iterators.add(largeRecords);
            }
            return new MergeIterator(iterators, this.comparator);
        }

        protected final List<ChannelWithBlockCount> mergeChannelList(List<ChannelWithBlockCount> channelIDs, List<MemorySegment> allReadBuffers, List<MemorySegment> writeBuffers) throws IOException {
            double numMerges = Math.ceil((double)channelIDs.size() / (double)this.maxFanIn);
            int channelsToMergePerStep = (int)Math.ceil((double)channelIDs.size() / numMerges);
            ArrayList<List<MemorySegment>> readBuffers = new ArrayList<List<MemorySegment>>(channelsToMergePerStep);
            this.getSegmentsForReaders(readBuffers, allReadBuffers, channelsToMergePerStep);
            ArrayList<ChannelWithBlockCount> mergedChannelIDs = new ArrayList<ChannelWithBlockCount>((int)(numMerges + 1.0));
            ArrayList<ChannelWithBlockCount> channelsToMergeThisStep = new ArrayList<ChannelWithBlockCount>(channelsToMergePerStep);
            int channelNum = 0;
            while (this.isRunning() && channelNum < channelIDs.size()) {
                channelsToMergeThisStep.clear();
                for (int i = 0; i < channelsToMergePerStep && channelNum < channelIDs.size(); ++i, ++channelNum) {
                    channelsToMergeThisStep.add(channelIDs.get(channelNum));
                }
                if (channelsToMergeThisStep.size() < 2) {
                    mergedChannelIDs.addAll(channelsToMergeThisStep);
                    continue;
                }
                mergedChannelIDs.add(this.mergeChannels(channelsToMergeThisStep, readBuffers, writeBuffers));
            }
            return mergedChannelIDs;
        }

        protected ChannelWithBlockCount mergeChannels(List<ChannelWithBlockCount> channelIDs, List<List<MemorySegment>> readBuffers, List<MemorySegment> writeBuffers) throws IOException {
            ArrayList<FileIOChannel> channelAccesses = new ArrayList<FileIOChannel>(channelIDs.size());
            MergeIterator<Object> mergeIterator = this.getMergingIterator(channelIDs, readBuffers, channelAccesses, null);
            FileIOChannel.ID mergedChannelID = this.ioManager.createChannel();
            this.registerChannelToBeRemovedAtShudown(mergedChannelID);
            BlockChannelWriter<MemorySegment> writer = this.ioManager.createBlockChannelWriter(mergedChannelID);
            this.registerOpenChannelToBeRemovedAtShudown(writer);
            ChannelWriterOutputView output = new ChannelWriterOutputView(writer, writeBuffers, this.memManager.getPageSize());
            TypeSerializer serializer = this.serializer;
            Object rec = serializer.createInstance();
            while ((rec = mergeIterator.next(rec)) != null) {
                serializer.serialize(rec, (DataOutputView)output);
            }
            output.close();
            int numBlocksWritten = output.getBlockCount();
            this.unregisterOpenChannelToBeRemovedAtShudown(writer);
            for (int i = 0; i < channelAccesses.size(); ++i) {
                FileIOChannel access = (FileIOChannel)channelAccesses.get(i);
                access.closeAndDelete();
                this.unregisterOpenChannelToBeRemovedAtShudown(access);
            }
            return new ChannelWithBlockCount(mergedChannelID, numBlocksWritten);
        }

        protected final void getSegmentsForReaders(List<List<MemorySegment>> target, List<MemorySegment> memory, int numChannels) {
            int k;
            ArrayList<MemorySegment> segs;
            int i;
            int numBuffers = memory.size();
            int buffersPerChannelLowerBound = numBuffers / numChannels;
            int numChannelsWithOneMore = numBuffers % numChannels;
            Iterator<MemorySegment> segments = memory.iterator();
            for (i = 0; i < numChannelsWithOneMore; ++i) {
                segs = new ArrayList<MemorySegment>(buffersPerChannelLowerBound + 1);
                target.add(segs);
                for (k = buffersPerChannelLowerBound; k >= 0; --k) {
                    segs.add(segments.next());
                }
            }
            for (i = numChannelsWithOneMore; i < numChannels; ++i) {
                segs = new ArrayList(buffersPerChannelLowerBound);
                target.add(segs);
                for (k = buffersPerChannelLowerBound; k > 0; --k) {
                    segs.add(segments.next());
                }
            }
        }

        protected void registerChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.add(channel);
        }

        protected void unregisterChannelToBeRemovedAtShudown(FileIOChannel.ID channel) {
            UnilateralSortMerger.this.channelsToDeleteAtShutdown.remove(channel);
        }

        protected void registerOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
            UnilateralSortMerger.this.openChannels.add(channel);
        }

        protected void unregisterOpenChannelToBeRemovedAtShudown(FileIOChannel channel) {
            UnilateralSortMerger.this.openChannels.remove(channel);
        }
    }

    protected static class SortingThread<E>
    extends ThreadBase<E> {
        private final IndexedSorter sorter = new QuickSort();

        public SortingThread(ExceptionHandler<IOException> exceptionHandler, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(exceptionHandler, "SortMerger sorting thread", queues, parentTask);
        }

        @Override
        public void go() throws IOException {
            boolean alive = true;
            while (this.isRunning() && alive) {
                CircularElement element = null;
                try {
                    element = this.queues.sort.take();
                }
                catch (InterruptedException iex) {
                    if (this.isRunning()) {
                        if (!LOG.isErrorEnabled()) continue;
                        LOG.error("Sorting thread was interrupted (without being shut down) while grabbing a buffer. Retrying to grab buffer...");
                        continue;
                    }
                    return;
                }
                if (element != EOF_MARKER && element != SPILLING_MARKER) {
                    if (element.buffer.size() == 0) {
                        element.buffer.reset();
                        this.queues.empty.add(element);
                        continue;
                    }
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorting buffer " + element.id + ".");
                    }
                    this.sorter.sort(element.buffer);
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorted buffer " + element.id + ".");
                    }
                } else if (element == EOF_MARKER) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Sorting thread done.");
                    }
                    alive = false;
                }
                this.queues.spill.add(element);
            }
        }
    }

    protected static class ReadingThread<E>
    extends ThreadBase<E> {
        private final MutableObjectIterator<E> reader;
        private final LargeRecordHandler<E> largeRecords;
        private final long startSpillingBytes;
        private final E readTarget;

        public ReadingThread(ExceptionHandler<IOException> exceptionHandler, MutableObjectIterator<E> reader, CircularQueues<E> queues, LargeRecordHandler<E> largeRecordsHandler, E readTarget, AbstractInvokable parentTask, long startSpillingBytes) {
            super(exceptionHandler, "SortMerger Reading Thread", queues, parentTask);
            this.reader = reader;
            this.readTarget = readTarget;
            this.startSpillingBytes = startSpillingBytes;
            this.largeRecords = largeRecordsHandler;
        }

        @Override
        public void go() throws IOException {
            MutableObjectIterator<E> reader = this.reader;
            Object current = this.readTarget;
            Object leftoverRecord = null;
            CircularElement element = null;
            long bytesUntilSpilling = this.startSpillingBytes;
            boolean done = false;
            if (bytesUntilSpilling < 1L) {
                bytesUntilSpilling = 0L;
                this.queues.sort.add(UnilateralSortMerger.spillingMarker());
            }
            while (!done && this.isRunning()) {
                while (element == null) {
                    try {
                        element = this.queues.empty.take();
                    }
                    catch (InterruptedException iex) {
                        throw new IOException(iex);
                    }
                }
                InMemorySorter buffer = element.buffer;
                if (!buffer.isEmpty()) {
                    throw new IOException("New buffer is not empty.");
                }
                if (LOG.isDebugEnabled()) {
                    LOG.debug("Retrieved empty read buffer " + element.id + ".");
                }
                if (leftoverRecord != null) {
                    if (!buffer.write(leftoverRecord)) {
                        if (this.largeRecords != null) {
                            if (LOG.isDebugEnabled()) {
                                LOG.debug("Large record did not fit into a fresh sort buffer. Putting into large record store.");
                            }
                        } else {
                            throw new IOException("The record exceeds the maximum size of a sort buffer (current maximum: " + buffer.getCapacity() + " bytes).");
                        }
                        this.largeRecords.addRecord(leftoverRecord);
                        buffer.reset();
                    }
                    leftoverRecord = null;
                }
                boolean available = true;
                if (bytesUntilSpilling > 0L && buffer.getCapacity() >= bytesUntilSpilling) {
                    CircularElement SPILLING_MARKER;
                    Object newCurrent;
                    boolean fullBuffer = false;
                    while (this.isRunning() && (available = (newCurrent = reader.next(current)) != null)) {
                        current = newCurrent;
                        if (!buffer.write(current)) {
                            leftoverRecord = current;
                            fullBuffer = true;
                            break;
                        }
                        if (bytesUntilSpilling - buffer.getOccupancy() > 0L) continue;
                        bytesUntilSpilling = 0L;
                        SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                        this.queues.sort.add(SPILLING_MARKER);
                        break;
                    }
                    if (fullBuffer) {
                        if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                            bytesUntilSpilling = 0L;
                            SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                            this.queues.sort.add(SPILLING_MARKER);
                        }
                        if (LOG.isDebugEnabled()) {
                            LOG.debug("Emitting full buffer from reader thread: " + element.id + ".");
                        }
                        this.queues.sort.add(element);
                        element = null;
                        continue;
                    }
                } else if (bytesUntilSpilling > 0L && (bytesUntilSpilling -= buffer.getCapacity()) <= 0L) {
                    bytesUntilSpilling = 0L;
                    CircularElement SPILLING_MARKER = UnilateralSortMerger.spillingMarker();
                    this.queues.sort.add(SPILLING_MARKER);
                }
                if (available) {
                    Object newCurrent;
                    while (this.isRunning() && (newCurrent = reader.next(current)) != null) {
                        current = newCurrent;
                        if (buffer.write(current)) continue;
                        leftoverRecord = current;
                        break;
                    }
                }
                if (leftoverRecord != null) {
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting full buffer from reader thread: " + element.id + ".");
                    }
                } else {
                    done = true;
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Emitting final buffer from reader thread: " + element.id + ".");
                    }
                }
                if (!buffer.isEmpty()) {
                    this.queues.sort.add(element);
                } else {
                    buffer.reset();
                    this.queues.empty.add(element);
                }
                element = null;
            }
            if (!this.isRunning()) {
                return;
            }
            CircularElement EOF_MARKER = UnilateralSortMerger.endMarker();
            this.queues.sort.add(EOF_MARKER);
            LOG.debug("Reading thread done.");
        }
    }

    protected static abstract class ThreadBase<E>
    extends Thread
    implements Thread.UncaughtExceptionHandler {
        protected final CircularQueues<E> queues;
        private final ExceptionHandler<IOException> exceptionHandler;
        private volatile boolean alive;

        protected ThreadBase(ExceptionHandler<IOException> exceptionHandler, String name, CircularQueues<E> queues, AbstractInvokable parentTask) {
            super(name);
            this.setDaemon(true);
            this.exceptionHandler = exceptionHandler;
            this.setUncaughtExceptionHandler(this);
            this.queues = queues;
            this.alive = true;
        }

        @Override
        public void run() {
            try {
                this.go();
            }
            catch (Throwable t) {
                this.internalHandleException(new IOException("Thread '" + this.getName() + "' terminated due to an exception: " + t.getMessage(), t));
            }
        }

        protected abstract void go() throws IOException;

        public boolean isRunning() {
            return this.alive;
        }

        public void shutdown() {
            this.alive = false;
            this.interrupt();
        }

        protected final void internalHandleException(IOException ioex) {
            if (!this.isRunning()) {
                return;
            }
            if (this.exceptionHandler != null) {
                try {
                    this.exceptionHandler.handleException(ioex);
                }
                catch (Throwable throwable) {
                    // empty catch block
                }
            }
        }

        @Override
        public void uncaughtException(Thread t, Throwable e) {
            this.internalHandleException(new IOException("Thread '" + t.getName() + "' terminated due to an uncaught exception: " + e.getMessage(), e));
        }
    }

    protected static final class CircularQueues<E> {
        final BlockingQueue<CircularElement<E>> empty;
        final BlockingQueue<CircularElement<E>> sort;
        final BlockingQueue<CircularElement<E>> spill;

        public CircularQueues() {
            this.empty = new LinkedBlockingQueue<CircularElement<E>>();
            this.sort = new LinkedBlockingQueue<CircularElement<E>>();
            this.spill = new LinkedBlockingQueue<CircularElement<E>>();
        }

        public CircularQueues(int numElements) {
            this.empty = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.sort = new ArrayBlockingQueue<CircularElement<E>>(numElements);
            this.spill = new ArrayBlockingQueue<CircularElement<E>>(numElements);
        }
    }

    protected static final class CircularElement<E> {
        final int id;
        final InMemorySorter<E> buffer;

        public CircularElement() {
            this.id = -1;
            this.buffer = null;
        }

        public CircularElement(int id, InMemorySorter<E> buffer) {
            this.id = id;
            this.buffer = buffer;
        }
    }
}

