/*
 * Decompiled with CFR 0.152.
 */
package org.apache.kafka.clients.consumer.internals;

import java.io.DataOutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.KafkaClient;
import org.apache.kafka.clients.Metadata;
import org.apache.kafka.clients.MockClient;
import org.apache.kafka.clients.NodeApiVersions;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.clients.consumer.OffsetOutOfRangeException;
import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient;
import org.apache.kafka.clients.consumer.internals.Fetcher;
import org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.internals.SubscriptionState;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.InvalidTopicException;
import org.apache.kafka.common.errors.RecordTooLargeException;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.errors.TopicAuthorizationException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.ByteBufferOutputStream;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.Records;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
import org.apache.kafka.common.requests.AbstractResponse;
import org.apache.kafka.common.requests.ApiVersionsResponse;
import org.apache.kafka.common.requests.FetchRequest;
import org.apache.kafka.common.requests.FetchResponse;
import org.apache.kafka.common.requests.ListOffsetRequest;
import org.apache.kafka.common.requests.ListOffsetResponse;
import org.apache.kafka.common.requests.MetadataRequest;
import org.apache.kafka.common.requests.MetadataResponse;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.utils.MockTime;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.test.TestUtils;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class FetcherTest {
    // Shared fixture state. Instance-field initializers run in declaration order, so the
    // fetchers created near the bottom read the fields declared above them.

    // Listener handed to subscribe() in the rebalance test.
    private ConsumerRebalanceListener listener = new NoOpConsumerRebalanceListener();
    // Topic, group id and the single partition (partition 0 of the topic) the tests operate on.
    private String topicName = "test";
    private String groupId = "test-group";
    // Evaluates to "consumertest-group-fetch-manager-metrics".
    // NOTE(review): there is no separator between "consumer" and the group id — confirm this
    // matches the metric group the fetcher actually registers.
    private final String metricGroup = "consumer" + this.groupId + "-fetch-manager-metrics";
    private TopicPartition tp = new TopicPartition(this.topicName, 0);
    // Fetch tuning values; presumably consumed by createFetcher (defined outside this view).
    private int minBytes = 1;
    private int maxBytes = Integer.MAX_VALUE;
    private int maxWaitMs = 0;
    private int fetchSize = 1000;
    private long retryBackoffMs = 100L;
    // Mock clock shared by the client, metrics registries and network client.
    // NOTE(review): the 1L argument is presumably an auto-tick interval in ms — confirm.
    private MockTime time = new MockTime(1L);
    private Metadata metadata = new Metadata(0L, Long.MAX_VALUE);
    private MockClient client = new MockClient(this.time, this.metadata);
    // Test cluster built for the test topic; its first node is the one the mock client talks to.
    private Cluster cluster = TestUtils.singletonCluster(this.topicName, 1);
    private Node node = (Node)this.cluster.nodes().get(0);
    // Metrics registry for the default fetcher; closed in teardown().
    private Metrics metrics = new Metrics((Time)this.time);
    // Two subscription states: one that resets to EARLIEST, one with no automatic reset.
    private SubscriptionState subscriptions = new SubscriptionState(OffsetResetStrategy.EARLIEST);
    private SubscriptionState subscriptionsNoAutoReset = new SubscriptionState(OffsetResetStrategy.NONE);
    // Tolerance for floating-point comparisons (not used in the portion of the file visible here).
    private static final double EPSILON = 1.0E-4;
    private ConsumerNetworkClient consumerClient = new ConsumerNetworkClient((KafkaClient)this.client, this.metadata, (Time)this.time, 100L, 1000L);
    // Record batches populated in setup().
    private MemoryRecords records;
    private MemoryRecords nextRecords;
    // Fetcher under test, bound to the EARLIEST-reset subscription state.
    private Fetcher<byte[], byte[]> fetcher = this.createFetcher(this.subscriptions, this.metrics);
    // Separate registry and fetcher for the no-auto-reset subscription state; closed in teardown().
    private Metrics fetcherMetrics = new Metrics((Time)this.time);
    private Fetcher<byte[], byte[]> fetcherNoAutoReset = this.createFetcher(this.subscriptionsNoAutoReset, this.fetcherMetrics);

    /**
     * Publishes the test cluster to the metadata, points the mock client at the test node and
     * builds the two record batches shared by the tests: {@code records} (three records whose
     * builder starts at 1) and {@code nextRecords} (two records whose builder starts at 4).
     */
    @Before
    public void setup() throws Exception {
        this.metadata.update(this.cluster, this.time.milliseconds());
        this.client.setNode(this.node);

        String[] firstValues = new String[]{"value-1", "value-2", "value-3"};
        MemoryRecordsBuilder firstBatch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
        for (String payload : firstValues) {
            firstBatch.append(0L, "key".getBytes(), payload.getBytes());
        }
        this.records = firstBatch.build();

        String[] secondValues = new String[]{"value-4", "value-5"};
        MemoryRecordsBuilder secondBatch = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME, 4L);
        for (String payload : secondValues) {
            secondBatch.append(0L, "key".getBytes(), payload.getBytes());
        }
        this.nextRecords = secondBatch.build();
    }

    /**
     * Closes both metrics registries owned by the fixture, in declaration order.
     */
    @After
    public void teardown() {
        Metrics[] registries = new Metrics[]{this.metrics, this.fetcherMetrics};
        for (Metrics registry : registries) {
            registry.close();
        }
    }

    /**
     * A successful fetch hands back every record of the response in offset order and moves the
     * consumer position past the last returned record.
     */
    @Test
    public void testFetchNormal() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);

        // One request is sent; nothing is completed until the response has been polled.
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());

        this.client.prepareResponse(this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        // Typed collections: iterating a raw List yields Object, which cannot be bound to a
        // ConsumerRecord loop variable.
        Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> partitionRecords = this.fetcher.fetchedRecords();
        Assert.assertTrue(partitionRecords.containsKey(this.tp));

        List<ConsumerRecord<byte[], byte[]>> fetched = partitionRecords.get(this.tp);
        Assert.assertEquals(3L, fetched.size());
        Assert.assertEquals(4L, (long)this.subscriptions.position(this.tp));

        // The shared batch starts at offset 1 and is contiguous.
        long expectedOffset = 1L;
        for (ConsumerRecord<byte[], byte[]> record : fetched) {
            Assert.assertEquals(expectedOffset, record.offset());
            ++expectedOffset;
        }
    }

    /**
     * A fetch response carrying NOT_LEADER_FOR_PARTITION completes the fetch but yields no
     * records for the partition.
     */
    @Test
    public void testFetchError() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);

        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());

        AbstractResponse errorResponse = this.fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0);
        this.client.prepareResponse(errorResponse);
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());

        boolean returnedForPartition = this.fetcher.fetchedRecords().containsKey(this.tp);
        Assert.assertFalse(returnedForPartition);
    }

    /**
     * Builds a request matcher that accepts only fetch requests which include {@code tp} and
     * ask for it at exactly {@code offset}.
     *
     * @param tp partition that must be present in the fetch request
     * @param offset fetch offset the request must carry for that partition
     * @return the matcher, for use when preparing mock-client responses
     */
    private MockClient.RequestMatcher matchesOffset(final TopicPartition tp, final long offset) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                FetchRequest fetchRequest = (FetchRequest)body;
                if (!fetchRequest.fetchData().containsKey(tp)) {
                    return false;
                }
                FetchRequest.PartitionData requested = (FetchRequest.PartitionData)fetchRequest.fetchData().get((Object)tp);
                return requested.offset == offset;
            }
        };
    }

    /**
     * A {@link SerializationException} thrown while deserializing fetched data propagates out
     * of {@code fetchedRecords()} and leaves the consumer position where it was.
     */
    @Test
    public void testFetchedRecordsRaisesOnSerializationErrors() {
        // Succeeds on its first invocation and throws on its second.
        ByteArrayDeserializer deserializer = new ByteArrayDeserializer(){
            int i = 0;

            @Override
            public byte[] deserialize(String topic, byte[] data) {
                if (this.i++ == 1) {
                    throw new SerializationException();
                }
                return data;
            }
        };

        // This registry is private to the test, so teardown() will not close it; close it here.
        Metrics localMetrics = new Metrics(this.time);
        try {
            Fetcher<byte[], byte[]> failingFetcher = this.createFetcher(this.subscriptions, localMetrics, deserializer, deserializer);
            this.subscriptions.assignFromUser(Collections.singleton(this.tp));
            this.subscriptions.seek(this.tp, 1L);
            this.client.prepareResponse(this.matchesOffset(this.tp, 1L), this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
            Assert.assertEquals(1L, failingFetcher.sendFetches());
            this.consumerClient.poll(0L);
            try {
                failingFetcher.fetchedRecords();
                Assert.fail("fetchedRecords should have raised");
            }
            catch (SerializationException e) {
                // The position must not have advanced past the record that failed.
                Assert.assertEquals(1L, (long)this.subscriptions.position(this.tp));
            }
        }
        finally {
            localMetrics.close();
        }
    }

    /**
     * Hand-writes two log entries into a buffer — the first with the checksum computed for its
     * contents, the second with that checksum plus one — and verifies that
     * {@code fetchedRecords()} throws a {@link KafkaException} while the consumer position
     * stays at 0.
     */
    @Test
    public void testParseInvalidRecord() throws Exception {
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        ByteBufferOutputStream out = new ByteBufferOutputStream(buffer);
        byte magic = 1;
        byte[] key = "foo".getBytes();
        byte[] value = "baz".getBytes();
        long offset = 0L;
        long timestamp = 500L;
        int size = Record.recordSize((byte[])key, (byte[])value);
        byte attributes = Record.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME);
        long crc = Record.computeChecksum((byte)magic, (byte)attributes, (long)timestamp, (byte[])key, (byte[])value);
        // First entry: offset, size, then the record written with the matching checksum.
        out.writeLong(offset);
        out.writeInt(size);
        Record.write((DataOutputStream)out, (byte)magic, (long)crc, (byte)Record.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        // Second entry: same offset and contents, but the checksum is deliberately off by one.
        out.writeLong(offset);
        out.writeInt(size);
        Record.write((DataOutputStream)out, (byte)magic, (long)(crc + 1L), (byte)Record.computeAttributes((byte)magic, (CompressionType)CompressionType.NONE, (TimestampType)TimestampType.CREATE_TIME), (long)timestamp, (byte[])key, (byte[])value);
        // Switch the buffer from writing to reading before wrapping it as records.
        buffer.flip();
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals((long)1L, (long)this.fetcher.sendFetches());
        this.client.prepareResponse((AbstractResponse)this.fetchResponse(MemoryRecords.readableRecords((ByteBuffer)buffer), Errors.NONE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        try {
            this.fetcher.fetchedRecords();
            Assert.fail((String)"fetchedRecords should have raised");
        }
        catch (KafkaException e) {
            // The position is still the one set by seek() above.
            Assert.assertEquals((long)0L, (long)this.subscriptions.position(this.tp));
        }
    }

    /**
     * With a fetcher created with a limit of 2 (the third {@code createFetcher} argument), one
     * three-record response is drained over two {@code fetchedRecords()} calls (2 + 1) and no
     * new fetch is sent while buffered records remain. Once drained, the next fetch (at offset
     * 4) returns the two records of the second batch.
     */
    @Test
    public void testFetchMaxPollRecords() {
        // This registry is private to the test, so teardown() will not close it; close it here.
        Metrics localMetrics = new Metrics(this.time);
        try {
            Fetcher<byte[], byte[]> limitedFetcher = this.createFetcher(this.subscriptions, localMetrics, 2);
            this.subscriptions.assignFromUser(Collections.singleton(this.tp));
            this.subscriptions.seek(this.tp, 1L);
            this.client.prepareResponse(this.matchesOffset(this.tp, 1L), this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0));
            this.client.prepareResponse(this.matchesOffset(this.tp, 4L), this.fetchResponse(this.nextRecords, Errors.NONE.code(), 100L, 0));

            // First drain: capped at two of the three fetched records.
            Assert.assertEquals(1L, limitedFetcher.sendFetches());
            this.consumerClient.poll(0L);
            List<ConsumerRecord<byte[], byte[]>> polled = limitedFetcher.fetchedRecords().get(this.tp);
            Assert.assertEquals(2L, polled.size());
            Assert.assertEquals(3L, (long)this.subscriptions.position(this.tp));
            Assert.assertEquals(1L, polled.get(0).offset());
            Assert.assertEquals(2L, polled.get(1).offset());

            // Second drain: no fetch is sent; the remaining buffered record is returned.
            Assert.assertEquals(0L, limitedFetcher.sendFetches());
            this.consumerClient.poll(0L);
            polled = limitedFetcher.fetchedRecords().get(this.tp);
            Assert.assertEquals(1L, polled.size());
            Assert.assertEquals(4L, (long)this.subscriptions.position(this.tp));
            Assert.assertEquals(3L, polled.get(0).offset());

            // Buffer is empty now, so a new fetch goes out and returns the second batch.
            Assert.assertTrue(limitedFetcher.sendFetches() > 0);
            this.consumerClient.poll(0L);
            polled = limitedFetcher.fetchedRecords().get(this.tp);
            Assert.assertEquals(2L, polled.size());
            Assert.assertEquals(6L, (long)this.subscriptions.position(this.tp));
            Assert.assertEquals(4L, polled.get(0).offset());
            Assert.assertEquals(5L, polled.get(1).offset());
        }
        finally {
            localMetrics.close();
        }
    }

    /**
     * Records whose offsets have gaps (15, 20, 30) are all returned with their original
     * offsets, and the position ends up one past the last offset (31).
     */
    @Test
    public void testFetchNonContinuousRecords() {
        long[] sparseOffsets = new long[]{15L, 20L, 30L};
        String[] payloads = new String[]{"value-1", "value-2", "value-3"};

        MemoryRecordsBuilder sparseBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
        for (int i = 0; i < sparseOffsets.length; ++i) {
            sparseBuilder.appendWithOffset(sparseOffsets[i], 0L, "key".getBytes(), payloads[i].getBytes());
        }
        MemoryRecords sparseRecords = sparseBuilder.build();

        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse response = this.fetchResponse(sparseRecords, Errors.NONE.code(), 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);

        List<ConsumerRecord<byte[], byte[]>> consumerRecords = this.fetcher.fetchedRecords().get(this.tp);
        Assert.assertEquals(3L, consumerRecords.size());
        Assert.assertEquals(31L, (long)this.subscriptions.position(this.tp));
        for (int i = 0; i < sparseOffsets.length; ++i) {
            Assert.assertEquals(sparseOffsets[i], consumerRecords.get(i).offset());
        }
    }

    /**
     * After the mock client is given node API versions containing only a FETCH entry built
     * with (2, 2), an incomplete record must surface from {@code fetchedRecords()} as a
     * {@link RecordTooLargeException}, with the position left at 0. The node API versions are
     * reset to the default in the {@code finally} block whatever the outcome.
     *
     * <p>Decompiler note (CFR) attached to this method: "Removed try catching itself -
     * possible behaviour change."
     */
    @Test
    public void testFetchRequestWhenRecordTooLarge() {
        try {
            ApiVersionsResponse.ApiVersion fetchVersion = new ApiVersionsResponse.ApiVersion(ApiKeys.FETCH.id, 2, 2);
            this.client.setNodeApiVersions(NodeApiVersions.create(Collections.singletonList(fetchVersion)));
            this.makeFetchRequestWithIncompleteRecord();
            try {
                this.fetcher.fetchedRecords();
                Assert.fail("RecordTooLargeException should have been raised");
            }
            catch (RecordTooLargeException tooLarge) {
                String message = tooLarge.getMessage();
                Assert.assertTrue(message.startsWith("There are some messages at [Partition=Offset]: "));
                Assert.assertEquals(0L, (long)this.subscriptions.position(this.tp));
            }
        }
        finally {
            this.client.setNodeApiVersions(NodeApiVersions.create());
        }
    }

    /**
     * With the default node API versions, an incomplete record must surface from
     * {@code fetchedRecords()} as a {@link KafkaException} whose message starts with
     * "Failed to make progress reading messages", with the position left at 0.
     */
    @Test
    public void testFetchRequestInternalError() {
        this.makeFetchRequestWithIncompleteRecord();
        try {
            this.fetcher.fetchedRecords();
            // The failure message names the exception this test actually expects.
            Assert.fail("KafkaException should have been raised");
        }
        catch (KafkaException e) {
            Assert.assertTrue(e.getMessage().startsWith("Failed to make progress reading messages"));
            Assert.assertEquals(0L, (long)this.subscriptions.position(this.tp));
        }
    }

    /**
     * Assigns and seeks the test partition to offset 0, sends one fetch and completes it with
     * a response whose record data is just eight zero bytes. On return the fetcher has a
     * completed fetch waiting to be parsed.
     */
    private void makeFetchRequestWithIncompleteRecord() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        Assert.assertFalse(this.fetcher.hasCompletedFetches());

        // A freshly allocated byte array is zero-filled: eight zero bytes.
        byte[] truncatedPayload = new byte[8];
        MemoryRecords partialRecord = MemoryRecords.readableRecords(ByteBuffer.wrap(truncatedPayload));
        AbstractResponse response = this.fetchResponse(partialRecord, Errors.NONE.code(), 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);
        Assert.assertTrue(this.fetcher.hasCompletedFetches());
    }

    /**
     * A TOPIC_AUTHORIZATION_FAILED fetch response makes {@code fetchedRecords()} throw a
     * {@link TopicAuthorizationException} naming the test topic.
     */
    @Test
    public void testUnauthorizedTopic() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse denied = this.fetchResponse(this.records, Errors.TOPIC_AUTHORIZATION_FAILED.code(), 100L, 0);
        this.client.prepareResponse(denied);
        this.consumerClient.poll(0L);
        try {
            this.fetcher.fetchedRecords();
            Assert.fail("fetchedRecords should have thrown");
        }
        catch (TopicAuthorizationException authFailure) {
            Object expectedTopics = Collections.singleton(this.topicName);
            Assert.assertEquals(expectedTopics, (Object)authFailure.unauthorizedTopics());
        }
    }

    /**
     * If the partition is assigned again while a fetch is in flight (and no new seek follows),
     * the response to that fetch produces no records.
     */
    @Test
    public void testFetchDuringRebalance() {
        this.subscriptions.subscribe(Collections.singleton(this.topicName), this.listener);
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        // Second assignment happens after the request was sent but before the response arrives.
        this.subscriptions.assignFromSubscribed(Collections.singleton(this.tp));

        AbstractResponse response = this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);

        boolean nothingReturned = this.fetcher.fetchedRecords().isEmpty();
        Assert.assertTrue(nothingReturned);
    }

    /**
     * Records arriving for a fetch that was already in flight when the partition was paused
     * are not returned by {@code fetchedRecords()}.
     */
    @Test
    public void testInFlightFetchOnPausedPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        // Pause after the request has gone out but before its response is delivered.
        this.subscriptions.pause(this.tp);

        AbstractResponse response = this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0);
        this.client.prepareResponse(response);
        this.consumerClient.poll(0L);

        Object recordsForPartition = this.fetcher.fetchedRecords().get(this.tp);
        Assert.assertNull(recordsForPartition);
    }

    /**
     * No fetch is sent for a partition that is paused: {@code sendFetches()} reports nothing
     * sent and the mock client has no queued requests.
     */
    @Test
    public void testFetchOnPausedPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        this.subscriptions.pause(this.tp);

        boolean sentAnything = this.fetcher.sendFetches() > 0;
        Assert.assertFalse(sentAnything);
        Assert.assertTrue(this.client.requests().isEmpty());
    }

    /**
     * NOT_LEADER_FOR_PARTITION yields no records and leaves the metadata due for an immediate
     * update (time to next update is 0).
     */
    @Test
    public void testFetchNotLeaderForPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse notLeader = this.fetchResponse(this.records, Errors.NOT_LEADER_FOR_PARTITION.code(), 100L, 0);
        this.client.prepareResponse(notLeader);
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    /**
     * UNKNOWN_TOPIC_OR_PARTITION yields no records and leaves the metadata due for an
     * immediate update (time to next update is 0).
     */
    @Test
    public void testFetchUnknownTopicOrPartition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse unknownPartition = this.fetchResponse(this.records, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), 100L, 0);
        this.client.prepareResponse(unknownPartition);
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertEquals(0L, this.metadata.timeToNextUpdate(this.time.milliseconds()));
    }

    /**
     * With the EARLIEST-reset subscription state, OFFSET_OUT_OF_RANGE yields no records, flags
     * the partition as needing an offset reset and clears its position.
     */
    @Test
    public void testFetchOffsetOutOfRange() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());
        this.client.prepareResponse(this.fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertTrue(this.subscriptions.isOffsetResetNeeded(this.tp));
        // assertNull states the intent directly instead of assertEquals(null, ...).
        Assert.assertNull(this.subscriptions.position(this.tp));
    }

    /**
     * An OFFSET_OUT_OF_RANGE response is ignored when the position was changed by a seek after
     * the fetch was sent: no reset is requested and the new position (1) is kept.
     */
    @Test
    public void testStaleOutOfRangeError() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse outOfRange = this.fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0);
        this.client.prepareResponse(outOfRange);

        // Move the position before the queued error response is delivered.
        this.subscriptions.seek(this.tp, 1L);
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertEquals(1L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * With no automatic offset reset, an OFFSET_OUT_OF_RANGE response followed by a seek to a
     * different offset results in an empty {@code fetchedRecords()} rather than an exception.
     */
    @Test
    public void testFetchedRecordsAfterSeek() {
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 0L);

        boolean sentAnything = this.fetcherNoAutoReset.sendFetches() > 0;
        Assert.assertTrue(sentAnything);

        AbstractResponse outOfRange = this.fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0);
        this.client.prepareResponse(outOfRange);
        this.consumerClient.poll(0L);
        Assert.assertFalse(this.subscriptionsNoAutoReset.isOffsetResetNeeded(this.tp));

        // Seek after the error response was received but before the records are collected.
        this.subscriptionsNoAutoReset.seek(this.tp, 2L);
        Assert.assertEquals(0L, this.fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * With no automatic offset reset, OFFSET_OUT_OF_RANGE makes the next
     * {@code fetchedRecords()} throw an {@link OffsetOutOfRangeException} that names exactly
     * the test partition; a subsequent call returns no records.
     */
    @Test
    public void testFetchOffsetOutOfRangeException() {
        this.subscriptionsNoAutoReset.assignFromUser(Collections.singleton(this.tp));
        this.subscriptionsNoAutoReset.seek(this.tp, 0L);
        this.fetcherNoAutoReset.sendFetches();
        this.client.prepareResponse(this.fetchResponse(this.records, Errors.OFFSET_OUT_OF_RANGE.code(), 100L, 0));
        this.consumerClient.poll(0L);
        Assert.assertFalse(this.subscriptionsNoAutoReset.isOffsetResetNeeded(this.tp));
        try {
            this.fetcherNoAutoReset.fetchedRecords();
            Assert.fail("Should have thrown OffsetOutOfRangeException");
        }
        catch (OffsetOutOfRangeException e) {
            Assert.assertTrue(e.offsetOutOfRangePartitions().containsKey(this.tp));
            // Expected value first, actual second, so a failure report reads correctly.
            Assert.assertEquals(1L, e.offsetOutOfRangePartitions().size());
        }
        // The error has been consumed; nothing further is returned.
        Assert.assertEquals(0L, this.fetcherNoAutoReset.fetchedRecords().size());
    }

    /**
     * A fetch whose response is prepared with the disconnect flag set yields no records and
     * leaves the partition fetchable at its original position with no reset requested.
     */
    @Test
    public void testFetchDisconnected() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.seek(this.tp, 0L);
        Assert.assertEquals(1L, this.fetcher.sendFetches());

        AbstractResponse response = this.fetchResponse(this.records, Errors.NONE.code(), 100L, 0);
        this.client.prepareResponse(response, true);
        this.consumerClient.poll(0L);

        Assert.assertEquals(0L, this.fetcher.fetchedRecords().size());
        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(0L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * When a committed offset (5) is recorded for the partition, {@code updateFetchPositions}
     * sets the position to it and the partition becomes fetchable.
     */
    @Test
    public void testUpdateFetchPositionToCommitted() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));

        OffsetAndMetadata committedOffset = new OffsetAndMetadata(5L);
        this.subscriptions.committed(this.tp, committedOffset);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(5L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * With no committed offset and no explicit reset request, {@code updateFetchPositions}
     * issues a list-offset request matching timestamp -2 (the same value the EARLIEST reset
     * test expects) and adopts the returned offset (5).
     */
    @Test
    public void testUpdateFetchPositionResetToDefaultOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));

        MockClient.RequestMatcher expectedRequest = this.listOffsetRequestMatcher(-2L);
        AbstractResponse offsetReply = this.listOffsetResponse(Errors.NONE, 1L, 5L);
        this.client.prepareResponse(expectedRequest, offsetReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(5L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * An explicit LATEST reset makes {@code updateFetchPositions} issue a list-offset request
     * matching timestamp -1 and adopt the returned offset (5).
     */
    @Test
    public void testUpdateFetchPositionResetToLatestOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher expectedRequest = this.listOffsetRequestMatcher(-1L);
        AbstractResponse offsetReply = this.listOffsetResponse(Errors.NONE, 1L, 5L);
        this.client.prepareResponse(expectedRequest, offsetReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(5L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * An explicit EARLIEST reset makes {@code updateFetchPositions} issue a list-offset request
     * matching timestamp -2 and adopt the returned offset (5).
     */
    @Test
    public void testUpdateFetchPositionResetToEarliestOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.EARLIEST);

        MockClient.RequestMatcher expectedRequest = this.listOffsetRequestMatcher(-2L);
        AbstractResponse offsetReply = this.listOffsetResponse(Errors.NONE, 1L, 5L);
        this.client.prepareResponse(expectedRequest, offsetReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(5L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * If the first list-offset response is prepared with the disconnect flag set, the reset
     * still completes using the second prepared response and the position ends up at 5.
     */
    @Test
    public void testUpdateFetchPositionDisconnect() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.LATEST);

        // First attempt: response is flagged as a disconnect.
        MockClient.RequestMatcher firstRequest = this.listOffsetRequestMatcher(-1L);
        AbstractResponse firstReply = this.listOffsetResponse(Errors.NONE, 1L, 5L);
        this.client.prepareResponse(firstRequest, firstReply, true);

        // Second attempt: delivered normally.
        MockClient.RequestMatcher secondRequest = this.listOffsetRequestMatcher(-1L);
        AbstractResponse secondReply = this.listOffsetResponse(Errors.NONE, 1L, 5L);
        this.client.prepareResponse(secondRequest, secondReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertTrue(this.subscriptions.isFetchable(this.tp));
        Assert.assertEquals(5L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * A paused partition flagged for a LATEST reset still has its position resolved from the
     * list-offset response (10). It ends with a valid position but remains non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsRequiringOffsetReset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        OffsetAndMetadata committedOffset = new OffsetAndMetadata(0L);
        this.subscriptions.committed(this.tp, committedOffset);
        this.subscriptions.pause(this.tp);
        this.subscriptions.needOffsetReset(this.tp, OffsetResetStrategy.LATEST);

        MockClient.RequestMatcher expectedRequest = this.listOffsetRequestMatcher(-1L);
        AbstractResponse offsetReply = this.listOffsetResponse(Errors.NONE, 1L, 10L);
        this.client.prepareResponse(expectedRequest, offsetReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp));
        Assert.assertEquals(10L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * A paused partition with no committed offset has its position resolved through a
     * list-offset request matching timestamp -2. It ends with a valid position (0) but remains
     * non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutACommittedOffset() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        this.subscriptions.pause(this.tp);

        MockClient.RequestMatcher expectedRequest = this.listOffsetRequestMatcher(-2L);
        AbstractResponse offsetReply = this.listOffsetResponse(Errors.NONE, 1L, 0L);
        this.client.prepareResponse(expectedRequest, offsetReply);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp));
        Assert.assertEquals(0L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * Partition is committed at 0, paused, then sought to 10. After
     * {@code updateFetchPositions} (no response is prepared) the position is 10, valid, with
     * no reset pending, and the partition is still non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithoutAValidPosition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        OffsetAndMetadata committedOffset = new OffsetAndMetadata(0L);
        this.subscriptions.committed(this.tp, committedOffset);
        this.subscriptions.pause(this.tp);
        this.subscriptions.seek(this.tp, 10L);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp));
        Assert.assertEquals(10L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * Partition is committed at 0, sought to 10, then paused. After
     * {@code updateFetchPositions} (no response is prepared) the position is still 10, valid,
     * with no reset pending, and the partition is non-fetchable.
     */
    @Test
    public void testUpdateFetchPositionOfPausedPartitionsWithAValidPosition() {
        this.subscriptions.assignFromUser(Collections.singleton(this.tp));
        OffsetAndMetadata committedOffset = new OffsetAndMetadata(0L);
        this.subscriptions.committed(this.tp, committedOffset);
        this.subscriptions.seek(this.tp, 10L);
        this.subscriptions.pause(this.tp);

        this.fetcher.updateFetchPositions(Collections.singleton(this.tp));

        Assert.assertFalse(this.subscriptions.isOffsetResetNeeded(this.tp));
        Assert.assertFalse(this.subscriptions.isFetchable(this.tp));
        Assert.assertTrue(this.subscriptions.hasValidPosition(this.tp));
        Assert.assertEquals(10L, (long)this.subscriptions.position(this.tp));
    }

    /**
     * {@code getAllTopicMetadata} returns one entry per topic in the test cluster when the
     * metadata response carries no error.
     */
    @Test
    public void testGetAllTopics() {
        AbstractResponse metadataReply = this.newMetadataResponse(this.topicName, Errors.NONE);
        this.client.prepareResponse(metadataReply);

        Map<String, List<PartitionInfo>> allTopics = this.fetcher.getAllTopicMetadata(5000L);

        Assert.assertEquals(this.cluster.topics().size(), allTopics.size());
    }

    /**
     * A first response prepared as a disconnect (null body, disconnect flag set) does not stop
     * {@code getAllTopicMetadata} from succeeding with the second, error-free response.
     */
    @Test
    public void testGetAllTopicsDisconnect() {
        // First prepared response: disconnect.
        this.client.prepareResponse(null, true);

        AbstractResponse metadataReply = this.newMetadataResponse(this.topicName, Errors.NONE);
        this.client.prepareResponse(metadataReply);

        Map<String, List<PartitionInfo>> allTopics = this.fetcher.getAllTopicMetadata(5000L);

        Assert.assertEquals(this.cluster.topics().size(), allTopics.size());
    }

    /**
     * With no response prepared on the mock client, {@code getAllTopicMetadata} with a 50 ms
     * timeout is expected to throw {@link TimeoutException}.
     */
    @Test(expected=TimeoutException.class)
    public void testGetAllTopicsTimeout() {
        final long timeoutMs = 50L;
        this.fetcher.getAllTopicMetadata(timeoutMs);
    }

    /**
     * A metadata response carrying TOPIC_AUTHORIZATION_FAILED makes
     * {@code getAllTopicMetadata} throw a {@link TopicAuthorizationException} naming the test
     * topic.
     */
    @Test
    public void testGetAllTopicsUnauthorized() {
        AbstractResponse denied = this.newMetadataResponse(this.topicName, Errors.TOPIC_AUTHORIZATION_FAILED);
        this.client.prepareResponse(denied);
        try {
            this.fetcher.getAllTopicMetadata(10L);
            Assert.fail();
        }
        catch (TopicAuthorizationException authFailure) {
            Object expectedTopics = Collections.singleton(this.topicName);
            Assert.assertEquals(expectedTopics, (Object)authFailure.unauthorizedTopics());
        }
    }

    /**
     * Expects an {@link InvalidTopicException} when the queued metadata response reports
     * {@code INVALID_TOPIC_EXCEPTION} for the requested topic.
     */
    @Test(expected = InvalidTopicException.class)
    public void testGetTopicMetadataInvalidTopic() {
        final AbstractResponse invalidTopicResponse = newMetadataResponse(topicName, Errors.INVALID_TOPIC_EXCEPTION);
        client.prepareResponse(invalidTopicResponse);

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        fetcher.getTopicMetadata(request, 5000L);
    }

    /**
     * Checks that a metadata response reporting {@code UNKNOWN_TOPIC_OR_PARTITION} results in
     * no entry for the requested topic in the returned metadata map.
     */
    @Test
    public void testGetTopicMetadataUnknownTopic() {
        final AbstractResponse unknownTopicResponse = newMetadataResponse(topicName, Errors.UNKNOWN_TOPIC_OR_PARTITION);
        client.prepareResponse(unknownTopicResponse);

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        final Object entryForTopic = fetcher.getTopicMetadata(request, 5000L).get(topicName);

        Assert.assertNull(entryForTopic);
    }

    /**
     * Checks that the topic metadata lookup ends up containing the topic when a
     * {@code LEADER_NOT_AVAILABLE} response is queued ahead of a successful one.
     */
    @Test
    public void testGetTopicMetadataLeaderNotAvailable() {
        // Queue the failing response first, then the successful one.
        final AbstractResponse leaderMissing = newMetadataResponse(topicName, Errors.LEADER_NOT_AVAILABLE);
        client.prepareResponse(leaderMissing);
        final AbstractResponse success = newMetadataResponse(topicName, Errors.NONE);
        client.prepareResponse(success);

        final MetadataRequest.Builder request = new MetadataRequest.Builder(Collections.singletonList(topicName));
        final boolean topicPresent = fetcher.getTopicMetadata(request, 5000L).containsKey(topicName);

        Assert.assertTrue(topicPresent);
    }

    /**
     * Performs three fetches of three records each, with throttle times of 100, 200 and 300,
     * and checks that the {@code fetch-throttle-time-avg} metric reads 200 and the
     * {@code fetch-throttle-time-max} metric reads 300.
     */
    @Test
    public void testQuotaMetrics() throws Exception {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        for (int batch = 1; batch <= 3; batch++) {
            MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
            // Offsets for this batch are batch*3 .. batch*3+2.
            final long firstOffset = (long) batch * 3L;
            for (int index = 0; index < 3; index++) {
                byte[] key = "key".getBytes();
                byte[] value = String.format("value-%d", index).getBytes();
                recordsBuilder.appendWithOffset(firstOffset + (long) index, -1L, key, value);
            }
            List<ConsumerRecord<byte[], byte[]>> fetched = fetchRecords(recordsBuilder.build(), Errors.NONE.code(), 100L, 100 * batch).get(tp);
            Assert.assertEquals(3L, (long) fetched.size());
        }

        final Map<?, ?> registry = metrics.metrics();
        final MetricName avgName = metrics.metricName("fetch-throttle-time-avg", metricGroup, "");
        final KafkaMetric avgMetric = (KafkaMetric) registry.get(avgName);
        final MetricName maxName = metrics.metricName("fetch-throttle-time-max", metricGroup, "");
        final KafkaMetric maxMetric = (KafkaMetric) registry.get(maxName);

        Assert.assertEquals(200.0, (double) avgMetric.value(), 1.0E-4);
        Assert.assertEquals(300.0, (double) maxMetric.value(), 1.0E-4);
    }

    /**
     * Walks the {@code records-lag-max} and per-partition {@code records-lag} metrics through
     * three stages: negative infinity before any fetch, 100 after an empty fetch with
     * {@code hw} = 100, and 197 after fetching three records with {@code hw} = 200. Finally
     * checks that unsubscribing removes the per-partition metric from the metrics map.
     */
    @Test
    public void testFetcherMetrics() {
        subscriptions.assignFromUser(Collections.singleton(tp));
        subscriptions.seek(tp, 0L);

        final MetricName maxLagName = metrics.metricName("records-lag-max", metricGroup, "");
        final MetricName partitionLagName = metrics.metricName(tp + ".records-lag", metricGroup, "");
        final Map<?, ?> registry = metrics.metrics();
        final KafkaMetric maxLag = (KafkaMetric) registry.get(maxLagName);

        // Stage 1: nothing fetched yet.
        Assert.assertEquals(Double.NEGATIVE_INFINITY, (double) maxLag.value(), 1.0E-4);

        // Stage 2: empty fetch with hw = 100.
        fetchRecords(MemoryRecords.EMPTY, Errors.NONE.code(), 100L, 0);
        Assert.assertEquals(100.0, (double) maxLag.value(), 1.0E-4);
        // The per-partition metric is looked up only after the first fetch, as in the original.
        final KafkaMetric partitionLag = (KafkaMetric) registry.get(partitionLagName);
        Assert.assertEquals(100.0, (double) partitionLag.value(), 1.0E-4);

        // Stage 3: three records at offsets 0..2 with hw = 200.
        MemoryRecordsBuilder recordsBuilder = MemoryRecords.builder(ByteBuffer.allocate(1024), CompressionType.NONE, TimestampType.CREATE_TIME);
        for (int offset = 0; offset < 3; offset++) {
            byte[] key = "key".getBytes();
            byte[] value = String.format("value-%d", offset).getBytes();
            recordsBuilder.appendWithOffset((long) offset, -1L, key, value);
        }
        fetchRecords(recordsBuilder.build(), Errors.NONE.code(), 200L, 0);
        Assert.assertEquals(197.0, (double) maxLag.value(), 1.0E-4);

        // Stage 4: the same map instance must no longer hold the partition metric.
        subscriptions.unsubscribe();
        Assert.assertFalse((boolean) registry.containsKey(partitionLagName));
    }

    /**
     * Runs one fetch round trip against the mock client: asserts that exactly one fetch is
     * sent, queues a fetch response built from the arguments, polls the consumer client once,
     * and returns whatever the fetcher reports as fetched.
     *
     * @param records      records to place in the fetch response
     * @param error        error code to place in the fetch response
     * @param hw           value passed through as {@code hw} to {@code fetchResponse}
     * @param throttleTime throttle time to place in the fetch response
     * @return the fetcher's fetched records, keyed by partition
     */
    private Map<TopicPartition, List<ConsumerRecord<byte[], byte[]>>> fetchRecords(MemoryRecords records, short error, long hw, int throttleTime) {
        // The request must be sent before its response is queued; keep this order.
        final long requestsSent = fetcher.sendFetches();
        Assert.assertEquals(1L, requestsSent);

        final AbstractResponse response = fetchResponse(records, error, hw, throttleTime);
        client.prepareResponse(response);
        consumerClient.poll(0L);

        return fetcher.fetchedRecords();
    }

    /**
     * Checks that looking up offsets by time for partition 2 of {@code topicName} throws a
     * {@link TimeoutException} when no response is queued by this test.
     */
    @Test
    public void testGetOffsetsForTimesTimeout() {
        boolean timedOut = false;
        try {
            fetcher.getOffsetsByTimes(Collections.singletonMap(new TopicPartition(topicName, 2), 1000L), 100L);
        } catch (TimeoutException expected) {
            // This is the outcome under test.
            timedOut = true;
        }

        if (!timedOut) {
            Assert.fail("Should throw timeout exception.");
        }
    }

    /**
     * Exercises {@code getOffsetsByTimes} for an empty search and for a series of two-partition
     * scenarios driven by {@code testGetOffsetsForTimesWithError}. Each scenario supplies the
     * first-round error per partition, the value returned per partition, and the expected
     * result per partition ({@code null} meaning no entry is expected).
     */
    @Test
    public void testGetOffsetsForTimes() {
        // An empty search map must produce an empty result.
        Assert.assertTrue((boolean)this.fetcher.getOffsetsByTimes(new HashMap<TopicPartition, Long>(), 100L).isEmpty());

        // No errors, but tp0's response carries -1: no entry is expected for tp0.
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, -1L, 100L, null, 100L);
        // No errors, both partitions resolve.
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NONE, 10L, 100L, 10L, 100L);
        // First-round errors on one or both partitions; both partitions are still expected to resolve.
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.INVALID_REQUEST, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NONE, Errors.NOT_LEADER_FOR_PARTITION, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.NOT_LEADER_FOR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        this.testGetOffsetsForTimesWithError(Errors.UNKNOWN_TOPIC_OR_PARTITION, Errors.NONE, 10L, 100L, 10L, 100L);
        // UNSUPPORTED_FOR_MESSAGE_FORMAT on tp0: no entry is expected for tp0.
        this.testGetOffsetsForTimesWithError(Errors.UNSUPPORTED_FOR_MESSAGE_FORMAT, Errors.NONE, 10L, 100L, null, 100L);
        this.testGetOffsetsForTimesWithError(Errors.BROKER_NOT_AVAILABLE, Errors.NONE, 10L, 100L, 10L, 100L);
    }

    /**
     * Runs one two-partition {@code getOffsetsByTimes} scenario against a fresh two-node,
     * two-partition cluster.
     *
     * <p>Four list-offset responses are queued: a first round carrying the given errors, then
     * an error-free round for each partition. Every response uses the partition's offset value
     * for both its timestamp and its offset, which is why a single expected value is checked
     * against both fields of the result.
     *
     * @param errorForTp0          error placed in tp0's first-round response
     * @param errorForTp1          error placed in tp1's first-round response
     * @param offsetForTp0         value used as both timestamp and offset in tp0's responses
     * @param offsetForTp1         value used as both timestamp and offset in tp1's responses
     * @param expectedOffsetForTp0 expected timestamp/offset for tp0, or {@code null} if no entry is expected
     * @param expectedOffsetForTp1 expected timestamp/offset for tp1, or {@code null} if no entry is expected
     */
    private void testGetOffsetsForTimesWithError(Errors errorForTp0, Errors errorForTp1, long offsetForTp0, long offsetForTp1, Long expectedOffsetForTp0, Long expectedOffsetForTp1) {
        this.client.reset();
        TopicPartition tp0 = this.tp;
        TopicPartition tp1 = new TopicPartition(this.topicName, 1);
        // Local cluster intentionally shadows the field: this scenario needs its own topology.
        Cluster cluster = TestUtils.clusterWith(2, this.topicName, 2);
        this.metadata.update(cluster, this.time.milliseconds());

        // First round: responses carrying the scenario's errors.
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(tp0, errorForTp0, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(tp1, errorForTp1, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));
        // Second round: error-free responses for both partitions.
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(tp0, Errors.NONE, offsetForTp0, offsetForTp0), cluster.leaderFor(tp0));
        this.client.prepareResponseFrom((AbstractResponse)this.listOffsetResponse(tp1, Errors.NONE, offsetForTp1, offsetForTp1), cluster.leaderFor(tp1));

        Map<TopicPartition, Long> timestampToSearch = new HashMap<TopicPartition, Long>();
        timestampToSearch.put(tp0, 0L);
        timestampToSearch.put(tp1, 0L);
        Map<TopicPartition, OffsetAndTimestamp> offsetAndTimestampMap = this.fetcher.getOffsetsByTimes(timestampToSearch, Long.MAX_VALUE);

        assertOffsetAndTimestampEquals(expectedOffsetForTp0, offsetAndTimestampMap.get(tp0));
        assertOffsetAndTimestampEquals(expectedOffsetForTp1, offsetAndTimestampMap.get(tp1));
    }

    /**
     * Asserts that {@code actual} is absent when {@code expected} is {@code null}; otherwise
     * asserts that it is present and that both its timestamp and its offset equal {@code expected}.
     */
    private static void assertOffsetAndTimestampEquals(Long expected, OffsetAndTimestamp actual) {
        if (expected == null) {
            Assert.assertNull(actual);
            return;
        }
        // Fail with a readable message rather than a NullPointerException on a missing entry.
        Assert.assertNotNull("Expected an offset/timestamp entry but none was returned", actual);
        long expectedValue = expected.longValue();
        Assert.assertEquals(expectedValue, (long)actual.timestamp());
        Assert.assertEquals(expectedValue, (long)actual.offset());
    }

    /**
     * Builds a request matcher that accepts a list-offset request whose timestamp for
     * {@code tp} equals the given value.
     *
     * @param timestamp the timestamp the request must carry for {@code tp}
     * @return a matcher returning {@code true} only when the request has an entry for
     *         {@code tp} with exactly this timestamp
     */
    private MockClient.RequestMatcher listOffsetRequestMatcher(final long timestamp) {
        return new MockClient.RequestMatcher(){

            @Override
            public boolean matches(AbstractRequest body) {
                ListOffsetRequest req = (ListOffsetRequest)body;
                // A request with no entry for tp is a mismatch; auto-unboxing the missing
                // value would otherwise throw a NullPointerException from inside the matcher.
                Long requested = (Long)req.partitionTimestamps().get(FetcherTest.this.tp);
                return requested != null && requested.longValue() == timestamp;
            }
        };
    }

    /**
     * Convenience overload that builds a list-offset response for the fixture partition
     * {@code this.tp}.
     *
     * @param error     error to report for the partition
     * @param timestamp timestamp to report for the partition
     * @param offset    offset to report for the partition
     * @return the single-partition response
     */
    private ListOffsetResponse listOffsetResponse(Errors error, long timestamp, long offset) {
        final TopicPartition fixturePartition = this.tp;
        return listOffsetResponse(fixturePartition, error, timestamp, offset);
    }

    /**
     * Builds a list-offset response containing a single partition entry.
     *
     * @param tp        partition the entry is keyed by
     * @param error     error whose code is stored in the entry
     * @param timestamp timestamp stored in the entry
     * @param offset    offset stored in the entry
     * @return a response wrapping a one-entry map; the literal {@code 1} is passed as the
     *         constructor's second argument (presumably the response version — confirm)
     */
    private ListOffsetResponse listOffsetResponse(TopicPartition tp, Errors error, long timestamp, long offset) {
        Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<TopicPartition, ListOffsetResponse.PartitionData>();
        responseData.put(tp, new ListOffsetResponse.PartitionData(error.code(), timestamp, offset));
        return new ListOffsetResponse(responseData, 1);
    }

    /**
     * Builds a fetch response holding one partition entry for {@code this.tp}.
     *
     * @param records      records placed in the partition entry
     * @param error        error code placed in the partition entry
     * @param hw           value placed in the partition entry as {@code hw}
     * @param throttleTime throttle time of the response
     * @return the single-partition fetch response
     */
    private FetchResponse fetchResponse(MemoryRecords records, short error, long hw, int throttleTime) {
        FetchResponse.PartitionData partitionData = new FetchResponse.PartitionData(error, hw, (Records) records);
        Map<TopicPartition, FetchResponse.PartitionData> singleEntry = Collections.singletonMap(this.tp, partitionData);
        // Copy into a LinkedHashMap, the type the original passes to the constructor.
        LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData =
                new LinkedHashMap<TopicPartition, FetchResponse.PartitionData>(singleEntry);
        return new FetchResponse(responseData, throttleTime);
    }

    /**
     * Builds a metadata response describing a single topic.
     *
     * <p>When {@code error} is {@code Errors.NONE}, the topic's partitions are copied from
     * {@code this.cluster}; for any other error the topic carries an empty partition list.
     *
     * @param topic topic the response describes
     * @param error topic-level error to report
     * @return a response listing the cluster's nodes and the one topic
     */
    private MetadataResponse newMetadataResponse(String topic, Errors error) {
        List<MetadataResponse.PartitionMetadata> partitions = new ArrayList<MetadataResponse.PartitionMetadata>();

        final boolean includePartitions = error == Errors.NONE;
        if (includePartitions) {
            for (PartitionInfo info : this.cluster.partitionsForTopic(topic)) {
                MetadataResponse.PartitionMetadata partition = new MetadataResponse.PartitionMetadata(
                        Errors.NONE,
                        info.partition(),
                        info.leader(),
                        Arrays.asList(info.replicas()),
                        Arrays.asList(info.inSyncReplicas()));
                partitions.add(partition);
            }
        }

        // NOTE(review): the literal false is presumably the "internal topic" flag — confirm against TopicMetadata.
        MetadataResponse.TopicMetadata topicEntry = new MetadataResponse.TopicMetadata(error, topic, false, partitions);
        return new MetadataResponse(this.cluster.nodes(), null, -1, Arrays.asList(topicEntry));
    }

    /**
     * Creates a byte-array fetcher with an explicit per-poll record limit.
     *
     * @param subscriptions  subscription state the fetcher operates on
     * @param metrics        metrics registry the fetcher reports to
     * @param maxPollRecords maximum number of records, passed through to the fetcher
     * @return a fetcher whose keys and values are deserialized as raw byte arrays
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics, int maxPollRecords) {
        // Typed as Deserializer<byte[]> so K and V are inferred as byte[] with no unchecked conversion.
        Deserializer<byte[]> keyDeserializer = new ByteArrayDeserializer();
        Deserializer<byte[]> valueDeserializer = new ByteArrayDeserializer();
        return this.createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, maxPollRecords);
    }

    /**
     * Creates a byte-array fetcher whose per-poll record limit is {@code Integer.MAX_VALUE}.
     *
     * @param subscriptions subscription state the fetcher operates on
     * @param metrics       metrics registry the fetcher reports to
     * @return the fetcher produced by the three-argument overload
     */
    private Fetcher<byte[], byte[]> createFetcher(SubscriptionState subscriptions, Metrics metrics) {
        final int noRecordLimit = Integer.MAX_VALUE;
        return createFetcher(subscriptions, metrics, noRecordLimit);
    }

    /**
     * Creates a fetcher with caller-supplied deserializers and a per-poll record limit of
     * {@code Integer.MAX_VALUE}.
     *
     * @param subscriptions     subscription state the fetcher operates on
     * @param metrics           metrics registry the fetcher reports to
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @return the fetcher produced by the five-argument overload
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer) {
        final int noRecordLimit = Integer.MAX_VALUE;
        return createFetcher(subscriptions, metrics, keyDeserializer, valueDeserializer, noRecordLimit);
    }

    /**
     * Creates a fetcher wired to the test fixture's consumer client, metadata, clock and
     * fetch settings, using the supplied subscription state, metrics, deserializers and
     * per-poll record limit.
     *
     * @param subscriptions     subscription state the fetcher operates on
     * @param metrics           metrics registry the fetcher reports to
     * @param keyDeserializer   deserializer for record keys
     * @param valueDeserializer deserializer for record values
     * @param maxPollRecords    maximum number of records, passed through to the fetcher
     * @return a new, fully parameterized fetcher
     */
    private <K, V> Fetcher<K, V> createFetcher(SubscriptionState subscriptions, Metrics metrics, Deserializer<K> keyDeserializer, Deserializer<V> valueDeserializer, int maxPollRecords) {
        // NOTE(review): the literal true is the seventh constructor argument — presumably CRC checking; confirm against Fetcher.
        // Explicit type arguments avoid a raw Fetcher being returned through an unchecked conversion.
        return new Fetcher<K, V>(this.consumerClient, this.minBytes, this.maxBytes, this.maxWaitMs, this.fetchSize, maxPollRecords, true, keyDeserializer, valueDeserializer, this.metadata, subscriptions, metrics, "consumer" + this.groupId, (Time)this.time, this.retryBackoffMs);
    }
}

