/*
 * Decompiled with CFR 0.152.
 */
package com.hazelcast.map.impl.mapstore.writebehind;

import com.hazelcast.core.MapStore;
import com.hazelcast.logging.ILogger;
import com.hazelcast.map.impl.mapstore.MapStoreContext;
import com.hazelcast.map.impl.mapstore.writebehind.DelayedEntry;
import com.hazelcast.map.impl.mapstore.writebehind.StoreEvent;
import com.hazelcast.map.impl.mapstore.writebehind.StoreListener;
import com.hazelcast.map.impl.mapstore.writebehind.WriteBehindProcessor;
import com.hazelcast.map.impl.mapstore.writebehind.WriteBehindQueue;
import com.hazelcast.nio.serialization.Data;
import com.hazelcast.nio.serialization.SerializationService;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;

/**
 * Processes write-behind {@link DelayedEntry} instances by pushing them to the
 * configured {@link MapStore}.
 *
 * <p>Entries are ordered by their store time, split into runs of identical
 * operation type (delete when the entry value is {@code null}, write otherwise)
 * and then stored either one by one or as a batch. Every store call is attempted
 * up to {@link #RETRY_TIMES_OF_A_FAILED_STORE_OPERATION} times; entries which
 * still fail are reported back to the caller grouped by partition id.
 */
class DefaultWriteBehindProcessor
implements WriteBehindProcessor<DelayedEntry> {

    /**
     * Orders entries by ascending {@link DelayedEntry#getStoreTime()}.
     */
    public static final Comparator<DelayedEntry> DELAYED_ENTRY_COMPARATOR = new Comparator<DelayedEntry>() {

        @Override
        public int compare(DelayedEntry o1, DelayedEntry o2) {
            long first = o1.getStoreTime();
            long second = o2.getStoreTime();
            if (first < second) {
                return -1;
            }
            return first == second ? 0 : 1;
        }
    };

    /** Total number of attempts made for a single store call before it is given up. */
    private static final int RETRY_TIMES_OF_A_FAILED_STORE_OPERATION = 3;

    /** Seconds to wait after every unsuccessful attempt. */
    private static final int RETRY_STORE_AFTER_WAIT_SECONDS = 1;

    private final MapStore mapStore;
    private final SerializationService serializationService;
    private final List<StoreListener> storeListeners;
    private final ILogger logger;
    private final int writeBatchSize;

    /**
     * Wires the processor with the collaborators exposed by the given context.
     *
     * @param mapStoreContext source of the serialization service, map store
     *                        wrapper, logger and configured write batch size
     */
    DefaultWriteBehindProcessor(MapStoreContext mapStoreContext) {
        this.serializationService = mapStoreContext.getSerializationService();
        this.mapStore = mapStoreContext.getMapStoreWrapper();
        this.storeListeners = new ArrayList<StoreListener>(2);
        this.logger = mapStoreContext.getLogger(DefaultWriteBehindProcessor.class);
        this.writeBatchSize = mapStoreContext.getMapStoreConfig().getWriteBatchSize();
    }

    /**
     * Sorts the given entries by store time (in place) and stores them.
     *
     * <p>When the configured write batch size is greater than one the entries
     * are handled chunk by chunk, otherwise the whole list is handled in one pass.
     *
     * @param delayedEntries entries to store; sorted in place
     * @return entries which could not be stored, keyed by partition id
     */
    @Override
    public Map<Integer, List<DelayedEntry>> process(List<DelayedEntry> delayedEntries) {
        sort(delayedEntries);
        if (writeBatchSize > 1) {
            return doStoreUsingBatchSize(delayedEntries);
        }
        return processInternal(delayedEntries);
    }

    /**
     * Splits the entries into consecutive runs sharing the same operation type
     * and hands every run to {@link #callHandler}.
     *
     * @param delayedEntries entries to store, may be {@code null} or empty
     * @return failed entries keyed by partition id; empty when nothing failed
     */
    private Map<Integer, List<DelayedEntry>> processInternal(List<DelayedEntry> delayedEntries) {
        if (delayedEntries == null || delayedEntries.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<Integer, List<DelayedEntry>> failsPerPartition = new HashMap<Integer, List<DelayedEntry>>();
        List<DelayedEntry> currentRun = new ArrayList<DelayedEntry>();
        StoreOperationType currentType = null;
        for (DelayedEntry entry : delayedEntries) {
            // A null value marks a removal, anything else is a write.
            StoreOperationType entryType = entry.getValue() == null
                    ? StoreOperationType.DELETE : StoreOperationType.WRITE;
            if (currentType != null && currentType != entryType) {
                // Operation type switched: flush the run collected so far.
                addToFails(callHandler(currentRun, currentType), failsPerPartition);
                currentRun.clear();
            }
            currentType = entryType;
            currentRun.add(entry);
        }
        // Flush the trailing run.
        addToFails(callHandler(currentRun, currentType), failsPerPartition);
        return failsPerPartition;
    }

    /**
     * Adds every failed entry to the list belonging to its partition id,
     * creating that list on first use.
     *
     * @param fails             failed entries, may be {@code null} or empty
     * @param failsPerPartition accumulator which is updated in place
     */
    private void addToFails(List<DelayedEntry> fails, Map<Integer, List<DelayedEntry>> failsPerPartition) {
        if (fails == null || fails.isEmpty()) {
            return;
        }
        for (DelayedEntry entry : fails) {
            int partitionId = entry.getPartitionId();
            List<DelayedEntry> partitionFails = failsPerPartition.get(partitionId);
            if (partitionFails == null) {
                partitionFails = new ArrayList<DelayedEntry>();
                failsPerPartition.put(partitionId, partitionFails);
            }
            partitionFails.add(entry);
        }
    }

    /**
     * Stores a run of entries which all share one operation type.
     *
     * <p>A single entry, or several entries that collapse to one distinct key,
     * is stored with a single-entry call. Otherwise a batch call is issued and
     * every entry reported as failed by the batch is retried individually.
     *
     * @param delayedEntries entries of one operation type
     * @param operationType  operation to apply
     * @return entries which could not be stored
     */
    private List<DelayedEntry> callHandler(Collection<DelayedEntry> delayedEntries, StoreOperationType operationType) {
        int size = delayedEntries.size();
        if (size == 0) {
            return Collections.emptyList();
        }
        if (size == 1) {
            return callSingleStoreWithListeners(delayedEntries.iterator().next(), operationType);
        }
        DelayedEntry[] delayedEntriesArray = delayedEntries.toArray(new DelayedEntry[size]);
        Map<Object, DelayedEntry> batchMap = prepareBatchMap(delayedEntriesArray);
        if (batchMap.size() == 1) {
            // All entries target the same key: only the last one needs to be stored.
            DelayedEntry lastEntry = delayedEntriesArray[delayedEntriesArray.length - 1];
            return callSingleStoreWithListeners(lastEntry, operationType);
        }
        List<DelayedEntry> failedInBatch = callBatchStoreWithListeners(batchMap, operationType);
        List<DelayedEntry> failedTries = new ArrayList<DelayedEntry>();
        for (DelayedEntry entry : failedInBatch) {
            // Second chance: try every entry of the failed batch on its own.
            failedTries.addAll(callSingleStoreWithListeners(entry, operationType));
        }
        return failedTries;
    }

    /**
     * Builds a key to entry map in which every key is mapped to its last
     * occurrence in the given array.
     *
     * @param delayedEntries entries in processing order
     * @return map holding one entry per distinct key
     */
    private Map<Object, DelayedEntry> prepareBatchMap(DelayedEntry[] delayedEntries) {
        Map<Object, DelayedEntry> batchMap = new HashMap<Object, DelayedEntry>();
        // Walk backwards so that the first entry seen for a key is its last occurrence.
        for (int i = delayedEntries.length - 1; i >= 0; i--) {
            DelayedEntry delayedEntry = delayedEntries[i];
            Object key = delayedEntry.getKey();
            if (!batchMap.containsKey(key)) {
                batchMap.put(key, delayedEntry);
            }
        }
        return batchMap;
    }

    /**
     * Stores one entry with retries, notifying the registered listeners before
     * the store call and, when that call returns normally, after it.
     *
     * @param entry         entry to store
     * @param operationType operation to apply
     * @return a singleton list holding the entry when all attempts failed,
     *         otherwise an empty list
     */
    private List<DelayedEntry> callSingleStoreWithListeners(final DelayedEntry entry, final StoreOperationType operationType) {
        return retryCall(new RetryTask<DelayedEntry>() {

            @Override
            public boolean run() throws Exception {
                callBeforeStoreListeners(entry);
                Object key = toObject(entry.getKey());
                Object value = toObject(entry.getValue());
                boolean result = operationType.processSingle(key, value, mapStore);
                callAfterStoreListeners(entry);
                return result;
            }

            @Override
            public List<DelayedEntry> failedList() {
                return Collections.singletonList(entry);
            }
        });
    }

    /**
     * Converts the keys and values of the given entries to their object form.
     *
     * @param batchMap entries to convert
     * @return map from converted key to converted value
     */
    private Map<Object, Object> convertToObject(Map<Object, DelayedEntry> batchMap) {
        Map<Object, Object> map = new HashMap<Object, Object>();
        for (DelayedEntry entry : batchMap.values()) {
            Object key = toObject(entry.getKey());
            Object value = toObject(entry.getValue());
            map.put(key, value);
        }
        return map;
    }

    /**
     * Delegates to {@link SerializationService#toObject}.
     */
    protected Object toObject(Object obj) {
        return serializationService.toObject(obj);
    }

    /**
     * Delegates to {@link SerializationService#toData}.
     */
    protected Data toData(Object obj) {
        return serializationService.toData(obj);
    }

    /**
     * Stores a batch of entries with retries, notifying the registered
     * listeners before the batch call and, when that call returns normally,
     * after it.
     *
     * @param batchMap      entries to store, one per key
     * @param operationType operation to apply
     * @return a new list holding every entry of the batch when all attempts
     *         failed, otherwise an empty list
     */
    private List<DelayedEntry> callBatchStoreWithListeners(final Map<Object, DelayedEntry> batchMap, final StoreOperationType operationType) {
        return retryCall(new RetryTask<DelayedEntry>() {

            @Override
            public boolean run() throws Exception {
                callBeforeStoreListeners(batchMap.values());
                Map<Object, Object> map = convertToObject(batchMap);
                boolean result = operationType.processBatch(map, mapStore);
                callAfterStoreListeners(batchMap.values());
                return result;
            }

            @Override
            public List<DelayedEntry> failedList() {
                return new ArrayList<DelayedEntry>(batchMap.values());
            }
        });
    }

    /** Sends a before-store event for the entry to every registered listener. */
    private void callBeforeStoreListeners(DelayedEntry entry) {
        for (StoreListener listener : storeListeners) {
            listener.beforeStore(StoreEvent.createStoreEvent(entry));
        }
    }

    /** Sends an after-store event for the entry to every registered listener. */
    private void callAfterStoreListeners(DelayedEntry entry) {
        for (StoreListener listener : storeListeners) {
            listener.afterStore(StoreEvent.createStoreEvent(entry));
        }
    }

    /**
     * Sends a before-store event for every given entry to every registered listener.
     *
     * @param entries entries about to be stored
     */
    @Override
    public void callBeforeStoreListeners(Collection<DelayedEntry> entries) {
        for (DelayedEntry entry : entries) {
            callBeforeStoreListeners(entry);
        }
    }

    /**
     * Registers a listener which is notified around store calls.
     *
     * @param listeners listener to register
     */
    @Override
    public void addStoreListener(StoreListener listeners) {
        storeListeners.add(listeners);
    }

    /**
     * Removes all entries from the queue and stores them.
     *
     * @param queue queue to drain
     * @return data keys of the drained entries; empty when the queue was empty
     */
    @Override
    public Collection flush(WriteBehindQueue queue) {
        if (queue.size() == 0) {
            return Collections.emptyList();
        }
        List<DelayedEntry> sortedDelayedEntries = queue.removeAll();
        return flushInternal(sortedDelayedEntries);
    }

    /**
     * Stores a single entry.
     *
     * @param entry entry to store
     */
    @Override
    public void flush(DelayedEntry entry) {
        List<DelayedEntry> entries = Collections.singletonList(entry);
        flushInternal(entries);
    }

    /**
     * Stores the given entries and logs a severe message when some of them
     * could not be stored.
     *
     * @param delayedEntries entries to store
     * @return data keys of all given entries, whether or not their store succeeded
     */
    private Collection<Data> flushInternal(List<DelayedEntry> delayedEntries) {
        // No explicit sort here: process() already sorts the list before storing.
        Map<Integer, List<DelayedEntry>> failedStoreOpPerPartition = process(delayedEntries);
        if (!failedStoreOpPerPartition.isEmpty()) {
            printErrorLog(failedStoreOpPerPartition);
        }
        return getDataKeys(delayedEntries);
    }

    /**
     * Logs the total number of failed entries at severe level.
     *
     * @param failsPerPartition failed entries keyed by partition id
     */
    private void printErrorLog(Map<Integer, List<DelayedEntry>> failsPerPartition) {
        int size = 0;
        for (List<DelayedEntry> partitionFails : failsPerPartition.values()) {
            size += partitionFails.size();
        }
        String logMessage = String.format("Map store flush operation can not be done for %d entries", size);
        logger.severe(logMessage);
    }

    /**
     * Converts the key of every given entry with {@link #toData}.
     *
     * @param sortedDelayedEntries entries whose keys are wanted, may be {@code null}
     * @return converted keys in the order of the given list
     */
    private List<Data> getDataKeys(List<DelayedEntry> sortedDelayedEntries) {
        if (sortedDelayedEntries == null || sortedDelayedEntries.isEmpty()) {
            return Collections.emptyList();
        }
        List<Data> keys = new ArrayList<Data>(sortedDelayedEntries.size());
        for (DelayedEntry entry : sortedDelayedEntries) {
            keys.add(toData(entry.getKey()));
        }
        return keys;
    }

    /**
     * Sends an after-store event for every given entry to every registered listener.
     *
     * @param entries entries which have been stored
     */
    @Override
    public void callAfterStoreListeners(Collection<DelayedEntry> entries) {
        for (DelayedEntry entry : entries) {
            callAfterStoreListeners(entry);
        }
    }

    /**
     * Stores the entries in consecutive chunks of at most {@code writeBatchSize}
     * entries and merges the failures of all chunks.
     *
     * @param sortedDelayedEntries entries to store, may be {@code null} or empty
     * @return failed entries keyed by partition id
     */
    private Map<Integer, List<DelayedEntry>> doStoreUsingBatchSize(List<DelayedEntry> sortedDelayedEntries) {
        Map<Integer, List<DelayedEntry>> failsPerPartition = new HashMap<Integer, List<DelayedEntry>>();
        int page = 0;
        List<DelayedEntry> chunk;
        while ((chunk = getBatchChunk(sortedDelayedEntries, writeBatchSize, page++)) != null) {
            Map<Integer, List<DelayedEntry>> chunkFails = processInternal(chunk);
            for (Map.Entry<Integer, List<DelayedEntry>> failure : chunkFails.entrySet()) {
                Integer partitionId = failure.getKey();
                List<DelayedEntry> mergedFails = failsPerPartition.get(partitionId);
                if (mergedFails == null) {
                    // Always merge into a list owned by the result map. Storing the
                    // chunk's own list and then adding it to itself would record
                    // every failed entry twice.
                    mergedFails = new ArrayList<DelayedEntry>();
                    failsPerPartition.put(partitionId, mergedFails);
                }
                mergedFails.addAll(failure.getValue());
            }
        }
        return failsPerPartition;
    }

    /**
     * Returns a view of the requested chunk of the list.
     *
     * @param list        list to slice, may be {@code null} or empty
     * @param batchSize   maximum chunk size
     * @param chunkNumber zero-based index of the chunk
     * @return sub-list view for the chunk, or {@code null} when the list is
     *         {@code null}, empty or has no such chunk
     */
    private List<DelayedEntry> getBatchChunk(List<DelayedEntry> list, int batchSize, int chunkNumber) {
        if (list == null || list.isEmpty()) {
            return null;
        }
        int start = chunkNumber * batchSize;
        int end = Math.min(start + batchSize, list.size());
        if (start >= end) {
            return null;
        }
        return list.subList(start, end);
    }

    /**
     * Runs the task until it reports success or the attempt limit is reached,
     * sleeping after every unsuccessful attempt.
     *
     * <p>Anything thrown by the task counts as a failed attempt. A warning is
     * logged whenever the first attempt was not successful.
     *
     * @param task store call to execute
     * @return {@link RetryTask#failedList()} when no attempt succeeded,
     *         otherwise an empty list
     */
    private List<DelayedEntry> retryCall(RetryTask<DelayedEntry> task) {
        boolean succeeded = false;
        Throwable lastFailure = null;
        int attempt = 0;
        for (; attempt < RETRY_TIMES_OF_A_FAILED_STORE_OPERATION; attempt++) {
            try {
                succeeded = task.run();
            } catch (Throwable t) {
                lastFailure = t;
            }
            if (succeeded) {
                break;
            }
            sleepSeconds(RETRY_STORE_AFTER_WAIT_SECONDS);
        }
        if (attempt == 0) {
            // First attempt succeeded, nothing to report.
            return Collections.emptyList();
        }
        String msg = String.format("Store operation failed and retries %s", succeeded ? "succeeded." : "failed too.");
        logger.warning(msg, lastFailure);
        if (succeeded) {
            return Collections.emptyList();
        }
        return task.failedList();
    }

    /**
     * Sorts the entries in place with {@link #DELAYED_ENTRY_COMPARATOR}.
     *
     * @param entries entries to sort; lists with fewer than two elements,
     *                including {@code null}, are left untouched
     */
    private void sort(List<DelayedEntry> entries) {
        if (entries == null || entries.size() < 2) {
            return;
        }
        Collections.sort(entries, DELAYED_ENTRY_COMPARATOR);
    }

    /**
     * Sleeps for the given number of seconds. An interruption is logged and
     * the interrupt status of the thread is restored.
     *
     * @param secs seconds to sleep
     */
    private void sleepSeconds(long secs) {
        try {
            TimeUnit.SECONDS.sleep(secs);
        } catch (InterruptedException e) {
            logger.warning(e);
            // Catching InterruptedException clears the flag; set it again so
            // that code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * The two kinds of store calls and how each maps onto {@link MapStore}.
     */
    private enum StoreOperationType {

        /** Removes entries through {@code delete} / {@code deleteAll}. */
        DELETE {
            @Override
            boolean processSingle(Object key, Object value, MapStore mapStore) {
                mapStore.delete(key);
                return true;
            }

            @Override
            boolean processBatch(Map map, MapStore mapStore) {
                mapStore.deleteAll(map.keySet());
                return true;
            }
        },

        /** Writes entries through {@code store} / {@code storeAll}. */
        WRITE {
            @Override
            boolean processSingle(Object key, Object value, MapStore mapStore) {
                mapStore.store(key, value);
                return true;
            }

            @Override
            boolean processBatch(Map map, MapStore mapStore) {
                mapStore.storeAll(map);
                return true;
            }
        };

        /**
         * Applies the operation to one key/value pair.
         *
         * @return {@code true} when the map store call returned normally
         */
        abstract boolean processSingle(Object key, Object value, MapStore mapStore);

        /**
         * Applies the operation to all pairs of the map.
         *
         * @return {@code true} when the map store call returned normally
         */
        abstract boolean processBatch(Map map, MapStore mapStore);
    }

    /**
     * A store call which can be retried.
     *
     * @param <T> type of the entries reported as failed
     */
    private interface RetryTask<T> {

        /**
         * Executes the store call once.
         *
         * @return {@code true} on success
         * @throws Exception when the store call fails
         */
        boolean run() throws Exception;

        /**
         * @return the entries to report as failed when no attempt succeeded
         */
        List<T> failedList();
    }
}

