/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.io.network.netty;

import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelPromise;
import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.apache.flink.runtime.event.TaskEvent;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.netty.NettyMessage;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientFactory;
import org.apache.flink.runtime.io.network.netty.PartitionRequestClientHandler;
import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
import org.apache.flink.runtime.util.AtomicDisposableReferenceCounter;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Client side of a single Netty channel used to request result subpartitions and to
 * send task events to a remote producer.
 *
 * <p>One client may be used by several {@link RemoteInputChannel}s; usage is tracked
 * through an {@link AtomicDisposableReferenceCounter}. When {@link #close(RemoteInputChannel)}
 * causes the counter's {@code decrement()} to return {@code true}, a close request is
 * written to the channel and the client is removed from its factory.
 */
public class PartitionRequestClient {
    private static final Logger LOG = LoggerFactory.getLogger(PartitionRequestClient.class);
    private final Channel tcpChannel;
    private final PartitionRequestClientHandler partitionRequestHandler;
    private final ConnectionID connectionId;
    private final PartitionRequestClientFactory clientFactory;
    /** Tracks users of this client; once disposed, new requests are rejected. */
    private final AtomicDisposableReferenceCounter closeReferenceCounter = new AtomicDisposableReferenceCounter();

    /**
     * Creates a client bound to an already established channel.
     *
     * @param tcpChannel channel over which all messages are written; must not be null
     * @param partitionRequestHandler handler with which input channels are registered; must not be null
     * @param connectionId identifier passed back to the factory on destruction; must not be null
     * @param clientFactory factory notified when this client is closed; must not be null
     */
    PartitionRequestClient(Channel tcpChannel, PartitionRequestClientHandler partitionRequestHandler, ConnectionID connectionId, PartitionRequestClientFactory clientFactory) {
        this.tcpChannel = Preconditions.checkNotNull(tcpChannel);
        this.partitionRequestHandler = Preconditions.checkNotNull(partitionRequestHandler);
        this.connectionId = Preconditions.checkNotNull(connectionId);
        this.clientFactory = Preconditions.checkNotNull(clientFactory);
    }

    /**
     * Delegates to {@link AtomicDisposableReferenceCounter#disposeIfNotUsed()}.
     *
     * @return the result reported by the reference counter
     */
    boolean disposeIfNotUsed() {
        return this.closeReferenceCounter.disposeIfNotUsed();
    }

    /**
     * Delegates to {@link AtomicDisposableReferenceCounter#increment()}.
     *
     * @return the result reported by the reference counter
     */
    boolean incrementReferenceCounter() {
        return this.closeReferenceCounter.increment();
    }

    /**
     * Registers the input channel with the handler and sends a partition request for the
     * given subpartition, either immediately or after {@code delayMs} milliseconds.
     *
     * <p>If the write fails, the input channel is removed from the handler again and is
     * notified through {@link RemoteInputChannel#onError(Throwable)}.
     *
     * @param partitionId partition to request
     * @param subpartitionIndex index of the subpartition to request
     * @param inputChannel channel that will receive the data or the failure notification
     * @param delayMs delay before the request is written; {@code 0} writes immediately
     * @return a non-null future that completes with the outcome of the write; for delayed
     *         requests it completes only after the scheduled write has finished
     * @throws IOException if this client has already been disposed
     */
    public ChannelFuture requestSubpartition(ResultPartitionID partitionId, int subpartitionIndex, final RemoteInputChannel inputChannel, int delayMs) throws IOException {
        this.checkNotClosed();
        LOG.debug("Requesting subpartition {} of partition {} with {} ms delay.", subpartitionIndex, partitionId, delayMs);
        this.partitionRequestHandler.addInputChannel(inputChannel);
        final NettyMessage.PartitionRequest request = new NettyMessage.PartitionRequest(partitionId, subpartitionIndex, inputChannel.getInputChannelId());
        final ChannelFutureListener listener = new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    // Undo the registration done above and surface the failure to the consumer.
                    PartitionRequestClient.this.partitionRequestHandler.removeInputChannel(inputChannel);
                    inputChannel.onError(new LocalTransportException("Sending the partition request failed.", future.channel().localAddress(), future.cause()));
                }
            }
        };
        if (delayMs == 0) {
            ChannelFuture f = this.tcpChannel.writeAndFlush(request);
            f.addListener(listener);
            return f;
        }
        // The write happens later on the event loop, so no write future exists yet.
        // Hand out a promise now and let the delayed write complete it; returning the
        // future produced inside the scheduled task would always yield null here.
        final ChannelPromise promise = this.tcpChannel.newPromise();
        promise.addListener(listener);
        this.tcpChannel.eventLoop().schedule(new Runnable() {

            @Override
            public void run() {
                PartitionRequestClient.this.tcpChannel.writeAndFlush(request, promise);
            }
        }, (long) delayMs, TimeUnit.MILLISECONDS);
        return promise;
    }

    /**
     * Writes a task event request for the given partition to the channel.
     *
     * <p>If the write fails, the input channel is notified through
     * {@link RemoteInputChannel#onError(Throwable)}.
     *
     * @param partitionId partition the event is addressed to
     * @param event event to send
     * @param inputChannel channel whose id is attached to the request and which is notified on failure
     * @throws IOException if this client has already been disposed
     */
    public void sendTaskEvent(ResultPartitionID partitionId, TaskEvent event, final RemoteInputChannel inputChannel) throws IOException {
        this.checkNotClosed();
        NettyMessage.TaskEventRequest request = new NettyMessage.TaskEventRequest(event, partitionId, inputChannel.getInputChannelId());
        this.tcpChannel.writeAndFlush(request).addListener(new ChannelFutureListener() {

            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    inputChannel.onError(new LocalTransportException("Sending the task event failed.", future.channel().localAddress(), future.cause()));
                }
            }
        });
    }

    /**
     * Releases this client on behalf of the given input channel.
     *
     * <p>The input channel is removed from the handler. If the reference counter's
     * {@code decrement()} returns {@code true}, a close request is written to the channel
     * (closing the channel if that write fails) and the client is removed from the factory;
     * otherwise only the outstanding request of this input channel is cancelled.
     *
     * @param inputChannel input channel that no longer uses this client
     * @throws IOException declared for callers; not thrown directly by this method's own statements
     */
    public void close(RemoteInputChannel inputChannel) throws IOException {
        this.partitionRequestHandler.removeInputChannel(inputChannel);
        if (this.closeReferenceCounter.decrement()) {
            this.tcpChannel.writeAndFlush(new NettyMessage.CloseRequest()).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            this.clientFactory.destroyPartitionRequestClient(this.connectionId, this);
        } else {
            this.partitionRequestHandler.cancelRequestFor(inputChannel.getInputChannelId());
        }
    }

    /**
     * Rejects further use once the reference counter reports this client as disposed.
     *
     * @throws IOException a {@link LocalTransportException} if the client is disposed
     */
    private void checkNotClosed() throws IOException {
        if (this.closeReferenceCounter.isDisposed()) {
            throw new LocalTransportException("Channel closed.", this.tcpChannel.localAddress());
        }
    }
}

