package eu.dnetlib.dhp.broker.oa;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Notification;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/broker/oa/IndexNotificationsJob.class */
/**
 * Spark job that serializes the {@link Notification} records found under
 * {@code <outputDir>/notifications} to JSON, writes them to an Elasticsearch index and then
 * calls two broker HTTP endpoints: one deleting notifications by date and one triggering the
 * sending of notifications.
 *
 * <p>Job parameters are described by the classpath resource
 * {@code /eu/dnetlib/dhp/broker/oa/index_notifications.json}.
 */
public class IndexNotificationsJob {
    private static final Logger log = LoggerFactory.getLogger(IndexNotificationsJob.class);

    /** Classpath location of the JSON document describing the accepted job arguments. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/broker/oa/index_notifications.json";

    /**
     * Amount subtracted from the date of the first notification to obtain the threshold passed to
     * the broker endpoints. NOTE(review): presumably milliseconds - confirm against
     * {@code Notification.getDate()}.
     */
    private static final long DATE_OFFSET = 1000L;

    /** Shared mapper: avoids building a new {@link ObjectMapper} for every indexed record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Job entry point.
     *
     * @param args command line arguments, parsed according to {@link #PARAMETERS_RESOURCE}
     * @throws Exception if argument parsing, indexing or one of the broker HTTP calls fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersJson;
        try (InputStream in = IndexNotificationsJob.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            parametersJson = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        final SparkConf conf = new SparkConf();

        final String notificationsPath = parser.get("outputDir") + "/notifications";
        log.info("notificationsPath: {}", notificationsPath);
        final String index = parser.get("index");
        log.info("index: {}", index);
        final String indexHost = parser.get("esHost");
        log.info("indexHost: {}", indexHost);
        final String esBatchWriteRetryCount = parser.get("esBatchWriteRetryCount");
        log.info("esBatchWriteRetryCount: {}", esBatchWriteRetryCount);
        final String esBatchWriteRetryWait = parser.get("esBatchWriteRetryWait");
        log.info("esBatchWriteRetryWait: {}", esBatchWriteRetryWait);
        final String esBatchSizeEntries = parser.get("esBatchSizeEntries");
        log.info("esBatchSizeEntries: {}", esBatchSizeEntries);
        final String esNodesWanOnly = parser.get("esNodesWanOnly");
        log.info("esNodesWanOnly: {}", esNodesWanOnly);
        final String brokerApiBaseUrl = parser.get("brokerApiBaseUrl");
        log.info("brokerApiBaseUrl: {}", brokerApiBaseUrl);

        final SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
        final LongAccumulator total = spark.sparkContext().longAccumulator("total_indexed");

        final Dataset<Notification> notifications =
                ClusterUtils.readPath(spark, notificationsPath, Notification.class);

        // The date of the first notification drives the threshold used by the broker calls below.
        // first() throws if the dataset is empty, so the job fails when there is nothing to index.
        final Long date = notifications.first().getDate();

        // The explicit MapFunction cast selects the Java overload of Dataset.map for the lambda.
        final JavaRDD<String> jsonRecords = notifications
                .map((MapFunction<Notification, String>) n -> prepareForIndexing(n, total), Encoders.STRING())
                .javaRDD();

        final Map<String, String> esCfg = new HashMap<>();
        esCfg.put("es.index.auto.create", "false");
        esCfg.put("es.nodes", indexHost);
        esCfg.put("es.mapping.id", "notificationId");
        esCfg.put("es.batch.write.retry.count", esBatchWriteRetryCount);
        esCfg.put("es.batch.write.retry.wait", esBatchWriteRetryWait);
        esCfg.put("es.batch.size.entries", esBatchSizeEntries);
        esCfg.put("es.nodes.wan.only", esNodesWanOnly);

        log.info("*** Start indexing");
        JavaEsSpark.saveJsonToEs(jsonRecords, index, esCfg);
        log.info("*** End indexing");

        // Computed once, after indexing, and shared by both broker calls.
        final long threshold = date.longValue() - DATE_OFFSET;

        log.info("*** Deleting old notifications");
        log.info("*** Deleted notifications: {}", deleteOldNotifications(brokerApiBaseUrl, threshold));
        log.info("*** sendNotifications (emails, ...)");
        sendNotifications(brokerApiBaseUrl, threshold);
        log.info("*** ALL done.");
    }

    /**
     * Issues {@code DELETE <brokerApiBaseUrl>/api/notifications/byDate/0/<l>}.
     *
     * @param brokerApiBaseUrl base URL of the broker API
     * @param l upper bound placed in the request path
     * @return the response body as text
     * @throws IOException if the request or the reading of the response fails
     */
    private static String deleteOldNotifications(String brokerApiBaseUrl, long l) throws IOException {
        return executeAndReadBody(new HttpDelete(brokerApiBaseUrl + "/api/notifications/byDate/0/" + l));
    }

    /**
     * Issues {@code GET <brokerApiBaseUrl>/api/openaireBroker/notifications/send/<l>}.
     *
     * @param brokerApiBaseUrl base URL of the broker API
     * @param l value placed in the request path
     * @return the response body as text
     * @throws IOException if the request or the reading of the response fails
     */
    private static String sendNotifications(String brokerApiBaseUrl, long l) throws IOException {
        return executeAndReadBody(new HttpGet(brokerApiBaseUrl + "/api/openaireBroker/notifications/send/" + l));
    }

    /**
     * Executes the request with a fresh default HTTP client and returns the response body.
     * Client and response are always closed, also when reading the body fails.
     * The HTTP status code is not inspected.
     *
     * @param request the request to execute
     * @return the response body decoded as UTF-8, or an empty string when the response has no entity
     * @throws IOException if the request or the reading of the response fails
     */
    private static String executeAndReadBody(HttpUriRequest request) throws IOException {
        try (CloseableHttpClient client = HttpClients.createDefault();
                CloseableHttpResponse response = client.execute(request)) {
            if (response.getEntity() == null) {
                return "";
            }
            return IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
        }
    }

    /**
     * Counts the notification on the given accumulator and serializes it to JSON.
     *
     * @param notification the notification to serialize
     * @param longAccumulator accumulator incremented by one for every call
     * @return the JSON representation of the notification
     * @throws JsonProcessingException if the notification cannot be serialized
     */
    public static String prepareForIndexing(Notification notification, LongAccumulator longAccumulator) throws JsonProcessingException {
        longAccumulator.add(1L);
        return OBJECT_MAPPER.writeValueAsString(notification);
    }
}
