/*
 * Decompiled with CFR 0.152.
 */
package org.apache.flink.streaming.runtime.tasks;

import org.apache.flink.streaming.api.operators.Output;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.tasks.StreamTask;

/**
 * Stream task whose head operator is a {@link StreamSource}.
 *
 * <p>{@link #run()} passes the source the task's checkpoint lock together with a
 * {@code SourceOutput} wrapper; the wrapper acquires that same lock around every
 * record and watermark it forwards to the head output.
 *
 * @param <OUT> type argument forwarded to {@link StreamTask} and {@link StreamSource}
 */
public class SourceStreamTask<OUT>
extends StreamTask<OUT, StreamSource<OUT>> {

    /** No-op: this task performs no work of its own during initialization. */
    @Override
    protected void init() {
    }

    /** No-op: this task performs no work of its own during cleanup. */
    @Override
    protected void cleanup() {
    }

    /**
     * Runs the head source operator, giving it the checkpoint lock and a
     * lock-acquiring wrapper around the head output.
     *
     * @throws Exception whatever the source operator's {@code run} call throws
     */
    @Override
    protected void run() throws Exception {
        final Object checkpointLock = this.getCheckpointLock();
        // The same lock instance is given to the source and to the wrapper.
        // NOTE(review): the raw SourceOutput / StreamSource types are kept from the
        // decompiled form because the generic signatures of getHeadOutput() and
        // StreamSource.run are not visible here — parameterize once confirmed.
        final SourceOutput lockingOutput = new SourceOutput(this.getHeadOutput(), checkpointLock);
        ((StreamSource)this.headOperator).run(checkpointLock, lockingOutput);
    }

    /**
     * Cancels the task by delegating to the head source operator's {@code cancel()}.
     *
     * @throws Exception declared by the overridden method
     */
    @Override
    protected void cancelTask() throws Exception {
        ((StreamSource)this.headOperator).cancel();
    }

    /**
     * {@link Output} decorator that holds the monitor of a given lock object while
     * forwarding records and watermarks to the wrapped output.
     *
     * <p>NOTE(review): presumably this keeps source emissions from interleaving with
     * other work guarded by the checkpoint lock — confirm against {@code StreamTask}.
     *
     * @param <T> element type accepted by the wrapped output
     */
    private class SourceOutput<T>
    implements Output<T> {
        /** Output that every call is ultimately delegated to. */
        private final Output<T> output;
        /** Object whose monitor is held while emitting. */
        private final Object lockObject;

        /**
         * @param output     the output to delegate to
         * @param lockObject the object to synchronize on while emitting
         */
        public SourceOutput(Output<T> output, Object lockObject) {
            this.output = output;
            this.lockObject = lockObject;
        }

        /**
         * Forwards the watermark to the wrapped output while holding the lock.
         *
         * <p>NOTE(review): unlike {@link #collect(Object)}, this path does not call
         * {@code checkTimerException()} — confirm that the asymmetry is intentional.
         */
        @Override
        public void emitWatermark(Watermark mark) {
            synchronized (this.lockObject) {
                this.output.emitWatermark(mark);
            }
        }

        /**
         * Forwards the record to the wrapped output while holding the lock.
         *
         * <p>Before forwarding, {@code checkTimerException()} is invoked on the
         * enclosing task, so anything it throws prevents the record from being emitted.
         * NOTE(review): presumably it surfaces a failure recorded by a timer —
         * confirm in {@code StreamTask}.
         */
        @Override
        public void collect(T record) {
            synchronized (this.lockObject) {
                SourceStreamTask.this.checkTimerException();
                this.output.collect(record);
            }
        }

        /** Closes the wrapped output; the lock is not acquired for this call. */
        @Override
        public void close() {
            this.output.close();
        }
    }
}

