/*
 * Decompiled with CFR 0.152.
 */
package net.intelie.live;

import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import net.intelie.live.ControlEvent;
import net.intelie.live.EndHistoryInfo;
import net.intelie.live.EventSender;
import net.intelie.live.MapBuilder;
import net.intelie.live.Query;
import net.intelie.live.QueryEvent;
import net.intelie.live.QueryListener;
import net.intelie.live.ScheduledQuerier;
import net.intelie.live.SilentListener;
import net.intelie.live.StopInfo;
import net.intelie.live.StopReason;
import net.intelie.live.util.QuerySpan;
import net.intelie.pipes.time.ClockScheduler;
import net.intelie.pipes.time.Period;
import net.intelie.pipes.time.SchedulerContext;
import net.intelie.pipes.time.TaskHandle;
import net.intelie.pipes.time.TimeSpan;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ScheduledQueryHandle {
    private static final Logger LOGGER = LoggerFactory.getLogger(ScheduledQueryHandle.class);
    private final Listener listener;
    private final ScheduledQuerier querier;
    private final ClockScheduler scheduler;
    private final boolean follow;
    private final Period period;
    private final EventSender status;
    private final String expression;
    private final AtomicBoolean sentEndHistory;
    private final AtomicBoolean started;
    private Query query;
    private TimeSpan span;
    private TaskHandle handle;

    public ScheduledQueryHandle(Query query, ScheduledQuerier querier, Period period, ClockScheduler scheduler, EventSender status) {
        this.query = query;
        this.period = period;
        this.status = status;
        this.listener = new Listener(query.getListener());
        this.querier = querier;
        this.follow = query.getFollow();
        this.expression = query.getExpression();
        this.scheduler = scheduler;
        this.sentEndHistory = new AtomicBoolean(false);
        this.started = new AtomicBoolean(false);
    }

    public void start() {
        try {
            this.span = QuerySpan.parse(this.query.getSpan());
            long now = this.scheduler.now();
            if (this.querier instanceof ScheduledQuerier.WithHistory && !this.span.isPoint()) {
                this.executeHistory();
            }
            if (!this.follow) {
                this.executeOnce(now);
                return;
            }
            Preconditions.checkArgument((this.period != null ? 1 : 0) != 0, (Object)"No valid @cron expression found");
            this.maybeSendStartAndEndHistory(now);
            this.querier.touch(now);
            SchedulerContext context = this.scheduler.newContext();
            this.handle = context.schedule(this.period, (start, end) -> this.executeOnce(end));
            context.start();
        }
        catch (Throwable e) {
            SilentListener.send(this.listener, new StopInfo(new StopReason.Error(e)));
            LOGGER.info("Could not start query. Not scheduling.", e);
        }
    }

    private void executeOnce(long timestamp) {
        long start = this.scheduler.now();
        try {
            this.maybeSendStartAndEndHistory(start);
            AtomicInteger size = new AtomicInteger();
            this.querier.execute(timestamp, new MyListener(timestamp, size, start));
        }
        catch (Throwable e) {
            this.sendError(e, start);
        }
    }

    private void executeHistory() {
        long now = this.scheduler.now();
        try {
            this.maybeStart();
            long start = this.span.start(now);
            long end = this.span.end(now);
            AtomicInteger size = new AtomicInteger();
            ((ScheduledQuerier.WithHistory)this.querier).executeHistory(start, end, new MyListener(end, size, now));
            this.maybeEndHistory(end, now);
        }
        catch (Throwable e) {
            this.sendError(e, now);
        }
    }

    private QueryEvent maybeAddTimestamp(QueryEvent event, long timestamp) {
        ArrayList<Map<String, Object>> withTimestamp = new ArrayList<Map<String, Object>>();
        Iterator iterator = event.iterator();
        while (iterator.hasNext()) {
            LinkedHashMap<String, Long> map = (LinkedHashMap<String, Long>)iterator.next();
            if (map.get("timestamp") == null) {
                LinkedHashMap<String, Long> copy = new LinkedHashMap<String, Long>();
                copy.put("timestamp", timestamp);
                copy.putAll(map);
                copy.put("timestamp", timestamp);
                map = copy;
            }
            withTimestamp.add((Map<String, Object>)map);
        }
        return new QueryEvent(withTimestamp);
    }

    private void sendError(Throwable e, long start) {
        LOGGER.info("Could not execute query", e);
        this.status.send(new MapBuilder().put("expression", this.expression).put("status", "error").put("duration", this.scheduler.now() - start).put("message", e.getMessage()).ok());
        SilentListener.send(this.listener, new StopInfo(new StopReason.Error(e)));
    }

    private void maybeSendStartAndEndHistory(long start) throws Exception {
        this.maybeStart();
        this.maybeEndHistory(start, start);
    }

    private void maybeEndHistory(long timestamp, long start) {
        if (this.follow && !this.sentEndHistory.getAndSet(true)) {
            SilentListener.send(this.listener, new EndHistoryInfo(timestamp, this.scheduler.now() - start));
        }
    }

    private void maybeStart() throws Exception {
        if (!this.started.getAndSet(true)) {
            this.listener.onControl(this.querier.metadata());
        }
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    public void stop(StopReason reason) {
        if (this.handle != null) {
            this.handle.cancel();
        }
        ScheduledQueryHandle scheduledQueryHandle = this;
        synchronized (scheduledQueryHandle) {
            SilentListener.send(this.listener, new StopInfo(reason));
        }
    }

    private class MyListener
    implements ScheduledQuerier.Listener {
        private final long timestamp;
        private final AtomicInteger size;
        private final long start;

        public MyListener(long timestamp, AtomicInteger size, long start) {
            this.timestamp = timestamp;
            this.size = size;
            this.start = start;
        }

        @Override
        public void onEvent(QueryEvent event) {
            SilentListener.send(ScheduledQueryHandle.this.listener, ScheduledQueryHandle.this.maybeAddTimestamp(event, this.timestamp), !ScheduledQueryHandle.this.sentEndHistory.get());
            this.size.addAndGet(event.size());
        }

        @Override
        public void onSuccess() {
            long end = ScheduledQueryHandle.this.scheduler.now();
            ScheduledQueryHandle.this.status.send(new MapBuilder().put("expression", ScheduledQueryHandle.this.expression).put("status", "success").put("events", this.size.get()).put("duration", end - this.start).ok());
            if (!ScheduledQueryHandle.this.follow) {
                SilentListener.send(ScheduledQueryHandle.this.listener, new EndHistoryInfo(end, end - this.start));
                SilentListener.send(ScheduledQueryHandle.this.listener, new StopInfo(new StopReason.NoRealtime()));
            }
        }

        @Override
        public void onError(Throwable e) {
            ScheduledQueryHandle.this.sendError(e, this.start);
        }
    }

    private class Listener
    extends QueryListener.Empty {
        private final QueryListener listener;

        public Listener(QueryListener listener) {
            this.listener = listener;
        }

        @Override
        public void onEvent(QueryEvent event, boolean history) throws Exception {
            this.listener.onEvent(event, history);
        }

        @Override
        public void onStop(StopInfo event) throws Exception {
            ScheduledQueryHandle.this.started.set(false);
            ScheduledQueryHandle.this.sentEndHistory.set(false);
            this.listener.onControl(event);
        }

        @Override
        public void onCustom(ControlEvent event) throws Exception {
            this.listener.onControl(event);
        }
    }
}

