package eu.dnetlib.dhp.broker.oa;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.broker.oa.util.EventGroup;
import eu.dnetlib.dhp.broker.oa.util.aggregators.subset.EventSubsetAggregator;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.util.Date;
import java.util.HashMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.util.LongAccumulator;
import org.elasticsearch.spark.rdd.api.java.JavaEsSpark;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob.class */
public class IndexEventSubsetJob {
    private static final Logger log = LoggerFactory.getLogger(IndexEventSubsetJob.class);

    public static void main(String[] strArr) throws Exception {
        ArgumentApplicationParser argumentApplicationParser = new ArgumentApplicationParser(IOUtils.toString(IndexEventSubsetJob.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/index_event_subset.json")));
        argumentApplicationParser.parseArgument(strArr);
        SparkConf sparkConf = new SparkConf();
        String str = argumentApplicationParser.get("outputDir") + "/events";
        log.info("eventsPath: {}", str);
        String str2 = argumentApplicationParser.get("index");
        log.info("index: {}", str2);
        String str3 = argumentApplicationParser.get("esHost");
        log.info("indexHost: {}", str3);
        String str4 = argumentApplicationParser.get("esBatchWriteRetryCount");
        log.info("esBatchWriteRetryCount: {}", str4);
        String str5 = argumentApplicationParser.get("esBatchWriteRetryWait");
        log.info("esBatchWriteRetryWait: {}", str5);
        String str6 = argumentApplicationParser.get("esBatchSizeEntries");
        log.info("esBatchSizeEntries: {}", str6);
        String str7 = argumentApplicationParser.get("esNodesWanOnly");
        log.info("esNodesWanOnly: {}", str7);
        int i = NumberUtils.toInt(argumentApplicationParser.get("maxEventsForTopic"));
        log.info("maxEventsForTopic: {}", Integer.valueOf(i));
        String str8 = argumentApplicationParser.get("brokerApiBaseUrl");
        log.info("brokerApiBaseUrl: {}", str8);
        SparkSession orCreate = SparkSession.builder().config(sparkConf).getOrCreate();
        TypedColumn column = new EventSubsetAggregator(i).toColumn();
        LongAccumulator longAccumulator = orCreate.sparkContext().longAccumulator("total_indexed");
        long time = new Date().getTime();
        JavaRDD javaRDD = ClusterUtils.readPath(orCreate, str, Event.class).groupByKey(event -> {
            return event.getTopic() + '@' + event.getMap().getTargetDatasourceId();
        }, Encoders.STRING()).agg(column).map(tuple2 -> {
            return (EventGroup) tuple2._2;
        }, Encoders.bean(EventGroup.class)).flatMap(eventGroup -> {
            return eventGroup.getData().iterator();
        }, Encoders.bean(Event.class)).map(event2 -> {
            return prepareEventForIndexing(event2, time, longAccumulator);
        }, Encoders.STRING()).javaRDD();
        HashMap hashMap = new HashMap();
        hashMap.put("es.index.auto.create", "false");
        hashMap.put("es.nodes", str3);
        hashMap.put("es.mapping.id", "eventId");
        hashMap.put("es.batch.write.retry.count", str4);
        hashMap.put("es.batch.write.retry.wait", str5);
        hashMap.put("es.batch.size.entries", str6);
        hashMap.put("es.nodes.wan.only", str7);
        log.info("*** Start indexing");
        JavaEsSpark.saveJsonToEs(javaRDD, str2, hashMap);
        log.info("*** End indexing");
        log.info("*** Deleting old events");
        log.info("*** Deleted events: {}", deleteOldEvents(str8, time - 1000));
    }

    private static String deleteOldEvents(String str, long j) throws IOException {
        HttpDelete httpDelete = new HttpDelete(str + "/api/events/byCreationDate/0/" + j);
        CloseableHttpClient createDefault = HttpClients.createDefault();
        Throwable th = null;
        try {
            CloseableHttpResponse execute = createDefault.execute(httpDelete);
            Throwable th2 = null;
            try {
                try {
                    String iOUtils = IOUtils.toString(execute.getEntity().getContent());
                    if (execute != null) {
                        if (0 != 0) {
                            try {
                                execute.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            execute.close();
                        }
                    }
                    return iOUtils;
                } finally {
                }
            } catch (Throwable th4) {
                if (execute != null) {
                    if (th2 != null) {
                        try {
                            execute.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        execute.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (createDefault != null) {
                if (0 != 0) {
                    try {
                        createDefault.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    createDefault.close();
                }
            }
        }
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String prepareEventForIndexing(Event event, long j, LongAccumulator longAccumulator) throws JsonProcessingException {
        longAccumulator.add(1L);
        event.setCreationDate(Long.valueOf(j));
        event.setExpiryDate(Long.MAX_VALUE);
        return new ObjectMapper().writeValueAsString(event);
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -1352635735:
                if (implMethodName.equals("lambda$main$9f48ce61$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1098280594:
                if (implMethodName.equals("lambda$main$6c72501a$1")) {
                    z = true;
                    break;
                }
                break;
            case -1098280593:
                if (implMethodName.equals("lambda$main$6c72501a$2")) {
                    z = false;
                    break;
                }
                break;
            case 762617275:
                if (implMethodName.equals("lambda$main$e8c57266$1")) {
                    z = 3;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob") && serializedLambda.getImplMethodSignature().equals("(Lscala/Tuple2;)Leu/dnetlib/dhp/broker/oa/util/EventGroup;")) {
                    return tuple2 -> {
                        return (EventGroup) tuple2._2;
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/broker/model/Event;)Ljava/lang/String;")) {
                    return event -> {
                        return event.getTopic() + '@' + event.getMap().getTargetDatasourceId();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/broker/oa/util/EventGroup;)Ljava/util/Iterator;")) {
                    return eventGroup -> {
                        return eventGroup.getData().iterator();
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/broker/oa/IndexEventSubsetJob") && serializedLambda.getImplMethodSignature().equals("(JLorg/apache/spark/util/LongAccumulator;Leu/dnetlib/dhp/broker/model/Event;)Ljava/lang/String;")) {
                    long longValue = ((Long) serializedLambda.getCapturedArg(0)).longValue();
                    LongAccumulator longAccumulator = (LongAccumulator) serializedLambda.getCapturedArg(1);
                    return event2 -> {
                        return prepareEventForIndexing(event2, longValue, longAccumulator);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
