/*
 * Decompiled with CFR 0.152.
 */
package io.aeron.samples;

import io.aeron.Aeron;
import io.aeron.Publication;
import io.aeron.Subscription;
import io.aeron.driver.MediaDriver;
import io.aeron.logbuffer.FragmentHandler;
import io.aeron.samples.RateReporter;
import io.aeron.samples.SampleConfiguration;
import io.aeron.samples.SamplesUtil;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.agrona.BufferUtil;
import org.agrona.concurrent.BusySpinIdleStrategy;
import org.agrona.concurrent.UnsafeBuffer;
import org.agrona.console.ContinueBarrier;

/**
 * Throughput sample that launches a {@link MediaDriver} inside this process and streams messages
 * from a {@link Publication} to a {@link Subscription} on the same channel and stream id.
 *
 * <p>The main thread publishes; a two-thread executor runs the {@link RateReporter} and the
 * subscriber polling loop. After each batch the user is prompted whether to run again.
 */
public class EmbeddedThroughput {
    private static final long NUMBER_OF_MESSAGES = SampleConfiguration.NUMBER_OF_MESSAGES;
    private static final long LINGER_TIMEOUT_MS = SampleConfiguration.LINGER_TIMEOUT_MS;
    private static final int STREAM_ID = SampleConfiguration.STREAM_ID;
    private static final int MESSAGE_LENGTH = SampleConfiguration.MESSAGE_LENGTH;
    private static final int FRAGMENT_COUNT_LIMIT = SampleConfiguration.FRAGMENT_COUNT_LIMIT;
    private static final String CHANNEL = SampleConfiguration.CHANNEL;
    // Single reusable send buffer of MESSAGE_LENGTH bytes, allocated direct with 64-byte alignment.
    private static final UnsafeBuffer ATOMIC_BUFFER = new UnsafeBuffer(BufferUtil.allocateDirectAligned(MESSAGE_LENGTH, 64));
    private static final BusySpinIdleStrategy OFFER_IDLE_STRATEGY = new BusySpinIdleStrategy();
    // Written by the main thread, read by the reporter thread in printRate(); hence volatile.
    private static volatile boolean printingActive = true;

    /**
     * Runs the sample: starts the embedded driver, connects a client, then repeatedly streams
     * {@code NUMBER_OF_MESSAGES} messages until the user declines to continue.
     *
     * @param args passed to {@link MediaDriver#loadPropertiesFiles(String[])}
     * @throws Exception if the main thread is interrupted while lingering, or if the driver,
     *                   client or publication fails
     */
    public static void main(String[] args) throws Exception {
        MediaDriver.loadPropertiesFiles(args);
        RateReporter reporter = new RateReporter(TimeUnit.SECONDS.toNanos(1L), EmbeddedThroughput::printRate);
        FragmentHandler rateReporterHandler = SamplesUtil.rateReporterHandler(reporter);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        Aeron.Context context = new Aeron.Context();
        AtomicBoolean running = new AtomicBoolean(true);
        try (MediaDriver ignore = MediaDriver.launch();
             Aeron aeron = Aeron.connect(context);
             Publication publication = aeron.addPublication(CHANNEL, STREAM_ID);
             Subscription subscription = aeron.addSubscription(CHANNEL, STREAM_ID)) {
            try {
                executor.execute(reporter);
                executor.execute(() -> SamplesUtil.subscriberLoop(rateReporterHandler, FRAGMENT_COUNT_LIMIT, running).accept(subscription));
                ContinueBarrier barrier = new ContinueBarrier("Execute again?");
                do {
                    System.out.format("%nStreaming %,d messages of size %d bytes to %s on stream Id %d%n", NUMBER_OF_MESSAGES, MESSAGE_LENGTH, CHANNEL, STREAM_ID);
                    printingActive = true;
                    long backPressureCount = streamMessages(publication);
                    System.out.println("Done streaming. backPressureRatio=" + (double)backPressureCount / (double)NUMBER_OF_MESSAGES);
                    if (0L < LINGER_TIMEOUT_MS) {
                        System.out.println("Lingering for " + LINGER_TIMEOUT_MS + " milliseconds...");
                        Thread.sleep(LINGER_TIMEOUT_MS);
                    }
                    // Silence the reporter while waiting at the prompt.
                    printingActive = false;
                } while (barrier.await());
            } finally {
                // Always signal the workers, even when streaming fails: the pool threads are
                // non-daemon and would otherwise keep the JVM alive. This inner finally runs
                // before try-with-resources closes the subscription and driver.
                running.set(false);
                reporter.halt();
                executor.shutdown();
            }
        }
    }

    /**
     * Offers {@code NUMBER_OF_MESSAGES} messages on the publication, writing the message index
     * into the first eight bytes of the shared buffer and retrying each offer until it is accepted.
     *
     * @param publication the publication to offer to
     * @return the number of offers that returned a negative result and had to be retried
     * @throws IllegalStateException if the publication reports {@code Publication.CLOSED},
     *                               since retrying could then never succeed
     */
    private static long streamMessages(Publication publication) {
        long rejectedOffers = 0L;
        for (long i = 0L; i < NUMBER_OF_MESSAGES; ++i) {
            ATOMIC_BUFFER.putLong(0, i);
            OFFER_IDLE_STRATEGY.reset();
            long result;
            while ((result = publication.offer(ATOMIC_BUFFER, 0, ATOMIC_BUFFER.capacity())) < 0L) {
                if (result == Publication.CLOSED) {
                    // A closed publication never recovers; spinning here would hang forever.
                    throw new IllegalStateException("Publication closed while offering message " + i);
                }
                OFFER_IDLE_STRATEGY.idle();
                ++rejectedOffers;
            }
        }
        return rejectedOffers;
    }

    /**
     * Rate callback handed to the {@link RateReporter}; prints one line of statistics unless
     * printing has been switched off between streaming runs.
     *
     * @param messagesPerSec message rate to print
     * @param bytesPerSec    byte rate to print
     * @param totalFragments running total printed as the message count
     * @param totalBytes     running total in bytes, printed in MB (divided by 1024 * 1024)
     */
    public static void printRate(double messagesPerSec, double bytesPerSec, long totalFragments, long totalBytes) {
        if (printingActive) {
            System.out.format("%.02g msgs/sec, %.02g bytes/sec, totals %d messages %d MB%n", messagesPerSec, bytesPerSec, totalFragments, totalBytes / 0x100000L);
        }
    }
}

