/*
 * Decompiled with CFR 0.152.
 */
package org.apache.samza.util;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.samza.metrics.Counter;
import org.apache.samza.metrics.Gauge;
import org.apache.samza.metrics.MetricsRegistry;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemConsumer;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.util.Clock;
import org.apache.samza.util.NoOpMetricsRegistry;

/**
 * Base class for {@link SystemConsumer} implementations that buffer incoming
 * envelopes in one {@link BlockingQueue} per registered
 * {@link SystemStreamPartition}.
 *
 * <p>Subclasses feed envelopes in through {@link #put} / {@link #putAll} and
 * signal that a partition has no further messages through
 * {@link #setIsAtHead}; consumers drain the buffers through {@link #poll}.
 * A partition must be passed to {@link #register} before any other method is
 * called for it, otherwise a {@link NullPointerException} naming the partition
 * is thrown.
 */
public abstract class BlockingEnvelopeMap implements SystemConsumer {
    /**
     * Timeout value that makes {@link #poll} keep waiting on an empty partition
     * until a message arrives or the partition is at head.
     * NOTE(review): presumably mirrors the "block on outstanding messages"
     * sentinel declared by SystemConsumer - confirm against the interface. It is
     * kept private and given its own name so it cannot hide an inherited constant.
     */
    private static final long BLOCK_UNTIL_AT_HEAD_TIMEOUT = -1L;

    /** How long a single wait lasts while blocking indefinitely, so the at-head flag is re-checked periodically. */
    private static final long BLOCKING_POLL_INTERVAL_MS = 1000L;

    private final BlockingEnvelopeMapMetrics metrics;
    private final ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>> bufferedMessages;
    private final Map<SystemStreamPartition, Boolean> noMoreMessage;
    private final Clock clock;

    /** Creates a map with a no-op metrics registry and a clock backed by {@link System#currentTimeMillis()}. */
    public BlockingEnvelopeMap() {
        this(new NoOpMetricsRegistry());
    }

    /**
     * Creates a map with a no-op metrics registry.
     *
     * @param clock time source used to compute poll deadlines
     */
    public BlockingEnvelopeMap(Clock clock) {
        this(new NoOpMetricsRegistry(), clock);
    }

    /**
     * Creates a map with a clock backed by {@link System#currentTimeMillis()}.
     *
     * @param metricsRegistry registry that receives this map's counters and gauges
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry) {
        this(metricsRegistry, new Clock() {

            @Override
            public long currentTimeMillis() {
                return System.currentTimeMillis();
            }
        });
    }

    /**
     * Creates a map whose metrics group is the runtime class name of this instance.
     *
     * @param metricsRegistry registry that receives this map's counters and gauges
     * @param clock time source used to compute poll deadlines
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock) {
        this(metricsRegistry, clock, null);
    }

    /**
     * Creates a map.
     *
     * @param metricsRegistry registry that receives this map's counters and gauges
     * @param clock time source used to compute poll deadlines
     * @param metricsGroupName metrics group to register under; when {@code null}
     *        the runtime class name of this instance is used
     */
    public BlockingEnvelopeMap(MetricsRegistry metricsRegistry, Clock clock, String metricsGroupName) {
        String group = metricsGroupName == null ? this.getClass().getName() : metricsGroupName;
        this.metrics = new BlockingEnvelopeMapMetrics(group, metricsRegistry);
        this.bufferedMessages = new ConcurrentHashMap<SystemStreamPartition, BlockingQueue<IncomingMessageEnvelope>>();
        this.noMoreMessage = new ConcurrentHashMap<SystemStreamPartition, Boolean>();
        this.clock = clock;
    }

    /**
     * Registers a partition: initialises its metrics and creates its buffer if
     * it does not have one yet. Registering the same partition again keeps the
     * existing buffer.
     *
     * @param systemStreamPartition partition to register
     * @param offset not used by this implementation
     */
    @Override
    public void register(SystemStreamPartition systemStreamPartition, String offset) {
        this.metrics.initMetrics(systemStreamPartition);
        // Skip building a throw-away queue on repeat registration; putIfAbsent
        // still guards against a concurrent first registration.
        if (!this.bufferedMessages.containsKey(systemStreamPartition)) {
            this.bufferedMessages.putIfAbsent(systemStreamPartition, this.newBlockingQueue());
        }
    }

    /**
     * Factory for the per-partition buffer; subclasses may override it to
     * supply a different {@link BlockingQueue} implementation.
     *
     * @return a new, empty {@link LinkedBlockingQueue}
     */
    protected BlockingQueue<IncomingMessageEnvelope> newBlockingQueue() {
        return new LinkedBlockingQueue<IncomingMessageEnvelope>();
    }

    /**
     * Collects buffered envelopes for the requested partitions.
     *
     * <p>For each partition, up to the requested maximum is drained without
     * blocking. Only when nothing was available (and the maximum is positive)
     * does the call wait: with a timeout of {@code -1} it waits until a message
     * arrives or {@link #isAtHead} reports {@code true}; with a positive timeout
     * it waits for at most the time left until the deadline computed at entry
     * and takes a single envelope; with any other timeout it does not wait.
     * Partitions are processed one after another, so a wait on one partition
     * delays the ones after it.
     *
     * @param systemStreamPartitionAndMaxPerStream maximum number of envelopes to
     *        return for each partition
     * @param timeout {@code -1} to block as described above, a positive number
     *        of milliseconds to wait at most, anything else for no waiting
     * @return the collected envelopes, grouped by partition in the iteration
     *         order of the argument map; never {@code null}
     * @throws InterruptedException if interrupted while waiting on a buffer
     * @throws NullPointerException if a requested partition was never registered
     */
    @Override
    public List<IncomingMessageEnvelope> poll(Map<SystemStreamPartition, Integer> systemStreamPartitionAndMaxPerStream, long timeout) throws InterruptedException {
        long stopTime = this.clock.currentTimeMillis() + timeout;
        List<IncomingMessageEnvelope> messagesToReturn = new ArrayList<IncomingMessageEnvelope>();
        this.metrics.incPoll();
        for (Map.Entry<SystemStreamPartition, Integer> request : systemStreamPartitionAndMaxPerStream.entrySet()) {
            messagesToReturn.addAll(this.pollPartition(request.getKey(), request.getValue(), timeout, stopTime));
        }
        return messagesToReturn;
    }

    /** Drains one partition for {@link #poll}: non-blocking first, then waiting according to {@code timeout}. */
    private List<IncomingMessageEnvelope> pollPartition(SystemStreamPartition systemStreamPartition, int maxMessages, long timeout, long stopTime) throws InterruptedException {
        BlockingQueue<IncomingMessageEnvelope> queue = this.getQueue(systemStreamPartition);
        List<IncomingMessageEnvelope> messages = new ArrayList<IncomingMessageEnvelope>();
        IncomingMessageEnvelope envelope;

        // Take whatever is already buffered, stopping at the limit or the first empty poll.
        while (messages.size() < maxMessages && (envelope = queue.poll()) != null) {
            messages.add(envelope);
        }
        // Never wait when something was found, or when the caller asked for
        // nothing: waiting could only produce more than was requested.
        if (!messages.isEmpty() || maxMessages <= 0) {
            return messages;
        }

        long timeRemaining = stopTime - this.clock.currentTimeMillis();
        if (timeout == BLOCK_UNTIL_AT_HEAD_TIMEOUT) {
            // Wait in bounded slices so the at-head flag is re-evaluated between waits.
            while (messages.size() < maxMessages && !this.isAtHead(systemStreamPartition)) {
                this.metrics.incBlockingPoll(systemStreamPartition);
                envelope = queue.poll(BLOCKING_POLL_INTERVAL_MS, TimeUnit.MILLISECONDS);
                if (envelope != null) {
                    messages.add(envelope);
                }
            }
        } else if (timeout > 0L && timeRemaining > 0L) {
            this.metrics.incBlockingTimeoutPoll(systemStreamPartition);
            envelope = queue.poll(timeRemaining, TimeUnit.MILLISECONDS);
            if (envelope != null) {
                messages.add(envelope);
            }
        }
        return messages;
    }

    /**
     * Appends an envelope to a partition's buffer, waiting if the buffer is bounded and full.
     *
     * @param systemStreamPartition partition whose buffer receives the envelope
     * @param envelope envelope to buffer
     * @throws InterruptedException if interrupted while waiting for space
     * @throws NullPointerException if the partition was never registered
     */
    protected void put(SystemStreamPartition systemStreamPartition, IncomingMessageEnvelope envelope) throws InterruptedException {
        this.getQueue(systemStreamPartition).put(envelope);
    }

    /**
     * Appends envelopes, in list order, to a partition's buffer, waiting
     * whenever the buffer is bounded and full.
     *
     * @param systemStreamPartition partition whose buffer receives the envelopes
     * @param envelopes envelopes to buffer
     * @throws InterruptedException if interrupted while waiting for space
     * @throws NullPointerException if the partition was never registered
     */
    protected void putAll(SystemStreamPartition systemStreamPartition, List<IncomingMessageEnvelope> envelopes) throws InterruptedException {
        BlockingQueue<IncomingMessageEnvelope> queue = this.getQueue(systemStreamPartition);
        for (IncomingMessageEnvelope envelope : envelopes) {
            queue.put(envelope);
        }
    }

    /**
     * Reports how many envelopes are currently buffered for a partition.
     *
     * @param systemStreamPartition partition to inspect
     * @return current size of the partition's buffer
     * @throws NullPointerException if the partition was never registered
     */
    public int getNumMessagesInQueue(SystemStreamPartition systemStreamPartition) {
        return this.getQueue(systemStreamPartition).size();
    }

    /**
     * Records whether the producer side has no more messages for a partition
     * and mirrors the flag into the partition's gauge.
     *
     * @param systemStreamPartition partition to flag
     * @param isAtHead {@code true} when no more messages are expected
     * @return the previously recorded flag, or {@code null} if none was recorded
     * @throws NullPointerException if the partition was never registered
     */
    protected Boolean setIsAtHead(SystemStreamPartition systemStreamPartition, boolean isAtHead) {
        this.metrics.setNoMoreMessages(systemStreamPartition, isAtHead);
        return this.noMoreMessage.put(systemStreamPartition, isAtHead);
    }

    /**
     * Tells whether a partition is fully consumed: its buffer is empty and it
     * has been flagged through {@link #setIsAtHead} with {@code true}.
     *
     * @param systemStreamPartition partition to inspect
     * @return {@code true} only if the buffer is empty and the flag is set
     * @throws NullPointerException if the partition was never registered
     */
    protected boolean isAtHead(SystemStreamPartition systemStreamPartition) {
        Boolean flagged = this.noMoreMessage.get(systemStreamPartition);
        return this.getNumMessagesInQueue(systemStreamPartition) == 0 && Boolean.TRUE.equals(flagged);
    }

    /** Looks up a partition's buffer, failing with a descriptive error if the partition was never registered. */
    private BlockingQueue<IncomingMessageEnvelope> getQueue(SystemStreamPartition systemStreamPartition) {
        return BlockingEnvelopeMap.requireRegistered(this.bufferedMessages.get(systemStreamPartition), systemStreamPartition, "queue");
    }

    /** Returns {@code value}, or throws a NullPointerException naming the partition when the lookup came back empty. */
    private static <T> T requireRegistered(T value, SystemStreamPartition systemStreamPartition, String description) {
        if (value == null) {
            throw new NullPointerException("Attempting to get " + description + " for " + systemStreamPartition + ", but the system/stream/partition was never registered.");
        }
        return value;
    }

    /** Gauge whose value is the live size of one partition's buffer, or 0 when the partition has no buffer. */
    public class BufferGauge
    extends Gauge<Integer> {
        private final SystemStreamPartition systemStreamPartition;

        /**
         * @param systemStreamPartition partition whose buffer size is reported
         * @param name metric name
         */
        public BufferGauge(SystemStreamPartition systemStreamPartition, String name) {
            super(name, 0);
            this.systemStreamPartition = systemStreamPartition;
        }

        /** @return the number of envelopes currently buffered for the partition, or 0 if it has no buffer */
        @Override
        public Integer getValue() {
            BlockingQueue<IncomingMessageEnvelope> envelopes = BlockingEnvelopeMap.this.bufferedMessages.get(this.systemStreamPartition);
            return envelopes == null ? 0 : envelopes.size();
        }
    }

    /**
     * Holder for this map's metrics: one overall poll counter plus, per
     * partition, a no-more-messages gauge, a blocking-poll counter, a
     * blocking-poll-timeout counter and a buffered-message-count gauge.
     */
    public class BlockingEnvelopeMapMetrics {
        private final String group;
        private final MetricsRegistry metricsRegistry;
        private final ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>> noMoreMessageGaugeMap;
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollCountMap;
        private final ConcurrentHashMap<SystemStreamPartition, Counter> blockingPollTimeoutCountMap;
        private final Counter pollCount;

        /**
         * @param group metrics group under which everything is registered
         * @param metricsRegistry registry that creates and holds the metrics
         */
        public BlockingEnvelopeMapMetrics(String group, MetricsRegistry metricsRegistry) {
            this.group = group;
            this.metricsRegistry = metricsRegistry;
            this.noMoreMessageGaugeMap = new ConcurrentHashMap<SystemStreamPartition, Gauge<Boolean>>();
            this.blockingPollCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
            this.blockingPollTimeoutCountMap = new ConcurrentHashMap<SystemStreamPartition, Counter>();
            this.pollCount = metricsRegistry.newCounter(group, "poll-count");
        }

        /**
         * Creates the per-partition metrics. Metrics already held for the
         * partition are kept; the registry is nevertheless asked again on every
         * call.
         *
         * @param systemStreamPartition partition to create metrics for
         */
        public void initMetrics(SystemStreamPartition systemStreamPartition) {
            this.noMoreMessageGaugeMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newGauge(this.group, "no-more-messages-" + systemStreamPartition, false));
            this.blockingPollCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-count-" + systemStreamPartition));
            this.blockingPollTimeoutCountMap.putIfAbsent(systemStreamPartition, this.metricsRegistry.newCounter(this.group, "blocking-poll-timeout-count-" + systemStreamPartition));
            this.metricsRegistry.newGauge(this.group, new BufferGauge(systemStreamPartition, "buffered-message-count-" + systemStreamPartition));
        }

        /**
         * Sets the partition's no-more-messages gauge.
         *
         * @throws NullPointerException if the partition's metrics were never initialised
         */
        public void setNoMoreMessages(SystemStreamPartition systemStreamPartition, boolean noMoreMessages) {
            BlockingEnvelopeMap.requireRegistered(this.noMoreMessageGaugeMap.get(systemStreamPartition), systemStreamPartition, "no-more-messages gauge").set(noMoreMessages);
        }

        /**
         * Counts one wait performed while blocking indefinitely on the partition.
         *
         * @throws NullPointerException if the partition's metrics were never initialised
         */
        public void incBlockingPoll(SystemStreamPartition systemStreamPartition) {
            BlockingEnvelopeMap.requireRegistered(this.blockingPollCountMap.get(systemStreamPartition), systemStreamPartition, "blocking-poll counter").inc();
        }

        /**
         * Counts one wait performed with a deadline on the partition.
         *
         * @throws NullPointerException if the partition's metrics were never initialised
         */
        public void incBlockingTimeoutPoll(SystemStreamPartition systemStreamPartition) {
            BlockingEnvelopeMap.requireRegistered(this.blockingPollTimeoutCountMap.get(systemStreamPartition), systemStreamPartition, "blocking-poll-timeout counter").inc();
        }

        /** Counts one call to the enclosing map's poll method. */
        public void incPoll() {
            this.pollCount.inc();
        }
    }
}

