/*
 * Decompiled with CFR 0.152.
 */
package br.pucrio.tecgraf.soma.websocketnotifier.application.service;

import br.pucrio.tecgraf.soma.serviceapi.configuration.ServiceConfiguration;
import br.pucrio.tecgraf.soma.websocketnotifier.application.configuration.Constants;
import br.pucrio.tecgraf.soma.websocketnotifier.model.WebsocketNotifierInfo;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;
import jakarta.ws.rs.ProcessingException;
import jakarta.ws.rs.client.ClientBuilder;
import jakarta.ws.rs.client.Entity;
import jakarta.ws.rs.client.WebTarget;
import jakarta.ws.rs.core.Response;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * Reads the last notified offset of a user/application/topic triple from the
 * {@code ktable_websocket_notifier} table through the ksqlDB REST API, creating the
 * backing stream and table when ksqlDB reports that the table is missing.
 */
@Service
public class KtableService {
    private static final Logger logger = LoggerFactory.getLogger(KtableService.class);

    /** Media type sent in the {@code Accept} header of every ksqlDB request. */
    private static final String KSQL_MEDIA_TYPE = "application/vnd.ksql.v1+json";
    /** HTTP status that this service treats as success. */
    private static final int HTTP_OK = 200;
    /** {@code @type} value of a ksqlDB error payload. */
    private static final String STATEMENT_ERROR_TYPE = "statement_error";
    /** {@code error_code} value that this service interprets as "ktable is missing". */
    private static final String KTABLE_MISSING_ERROR_CODE = "40001";
    /**
     * Position of the offset inside the {@code columns} array of a result row.
     * Presumably ROWTIME, ROWKEY, user, application, topic_name, offset -- confirm
     * against the ksqlDB version in use.
     */
    private static final int OFFSET_COLUMN_INDEX = 5;

    private final String KSQLDB_URL;
    private final String KAFKA_NOTIFICATION_TOPIC;

    /**
     * Reads the ksqlDB URL and the notification topic name from the service configuration.
     *
     * @param serviceConfiguration configuration source for both values
     */
    @Autowired
    public KtableService(ServiceConfiguration serviceConfiguration) {
        this.KSQLDB_URL = serviceConfiguration.getValue(Constants.Config.KAFKA_KSQLDB_URL.option.getLongName());
        this.KAFKA_NOTIFICATION_TOPIC = serviceConfiguration.getValue(Constants.Config.KAFKA_NOTIFICATION_TOPIC.option.getLongName());
    }

    /**
     * Escapes a value that is placed inside a single-quoted KSQL string literal which is
     * itself embedded in a JSON string. Values without quotes, backslashes or control
     * characters are returned unchanged.
     *
     * @param value value to embed; {@code null} is rendered as the text {@code null},
     *              exactly as plain string concatenation would do
     * @return the escaped text
     */
    private static String escapeForKsqlLiteralInJson(Object value) {
        String text = String.valueOf(value);
        StringBuilder escaped = new StringBuilder(text.length() + 8);
        for (int i = 0; i < text.length(); i++) {
            char c = text.charAt(i);
            if (c == '\'') {
                // KSQL escapes a single quote inside a string literal by doubling it.
                escaped.append("''");
            } else if (c == '"') {
                escaped.append("\\\"");
            } else if (c == '\\') {
                escaped.append("\\\\");
            } else if (c < 0x20) {
                // Raw control characters are not allowed inside a JSON string.
                escaped.append(String.format("\\u%04x", (int) c));
            } else {
                escaped.append(c);
            }
        }
        return escaped.toString();
    }

    /** Builds the JSON payload that selects the row whose ROWKEY is {@code user|+|application|+|topic}. */
    private String getLastUserOffsetKQuery(WebsocketNotifierInfo userInfo) {
        String rowKey = escapeForKsqlLiteralInJson(userInfo.getUser())
                + "|+|" + escapeForKsqlLiteralInJson(userInfo.getApplication())
                + "|+|" + escapeForKsqlLiteralInJson(userInfo.getTopicName());
        return "{\"ksql\": \"select * from ktable_websocket_notifier where ROWKEY = '" + rowKey + "';\", \"streamsProperties\": {}}";
    }

    /** Builds the JSON payload that creates the stream over the configured notification topic. */
    private String getKStreamCreateKSQL() {
        return "{ \"ksql\": \"CREATE STREAM websocket_notifier_stream (user VARCHAR, application VARCHAR, topic_name VARCHAR, offset BIGINT) WITH (KAFKA_TOPIC = '" + this.KAFKA_NOTIFICATION_TOPIC + "', VALUE_FORMAT='JSON');\",  \"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"} }";
    }

    /** Builds the JSON payload that creates the table holding the maximum offset per user/application/topic. */
    private String getKTableCreateKSQL() {
        return "{ \"ksql\": \"CREATE TABLE ktable_websocket_notifier AS SELECT user, application, topic_name, MAX(offset) AS offset FROM WEBSOCKET_NOTIFIER_STREAM GROUP BY user, application, topic_name;\",  \"streamsProperties\": {\"ksql.streams.auto.offset.reset\": \"earliest\"} }";
    }

    /**
     * Creates a web target for the given URI.
     *
     * NOTE(review): a new client is built on every call and never closed. It cannot be
     * closed here because the caller still has to read the response; consider sharing a
     * single client instance for the lifetime of the service.
     */
    private WebTarget getWebTarget(String uri) {
        return ClientBuilder.newClient().target(uri);
    }

    /**
     * Posts a JSON payload to {@code endpoint/path}.
     *
     * @param query    JSON request body
     * @param endpoint base URI of the ksqlDB server
     * @param path     resource path appended to the endpoint
     * @return the response; the caller is responsible for closing it
     */
    protected Response makeRequest(String query, String endpoint, String path) {
        logger.info("makeRequest {} {}", endpoint, path);
        Entity<String> payload = Entity.json(query);
        return this.getWebTarget(endpoint).path(path).request(KSQL_MEDIA_TYPE).post(payload);
    }

    /**
     * Fills {@code userInfo} with the offset stored in the ktable, when one exists.
     * If ksqlDB reports that the table is missing, the stream and table are created
     * and the query is retried once.
     *
     * @param userInfo identifies the user, application and topic to look up
     * @return the same {@code userInfo} instance, with its offset set when a row was found
     */
    public WebsocketNotifierInfo getLastUserOffset(WebsocketNotifierInfo userInfo) {
        return this.getLastUserOffset(userInfo, true);
    }

    /**
     * Runs the offset query.
     *
     * @param allowKtableCreation whether a "ktable missing" answer may trigger creation
     *                            and a retry; {@code false} on the retry itself so the
     *                            create/query cycle cannot recurse without bound
     */
    private WebsocketNotifierInfo getLastUserOffset(WebsocketNotifierInfo userInfo, boolean allowKtableCreation) {
        String query = this.getLastUserOffsetKQuery(userInfo);
        String body;
        try {
            logger.debug(query);
            try (Response response = this.makeRequest(query, this.KSQLDB_URL, "query")) {
                body = response.readEntity(String.class);
                if (response.getStatus() != HTTP_OK) {
                    // Not fatal: the body is still parsed because the "ktable missing"
                    // answer is recognised from its JSON content.
                    logger.error("Error during getLastUserOffset. Status {} response {}", response.getStatus(), body);
                }
            }
        }
        catch (ProcessingException e) {
            logger.error("Error during getLastUserOffset.", e);
            return userInfo;
        }
        return this.parseJsonArray(userInfo, body, allowKtableCreation);
    }

    /** Tells whether the payload is a {@code statement_error} carrying the "ktable missing" error code. */
    private boolean ktableDoesNotExist(JsonElement response) {
        if (!response.isJsonObject()) {
            return false;
        }
        JsonObject object = response.getAsJsonObject();
        return object.has("@type")
                && object.has("error_code")
                && STATEMENT_ERROR_TYPE.equals(object.get("@type").getAsString())
                && KTABLE_MISSING_ERROR_CODE.equals(object.get("error_code").getAsString());
    }

    /**
     * Extracts the offset column of the first result row.
     *
     * @return the offset element, or {@code null} when the payload is not an array with
     *         at least two elements or the second element lacks a usable
     *         {@code row.columns[5]} value
     */
    private JsonElement findOffsetColumn(JsonElement response) {
        if (!response.isJsonArray() || response.getAsJsonArray().size() <= 1) {
            return null;
        }
        // The row is read from index 1, as element 0 is never inspected by this service.
        JsonElement second = response.getAsJsonArray().get(1);
        if (!second.isJsonObject()) {
            return null;
        }
        JsonElement row = second.getAsJsonObject().get("row");
        if (row == null || !row.isJsonObject()) {
            return null;
        }
        JsonElement columns = row.getAsJsonObject().get("columns");
        if (columns == null || !columns.isJsonArray()) {
            return null;
        }
        JsonArray values = columns.getAsJsonArray();
        if (values.size() <= OFFSET_COLUMN_INDEX) {
            return null;
        }
        JsonElement offset = values.get(OFFSET_COLUMN_INDEX);
        return offset.isJsonNull() ? null : offset;
    }

    /**
     * Interprets the body of a query response and updates {@code userInfo} accordingly.
     *
     * @param userInfo            object that receives the offset
     * @param response            raw response body, possibly {@code null}
     * @param allowKtableCreation whether a missing ktable may be created and the query retried
     * @return {@code userInfo}, unchanged when no offset could be read
     */
    private WebsocketNotifierInfo parseJsonArray(WebsocketNotifierInfo userInfo, String response, boolean allowKtableCreation) {
        if (response == null) {
            logger.warn("Json return empty from KSQL query: {}", response);
            return userInfo;
        }
        JsonElement responseJson = new JsonParser().parse(response);
        if (this.ktableDoesNotExist(responseJson)) {
            if (!allowKtableCreation) {
                logger.warn("KTABLE websocket notifier is still unavailable after its creation.");
                return userInfo;
            }
            if (this.createKtableNotifier()) {
                return this.getLastUserOffset(userInfo, false);
            }
            logger.info("Websocket notification topic does not exist yet.");
            return userInfo;
        }
        JsonElement offset = this.findOffsetColumn(responseJson);
        if (offset != null) {
            // NOTE(review): the stream declares offset as BIGINT but the value is narrowed
            // to int here; confirm the type accepted by WebsocketNotifierInfo#setOffset.
            userInfo.setOffset(Integer.valueOf(offset.getAsInt()));
            return userInfo;
        }
        logger.warn("Json return empty from KSQL query: {}", response);
        return userInfo;
    }

    /**
     * Sends one DDL statement to the {@code ksql} resource.
     *
     * @param ksql JSON payload containing the statement
     * @return {@code true} only when the server answered with status 200
     */
    private boolean createKSQLAccessStructure(String ksql) {
        try (Response response = this.makeRequest(ksql, this.KSQLDB_URL, "ksql")) {
            String body = response.readEntity(String.class);
            logger.debug("Response create KSQL ACCESS STRUCTURE {}", body);
            if (response.getStatus() != HTTP_OK) {
                logger.error("Error while creating KSQL ACCESS STRUCTURE URI {} STATUS {} RESPONSE {} ", this.KSQLDB_URL, response.getStatus(), body);
                return false;
            }
            return true;
        }
        catch (ProcessingException e) {
            logger.error("Response create KSQL ACCESS STRUCTURE", e);
            return false;
        }
    }

    /**
     * Creates the stream and then the table used by the offset query.
     *
     * @return {@code true} when both statements succeeded; the table statement is not
     *         sent when the stream statement fails
     */
    protected boolean createKtableNotifier() {
        logger.info("Creating KTABLE websocket notifier");
        return this.createKSQLAccessStructure(this.getKStreamCreateKSQL()) && this.createKSQLAccessStructure(this.getKTableCreateKSQL());
    }
}

