package eu.dnetlib.dhp.collection;

import eu.dnetlib.data.mdstore.manager.common.model.MDStoreVersion;
import eu.dnetlib.dhp.aggregation.common.ReportingJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob.class */
public class GenerateNativeStoreSparkJob {
    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

    /* loaded from: input_file:eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob$MDStoreAggregator.class */
    public static class MDStoreAggregator extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {
        /* renamed from: zero, reason: merged with bridge method [inline-methods] */
        public MetadataRecord m48zero() {
            return null;
        }

        public MetadataRecord reduce(MetadataRecord metadataRecord, MetadataRecord metadataRecord2) {
            return getLatestRecord(metadataRecord, metadataRecord2);
        }

        public MetadataRecord merge(MetadataRecord metadataRecord, MetadataRecord metadataRecord2) {
            return getLatestRecord(metadataRecord, metadataRecord2);
        }

        private MetadataRecord getLatestRecord(MetadataRecord metadataRecord, MetadataRecord metadataRecord2) {
            if (metadataRecord == null) {
                return metadataRecord2;
            }
            if (metadataRecord2 != null && metadataRecord2.getDateOfCollection().longValue() > metadataRecord.getDateOfCollection().longValue()) {
                return metadataRecord2;
            }
            return metadataRecord;
        }

        public MetadataRecord finish(MetadataRecord metadataRecord) {
            return metadataRecord;
        }

        public Encoder<MetadataRecord> bufferEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }

        public Encoder<MetadataRecord> outputEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }
    }

    /**
     * Command-line entry point of the job.
     *
     * <p>Reads the argument definition bundled on the classpath, parses {@code strArr} against it,
     * logs every resolved argument and then runs {@link #createNativeMDStore} inside a Spark session
     * obtained through {@link SparkSessionSupport#runWithSparkSession}.
     *
     * <p>Arguments read: {@code provenance} and {@code mdStoreVersion} (JSON, mandatory),
     * {@code dateOfCollection} (parsed as a long), {@code readMdStoreVersion} (JSON, ignored when
     * blank), {@code xpath}, {@code encoding} and {@code isSparkSessionManaged} (defaults to
     * {@code true} when absent).
     *
     * @param strArr the raw command-line arguments
     * @throws Exception if the argument definition cannot be read, an argument cannot be parsed,
     *                   or the Spark job fails
     */
    public static void main(String[] strArr) throws Exception {
        final String definitionResource = "/eu/dnetlib/dhp/collection/generate_native_input_parameters.json";
        final String argumentDefinition;
        // Close the classpath stream once read and decode it with an explicit charset instead of
        // the platform default; a missing resource fails with a message naming it.
        try (InputStream definitionStream = Objects.requireNonNull(
                GenerateNativeStoreSparkJob.class.getResourceAsStream(definitionResource),
                "missing classpath resource " + definitionResource)) {
            argumentDefinition = IOUtils.toString(definitionStream, StandardCharsets.UTF_8);
        }

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(argumentDefinition);
        parser.parseArgument(strArr);

        final String provenanceJson = parser.get("provenance");
        log.info("Provenance is {}", provenanceJson);
        final Provenance provenance = DHPUtils.MAPPER.readValue(provenanceJson, Provenance.class);

        final String dateOfCollectionArg = parser.get("dateOfCollection");
        log.info("dateOfCollection is {}", dateOfCollectionArg);
        final Long dateOfCollection = Long.valueOf(dateOfCollectionArg);

        final String mdStoreVersionJson = parser.get("mdStoreVersion");
        log.info("mdStoreVersion is {}", mdStoreVersionJson);
        final MDStoreVersion currentVersion = DHPUtils.MAPPER.readValue(mdStoreVersionJson, MDStoreVersion.class);

        final String readMdStoreVersionJson = parser.get("readMdStoreVersion");
        log.info("readMdStoreVersion is {}", readMdStoreVersionJson);
        // A blank value means there is no previous version to merge with.
        final MDStoreVersion readVersion = StringUtils.isBlank(readMdStoreVersionJson)
                ? null
                : DHPUtils.MAPPER.readValue(readMdStoreVersionJson, MDStoreVersion.class);

        final String xpath = parser.get("xpath");
        log.info("xpath is {}", xpath);

        final String encoding = parser.get("encoding");
        log.info("encoding is {}", encoding);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        SparkSessionSupport.runWithSparkSession(new SparkConf(), isSparkSessionManaged, sparkSession -> {
            createNativeMDStore(sparkSession, provenance, dateOfCollection, xpath, encoding, currentVersion, readVersion);
        });
    }

    /**
     * Builds the native metadata store for {@code mDStoreVersion} from its harvested sequence file.
     *
     * <p>Steps:
     * <ol>
     *   <li>read {@code <hdfsPath>/sequence_file}, parse every value with {@link #parseRecord},
     *       drop the records that could not be parsed and de-duplicate the rest;</li>
     *   <li>when {@code mDStoreVersion2} is given, union the new records with the content of
     *       {@code <readHdfsPath>/store}, group by record id and keep one record per id using
     *       {@link MDStoreAggregator};</li>
     *   <li>save the result under {@code <hdfsPath>/store};</li>
     *   <li>re-read the saved store, count it and write the count to {@code <hdfsPath>/size}.</li>
     * </ol>
     *
     * <p>Two Spark accumulators, {@code TotalItems} and {@code InvalidRecords}, are registered and
     * handed to {@link #parseRecord}.
     *
     * @param sparkSession    the active Spark session
     * @param provenance      provenance attached to every parsed record
     * @param l               collection date attached to every parsed record
     * @param str             XPath passed to {@link #parseRecord} to extract the record identifier
     * @param str2            encoding label passed to {@link #parseRecord}
     * @param mDStoreVersion  the version being written; its HDFS path hosts the input and the output
     * @param mDStoreVersion2 the previous version to merge with, or {@code null} for a full refresh
     * @throws IOException if the size file cannot be written
     */
    public static void createNativeMDStore(SparkSession sparkSession, Provenance provenance, Long l, String str, String str2, MDStoreVersion mDStoreVersion, MDStoreVersion mDStoreVersion2) throws IOException {
        final JavaSparkContext javaContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        final LongAccumulator totalItems = javaContext.sc().longAccumulator("TotalItems");
        final LongAccumulator invalidRecords = javaContext.sc().longAccumulator("InvalidRecords");

        final JavaRDD<MetadataRecord> parsedRecords = javaContext
                .sequenceFile(mDStoreVersion.getHdfsPath() + "/sequence_file", IntWritable.class, Text.class)
                .map(entry -> parseRecord(entry._2().toString(), str, str2, provenance, l, totalItems, invalidRecords))
                .filter(Objects::nonNull)
                .distinct();

        final Encoder<MetadataRecord> recordEncoder = Encoders.bean(MetadataRecord.class);
        final Dataset<MetadataRecord> collected = sparkSession.createDataset(parsedRecords.rdd(), recordEncoder);
        final String targetPath = mDStoreVersion.getHdfsPath() + "/store";

        if (mDStoreVersion2 != null) {
            log.info("updating {} incrementally with {}", targetPath, mDStoreVersion2.getHdfsPath());
            final Dataset<MetadataRecord> previous = sparkSession
                    .read()
                    .load(mDStoreVersion2.getHdfsPath() + "/store")
                    .as(recordEncoder);
            // The explicit MapFunction casts select the Java-friendly overloads of groupByKey/map,
            // which also accept Scala functions and are otherwise ambiguous for a lambda.
            final Dataset<MetadataRecord> merged = previous
                    .union(collected)
                    .groupByKey((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
                    .agg(new MDStoreAggregator().toColumn())
                    .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, recordEncoder);
            // NOTE(review): this sample logging runs its own Spark action on the merged dataset
            // before it is saved — confirm it is still wanted outside of debugging.
            merged.select("id").takeAsList(100).forEach(row -> log.info(row.toString()));
            DHPUtils.saveDataset(merged, targetPath);
        } else {
            DHPUtils.saveDataset(collected, targetPath);
        }

        final long storeSize = sparkSession.read().load(targetPath).count();
        log.info("collected {} records for datasource '{}'", storeSize, provenance.getDatasourceName());
        DHPUtils.writeHdfsFile(
                sparkSession.sparkContext().hadoopConfiguration(),
                String.valueOf(storeSize),
                mDStoreVersion.getHdfsPath() + "/size");
    }

    /**
     * Parses one harvested XML record into a {@link MetadataRecord}.
     *
     * <p>The record identifier is the text of the single node selected by the XPath {@code str2}.
     * A record is considered invalid, and {@code null} is returned, when the XML cannot be parsed,
     * the XPath selects nothing, the selected text is blank, or any other failure occurs while
     * building the record.
     *
     * <p>Because the input comes from external sources, the parser is configured to reject any
     * document carrying a DOCTYPE declaration; such records are counted as invalid.
     *
     * @param str              the raw XML of the record
     * @param str2             XPath expression selecting the node that holds the record identifier
     * @param str3             encoding label stored in the resulting record
     * @param provenance       provenance stored in the resulting record
     * @param l                collection date stored in the resulting record
     * @param longAccumulator  incremented once per call when not {@code null}
     * @param longAccumulator2 incremented once per invalid record when not {@code null}
     * @return the parsed record, or {@code null} when the input is invalid
     */
    public static MetadataRecord parseRecord(String str, String str2, String str3, Provenance provenance, Long l, LongAccumulator longAccumulator, LongAccumulator longAccumulator2) {
        if (longAccumulator != null) {
            longAccumulator.add(1L);
        }

        MetadataRecord parsed = null;
        try {
            final SAXReader reader = new SAXReader();
            // Harvested XML is untrusted: refuse DOCTYPE declarations to rule out XXE and
            // entity-expansion attacks.
            reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            final Document document = reader.read(new ByteArrayInputStream(str.getBytes(StandardCharsets.UTF_8)));
            // selectSingleNode yields null when nothing matches; treat that as "no identifier".
            final String identifier = Optional.ofNullable(document.selectSingleNode(str2))
                    .map(node -> node.getText())
                    .orElse(null);
            if (StringUtils.isNotBlank(identifier)) {
                parsed = new MetadataRecord(identifier, str3, provenance, document.asXML(), l);
            }
        } catch (Throwable ignored) {
            // Best effort by design: one broken record must not fail the whole job, it is only
            // counted as invalid below.
        }

        // Single place where invalid records are counted, so the null check on the accumulator
        // also covers the failure path.
        if (parsed == null && longAccumulator2 != null) {
            longAccumulator2.add(1L);
        }
        return parsed;
    }

    /**
     * Re-creates one of this class's lambdas from its {@link SerializedLambda} description.
     *
     * <p>The implementation method name is first mapped to a selector; the matching branch then
     * checks the recorded method kind, functional interface, implementing class and signatures
     * before returning an equivalent lambda. Four targets are recognised: {@code _2},
     * {@code getId}, {@code nonNull} and the record-parsing lambda of {@code createNativeMDStore}.
     *
     * <p>NOTE(review): this is marked synthetic and presumably is decompiler output of the
     * compiler-generated deserialization hook. As written, {@code z} is declared {@code boolean}
     * but assigned {@code -1} and {@code 3}, the label {@code case true} appears twice, and
     * {@code ReportingJob.INITIAL_DELAY} stands in for the literal {@code 2} — confirm whether
     * this method should be kept in hand-maintained source at all.
     *
     * @param serializedLambda the serialized form of the lambda to restore
     * @return a lambda equivalent to the serialized one
     * @throws IllegalArgumentException if the description matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // Selector for the second switch; stays at its initial value when no name matches.
        boolean z = -1;
        // String switch expanded into hashCode() + equals() checks.
        switch (implMethodName.hashCode()) {
            case 2995:
                if (implMethodName.equals("_2")) {
                    z = false;
                    break;
                }
                break;
            case 98245393:
                if (implMethodName.equals("getId")) {
                    z = true;
                    break;
                }
                break;
            case 582568470:
                if (implMethodName.equals("lambda$createNativeMDStore$26bdb759$1")) {
                    z = 3;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = 2;
                    break;
                }
                break;
        }
        switch (z) {
            // "_2": MapFunction implemented by scala.Tuple2
            case false:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("scala/Tuple2") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/Object;")) {
                    return (v0) -> {
                        return v0._2();
                    };
                }
                break;
            // "getId": MapFunction implemented by MetadataRecord
            case true:
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/schema/mdstore/MetadataRecord") && serializedLambda.getImplMethodSignature().equals("()Ljava/lang/String;")) {
                    return (v0) -> {
                        return v0.getId();
                    };
                }
                break;
            // "nonNull": Function implemented by java.util.Objects
            case ReportingJob.INITIAL_DELAY /* 2 */:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
            // record-parsing lambda of createNativeMDStore: its six captured arguments are
            // restored by position and passed on to parseRecord
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/Function") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/collection/GenerateNativeStoreSparkJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;Leu/dnetlib/dhp/schema/mdstore/Provenance;Ljava/lang/Long;Lorg/apache/spark/util/LongAccumulator;Lorg/apache/spark/util/LongAccumulator;Lscala/Tuple2;)Leu/dnetlib/dhp/schema/mdstore/MetadataRecord;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    String str2 = (String) serializedLambda.getCapturedArg(1);
                    Provenance provenance = (Provenance) serializedLambda.getCapturedArg(2);
                    Long l = (Long) serializedLambda.getCapturedArg(3);
                    LongAccumulator longAccumulator = (LongAccumulator) serializedLambda.getCapturedArg(4);
                    LongAccumulator longAccumulator2 = (LongAccumulator) serializedLambda.getCapturedArg(5);
                    return tuple2 -> {
                        return parseRecord(((Text) tuple2._2()).toString(), str, str2, provenance, l, longAccumulator, longAccumulator2);
                    };
                }
                break;
        }
        // Nothing matched: the serialized form does not describe a lambda of this class.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
