package eu.dnetlib.dhp.sx.bio.ebi;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.CollectionUtils$;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.sx.bio.pubmed.PMArticle;
import eu.dnetlib.dhp.sx.bio.pubmed.PMAuthor;
import eu.dnetlib.dhp.sx.bio.pubmed.PMJournal;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import java.io.FilterInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import javax.xml.stream.XMLInputFactory;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.spark.SparkConf;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Predef$;
import scala.StringContext;
import scala.Tuple2;
import scala.collection.immutable.List;
import scala.collection.immutable.StringOps;
import scala.reflect.ClassTag$;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkCreateBaselineDataFrame.scala */
/* loaded from: input_file:eu/dnetlib/dhp/sx/bio/ebi/SparkCreateBaselineDataFrame$.class */
/**
 * Spark driver that refreshes the PubMed baseline files, builds a deduplicated {@link PMArticle}
 * dataset from them and converts it into {@link Oaf} records.
 *
 * <p>This is the Java form of a Scala {@code object}: {@link #MODULE$} is the singleton through
 * which the compiled Scala closures ({@code SparkCreateBaselineDataFrame$$anonfun$...}) call back
 * into these methods.
 */
public final class SparkCreateBaselineDataFrame$ {
    /** Connect, connection-request and socket timeout used for every HTTP request (60 s). */
    private static final int HTTP_TIMEOUT_MS = 60 * 1000;

    /** Number of attempts {@link #requestPage(String)} makes before giving up. */
    private static final int MAX_PAGE_REQUEST_ATTEMPTS = 4;

    /** NCBI directory listing of the PubMed update files. */
    private static final String PUBMED_UPDATE_FILES_URL = "https://ftp.ncbi.nlm.nih.gov/pubmed/updatefiles/";

    /** Classpath location of the job's argument definitions. */
    private static final String PARAMS_RESOURCE = "/eu/dnetlib/dhp/sx/bio/ebi/baseline_to_oaf_params.json";

    /** Singleton instance of the Scala object. */
    public static final SparkCreateBaselineDataFrame$ MODULE$ = new SparkCreateBaselineDataFrame$();

    private final Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator;

    private SparkCreateBaselineDataFrame$() {
        this.pmArticleAggregator = new PmArticleAggregator();
    }

    /**
     * Lists the PubMed update files that come after the given one.
     *
     * <p>The NCBI update-files listing is fetched with {@link #requestPage(String)}. Each line is then
     * parsed by the compiled Scala closures {@code $$anonfun$1}..{@code $$anonfun$6}, which are defined
     * outside this file. Presumably they extract (file name, URL) pairs and keep names greater than
     * {@code str}; TODO confirm.
     *
     * @param str name of the most recent update file already downloaded ("" when there is none)
     * @return the pairs produced by the closures; empty when the listing could not be retrieved
     */
    public List<Tuple2<String, String>> requestBaseLineUpdatePage(String str) {
        String listing = requestPage(PUBMED_UPDATE_FILES_URL);
        return new StringOps(Predef$.MODULE$.augmentString(listing))
                .linesWithSeparators()
                .map(new SparkCreateBaselineDataFrame$$anonfun$1())
                .filter(new SparkCreateBaselineDataFrame$$anonfun$2())
                .map(new SparkCreateBaselineDataFrame$$anonfun$3())
                .filter(new SparkCreateBaselineDataFrame$$anonfun$4())
                .filter(new SparkCreateBaselineDataFrame$$anonfun$5(str))
                .map(new SparkCreateBaselineDataFrame$$anonfun$6())
                .toList();
    }

    /**
     * Opens a streaming HTTP GET on {@code str}.
     *
     * <p>The response status is printed but not checked, so the body of an error response is returned
     * as well. Closing the returned stream also closes the underlying HTTP response and client. If the
     * request itself fails, the client is closed before the exception propagates.
     *
     * @param str URL to download
     * @return the response body; the caller must close it
     */
    public InputStream downloadBaselinePart(String str) {
        final CloseableHttpClient client = buildHttpClient();
        try {
            final CloseableHttpResponse response = client.execute(new HttpGet(str));
            Predef$.MODULE$.println("get response with status" + response.getStatusLine().getStatusCode());
            return new FilterInputStream(response.getEntity().getContent()) {
                @Override
                public void close() throws IOException {
                    try {
                        super.close();
                    } finally {
                        // Release the connection and the per-call client together with the body.
                        IOUtils.closeQuietly(response);
                        IOUtils.closeQuietly(client);
                    }
                }
            };
        } catch (Throwable t) {
            // Nothing will ever close the client if we fail before handing out the stream.
            IOUtils.closeQuietly(client);
            throw t;
        }
    }

    /**
     * Downloads the body of {@code str} as UTF-8 text, retrying up to four times.
     *
     * <p>An attempt fails when the request throws or answers with a status code of 400 or higher.
     * Attempts are made back-to-back without delay. One client is shared across attempts and closed
     * only after the last one. Every response is closed.
     *
     * @param str URL to fetch
     * @return the response body, or "" when every attempt failed
     */
    public String requestPage(String str) {
        HttpGet httpGet = new HttpGet(str);
        CloseableHttpClient client = buildHttpClient();
        try {
            for (int attemptsLeft = MAX_PAGE_REQUEST_ATTEMPTS; attemptsLeft > 0; attemptsLeft--) {
                Predef$.MODULE$.println("requesting " + httpGet.getURI());
                CloseableHttpResponse response = null;
                try {
                    response = client.execute(httpGet);
                    int status = response.getStatusLine().getStatusCode();
                    Predef$.MODULE$.println("get response with status" + status);
                    if (status < 400) {
                        return IOUtils.toString(response.getEntity().getContent(), StandardCharsets.UTF_8);
                    }
                } catch (Exception e) {
                    // Best-effort: report and fall through to the next attempt.
                    Predef$.MODULE$.println("Error on requesting " + httpGet.getURI());
                    e.printStackTrace();
                } finally {
                    IOUtils.closeQuietly(response);
                }
            }
            return "";
        } finally {
            IOUtils.closeQuietly(client);
        }
    }

    /**
     * Downloads into {@code str} the PubMed update files listed after those already stored there.
     *
     * <p>The lexicographically greatest file name found in {@code str} is used as the cursor passed to
     * {@link #requestBaseLineUpdatePage(String)}. Each returned entry is handed to the compiled closure
     * {@code $$anonfun$downloadBaseLineUpdate$1}, which presumably writes the file into {@code str};
     * TODO confirm.
     *
     * @param str  directory (on {@code str2}) holding the already-downloaded files
     * @param str2 value used for {@code fs.defaultFS}
     */
    public void downloadBaseLineUpdate(String str, String str2) {
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", str2);
        // FileSystem.get may return a shared cached instance, so it is deliberately not closed here.
        FileSystem fileSystem = FileSystem.get(configuration);

        String latestFileName = "";
        RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(new Path(str), false);
        while (files.hasNext()) {
            String fileName = files.next().getPath().getName();
            if (fileName.compareTo(latestFileName) > 0) {
                latestFileName = fileName;
            }
        }
        requestBaseLineUpdatePage(latestFileName)
                .foreach(new SparkCreateBaselineDataFrame$$anonfun$downloadBaseLineUpdate$1(str, fileSystem));
    }

    /** @return aggregator that collapses the articles sharing a key into one {@link PMArticle} */
    public Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> pmArticleAggregator() {
        return this.pmArticleAggregator;
    }

    /**
     * Job entry point.
     *
     * <p>Unless {@code skipUpdate} is "true" (case-insensitive), the job downloads new update files
     * into {@code <workingPath>/baseline}. It then parses them, aggregates the articles by key and
     * overwrites {@code <workingPath>/baseline_dataset}. In every case it converts
     * {@code <workingPath>/baseline_dataset} to OAF records, filters them and saves them to
     * {@code targetPath}.
     *
     * @param strArr command-line arguments as described by {@code baseline_to_oaf_params.json}
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    public void main(String[] strArr) {
        SparkConf sparkConf = new SparkConf();
        Logger logger = LoggerFactory.getLogger(getClass());

        String paramsJson;
        try (InputStream params = getClass().getResourceAsStream(PARAMS_RESOURCE)) {
            paramsJson = IOUtils.toString(params, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(paramsJson);
        parser.parseArgument(strArr);

        String isLookupUrl = parser.get("isLookupUrl");
        logger.info("isLookupUrl: {}", isLookupUrl);
        String workingPath = parser.get("workingPath");
        logger.info("workingPath: {}", workingPath);
        String targetPath = parser.get("targetPath");
        logger.info("targetPath: {}", targetPath);
        String hdfsServerUri = parser.get("hdfsServerUri");
        logger.info("hdfsServerUri: {}", hdfsServerUri);
        String skipUpdate = parser.get("skipUpdate");
        logger.info("skipUpdate: {}", skipUpdate);

        VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(ISLookupClientFactory.getLookUpService(isLookupUrl));
        SparkSession spark = SparkSession$.MODULE$.builder()
                .config(sparkConf)
                .appName(getClass().getSimpleName())
                .master(parser.get("master"))
                .getOrCreate();
        SparkContext sc = spark.sparkContext();

        // Raw types: the Scala closures below expose generic signatures that Java cannot check here.
        Encoder articleEncoder = Encoders$.MODULE$.kryo(PMArticle.class);
        Encoder oafEncoder = Encoders$.MODULE$.kryo(Oaf.class);

        String baselinePath = workingPath + "/baseline";
        String baselineDatasetPath = workingPath + "/baseline_dataset";

        if (!"true".equalsIgnoreCase(skipUpdate)) {
            downloadBaseLineUpdate(baselinePath, hdfsServerUri);
            // NOTE(review): the XMLInputFactory is created on the driver and captured by the closure;
            // confirm it serializes, and consider disabling external-entity resolution on it.
            spark.createDataset(
                            sc.wholeTextFiles(baselinePath, 2000)
                                    .filter(new SparkCreateBaselineDataFrame$$anonfun$7())
                                    .flatMap(new SparkCreateBaselineDataFrame$$anonfun$8(XMLInputFactory.newInstance()),
                                            ClassTag$.MODULE$.apply(PMArticle.class)),
                            articleEncoder)
                    .map(new SparkCreateBaselineDataFrame$$anonfun$main$1(),
                            Encoders$.MODULE$.tuple(Encoders$.MODULE$.STRING(), articleEncoder))
                    .groupByKey(new SparkCreateBaselineDataFrame$$anonfun$main$2(), spark.implicits().newStringEncoder())
                    .agg(pmArticleAggregator().toColumn())
                    .map(new SparkCreateBaselineDataFrame$$anonfun$main$3(), articleEncoder)
                    .write()
                    .mode(SaveMode.Overwrite)
                    .save(baselineDatasetPath);
        }

        CollectionUtils$.MODULE$.saveDataset(
                spark.read()
                        .load(baselineDatasetPath)
                        .as(articleEncoder)
                        .map(new SparkCreateBaselineDataFrame$$anonfun$main$4(vocabularies), oafEncoder)
                        .as(oafEncoder)
                        .filter(new SparkCreateBaselineDataFrame$$anonfun$main$5()),
                targetPath);
    }

    /** Creates an HTTP client whose three timeouts are all {@link #HTTP_TIMEOUT_MS}. */
    private static CloseableHttpClient buildHttpClient() {
        RequestConfig config = RequestConfig.custom()
                .setConnectTimeout(HTTP_TIMEOUT_MS)
                .setConnectionRequestTimeout(HTTP_TIMEOUT_MS)
                .setSocketTimeout(HTTP_TIMEOUT_MS)
                .build();
        return HttpClientBuilder.create().setDefaultRequestConfig(config).build();
    }

    /**
     * Collapses the articles that share a grouping key into a single article.
     *
     * <p>The buffer starts as an empty {@link PMArticle}. Once it holds an article with a non-null
     * PMID it is kept; otherwise it is replaced by the incoming article. Merging follows the same
     * rule. The class is a static nested class so that Spark can serialize it without dragging in the
     * (non-serializable) enclosing module.
     */
    private static final class PmArticleAggregator
            extends Aggregator<Tuple2<String, PMArticle>, PMArticle, PMArticle> {
        private static final long serialVersionUID = 1L;

        @Override
        public PMArticle zero() {
            return new PMArticle();
        }

        @Override
        public PMArticle reduce(PMArticle buffer, Tuple2<String, PMArticle> input) {
            return hasPmid(buffer) ? buffer : input._2();
        }

        @Override
        public PMArticle merge(PMArticle left, PMArticle right) {
            return hasPmid(left) ? left : right;
        }

        @Override
        public PMArticle finish(PMArticle reduction) {
            return reduction;
        }

        @Override
        public Encoder<PMArticle> bufferEncoder() {
            return Encoders$.MODULE$.kryo(PMArticle.class);
        }

        @Override
        public Encoder<PMArticle> outputEncoder() {
            return Encoders$.MODULE$.kryo(PMArticle.class);
        }

        /** True when {@code article} is present and carries a PMID. */
        private static boolean hasPmid(PMArticle article) {
            return article != null && article.getPmid() != null;
        }
    }
}
