package eu.dnetlib.dhp.broker.oa;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.DatasourceStats;
import eu.dnetlib.dhp.broker.oa.util.aggregators.stats.StatsAggregator;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Optional;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.TypedColumn;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/broker/oa/GenerateStatsJob.class */
/**
 * Spark job that turns broker events into per-datasource statistics.
 *
 * <p>The job reads {@link Event} records from {@code <outputDir>/events}, groups them by
 * topic and target datasource id, aggregates each group with {@link StatsAggregator},
 * overwrites the {@code oa_datasource_stats_temp} JDBC table with the resulting
 * {@link DatasourceStats} rows and finally calls the broker's stats update endpoint.
 *
 * <p>NOTE(review): a decompiled copy of the compiler-synthesized
 * {@code $deserializeLambda$} method used to live in this class. It was not valid Java
 * source, and the compiler generates that hook for serializable lambdas on its own, so
 * it is intentionally not declared here.
 */
public class GenerateStatsJob {
    private static final Logger log = LoggerFactory.getLogger(GenerateStatsJob.class);

    /** Classpath location of the JSON file describing the accepted job arguments. */
    private static final String PARAMS_RESOURCE = "/eu/dnetlib/dhp/broker/oa/stats_params.json";

    /** JDBC table that receives the aggregated statistics. */
    private static final String STATS_TABLE = "oa_datasource_stats_temp";

    /** Separator placed between topic and datasource id when building the grouping key. */
    private static final String KEY_SEPARATOR = "@@@";

    /**
     * Job entry point.
     *
     * <p>Arguments read from the parser: {@code isSparkSessionManaged} (optional, defaults
     * to {@code true}), {@code outputDir}, {@code dbUrl}, {@code dbUser},
     * {@code dbPassword} and {@code brokerApiBaseUrl}.
     *
     * @param args command line arguments, parsed against {@code stats_params.json}
     * @throws Exception if argument parsing, the Spark computation, the JDBC write or the
     *         broker call fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(
                IOUtils.toString(GenerateStatsJob.class.getResourceAsStream(PARAMS_RESOURCE)));
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        SparkConf conf = new SparkConf();

        String eventsPath = parser.get("outputDir") + "/events";
        log.info("eventsPath: {}", eventsPath);

        String dbUrl = parser.get("dbUrl");
        log.info("dbUrl: {}", dbUrl);

        String dbUser = parser.get("dbUser");
        log.info("dbUser: {}", dbUser);

        String dbPassword = parser.get("dbPassword");
        // The password is deliberately masked: it must never reach the logs.
        log.info("dbPassword: {}", "***");

        String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
        log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);

        TypedColumn<Event, DatasourceStats> statsColumn = new StatsAggregator().toColumn();

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", dbUser);
        connectionProperties.put("password", dbPassword);

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            ClusterUtils.readPath(spark, eventsPath, Event.class)
                    // One group per (topic, target datasource) pair.
                    .groupByKey(
                            event -> event.getTopic() + KEY_SEPARATOR + event.getMap().getTargetDatasourceId(),
                            Encoders.STRING())
                    .agg(statsColumn)
                    // Drop the grouping key, keep only the aggregated statistics.
                    .map(tuple -> tuple._2, Encoders.bean(DatasourceStats.class))
                    .coalesce(1)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .jdbc(dbUrl, STATS_TABLE, connectionProperties);

            log.info("*** updateStats");
            updateStats(brokerApiBaseUrl);
            log.info("*** ALL done.");
        });
    }

    /**
     * Issues a GET request to {@code <brokerApiBaseUrl>/api/openaireBroker/stats/update}
     * and returns the response body.
     *
     * <p>Both the HTTP client and the response are closed when the method exits, whether
     * it returns normally or throws.
     *
     * @param brokerApiBaseUrl base URL of the broker API, without a trailing slash
     * @return the response body as text, or an empty string when the response carries no
     *         entity
     * @throws IOException if the request fails or the body cannot be read
     */
    private static String updateStats(String brokerApiBaseUrl) throws IOException {
        HttpGet request = new HttpGet(brokerApiBaseUrl + "/api/openaireBroker/stats/update");
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(request)) {
            // getEntity() is documented to return null for body-less responses (e.g. 204).
            if (response.getEntity() == null) {
                return "";
            }
            return IOUtils.toString(response.getEntity().getContent());
        }
    }
}
