/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.api.functions.source;

import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.TypeExtractor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.SerializedCheckpointData;
import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier;
import org.apache.flink.streaming.api.checkpoint.Checkpointed;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

/**
 * Base class for sources that collect the IDs of emitted messages and hand them
 * back to the subclass for acknowledgement once a checkpoint has completed.
 *
 * <p>IDs registered through {@link #addId(Object)} are gathered in a per-checkpoint
 * list. When a snapshot is taken, that list is queued together with the checkpoint
 * ID and a fresh list is started. When a checkpoint is reported complete, every
 * queued entry whose checkpoint ID is less than or equal to the completed one is
 * passed to {@link #acknowledgeIDs(List)} and removed from the queue.
 *
 * @param <Type> the type of the elements produced by the source
 * @param <Id> the type of the message IDs that are tracked for acknowledgement
 */
public abstract class MessageAcknowledingSourceBase<Type, Id>
extends RichSourceFunction<Type>
implements Checkpointed<SerializedCheckpointData[]>,
CheckpointNotifier {
    private static final long serialVersionUID = -8689291992192955579L;

    /** Serializer used to write and read the IDs as part of the checkpoint data. */
    private final TypeSerializer<Id> idSerializer;

    /** IDs registered since the most recent snapshot. */
    private transient List<Id> idsForCurrentCheckpoint;

    /** Snapshotted ID lists, in snapshot order, that have not been acknowledged yet. */
    private transient ArrayDeque<Tuple2<Long, List<Id>>> pendingCheckpoints;

    /**
     * Creates the source base, deriving the ID type information from the given class.
     *
     * @param idClass the class of the message IDs
     */
    protected MessageAcknowledingSourceBase(Class<Id> idClass) {
        this(TypeExtractor.getForClass(idClass));
    }

    /**
     * Creates the source base using the given ID type information to build the ID serializer.
     *
     * @param idTypeInfo the type information of the message IDs
     */
    protected MessageAcknowledingSourceBase(TypeInformation<Id> idTypeInfo) {
        this.idSerializer = idTypeInfo.createSerializer(new ExecutionConfig());
    }

    /**
     * Initializes the transient ID bookkeeping.
     *
     * @param parameters the configuration passed to the function (not used here)
     * @throws Exception declared for subclasses; this implementation does not throw
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        this.idsForCurrentCheckpoint = new ArrayList<Id>(64);
        // Keep a deque that restoreState() may already have populated; replacing it
        // unconditionally would drop restored IDs that still need acknowledgement.
        if (this.pendingCheckpoints == null) {
            this.pendingCheckpoints = new ArrayDeque<Tuple2<Long, List<Id>>>();
        }
    }

    /**
     * Clears the ID bookkeeping. Safe to call even if {@link #open(Configuration)}
     * was never invoked.
     *
     * @throws Exception declared for subclasses; this implementation does not throw
     */
    @Override
    public void close() throws Exception {
        if (this.idsForCurrentCheckpoint != null) {
            this.idsForCurrentCheckpoint.clear();
        }
        if (this.pendingCheckpoints != null) {
            this.pendingCheckpoints.clear();
        }
    }

    /**
     * Acknowledges the given message IDs. Called from
     * {@link #notifyCheckpointComplete(long)} once per queued checkpoint entry.
     *
     * @param ids the IDs collected for one snapshot
     */
    protected abstract void acknowledgeIDs(List<Id> ids);

    /**
     * Registers an ID so that it becomes part of the next snapshot.
     *
     * @param id the message ID to track
     */
    protected void addId(Id id) {
        this.idsForCurrentCheckpoint.add(id);
    }

    /**
     * Queues the IDs collected since the previous snapshot under the given checkpoint ID,
     * starts a new collection list, and returns all pending entries in serialized form.
     *
     * @param checkpointId the ID of the checkpoint being taken
     * @param checkpointTimestamp the timestamp of the checkpoint (not used here)
     * @return the serialized pending checkpoint entries
     * @throws Exception if serializing the pending entries fails
     */
    @Override
    public SerializedCheckpointData[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception {
        this.pendingCheckpoints.addLast(new Tuple2<Long, List<Id>>(checkpointId, this.idsForCurrentCheckpoint));
        this.idsForCurrentCheckpoint = new ArrayList<Id>(64);
        return SerializedCheckpointData.fromDeque(this.pendingCheckpoints, this.idSerializer);
    }

    /**
     * Replaces the pending checkpoint entries with the ones deserialized from the given state.
     *
     * @param state the serialized pending checkpoint entries
     * @throws Exception if deserializing the state fails
     */
    @Override
    public void restoreState(SerializedCheckpointData[] state) throws Exception {
        this.pendingCheckpoints = SerializedCheckpointData.toDeque(state, this.idSerializer);
    }

    /**
     * Acknowledges and removes every pending entry whose checkpoint ID is less than or
     * equal to the completed checkpoint ID.
     *
     * @param checkpointId the ID of the checkpoint that has completed
     * @throws Exception declared by the interface; propagates anything thrown by
     *         {@link #acknowledgeIDs(List)}
     */
    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        Iterator<Tuple2<Long, List<Id>>> iter = this.pendingCheckpoints.iterator();
        while (iter.hasNext()) {
            Tuple2<Long, List<Id>> checkpoint = iter.next();
            long id = checkpoint.f0;
            // Entries are appended in snapshot order, so iteration stops at the first
            // entry that belongs to a checkpoint newer than the completed one.
            if (id > checkpointId) {
                break;
            }
            // Acknowledge first and remove afterwards, so an entry stays queued if
            // acknowledgement throws.
            this.acknowledgeIDs(checkpoint.f1);
            iter.remove();
        }
    }
}

