/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.client;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Address;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Status;
import akka.pattern.Patterns;
import akka.util.Timeout;
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.UnknownHostException;
import java.util.concurrent.TimeoutException;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.client.JobClientActor;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.client.JobTimeoutException;
import org.apache.flink.runtime.client.SerializedJobExecutionResult;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.messages.JobClientMessages;
import org.apache.flink.runtime.messages.JobManagerMessages;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Option;
import scala.Some;
import scala.Tuple2;
import scala.concurrent.Await;
import scala.concurrent.Awaitable;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

public class JobClient {
    private static final Logger LOG = LoggerFactory.getLogger(JobClient.class);

    public static ActorSystem startJobClientActorSystem(Configuration config) throws IOException {
        LOG.info("Starting JobClient actor system");
        Some remoting = new Some((Object)new Tuple2((Object)"", (Object)0));
        ActorSystem system = AkkaUtils.createActorSystem(config, (Option<Tuple2<String, Object>>)remoting);
        Address address = system.provider().getDefaultAddress();
        String host = address.host().isDefined() ? (String)address.host().get() : "(unknown)";
        int port = address.port().isDefined() ? (Integer)address.port().get() : -1;
        LOG.info("Started JobClient actor system at " + host + ':' + port);
        return system;
    }

    public static InetSocketAddress getJobManagerAddress(Configuration config) throws IOException {
        String jobManagerAddress = config.getString("jobmanager.rpc.address", null);
        int jobManagerRPCPort = config.getInteger("jobmanager.rpc.port", 6123);
        if (jobManagerAddress == null) {
            throw new RuntimeException("JobManager address has not been specified in the configuration.");
        }
        try {
            return new InetSocketAddress(InetAddress.getByName(jobManagerAddress), jobManagerRPCPort);
        }
        catch (UnknownHostException e) {
            throw new IOException("Cannot resolve JobManager hostname " + jobManagerAddress, e);
        }
    }

    /*
     * Unable to fully structure code
     */
    public static SerializedJobExecutionResult submitJobAndWait(ActorSystem actorSystem, ActorRef jobManager, JobGraph jobGraph, FiniteDuration timeout, boolean sysoutLogUpdates) throws JobExecutionException {
        if (actorSystem == null || jobManager == null || jobGraph == null || timeout == null) {
            throw new NullPointerException();
        }
        jobClientActorProps = Props.create(JobClientActor.class, (Object[])new Object[]{jobManager, JobClient.LOG, sysoutLogUpdates});
        jobClientActor = actorSystem.actorOf(jobClientActorProps);
        try {
            future = Patterns.ask((ActorRef)jobClientActor, (Object)new JobClientMessages.SubmitJobAndWait(jobGraph), (Timeout)new Timeout(AkkaUtils.INF_TIMEOUT()));
            answer = Await.result((Awaitable)future, (Duration)AkkaUtils.INF_TIMEOUT());
            if (!(answer instanceof JobManagerMessages.JobResultSuccess)) ** GOTO lbl16
            JobClient.LOG.info("Job execution complete");
            result = ((JobManagerMessages.JobResultSuccess)answer).result();
            if (result != null) {
                var10_13 = result;
                return var10_13;
            }
            try {
                throw new Exception("Job was successfully executed but result contained a null JobExecutionResult.");
lbl16:
                // 1 sources

                if (answer instanceof Status.Failure) {
                    throw ((Status.Failure)answer).cause();
                }
                throw new Exception("Unknown answer after submitting the job: " + answer);
            }
            catch (JobExecutionException e) {
                throw e;
            }
            catch (TimeoutException e) {
                throw new JobTimeoutException(jobGraph.getJobID(), "Timeout while waiting for JobManager answer. Job time exceeded " + AkkaUtils.INF_TIMEOUT(), e);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "Communication with JobManager failed: " + t.getMessage(), t);
            }
        }
        finally {
            jobClientActor.tell((Object)PoisonPill.getInstance(), ActorRef.noSender());
        }
    }

    public static void submitJobDetached(ActorRef jobManager, JobGraph jobGraph, FiniteDuration timeout) throws JobExecutionException {
        block7: {
            if (jobManager == null || jobGraph == null || timeout == null) {
                throw new NullPointerException();
            }
            Future future = Patterns.ask((ActorRef)jobManager, (Object)new JobManagerMessages.SubmitJob(jobGraph, false), (Timeout)new Timeout(timeout));
            try {
                Object result = Await.result((Awaitable)future, (Duration)timeout);
                if (result instanceof JobID) {
                    JobID respondedID = (JobID)result;
                    if (!respondedID.equals((Object)jobGraph.getJobID())) {
                        throw new Exception("JobManager responded for wrong Job. This Job: " + jobGraph.getJobID() + ", response: " + respondedID);
                    }
                    break block7;
                }
                throw new Exception("Unexpected response: " + result);
            }
            catch (JobExecutionException e) {
                throw e;
            }
            catch (TimeoutException e) {
                throw new JobTimeoutException(jobGraph.getJobID(), "JobManager did not respond within " + timeout.toString(), e);
            }
            catch (Throwable t) {
                throw new JobExecutionException(jobGraph.getJobID(), "Failed to send job to JobManager: " + t.getMessage(), t.getCause());
            }
        }
    }

    public static void uploadJarFiles(JobGraph jobGraph, ActorRef jobManager, FiniteDuration timeout) throws IOException {
        if (jobGraph.hasUsercodeJarFiles()) {
            int port;
            Timeout tOut = new Timeout(timeout);
            Future futureBlobPort = Patterns.ask((ActorRef)jobManager, (Object)JobManagerMessages.getRequestBlobManagerPort(), (Timeout)tOut);
            try {
                Object result = Await.result((Awaitable)futureBlobPort, (Duration)timeout);
                if (!(result instanceof Integer)) {
                    throw new Exception("Expected port number (int) as answer, received " + result);
                }
                port = (Integer)result;
            }
            catch (Exception e) {
                throw new IOException("Could not retrieve the JobManager's blob port.", e);
            }
            Option jmHost = jobManager.path().address().host();
            String jmHostname = jmHost.isDefined() ? (String)jmHost.get() : "localhost";
            InetSocketAddress serverAddress = new InetSocketAddress(jmHostname, port);
            jobGraph.uploadRequiredJarFiles(serverAddress);
        }
    }
}

