/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.operators;

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.CrossFunction;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.runtime.memory.MemoryManager;
import org.apache.flink.runtime.operators.Driver;
import org.apache.flink.runtime.operators.DriverStrategy;
import org.apache.flink.runtime.operators.TaskContext;
import org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.resettable.SpillingResettableMutableObjectIterator;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.util.Collector;
import org.apache.flink.util.MutableObjectIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Driver that builds the cross product of its two inputs with nested loops.
 *
 * <p>Every pair of records (one from each input) is handed to the user's
 * {@link CrossFunction} and the result is emitted through the task's output
 * collector. Four variants exist, selected in {@link #prepare()} from the
 * configured {@link DriverStrategy}:
 * <ul>
 *   <li>blocked, first input outer</li>
 *   <li>blocked, second input outer</li>
 *   <li>streamed, first input outer</li>
 *   <li>streamed, second input outer</li>
 * </ul>
 *
 * <p>In the blocked variants the outer input is wrapped in a
 * {@link BlockResettableMutableObjectIterator} and the inner input in a
 * {@link SpillingResettableMutableObjectIterator}. In the streamed variants the
 * outer input is read directly and only the inner input is wrapped in a
 * {@link SpillingResettableMutableObjectIterator}.
 *
 * @param <T1> record type of the first input
 * @param <T2> record type of the second input
 * @param <OT> record type produced by the cross function
 */
public class CrossDriver<T1, T2, OT>
implements Driver<CrossFunction<T1, T2, OT>, OT> {
    private static final Logger LOG = LoggerFactory.getLogger(CrossDriver.class);

    /** Context supplying inputs, serializers, configuration, the stub and the collector. */
    private TaskContext<CrossFunction<T1, T2, OT>, OT> taskContext;

    /** Memory manager obtained from the task context in {@link #prepare()}. */
    private MemoryManager memManager;

    /** Resettable iterator over the inner side; kept in a field so {@link #cleanup()} can close it. */
    private SpillingResettableMutableObjectIterator<?> spillIter;

    /** Block-wise resettable iterator over the outer side (blocked variants only). */
    private BlockResettableMutableObjectIterator<?> blockIter;

    /** Number of memory pages handed to the block iterator. */
    private int memPagesForBlockSide;

    /** Number of memory pages handed to the spilling iterator. */
    private int memPagesForSpillingSide;

    /** {@code true} for the block-nested-loops variants, {@code false} for the streamed ones. */
    private boolean blocked;

    /** {@code true} if the first input is the outer side. */
    private boolean firstIsOuter;

    /**
     * Cleared by {@link #cancel()} and polled by the processing loops.
     * Volatile, presumably because cancel() is issued from a different thread
     * than the one executing {@link #run()}.
     */
    private volatile boolean running;

    /** Whether records are read into reused instances instead of fresh ones. */
    private boolean objectReuseEnabled = false;

    /**
     * Stores the task context and marks the driver as running.
     *
     * @param context the context this driver operates in
     */
    @Override
    public void setup(TaskContext<CrossFunction<T1, T2, OT>, OT> context) {
        this.taskContext = context;
        this.running = true;
    }

    /**
     * @return always {@code 2}; a cross has exactly two inputs
     */
    @Override
    public int getNumberOfInputs() {
        return 2;
    }

    /**
     * @return the {@link CrossFunction} class object, typed with this driver's type parameters
     */
    @Override
    public Class<CrossFunction<T1, T2, OT>> getStubType() {
        // Class literals cannot carry type arguments, so the parameterized
        // class type has to be obtained through an unchecked double cast.
        @SuppressWarnings("unchecked")
        final Class<CrossFunction<T1, T2, OT>> clazz =
                (Class<CrossFunction<T1, T2, OT>>) (Class<?>) CrossFunction.class;
        return clazz;
    }

    /**
     * @return always {@code 0}; this driver uses no comparators
     */
    @Override
    public int getNumberOfDriverComparators() {
        return 0;
    }

    /**
     * Reads the driver strategy, splits the available memory pages between the
     * two iterator kinds and picks up the object-reuse setting.
     *
     * @throws RuntimeException if the configured strategy is not one of the four
     *         nested-loop strategies, or if fewer than two memory pages are available
     */
    @Override
    public void prepare() throws Exception {
        final TaskConfig config = this.taskContext.getTaskConfig();
        final DriverStrategy ls = config.getDriverStrategy();

        switch (ls) {
            case NESTEDLOOP_BLOCKED_OUTER_FIRST:
                this.blocked = true;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_BLOCKED_OUTER_SECOND:
                this.blocked = true;
                this.firstIsOuter = false;
                break;
            case NESTEDLOOP_STREAMED_OUTER_FIRST:
                this.blocked = false;
                this.firstIsOuter = true;
                break;
            case NESTEDLOOP_STREAMED_OUTER_SECOND:
                this.blocked = false;
                this.firstIsOuter = false;
                break;
            default:
                throw new RuntimeException("Invalid local strategy for CROSS: " + ls);
        }

        this.memManager = this.taskContext.getMemoryManager();
        final int numPages = this.memManager.computeNumberOfPages(config.getRelativeMemoryDriver());
        if (numPages < 2) {
            throw new RuntimeException("The Cross task was initialized with too little memory. Cross requires at least 2 memory pages.");
        }

        if (this.blocked) {
            // Blocked variants: the spilling side gets one page (two once more
            // than 32 pages are available); everything else goes to the block side.
            this.memPagesForSpillingSide = numPages > 32 ? 2 : 1;
            this.memPagesForBlockSide = numPages - this.memPagesForSpillingSide;
        } else {
            // Streamed variants have no block iterator, so the spilling side gets all pages.
            this.memPagesForSpillingSide = numPages;
            this.memPagesForBlockSide = 0;
        }

        final ExecutionConfig executionConfig = this.taskContext.getExecutionConfig();
        this.objectReuseEnabled = executionConfig.isObjectReuseEnabled();
        LOG.debug("CrossDriver object reuse: {}.", this.objectReuseEnabled ? "ENABLED" : "DISABLED");
    }

    /**
     * Dispatches to the nested-loop variant chosen in {@link #prepare()}.
     */
    @Override
    public void run() throws Exception {
        if (this.blocked) {
            if (this.firstIsOuter) {
                this.runBlockedOuterFirst();
            } else {
                this.runBlockedOuterSecond();
            }
        } else if (this.firstIsOuter) {
            this.runStreamedOuterFirst();
        } else {
            this.runStreamedOuterSecond();
        }
    }

    /**
     * Closes whichever resettable iterators were created by {@link #run()}.
     *
     * <p>The block iterator is closed even if closing the spilling iterator
     * throws, so a failure on one side does not leave the other side open.
     */
    @Override
    public void cleanup() throws Exception {
        try {
            if (this.spillIter != null) {
                this.spillIter.close();
                this.spillIter = null;
            }
        } finally {
            if (this.blockIter != null) {
                this.blockIter.close();
                this.blockIter = null;
            }
        }
    }

    /**
     * Requests the processing loops to stop by clearing the running flag.
     */
    @Override
    public void cancel() {
        this.running = false;
    }

    /**
     * Block-nested-loops cross with the first input as the outer (block) side
     * and the second input as the inner (spilling) side.
     *
     * <p>For each block of the first input, the second input is read once and
     * every one of its records is crossed with every record of the block.
     * Without object reuse, the second-input record is copied with its
     * serializer before each call to the cross function.
     */
    private void runBlockedOuterFirst() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is outer (blocking) side, second input is inner (spilling) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final BlockResettableMutableObjectIterator<T1> blockVals = new BlockResettableMutableObjectIterator<T1>(this.memManager, in1, serializer1, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockVals;
        final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;
        if (this.objectReuseEnabled) {
            final T1 val1Reuse = serializer1.createInstance();
            final T2 val2Reuse = serializer2.createInstance();
            // one pass per block of the outer side
            do {
                // all records of the inner (spilling) side
                while (this.running && (val2 = spillVals.next(val2Reuse)) != null) {
                    // all records of the current block; the running check keeps
                    // cancellation as responsive as in runBlockedOuterSecond
                    while (this.running && (val1 = blockVals.next(val1Reuse)) != null) {
                        collector.collect(crosser.cross(val1, val2));
                    }
                    blockVals.reset();
                }
                spillVals.reset();
            } while (this.running && blockVals.nextBlock());
        } else {
            // one pass per block of the outer side
            do {
                // all records of the inner (spilling) side
                while (this.running && (val2 = spillVals.next()) != null) {
                    // all records of the current block; the running check keeps
                    // cancellation as responsive as in runBlockedOuterSecond
                    while (this.running && (val1 = blockVals.next()) != null) {
                        collector.collect(crosser.cross(val1, serializer2.copy(val2)));
                    }
                    blockVals.reset();
                }
                spillVals.reset();
            } while (this.running && blockVals.nextBlock());
        }
    }

    /**
     * Block-nested-loops cross with the second input as the outer (block) side
     * and the first input as the inner (spilling) side.
     *
     * <p>For each block of the second input, the first input is read once and
     * every one of its records is crossed with every record of the block.
     * Without object reuse, the first-input record is copied with its
     * serializer before each call to the cross function.
     */
    private void runBlockedOuterSecond() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Block-Nested-Loops: First input is inner (spilling) side, second input is outer (blocking) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;
        final BlockResettableMutableObjectIterator<T2> blockVals = new BlockResettableMutableObjectIterator<T2>(this.memManager, in2, serializer2, this.memPagesForBlockSide, this.taskContext.getOwningNepheleTask());
        this.blockIter = blockVals;

        final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;
        if (this.objectReuseEnabled) {
            final T1 val1Reuse = serializer1.createInstance();
            final T2 val2Reuse = serializer2.createInstance();
            // one pass per block of the outer side
            do {
                // all records of the inner (spilling) side
                while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
                    // all records of the current block
                    while (this.running && (val2 = blockVals.next(val2Reuse)) != null) {
                        collector.collect(crosser.cross(val1, val2));
                    }
                    blockVals.reset();
                }
                spillVals.reset();
            } while (this.running && blockVals.nextBlock());
        } else {
            // one pass per block of the outer side
            do {
                // all records of the inner (spilling) side
                while (this.running && (val1 = spillVals.next()) != null) {
                    // all records of the current block
                    while (this.running && (val2 = blockVals.next()) != null) {
                        collector.collect(crosser.cross(serializer1.copy(val1), val2));
                    }
                    blockVals.reset();
                }
                spillVals.reset();
            } while (this.running && blockVals.nextBlock());
        }
    }

    /**
     * Nested-loops cross that reads the first input directly as the outer side
     * and replays the second input from a resettable iterator for every outer
     * record.
     *
     * <p>Without object reuse, the outer record is copied with its serializer
     * before each call to the cross function.
     */
    private void runStreamedOuterFirst() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is outer side, second input is inner (spilling) side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T2> spillVals = new SpillingResettableMutableObjectIterator<T2>(in2, serializer2, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;
        if (this.objectReuseEnabled) {
            final T1 val1Reuse = serializer1.createInstance();
            final T2 val2Reuse = serializer2.createInstance();
            // all records of the outer side
            while (this.running && (val1 = in1.next(val1Reuse)) != null) {
                // all records of the inner (spilling) side
                while (this.running && (val2 = spillVals.next(val2Reuse)) != null) {
                    collector.collect(crosser.cross(val1, val2));
                }
                spillVals.reset();
            }
        } else {
            // all records of the outer side
            while (this.running && (val1 = in1.next()) != null) {
                // all records of the inner (spilling) side
                while (this.running && (val2 = spillVals.next()) != null) {
                    collector.collect(crosser.cross(serializer1.copy(val1), val2));
                }
                spillVals.reset();
            }
        }
    }

    /**
     * Nested-loops cross that reads the second input directly as the outer side
     * and replays the first input from a resettable iterator for every outer
     * record.
     *
     * <p>Without object reuse, the outer record is copied with its serializer
     * before each call to the cross function.
     */
    private void runStreamedOuterSecond() throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug(this.taskContext.formatLogString("Running Cross with Nested-Loops: First input is inner (spilling) side, second input is outer side."));
        }

        final MutableObjectIterator<T1> in1 = this.taskContext.getInput(0);
        final MutableObjectIterator<T2> in2 = this.taskContext.getInput(1);
        final TypeSerializer<T1> serializer1 = this.taskContext.<T1>getInputSerializer(0).getSerializer();
        final TypeSerializer<T2> serializer2 = this.taskContext.<T2>getInputSerializer(1).getSerializer();

        final SpillingResettableMutableObjectIterator<T1> spillVals = new SpillingResettableMutableObjectIterator<T1>(in1, serializer1, this.memManager, this.taskContext.getIOManager(), this.memPagesForSpillingSide, this.taskContext.getOwningNepheleTask());
        this.spillIter = spillVals;

        final CrossFunction<T1, T2, OT> crosser = this.taskContext.getStub();
        final Collector<OT> collector = this.taskContext.getOutputCollector();

        T1 val1;
        T2 val2;
        if (this.objectReuseEnabled) {
            final T1 val1Reuse = serializer1.createInstance();
            final T2 val2Reuse = serializer2.createInstance();
            // all records of the outer side
            while (this.running && (val2 = in2.next(val2Reuse)) != null) {
                // all records of the inner (spilling) side
                while (this.running && (val1 = spillVals.next(val1Reuse)) != null) {
                    collector.collect(crosser.cross(val1, val2));
                }
                spillVals.reset();
            }
        } else {
            // all records of the outer side
            while (this.running && (val2 = in2.next()) != null) {
                // all records of the inner (spilling) side
                while (this.running && (val1 = spillVals.next()) != null) {
                    collector.collect(crosser.cross(val1, serializer2.copy(val2)));
                }
                spillVals.reset();
            }
        }
    }
}

