/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.ext.mongo.impl;

import io.vertx.core.Context;
import io.vertx.core.Handler;
import io.vertx.core.internal.ContextInternal;
import io.vertx.core.internal.EventExecutor;
import io.vertx.core.internal.concurrent.InboundMessageQueue;
import io.vertx.core.streams.ReadStream;
import java.util.Objects;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;

public class PublisherAdapter<T>
implements ReadStream<T> {
    private final ContextInternal context;
    private final Publisher<T> publisher;
    private final int batchSize;
    private Handler<T> handler;
    private Handler<Throwable> exceptionHandler;
    private Handler<Void> endHandler;
    private Subscriber subscriber;
    private long demand;
    private final Lock lock = new ReentrantLock();
    private final EventExecutor syncExec = new EventExecutor(){

        public boolean inThread() {
            return true;
        }

        public void execute(Runnable command) {
            PublisherAdapter.this.lock.lock();
            try {
                command.run();
            }
            finally {
                PublisherAdapter.this.lock.unlock();
            }
        }
    };
    private static final Object END = new Object();

    public PublisherAdapter(Context context, Publisher<T> publisher, int batchSize) {
        Objects.requireNonNull(context, "context is null");
        Objects.requireNonNull(publisher, "publisher is null");
        this.context = (ContextInternal)context;
        this.publisher = publisher;
        this.batchSize = batchSize;
        this.demand = Long.MAX_VALUE;
    }

    public synchronized ReadStream<T> exceptionHandler(Handler<Throwable> handler) {
        this.exceptionHandler = handler;
        return this;
    }

    public synchronized ReadStream<T> endHandler(Handler<Void> handler) {
        this.endHandler = handler;
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> handler(Handler<T> h) {
        if (h == null) {
            Subscriber s;
            PublisherAdapter publisherAdapter = this;
            synchronized (publisherAdapter) {
                this.handler = h;
                s = this.subscriber;
                this.subscriber = null;
                this.demand = Long.MAX_VALUE;
            }
            if (s != null) {
                s.cancel();
            }
        } else {
            Subscriber s;
            PublisherAdapter publisherAdapter = this;
            synchronized (publisherAdapter) {
                this.handler = h;
                s = this.subscriber;
                if (s != null) {
                    return this;
                }
                this.subscriber = s = new Subscriber();
                long d = this.demand;
                if (d > 0L) {
                    s.fetch(d);
                } else {
                    s.pause();
                }
            }
            this.publisher.subscribe((org.reactivestreams.Subscriber)s);
        }
        return this;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public ReadStream<T> pause() {
        Subscriber s;
        PublisherAdapter publisherAdapter = this;
        synchronized (publisherAdapter) {
            this.demand = 0L;
            s = this.subscriber;
        }
        if (s != null) {
            s.pause();
        }
        return this;
    }

    public ReadStream<T> resume() {
        return this.fetch(Long.MAX_VALUE);
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public synchronized ReadStream<T> fetch(long amount) {
        Subscriber s;
        long d;
        if (amount < 0L) {
            throw new IllegalArgumentException();
        }
        if (amount == 0L) {
            return this;
        }
        PublisherAdapter publisherAdapter = this;
        synchronized (publisherAdapter) {
            this.demand += amount;
            if (this.demand < 0L) {
                this.demand = Long.MAX_VALUE;
            }
            d = this.demand;
            s = this.subscriber;
        }
        if (s != null) {
            s.fetch(d);
        }
        return this;
    }

    private class Subscriber
    extends InboundMessageQueue<Object>
    implements org.reactivestreams.Subscriber<T> {
        private Subscription subscription;
        private boolean paused;
        private int inflight;

        public Subscriber() {
            super(PublisherAdapter.this.syncExec, PublisherAdapter.this.context.executor());
            this.inflight = PublisherAdapter.this.batchSize;
        }

        protected void handleResume() {
            this.paused = false;
            if (this.inflight == 0) {
                this.inflight += PublisherAdapter.this.batchSize;
                this.subscription.request((long)PublisherAdapter.this.batchSize);
            }
        }

        protected void handlePause() {
            this.paused = true;
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        protected void handleMessage(Object msg) {
            Object handler;
            PublisherAdapter publisherAdapter = PublisherAdapter.this;
            synchronized (publisherAdapter) {
                if (msg == END) {
                    msg = null;
                    handler = PublisherAdapter.this.endHandler;
                } else {
                    handler = msg instanceof Throwable ? PublisherAdapter.this.exceptionHandler : PublisherAdapter.this.handler;
                }
            }
            if (handler != null) {
                PublisherAdapter.this.context.dispatch(msg, handler);
            }
        }

        /*
         * WARNING - Removed try catching itself - possible behaviour change.
         */
        public void onSubscribe(Subscription subscription) {
            PublisherAdapter publisherAdapter = PublisherAdapter.this;
            synchronized (publisherAdapter) {
                this.subscription = subscription;
            }
            subscription.request((long)PublisherAdapter.this.batchSize);
        }

        void cancel() {
            this.subscription.cancel();
        }

        public void onNext(T t) {
            PublisherAdapter.this.lock.lock();
            --this.inflight;
            try {
                this.write(t);
                if (this.inflight == 0 && !this.paused) {
                    this.inflight += PublisherAdapter.this.batchSize;
                    this.subscription.request((long)PublisherAdapter.this.batchSize);
                }
            }
            finally {
                PublisherAdapter.this.lock.unlock();
            }
        }

        public void onError(Throwable t) {
            PublisherAdapter.this.lock.lock();
            try {
                this.write(t);
            }
            finally {
                PublisherAdapter.this.lock.unlock();
            }
        }

        public void onComplete() {
            PublisherAdapter.this.lock.lock();
            try {
                this.write(END);
            }
            finally {
                PublisherAdapter.this.lock.unlock();
            }
        }
    }
}

