/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.operators;

import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

public class BucketStreamSortOperator<T>
extends AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T> {
    private static final long serialVersionUID = 1L;
    private long granularity;
    private transient Map<Long, List<StreamRecord<T>>> buckets;

    public BucketStreamSortOperator(long interval) {
        this.granularity = interval;
    }

    @Override
    public void open() throws Exception {
        super.open();
        this.buckets = new HashMap<Long, List<StreamRecord<T>>>();
    }

    @Override
    public void processElement(StreamRecord<T> record) throws Exception {
        long bucketId = record.getTimestamp() - record.getTimestamp() % this.granularity;
        List<StreamRecord<T>> bucket = this.buckets.get(bucketId);
        if (bucket == null) {
            bucket = new ArrayList<StreamRecord<T>>();
            this.buckets.put(bucketId, bucket);
        }
        bucket.add(record);
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        long maxBucketId = mark.getTimestamp() - mark.getTimestamp() % this.granularity;
        HashSet<Long> toRemove = new HashSet<Long>();
        for (Map.Entry<Long, List<StreamRecord<T>>> bucket : this.buckets.entrySet()) {
            if (bucket.getKey() >= maxBucketId) continue;
            Collections.sort(bucket.getValue(), new Comparator<StreamRecord<T>>(){

                @Override
                public int compare(StreamRecord<T> o1, StreamRecord<T> o2) {
                    return (int)(o1.getTimestamp() - o2.getTimestamp());
                }
            });
            for (StreamRecord<T> r : bucket.getValue()) {
                this.output.collect(r);
            }
            toRemove.add(bucket.getKey());
        }
        for (Long l : toRemove) {
            this.buckets.remove(l);
        }
        this.output.emitWatermark(mark);
    }
}

