/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing;

import java.util.Objects;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
import org.apache.flink.streaming.api.windowing.evictors.Evictor;
import org.apache.flink.streaming.api.windowing.triggers.Trigger;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.EvictingWindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;

public class EvictingWindowOperator<K, IN, OUT, W extends Window>
extends WindowOperator<K, IN, OUT, W> {
    private static final long serialVersionUID = 1L;
    private final Evictor<? super IN, ? super W> evictor;

    public EvictingWindowOperator(WindowAssigner<? super IN, W> windowAssigner, TypeSerializer<W> windowSerializer, KeySelector<IN, K> keySelector, TypeSerializer<K> keySerializer, WindowBufferFactory<? super IN, ? extends EvictingWindowBuffer<IN>> windowBufferFactory, WindowFunction<IN, OUT, K, W> windowFunction, Trigger<? super IN, ? super W> trigger, Evictor<? super IN, ? super W> evictor) {
        super(windowAssigner, windowSerializer, keySelector, keySerializer, windowBufferFactory, windowFunction, trigger);
        this.evictor = Objects.requireNonNull(evictor);
    }

    @Override
    protected void emitWindow(WindowOperator.Context context) throws Exception {
        this.timestampedCollector.setTimestamp(((Window)context.window).maxTimestamp());
        EvictingWindowBuffer windowBuffer = (EvictingWindowBuffer)context.windowBuffer;
        int toEvict = 0;
        if (windowBuffer.size() > 0) {
            toEvict = this.evictor.evict(windowBuffer.getElements(), windowBuffer.size(), context.window);
        }
        windowBuffer.removeElements(toEvict);
        ((WindowFunction)this.userFunction).apply(context.key, context.window, context.windowBuffer.getUnpackedElements(), this.timestampedCollector);
    }

    @Override
    public EvictingWindowOperator<K, IN, OUT, W> enableSetProcessingTime(boolean setProcessingTime) {
        super.enableSetProcessingTime(setProcessingTime);
        return this;
    }

    @VisibleForTesting
    public Evictor<? super IN, ? super W> getEvictor() {
        return this.evictor;
    }
}

