/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.windowing;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.functions.FoldFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.operators.translation.WrappingFunction;
import org.apache.flink.core.memory.DataInputView;
import org.apache.flink.core.memory.DataOutputView;
import org.apache.flink.core.memory.InputViewDataInputStreamWrapper;
import org.apache.flink.core.memory.OutputViewDataOutputStreamWrapper;
import org.apache.flink.streaming.api.functions.windowing.WindowFunction;
import org.apache.flink.streaming.api.operators.OutputTypeConfigurable;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.util.Collector;

/**
 * A {@link WindowFunction} that collapses all elements of a window into a single
 * result by repeatedly applying the wrapped {@link FoldFunction}, starting from a
 * copy of a configured initial value.
 *
 * <p>The initial value is held in a {@code transient} field, so it is not written by
 * default Java serialization. Instead, {@link #setOutputType} encodes it into a byte
 * array with the output type's {@link TypeSerializer}, and {@code readObject} decodes
 * it again after deserialization. {@link #setOutputType} must therefore be called
 * before an instance is serialized and before {@link #apply} is used.
 *
 * @param <K> type of the window key
 * @param <W> type of the window
 * @param <T> type of the window elements
 * @param <R> type of the fold accumulator and of the emitted result
 */
public class FoldWindowFunction<K, W extends Window, T, R>
        extends WrappingFunction<FoldFunction<T, R>>
        implements WindowFunction<T, R, K, W>, OutputTypeConfigurable<R> {

    private static final long serialVersionUID = 1L;

    /** Initial value encoded with {@link #outSerializer}; {@code null} until {@link #setOutputType} runs. */
    private byte[] serializedInitialValue;

    /** Serializer for the result type; {@code null} until {@link #setOutputType} runs. */
    private TypeSerializer<R> outSerializer;

    /** Initial accumulator; transient, restored from {@link #serializedInitialValue} in {@code readObject}. */
    private transient R initialValue;

    /**
     * Creates a fold window function.
     *
     * @param initialValue the accumulator each window's fold starts from
     * @param reduceFunction the fold function applied to every element of a window
     */
    public FoldWindowFunction(R initialValue, FoldFunction<T, R> reduceFunction) {
        super(reduceFunction);
        this.initialValue = initialValue;
    }

    /**
     * Java deserialization hook: reads the non-transient fields and then rebuilds the
     * transient initial value from its serialized bytes.
     *
     * @param in the stream the object is being read from
     * @throws IOException if reading the stream or decoding the initial value fails
     * @throws ClassNotFoundException if a class of a serialized field cannot be found
     * @throws RuntimeException if no serialized initial value is present
     */
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        if (this.serializedInitialValue == null) {
            throw new RuntimeException("No initial value was serialized for the fold window function. Probably the setOutputType method was not called.");
        }
        DataInputView inView = new InputViewDataInputStreamWrapper(
                new DataInputStream(new ByteArrayInputStream(this.serializedInitialValue)));
        this.initialValue = this.outSerializer.deserialize(inView);
    }

    /**
     * Folds all values of the window into one result and emits it.
     *
     * @param k the key of the window (not used by this implementation)
     * @param window the window being evaluated (not used by this implementation)
     * @param values the elements of the window, folded in iteration order
     * @param out the collector that receives the single folded result
     * @throws Exception if the wrapped fold function throws
     */
    @Override
    public void apply(K k, W window, Iterable<T> values, Collector<R> out) throws Exception {
        // Start from a copy of the initial value rather than the stored instance;
        // presumably so that a fold function which mutates its accumulator cannot
        // leak state into later windows.
        R result = this.outSerializer.copy(this.initialValue);
        for (T val : values) {
            result = this.wrappedFunction.fold(result, val);
        }
        out.collect(result);
    }

    /**
     * Creates the serializer for the output type and uses it to encode the initial
     * value into {@link #serializedInitialValue}.
     *
     * @param outTypeInfo type information of the result type
     * @param executionConfig execution config passed to the serializer factory
     * @throws RuntimeException if the initial value cannot be serialized
     */
    @Override
    public void setOutputType(TypeInformation<R> outTypeInfo, ExecutionConfig executionConfig) {
        this.outSerializer = outTypeInfo.createSerializer(executionConfig);
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        DataOutputView outView = new OutputViewDataOutputStreamWrapper(new DataOutputStream(baos));
        try {
            this.outSerializer.serialize(this.initialValue, outView);
        } catch (IOException ioe) {
            // Null-safe type name so that building the message cannot itself throw
            // and hide the original I/O failure.
            String typeName = this.initialValue == null
                    ? "null"
                    : this.initialValue.getClass().getSimpleName();
            throw new RuntimeException("Unable to serialize initial value of type " + typeName + " of fold window function.", ioe);
        }
        this.serializedInitialValue = baos.toByteArray();
    }
}

