/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.Properties;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.Tuple3;

/**
 * Spark action that prepares organization suggestion rows ({@link OrgSimRel}) and writes them to a
 * JDBC table.
 *
 * <p>Organization merge relations ({@code relClass == 'merges'}) are grouped by their source.
 * Every group with more than one member becomes a set of (representative, member) pairs tagged
 * with a group id. Pairs that also appear in an organization {@code isDifferentFrom} dedup
 * relation are discarded. The survivors are enriched with the member organization's metadata.
 */
public class SparkPrepareOrgRels extends AbstractSparkAction {

    private static final Logger log = LoggerFactory.getLogger(SparkPrepareOrgRels.class);

    /** Prefix of every group identifier emitted in {@link OrgSimRel} rows. */
    public static final String GROUP_PREFIX = "group::";

    /** Tag attached to pairs that come from an {@code isDifferentFrom} relation. */
    private static final String DIFF_REL_TAG = "diffRel";

    /** Separator used only to build the canonical string that is hashed into a group id. */
    private static final String ID_SEPARATOR = "@@@";

    public SparkPrepareOrgRels(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Entry point: parses the job arguments, configures Kryo and runs the action.
     *
     * @param args command line arguments described by {@code prepareOrgRels_parameters.json}
     * @throws Exception if argument parsing, IS lookup or the Spark job fail
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        try (InputStream is = SparkPrepareOrgRels.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/prepareOrgRels_parameters.json")) {
            jsonConfiguration = IOUtils.toString(is, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkPrepareOrgRels(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    /**
     * Builds the organization suggestion rows and overwrites the configured JDBC table with them.
     *
     * @param isLookUpService not used by this action
     * @throws IOException as declared by the parent contract
     */
    @Override
    public void run(ISLookUpService isLookUpService) throws IOException {
        final String graphBasePath = this.parser.get("graphBasePath");
        final String isLookUpUrl = this.parser.get("isLookUpUrl");
        final String actionSetId = this.parser.get("actionSetId");
        final String workingPath = this.parser.get("workingPath");
        final int numConnections = Optional
            .ofNullable(this.parser.get("numConnections"))
            .map(Integer::valueOf)
            .orElse(20);
        final String dbUrl = this.parser.get("dbUrl");
        final String dbTable = this.parser.get("dbTable");
        final String dbUser = this.parser.get("dbUser");
        final String dbPwd = this.parser.get("dbPwd");

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("isLookUpUrl:   '{}'", isLookUpUrl);
        log.info("actionSetId:   '{}'", actionSetId);
        log.info("workingPath:   '{}'", workingPath);
        log.info("numConnections: '{}'", numConnections);
        log.info("dbUrl:         '{}'", dbUrl);
        log.info("dbUser:        '{}'", dbUser);
        log.info("table:         '{}'", dbTable);
        log.info("dbPwd:         '{}'", "xxx"); // never log the real password

        final String organization = ModelSupport.getMainType(EntityType.organization);
        final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, organization);
        final String entityPath = DedupUtility.createEntityPath(graphBasePath, organization);
        final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");

        Dataset<OrgSimRel> relations = createRelations(this.spark, mergeRelPath, relationPath, entityPath);

        Properties connectionProperties = new Properties();
        connectionProperties.put("user", dbUser);
        connectionProperties.put("password", dbPwd);

        // the number of partitions caps the number of concurrent JDBC writers
        relations
            .repartition(numConnections)
            .write()
            .mode(SaveMode.Overwrite)
            .jdbc(dbUrl, dbTable, connectionProperties);
    }

    /**
     * Tells whether {@code rel} is an {@code isDifferentFrom} dedup relation of the given entity type.
     *
     * @param rel        the relation to test
     * @param entityType {@code "result"} or {@code "organization"}; any other value yields false
     * @return true when relClass, relType and subRelType all match; null fields never match
     */
    private static boolean filterRels(Relation rel, String entityType) {
        final String expectedRelType;
        switch (entityType) {
            case "result":
                expectedRelType = "resultResult";
                break;
            case "organization":
                expectedRelType = "organizationOrganization";
                break;
            default:
                return false;
        }
        return "isDifferentFrom".equals(rel.getRelClass())
            && expectedRelType.equals(rel.getRelType())
            && "dedup".equals(rel.getSubRelType());
    }

    /**
     * Builds the suggestion rows for every duplicate group that is not contradicted by an
     * {@code isDifferentFrom} relation.
     *
     * @param spark         active session
     * @param mergeRelsPath path of the organization merge relations produced by dedup
     * @param relationPath  path of the graph relations (JSON), scanned for {@code isDifferentFrom}
     * @param entitiesPath  path of the organization entities (one JSON object per line)
     * @return one {@link OrgSimRel} per surviving (representative, member) pair
     */
    public static Dataset<OrgSimRel> createRelations(
        SparkSession spark, String mergeRelsPath, String relationPath, String entitiesPath) {

        // <ordered(source, target), "diffRel"> for each organization isDifferentFrom relation
        JavaRDD<Tuple2<Tuple2<String, String>, String>> diffRels = spark
            .read()
            .schema(Encoders.bean(Relation.class).schema())
            .json(relationPath)
            .as(Encoders.bean(Relation.class))
            .map(patchRelFn(), Encoders.bean(Relation.class))
            .toJavaRDD()
            .filter(r -> filterRels(r, "organization"))
            .map(rel -> new Tuple2<>(orderedPair(rel.getSource(), rel.getTarget()), DIFF_REL_TAG))
            .distinct();
        log.info("Number of DiffRels collected: {}", diffRels.count());

        Dataset<Tuple2<String, Organization>> entities = readOrganizations(spark, entitiesPath);

        // <(representative, member), groupId> for each merge group with more than one member
        JavaRDD<Tuple2<Tuple2<String, String>, String>> rawOpenorgsRels = spark
            .read()
            .load(mergeRelsPath)
            .as(Encoders.bean(Relation.class))
            .where("relClass == 'merges'")
            .toJavaRDD()
            .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
            .filter(t -> !t._2().contains("openorgsmesh"))
            .groupByKey()
            .map(g -> Lists.newArrayList(g._2()))
            .filter(members -> members.size() > 1)
            .flatMap(members -> {
                List<String> ids = sortIds(members);
                String groupId = groupIdOf(ids);
                String source = ids.get(0);
                List<Tuple2<Tuple2<String, String>, String>> rels = new ArrayList<>(ids.size());
                // the representative is paired with every member, itself included
                for (String target : ids) {
                    rels.add(new Tuple2<>(new Tuple2<>(source, target), groupId));
                }
                return rels.iterator();
            });
        log.info("Number of Raw Openorgs Relations created: {}", rawOpenorgsRels.count());

        // keep a pair only if its sole tag is a group id (i.e. no isDifferentFrom contradicts it)
        JavaRDD<Tuple3<String, String, String>> openorgsRels = rawOpenorgsRels
            .union(diffRels)
            .mapToPair(t -> new Tuple2<>(t._1(), t._2()))
            .groupByKey()
            .filter(g -> hasSingleGroupTag(g._2()))
            .map(g -> new Tuple3<>(g._1()._1(), g._1()._2(), g._2().iterator().next()));
        log.info("Number of Openorgs Relations created: '{}'", openorgsRels.count());

        Dataset<Tuple3<String, String, String>> relations = spark
            .createDataset(
                openorgsRels.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()));

        Dataset<OrgSimRel> simRels = relations
            .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
            .map(
                (MapFunction<Tuple2<Tuple3<String, String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> toOrgSimRel(
                    r._1()._1(), r._2()._2(), r._1()._3()),
                Encoders.bean(OrgSimRel.class));

        return resolveLocalIds(simRels, entities);
    }

    /**
     * Sorts {@code ids} in place with {@link DedupUtility#compareOpenOrgIds} and returns the same list.
     *
     * @param ids mutable list of identifiers
     * @return {@code ids}, now sorted
     */
    public static List<String> sortIds(List<String> ids) {
        ids.sort(DedupUtility::compareOpenOrgIds);
        return ids;
    }

    /**
     * Builds suggestion rows directly from the merge relations, without considering
     * {@code isDifferentFrom} relations.
     *
     * <p>For every merge group, each member containing {@code openorgs____} is paired with every
     * other member that does not contain {@code openorgsmesh}. The group id is {@link #GROUP_PREFIX}
     * followed by the openorgs member id.
     *
     * @param spark         active session
     * @param mergeRelsPath path of the organization merge relations produced by dedup
     * @param entitiesPath  path of the organization entities (one JSON object per line)
     * @return one {@link OrgSimRel} per generated pair
     */
    public static Dataset<OrgSimRel> createRelationsFromScratch(
        SparkSession spark, String mergeRelsPath, String entitiesPath) {

        Dataset<Tuple2<String, Organization>> entities = readOrganizations(spark, entitiesPath);

        JavaRDD<Tuple2<String, String>> pairs = spark
            .read()
            .load(mergeRelsPath)
            .as(Encoders.bean(Relation.class))
            .where("relClass == 'merges'")
            .toJavaRDD()
            .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
            .groupByKey()
            .flatMap(g -> {
                List<Tuple2<String, String>> rels = new ArrayList<>();
                for (String id1 : g._2()) {
                    if (!id1.contains("openorgs____")) {
                        continue;
                    }
                    for (String id2 : g._2()) {
                        if (!id1.equals(id2) && !id2.contains("openorgsmesh")) {
                            rels.add(new Tuple2<>(id1, id2));
                        }
                    }
                }
                return rels.iterator();
            });

        Dataset<Tuple2<String, String>> relations = spark
            .createDataset(pairs.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));

        Dataset<OrgSimRel> simRels = relations
            .joinWith(entities, relations.col("_2").equalTo(entities.col("_1")), "inner")
            .map(
                (MapFunction<Tuple2<Tuple2<String, String>, Tuple2<String, Organization>>, OrgSimRel>) r -> toOrgSimRel(
                    r._1()._1(), r._2()._2(), GROUP_PREFIX + r._1()._1()),
                Encoders.bean(OrgSimRel.class));

        return resolveLocalIds(simRels, entities);
    }

    /** Reads the organization entities (one JSON object per line) keyed by their id. */
    private static Dataset<Tuple2<String, Organization>> readOrganizations(SparkSession spark, String entitiesPath) {
        return spark
            .read()
            .textFile(entitiesPath)
            .map(
                (MapFunction<String, Tuple2<String, Organization>>) json -> {
                    Organization entity = OBJECT_MAPPER.readValue(json, Organization.class);
                    return new Tuple2<>(entity.getId(), entity);
                },
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
    }

    /**
     * Replaces each row's local id (an organization id) with the first original id of that
     * organization. Rows whose local id matches no entity are dropped by the inner join.
     */
    private static Dataset<OrgSimRel> resolveLocalIds(
        Dataset<OrgSimRel> simRels, Dataset<Tuple2<String, Organization>> entities) {

        Dataset<Tuple2<String, OrgSimRel>> keyed = simRels
            .map(
                (MapFunction<OrgSimRel, Tuple2<String, OrgSimRel>>) o -> new Tuple2<>(o.getLocal_id(), o),
                Encoders.tuple(Encoders.STRING(), Encoders.bean(OrgSimRel.class)));

        return keyed
            .joinWith(entities, keyed.col("_1").equalTo(entities.col("_1")), "inner")
            .map(
                (MapFunction<Tuple2<Tuple2<String, OrgSimRel>, Tuple2<String, Organization>>, OrgSimRel>) r -> {
                    OrgSimRel orgSimRel = r._1()._2();
                    orgSimRel.setLocal_id(firstOriginalId(r._2()._2()));
                    return orgSimRel;
                },
                Encoders.bean(OrgSimRel.class));
    }

    /**
     * Maps an organization onto a suggestion row.
     *
     * <p>Missing fields become {@code ""}, except the original id, which becomes {@code null}.
     */
    private static OrgSimRel toOrgSimRel(String localId, Organization o, String groupId) {
        return new OrgSimRel(
            localId,
            firstOriginalId(o),
            Optional.ofNullable(o.getLegalname()).map(Field::getValue).orElse(""),
            Optional.ofNullable(o.getLegalshortname()).map(Field::getValue).orElse(""),
            Optional.ofNullable(o.getCountry()).map(Qualifier::getClassid).orElse(""),
            Optional.ofNullable(o.getWebsiteurl()).map(Field::getValue).orElse(""),
            Optional
                .ofNullable(o.getCollectedfrom())
                .filter(c -> !c.isEmpty())
                .map(c -> c.get(0))
                .map(KeyValue::getValue)
                .orElse(""),
            groupId,
            structuredPropertyListToString(o.getPid()),
            parseECField(o.getEclegalbody()),
            parseECField(o.getEclegalperson()),
            parseECField(o.getEcnonprofit()),
            parseECField(o.getEcresearchorganization()),
            parseECField(o.getEchighereducation()),
            parseECField(o.getEcinternationalorganizationeurinterests()),
            parseECField(o.getEcinternationalorganization()),
            parseECField(o.getEcenterprise()),
            parseECField(o.getEcsmevalidated()),
            parseECField(o.getEcnutscode()));
    }

    /** Returns the first original id of {@code o}, or {@code null} when the list is null or empty. */
    private static String firstOriginalId(Organization o) {
        return Optional
            .ofNullable(o.getOriginalId())
            .filter(ids -> !ids.isEmpty())
            .map(ids -> ids.get(0))
            .orElse(null);
    }

    /** Orders a pair so that the id ranking first under {@link DedupUtility#compareOpenOrgIds} is first. */
    private static Tuple2<String, String> orderedPair(String a, String b) {
        return DedupUtility.compareOpenOrgIds(a, b) < 0 ? new Tuple2<>(a, b) : new Tuple2<>(b, a);
    }

    /**
     * Derives a group id from the members of a duplicate group.
     *
     * <p>The result is a name-based UUID of the naturally sorted member ids. Re-evaluating the same
     * group therefore yields the same id, whether Spark recomputes the lineage for a {@code count()}
     * or for a task retry. A random UUID could differ between evaluations.
     */
    private static String groupIdOf(List<String> memberIds) {
        String canonical = memberIds.stream().sorted().collect(Collectors.joining(ID_SEPARATOR));
        return GROUP_PREFIX + UUID.nameUUIDFromBytes(canonical.getBytes(StandardCharsets.UTF_8));
    }

    /** True when {@code tags} holds exactly one element and that element is a group id. */
    private static boolean hasSingleGroupTag(Iterable<String> tags) {
        String only = null;
        int count = 0;
        for (String tag : tags) {
            if (++count > 1) {
                return false;
            }
            only = tag;
        }
        return only != null && only.contains(GROUP_PREFIX);
    }
}

