/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators.windowing.buffers;

import java.util.Collections;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.functions.util.FunctionUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBuffer;
import org.apache.flink.streaming.runtime.operators.windowing.buffers.WindowBufferFactory;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

/**
 * A {@link WindowBuffer} that keeps a single record on the heap and folds every
 * incoming element into it with a {@link ReduceFunction}.
 *
 * <p>The buffer is empty until the first call to {@link #storeElement(StreamRecord)}.
 * The aggregate field is {@code transient}, so an instance that went through Java
 * serialization also starts out empty. While empty, {@link #size()} is {@code 0} and
 * the element views are empty iterables.
 *
 * @param <T> type of the values being aggregated
 */
public class PreAggregatingHeapWindowBuffer<T>
implements WindowBuffer<T> {
    private static final long serialVersionUID = 1L;

    /** Function used to combine the running aggregate with each new value. */
    private final ReduceFunction<T> reduceFunction;

    /** Running aggregate; {@code null} until the first element has been stored. */
    private transient StreamRecord<T> data;

    /**
     * Creates an empty buffer.
     *
     * @param reduceFunction function used to combine the stored value with new values
     */
    protected PreAggregatingHeapWindowBuffer(ReduceFunction<T> reduceFunction) {
        this.reduceFunction = reduceFunction;
    }

    /**
     * Adds an element to the buffer.
     *
     * <p>The first element is copied (value and timestamp) into a new record owned by
     * this buffer. Every later element is reduced with the stored value, and the
     * result replaces the stored value.
     *
     * @param element the record to add
     * @throws Exception if the reduce function throws
     */
    @Override
    public void storeElement(StreamRecord<T> element) throws Exception {
        if (this.data == null) {
            // Copy rather than keep a reference to the caller's record, since the
            // stored record is modified in place on later calls.
            this.data = new StreamRecord<T>(element.getValue(), element.getTimestamp());
        } else {
            // Only the value is handed to replace(); the element's timestamp is not used here.
            this.data.replace(this.reduceFunction.reduce(this.data.getValue(), element.getValue()));
        }
    }

    /**
     * Returns the aggregated record.
     *
     * <p>The returned record is the buffer's internal instance, so a later
     * {@link #storeElement(StreamRecord)} call changes the value it holds.
     *
     * @return an iterable with the single aggregated record, or an empty iterable if
     *         nothing has been stored yet
     */
    @Override
    public Iterable<StreamRecord<T>> getElements() {
        if (this.data == null) {
            // Previously this produced a singleton containing null.
            return Collections.<StreamRecord<T>>emptySet();
        }
        return Collections.singleton(this.data);
    }

    /**
     * Returns the aggregated value without its record wrapper.
     *
     * @return an iterable with the single aggregated value, or an empty iterable if
     *         nothing has been stored yet
     */
    @Override
    public Iterable<T> getUnpackedElements() {
        if (this.data == null) {
            // Previously this dereferenced the null record and threw a NullPointerException.
            return Collections.<T>emptySet();
        }
        return Collections.singleton(this.data.getValue());
    }

    /**
     * Returns the number of records held by the buffer.
     *
     * @return {@code 0} while no element has been stored, otherwise {@code 1}
     */
    @Override
    public int size() {
        return this.data == null ? 0 : 1;
    }

    /**
     * Factory producing {@link PreAggregatingHeapWindowBuffer} instances that all share
     * the same {@link ReduceFunction}. The lifecycle calls on the factory are forwarded
     * to that function through {@link FunctionUtils}.
     *
     * @param <T> type of the values being aggregated
     */
    public static class Factory<T>
    implements WindowBufferFactory<T, PreAggregatingHeapWindowBuffer<T>> {
        private static final long serialVersionUID = 1L;

        /** Function handed to every buffer created by this factory. */
        private final ReduceFunction<T> reduceFunction;

        /**
         * Creates a factory for buffers using the given reduce function.
         *
         * @param reduceFunction function handed to every created buffer
         */
        public Factory(ReduceFunction<T> reduceFunction) {
            this.reduceFunction = reduceFunction;
        }

        /**
         * Passes the runtime context on to the reduce function.
         *
         * @param ctx the runtime context to set
         */
        @Override
        public void setRuntimeContext(RuntimeContext ctx) {
            FunctionUtils.setFunctionRuntimeContext(this.reduceFunction, ctx);
        }

        /**
         * Opens the reduce function with the given configuration.
         *
         * @param config configuration passed to the function
         * @throws Exception if opening the function fails
         */
        @Override
        public void open(Configuration config) throws Exception {
            FunctionUtils.openFunction(this.reduceFunction, config);
        }

        /**
         * Closes the reduce function.
         *
         * @throws Exception if closing the function fails
         */
        @Override
        public void close() throws Exception {
            FunctionUtils.closeFunction(this.reduceFunction);
        }

        /**
         * Creates a new, empty buffer backed by this factory's reduce function.
         *
         * @return a fresh buffer instance
         */
        @Override
        public PreAggregatingHeapWindowBuffer<T> create() {
            return new PreAggregatingHeapWindowBuffer<T>(this.reduceFunction);
        }
    }
}

