/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.provision;

import com.lucidworks.spark.util.SolrSupport;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.ProvisionConstants;
import eu.dnetlib.dhp.oa.provision.model.SerializableSolrInputDocument;
import eu.dnetlib.dhp.oa.provision.utils.ISLookupClient;
import eu.dnetlib.dhp.oa.provision.utils.StreamingInputDocumentFactory;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.dhp.utils.saxon.SaxonTransformerFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import java.io.InputStream;
import java.io.Serializable;
import java.io.StringReader;
import java.io.StringWriter;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.TimeZone;
import javax.xml.transform.Transformer;
import javax.xml.transform.TransformerException;
import javax.xml.transform.stream.StreamResult;
import javax.xml.transform.stream.StreamSource;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.rdd.RDD;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Spark job that reads XML records from a Hadoop sequence file, converts each of them into an
 * index document through an XSLT obtained from the {@link ISLookupClient}, and then either feeds
 * the documents to Solr or stores them as parquet, depending on the selected {@link OutputFormat}.
 */
public class XmlIndexingJob {
    private static final Logger log = LoggerFactory.getLogger(XmlIndexingJob.class);

    /** Batch size used when the caller does not provide one. */
    private static final Integer DEFAULT_BATCH_SIZE = 1000;

    /** Classpath location of the JSON file describing the command line arguments. */
    private static final String INPUT_PARAMS_RESOURCE = "/eu/dnetlib/dhp/oa/provision/input_params_update_index.json";

    /**
     * Pattern used by {@link #getRecordDatestamp()}. {@code HH} is the hour of the day (0-23);
     * the 12-hour {@code hh} would be ambiguous because the pattern carries no AM/PM marker.
     * The trailing {@code 'Z'} is a quoted literal, not a time zone field.
     */
    protected static final String DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss'Z'";

    private final String inputPath;
    private final String format;
    private final int batchSize;
    private final OutputFormat outputFormat;
    private final String outputPath;
    private final SparkSession spark;

    /**
     * Command line entry point. Reads the arguments {@code isSparkSessionManaged} (default
     * {@code true}), {@code inputPath}, {@code format}, {@code outputPath} (trimmed, optional),
     * {@code batchSize} (default {@value 1000}), {@code outputFormat} (default {@code SOLR}) and
     * {@code isLookupUrl}, then runs the job inside a Spark session.
     *
     * @param args the command line arguments
     * @throws Exception if the arguments cannot be parsed or the job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // try-with-resources: the stream is closed even when reading fails.
        try (InputStream in = XmlIndexingJob.class.getResourceAsStream(INPUT_PARAMS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource: " + INPUT_PARAMS_RESOURCE);
            }
            // Explicit charset so the result does not depend on the platform default.
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String inputPath = parser.get("inputPath");
        log.info("inputPath: {}", inputPath);
        String format = parser.get("format");
        log.info("format: {}", format);
        String outputPath = Optional.ofNullable(parser.get("outputPath")).map(StringUtils::trim).orElse(null);
        log.info("outputPath: {}", outputPath);
        Integer batchSize = Optional.ofNullable(parser.get("batchSize")).map(Integer::valueOf).orElse(DEFAULT_BATCH_SIZE);
        log.info("batchSize: {}", batchSize);
        OutputFormat outputFormat = Optional.ofNullable(parser.get("outputFormat")).map(OutputFormat::valueOf).orElse(OutputFormat.SOLR);
        log.info("outputFormat: {}", outputFormat);

        SparkConf conf = new SparkConf();
        conf.registerKryoClasses(new Class[]{SerializableSolrInputDocument.class});

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            String isLookupUrl = parser.get("isLookupUrl");
            log.info("isLookupUrl: {}", isLookupUrl);
            ISLookupClient isLookup = new ISLookupClient(ISLookupClientFactory.getLookUpService(isLookupUrl));
            new XmlIndexingJob(spark, inputPath, format, batchSize, outputFormat, outputPath).run(isLookup);
        });
    }

    /**
     * Creates a job instance.
     *
     * @param spark the Spark session used to read the input and write the output
     * @param inputPath path of the sequence file holding the XML records
     * @param format name of the metadata format; used to look up layout, datasource id and collection name
     * @param batchSize number of documents per batch passed to the Solr indexer; {@code null} falls
     *        back to the default batch size
     * @param outputFormat where the documents are sent
     * @param outputPath parquet destination, only used with {@link OutputFormat#HDFS}
     */
    public XmlIndexingJob(SparkSession spark, String inputPath, String format, Integer batchSize, OutputFormat outputFormat, String outputPath) {
        this.spark = spark;
        this.inputPath = inputPath;
        this.format = format;
        // Guard the unboxing: a null Integer would otherwise raise a NullPointerException here.
        this.batchSize = batchSize != null ? batchSize : DEFAULT_BATCH_SIZE;
        this.outputFormat = outputFormat;
        this.outputPath = outputPath;
    }

    /**
     * Runs the indexing: builds the record-level XSLT from the layout and the layout transformer
     * returned by {@code isLookup}, applies it to every record of the input sequence file, parses
     * the result into index documents and writes them according to the output format.
     *
     * @param isLookup client used to fetch layout, layout transformer, datasource id and ZooKeeper host
     * @throws ISLookUpException if a lookup fails
     * @throws TransformerException if the record-level XSLT cannot be generated
     * @throws IllegalArgumentException if the output format is {@code HDFS} and no output path was given
     */
    public void run(ISLookupClient isLookup) throws ISLookUpException, TransformerException {
        // Fail fast, before any remote lookup, instead of failing late inside the parquet writer.
        if (this.outputFormat == OutputFormat.HDFS && StringUtils.isBlank(this.outputPath)) {
            throw new IllegalArgumentException("outputPath is required when outputFormat is HDFS");
        }

        String fields = isLookup.getLayoutSource(this.format);
        log.info("fields: {}", fields);
        String xslt = isLookup.getLayoutTransformer();
        String dsId = isLookup.getDsId(this.format);
        log.info("dsId: {}", dsId);
        String zkHost = isLookup.getZkHost();
        log.info("zkHost: {}", zkHost);
        String indexRecordXslt = XmlIndexingJob.getLayoutTransformer(this.format, fields, xslt);
        log.info("indexRecordTransformer {}", indexRecordXslt);

        JavaSparkContext sc = JavaSparkContext.fromSparkContext(this.spark.sparkContext());
        // Pipeline: sequence file value -> XML string -> transformed index record -> parsed document.
        // A new Transformer is created from the stylesheet for each record inside the map function.
        // NOTE(review): that is presumably costly; building it once per partition may be worth evaluating.
        // The raw JavaRDD is kept as-is: the element type returned by parseDocument is not imported in this file.
        JavaRDD docs = sc.sequenceFile(this.inputPath, Text.class, Text.class)
            .map((Function & Serializable)t -> ((Text)t._2()).toString())
            .map((Function & Serializable)s -> XmlIndexingJob.toIndexRecord(SaxonTransformerFactory.newInstance((String)indexRecordXslt), s))
            .map((Function & Serializable)s -> new StreamingInputDocumentFactory().parseDocument((String)s));

        switch (this.outputFormat) {
            case SOLR: {
                String collection = ProvisionConstants.getCollectionName(this.format);
                SolrSupport.indexDocs((String)zkHost, (String)collection, (int)this.batchSize, (RDD)docs.rdd());
                break;
            }
            case HDFS: {
                this.spark.createDataset(docs.map(SerializableSolrInputDocument::new).rdd(), Encoders.kryo(SerializableSolrInputDocument.class)).write().mode(SaveMode.Overwrite).parquet(this.outputPath);
                break;
            }
            default: {
                throw new IllegalArgumentException("invalid outputFormat: " + this.outputFormat);
            }
        }
    }

    /**
     * Applies the given transformer to one XML record.
     *
     * @param tr the transformer to apply
     * @param xmlRecord the XML record, as a string
     * @return the transformation output, as a string
     * @throws IllegalArgumentException if the transformation fails; the original
     *         {@link TransformerException} is kept as cause and the record is included in the message
     */
    protected static String toIndexRecord(Transformer tr, String xmlRecord) {
        StreamResult res = new StreamResult(new StringWriter());
        try {
            tr.transform(new StreamSource(new StringReader(xmlRecord)), res);
            return res.getWriter().toString();
        }
        catch (TransformerException e) {
            throw new IllegalArgumentException("XPathException on record: \n" + xmlRecord, e);
        }
    }

    /**
     * Generates the record-level stylesheet by applying {@code xslt} to the {@code fields}
     * document, with the stylesheet parameter {@code format} set to the given format.
     *
     * @param format value passed to the stylesheet as the {@code format} parameter
     * @param fields the XML document to transform
     * @param xslt the stylesheet to apply
     * @return the transformation output, as a string
     * @throws TransformerException if the transformation fails
     */
    protected static String getLayoutTransformer(String format, String fields, String xslt) throws TransformerException {
        Transformer layoutTransformer = SaxonTransformerFactory.newInstance(xslt);
        StreamResult layoutToXsltXslt = new StreamResult(new StringWriter());
        layoutTransformer.setParameter("format", format);
        layoutTransformer.transform(new StreamSource(new StringReader(fields)), layoutToXsltXslt);
        return layoutToXsltXslt.getWriter().toString();
    }

    /**
     * Returns the current instant formatted with {@link #DATE_FORMAT}.
     *
     * @return the current date and time in UTC, e.g. {@code 2024-01-31T17:05:09Z}
     */
    public static String getRecordDatestamp() {
        // A new formatter per call: SimpleDateFormat is not thread-safe.
        SimpleDateFormat formatter = new SimpleDateFormat(DATE_FORMAT);
        // The pattern ends with a literal 'Z', so the value must actually be expressed in UTC.
        formatter.setTimeZone(TimeZone.getTimeZone("UTC"));
        return formatter.format(new Date());
    }

    /** Destination of the index documents produced by the job. */
    public static enum OutputFormat {
        /** Documents are sent to the Solr collection derived from the format. */
        SOLR,
        /** Documents are written as parquet to the configured output path, overwriting existing data. */
        HDFS;

    }
}

