/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.oa.dedup;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.oa.dedup.AbstractSparkAction;
import eu.dnetlib.dhp.oa.dedup.DedupUtility;
import eu.dnetlib.dhp.oa.dedup.model.OrgSimRel;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Field;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark action that computes the organization rows produced by
 * {@link #createNewOrgs(SparkSession, String, String, String)} and appends them to a JDBC table.
 */
public class SparkPrepareNewOrgs
extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkPrepareNewOrgs.class);

    /** Value attached to every id collected from an isDifferentFrom relation. */
    private static final String DIFF_REL = "diffRel";

    public SparkPrepareNewOrgs(ArgumentApplicationParser parser, SparkSession spark) {
        super(parser, spark);
    }

    /**
     * Entry point: parses {@code args} against {@code prepareNewOrgs_parameters.json}, configures
     * Kryo serialization for the OAF model classes and runs the action.
     *
     * @param args command-line arguments described by the parameters resource
     * @throws Exception if argument parsing, the IS lookup or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String paramsJson;
        // Close the resource stream and decode it explicitly as UTF-8 instead of the platform charset.
        try (InputStream params = SparkPrepareNewOrgs.class
            .getResourceAsStream("/eu/dnetlib/dhp/oa/dedup/prepareNewOrgs_parameters.json")) {
            paramsJson = IOUtils.toString(params, StandardCharsets.UTF_8);
        }
        ArgumentApplicationParser parser = new ArgumentApplicationParser(paramsJson);
        parser.parseArgument(args);

        SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkPrepareNewOrgs(parser, getSparkSession(conf))
            .run(ISLookupClientFactory.getLookUpService(parser.get("isLookUpUrl")));
    }

    /**
     * Computes the new organizations and appends them to {@code dbTable} at {@code dbUrl}.
     * The dataset is repartitioned to {@code numConnections} (default 20) before the write.
     *
     * @param isLookUpService not used by this action
     * @throws IOException declared by the action contract
     */
    @Override
    public void run(ISLookUpService isLookUpService) throws IOException {
        final String graphBasePath = parser.get("graphBasePath");
        final String isLookUpUrl = parser.get("isLookUpUrl");
        final String actionSetId = parser.get("actionSetId");
        final String workingPath = parser.get("workingPath");
        final int numConnections = java.util.Optional
            .ofNullable(parser.get("numConnections"))
            .map(Integer::valueOf)
            .orElse(20);
        final String dbUrl = parser.get("dbUrl");
        final String dbTable = parser.get("dbTable");
        final String dbUser = parser.get("dbUser");
        final String dbPwd = parser.get("dbPwd");

        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("isLookUpUrl:   '{}'", isLookUpUrl);
        log.info("actionSetId:   '{}'", actionSetId);
        log.info("workingPath:   '{}'", workingPath);
        log.info("numConnections: '{}'", numConnections);
        log.info("dbUrl:         '{}'", dbUrl);
        log.info("dbUser:        '{}'", dbUser);
        log.info("table:         '{}'", dbTable);
        // Never log the real password.
        log.info("dbPwd:         '{}'", "xxx");

        final String organization = ModelSupport.getMainType(EntityType.organization);
        final String entityPath = DedupUtility.createEntityPath(graphBasePath, organization);
        final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, organization);
        final String relationPath = DedupUtility.createEntityPath(graphBasePath, "relation");

        final Dataset<OrgSimRel> newOrgs = createNewOrgs(spark, mergeRelPath, relationPath, entityPath);

        final Properties connectionProperties = new Properties();
        connectionProperties.put("user", dbUser);
        connectionProperties.put("password", dbPwd);

        // NOTE(review): count() and the JDBC write each evaluate the whole pipeline;
        // consider caching newOrgs if the double evaluation is too costly.
        log.info("Number of New Organization created: '{}'", newOrgs.count());

        newOrgs
            .repartition(numConnections)
            .write()
            .mode(SaveMode.Append)
            .jdbc(dbUrl, dbTable, connectionProperties);
    }

    /**
     * Builds one {@link OrgSimRel} row for each organization left out of the merge relations.
     *
     * <p>An organization record is kept when its id is not the source of any retained
     * {@code isMergedIn} relation from {@code mergeRelsPath}, and its id does not contain
     * {@code "openorgs"}. A merge relation is discarded (so its source may be kept) when its source
     * id was collected from a dedup {@code isDifferentFrom} organization relation in
     * {@code relationPath}. For each such relation the collected id is chosen via
     * {@link DedupUtility#compareOpenOrgIds}.
     *
     * @param spark the active Spark session
     * @param mergeRelsPath merge relations, read with {@code spark.read().load}
     * @param relationPath graph relations as text lines, parsed with {@code patchRelFn()}
     * @param entitiesPath organizations as text lines, each parsed with {@code OBJECT_MAPPER}
     * @return the rows to be written for the kept organizations
     */
    public static Dataset<OrgSimRel> createNewOrgs(SparkSession spark, String mergeRelsPath, String relationPath, String entitiesPath) {
        final JavaPairRDD<String, String> diffRels = readDiffRels(spark, relationPath);
        log.info("Number of DiffRels collected: '{}'", diffRels.count());

        final Dataset<Tuple2<String, Organization>> entities = readOrganizations(spark, entitiesPath);

        final Dataset<Tuple2<String, String>> openorgsRels = readOpenorgsRels(spark, mergeRelsPath, diffRels);
        log.info("Number of Openorgs Relations loaded: '{}'", openorgsRels.count());

        // Left join on the organization id: a null right side means no retained merge relation.
        final Dataset<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>> joined = entities
            .joinWith(openorgsRels, entities.col("_1").equalTo(openorgsRels.col("_1")), "left");

        return joined
            .filter(
                (FilterFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>>) t -> t._2() == null
                    && !t._1()._1().contains("openorgs"))
            .map(
                (MapFunction<Tuple2<Tuple2<String, Organization>, Tuple2<String, String>>, OrgSimRel>) t -> toOrgSimRel(
                    t._1()._2()),
                Encoders.bean(OrgSimRel.class));
    }

    /**
     * Reads the dedup isDifferentFrom organization relations. Each relation is keyed by its source
     * when {@code compareOpenOrgIds(source, target) > 0}, otherwise by its target, and mapped to
     * {@link #DIFF_REL}. Duplicate pairs are removed.
     */
    private static JavaPairRDD<String, String> readDiffRels(SparkSession spark, String relationPath) {
        // Computed once on the driver instead of once per relation inside the filter.
        final String organization = ModelSupport.getMainType(EntityType.organization);
        return spark
            .read()
            .textFile(relationPath)
            .map(patchRelFn(), Encoders.bean(Relation.class))
            .toJavaRDD()
            .filter(r -> filterRels(r, organization))
            .mapToPair(rel -> {
                final String key = DedupUtility.compareOpenOrgIds(rel.getSource(), rel.getTarget()) > 0
                    ? rel.getSource()
                    : rel.getTarget();
                return new Tuple2<>(key, DIFF_REL);
            })
            .distinct();
    }

    /** Parses each text line of {@code entitiesPath} into an (id, organization) tuple. */
    private static Dataset<Tuple2<String, Organization>> readOrganizations(SparkSession spark, String entitiesPath) {
        return spark
            .read()
            .textFile(entitiesPath)
            .map(
                (MapFunction<String, Tuple2<String, Organization>>) it -> {
                    final Organization entity = OBJECT_MAPPER.readValue(it, Organization.class);
                    return new Tuple2<>(entity.getId(), entity);
                },
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(Organization.class)));
    }

    /**
     * Reads the isMergedIn relations as (source, target) pairs. Pairs whose source appears as a
     * key in {@code diffRels} are dropped.
     */
    private static Dataset<Tuple2<String, String>> readOpenorgsRels(
        SparkSession spark, String mergeRelsPath, JavaPairRDD<String, String> diffRels) {
        final JavaPairRDD<String, String> retained = spark
            .read()
            .load(mergeRelsPath)
            .as(Encoders.bean(Relation.class))
            .where("relClass == 'isMergedIn'")
            .toJavaRDD()
            .mapToPair(r -> new Tuple2<>(r.getSource(), r.getTarget()))
            .leftOuterJoin(diffRels)
            // Keep only merge relations whose source has no isDifferentFrom entry.
            .filter(rel -> !rel._2()._2().isPresent())
            .mapToPair(rel -> new Tuple2<>(rel._1(), rel._2()._1()));
        return spark.createDataset(retained.rdd(), Encoders.tuple(Encoders.STRING(), Encoders.STRING()));
    }

    /**
     * Projects an organization onto the row written to the database. Null legal name, short name,
     * country and website become empty strings.
     *
     * <p>NOTE(review): the first originalId and the first collectedfrom entry are read without a
     * check, so an organization with a null or empty list for either makes the job fail.
     */
    private static OrgSimRel toOrgSimRel(Organization org) {
        return new OrgSimRel(
            "",
            org.getOriginalId().get(0),
            org.getLegalname() != null ? org.getLegalname().getValue() : "",
            org.getLegalshortname() != null ? org.getLegalshortname().getValue() : "",
            org.getCountry() != null ? org.getCountry().getClassid() : "",
            org.getWebsiteurl() != null ? org.getWebsiteurl().getValue() : "",
            org.getCollectedfrom().get(0).getValue(),
            "",
            structuredPropertyListToString(org.getPid()),
            parseECField(org.getEclegalbody()),
            parseECField(org.getEclegalperson()),
            parseECField(org.getEcnonprofit()),
            parseECField(org.getEcresearchorganization()),
            parseECField(org.getEchighereducation()),
            parseECField(org.getEcinternationalorganizationeurinterests()),
            parseECField(org.getEcinternationalorganization()),
            parseECField(org.getEcenterprise()),
            parseECField(org.getEcsmevalidated()),
            parseECField(org.getEcnutscode()));
    }

    /**
     * Tells whether {@code rel} is a dedup {@code isDifferentFrom} relation for the given main
     * entity type. Only {@code "result"} and {@code "organization"} are recognised; any other type
     * yields {@code false}. Relations with a null relClass, relType or subRelType yield
     * {@code false} instead of throwing.
     */
    private static boolean filterRels(Relation rel, String entityType) {
        final String expectedRelType;
        switch (entityType) {
            case "result":
                expectedRelType = "resultResult";
                break;
            case "organization":
                expectedRelType = "organizationOrganization";
                break;
            default:
                return false;
        }
        return "isDifferentFrom".equals(rel.getRelClass())
            && expectedRelType.equals(rel.getRelType())
            && "dedup".equals(rel.getSubRelType());
    }
}

