/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.personprojectthroughdeliverable;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Spark job that derives person-to-project relations from the authors of selected publications.
 *
 * <p>Publications whose instance type class id is one of the configured class codes are joined
 * with the relations having {@code subRelType = 'outcome'} on {@code publication.id = relation.target}.
 * For every author of a joined publication that carries a pid classified as {@code orcid} or
 * {@code orcid_pending}, a relation from the person (identified by the md5 of the pid value) to the
 * relation's {@code source} is produced. The distinct relations are written to
 * {@code workingDir/relation} and then appended to {@code sourcePath/relation}.
 */
public class SparkAuthorProjectRelationExtraction {
    private static final Logger log = LoggerFactory.getLogger(SparkAuthorProjectRelationExtraction.class);
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/wf/subworkflows/personprojectthroughdeliverable/input_personprojectpropagation_parameters.json";
    private static final String ORCID = "orcid";
    private static final String ORCID_PENDING = "orcid_pending";

    /**
     * Parses the job arguments and runs the extraction inside a Spark session.
     *
     * @param args command line arguments, parsed against the bundled JSON parameter description
     * @throws Exception if the parameter description cannot be loaded, the arguments cannot be
     *                   parsed, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // try-with-resources so the classpath stream is always released
        try (InputStream in = SparkAuthorProjectRelationExtraction.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            // explicit charset: do not depend on the platform default encoding
            jsonConfiguration = IOUtils.toString(in, java.nio.charset.StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath: {}", sourcePath);
        String workingDir = parser.get("workingDir");
        log.info("workingPath: {}", workingDir);
        String classCodes = parser.get("classCodes");
        log.info("classCodes: {}", classCodes);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> exec(spark, sourcePath, workingDir, classCodes));
    }

    /**
     * Builds the person-project relations and appends them to the graph relations.
     *
     * @param spark       the active Spark session
     * @param sourcePath  base path holding the {@code publication} and {@code relation} JSON dumps
     * @param workingPath base path where the new relations are staged before being appended
     * @param classCodes  semicolon-separated instance type class ids selecting the publications
     */
    private static void exec(SparkSession spark, String sourcePath, String workingPath, String classCodes) {
        String[] classIds = classCodes.split(";");
        Dataset<Row> publications = spark.read()
            .schema(Encoders.bean(Publication.class).schema())
            .json(sourcePath + "/publication");

        // One filtered projection per class id, unioned together.
        // NOTE(review): the fallback for an empty classIds array is a column-less frame; the join on
        // "id" below presumably cannot resolve against it — confirm whether that case can occur.
        Dataset<Row> selectedResults = Arrays.stream(classIds)
            .map(classid -> publications
                .filter(functions.array_contains(functions.col("instance.instancetype.classid"), classid))
                .select("id", "author", "instance"))
            .reduce(Dataset::union)
            .orElseGet(spark::emptyDataFrame);

        Dataset<Row> relations = spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(sourcePath + "/relation")
            .filter("subRelType = 'outcome'")
            .select("source", "target");

        // The explicit cast selects the Java-friendly flatMap overload.
        selectedResults
            .joinWith(relations, selectedResults.col("id").equalTo(relations.col("target")))
            .flatMap(
                (FlatMapFunction<scala.Tuple2<Row, Row>, Relation>) pair -> {
                    String projectId = pair._2().getAs("source");
                    return authorRelations(pair._1(), projectId).iterator();
                },
                Encoders.bean(Relation.class))
            .distinct()
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(workingPath + "/relation");

        // Re-read the staged relations and append them to the graph relations.
        spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(workingPath + "/relation")
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(sourcePath + "/relation");
    }

    /**
     * Creates one relation per author of {@code result} that has an orcid or orcid_pending pid.
     * A missing author array and null author entries are skipped instead of failing the task.
     *
     * @param result    the selected publication row (columns {@code id}, {@code author}, {@code instance})
     * @param projectId the {@code source} of the joined outcome relation, used as relation target
     * @return the relations for the qualifying authors, possibly empty
     */
    private static List<Relation> authorRelations(Row result, String projectId) {
        List<Relation> relationList = new ArrayList<>();
        Seq<Row> authorSeq = result.getAs("author");
        if (authorSeq == null) {
            return relationList;
        }
        for (Row author : JavaConverters.seqAsJavaListConverter(authorSeq).asJava()) {
            if (author != null && selectOrcidPid(author).isPresent()) {
                relationList.add(getRelation(author, projectId));
            }
        }
        return relationList;
    }

    /**
     * Picks the pid used to identify the author: the first pid classified as {@code orcid},
     * otherwise the first one classified as {@code orcid_pending}.
     *
     * @param author the author row
     * @return the chosen pid row, or empty when the author has no pid of either class
     */
    private static Optional<Row> selectOrcidPid(Row author) {
        Seq<Row> pidSeq = author.getAs("pid");
        if (pidSeq == null) {
            return Optional.empty();
        }
        List<Row> pids = JavaConverters.seqAsJavaListConverter(pidSeq).asJava();
        Optional<Row> confirmed = firstPidOfClass(pids, ORCID);
        return confirmed.isPresent() ? confirmed : firstPidOfClass(pids, ORCID_PENDING);
    }

    /** Returns the first pid in {@code pids} whose qualifier class id matches {@code classId}. */
    private static Optional<Row> firstPidOfClass(List<Row> pids, String classId) {
        return pids.stream().filter(pid -> hasClassId(pid, classId)).findFirst();
    }

    /**
     * Case-insensitive check of a pid's qualifier class id; a null pid, qualifier or class id
     * simply does not match.
     */
    private static boolean hasClassId(Row pid, String classId) {
        if (pid == null) {
            return false;
        }
        Row qualifier = pid.getAs("qualifier");
        if (qualifier == null) {
            return false;
        }
        String actual = qualifier.getAs("classid");
        // constant on the left: equalsIgnoreCase(null) is false rather than an NPE
        return classId.equalsIgnoreCase(actual);
    }

    /**
     * Builds the relation linking the person behind author {@code a} to {@code projectId}.
     * The relation source is {@code PERSON_PREFIX + "::" + md5(pid value)}.
     *
     * @param a         the author row; must carry an orcid or orcid_pending pid
     * @param projectId the identifier used as relation target
     * @return the person-to-project relation
     * @throws IllegalArgumentException if the author has no orcid or orcid_pending pid
     */
    private static Relation getRelation(Row a, String projectId) {
        Row orcidPid = selectOrcidPid(a)
            .orElseThrow(() -> new IllegalArgumentException("author has no orcid or orcid_pending pid"));
        String orcid = orcidPid.getAs("value");
        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
        DataInfo dataInfo = PropagationConstant.getDataInfo("propagation", "person:project:deliverable", "Extraction of relations person participates in project if person orcid is found as author of a project deliverable", "dnet:provenanceActions");
        return OafMapperUtils.getRelation(source, projectId, "projectPerson", "participation", "participatesToProject", null, dataInfo, null);
    }
}

