/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.schema.mdstore.ValidationType;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.validator2.result_models.StandardValidationResult;
import eu.dnetlib.validator2.validation.guideline.openaire.AbstractOpenAireProfile;
import eu.dnetlib.validator2.validation.guideline.openaire.DataArchiveGuidelinesV2Profile;
import eu.dnetlib.validator2.validation.guideline.openaire.FAIR_Data_GuidelinesProfile;
import eu.dnetlib.validator2.validation.guideline.openaire.FAIR_Literature_GuidelinesV4Profile;
import eu.dnetlib.validator2.validation.guideline.openaire.LiteratureGuidelinesV3Profile;
import eu.dnetlib.validator2.validation.guideline.openaire.LiteratureGuidelinesV4Profile;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.util.LongAccumulator;
import org.dom4j.Document;
import org.dom4j.Node;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import scala.Tuple2;

/**
 * Spark job that builds a native MDStore version from the records collected into
 * {@code <hdfsPath>/sequence_file} of the current MDStore version.
 *
 * <p>Every record is parsed as XML, its original identifier is extracted with the configured XPath,
 * and it is wrapped in a {@link MetadataRecord}. When a previous MDStore version is supplied, the new
 * records are merged with the old ones, keeping for every id the record with the most recent
 * collection date. Optionally, each record is validated against the OpenAIRE guideline profiles
 * matching the datasource compatibility level. The result is written to {@code <hdfsPath>/store}
 * and the record count to {@code <hdfsPath>/size}.
 */
public class GenerateNativeStoreSparkJob {

    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJob.class);

    private static final ObjectMapper MAPPER = new ObjectMapper()
        .configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    /** Name of the {@link MetadataRecord} column holding the validation reports. */
    public static final String VALIDATION_RESULTS_FIELD = "validationResults";

    /**
     * Entry point: parses the job arguments and runs the native store generation.
     *
     * @param args command line arguments, described by {@code generate_native_input_parameters.json}
     * @throws Exception if the arguments cannot be parsed or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // read the parameter specification as UTF-8 and close the stream (the 1-arg
        // IOUtils.toString uses the platform charset and leaves the stream open)
        try (InputStream is = GenerateNativeStoreSparkJob.class
            .getResourceAsStream("/eu/dnetlib/dhp/collection/generate_native_input_parameters.json")) {
            jsonConfiguration = IOUtils.toString(is, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String provenanceArgument = parser.get("provenance");
        log.info("provenance: {}", provenanceArgument);
        final Provenance provenance = MAPPER.readValue(provenanceArgument, Provenance.class);

        final String apiDescriptor = parser.get("apidescriptor");
        log.info("apidescriptor: {}", apiDescriptor);
        final ApiDescriptor api = MAPPER.readValue(apiDescriptor, ApiDescriptor.class);

        final String dateOfCollectionArgs = parser.get("dateOfCollection");
        log.info("dateOfCollection: {}", dateOfCollectionArgs);
        final Long dateOfCollection = Long.valueOf(dateOfCollectionArgs);

        final String mdStoreVersion = parser.get("mdStoreVersion");
        log.info("mdStoreVersion: {}", mdStoreVersion);
        final MDStoreVersion currentVersion = MAPPER.readValue(mdStoreVersion, MDStoreVersion.class);

        final String readMdStoreVersionParam = parser.get("readMdStoreVersion");
        log.info("readMdStoreVersion: {}", readMdStoreVersionParam);
        // a blank value means there is no previous version to merge with
        final MDStoreVersion readMdStoreVersion = StringUtils.isBlank(readMdStoreVersionParam)
            ? null
            : MAPPER.readValue(readMdStoreVersionParam, MDStoreVersion.class);

        final String xpath = parser.get("xpath");
        log.info("xpath: {}", xpath);

        final String encoding = parser.get("encoding");
        log.info("encoding: {}", encoding);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final boolean runValidation = Optional
            .ofNullable(parser.get("runValidation"))
            .map(Boolean::valueOf)
            .orElse(Boolean.FALSE);
        log.info("runValidation: {}", runValidation);

        final SparkConf conf = new SparkConf();
        final Map<ValidationType, AbstractOpenAireProfile> validators = getValidationTypes(
            api.getCompatibilityLevel(), runValidation);

        SparkSessionSupport
            .runWithSparkSession(
                conf, isSparkSessionManaged,
                spark -> createNativeMDStore(
                    spark, provenance, dateOfCollection, xpath, encoding, validators, currentVersion,
                    readMdStoreVersion));
    }

    /**
     * Selects the validation profiles to apply for the given compatibility level.
     *
     * @param compatibilityLevel the API compatibility level, e.g. {@code "openaire4.0"}; may be null
     * @param runValidation      when false, no profile is selected
     * @return an insertion-ordered map of validation type to profile; empty when validation is
     *         disabled or the compatibility level is null or unsupported
     */
    private static Map<ValidationType, AbstractOpenAireProfile> getValidationTypes(
        final String compatibilityLevel, final boolean runValidation) {
        final Map<ValidationType, AbstractOpenAireProfile> res = new LinkedHashMap<>();
        if (!runValidation) {
            log.info("Skipping validation, returning empty validators map");
            return res;
        }
        if (compatibilityLevel == null) {
            // a switch on a null String would throw NullPointerException
            log.info("Skipping validation for compatibility: {}", compatibilityLevel);
            return res;
        }
        switch (compatibilityLevel) {
            case "openaire2.0":
                res.put(ValidationType.openaire2_0, new DataArchiveGuidelinesV2Profile());
                break;
            case "openaire3.0":
                res.put(ValidationType.openaire3_0, new LiteratureGuidelinesV3Profile());
                break;
            case "openaire4.0":
                res.put(ValidationType.openaire4_0, new LiteratureGuidelinesV4Profile());
                res.put(ValidationType.fair_literature_v4, new FAIR_Literature_GuidelinesV4Profile());
                break;
            case "openaire2.0_data":
                res.put(ValidationType.fair_data, new FAIR_Data_GuidelinesProfile());
                break;
            default:
                log.info("Skipping validation for compatibility: {}", compatibilityLevel);
                break;
        }
        return res;
    }

    /**
     * Parses the collected sequence file, optionally merges it with a previous store version,
     * optionally validates the records, and writes the result and its size to HDFS.
     *
     * @param spark            the active Spark session
     * @param provenance       provenance attached to every record
     * @param dateOfCollection collection timestamp attached to every new record
     * @param xpath            XPath selecting the original identifier of a record
     * @param encoding         encoding label stored in every record
     * @param validators       validation profiles to apply; null or empty disables validation
     * @param currentVersion   the MDStore version being written
     * @param readVersion      a previous MDStore version to merge with, or null
     * @throws IOException if the size file cannot be written
     */
    private static void createNativeMDStore(
        final SparkSession spark,
        final Provenance provenance,
        final Long dateOfCollection,
        final String xpath,
        final String encoding,
        final Map<ValidationType, AbstractOpenAireProfile> validators,
        final MDStoreVersion currentVersion,
        final MDStoreVersion readVersion) throws IOException {

        final JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());
        final LongAccumulator totalItems = sc.sc().longAccumulator("TotalItems");
        final LongAccumulator invalidRecords = sc.sc().longAccumulator("InvalidRecords");

        final String seqFilePath = currentVersion.getHdfsPath() + "/sequence_file";
        final JavaRDD<MetadataRecord> nativeStore = sc
            .sequenceFile(seqFilePath, IntWritable.class, Text.class)
            .map(
                item -> parseRecord(
                    item._2().toString(), xpath, encoding, provenance, dateOfCollection, totalItems,
                    invalidRecords))
            .filter(Objects::nonNull)
            .distinct();

        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);
        final Dataset<MetadataRecord> newRecords = spark.createDataset(nativeStore.rdd(), encoder);

        final String targetPath = currentVersion.getHdfsPath() + "/store";

        final Dataset<MetadataRecord> toSaveRecords;
        if (readVersion != null) {
            log.info("updating {} incrementally with {}", targetPath, readVersion.getHdfsPath());
            toSaveRecords = mergeWithPreviousVersion(spark, newRecords, readVersion, encoder);
        } else {
            toSaveRecords = newRecords;
        }

        final Dataset<MetadataRecord> validatedRecords = validators == null || validators.isEmpty()
            ? toSaveRecords
            : toSaveRecords
                .map(
                    (MapFunction<MetadataRecord, MetadataRecord>) mdr -> addValidationReports(mdr, validators),
                    encoder);

        DHPUtils.saveDataset(validatedRecords, targetPath);

        // count what was actually persisted rather than the lazy in-memory plan
        final long total = spark.read().load(targetPath).count();
        log.info("collected {} records for datasource '{}'", total, provenance.getDatasourceName());

        DHPUtils
            .writeHdfsFile(
                spark.sparkContext().hadoopConfiguration(), Long.toString(total),
                currentVersion.getHdfsPath() + "/size");
    }

    /**
     * Merges the new records with those stored in a previous MDStore version, keeping for every
     * record id the one with the greatest collection date (see {@link MDStoreAggregator}).
     */
    private static Dataset<MetadataRecord> mergeWithPreviousVersion(
        final SparkSession spark,
        final Dataset<MetadataRecord> newRecords,
        final MDStoreVersion readVersion,
        final Encoder<MetadataRecord> encoder) {

        final DataType dataType = Arrays
            .stream(newRecords.schema().fields())
            .filter(f -> VALIDATION_RESULTS_FIELD.equals(f.name()))
            .map(StructField::dataType)
            .findFirst()
            .orElseThrow(() -> new RuntimeException("Missing validationResults field in new schema"));

        final Dataset<Row> oldRows = spark.read().load(readVersion.getHdfsPath() + "/store");
        // the previous version may lack the validationResults column: add it as a typed null so
        // that as(encoder) can bind the old rows to MetadataRecord
        final Dataset<Row> oldRowsWithNewField = ArrayUtils
            .contains(oldRows.schema().fieldNames(), VALIDATION_RESULTS_FIELD)
                ? oldRows
                : oldRows.withColumn(VALIDATION_RESULTS_FIELD, functions.lit(null).cast(dataType));
        final Dataset<MetadataRecord> oldRecords = oldRowsWithNewField.as(encoder);

        final TypedColumn<MetadataRecord, MetadataRecord> aggregator = new MDStoreAggregator().toColumn();

        return oldRecords
            .union(newRecords)
            .groupByKey((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
            .agg(aggregator)
            .map((MapFunction<Tuple2<String, MetadataRecord>, MetadataRecord>) Tuple2::_2, encoder);
    }

    /**
     * Parses a single collected XML record into a {@link MetadataRecord}.
     *
     * @param input            the raw XML record
     * @param xpath            XPath selecting the node whose text is the original identifier
     * @param encoding         encoding label stored in the record
     * @param provenance       provenance stored in the record
     * @param dateOfCollection collection timestamp stored in the record
     * @param totalItems       incremented for every call; may be null
     * @param invalidRecords   incremented for every record that cannot be parsed or has no
     *                         identifier; may be null
     * @return the parsed record, or null when the input is not well-formed XML, contains a DOCTYPE,
     *         or the XPath selects no node or a blank value
     */
    public static MetadataRecord parseRecord(
        final String input,
        final String xpath,
        final String encoding,
        final Provenance provenance,
        final Long dateOfCollection,
        final LongAccumulator totalItems,
        final LongAccumulator invalidRecords) {

        if (totalItems != null) {
            totalItems.add(1L);
        }
        try {
            final SAXReader reader = new SAXReader();
            // reject DOCTYPE declarations: protects against XXE and entity-expansion attacks
            reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            final Document document = reader
                .read(new ByteArrayInputStream(input.getBytes(StandardCharsets.UTF_8)));
            final Node node = document.selectSingleNode(xpath);
            final String originalIdentifier = node != null ? node.getText() : null;
            if (StringUtils.isBlank(originalIdentifier)) {
                markInvalid(invalidRecords);
                return null;
            }
            return new MetadataRecord(originalIdentifier, encoding, provenance, document.asXML(), dateOfCollection);
        } catch (Exception e) {
            // best effort: a malformed record is counted and skipped, not fatal for the job
            log.debug("unable to parse record", e);
            markInvalid(invalidRecords);
            return null;
        }
    }

    /** Increments the invalid-record accumulator when one is provided. */
    private static void markInvalid(final LongAccumulator invalidRecords) {
        if (invalidRecords != null) {
            invalidRecords.add(1L);
        }
    }

    /**
     * Validates the record body against every given profile and stores the reports in the record,
     * keeping any report already present for other validation types.
     *
     * @param mdr        the record to validate; modified in place
     * @param validators profiles to apply; when null or empty the record is returned untouched
     * @return the same record instance
     * @throws ParserConfigurationException if the XML parser cannot be configured
     * @throws IOException                  if the body cannot be read
     * @throws SAXException                 if the body is not well-formed XML
     * @throws RuntimeException             if a profile fails while validating the record
     */
    public static MetadataRecord addValidationReports(
        final MetadataRecord mdr,
        final Map<ValidationType, AbstractOpenAireProfile> validators)
        throws ParserConfigurationException, IOException, SAXException {

        if (validators == null || validators.isEmpty()) {
            return mdr;
        }

        final LinkedHashMap<ValidationType, StandardValidationResult> validationResults = new LinkedHashMap<>();
        if (mdr.getValidationResults() != null) {
            validationResults.putAll(mdr.getValidationResults());
        }

        final org.w3c.dom.Document doc;
        try (ByteArrayInputStream is = new ByteArrayInputStream(mdr.getBody().getBytes(StandardCharsets.UTF_8))) {
            doc = newSecureDocumentBuilderFactory().newDocumentBuilder().parse(is);
        }

        for (final Map.Entry<ValidationType, AbstractOpenAireProfile> e : validators.entrySet()) {
            final ValidationType validationType = e.getKey();
            try {
                validationResults.put(validationType, e.getValue().validate(mdr.getId(), doc));
            } catch (Exception e1) {
                final String errorMessage = "Error validating record id: " + mdr.getId() + " with profile: "
                    + validationType;
                throw new RuntimeException(errorMessage, e1);
            }
        }
        mdr.setValidationResults(validationResults);
        return mdr;
    }

    /**
     * Creates a DOM parser factory that never resolves external entities or external DTDs, so that
     * parsing a record body cannot trigger network or file access.
     */
    private static DocumentBuilderFactory newSecureDocumentBuilderFactory() throws ParserConfigurationException {
        final DocumentBuilderFactory factory = DocumentBuilderFactory.newInstance();
        factory.setFeature("http://xml.org/sax/features/external-general-entities", false);
        factory.setFeature("http://xml.org/sax/features/external-parameter-entities", false);
        factory.setFeature("http://apache.org/xml/features/nonvalidating/load-external-dtd", false);
        return factory;
    }

    /**
     * Typed aggregator that, for records sharing the same id, keeps the one with the greatest
     * {@code dateOfCollection}. On equal dates the buffered (first) record is kept; null inputs
     * are ignored.
     */
    public static class MDStoreAggregator
        extends Aggregator<MetadataRecord, MetadataRecord, MetadataRecord> {

        private static final long serialVersionUID = -3409563083332613984L;

        @Override
        public MetadataRecord zero() {
            return null;
        }

        @Override
        public MetadataRecord reduce(MetadataRecord b, MetadataRecord a) {
            return getLatestRecord(b, a);
        }

        @Override
        public MetadataRecord merge(MetadataRecord b, MetadataRecord a) {
            return getLatestRecord(b, a);
        }

        /** Returns the non-null record with the later collection date, preferring {@code b} on ties. */
        private MetadataRecord getLatestRecord(MetadataRecord b, MetadataRecord a) {
            if (b == null) {
                return a;
            }
            if (a == null) {
                return b;
            }
            return a.getDateOfCollection() > b.getDateOfCollection() ? a : b;
        }

        @Override
        public MetadataRecord finish(MetadataRecord r) {
            return r;
        }

        @Override
        public Encoder<MetadataRecord> bufferEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }

        @Override
        public Encoder<MetadataRecord> outputEncoder() {
            return Encoders.bean(MetadataRecord.class);
        }
    }
}

