/*
 * Decompiled with CFR 0.152.
 */
package org.apache.cassandra.streaming;

import com.google.common.base.Throwables;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.io.sstable.SSTableReader;
import org.apache.cassandra.streaming.StreamSession;
import org.apache.cassandra.streaming.StreamTask;
import org.apache.cassandra.streaming.messages.OutgoingFileMessage;
import org.apache.cassandra.utils.Pair;
import org.apache.cassandra.utils.concurrent.Ref;

/**
 * Tracks the outgoing files of a stream session for a single {@code cfId}.
 *
 * <p>Files are registered with {@link #addTransferFile}, handed out through
 * {@link #getFileMessages()} / {@link #createMessageForRetry(int)}, and removed again by
 * {@link #complete(int)} or {@link #abort()}. Each file may have one pending timeout
 * (see {@link #scheduleTimeout}) that completes the file when it fires.
 *
 * <p>Thread-safety: {@code files}, {@code timeoutTasks} and {@code aborted} are only touched
 * while holding this object's monitor. {@code totalSize} is written under the monitor and is
 * {@code volatile} so that the lock-free {@link #getTotalSize()} reads a current, untorn value.
 * The session is never called back while the monitor is held.
 */
public class StreamTransferTask extends StreamTask {
    /** Single-threaded scheduler, shared by all instances, that runs the per-file timeout callbacks. */
    private static final ScheduledExecutorService timeoutExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("StreamingTransferTaskTimeouts"));

    /** Source of the sequence numbers assigned to files added to this task. */
    private final AtomicInteger sequenceNumber = new AtomicInteger(0);

    /** Set by the first {@link #abort()} call so that later calls are no-ops. */
    private boolean aborted = false;

    /** Files that have been added and not yet completed or aborted, keyed by sequence number. */
    private final Map<Integer, OutgoingFileMessage> files = new HashMap<>();

    /** Pending timeout per sequence number; an entry is removed when cancelled or when it fires. */
    private final Map<Integer, ScheduledFuture<?>> timeoutTasks = new HashMap<>();

    /**
     * Sum of {@code header.size()} over every file ever added (never decremented).
     * Volatile because {@link #getTotalSize()} reads it without taking the monitor.
     */
    private volatile long totalSize;

    public StreamTransferTask(StreamSession session, UUID cfId) {
        super(session, cfId);
    }

    /**
     * Registers a file for transfer under the next sequence number and adds its header size
     * to the running total.
     *
     * @param ref           reference to the sstable to send; with assertions enabled it must be
     *                      non-null and belong to this task's {@code cfId}
     * @param estimatedKeys passed through to the {@link OutgoingFileMessage}
     * @param sections      passed through to the {@link OutgoingFileMessage}
     * @param repairedAt    passed through to the {@link OutgoingFileMessage}
     */
    public synchronized void addTransferFile(Ref<SSTableReader> ref, long estimatedKeys, List<Pair<Long, Long>> sections, long repairedAt) {
        assert ref.get() != null && this.cfId.equals(ref.get().metadata.cfId);
        int nextSequence = this.sequenceNumber.getAndIncrement();
        OutgoingFileMessage message = new OutgoingFileMessage(ref, nextSequence, estimatedKeys, sections, repairedAt);
        this.files.put(message.header.sequenceNumber, message);
        // Compound update on a volatile is safe here because all writers hold the monitor.
        this.totalSize += message.header.size();
    }

    /**
     * Marks the file with the given sequence number as done: cancels its pending timeout (if any),
     * removes it from the tracked files and calls {@code complete()} on it. When no tracked files
     * remain afterwards, the session is notified via {@code taskCompleted(this)}.
     *
     * <p>The session callback is made after the monitor has been released.
     *
     * @param sequenceNumber sequence number of the file that finished
     */
    public void complete(int sequenceNumber) {
        boolean signalComplete;
        synchronized (this) {
            ScheduledFuture<?> timeout = this.timeoutTasks.remove(sequenceNumber);
            if (timeout != null) {
                timeout.cancel(false);
            }
            OutgoingFileMessage file = this.files.remove(sequenceNumber);
            if (file != null) {
                file.complete();
            }
            // NOTE(review): this is also true when nothing was removed (unknown or repeated
            // sequence number, or a call after abort()), so the session can be notified more
            // than once -- confirm the session tolerates that before tightening this check.
            signalComplete = this.files.isEmpty();
        }
        if (signalComplete) {
            this.session.taskCompleted(this);
        }
    }

    /**
     * Aborts the task: cancels every pending timeout and calls {@code complete()} on every file
     * still tracked, then forgets them. Only the first call has any effect.
     *
     * <p>A failure while completing one file does not stop the others from being processed; the
     * first failure is rethrown at the end (via {@code Throwables.propagate}) with any later
     * failures attached as suppressed exceptions.
     */
    @Override
    public synchronized void abort() {
        if (this.aborted) {
            return;
        }
        this.aborted = true;

        for (ScheduledFuture<?> future : this.timeoutTasks.values()) {
            future.cancel(false);
        }
        this.timeoutTasks.clear();

        Throwable fail = null;
        for (OutgoingFileMessage file : this.files.values()) {
            try {
                // presumably releases what the message holds on to -- see OutgoingFileMessage
                file.complete();
            }
            catch (Throwable t) {
                if (fail == null) {
                    fail = t;
                } else {
                    fail.addSuppressed(t);
                }
            }
        }
        this.files.clear();

        if (fail != null) {
            Throwables.propagate(fail);
        }
    }

    /**
     * @return the number of files currently tracked, i.e. added and not yet completed or aborted
     */
    @Override
    public synchronized int getTotalNumberOfFiles() {
        return this.files.size();
    }

    /**
     * @return the accumulated header size of every file added so far; read without locking
     */
    @Override
    public long getTotalSize() {
        return this.totalSize;
    }

    /**
     * @return a snapshot copy of the currently tracked file messages; later completions or
     *         aborts do not affect the returned collection
     */
    public synchronized Collection<OutgoingFileMessage> getFileMessages() {
        return new ArrayList<>(this.files.values());
    }

    /**
     * Prepares a file for being sent again: removes and cancels its pending timeout (if any)
     * and returns its message.
     *
     * @param sequenceNumber sequence number of the file to retry
     * @return the tracked message, or {@code null} if no file with that sequence number is tracked
     */
    public synchronized OutgoingFileMessage createMessageForRetry(int sequenceNumber) {
        ScheduledFuture<?> future = this.timeoutTasks.remove(sequenceNumber);
        if (future != null) {
            future.cancel(false);
        }
        return this.files.get(sequenceNumber);
    }

    /**
     * Schedules a timeout for a tracked file. When the timeout fires, the file is completed as
     * if {@link #complete(int)} had been called for it.
     *
     * <p>With assertions enabled, scheduling a second timeout for a sequence number that still
     * has one registered fails.
     *
     * @param sequenceNumber sequence number of the file
     * @param time           delay before the timeout fires
     * @param unit           unit of {@code time}
     * @return the scheduled future, or {@code null} if no file with that sequence number is tracked
     */
    public synchronized ScheduledFuture scheduleTimeout(final int sequenceNumber, long time, TimeUnit unit) {
        if (!this.files.containsKey(sequenceNumber)) {
            return null;
        }
        ScheduledFuture<?> future = timeoutExecutor.schedule(new Runnable() {
            @Override
            public void run() {
                synchronized (StreamTransferTask.this) {
                    // Drop our own entry first so complete() does not cancel the future that is
                    // currently running. Taking the monitor also guarantees the enclosing
                    // scheduleTimeout() call has finished registering that entry.
                    StreamTransferTask.this.timeoutTasks.remove(sequenceNumber);
                }
                // Deliberately outside the synchronized block: complete() notifies the session
                // only after releasing the monitor, and calling it with the monitor already held
                // would make that notification run under the lock.
                StreamTransferTask.this.complete(sequenceNumber);
            }
        }, time, unit);
        ScheduledFuture<?> prev = this.timeoutTasks.put(sequenceNumber, future);
        assert prev == null;
        return future;
    }
}

