/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.executiongraph;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.dispatch.Futures;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.JobException;
import org.apache.flink.runtime.akka.AkkaUtils;
import org.apache.flink.runtime.blob.BlobKey;
import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.AllVerticesIterator;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobStatus;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobmanager.scheduler.Scheduler;
import org.apache.flink.runtime.messages.ExecutionGraphMessages;
import org.apache.flink.runtime.taskmanager.TaskExecutionState;
import org.apache.flink.runtime.util.SerializableObject;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.concurrent.ExecutionContext;
import scala.concurrent.duration.FiniteDuration;

/**
 * Job-level coordination structure for the execution of a job.
 *
 * <p>The graph holds one {@link ExecutionJobVertex} per attached {@link JobVertex}, the
 * {@link IntermediateResult}s produced by those vertices, and the {@link Execution} attempts
 * that are currently registered. It also tracks the job-wide {@link JobStatus}; every status
 * change is performed as an atomic compare-and-set and is reported to the registered job
 * status listener actors.
 */
public class ExecutionGraph implements Serializable {

    private static final long serialVersionUID = 42L;

    /** Performs the atomic compare-and-set transitions on the {@link #state} field. */
    private static final AtomicReferenceFieldUpdater<ExecutionGraph, JobStatus> STATE_UPDATER =
            AtomicReferenceFieldUpdater.newUpdater(ExecutionGraph.class, JobStatus.class, "state");

    /** The log object used for debugging; package-private so nested/sibling classes can share it. */
    static final Logger LOG = LoggerFactory.getLogger(ExecutionGraph.class);

    /** Passed to the {@link CheckpointCoordinator} when snapshot checkpointing is enabled. */
    private static final int NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN = 1;

    /** Monitor guarding progress bookkeeping; also used to wake up {@link #waitUntilFinished()}. */
    private final SerializableObject progressLock = new SerializableObject();

    /** The ID of the job this graph has been built for. */
    private final JobID jobID;

    /** The name of the job this graph has been built for. */
    private final String jobName;

    /** The job configuration that was handed to the constructor. */
    private final Configuration jobConfiguration;

    /** All job vertices that are part of this graph, by vertex ID. */
    private final ConcurrentHashMap<JobVertexID, ExecutionJobVertex> tasks;

    /** All job vertices of this graph, in the order in which they were attached. */
    private final List<ExecutionJobVertex> verticesInCreationOrder;

    /** All intermediate results produced by the vertices of this graph, by data set ID. */
    private final ConcurrentHashMap<IntermediateDataSetID, IntermediateResult> intermediateResults;

    /** The execution attempts that are currently registered, by attempt ID. */
    private final ConcurrentHashMap<ExecutionAttemptID, Execution> currentExecutions;

    /** BLOB keys of the JAR files required by the job (the list instance given to the constructor). */
    private final List<BlobKey> requiredJarFiles;

    /** Actors that are told about every job status change. */
    private final List<ActorRef> jobStatusListenerActors;

    /** Actors that are told about every execution state change. */
    private final List<ActorRef> executionListenerActors;

    /** Timestamp of the most recent entry into each {@link JobStatus}, indexed by ordinal. */
    private final long[] stateTimestamps;

    /** Timeout handed to every {@link ExecutionJobVertex} created by {@link #attachJobGraph(List)}. */
    private final FiniteDuration timeout;

    /** Number of restarts that may still be attempted after a failure. */
    private int numberOfRetriesLeft;

    /** Time (passed to {@link Thread#sleep(long)}) to wait before a restart is attempted. */
    private long delayBeforeRetrying;

    /** Flag forwarded to {@code ExecutionJobVertex.scheduleAll(...)} when scheduling. */
    private boolean allowQueuedScheduling = false;

    /** Determines which vertices are scheduled by {@link #scheduleForExecution(Scheduler)}. */
    private ScheduleMode scheduleMode = ScheduleMode.FROM_SOURCES;

    /** Whether snapshot checkpointing has been enabled for this graph. */
    private boolean snapshotCheckpointsEnabled;

    /** Current job status; only modified through {@link #STATE_UPDATER}. */
    private volatile JobStatus state = JobStatus.CREATED;

    /** The throwable passed to the {@link #fail(Throwable)} call that moved the job to FAILING. */
    private volatile Throwable failureCause;

    /** Number of job vertices that have reported a final state in the current run. */
    private volatile int numFinishedJobVertices;

    /** Scheduler used for the job; set by the first successful scheduling call. */
    private Scheduler scheduler;

    /** Class loader for user code; cleared by {@link #prepareForArchiving()}. */
    private ClassLoader userClassLoader;

    /** Checkpoint coordinator, or {@code null} if checkpointing is not enabled. */
    private CheckpointCoordinator checkpointCoordinator;

    /** Execution config, populated by {@link #prepareForArchiving()}. */
    private ExecutionConfig executionConfig;

    // --------------------------------------------------------------------------------------------
    //   Constructors
    // --------------------------------------------------------------------------------------------

    /**
     * Creates a graph without required JAR files that uses the class loader of this class as
     * the user class loader.
     *
     * @param jobId the ID of the job; must not be {@code null}
     * @param jobName the name of the job; must not be {@code null}
     * @param jobConfig the job configuration; must not be {@code null}
     * @param timeout the timeout handed to the created job vertices
     */
    ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout) {
        this(jobId, jobName, jobConfig, timeout, new ArrayList<BlobKey>(), ExecutionGraph.class.getClassLoader());
    }

    /**
     * Creates an empty graph in status {@link JobStatus#CREATED}.
     *
     * @param jobId the ID of the job; must not be {@code null}
     * @param jobName the name of the job; must not be {@code null}
     * @param jobConfig the job configuration; must not be {@code null}
     * @param timeout the timeout handed to the created job vertices
     * @param requiredJarFiles BLOB keys of the required JAR files; the list is stored as-is
     *                         (not copied) and is cleared by {@link #prepareForArchiving()}
     * @param userClassLoader the class loader for user code; must not be {@code null}
     * @throws NullPointerException if {@code jobId}, {@code jobName}, {@code jobConfig} or
     *                              {@code userClassLoader} is {@code null}
     */
    public ExecutionGraph(JobID jobId, String jobName, Configuration jobConfig, FiniteDuration timeout,
            List<BlobKey> requiredJarFiles, ClassLoader userClassLoader) {
        if (jobId == null || jobName == null || jobConfig == null || userClassLoader == null) {
            throw new NullPointerException("jobId, jobName, jobConfig and userClassLoader must not be null.");
        }

        this.jobID = jobId;
        this.jobName = jobName;
        this.jobConfiguration = jobConfig;
        this.userClassLoader = userClassLoader;

        this.tasks = new ConcurrentHashMap<JobVertexID, ExecutionJobVertex>();
        this.intermediateResults = new ConcurrentHashMap<IntermediateDataSetID, IntermediateResult>();
        this.verticesInCreationOrder = new ArrayList<ExecutionJobVertex>();
        this.currentExecutions = new ConcurrentHashMap<ExecutionAttemptID, Execution>();

        this.jobStatusListenerActors = new CopyOnWriteArrayList<ActorRef>();
        this.executionListenerActors = new CopyOnWriteArrayList<ActorRef>();

        this.stateTimestamps = new long[JobStatus.values().length];
        this.stateTimestamps[JobStatus.CREATED.ordinal()] = System.currentTimeMillis();

        this.requiredJarFiles = requiredJarFiles;
        this.timeout = timeout;
    }

    // --------------------------------------------------------------------------------------------
    //   Configuration of the execution
    // --------------------------------------------------------------------------------------------

    /**
     * Sets how many restarts may still be attempted after a failure.
     *
     * @param numberOfRetriesLeft the number of retries; values below {@code -1} are rejected
     * @throws IllegalArgumentException if the value is smaller than {@code -1}
     */
    public void setNumberOfRetriesLeft(int numberOfRetriesLeft) {
        if (numberOfRetriesLeft < -1) {
            throw new IllegalArgumentException("Number of retries left must be at least -1.");
        }
        this.numberOfRetriesLeft = numberOfRetriesLeft;
    }

    /** @return the number of restarts that may still be attempted */
    public int getNumberOfRetriesLeft() {
        return numberOfRetriesLeft;
    }

    /**
     * Sets the time to wait before a restart is attempted.
     *
     * @param delayBeforeRetrying the delay; must not be negative
     * @throws IllegalArgumentException if the delay is negative
     */
    public void setDelayBeforeRetrying(long delayBeforeRetrying) {
        if (delayBeforeRetrying < 0L) {
            throw new IllegalArgumentException("Delay before retry must be non-negative.");
        }
        this.delayBeforeRetrying = delayBeforeRetrying;
    }

    /** @return the time to wait before a restart is attempted */
    public long getDelayBeforeRetrying() {
        return delayBeforeRetrying;
    }

    /** @return the queued-scheduling flag that is forwarded when vertices are scheduled */
    public boolean isQueuedSchedulingAllowed() {
        return allowQueuedScheduling;
    }

    /** @param allowed the queued-scheduling flag to forward when vertices are scheduled */
    public void setQueuedSchedulingAllowed(boolean allowed) {
        this.allowQueuedScheduling = allowed;
    }

    /** @param scheduleMode the mode that decides which vertices get scheduled initially */
    public void setScheduleMode(ScheduleMode scheduleMode) {
        this.scheduleMode = scheduleMode;
    }

    /** @return the mode that decides which vertices get scheduled initially */
    public ScheduleMode getScheduleMode() {
        return scheduleMode;
    }

    // --------------------------------------------------------------------------------------------
    //   Checkpointing
    // --------------------------------------------------------------------------------------------

    /**
     * Enables snapshot checkpointing: creates a new {@link CheckpointCoordinator} for the given
     * vertices and registers the coordinator's job status listener with this graph. A previously
     * configured coordinator is shut down first.
     *
     * @param interval the checkpoint interval, handed to the coordinator's status listener;
     *                 must be at least 10
     * @param checkpointTimeout the checkpoint timeout, handed to the coordinator; must be at least 10
     * @param verticesToTrigger the job vertices whose tasks are handed over as "to trigger"
     * @param verticesToWaitFor the job vertices whose tasks are handed over as "to wait for"
     * @param verticesToCommitTo the job vertices whose tasks are handed over as "to commit to"
     * @param actorSystem the actor system used to create the coordinator's status listener
     * @throws IllegalArgumentException if {@code interval} or {@code checkpointTimeout} is below 10,
     *                                  or a vertex does not belong to this graph
     * @throws IllegalStateException if the job is not in status {@link JobStatus#CREATED}
     */
    public void enableSnaphotCheckpointing(long interval, long checkpointTimeout,
            List<ExecutionJobVertex> verticesToTrigger, List<ExecutionJobVertex> verticesToWaitFor,
            List<ExecutionJobVertex> verticesToCommitTo, ActorSystem actorSystem) {
        if (interval < 10L || checkpointTimeout < 10L) {
            throw new IllegalArgumentException("Checkpoint interval and checkpoint timeout must both be at least 10.");
        }
        if (state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }

        ExecutionVertex[] tasksToTrigger = collectExecutionVertices(verticesToTrigger);
        ExecutionVertex[] tasksToWaitFor = collectExecutionVertices(verticesToWaitFor);
        ExecutionVertex[] tasksToCommitTo = collectExecutionVertices(verticesToCommitTo);

        // drop any earlier checkpointing setup before installing the new coordinator
        disableSnaphotCheckpointing();

        snapshotCheckpointsEnabled = true;
        checkpointCoordinator = new CheckpointCoordinator(jobID, NUMBER_OF_SUCCESSFUL_CHECKPOINTS_TO_RETAIN,
                checkpointTimeout, tasksToTrigger, tasksToWaitFor, tasksToCommitTo, userClassLoader);

        registerJobStatusListener(checkpointCoordinator.createJobStatusListener(actorSystem, interval));
    }

    /**
     * Disables snapshot checkpointing and shuts down the checkpoint coordinator, if there is one.
     *
     * @throws IllegalStateException if the job is not in status {@link JobStatus#CREATED}
     */
    public void disableSnaphotCheckpointing() {
        if (state != JobStatus.CREATED) {
            throw new IllegalStateException("Job must be in CREATED state");
        }

        snapshotCheckpointsEnabled = false;
        if (checkpointCoordinator != null) {
            checkpointCoordinator.shutdown();
            checkpointCoordinator = null;
        }
    }

    /** @return whether snapshot checkpointing has been enabled */
    public boolean isSnapshotCheckpointsEnabled() {
        return snapshotCheckpointsEnabled;
    }

    /** @return the checkpoint coordinator, or {@code null} if there is none */
    public CheckpointCoordinator getCheckpointCoordinator() {
        return checkpointCoordinator;
    }

    /**
     * Gathers the task vertices of the given job vertices into one array.
     *
     * @param jobVertices the job vertices, all of which must belong to this graph
     * @return the task vertices of all given job vertices, in list order; for a single job
     *         vertex this is the array returned by that vertex itself
     * @throws IllegalArgumentException if a job vertex belongs to a different graph
     */
    private ExecutionVertex[] collectExecutionVertices(List<ExecutionJobVertex> jobVertices) {
        if (jobVertices.size() == 1) {
            // shortcut: no need to copy when there is only one source array
            ExecutionJobVertex single = jobVertices.get(0);
            if (single.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            return single.getTaskVertices();
        }

        List<ExecutionVertex> collected = new ArrayList<ExecutionVertex>();
        for (ExecutionJobVertex jobVertex : jobVertices) {
            if (jobVertex.getGraph() != this) {
                throw new IllegalArgumentException("Can only use ExecutionJobVertices of this ExecutionGraph");
            }
            collected.addAll(Arrays.asList(jobVertex.getTaskVertices()));
        }
        return collected.toArray(new ExecutionVertex[collected.size()]);
    }

    // --------------------------------------------------------------------------------------------
    //   Properties and status of the execution graph
    // --------------------------------------------------------------------------------------------

    /** @return the BLOB keys of the required JAR files (the internal list, not a copy) */
    public List<BlobKey> getRequiredJarFiles() {
        return requiredJarFiles;
    }

    /** @return the scheduler of this job, or {@code null} if the job was not scheduled yet */
    public Scheduler getScheduler() {
        return scheduler;
    }

    /** @return the ID of the job */
    public JobID getJobID() {
        return jobID;
    }

    /** @return the name of the job */
    public String getJobName() {
        return jobName;
    }

    /** @return the job configuration */
    public Configuration getJobConfiguration() {
        return jobConfiguration;
    }

    /** @return the user code class loader, or {@code null} after {@link #prepareForArchiving()} */
    public ClassLoader getUserClassLoader() {
        return userClassLoader;
    }

    /** @return the current job status */
    public JobStatus getState() {
        return state;
    }

    /** @return the throwable recorded by {@link #fail(Throwable)}, or {@code null} if none */
    public Throwable getFailureCause() {
        return failureCause;
    }

    /**
     * @param id the ID of the job vertex to look up
     * @return the job vertex with the given ID, or {@code null} if there is none
     */
    public ExecutionJobVertex getJobVertex(JobVertexID id) {
        return tasks.get(id);
    }

    /** @return an unmodifiable view of all job vertices, by vertex ID */
    public Map<JobVertexID, ExecutionJobVertex> getAllVertices() {
        return Collections.unmodifiableMap(tasks);
    }

    /**
     * Returns an iterable over the job vertices in the order in which they were attached.
     *
     * <p>The number of vertices is captured when this method is called; vertices attached
     * afterwards are not visited by iterators of the returned iterable.
     *
     * @return an iterable over the job vertices in creation order; removal is not supported
     */
    public Iterable<ExecutionJobVertex> getVerticesTopologically() {
        // snapshot of the size, so that later additions do not affect running iterations
        final int numElements = verticesInCreationOrder.size();

        return new Iterable<ExecutionJobVertex>() {

            @Override
            public Iterator<ExecutionJobVertex> iterator() {
                return new Iterator<ExecutionJobVertex>() {

                    private int pos = 0;

                    @Override
                    public boolean hasNext() {
                        return pos < numElements;
                    }

                    @Override
                    public ExecutionJobVertex next() {
                        if (!hasNext()) {
                            throw new NoSuchElementException();
                        }
                        return verticesInCreationOrder.get(pos++);
                    }

                    @Override
                    public void remove() {
                        throw new UnsupportedOperationException();
                    }
                };
            }
        };
    }

    /** @return an unmodifiable view of all intermediate results, by data set ID */
    public Map<IntermediateDataSetID, IntermediateResult> getAllIntermediateResults() {
        return Collections.unmodifiableMap(intermediateResults);
    }

    /**
     * @return an iterable over all execution vertices, built on top of
     *         {@link #getVerticesTopologically()}
     */
    public Iterable<ExecutionVertex> getAllExecutionVertices() {
        return new Iterable<ExecutionVertex>() {

            @Override
            public Iterator<ExecutionVertex> iterator() {
                return new AllVerticesIterator(getVerticesTopologically().iterator());
            }
        };
    }

    /**
     * @param status the status to query
     * @return the timestamp at which the job last entered the given status; {@code 0} if no
     *         timestamp has been recorded for it
     */
    public long getStatusTimestamp(JobStatus status) {
        return stateTimestamps[status.ordinal()];
    }

    // --------------------------------------------------------------------------------------------
    //   Actions
    // --------------------------------------------------------------------------------------------

    /**
     * Creates an {@link ExecutionJobVertex} for each given job vertex, connects it to its
     * predecessors and adds it, together with its produced data sets, to this graph.
     *
     * @param topologiallySorted the job vertices to attach, in topological order
     * @throws JobException if a job vertex ID or an intermediate data set ID is already in use
     */
    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
        if (LOG.isDebugEnabled()) {
            LOG.debug(String.format("Attaching %d topologically sorted vertices to existing job graph with %d vertices and %d intermediate results.",
                    topologiallySorted.size(), tasks.size(), intermediateResults.size()));
        }

        final long createTimestamp = System.currentTimeMillis();

        for (JobVertex jobVertex : topologiallySorted) {
            ExecutionJobVertex ejv = new ExecutionJobVertex(this, jobVertex, 1, timeout, createTimestamp);
            ejv.connectToPredecessors(intermediateResults);

            ExecutionJobVertex previousTask = tasks.putIfAbsent(jobVertex.getID(), ejv);
            if (previousTask != null) {
                // arguments are ordered to match the "previous / new" labels of the message
                throw new JobException(String.format("Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                        jobVertex.getID(), previousTask, ejv));
            }

            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet = intermediateResults.putIfAbsent(res.getId(), res);
                if (previousDataSet != null) {
                    throw new JobException(String.format("Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                            res.getId(), previousDataSet, res));
                }
            }

            verticesInCreationOrder.add(ejv);
        }
    }

    /**
     * Moves the job from {@link JobStatus#CREATED} to {@link JobStatus#RUNNING} and schedules
     * vertices according to the configured {@link ScheduleMode}: the input vertices for
     * {@code FROM_SOURCES}, all vertices in creation order for {@code ALL}.
     *
     * @param scheduler the scheduler to use; must be the same instance on every call
     * @throws IllegalArgumentException if the scheduler is {@code null} or differs from the
     *                                  scheduler used before
     * @throws IllegalStateException if the job is not in status {@link JobStatus#CREATED}
     * @throws JobException if the schedule mode is {@code BACKTRACKING}
     */
    public void scheduleForExecution(Scheduler scheduler) throws JobException {
        if (scheduler == null) {
            throw new IllegalArgumentException("Scheduler must not be null.");
        }
        if (this.scheduler != null && this.scheduler != scheduler) {
            throw new IllegalArgumentException("Cannot use different schedulers for the same job");
        }
        if (!transitionState(JobStatus.CREATED, JobStatus.RUNNING)) {
            throw new IllegalStateException("Job may only be scheduled from state " + JobStatus.CREATED);
        }

        this.scheduler = scheduler;

        // any mode not listed below schedules nothing
        switch (scheduleMode) {
            case FROM_SOURCES:
                for (ExecutionJobVertex ejv : tasks.values()) {
                    if (ejv.getJobVertex().isInputVertex()) {
                        ejv.scheduleAll(scheduler, allowQueuedScheduling);
                    }
                }
                break;

            case ALL:
                for (ExecutionJobVertex ejv : getVerticesTopologically()) {
                    ejv.scheduleAll(scheduler, allowQueuedScheduling);
                }
                break;

            case BACKTRACKING:
                throw new JobException("BACKTRACKING is currently not supported as schedule mode.");
        }
    }

    /**
     * Moves a job that is {@link JobStatus#RUNNING} or {@link JobStatus#CREATED} to
     * {@link JobStatus#CANCELLING} and cancels all job vertices. Does nothing in any other status.
     */
    public void cancel() {
        while (true) {
            JobStatus current = state;
            if (current != JobStatus.RUNNING && current != JobStatus.CREATED) {
                return;
            }
            // retry with the fresh status if a concurrent transition won the race
            if (transitionState(current, JobStatus.CANCELLING)) {
                for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                    ejv.cancel();
                }
                return;
            }
        }
    }

    /**
     * Moves the job to {@link JobStatus#FAILING}, records the failure cause and cancels all job
     * vertices. A graph without vertices goes directly on to {@link JobStatus#FAILED}. Does
     * nothing if the job is already {@code FAILING} or {@code FAILED}.
     *
     * @param t the cause of the failure
     */
    public void fail(Throwable t) {
        while (true) {
            JobStatus current = state;
            if (current == JobStatus.FAILED || current == JobStatus.FAILING) {
                return;
            }
            // retry with the fresh status if a concurrent transition won the race
            if (transitionState(current, JobStatus.FAILING, t)) {
                break;
            }
        }

        failureCause = t;

        if (!verticesInCreationOrder.isEmpty()) {
            for (ExecutionJobVertex ejv : verticesInCreationOrder) {
                ejv.cancel();
            }
        } else {
            // nothing to cancel, so nothing will ever report back: finish the failure right away
            transitionState(JobStatus.FAILING, JobStatus.FAILED, t);
        }
    }

    /**
     * Resets the graph for a new run and schedules it again with the scheduler used before.
     *
     * <p>A job in status {@link JobStatus#FAILED} is first moved to
     * {@link JobStatus#RESTARTING}. From {@code RESTARTING}, the registered executions, all job
     * vertices, the status timestamps and the finished-vertex counter are reset, the job returns
     * to {@link JobStatus#CREATED}, and the checkpoint coordinator (if any) is asked to restore
     * the latest checkpointed state. Any throwable raised on the way is passed to
     * {@link #fail(Throwable)} instead of being propagated.
     */
    public void restart() {
        try {
            if (state == JobStatus.FAILED && !transitionState(JobStatus.FAILED, JobStatus.RESTARTING)) {
                throw new IllegalStateException("Execution Graph left the state FAILED while trying to restart.");
            }

            synchronized (progressLock) {
                if (state != JobStatus.RESTARTING) {
                    throw new IllegalStateException("Can only restart job from state restarting.");
                }
                if (scheduler == null) {
                    throw new IllegalStateException("The execution graph has not been scheduled before - scheduler is null.");
                }

                currentExecutions.clear();

                for (ExecutionJobVertex jv : verticesInCreationOrder) {
                    jv.resetForNewExecution();
                }

                Arrays.fill(stateTimestamps, 0L);
                numFinishedJobVertices = 0;

                // records a fresh CREATED timestamp after the reset above
                transitionState(JobStatus.RESTARTING, JobStatus.CREATED);

                if (checkpointCoordinator != null) {
                    // NOTE(review): the meaning of the two boolean flags is defined by CheckpointCoordinator
                    checkpointCoordinator.restoreLatestCheckpointedState(getAllVertices(), false, false);
                }
            }

            scheduleForExecution(scheduler);
        }
        catch (Throwable t) {
            fail(t);
        }
    }

    /**
     * Releases the references that are not kept for an archived graph: the user class loader,
     * scheduler, checkpoint coordinator, intermediate results, registered executions, required
     * JAR files and all listeners. Before the class loader is dropped, the execution config is
     * read from the job configuration; a failure to do so is only logged.
     *
     * @throws IllegalStateException if the job is not in a terminal status
     */
    public void prepareForArchiving() {
        if (!state.isTerminalState()) {
            throw new IllegalStateException("Can only archive the job from a terminal state");
        }

        // must happen while the user class loader is still available
        try {
            executionConfig = (ExecutionConfig) InstantiationUtil.readObjectFromConfig(
                    jobConfiguration, "runtime.config", userClassLoader);
        }
        catch (Exception e) {
            LOG.warn("Error deserializing the execution config while archiving the execution graph", e);
        }

        userClassLoader = null;
        scheduler = null;
        checkpointCoordinator = null;

        for (ExecutionJobVertex vertex : verticesInCreationOrder) {
            vertex.prepareForArchiving();
        }

        intermediateResults.clear();
        currentExecutions.clear();
        requiredJarFiles.clear();
        jobStatusListenerActors.clear();
        executionListenerActors.clear();
    }

    /** @return the execution config read by {@link #prepareForArchiving()}, or {@code null} */
    public ExecutionConfig getExecutionConfig() {
        return executionConfig;
    }

    /**
     * Blocks the calling thread until the job status is terminal.
     *
     * @throws InterruptedException if the thread is interrupted while waiting
     */
    public void waitUntilFinished() throws InterruptedException {
        synchronized (progressLock) {
            while (!state.isTerminalState()) {
                progressLock.wait();
            }
        }
    }

    /**
     * Attempts a status transition without an associated error.
     *
     * @param current the status the job is expected to be in
     * @param newState the status to switch to
     * @return {@code true} if the transition happened, {@code false} if the job was not in
     *         status {@code current}
     */
    private boolean transitionState(JobStatus current, JobStatus newState) {
        return transitionState(current, newState, null);
    }

    /**
     * Atomically switches the job status from {@code current} to {@code newState}. On success
     * the timestamp of the new status is recorded and the job status listeners are notified.
     *
     * @param current the status the job is expected to be in
     * @param newState the status to switch to
     * @param error the error to report to the listeners; may be {@code null}
     * @return {@code true} if the transition happened, {@code false} if the job was not in
     *         status {@code current}
     */
    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        if (!STATE_UPDATER.compareAndSet(this, current, newState)) {
            return false;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("{} switched from {} to {}.", new Object[]{getJobName(), current, newState});
        }

        stateTimestamps[newState.ordinal()] = System.currentTimeMillis();
        notifyJobStatusChange(newState, error);
        return true;
    }

    /**
     * Callback for a job vertex that reached a final state. Once every job vertex has reported,
     * the job status is advanced:
     * <ul>
     *   <li>{@code RUNNING} becomes {@code FINISHED},</li>
     *   <li>{@code CANCELLING} becomes {@code CANCELED},</li>
     *   <li>{@code FAILING} becomes {@code RESTARTING} if retries are left (a delayed restart is
     *       started asynchronously), otherwise {@code FAILED},</li>
     *   <li>any other status causes the job to be failed.</li>
     * </ul>
     * Threads blocked in {@link #waitUntilFinished()} are woken up afterwards.
     *
     * @param ev the job vertex that reached its final state
     * @throws IllegalStateException if all job vertices have already reported a final state
     */
    void jobVertexInFinalState(ExecutionJobVertex ev) {
        synchronized (progressLock) {
            final int numVertices = verticesInCreationOrder.size();
            if (numFinishedJobVertices >= numVertices) {
                throw new IllegalStateException("All vertices are already finished, cannot transition vertex to finished.");
            }

            numFinishedJobVertices++;
            if (numFinishedJobVertices != numVertices) {
                return;
            }

            // Every branch retries with a fresh status read when its compare-and-set loses a race.
            while (true) {
                JobStatus current = state;

                if (current == JobStatus.RUNNING) {
                    if (transitionState(current, JobStatus.FINISHED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.CANCELLING) {
                    if (transitionState(current, JobStatus.CANCELED)) {
                        postRunCleanup();
                        break;
                    }
                }
                else if (current == JobStatus.FAILING) {
                    if (numberOfRetriesLeft > 0 && transitionState(current, JobStatus.RESTARTING)) {
                        numberOfRetriesLeft--;
                        scheduleDelayedRestart();
                        break;
                    }
                    else if (numberOfRetriesLeft <= 0 && transitionState(current, JobStatus.FAILED, failureCause)) {
                        postRunCleanup();
                        break;
                    }
                }
                else {
                    // moves the job to FAILING, which is then handled by the next loop iteration
                    // NOTE(review): fail() is a no-op in status FAILED, so this loop relies on
                    // FAILED not being observable here - confirm against the callers
                    fail(new Exception("ExecutionGraph went into final state from state " + current));
                }
            }

            // wake up all threads waiting for the job to become terminal
            progressLock.notifyAll();
        }
    }

    /**
     * Starts an asynchronous task on the global execution context that sleeps for the
     * configured retry delay and then calls {@link #restart()}.
     */
    private void scheduleDelayedRestart() {
        Futures.future(new Callable<Object>() {

            @Override
            public Object call() throws Exception {
                try {
                    LOG.info("Delaying retry of job execution for {} ms ...", delayBeforeRetrying);
                    Thread.sleep(delayBeforeRetrying);
                }
                catch (InterruptedException ignored) {
                    // deliberately not propagated: an interrupted delay just restarts right away
                }
                restart();
                return null;
            }
        }, AkkaUtils.globalExecutionContext());
    }

    /**
     * Detaches and shuts down the checkpoint coordinator, if there is one. Exceptions raised by
     * the shutdown are logged and not propagated.
     */
    private void postRunCleanup() {
        try {
            CheckpointCoordinator coord = checkpointCoordinator;
            checkpointCoordinator = null;
            if (coord != null) {
                coord.shutdown();
            }
        }
        catch (Exception e) {
            LOG.error("Error while cleaning up after execution", e);
        }
    }

    // --------------------------------------------------------------------------------------------
    //   Callbacks and callback utilities
    // --------------------------------------------------------------------------------------------

    /**
     * Applies a task state update to the execution attempt it refers to.
     *
     * @param state the reported task execution state
     * @return {@code true} if the update was applied; {@code false} if no execution is
     *         registered for the attempt ID, if the switch to running was rejected, or if the
     *         reported state is not one of RUNNING, FINISHED, CANCELED or FAILED (in which case
     *         the attempt is failed)
     */
    public boolean updateState(TaskExecutionState state) {
        Execution attempt = currentExecutions.get(state.getID());
        if (attempt == null) {
            return false;
        }

        switch (state.getExecutionState()) {
            case RUNNING:
                return attempt.switchToRunning();

            case FINISHED:
                attempt.markFinished();
                return true;

            case CANCELED:
                attempt.cancelingComplete();
                return true;

            case FAILED:
                attempt.markFailed(state.getError(userClassLoader));
                return true;

            default:
                // no other state is a legal update from a task manager
                attempt.fail(new Exception("TaskManager sent illegal state update: " + state.getExecutionState()));
                return false;
        }
    }

    /**
     * Forwards the call to the vertex of the execution that produces the given partition. The
     * job is failed if that execution is not registered or has no vertex.
     *
     * @param partitionId the ID of the result partition whose consumers are to be scheduled
     *                    or updated
     */
    public void scheduleOrUpdateConsumers(ResultPartitionID partitionId) {
        Execution execution = currentExecutions.get(partitionId.getProducerId());

        // the messages report the producer ID, because that is the ID used for the lookup
        if (execution == null) {
            fail(new IllegalStateException("Cannot find execution for execution ID "
                    + partitionId.getProducerId()));
        }
        else if (execution.getVertex() == null) {
            fail(new IllegalStateException("Execution with execution ID "
                    + partitionId.getProducerId() + " has no vertex assigned."));
        }
        else {
            execution.getVertex().scheduleOrUpdateConsumers(partitionId);
        }
    }

    /** @return an unmodifiable view of the currently registered executions, by attempt ID */
    public Map<ExecutionAttemptID, Execution> getRegisteredExecutions() {
        return Collections.unmodifiableMap(currentExecutions);
    }

    /**
     * Registers an execution under its attempt ID. The job is failed if the ID is already in use.
     *
     * @param exec the execution to register
     */
    void registerExecution(Execution exec) {
        Execution previous = currentExecutions.putIfAbsent(exec.getAttemptId(), exec);
        if (previous != null) {
            fail(new Exception("Trying to register execution " + exec + " for already used ID " + exec.getAttemptId()));
        }
    }

    /**
     * Removes the registration for the attempt ID of the given execution. The job is failed if
     * a different execution was registered under that ID.
     *
     * @param exec the execution to de-register
     */
    void deregisterExecution(Execution exec) {
        Execution contained = currentExecutions.remove(exec.getAttemptId());
        if (contained != null && contained != exec) {
            fail(new Exception("De-registering execution " + exec + " failed. Found for same ID execution " + contained));
        }
    }

    // --------------------------------------------------------------------------------------------
    //   Listeners and observers
    // --------------------------------------------------------------------------------------------

    /** @param listener the actor to tell about job status changes; {@code null} is ignored */
    public void registerJobStatusListener(ActorRef listener) {
        if (listener != null) {
            jobStatusListenerActors.add(listener);
        }
    }

    /** @param listener the actor to tell about execution state changes; {@code null} is ignored */
    public void registerExecutionListener(ActorRef listener) {
        if (listener != null) {
            executionListenerActors.add(listener);
        }
    }

    /**
     * Sends a {@code JobStatusChanged} message to every registered job status listener.
     *
     * @param newState the status the job switched to
     * @param error the error associated with the change; may be {@code null}
     */
    private void notifyJobStatusChange(JobStatus newState, Throwable error) {
        if (jobStatusListenerActors.size() > 0) {
            ExecutionGraphMessages.JobStatusChanged message =
                    new ExecutionGraphMessages.JobStatusChanged(jobID, newState, System.currentTimeMillis(), error);

            for (ActorRef listener : jobStatusListenerActors) {
                listener.tell(message, ActorRef.noSender());
            }
        }
    }

    /**
     * Sends an {@code ExecutionStateChanged} message to every registered execution listener and
     * fails the job if the new execution state is {@link ExecutionState#FAILED}.
     *
     * @param vertexId the ID of the job vertex the execution belongs to
     * @param subtask the subtask index of the execution
     * @param executionID the attempt ID of the execution
     * @param newExecutionState the state the execution switched to
     * @param error the error associated with the change; may be {@code null}
     */
    void notifyExecutionChange(JobVertexID vertexId, int subtask, ExecutionAttemptID executionID,
            ExecutionState newExecutionState, Throwable error) {
        ExecutionJobVertex vertex = getJobVertex(vertexId);

        if (executionListenerActors.size() > 0) {
            String message = error == null ? null : ExceptionUtils.stringifyException(error);
            ExecutionGraphMessages.ExecutionStateChanged actorMessage =
                    new ExecutionGraphMessages.ExecutionStateChanged(jobID, vertexId,
                            vertex.getJobVertex().getName(), vertex.getParallelism(), subtask,
                            executionID, newExecutionState, System.currentTimeMillis(), message);

            for (ActorRef listener : executionListenerActors) {
                listener.tell(actorMessage, ActorRef.noSender());
            }
        }

        // a single failed execution fails the whole job
        if (newExecutionState == ExecutionState.FAILED) {
            fail(error);
        }
    }
}

