/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.runtime.iterative.task;

import java.util.Map;
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.runtime.event.task.TaskEvent;
import org.apache.flink.runtime.iterative.event.WorkerDoneEvent;
import org.apache.flink.runtime.util.event.EventListener;
import org.apache.flink.shaded.com.google.common.base.Preconditions;
import org.apache.flink.types.Value;

/**
 * Listener that consumes {@link WorkerDoneEvent}s, feeds the aggregates they carry into the
 * registered {@link Aggregator}s, and raises an end-of-superstep flag once the configured
 * number of events has been received.
 *
 * <p>The flag stays set until {@link #resetEndOfSuperstep()} is called; receiving another
 * {@link WorkerDoneEvent} while it is set is treated as an error.
 */
public class SyncEventHandler implements EventListener<TaskEvent> {
    /** Class loader handed to {@link WorkerDoneEvent#getAggregates(ClassLoader)}. */
    private final ClassLoader userCodeClassLoader;

    /** Aggregators keyed by the name under which events report their aggregates. */
    private final Map<String, Aggregator<?>> aggregators;

    /** Number of {@link WorkerDoneEvent}s that make up one superstep. */
    private final int numberOfEventsUntilEndOfSuperstep;

    /** Total number of {@link WorkerDoneEvent}s seen so far; never reset. */
    private int workerDoneEventCounter;

    /** Set when the last event of a superstep arrived; cleared by {@link #resetEndOfSuperstep()}. */
    private boolean endOfSuperstep;

    /**
     * Creates a handler.
     *
     * @param numberOfEventsUntilEndOfSuperstep number of worker-done events per superstep;
     *        must be strictly positive
     * @param aggregators aggregators by name, updated on every received event
     * @param userCodeClassLoader class loader used when reading aggregates out of an event
     * @throws IllegalArgumentException if {@code numberOfEventsUntilEndOfSuperstep} is not positive
     */
    public SyncEventHandler(int numberOfEventsUntilEndOfSuperstep, Map<String, Aggregator<?>> aggregators, ClassLoader userCodeClassLoader) {
        Preconditions.checkArgument(numberOfEventsUntilEndOfSuperstep > 0);
        this.userCodeClassLoader = userCodeClassLoader;
        this.numberOfEventsUntilEndOfSuperstep = numberOfEventsUntilEndOfSuperstep;
        this.aggregators = aggregators;
    }

    /**
     * Dispatches an incoming event. Only events whose class is exactly
     * {@link WorkerDoneEvent} are accepted.
     *
     * @param event the event to handle
     * @throws IllegalStateException if the event is of any other class
     */
    @Override
    public void onEvent(TaskEvent event) {
        if (WorkerDoneEvent.class.equals(event.getClass())) {
            this.onWorkerDoneEvent((WorkerDoneEvent) event);
            return;
        }
        throw new IllegalStateException("Unable to handle event " + event.getClass().getName());
    }

    /**
     * Counts the event, applies its aggregates to the matching aggregators and, when the
     * counter reaches a multiple of the configured events-per-superstep, marks the end of
     * the superstep and interrupts the current thread.
     *
     * @param workerDoneEvent the event to process
     * @throws RuntimeException if the handler is still in end-of-superstep state, or if the
     *         event carries a different number of names and aggregates
     * @throws IllegalStateException if the event names an aggregator that is not registered
     */
    private void onWorkerDoneEvent(WorkerDoneEvent workerDoneEvent) {
        if (this.endOfSuperstep) {
            throw new RuntimeException("Encountered WorkerDoneEvent when still in End-of-Superstep status.");
        }
        ++this.workerDoneEventCounter;

        String[] aggNames = workerDoneEvent.getAggregatorNames();
        Value[] aggregates = workerDoneEvent.getAggregates(this.userCodeClassLoader);
        if (aggNames.length != aggregates.length) {
            throw new RuntimeException("Inconsistent WorkerDoneEvent received!");
        }

        for (int i = 0; i < aggNames.length; ++i) {
            // The map stores aggregators with a wildcard type, which cannot accept a Value
            // directly; the element type is not checked here, hence the unchecked cast.
            @SuppressWarnings("unchecked")
            Aggregator<Value> aggregator = (Aggregator<Value>) this.aggregators.get(aggNames[i]);
            if (aggregator == null) {
                // Fail with a descriptive message instead of a bare NullPointerException.
                throw new IllegalStateException("Received aggregate for unknown aggregator '" + aggNames[i] + "'.");
            }
            aggregator.aggregate(aggregates[i]);
        }

        if (this.workerDoneEventCounter % this.numberOfEventsUntilEndOfSuperstep == 0) {
            this.endOfSuperstep = true;
            // NOTE(review): presumably wakes the thread that is blocked waiting for events so
            // it can observe the flag -- confirm against the caller.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Reports whether the end-of-superstep flag is currently set.
     *
     * @return {@code true} if the flag is set and has not been reset yet
     */
    public boolean isEndOfSuperstep() {
        return this.endOfSuperstep;
    }

    /** Clears the end-of-superstep flag so that further worker-done events are accepted. */
    public void resetEndOfSuperstep() {
        this.endOfSuperstep = false;
    }
}

