/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.graph;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.flink.api.common.operators.util.UserCodeObjectWrapper;
import org.apache.flink.api.common.operators.util.UserCodeWrapper;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
import org.apache.flink.runtime.jobgraph.InputFormatVertex;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobVertex;
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.ScheduleMode;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.jobgraph.tasks.JobSnapshottingSettings;
import org.apache.flink.runtime.jobmanager.scheduler.CoLocationGroup;
import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
import org.apache.flink.runtime.operators.util.TaskConfig;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.graph.StreamEdge;
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.graph.StreamNode;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
import org.apache.flink.streaming.runtime.tasks.StreamIterationHead;
import org.apache.flink.streaming.runtime.tasks.StreamIterationTail;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Translates a {@link StreamGraph} into a {@link JobGraph}.
 *
 * <p>Starting from every source of the stream graph, operators connected by chainable edges are
 * collapsed into a single {@link JobVertex}; every non-chainable edge becomes a physical
 * connection between two job vertices. Afterwards slot sharing, checkpointing and the
 * execution-retry settings are written to the job graph.
 *
 * <p>All intermediate bookkeeping is kept in instance fields that are re-created at the start of
 * each {@link #createJobGraph(String)} call.
 */
public class StreamingJobGraphGenerator {
    private static final Logger LOG = LoggerFactory.getLogger(StreamingJobGraphGenerator.class);

    /** The stream graph being translated. */
    private final StreamGraph streamGraph;

    /** Job vertices created so far, keyed by the ID of the stream node that heads the chain. */
    private Map<Integer, JobVertex> jobVertices;

    /** The job graph under construction. */
    private JobGraph jobGraph;

    /** IDs of the chain heads for which a job vertex has already been created. */
    private Collection<Integer> builtVertices;

    /** Non-chained edges in the order in which they were connected. */
    private List<StreamEdge> physicalEdgesInOrder;

    /** For every chain head: the configs of the operators chained behind it, keyed by node ID. */
    private Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;

    /** The config of every stream node visited so far, keyed by node ID. */
    private Map<Integer, StreamConfig> vertexConfigs;

    /** Display name of the (sub-)chain starting at a node, keyed by node ID. */
    private Map<Integer, String> chainedNames;

    /**
     * Creates a generator for the given stream graph.
     *
     * @param streamGraph the stream graph to translate
     */
    public StreamingJobGraphGenerator(StreamGraph streamGraph) {
        this.streamGraph = streamGraph;
    }

    /** Re-creates all bookkeeping collections so that every translation starts from scratch. */
    private void init() {
        jobVertices = new HashMap<Integer, JobVertex>();
        builtVertices = new HashSet<Integer>();
        chainedConfigs = new HashMap<Integer, Map<Integer, StreamConfig>>();
        vertexConfigs = new HashMap<Integer, StreamConfig>();
        chainedNames = new HashMap<Integer, String>();
        physicalEdgesInOrder = new ArrayList<StreamEdge>();
    }

    /**
     * Builds the {@link JobGraph} for the stream graph passed to the constructor.
     *
     * @param jobName the name given to the created job graph
     * @return the generated job graph
     * @throws RuntimeException if the execution config cannot be written to the job configuration
     * @throws IllegalArgumentException if checkpointing is enabled with an interval below 1
     */
    public JobGraph createJobGraph(String jobName) {
        jobGraph = new JobGraph(jobName);
        jobGraph.setScheduleMode(ScheduleMode.ALL);

        init();

        // The order matters: chaining creates the job vertices and records the physical edges
        // that all subsequent steps operate on.
        setChaining();
        setPhysicalEdges();
        setSlotSharing();
        // Checkpointing may adjust the retry count in the execution config, so it has to run
        // before the retry settings are copied to the job graph.
        configureCheckpointing();
        configureExecutionRetries();
        configureExecutionRetryDelay();

        try {
            InstantiationUtil.writeObjectToConfig(
                    streamGraph.getExecutionConfig(), jobGraph.getJobConfiguration(), "runtime.config");
        } catch (IOException e) {
            throw new RuntimeException("Config object could not be written to Job Configuration: ", e);
        }

        return jobGraph;
    }

    /**
     * Groups the recorded physical edges by target node, preserving connection order, and stores
     * each group as the physical in-edges of the target's config.
     */
    private void setPhysicalEdges() {
        Map<Integer, List<StreamEdge>> physicalInEdgesInOrder = new HashMap<Integer, List<StreamEdge>>();

        for (StreamEdge edge : physicalEdgesInOrder) {
            int target = edge.getTargetId();
            List<StreamEdge> inEdges = physicalInEdgesInOrder.get(target);
            if (inEdges == null) {
                inEdges = new ArrayList<StreamEdge>();
                physicalInEdgesInOrder.put(target, inEdges);
            }
            inEdges.add(edge);
        }

        for (Map.Entry<Integer, List<StreamEdge>> inEdges : physicalInEdgesInOrder.entrySet()) {
            vertexConfigs.get(inEdges.getKey()).setInPhysicalEdges(inEdges.getValue());
        }
    }

    /** Starts a chain at every source of the stream graph. */
    private void setChaining() {
        for (Integer sourceId : streamGraph.getSourceIDs()) {
            createChain(sourceId, sourceId);
        }
    }

    /**
     * Recursively builds the operator chain headed by {@code startNode}, currently positioned at
     * {@code current}.
     *
     * <p>Chainable out-edges extend the current chain; each non-chainable out-edge ends it and
     * starts a new chain at the edge's target. When {@code current} is the chain head, a job
     * vertex is created and connected along all non-chainable edges leaving the chain; otherwise
     * the node's config is registered as a chained config of the head.
     *
     * @param startNode ID of the stream node heading the chain
     * @param current ID of the stream node currently being processed
     * @return the non-chainable edges leaving the (sub-)chain that starts at {@code current};
     *     empty if a job vertex for {@code startNode} has already been built
     */
    private List<StreamEdge> createChain(Integer startNode, Integer current) {
        if (builtVertices.contains(startNode)) {
            return new ArrayList<StreamEdge>();
        }

        List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>();
        List<StreamEdge> chainableOutputs = new ArrayList<StreamEdge>();
        List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

        for (StreamEdge outEdge : streamGraph.getStreamNode(current).getOutEdges()) {
            if (isChainable(outEdge)) {
                chainableOutputs.add(outEdge);
            } else {
                nonChainableOutputs.add(outEdge);
            }
        }

        for (StreamEdge chainable : chainableOutputs) {
            transitiveOutEdges.addAll(createChain(startNode, chainable.getTargetId()));
        }

        for (StreamEdge nonChainable : nonChainableOutputs) {
            transitiveOutEdges.add(nonChainable);
            createChain(nonChainable.getTargetId(), nonChainable.getTargetId());
        }

        // Downstream chained names were registered by the recursion above, so the name of this
        // node can now be composed from them.
        chainedNames.put(current, createChainedName(current, chainableOutputs));

        boolean isChainHead = current.equals(startNode);
        StreamConfig config =
                isChainHead ? createProcessingVertex(startNode) : new StreamConfig(new Configuration());

        setVertexConfig(current, config, chainableOutputs, nonChainableOutputs);

        if (isChainHead) {
            config.setChainStart();
            config.setOutEdgesInOrder(transitiveOutEdges);
            config.setOutEdges(streamGraph.getStreamNode(current).getOutEdges());

            for (StreamEdge edge : transitiveOutEdges) {
                connect(startNode, edge);
            }

            config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNode));
        } else {
            Map<Integer, StreamConfig> chainedConfigsOfHead = chainedConfigs.get(startNode);
            if (chainedConfigsOfHead == null) {
                chainedConfigsOfHead = new HashMap<Integer, StreamConfig>();
                chainedConfigs.put(startNode, chainedConfigsOfHead);
            }
            chainedConfigsOfHead.put(current, config);
        }

        return transitiveOutEdges;
    }

    /**
     * Composes the display name of the (sub-)chain starting at the given node.
     *
     * @param vertexID ID of the stream node
     * @param chainedOutputs the chainable out-edges of that node; the names of their targets must
     *     already be present in {@link #chainedNames}
     * @return {@code "op"} for no chained output, {@code "op -> next"} for exactly one and
     *     {@code "op -> (a, b, ...)"} for several
     */
    private String createChainedName(Integer vertexID, List<StreamEdge> chainedOutputs) {
        String operatorName = streamGraph.getStreamNode(vertexID).getOperatorName();

        if (chainedOutputs.size() > 1) {
            List<String> outputChainedNames = new ArrayList<String>();
            for (StreamEdge chainable : chainedOutputs) {
                outputChainedNames.add(chainedNames.get(chainable.getTargetId()));
            }
            return operatorName + " -> (" + StringUtils.join(outputChainedNames, ", ") + ")";
        }

        if (chainedOutputs.size() == 1) {
            return operatorName + " -> " + chainedNames.get(chainedOutputs.get(0).getTargetId());
        }

        return operatorName;
    }

    /**
     * Creates the job vertex for the chain headed by the given stream node, registers it with the
     * job graph and marks the node as built.
     *
     * @param vertexID ID of the stream node heading the chain
     * @return a stream config backed by the configuration of the new job vertex
     */
    private StreamConfig createProcessingVertex(Integer vertexID) {
        StreamNode vertex = streamGraph.getStreamNode(vertexID);

        // Declared as the common supertype: depending on whether the node carries an input
        // format, either an InputFormatVertex or a plain JobVertex is created.
        JobVertex jobVertex;
        if (vertex.getInputFormat() != null) {
            jobVertex = new InputFormatVertex(chainedNames.get(vertexID));
            TaskConfig taskConfig = new TaskConfig(jobVertex.getConfiguration());
            taskConfig.setStubWrapper(new UserCodeObjectWrapper<Object>(vertex.getInputFormat()));
        } else {
            jobVertex = new JobVertex(chainedNames.get(vertexID));
        }

        jobVertex.setInvokableClass(vertex.getJobVertexClass());

        // A non-positive parallelism leaves the job vertex at whatever it was constructed with.
        int parallelism = vertex.getParallelism();
        if (parallelism > 0) {
            jobVertex.setParallelism(parallelism);
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Parallelism set: {} for {}", parallelism, vertexID);
        }

        jobVertices.put(vertexID, jobVertex);
        builtVertices.add(vertexID);
        jobGraph.addVertex(jobVertex);

        return new StreamConfig(jobVertex.getConfiguration());
    }

    /**
     * Copies the settings of a stream node into its config and records the config in
     * {@link #vertexConfigs}.
     *
     * @param vertexID ID of the stream node
     * @param config the config to fill
     * @param chainableOutputs out-edges of the node that stay inside the chain
     * @param nonChainableOutputs out-edges of the node that leave the chain
     */
    private void setVertexConfig(Integer vertexID, StreamConfig config, List<StreamEdge> chainableOutputs, List<StreamEdge> nonChainableOutputs) {
        StreamNode vertex = streamGraph.getStreamNode(vertexID);

        config.setVertexID(vertexID);
        config.setBufferTimeout(vertex.getBufferTimeout());

        config.setTypeSerializerIn1(vertex.getTypeSerializerIn1());
        config.setTypeSerializerIn2(vertex.getTypeSerializerIn2());
        config.setTypeSerializerOut(vertex.getTypeSerializerOut());

        config.setStreamOperator(vertex.getOperator());
        config.setOutputSelectorWrapper(vertex.getOutputSelectorWrapper());

        config.setNumberOfOutputs(nonChainableOutputs.size());
        config.setNonChainedOutputs(nonChainableOutputs);
        config.setChainedOutputs(chainableOutputs);

        config.setCheckpointingEnabled(streamGraph.isCheckpointingEnabled());
        if (streamGraph.isCheckpointingEnabled()) {
            config.setCheckpointMode(streamGraph.getCheckpointingMode());
            config.setStateBackend(streamGraph.getStateBackend());
        } else {
            // Without checkpointing the mode is fixed to AT_LEAST_ONCE.
            config.setCheckpointMode(CheckpointingMode.AT_LEAST_ONCE);
        }

        config.setStatePartitioner(vertex.getStatePartitioner());
        config.setStateKeySerializer(vertex.getStateKeySerializer());

        // Iteration heads and tails additionally need their iteration ID and wait time.
        Class<? extends AbstractInvokable> vertexClass = vertex.getJobVertexClass();
        if (vertexClass.equals(StreamIterationHead.class) || vertexClass.equals(StreamIterationTail.class)) {
            config.setIterationId(streamGraph.getBrokerID(vertexID));
            config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexID));
        }

        vertexConfigs.put(vertexID, config);
    }

    /**
     * Connects the job vertex of a chain head to the job vertex of the edge's target and records
     * the edge as a physical edge.
     *
     * <p>A {@link ForwardPartitioner} results in a point-wise connection, every other partitioner
     * in an all-to-all connection. The input count of the downstream config is incremented.
     *
     * @param headOfChain ID of the stream node heading the upstream chain
     * @param edge the non-chained edge to materialize
     */
    private void connect(Integer headOfChain, StreamEdge edge) {
        physicalEdgesInOrder.add(edge);

        Integer downStreamvertexID = edge.getTargetId();

        JobVertex headVertex = jobVertices.get(headOfChain);
        JobVertex downStreamVertex = jobVertices.get(downStreamvertexID);

        StreamConfig downStreamConfig = new StreamConfig(downStreamVertex.getConfiguration());
        downStreamConfig.setNumberOfInputs(downStreamConfig.getNumberOfInputs() + 1);

        StreamPartitioner<?> partitioner = edge.getPartitioner();
        DistributionPattern pattern = partitioner instanceof ForwardPartitioner
                ? DistributionPattern.POINTWISE
                : DistributionPattern.ALL_TO_ALL;
        downStreamVertex.connectNewDataSetAsInput(headVertex, pattern);

        if (LOG.isDebugEnabled()) {
            LOG.debug("CONNECTED: {} - {} -> {}", new Object[]{partitioner.getClass().getSimpleName(), headOfChain, downStreamvertexID});
        }
    }

    /**
     * Decides whether the two operators joined by the given edge can be placed in the same chain.
     *
     * <p>All of the following must hold:
     * <ul>
     *   <li>the edge is the only input of the downstream node;</li>
     *   <li>both nodes have an operator;</li>
     *   <li>both nodes have the same slot sharing ID, and it is not {@code -1};</li>
     *   <li>the downstream operator's strategy is {@code ALWAYS} or {@code FORCE_ALWAYS};</li>
     *   <li>the upstream operator's strategy is {@code HEAD}, {@code ALWAYS} or
     *       {@code FORCE_ALWAYS};</li>
     *   <li>the edge uses a {@link ForwardPartitioner}, or the downstream parallelism is 1;</li>
     *   <li>both nodes have the same parallelism;</li>
     *   <li>chaining is enabled on the stream graph, or the downstream operator's strategy is
     *       {@code FORCE_ALWAYS}.</li>
     * </ul>
     *
     * @param edge the edge to check
     * @return {@code true} if source and target of the edge can be chained
     */
    private boolean isChainable(StreamEdge edge) {
        StreamNode upStreamVertex = edge.getSourceVertex();
        StreamNode downStreamVertex = edge.getTargetVertex();

        StreamOperator<?> headOperator = upStreamVertex.getOperator();
        StreamOperator<?> outOperator = downStreamVertex.getOperator();

        // The null checks must precede every getChainingStrategy() call; && short-circuits.
        return downStreamVertex.getInEdges().size() == 1
                && outOperator != null
                && headOperator != null
                && upStreamVertex.getSlotSharingID() == downStreamVertex.getSlotSharingID()
                && upStreamVertex.getSlotSharingID() != -1
                && (outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                        || outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
                && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD
                        || headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS
                        || headOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS)
                && (edge.getPartitioner() instanceof ForwardPartitioner
                        || downStreamVertex.getParallelism() == 1)
                && upStreamVertex.getParallelism() == downStreamVertex.getParallelism()
                && (streamGraph.isChainingEnabled()
                        || outOperator.getChainingStrategy() == ChainingStrategy.FORCE_ALWAYS);
    }

    /**
     * Assigns slot sharing groups and co-location groups to the created job vertices.
     *
     * <p>Job vertices whose head node has the same slot sharing ID share one
     * {@link SlotSharingGroup}; vertices with ID {@code -1} are not assigned a group. The source
     * and sink of every iteration are put into a common {@link CoLocationGroup}.
     */
    private void setSlotSharing() {
        Map<Integer, SlotSharingGroup> slotSharingGroups = new HashMap<Integer, SlotSharingGroup>();

        for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
            int slotSharingID = streamGraph.getStreamNode(entry.getKey()).getSlotSharingID();
            if (slotSharingID == -1) {
                continue;
            }

            SlotSharingGroup group = slotSharingGroups.get(slotSharingID);
            if (group == null) {
                group = new SlotSharingGroup();
                slotSharingGroups.put(slotSharingID, group);
            }
            entry.getValue().setSlotSharingGroup(group);
        }

        for (Tuple2<StreamNode, StreamNode> pair : streamGraph.getIterationSourceSinkPairs()) {
            CoLocationGroup ccg = new CoLocationGroup();

            JobVertex source = jobVertices.get(pair.f0.getId());
            JobVertex sink = jobVertices.get(pair.f1.getId());

            ccg.addVertex(source);
            ccg.addVertex(sink);
            source.updateCoLocationGroup(ccg);
            sink.updateCoLocationGroup(ccg);
        }
    }

    /**
     * Writes the snapshot settings to the job graph if checkpointing is enabled.
     *
     * <p>Input vertices trigger checkpoints; every vertex acknowledges and commits them. As a
     * side effect, a retry count of {@code -1} in the execution config is replaced by
     * {@link Integer#MAX_VALUE}.
     *
     * @throws IllegalArgumentException if the checkpoint interval is smaller than 1
     */
    private void configureCheckpointing() {
        if (!streamGraph.isCheckpointingEnabled()) {
            return;
        }

        long interval = streamGraph.getCheckpointingInterval();
        if (interval < 1L) {
            throw new IllegalArgumentException("The checkpoint interval must be positive");
        }

        List<JobVertexID> triggerVertices = new ArrayList<JobVertexID>();
        List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size());
        List<JobVertexID> commitVertices = new ArrayList<JobVertexID>();

        for (JobVertex vertex : jobVertices.values()) {
            if (vertex.isInputVertex()) {
                triggerVertices.add(vertex.getID());
            }
            commitVertices.add(vertex.getID());
            ackVertices.add(vertex.getID());
        }

        JobSnapshottingSettings settings =
                new JobSnapshottingSettings(triggerVertices, ackVertices, commitVertices, interval);
        jobGraph.setSnapshotSettings(settings);

        // NOTE(review): -1 presumably means "not configured by the user" - confirm against
        // ExecutionConfig. In that case checkpointed jobs retry (practically) without limit.
        int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
        if (executionRetries == -1) {
            streamGraph.getExecutionConfig().setNumberOfExecutionRetries(Integer.MAX_VALUE);
        }
    }

    /** Copies the number of execution retries from the execution config to the job graph. */
    private void configureExecutionRetries() {
        int executionRetries = streamGraph.getExecutionConfig().getNumberOfExecutionRetries();
        jobGraph.setNumberOfExecutionRetries(executionRetries);
    }

    /** Copies the execution retry delay from the execution config to the job graph. */
    private void configureExecutionRetryDelay() {
        long executionRetryDelay = streamGraph.getExecutionConfig().getExecutionRetryDelay();
        jobGraph.setExecutionRetryDelay(executionRetryDelay);
    }
}

