/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.partition.consumer;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.Timer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.deployment.InputChannelDeploymentDescriptor;
import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
import org.apache.flink.runtime.deployment.ResultPartitionLocation;
import org.apache.flink.runtime.event.task.AbstractEvent;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.NetworkEnvironment;
import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
import org.apache.flink.runtime.io.network.buffer.Buffer;
import org.apache.flink.runtime.io.network.buffer.BufferPool;
import org.apache.flink.runtime.io.network.buffer.BufferProvider;
import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
import org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.io.network.partition.consumer.UnknownInputChannel;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.shaded.com.google.common.collect.Maps;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An {@link InputGate} that reads a single subpartition index from every partition of one
 * consumed intermediate result.
 *
 * <p>The gate owns one {@link InputChannel} per consumed partition, keyed by the partition's
 * {@link IntermediateResultPartitionID}. Channels that have data call
 * {@link #onAvailableBuffer(InputChannel)}, which enqueues them; {@link #getNextBufferOrEvent()}
 * dequeues a channel and reads the next buffer or event from it.
 *
 * <p>Thread-safety: the channel map, the pending task events, the count of uninitialized
 * channels and the "partitions requested" flag are guarded by {@code requestLock}. The
 * end-of-partition bookkeeping is only touched by {@link #getNextBufferOrEvent()} and is not
 * synchronized (presumably a single consumer thread is expected -- confirm against callers).
 */
public class SingleInputGate implements InputGate {
    private static final Logger LOG = LoggerFactory.getLogger(SingleInputGate.class);

    /** Guards {@link #inputChannels}, {@link #pendingEvents}, the uninitialized-channel count and the request flag. */
    private final Object requestLock = new Object();

    /** Name of the owning task; only used for logging. */
    private final String owningTaskName;

    /** Job ID passed on to the partition state checker. */
    private final JobID jobId;

    /** Execution attempt ID passed on to the partition state checker. */
    private final ExecutionAttemptID executionId;

    /** ID of the intermediate result consumed by this gate. */
    private final IntermediateDataSetID consumedResultId;

    /** Subpartition index requested from every input channel. */
    private final int consumedSubpartitionIndex;

    /** Expected number of input channels; fixed at construction time. */
    private final int numberOfInputChannels;

    /** Input channels keyed by consumed partition ID. Guarded by {@link #requestLock}. */
    private final Map<IntermediateResultPartitionID, InputChannel> inputChannels;

    /** Channels that announced available data via {@link #onAvailableBuffer(InputChannel)}. */
    private final BlockingQueue<InputChannel> inputChannelsWithData = new LinkedBlockingQueue<InputChannel>();

    /** One bit per channel index; set once that channel delivered an {@link EndOfPartitionEvent}. */
    private final BitSet channelsWithEndOfPartitionEvents;

    private final PartitionStateChecker partitionStateChecker;

    /** Buffer pool of this gate; may be set exactly once via {@link #setBufferPool(BufferPool)}. */
    private BufferPool bufferPool;

    /** Set once every channel delivered its end-of-partition event. */
    private boolean hasReceivedAllEndOfPartitionEvents;

    /** Whether the subpartitions have been requested already. Guarded by {@link #requestLock}. */
    private boolean requestedPartitionsFlag;

    /** Set (under {@link #requestLock}) when the gate is released; read without the lock elsewhere. */
    private volatile boolean isReleased;

    /** Listeners notified whenever a channel announces available data. */
    private final List<EventListener<InputGate>> registeredListeners = new CopyOnWriteArrayList<EventListener<InputGate>>();

    /** Task events kept for replay to channels that are still unknown. Guarded by {@link #requestLock}. */
    private final List<TaskEvent> pendingEvents = new ArrayList<TaskEvent>();

    /** Number of registered {@link UnknownInputChannel}s not yet updated. Guarded by {@link #requestLock}. */
    private int numberOfUninitializedChannels;

    /** Lazily created daemon timer handed to local channels when a partition request is retriggered. */
    private Timer retriggerLocalRequestTimer;

    /**
     * Creates a gate without any input channels; channels are added afterwards via
     * {@link #setInputChannel(IntermediateResultPartitionID, InputChannel)}.
     *
     * @throws NullPointerException if any reference argument is {@code null}
     * @throws IllegalArgumentException if {@code consumedSubpartitionIndex} is negative or
     *         {@code numberOfInputChannels} is not positive
     */
    public SingleInputGate(String owningTaskName, JobID jobId, ExecutionAttemptID executionId, IntermediateDataSetID consumedResultId, int consumedSubpartitionIndex, int numberOfInputChannels, PartitionStateChecker partitionStateChecker) {
        this.owningTaskName = Preconditions.checkNotNull(owningTaskName);
        this.jobId = Preconditions.checkNotNull(jobId);
        this.executionId = Preconditions.checkNotNull(executionId);
        this.consumedResultId = Preconditions.checkNotNull(consumedResultId);
        Preconditions.checkArgument(consumedSubpartitionIndex >= 0);
        this.consumedSubpartitionIndex = consumedSubpartitionIndex;
        Preconditions.checkArgument(numberOfInputChannels > 0);
        this.numberOfInputChannels = numberOfInputChannels;
        this.inputChannels = Maps.newHashMapWithExpectedSize(numberOfInputChannels);
        this.channelsWithEndOfPartitionEvents = new BitSet(numberOfInputChannels);
        this.partitionStateChecker = Preconditions.checkNotNull(partitionStateChecker);
    }

    @Override
    public int getNumberOfInputChannels() {
        return numberOfInputChannels;
    }

    /** Returns the ID of the intermediate result consumed by this gate. */
    public IntermediateDataSetID getConsumedResultId() {
        return consumedResultId;
    }

    /** Returns the buffer pool of this gate, or {@code null} if none has been set yet. */
    BufferProvider getBufferProvider() {
        return bufferPool;
    }

    /**
     * Sets the buffer pool of this gate. May be called only once.
     *
     * @throws NullPointerException if {@code bufferPool} is {@code null}
     * @throws IllegalArgumentException if the pool's number of required memory segments differs
     *         from the number of input channels
     * @throws IllegalStateException if a buffer pool has already been set
     */
    public void setBufferPool(BufferPool bufferPool) {
        // Validate the reference before dereferencing it in the size check below.
        Preconditions.checkNotNull(bufferPool);
        // NOTE(review): the message speaks of "at least as many" buffers while the check
        // demands equality; the condition is kept as-is because callers may rely on it.
        Preconditions.checkArgument(numberOfInputChannels == bufferPool.getNumberOfRequiredMemorySegments(), "Bug in input gate setup logic: buffer pool has not enough guaranteed buffers for this input gate. Input gates require at least as many buffers as there are input channels.");
        Preconditions.checkState(this.bufferPool == null, "Bug in input gate setup logic: buffer pool has already been set for this input gate.");
        this.bufferPool = bufferPool;
    }

    /**
     * Registers the input channel for the given partition, replacing any previous mapping.
     *
     * <p>If the partition had no channel before and the new channel is an
     * {@link UnknownInputChannel}, it is counted as uninitialized until
     * {@link #updateInputChannel(InputChannelDeploymentDescriptor)} replaces it.
     *
     * @throws NullPointerException if either argument is {@code null}
     */
    public void setInputChannel(IntermediateResultPartitionID partitionId, InputChannel inputChannel) {
        synchronized (requestLock) {
            InputChannel previous = inputChannels.put(Preconditions.checkNotNull(partitionId), Preconditions.checkNotNull(inputChannel));
            if (previous == null && inputChannel.getClass() == UnknownInputChannel.class) {
                numberOfUninitializedChannels++;
            }
        }
    }

    /**
     * Replaces an {@link UnknownInputChannel} by a local or remote channel according to the
     * partition location in the given descriptor, requests its subpartition and replays all
     * pending task events to it. Does nothing if the gate is released or the current channel
     * for the partition is not an unknown channel.
     *
     * @throws NullPointerException if no channel is registered for the descriptor's partition
     * @throws IllegalStateException if the descriptor's location is neither local nor remote
     */
    public void updateInputChannel(InputChannelDeploymentDescriptor icdd) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (isReleased) {
                return;
            }
            IntermediateResultPartitionID partitionId = icdd.getConsumedPartitionId().getPartitionId();
            InputChannel current = inputChannels.get(partitionId);
            // Same diagnostic as retriggerPartitionRequest instead of a message-less NPE.
            Preconditions.checkNotNull(current, "Unknown input channel with ID " + partitionId);
            if (current.getClass() != UnknownInputChannel.class) {
                return;
            }

            UnknownInputChannel unknownChannel = (UnknownInputChannel) current;
            ResultPartitionLocation partitionLocation = icdd.getConsumedPartitionLocation();
            InputChannel newChannel;
            if (partitionLocation.isLocal()) {
                newChannel = unknownChannel.toLocalInputChannel();
            } else if (partitionLocation.isRemote()) {
                newChannel = unknownChannel.toRemoteInputChannel(partitionLocation.getConnectionId());
            } else {
                throw new IllegalStateException("Tried to update unknown channel with unknown channel.");
            }
            LOG.debug("Updated unknown input channel to {}.", newChannel);

            inputChannels.put(partitionId, newChannel);
            newChannel.requestSubpartition(consumedSubpartitionIndex);

            // Replay events that were sent while this channel was still unknown.
            for (TaskEvent event : pendingEvents) {
                newChannel.sendTaskEvent(event);
            }
            // Once no unknown channel is left, nobody needs the replay log any more.
            if (--numberOfUninitializedChannels == 0) {
                pendingEvents.clear();
            }
        }
    }

    /**
     * Retriggers the subpartition request of the channel registered for the given partition.
     * Does nothing if the gate has been released.
     *
     * @throws NullPointerException if no channel is registered for {@code partitionId}
     * @throws IllegalStateException if the channel is neither a remote nor a local channel
     */
    public void retriggerPartitionRequest(IntermediateResultPartitionID partitionId) throws IOException, InterruptedException {
        synchronized (requestLock) {
            if (isReleased) {
                return;
            }
            InputChannel ch = inputChannels.get(partitionId);
            Preconditions.checkNotNull(ch, "Unknown input channel with ID " + partitionId);
            LOG.debug("Retriggering partition request {}:{}.", ch.partitionId, consumedSubpartitionIndex);

            if (ch.getClass() == RemoteInputChannel.class) {
                ((RemoteInputChannel) ch).retriggerSubpartitionRequest(consumedSubpartitionIndex);
            } else if (ch.getClass() == LocalInputChannel.class) {
                // Local retriggers are handed a shared daemon timer, created on first use.
                if (retriggerLocalRequestTimer == null) {
                    retriggerLocalRequestTimer = new Timer(true);
                }
                ((LocalInputChannel) ch).retriggerSubpartitionRequest(retriggerLocalRequestTimer, consumedSubpartitionIndex);
            } else {
                throw new IllegalStateException("Unexpected type of channel to retrigger partition: " + ch.getClass());
            }
        }
    }

    /**
     * Releases the gate: cancels the retrigger timer, releases every input channel and lazily
     * destroys the buffer pool. Subsequent calls are no-ops. An {@link IOException} raised by a
     * single channel is logged and does not stop the release of the remaining channels. The gate
     * is marked released even if releasing fails part-way.
     */
    public void releaseAllResources() throws IOException {
        synchronized (requestLock) {
            if (isReleased) {
                return;
            }
            try {
                LOG.debug("{}: Releasing {}.", owningTaskName, this);
                if (retriggerLocalRequestTimer != null) {
                    retriggerLocalRequestTimer.cancel();
                }
                for (InputChannel inputChannel : inputChannels.values()) {
                    try {
                        inputChannel.releaseAllResources();
                    } catch (IOException e) {
                        LOG.warn("Error during release of channel resources: " + e.getMessage(), e);
                    }
                }
                if (bufferPool != null) {
                    bufferPool.lazyDestroy();
                }
            } finally {
                isReleased = true;
            }
        }
    }

    /** Returns {@code true} iff every registered input channel reports itself as released. */
    @Override
    public boolean isFinished() {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                if (!inputChannel.isReleased()) {
                    return false;
                }
            }
        }
        return true;
    }

    /**
     * Requests the consumed subpartition from every input channel. The requests are issued only
     * by the first call that completes successfully; later calls only repeat the sanity check.
     *
     * @throws IllegalStateException if the number of registered channels differs from the
     *         number of channels the gate was created for
     */
    @Override
    public void requestPartitions() throws IOException, InterruptedException {
        // The channel map and the flag are shared with the update/release paths, so both the
        // sanity check and the check-then-set of the flag happen under the lock.
        synchronized (requestLock) {
            if (numberOfInputChannels != inputChannels.size()) {
                throw new IllegalStateException("Bug in input gate setup logic: mismatch between number of total input channels and the currently set number of input channels.");
            }
            if (!requestedPartitionsFlag) {
                for (InputChannel inputChannel : inputChannels.values()) {
                    inputChannel.requestSubpartition(consumedSubpartitionIndex);
                }
                // Only reached if every request succeeded; a failed attempt can be retried.
                requestedPartitionsFlag = true;
            }
        }
    }

    /**
     * Blocks until some channel has data and returns its next buffer or event, tagged with the
     * channel index.
     *
     * <p>When the event is an {@link EndOfPartitionEvent}, the channel is marked as finished,
     * notified that its subpartition was consumed, and released.
     *
     * @return the next buffer or event, or {@code null} once end-of-partition events have been
     *         received from all channels
     * @throws IllegalStateException if the gate is (or becomes) released, or if a channel
     *         announced data but had none
     */
    @Override
    public BufferOrEvent getNextBufferOrEvent() throws IOException, InterruptedException {
        if (hasReceivedAllEndOfPartitionEvents) {
            return null;
        }
        if (isReleased) {
            throw new IllegalStateException("Already released.");
        }
        requestPartitions();

        InputChannel currentChannel = null;
        while (currentChannel == null) {
            // Poll with a timeout so that a release happening while we wait is noticed
            // instead of waiting for data that may never arrive.
            if (isReleased) {
                throw new IllegalStateException("Already released.");
            }
            currentChannel = inputChannelsWithData.poll(2L, TimeUnit.SECONDS);
        }

        Buffer buffer = currentChannel.getNextBuffer();
        if (buffer == null) {
            throw new IllegalStateException("Bug in input gate/channel logic: input gate got notified by channel about available data, but none was available.");
        }
        if (buffer.isBuffer()) {
            return new BufferOrEvent(buffer, currentChannel.getChannelIndex());
        }

        AbstractEvent event = EventSerializer.fromBuffer(buffer, getClass().getClassLoader());
        if (event.getClass() == EndOfPartitionEvent.class) {
            channelsWithEndOfPartitionEvents.set(currentChannel.getChannelIndex());
            if (channelsWithEndOfPartitionEvents.cardinality() == numberOfInputChannels) {
                hasReceivedAllEndOfPartitionEvents = true;
            }
            currentChannel.notifySubpartitionConsumed();
            currentChannel.releaseAllResources();
        }
        return new BufferOrEvent(event, currentChannel.getChannelIndex());
    }

    /**
     * Sends the event to every registered input channel. While at least one channel is still
     * uninitialized, the event is also remembered so that
     * {@link #updateInputChannel(InputChannelDeploymentDescriptor)} can replay it.
     */
    @Override
    public void sendTaskEvent(TaskEvent event) throws IOException {
        synchronized (requestLock) {
            for (InputChannel inputChannel : inputChannels.values()) {
                inputChannel.sendTaskEvent(event);
            }
            if (numberOfUninitializedChannels > 0) {
                pendingEvents.add(event);
            }
        }
    }

    /**
     * Registers a listener that is notified on every {@link #onAvailableBuffer(InputChannel)}.
     *
     * @throws NullPointerException if {@code listener} is {@code null}
     */
    @Override
    public void registerListener(EventListener<InputGate> listener) {
        registeredListeners.add(Preconditions.checkNotNull(listener));
    }

    /**
     * Enqueues the given channel as having data available and notifies all registered
     * listeners, passing this gate as the event.
     */
    public void onAvailableBuffer(InputChannel channel) {
        inputChannelsWithData.add(channel);
        for (EventListener<InputGate> registeredListener : registeredListeners) {
            registeredListener.onEvent(this);
        }
    }

    /** Delegates to the partition state checker, adding this gate's job, execution and result IDs. */
    void triggerPartitionStateCheck(ResultPartitionID partitionId) {
        partitionStateChecker.triggerPartitionStateCheck(jobId, executionId, consumedResultId, partitionId);
    }

    /**
     * Creates an input gate and one input channel per channel descriptor: a local, remote or
     * unknown channel, depending on the location of the consumed partition.
     *
     * @throws NullPointerException if the consumed result ID or the channel descriptors of
     *         {@code igdd} are {@code null}
     * @throws IllegalArgumentException if the consumed subpartition index is negative
     * @throws IllegalStateException if a partition location is neither local, remote nor unknown
     */
    public static SingleInputGate create(String owningTaskName, JobID jobId, ExecutionAttemptID executionId, InputGateDeploymentDescriptor igdd, NetworkEnvironment networkEnvironment) {
        IntermediateDataSetID consumedResultId = Preconditions.checkNotNull(igdd.getConsumedResultId());
        int consumedSubpartitionIndex = igdd.getConsumedSubpartitionIndex();
        Preconditions.checkArgument(consumedSubpartitionIndex >= 0);
        InputChannelDeploymentDescriptor[] icdd = Preconditions.checkNotNull(igdd.getInputChannelDeploymentDescriptors());

        SingleInputGate inputGate = new SingleInputGate(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex, icdd.length, networkEnvironment.getPartitionStateChecker());

        // The array is only kept for the debug log below; the gate stores the channels itself.
        InputChannel[] inputChannels = new InputChannel[icdd.length];
        for (int i = 0; i < inputChannels.length; ++i) {
            ResultPartitionID partitionId = icdd[i].getConsumedPartitionId();
            ResultPartitionLocation partitionLocation = icdd[i].getConsumedPartitionLocation();
            if (partitionLocation.isLocal()) {
                inputChannels[i] = new LocalInputChannel(inputGate, i, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
            } else if (partitionLocation.isRemote()) {
                inputChannels[i] = new RemoteInputChannel(inputGate, i, partitionId, partitionLocation.getConnectionId(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
            } else if (partitionLocation.isUnknown()) {
                inputChannels[i] = new UnknownInputChannel(inputGate, i, partitionId, networkEnvironment.getPartitionManager(), networkEnvironment.getTaskEventDispatcher(), networkEnvironment.getConnectionManager(), networkEnvironment.getPartitionRequestInitialAndMaxBackoff());
            } else {
                throw new IllegalStateException("Unexpected partition location.");
            }
            inputGate.setInputChannel(partitionId.getPartitionId(), inputChannels[i]);
        }
        LOG.debug("Created input channels {} from {}.", Arrays.toString(inputChannels), igdd);
        return inputGate;
    }
}

