/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.person;

import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.person.CoAuthorshipIterator;
import eu.dnetlib.dhp.common.person.Coauthors;
import eu.dnetlib.dhp.countrypropagation.SparkCountryPropagationJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.Person;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.schema.oaf.utils.IdentifierFactory;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.InputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that derives person relations from the ORCID identifiers attached to the authors of
 * results, appends the ones not yet present to the graph relations, and finally drops the persons
 * that are not the source of any relation.
 *
 * <p>Paths: entity folders are resolved as {@code sourcePath + <name>} (no separator is inserted),
 * while the intermediate outputs are written under {@code workingPath + "/..."}.
 * NOTE(review): {@code sourcePath} is therefore presumably expected to end with a path separator,
 * yet the final append targets {@code sourcePath + "/relation"} - confirm against the workflow
 * definition before normalising these concatenations.
 */
public class SparkExtractPersonRelations {
    private static final Logger log = LoggerFactory.getLogger(SparkExtractPersonRelations.class);
    private static final String PERSON_PREFIX = ModelSupport.getIdPrefix(Person.class) + "|orcid_______";
    /** Pid class identifiers recognised as ORCID identifiers (matched case-insensitively). */
    private static final String ORCID = "orcid";
    private static final String ORCID_PENDING = "orcid_pending";
    /** Classpath location of the JSON description of the job arguments. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/wf/subworkflows/person/input_personpropagation_parameters.json";

    /** DataInfo instance shared by every authorship relation created by {@link #getRelation}. */
    public static final DataInfo DATAINFO = OafMapperUtils.dataInfo(
        false,
        "openaire",
        true,
        false,
        OafMapperUtils.qualifier(
            "sysimport:crosswalk:repository",
            "sysimport:crosswalk:repository",
            "dnet:provenanceActions",
            "dnet:provenanceActions"),
        "0.85");

    /**
     * Entry point: parses the arguments described by {@link #PARAMETERS_RESOURCE}, then runs the
     * relation extraction followed by the removal of isolated persons inside a Spark session.
     *
     * @param args command line arguments; {@code sourcePath} and {@code outputPath} are read from them
     * @throws Exception if the arguments cannot be parsed or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // The resource path is absolute, so it is resolved through this class; the stream is
        // closed once read and a missing resource is reported with its name.
        try (InputStream parameters = SparkExtractPersonRelations.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(
                Objects.requireNonNull(parameters, "classpath resource not found: " + PARAMETERS_RESOURCE));
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = PropagationConstant.isSparkSessionManaged(parser);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        String sourcePath = parser.get("sourcePath");
        log.info("sourcePath: {}", sourcePath);
        String workingPath = parser.get("outputPath");
        log.info("workingPath: {}", workingPath);
        SparkConf conf = new SparkConf();
        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            extractRelations(spark, sourcePath, workingPath);
            removeIsolatedPerson(spark, sourcePath, workingPath);
        });
    }

    /**
     * Keeps only the persons whose id is the source of at least one relation.
     *
     * <p>The surviving persons are first written to {@code workingPath + "person"} and then copied
     * back over {@code sourcePath + "person"}; the intermediate copy avoids overwriting the folder
     * that is still being read.
     */
    private static void removeIsolatedPerson(SparkSession spark, String sourcePath, String workingPath) {
        Dataset<Person> personDataset = spark.read()
            .schema(Encoders.bean(Person.class).schema())
            .json(sourcePath + "person")
            .as(Encoders.bean(Person.class));
        Dataset<Relation> relationDataset = readRelations(spark, sourcePath + "relation");

        personDataset
            .join(relationDataset, personDataset.col("id").equalTo(relationDataset.col("source")), "left_semi")
            .write()
            .option("compression", "gzip")
            .mode(SaveMode.Overwrite)
            .json(workingPath + "person");

        spark.read()
            .schema(Encoders.bean(Person.class).schema())
            .json(workingPath + "person")
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .json(sourcePath + "person");
    }

    /**
     * Builds authorship and co-authorship relations for every result type and appends to the graph
     * the ones whose source/relClass/target triple is not already among the existing relations.
     */
    private static void extractRelations(SparkSession spark, String sourcePath, String workingPath) {
        ModelSupport.entityTypes.keySet().stream().filter(ModelSupport::isResult).forEach(e -> {
            // Results that are neither deleted by inference nor invisible and that have at least
            // one author carrying an ORCID (confirmed or pending) pid.
            // NOTE(review): getDataInfo() and its flags are dereferenced without null checks.
            Dataset<Result> resultWithOrcids = spark.read()
                .schema(Encoders.bean(Result.class).schema())
                .json(sourcePath + e.name())
                .as(Encoders.bean(Result.class))
                .filter((FilterFunction<Result>) r -> !r.getDataInfo().getDeletedbyinference()
                    && !r.getDataInfo().getInvisible()
                    && r.getAuthor() != null)
                .filter((FilterFunction<Result>) r -> r.getAuthor()
                    .stream()
                    .anyMatch(SparkExtractPersonRelations::hasOrcidPid));

            // Append: the same folders collect the output of every result type.
            resultWithOrcids
                .flatMap(
                    (FlatMapFunction<Result, Relation>) SparkExtractPersonRelations::getAuthorshipRelations,
                    Encoders.bean(Relation.class))
                .distinct()
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(workingPath + "/authorshipNew");

            resultWithOrcids
                .map(
                    (MapFunction<Result, Coauthors>) SparkExtractPersonRelations::getAuthorsPidList,
                    Encoders.bean(Coauthors.class))
                .flatMap(
                    (FlatMapFunction<Coauthors, Relation>) c -> new CoAuthorshipIterator(c.getCoauthors()),
                    Encoders.bean(Relation.class))
                .distinct()
                .write()
                .mode(SaveMode.Append)
                .option("compression", "gzip")
                .json(workingPath + "/coauthorshipNew");
        });

        Dataset<Tuple2<String, Relation>> relationDataset = keyByTriple(readRelations(spark, sourcePath + "relation"));
        Dataset<Tuple2<String, Relation>> newRelations = getRelationMap(spark, workingPath + "/authorshipNew")
            .union(getRelationMap(spark, workingPath + "/coauthorshipNew"));

        // Left join + "right side missing" == relations not yet in the graph. The rows are filtered
        // before being mapped so that the typed map never has to return null.
        newRelations
            .joinWith(relationDataset, newRelations.col("_1").equalTo(relationDataset.col("_1")), "left")
            .filter((FilterFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>>) t2 -> t2._2() == null)
            .map(
                (MapFunction<Tuple2<Tuple2<String, Relation>, Tuple2<String, Relation>>, Relation>) t2 -> t2._1()._2(),
                Encoders.bean(Relation.class))
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(workingPath + "/relation");

        spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(workingPath + "/relation")
            .write()
            .mode(SaveMode.Append)
            .option("compression", "gzip")
            .json(sourcePath + "/relation");
    }

    /**
     * Reads the relations stored as JSON under the given path, removes duplicates and pairs each
     * of them with its source/relClass/target key.
     *
     * @param workingPath folder containing the serialized relations
     */
    private static Dataset<Tuple2<String, Relation>> getRelationMap(SparkSession spark, String workingPath) {
        return keyByTriple(readRelations(spark, workingPath).distinct());
    }

    /** Reads the JSON files under {@code path} as a typed dataset of relations. */
    private static Dataset<Relation> readRelations(SparkSession spark, String path) {
        return spark.read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(path)
            .as(Encoders.bean(Relation.class));
    }

    /** Pairs every relation with the concatenation of its source, relClass and target. */
    private static Dataset<Tuple2<String, Relation>> keyByTriple(Dataset<Relation> relations) {
        return relations.map(
            (MapFunction<Relation, Tuple2<String, Relation>>) r -> new Tuple2<>(
                r.getSource() + r.getRelClass() + r.getTarget(), r),
            Encoders.tuple(Encoders.STRING(), Encoders.bean(Relation.class)));
    }

    /**
     * Collects one ORCID value per author of the result: the first confirmed ORCID pid when the
     * author has one, otherwise the first pending one. Authors without ORCID pids, or whose
     * selected pid has no value, are skipped.
     *
     * @param r a result with a non-null author list
     * @return the holder of the collected ORCID values
     */
    private static Coauthors getAuthorsPidList(Result r) {
        Coauthors coauth = new Coauthors();
        coauth.setCoauthors(r.getAuthor()
            .stream()
            .filter(SparkExtractPersonRelations::hasOrcidPid)
            .map(SparkExtractPersonRelations::preferredOrcid)
            .filter(Objects::nonNull)
            .collect(Collectors.toList()));
        return coauth;
    }

    /**
     * Creates one authorship relation for every pending ORCID pid found among the authors of the
     * result. Authors without pids are ignored.
     *
     * @param r the result whose authors are inspected
     * @return an iterator over the relations created, possibly empty
     */
    private static Iterator<Relation> getAuthorshipRelations(Result r) {
        ArrayList<Relation> relationList = new ArrayList<>();
        if (r.getAuthor() == null) {
            return relationList.iterator();
        }
        for (Author a : r.getAuthor()) {
            if (a == null || a.getPid() == null) {
                continue;
            }
            for (StructuredProperty p : a.getPid()) {
                if (isPidOfType(p, ORCID_PENDING)) {
                    relationList.add(getRelation(p.getValue(), r.getId()));
                }
            }
        }
        return relationList.iterator();
    }

    /**
     * Builds the relation between the person identified by the given ORCID and the result.
     *
     * @param orcid ORCID value; its md5 is appended to the person prefix to form the source id
     * @param resultId identifier of the result, used as the other end of the relation
     */
    private static Relation getRelation(String orcid, String resultId) {
        String source = PERSON_PREFIX + "::" + IdentifierFactory.md5(orcid);
        return OafMapperUtils.getRelation(
            source, resultId, "resultPerson", "authorship", "hasAuthored", null, DATAINFO, null);
    }

    /** Tells whether the author carries at least one confirmed or pending ORCID pid. */
    private static boolean hasOrcidPid(Author a) {
        return a != null
            && a.getPid() != null
            && a.getPid().stream().anyMatch(p -> isPidOfType(p, ORCID) || isPidOfType(p, ORCID_PENDING));
    }

    /**
     * Returns the value of the first confirmed ORCID pid of the author, or of the first pending
     * one when no confirmed pid exists; {@code null} when neither is available.
     */
    private static String preferredOrcid(Author a) {
        Optional<StructuredProperty> confirmed = firstPidOfType(a, ORCID);
        if (confirmed.isPresent()) {
            return confirmed.get().getValue();
        }
        return firstPidOfType(a, ORCID_PENDING).map(StructuredProperty::getValue).orElse(null);
    }

    /** First pid of the author whose qualifier class id matches {@code classid}, if any. */
    private static Optional<StructuredProperty> firstPidOfType(Author a, String classid) {
        if (a.getPid() == null) {
            return Optional.empty();
        }
        return a.getPid().stream().filter(p -> isPidOfType(p, classid)).findFirst();
    }

    /**
     * Null-safe, case-insensitive and locale-independent comparison of the pid qualifier class id
     * with the expected one.
     */
    private static boolean isPidOfType(StructuredProperty p, String classid) {
        return p != null
            && p.getQualifier() != null
            && classid.equalsIgnoreCase(p.getQualifier().getClassid());
    }
}

