/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.OperatorState;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.typeutils.InputTypeConfigurable;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.runtime.state.StateBackend;
import org.apache.flink.runtime.state.StateHandle;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.TimestampedCollector;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.Triggerable;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.MultiplexingStreamRecordSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskState;
import org.apache.flink.util.InstantiationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class WindowOperator<K, IN, OUT, W extends Window>
extends AbstractUdfStreamOperator<OUT, WindowFunction<IN, OUT, K, W>>
implements OneInputStreamOperator<IN, OUT>,
Triggerable,
InputTypeConfigurable {
    private static final long serialVersionUID = 1L;
    private static final Logger LOG = LoggerFactory.getLogger(WindowOperator.class);
    // Assigns every incoming element to a collection of windows (used in processElement).
    private final WindowAssigner<? super IN, W> windowAssigner;
    // Extracts the key of an element; windows are tracked per key.
    private final KeySelector<IN, K> keySelector;
    // Consulted (through Context) on every element and on every timer that fires.
    private final Trigger<? super IN, ? super W> trigger;
    // Creates one WindowBuffer per (key, window) Context.
    private final WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory;
    // When true, processElement overwrites the element timestamp with System.currentTimeMillis().
    private boolean setProcessingTime = false;
    // Set through setInputType; open() throws IllegalStateException while it is still null.
    // Used by Context to (de)serialize buffered records for checkpoints.
    private TypeSerializer<IN> inputSerializer;
    // Used by Context to write/read the key in operator snapshots.
    private final TypeSerializer<K> keySerializer;
    // Used by Context to write/read the window in operator snapshots.
    private final TypeSerializer<W> windowSerializer;
    // Timer timestamp -> contexts that registered a processing-time timer for it.
    // Transient: created in open() or restoreState().
    private transient Map<Long, Set<Context>> processingTimeTimers;
    // Timer timestamp -> contexts that registered an event-time timer for it.
    // Transient: created in open() or restoreState().
    private transient Map<Long, Set<Context>> watermarkTimers;
    // Created in open(); emitWindow stamps it with the window's maxTimestamp() before emitting.
    protected transient TimestampedCollector<OUT> timestampedCollector;
    // Timestamp of the last watermark handled by processWatermark; -1 until one was seen.
    protected transient long currentWatermark = -1L;
    // key -> (window -> Context) for all windows that currently hold state.
    protected transient Map<K, Map<W, Context>> windows;

    /**
     * Creates a window operator and sets its chaining strategy to {@link ChainingStrategy#ALWAYS}.
     *
     * <p>All arguments except {@code windowSerializer} are rejected with a
     * {@link NullPointerException} when {@code null}; {@code windowSerializer} is stored as given.
     *
     * @param windowAssigner assigns elements to windows
     * @param windowSerializer serializer for windows, used when writing/reading snapshots
     * @param keySelector extracts the key from an element
     * @param keySerializer serializer for keys, used when writing/reading snapshots
     * @param windowBufferFactory creates the per-window element buffers
     * @param windowFunction user function applied to the contents of a fired window
     * @param trigger decides when a window fires and/or is purged
     */
    public WindowOperator(
            WindowAssigner<? super IN, W> windowAssigner,
            TypeSerializer<W> windowSerializer,
            KeySelector<IN, K> keySelector,
            TypeSerializer<K> keySerializer,
            WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> windowBufferFactory,
            WindowFunction<IN, OUT, K, W> windowFunction,
            Trigger<? super IN, ? super W> trigger) {

        super(windowFunction);

        this.windowAssigner = Objects.requireNonNull(windowAssigner);
        // Intentionally stored without a null check, exactly like before.
        this.windowSerializer = windowSerializer;
        this.keySelector = Objects.requireNonNull(keySelector);
        this.keySerializer = Objects.requireNonNull(keySerializer);
        this.windowBufferFactory = Objects.requireNonNull(windowBufferFactory);
        this.trigger = Objects.requireNonNull(trigger);

        setChainingStrategy(ChainingStrategy.ALWAYS);
    }

    /**
     * Java serialization hook: restores the non-transient fields and then resets the
     * transient {@code currentWatermark} to its "no watermark seen" value of {@code -1}
     * (a transient {@code long} would otherwise come back as {@code 0}).
     */
    private void readObject(ObjectInputStream stream) throws IOException, ClassNotFoundException {
        stream.defaultReadObject();
        currentWatermark = -1L;
    }

    /**
     * Derives the serializer for input elements from the given type information.
     * {@link #open()} refuses to start while no input serializer has been set.
     *
     * @param type type information of the elements this operator receives
     * @param executionConfig configuration handed to {@code createSerializer}
     */
    @Override
    @SuppressWarnings("unchecked")
    public final void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
        // The wildcard TypeInformation only yields a TypeSerializer<?>; the caller is
        // responsible for passing the type information that matches IN.
        this.inputSerializer = (TypeSerializer<IN>) type.createSerializer(executionConfig);
    }

    /**
     * Prepares the operator for processing: creates the output collector, opens the window
     * buffer factory, lazily creates the bookkeeping maps and rebuilds the timer indexes from
     * any contexts that are already present in {@code windows} (e.g. after a state restore).
     *
     * @throws IllegalStateException if no input serializer was set
     */
    @Override
    public final void open() throws Exception {
        super.open();

        timestampedCollector = new TimestampedCollector<OUT>(output);

        if (inputSerializer == null) {
            throw new IllegalStateException("Input serializer was not set.");
        }

        windowBufferFactory.setRuntimeContext((RuntimeContext) getRuntimeContext());
        windowBufferFactory.open(getUserFunctionParameters());

        // The maps may already exist when restoreState() ran before open().
        if (watermarkTimers == null) {
            watermarkTimers = new HashMap<Long, Set<Context>>();
        }
        if (processingTimeTimers == null) {
            processingTimeTimers = new HashMap<Long, Set<Context>>();
        }
        if (windows == null) {
            windows = new HashMap<K, Map<W, Context>>();
        }

        // Re-index the timers of all known contexts; only timers > 0 are considered set.
        for (Map<W, Context> keyWindows : windows.values()) {
            for (Context context : keyWindows.values()) {
                final long processingTime = context.processingTimeTimer;
                if (processingTime > 0L) {
                    Set<Context> pending = processingTimeTimers.get(processingTime);
                    if (pending == null) {
                        // First context for this timestamp: schedule the callback once.
                        getRuntimeContext().registerTimer(processingTime, this);
                        pending = new HashSet<Context>();
                        processingTimeTimers.put(processingTime, pending);
                    }
                    pending.add(context);
                }

                final long eventTime = context.watermarkTimer;
                if (eventTime > 0L) {
                    Set<Context> pending = watermarkTimers.get(eventTime);
                    if (pending == null) {
                        pending = new HashSet<Context>();
                        watermarkTimers.put(eventTime, pending);
                    }
                    pending.add(context);
                }
            }
        }
    }

    /**
     * Closes the operator: after the superclass has been closed, every window that still
     * holds state is emitted, the window map is cleared and the buffer factory is closed.
     */
    @Override
    public final void close() throws Exception {
        super.close();

        // Flush whatever is still buffered, key by key.
        for (Map<W, Context> keyWindows : windows.values()) {
            for (Context pending : keyWindows.values()) {
                emitWindow(pending);
            }
        }

        windows.clear();
        windowBufferFactory.close();
    }

    /**
     * Adds an element to the buffer of every window it is assigned to and evaluates the
     * trigger for each of those windows.
     *
     * <p>If processing-time stamping is enabled the element's timestamp is first replaced
     * with {@code System.currentTimeMillis()}.
     *
     * @param element the incoming record
     */
    @Override
    public final void processElement(StreamRecord<IN> element) throws Exception {
        if (setProcessingTime) {
            element.replace(element.getValue(), System.currentTimeMillis());
        }

        Collection<W> elementWindows =
                windowAssigner.assignWindows(element.getValue(), element.getTimestamp());
        K key = keySelector.getKey(element.getValue());

        for (W window : elementWindows) {
            // Look the per-key map up for every window. A purge triggered by a previous
            // window of this same element may have emptied the map, in which case
            // processTriggerResult removed it from 'windows'. Re-using a stale reference
            // would put the following contexts into a detached map, so they would never
            // be emitted or checkpointed.
            Map<W, Context> keyWindows = windows.get(key);
            if (keyWindows == null) {
                keyWindows = new HashMap<W, Context>();
                windows.put(key, keyWindows);
            }

            Context context = keyWindows.get(window);
            if (context == null) {
                WindowBuffer<IN> windowBuffer = windowBufferFactory.create();
                context = new Context(this, key, window, windowBuffer);
                keyWindows.put(window, context);
            }

            context.windowBuffer.storeElement(element);
            Trigger.TriggerResult triggerResult = context.onElement(element);
            processTriggerResult(triggerResult, key, window);
        }
    }

    /**
     * Applies the user window function to the buffered contents of the given context.
     * The collector timestamp is always set to the window's {@code maxTimestamp()}; the
     * function itself is only invoked when the buffer holds at least one element.
     *
     * @param context the (key, window) context whose contents should be emitted
     */
    protected void emitWindow(Context context) throws Exception {
        timestampedCollector.setTimestamp(context.window.maxTimestamp());

        if (context.windowBuffer.size() <= 0) {
            return;
        }

        // Use the first buffered record to establish the key context for the user function.
        setKeyContextElement(context.windowBuffer.getElements().iterator().next());
        userFunction.apply(
                context.key,
                context.window,
                context.windowBuffer.getUnpackedElements(),
                timestampedCollector);
    }

    /**
     * Acts on a trigger decision for one (key, window) pair: a purge removes the context
     * (and the per-key map once it is empty), a fire emits the window contents. A result
     * that neither fires nor purges is ignored.
     *
     * @param triggerResult the decision returned by the trigger
     * @param key key of the affected window
     * @param window the affected window
     */
    private void processTriggerResult(Trigger.TriggerResult triggerResult, K key, W window) throws Exception {
        final boolean fire = triggerResult.isFire();
        final boolean purge = triggerResult.isPurge();
        if (!(fire || purge)) {
            return;
        }

        final Map<W, Context> keyWindows = windows.get(key);
        if (keyWindows == null) {
            LOG.debug("Window {} for key {} already gone.", window, key);
            return;
        }

        final Context context;
        if (purge) {
            context = keyWindows.remove(window);
            // Drop the per-key map as soon as its last window is gone.
            if (keyWindows.isEmpty()) {
                windows.remove(key);
            }
        } else {
            context = keyWindows.get(window);
        }

        if (context == null) {
            LOG.debug("Window {} for key {} already gone.", window, key);
            return;
        }

        if (fire) {
            emitWindow(context);
        }
    }

    /**
     * Fires all event-time timers whose timestamp is not later than the watermark, then
     * forwards the watermark and records it as the current one.
     *
     * @param mark the watermark that arrived
     */
    @Override
    public final void processWatermark(Watermark mark) throws Exception {
        final long watermark = mark.getTimestamp();

        // Collect (and unregister) every timer bucket that is due.
        ArrayList<Set<Context>> toTrigger = new ArrayList<Set<Context>>();
        Iterator<Map.Entry<Long, Set<Context>>> it = watermarkTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Context>> due = it.next();
            if (due.getKey() <= watermark) {
                toTrigger.add(due.getValue());
                it.remove();
            }
        }

        for (Set<Context> contexts : toTrigger) {
            for (Context ctx : contexts) {
                final long timer = ctx.watermarkTimer;
                // A context stays in its old bucket when it registers a different timer or
                // when its timer was already fired from onElement. -1 means "no timer set":
                // firing it would call the trigger with a bogus timestamp of -1, so skip it.
                if (timer == -1L) {
                    continue;
                }
                // The bucket entry may be stale: the context may now wait for a later time.
                if (timer > watermark) {
                    continue;
                }
                Trigger.TriggerResult triggerResult = ctx.onEventTime(timer);
                processTriggerResult(triggerResult, ctx.key, ctx.window);
            }
        }

        output.emitWatermark(mark);
        currentWatermark = watermark;
    }

    /**
     * Processing-time callback: fires all processing-time timers whose timestamp is not
     * later than {@code time}.
     *
     * @param time the processing time for which the callback was scheduled
     */
    @Override
    public final void trigger(long time) throws Exception {
        // Collect (and unregister) every timer bucket that is due.
        ArrayList<Set<Context>> toTrigger = new ArrayList<Set<Context>>();
        Iterator<Map.Entry<Long, Set<Context>>> it = processingTimeTimers.entrySet().iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Set<Context>> due = it.next();
            if (due.getKey() <= time) {
                toTrigger.add(due.getValue());
                it.remove();
            }
        }

        for (Set<Context> contexts : toTrigger) {
            for (Context ctx : contexts) {
                final long timer = ctx.processingTimeTimer;
                // A context stays in its old bucket when it registers a different timer.
                // -1 means "no timer set": firing it would call the trigger with a bogus
                // timestamp of -1, so skip it.
                if (timer == -1L) {
                    continue;
                }
                // The bucket entry may be stale: the context may now wait for a later time.
                if (timer > time) {
                    continue;
                }
                Trigger.TriggerResult triggerResult = ctx.onProcessingTime(timer);
                processTriggerResult(triggerResult, ctx.key, ctx.window);
            }
        }
    }

    /**
     * Switches processing-time stamping on or off. While enabled, {@code processElement}
     * replaces each element's timestamp with {@code System.currentTimeMillis()}.
     *
     * @param setProcessingTime whether element timestamps should be overwritten
     * @return this operator, to allow call chaining
     */
    public WindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
        this.setProcessingTime = setProcessingTime;

        return this;
    }

    /**
     * Writes all window contexts into a checkpoint stream and attaches the resulting handle
     * to the task state produced by the superclass.
     *
     * <p>Layout: number of keys, then per key the number of windows followed by each
     * context as written by {@code Context.writeToState}.
     *
     * @param checkpointId id of the checkpoint being taken
     * @param timestamp timestamp of the checkpoint
     * @return the task state including this operator's window state
     */
    @Override
    public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception {
        StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp);
        StateBackend.CheckpointStateOutputView out =
                getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp);

        out.writeInt(windows.size());
        for (Map<W, Context> keyWindows : windows.values()) {
            out.writeInt(keyWindows.size());
            for (Context context : keyWindows.values()) {
                context.writeToState(out);
            }
        }

        taskState.setOperatorState(out.closeAndGetHandle());
        return taskState;
    }

    /**
     * Rebuilds the window contexts from a snapshot written by {@code snapshotOperatorState}.
     * The timer maps are replaced by empty ones here; {@code open()} re-indexes the timers
     * of the restored contexts.
     *
     * @param taskState the task state holding this operator's snapshot
     */
    @Override
    public void restoreState(StreamTaskState taskState) throws Exception {
        super.restoreState(taskState);

        final ClassLoader userClassloader = getUserCodeClassloader();
        StateHandle<?> inputState = taskState.getOperatorState();
        DataInputView in = (DataInputView) inputState.getState(userClassloader);

        final int numKeys = in.readInt();
        windows = new HashMap<K, Map<W, Context>>(numKeys);
        processingTimeTimers = new HashMap<Long, Set<Context>>();
        watermarkTimers = new HashMap<Long, Set<Context>>();

        for (int keyIndex = 0; keyIndex < numKeys; keyIndex++) {
            final int numWindows = in.readInt();
            for (int windowIndex = 0; windowIndex < numWindows; windowIndex++) {
                Context restored = new Context(this, in, userClassloader);

                // The key is only known after the context has been read.
                Map<W, Context> keyWindows = windows.get(restored.key);
                if (keyWindows == null) {
                    keyWindows = new HashMap<W, Context>(numWindows);
                    windows.put(restored.key, keyWindows);
                }
                keyWindows.put(restored.window, restored);
            }
        }
    }

    /** Returns whether element timestamps are overwritten with the current processing time. */
    @VisibleForTesting
    public boolean isSetProcessingTime() {
        return setProcessingTime;
    }

    /** Returns the trigger this operator was constructed with. */
    @VisibleForTesting
    public Trigger<? super IN, ? super W> getTrigger() {
        return trigger;
    }

    /** Returns the key selector this operator was constructed with. */
    @VisibleForTesting
    public KeySelector<IN, K> getKeySelector() {
        return keySelector;
    }

    /** Returns the window assigner this operator was constructed with. */
    @VisibleForTesting
    public WindowAssigner<? super IN, W> getWindowAssigner() {
        return windowAssigner;
    }

    /** Returns the window buffer factory this operator was constructed with. */
    @VisibleForTesting
    public WindowBufferFactory<? super IN, ? extends WindowBuffer<IN>> getWindowBufferFactory() {
        return windowBufferFactory;
    }

    protected static class Context
    implements Trigger.TriggerContext {
        protected K key;
        protected W window;
        protected WindowBuffer<IN> windowBuffer;
        protected HashMap<String, Serializable> state;
        protected long watermarkTimer;
        protected long processingTimeTimer;
        final /* synthetic */ WindowOperator this$0;

        public Context(K key, W window, WindowBuffer<IN> windowBuffer) {
            this.this$0 = this$0;
            this.key = key;
            this.window = window;
            this.windowBuffer = windowBuffer;
            this.state = new HashMap();
            this.watermarkTimer = -1L;
            this.processingTimeTimer = -1L;
        }

        protected Context(WindowOperator this$0, DataInputView in, ClassLoader userClassloader) throws Exception {
            this.this$0 = this$0;
            this.key = this$0.keySerializer.deserialize(in);
            this.window = (Window)this$0.windowSerializer.deserialize(in);
            this.watermarkTimer = in.readLong();
            this.processingTimeTimer = in.readLong();
            int stateSize = in.readInt();
            byte[] stateData = new byte[stateSize];
            in.read(stateData);
            this.state = (HashMap)InstantiationUtil.deserializeObject((byte[])stateData, (ClassLoader)userClassloader);
            this.windowBuffer = this$0.windowBufferFactory.create();
            int numElements = in.readInt();
            MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer(this$0.inputSerializer);
            for (int i = 0; i < numElements; ++i) {
                this.windowBuffer.storeElement(recordSerializer.deserialize(in).asRecord());
            }
        }

        protected void writeToState(StateBackend.CheckpointStateOutputView out) throws IOException {
            this.this$0.keySerializer.serialize(this.key, (DataOutputView)out);
            this.this$0.windowSerializer.serialize(this.window, (DataOutputView)out);
            out.writeLong(this.watermarkTimer);
            out.writeLong(this.processingTimeTimer);
            byte[] serializedState = InstantiationUtil.serializeObject(this.state);
            out.writeInt(serializedState.length);
            out.write(serializedState, 0, serializedState.length);
            MultiplexingStreamRecordSerializer recordSerializer = new MultiplexingStreamRecordSerializer(this.this$0.inputSerializer);
            out.writeInt(this.windowBuffer.size());
            for (StreamRecord element : this.windowBuffer.getElements()) {
                recordSerializer.serialize(element, (DataOutputView)out);
            }
        }

        @Override
        public <S extends Serializable> OperatorState<S> getKeyValueState(final String name, final S defaultState) {
            return new OperatorState<S>(){

                public S value() throws IOException {
                    Serializable value = Context.this.state.get(name);
                    if (value == null) {
                        Context.this.state.put(name, defaultState);
                        value = defaultState;
                    }
                    return value;
                }

                public void update(S value) throws IOException {
                    Context.this.state.put(name, (Serializable)value);
                }
            };
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            if (this.processingTimeTimer == time) {
                return;
            }
            HashSet<Context> triggers = (HashSet<Context>)this.this$0.processingTimeTimers.get(time);
            if (triggers == null) {
                this.this$0.getRuntimeContext().registerTimer(time, this.this$0);
                triggers = new HashSet<Context>();
                this.this$0.processingTimeTimers.put(time, triggers);
            }
            this.processingTimeTimer = time;
            triggers.add(this);
        }

        @Override
        public void registerEventTimeTimer(long time) {
            if (this.watermarkTimer == time) {
                return;
            }
            HashSet<Context> triggers = (HashSet<Context>)this.this$0.watermarkTimers.get(time);
            if (triggers == null) {
                triggers = new HashSet<Context>();
                this.this$0.watermarkTimers.put(time, triggers);
            }
            this.watermarkTimer = time;
            triggers.add(this);
        }

        public Trigger.TriggerResult onElement(StreamRecord<IN> element) throws Exception {
            Trigger.TriggerResult onElementResult = this.this$0.trigger.onElement(element.getValue(), element.getTimestamp(), this.window, this);
            if (this.watermarkTimer > 0L && this.watermarkTimer <= this.this$0.currentWatermark) {
                Trigger.TriggerResult onEventTimeResult = this.onEventTime(this.watermarkTimer);
                return Trigger.TriggerResult.merge(onElementResult, onEventTimeResult);
            }
            return onElementResult;
        }

        public Trigger.TriggerResult onProcessingTime(long time) throws Exception {
            if (time == this.processingTimeTimer) {
                this.processingTimeTimer = -1L;
                return this.this$0.trigger.onProcessingTime(time, this.window, this);
            }
            return Trigger.TriggerResult.CONTINUE;
        }

        public Trigger.TriggerResult onEventTime(long time) throws Exception {
            if (time == this.watermarkTimer) {
                this.watermarkTimer = -1L;
                Trigger.TriggerResult firstTriggerResult = this.this$0.trigger.onEventTime(time, this.window, this);
                if (this.watermarkTimer > 0L && this.watermarkTimer <= this.this$0.currentWatermark) {
                    Trigger.TriggerResult secondTriggerResult = this.onEventTime(this.watermarkTimer);
                    return Trigger.TriggerResult.merge(firstTriggerResult, secondTriggerResult);
                }
                return firstTriggerResult;
            }
            return Trigger.TriggerResult.CONTINUE;
        }
    }
}

