/*
 * Decompiled with CFR 0.152.
 */
package org.jclouds.s3.blobstore.strategy.internal;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.inject.Inject;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.SortedMap;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.CancellationException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.annotation.Resource;
import javax.inject.Named;
import org.jclouds.blobstore.domain.Blob;
import org.jclouds.blobstore.internal.BlobRuntimeException;
import org.jclouds.blobstore.options.PutOptions;
import org.jclouds.io.Payload;
import org.jclouds.io.PayloadSlicer;
import org.jclouds.logging.Logger;
import org.jclouds.s3.S3Client;
import org.jclouds.s3.blobstore.S3BlobStore;
import org.jclouds.s3.blobstore.strategy.AsyncMultipartUploadStrategy;
import org.jclouds.s3.blobstore.strategy.internal.MultipartUploadSlicingAlgorithm;
import org.jclouds.s3.domain.ObjectMetadataBuilder;
import org.jclouds.s3.options.PutObjectOptions;
import org.jclouds.util.Throwables2;

public class ParallelMultipartUploadStrategy
implements AsyncMultipartUploadStrategy {
    @Resource
    @Named(value="jclouds.blobstore")
    protected Logger logger = Logger.NULL;
    @VisibleForTesting
    static final int DEFAULT_PARALLEL_DEGREE = 4;
    @VisibleForTesting
    static final int DEFAULT_MIN_RETRIES = 5;
    @VisibleForTesting
    static final int DEFAULT_MAX_PERCENT_RETRIES = 10;
    private final ListeningExecutorService executor;
    @Inject(optional=true)
    @Named(value="jclouds.mpu.parallel.degree")
    @VisibleForTesting
    int parallelDegree = 4;
    @Inject(optional=true)
    @Named(value="jclouds.mpu.parallel.retries.min")
    @VisibleForTesting
    int minRetries = 5;
    @Inject(optional=true)
    @Named(value="jclouds.mpu.parallel.retries.maxpercent")
    @VisibleForTesting
    int maxPercentRetries = 10;
    @Inject(optional=true)
    @Named(value="jclouds.request-timeout")
    protected Long maxTime;
    protected final S3BlobStore blobstore;
    protected final PayloadSlicer slicer;

    @Inject
    public ParallelMultipartUploadStrategy(S3BlobStore blobstore, PayloadSlicer slicer, @Named(value="jclouds.user-threads") ListeningExecutorService executor) {
        this.blobstore = (S3BlobStore)((Object)Preconditions.checkNotNull((Object)((Object)blobstore), (Object)"blobstore"));
        this.slicer = (PayloadSlicer)Preconditions.checkNotNull((Object)slicer, (Object)"slicer");
        this.executor = (ListeningExecutorService)Preconditions.checkNotNull((Object)executor, (Object)"executor");
    }

    protected void prepareUploadPart(final String container, final String key, final String uploadId, final Integer part, Payload payload, final long offset, final long size, final SortedMap<Integer, String> etags, final BlockingQueue<Integer> activeParts, final Map<Integer, ListenableFuture<String>> futureParts, final AtomicInteger errors, final int maxRetries, final Map<Integer, Exception> errorMap, final Queue<Part> toRetry, final CountDownLatch latch) {
        if (errors.get() > maxRetries) {
            activeParts.remove(part);
            latch.countDown();
            return;
        }
        final S3Client client = (S3Client)this.blobstore.getContext().unwrapApi(S3Client.class);
        final Payload chunkedPart = this.slicer.slice(payload, offset, size);
        this.logger.debug(String.format("async uploading part %s of %s to container %s with uploadId %s", part, key, container, uploadId), new Object[0]);
        final long start = System.currentTimeMillis();
        final ListenableFuture futureETag = this.executor.submit((Callable)new Callable<String>(){

            @Override
            public String call() throws Exception {
                return client.uploadPart(container, key, part, uploadId, chunkedPart);
            }
        });
        futureETag.addListener(new Runnable(){

            /*
             * WARNING - Removed try catching itself - possible behaviour change.
             */
            @Override
            public void run() {
                try {
                    etags.put(part, futureETag.get());
                    ParallelMultipartUploadStrategy.this.logger.debug(String.format("async uploaded part %s of %s to container %s in %sms with uploadId %s", part, key, container, System.currentTimeMillis() - start, uploadId), new Object[0]);
                }
                catch (CancellationException e) {
                    errorMap.put(part, e);
                    String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", e.getMessage(), part, offset, size, container, uploadId, System.currentTimeMillis() - start);
                    ParallelMultipartUploadStrategy.this.logger.debug(message, new Object[0]);
                }
                catch (Exception e) {
                    errorMap.put(part, e);
                    String message = String.format("%s while uploading part %s - [%s,%s] to container %s with uploadId: %s running since %dms", e.getMessage(), part, offset, size, container, uploadId, System.currentTimeMillis() - start);
                    ParallelMultipartUploadStrategy.this.logger.error(message, new Object[]{e});
                    if (errors.incrementAndGet() <= maxRetries) {
                        toRetry.add(new Part(part, offset, size));
                    }
                }
                finally {
                    activeParts.remove(part);
                    futureParts.remove(part);
                    latch.countDown();
                }
            }
        }, (Executor)this.executor);
        futureParts.put(part, (ListenableFuture<String>)futureETag);
    }

    @Override
    public ListenableFuture<String> execute(final String container, final Blob blob, PutOptions options) {
        return this.executor.submit((Callable)new Callable<String>(){

            @Override
            public String call() throws Exception {
                String key = blob.getMetadata().getName();
                Payload payload = blob.getPayload();
                MultipartUploadSlicingAlgorithm algorithm = new MultipartUploadSlicingAlgorithm();
                algorithm.calculateChunkSize(payload.getContentMetadata().getContentLength());
                int parts = algorithm.getParts();
                long chunkSize = algorithm.getChunkSize();
                long remaining = algorithm.getRemaining();
                if (parts > 0) {
                    S3Client client = (S3Client)ParallelMultipartUploadStrategy.this.blobstore.getContext().unwrapApi(S3Client.class);
                    String uploadId = null;
                    ConcurrentHashMap<Integer, ListenableFuture<String>> futureParts = new ConcurrentHashMap<Integer, ListenableFuture<String>>();
                    HashMap errorMap = Maps.newHashMap();
                    AtomicInteger errors = new AtomicInteger(0);
                    int maxRetries = Math.max(ParallelMultipartUploadStrategy.this.minRetries, parts * ParallelMultipartUploadStrategy.this.maxPercentRetries / 100);
                    int effectiveParts = remaining > 0L ? parts + 1 : parts;
                    try {
                        Integer partKey;
                        int part;
                        uploadId = client.initiateMultipartUpload(container, ObjectMetadataBuilder.create().key(key).build(), new PutObjectOptions[0]);
                        ParallelMultipartUploadStrategy.this.logger.debug(String.format("initiated multipart upload of %s to container %s with uploadId %s consisting from %s part (possible max. retries: %d)", key, container, uploadId, effectiveParts, maxRetries), new Object[0]);
                        ArrayBlockingQueue<Integer> activeParts = new ArrayBlockingQueue<Integer>(ParallelMultipartUploadStrategy.this.parallelDegree);
                        ConcurrentLinkedQueue<Part> toRetry = new ConcurrentLinkedQueue<Part>();
                        ConcurrentSkipListMap<Integer, String> etags = new ConcurrentSkipListMap<Integer, String>();
                        CountDownLatch latch = new CountDownLatch(effectiveParts);
                        while ((part = algorithm.getNextPart()) <= parts) {
                            partKey = part;
                            activeParts.put(partKey);
                            ParallelMultipartUploadStrategy.this.prepareUploadPart(container, key, uploadId, partKey, payload, algorithm.getNextChunkOffset(), chunkSize, etags, activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
                        }
                        if (remaining > 0L) {
                            partKey = part;
                            activeParts.put(partKey);
                            ParallelMultipartUploadStrategy.this.prepareUploadPart(container, key, uploadId, partKey, payload, algorithm.getNextChunkOffset(), remaining, etags, activeParts, futureParts, errors, maxRetries, errorMap, toRetry, latch);
                        }
                        latch.await();
                        while (errors.get() <= maxRetries && !toRetry.isEmpty()) {
                            int atOnce = Math.min(Math.min(toRetry.size(), errors.get()), ParallelMultipartUploadStrategy.this.parallelDegree);
                            CountDownLatch retryLatch = new CountDownLatch(atOnce);
                            for (int i = 0; i < atOnce; ++i) {
                                Part failedPart = (Part)toRetry.poll();
                                Integer partKey2 = failedPart.getPart();
                                activeParts.put(partKey2);
                                ParallelMultipartUploadStrategy.this.prepareUploadPart(container, key, uploadId, partKey2, payload, failedPart.getOffset(), failedPart.getSize(), etags, activeParts, futureParts, errors, maxRetries, errorMap, toRetry, retryLatch);
                            }
                            retryLatch.await();
                        }
                        if (errors.get() > maxRetries) {
                            throw new BlobRuntimeException(String.format("Too many failed parts: %s while multipart upload of %s to container %s with uploadId %s", errors.get(), key, container, uploadId));
                        }
                        String eTag = client.completeMultipartUpload(container, key, uploadId, etags);
                        ParallelMultipartUploadStrategy.this.logger.debug(String.format("multipart upload of %s to container %s with uploadId %s successfully finished with %s retries", key, container, uploadId, errors.get()), new Object[0]);
                        return eTag;
                    }
                    catch (Exception ex) {
                        RuntimeException rtex = (RuntimeException)Throwables2.getFirstThrowableOfType((Throwable)ex, RuntimeException.class);
                        if (rtex == null) {
                            rtex = new RuntimeException(ex);
                        }
                        for (Map.Entry entry : futureParts.entrySet()) {
                            ((ListenableFuture)entry.getValue()).cancel(false);
                        }
                        if (uploadId != null) {
                            client.abortMultipartUpload(container, key, uploadId);
                        }
                        throw rtex;
                    }
                }
                final PutOptions nonMultipartOptions = PutOptions.Builder.multipart((boolean)false);
                ListenableFuture futureETag = ParallelMultipartUploadStrategy.this.executor.submit((Callable)new Callable<String>(){

                    @Override
                    public String call() throws Exception {
                        return ParallelMultipartUploadStrategy.this.blobstore.putBlob(container, blob, nonMultipartOptions);
                    }
                });
                return ParallelMultipartUploadStrategy.this.maxTime != null ? (String)futureETag.get(ParallelMultipartUploadStrategy.this.maxTime.longValue(), TimeUnit.SECONDS) : (String)futureETag.get();
            }
        });
    }

    static class Part {
        private int part;
        private long offset;
        private long size;

        Part(int part, long offset, long size) {
            this.part = part;
            this.offset = offset;
            this.size = size;
        }

        public int getPart() {
            return this.part;
        }

        public void setPart(int part) {
            this.part = part;
        }

        public long getOffset() {
            return this.offset;
        }

        public void setOffset(long offset) {
            this.offset = offset;
        }

        public long getSize() {
            return this.size;
        }

        public void setSize(long size) {
            this.size = size;
        }
    }
}

