package eu.dnetlib.dhp.oa.dedup;

import com.kwartile.lib.cc.ConnectedComponent;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.EntityType;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.DataInfo;
import eu.dnetlib.dhp.schema.oaf.KeyValue;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.PidType;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.util.SparkCompatUtils;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;
import org.dom4j.DocumentException;
import org.eclipse.persistence.sdo.SDOConstants;
import org.postgresql.jdbc.EscapedFunctions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.xml.sax.SAXException;
import repackaged.com.google.common.google.common.hash.Hashing;
import scala.Tuple3;
import scala.collection.JavaConversions;

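/* loaded from: input_file:eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels.class */
/**
 * Spark action that, for every dedup configuration of an action set, groups the similarity
 * relations found under the working path into connected components, elects one pivot record per
 * component and writes the resulting {@code merges} / {@code isMergedIn} relations as parquet.
 */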
public class SparkCreateMergeRels extends AbstractSparkAction {
    private static final Logger log = LoggerFactory.getLogger(SparkCreateMergeRels.class);

    /**
     * Creates the action by handing both arguments to the {@link AbstractSparkAction} constructor.
     *
     * @param argumentApplicationParser parsed command-line arguments of the job
     * @param sparkSession              Spark session the job runs on
     */
    public SparkCreateMergeRels(ArgumentApplicationParser argumentApplicationParser, SparkSession sparkSession) {
        super(argumentApplicationParser, sparkSession);
    }

    /**
     * Command-line entry point: parses the job arguments, configures a Hive-enabled Spark session
     * with Kryo serialization for the OAF model classes and runs the action against the lookup
     * service located at {@code isLookUpUrl}.
     *
     * @param strArr raw command-line arguments, validated against {@code createCC_parameters.json}
     * @throws Exception if the arguments cannot be parsed or the job fails
     */
    public static void main(String[] strArr) throws Exception {
        final ArgumentApplicationParser argumentParser = new ArgumentApplicationParser(loadParametersJson());
        argumentParser.parseArgument(strArr);

        final String isLookUpUrl = argumentParser.get("isLookUpUrl");
        log.info("isLookupUrl {}", isLookUpUrl);

        final SparkConf sparkConf = new SparkConf();
        sparkConf.set("hive.metastore.uris", argumentParser.get("hiveMetastoreUris"));
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.registerKryoClasses(ModelSupport.getOafModelClasses());

        new SparkCreateMergeRels(argumentParser, getSparkWithHiveSession(sparkConf))
                .run(ISLookupClientFactory.getLookUpService(isLookUpUrl));
    }

    /**
     * Reads the bundled argument definition file from the classpath.
     *
     * <p>The stream is decoded explicitly as UTF-8 (instead of the platform default charset) and is
     * always closed; a missing resource is reported with its path rather than an anonymous NPE.
     *
     * @return the JSON text describing the accepted job parameters
     * @throws IOException if the resource cannot be read
     */
    private static String loadParametersJson() throws IOException {
        final String resource = "/eu/dnetlib/dhp/oa/dedup/createCC_parameters.json";
        try (InputStream in = SparkCreateMergeRels.class.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + resource);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Creates the merge relations for every dedup configuration registered for the action set.
     *
     * <p>For each configuration the method:
     * <ol>
     *   <li>loads the similarity relations ({@code source}, {@code target}) from the working path;</li>
     *   <li>hashes the string ids to numeric vertex ids and resolves connected components;</li>
     *   <li>joins the components with the optional pivot history table and with per-record
     *       pivoting data (pid type rank, collectedfrom flag, date) read from the graph;</li>
     *   <li>elects the first record of each group, in window order, as pivot;</li>
     *   <li>emits {@code merges} / {@code isMergedIn} relations and overwrites the merge-rel path.</li>
     * </ol>
     *
     * @param iSLookUpService lookup service used to load the dedup configurations
     * @throws ISLookUpException if the configurations cannot be retrieved
     * @throws DocumentException if a configuration profile cannot be parsed
     * @throws IOException       on I/O failures while loading configurations
     * @throws SAXException      on XML parsing failures while loading configurations
     */
    @Override // eu.dnetlib.dhp.oa.dedup.AbstractSparkAction
    public void run(ISLookUpService iSLookUpService) throws ISLookUpException, DocumentException, IOException, SAXException {
        final String graphBasePath = this.parser.get("graphBasePath");
        final String workingPath = this.parser.get("workingPath");
        final String isLookUpUrl = this.parser.get("isLookUpUrl");
        final String actionSetId = this.parser.get("actionSetId");
        // 0 (or any non-positive value) disables the cut on the connected component size
        final int cut = Optional.ofNullable(this.parser.get("cutConnectedComponent")).map(Integer::valueOf).orElse(0);
        final String pivotHistoryDatabase = this.parser.get("pivotHistoryDatabase");

        log.info("connected component cut: '{}'", cut);
        log.info("graphBasePath: '{}'", graphBasePath);
        log.info("isLookUpUrl:   '{}'", isLookUpUrl);
        log.info("actionSetId:   '{}'", actionSetId);
        log.info("workingPath:   '{}'", workingPath);
        log.info("pivotHistoryDatabase: '{}'", pivotHistoryDatabase);

        for (DedupConfig dedupConfig : getConfigurations(iSLookUpService, actionSetId)) {
            final String subEntity = dedupConfig.getWf().getSubEntityValue();
            final Class<?> entityClass = ModelSupport.entityTypes.get(EntityType.valueOf(subEntity));

            log.info("Creating merge rels for: '{}'", subEntity);
            log.info("Max iterations {}", dedupConfig.getWf().getMaxIterations());

            final String mergeRelPath = DedupUtility.createMergeRelPath(workingPath, actionSetId, subEntity);
            final String simRelPath = DedupUtility.createSimRelPath(workingPath, actionSetId, subEntity);

            final Dataset<Row> simRels = this.spark.read().load(simRelPath).select("source", "target");

            // string id -> numeric id, as the connected component step works on hashed vertex ids
            final UserDefinedFunction hashUdf = functions.udf((String id) -> hash(id), DataTypes.LongType);

            // similarity relations expressed as pairs of numeric ids
            final Dataset<Row> edges = this.spark.read().load(simRelPath)
                    .select("source", "target")
                    .withColumn("source", hashUdf.apply(functions.col("source")))
                    .withColumn("target", hashUdf.apply(functions.col("target")));

            final Dataset<Row> cliques = ConnectedComponent.runOnPairs(edges, 50, this.spark);

            // (id, vertexId) lookup used to translate vertex ids back to the original string ids
            final Dataset<Row> vertexIdMap = simRels.selectExpr("source as id")
                    .union(simRels.selectExpr("target as id"))
                    .distinct()
                    .withColumn("vertexId", hashUdf.apply(functions.col("id")));

            final Dataset<Row> rawMergeRels = cliques
                    .join(vertexIdMap, JavaConversions.asScalaBuffer(Collections.singletonList("vertexId")), "inner")
                    .drop("vertexId")
                    .distinct();

            // empty history unless a pivot history database has been configured
            Dataset<Row> pivotHistory = this.spark.createDataset(
                    Collections.emptyList(),
                    SparkCompatUtils.encoderFor(StructType.fromDDL("id STRING, lastUsage STRING")));
            if (StringUtils.isNotBlank(pivotHistoryDatabase)) {
                pivotHistory = this.spark.read()
                        .table(pivotHistoryDatabase + "." + subEntity)
                        .selectExpr("id", "lastUsage");
            }

            String collectedfromExpr = "false AS collectedfrom";
            String dateExpr = "'' AS date";
            if (Result.class.isAssignableFrom(entityClass)) {
                // NOTE(review): the two keys below identify specific datasources
                // (presumably Crossref for publications and DataCite for datasets) - confirm
                if (Publication.class.isAssignableFrom(entityClass)) {
                    collectedfromExpr = "array_contains(collectedfrom.key, '10|openaire____::081b82f96300b6a6e3d282bad31cb6e2') AS collectedfrom";
                } else if (eu.dnetlib.dhp.schema.oaf.Dataset.class.isAssignableFrom(entityClass)) {
                    collectedfromExpr = "array_contains(collectedfrom.key, '10|openaire____::9e3be59865b2c1c335d32dae2fe7b254') AS collectedfrom";
                }
                dateExpr = "dateofacceptance.value AS date";
            }

            final UserDefinedFunction pidTypeRankUdf = functions.udf(
                    (String pidType) -> Math.min(PidType.tryValueOf(pidType).ordinal(), PidType.w3id.ordinal()),
                    DataTypes.IntegerType);
            // dates that are blank, malformed or out of range are replaced by a date one week ahead
            // of now, so that they sort after every valid date
            final UserDefinedFunction normalizedDateUdf = functions.udf(
                    (String date) -> (StringUtils.isNotBlank(date) && date.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(date))
                            ? date
                            : LocalDate.now().plusWeeks(1L).toString(),
                    DataTypes.StringType);

            final Dataset<Row> pivotingData = this.spark.read()
                    .schema(Encoders.bean(entityClass).schema())
                    .json(DedupUtility.createEntityPath(graphBasePath, subEntity))
                    .selectExpr("id", "regexp_extract(id, '^\\\\d+\\\\|([^_]+).*::', 1) AS pidType", collectedfromExpr, dateExpr)
                    .withColumn("pidType", pidTypeRankUdf.apply(functions.col("pidType")))
                    .withColumn("date", normalizedDateUdf.apply(functions.col("date")));

            // the first row of each group in this order becomes the pivot
            final WindowSpec pivotOrder = Window.partitionBy("groupId").orderBy(
                    functions.col("pidType").asc_nulls_last(),
                    functions.col("lastUsage").desc_nulls_last(),
                    functions.col("collectedfrom").desc_nulls_last(),
                    functions.col("date").asc_nulls_last(),
                    functions.col("id").asc_nulls_last());

            final Dataset<Relation> mergeRels = rawMergeRels
                    .join(pivotHistory, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "full")
                    .join(pivotingData, JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left")
                    .withColumn("pivot", functions.first("id").over(pivotOrder))
                    .withColumn("position", functions.row_number().over(pivotOrder))
                    .flatMap(
                            (FlatMapFunction<Row, Tuple3<String, String, String>>) row -> toMergeTuples(row, cut),
                            Encoders.tuple(Encoders.STRING(), Encoders.STRING(), Encoders.STRING()))
                    .distinct()
                    .flatMap(
                            (FlatMapFunction<Tuple3<String, String, String>, Relation>) tuple -> toRelations(tuple, dedupConfig),
                            Encoders.bean(Relation.class));

            saveParquet(mergeRels, mergeRelPath, SaveMode.Overwrite);
        }
    }

    /**
     * Turns one joined row into zero or more {@code (id, dedupId, pivot)} tuples.
     *
     * @param row joined row exposing {@code id}, {@code pivot}, {@code groupId}, {@code collectedfrom},
     *            {@code lastUsage} and {@code position}
     * @param cut maximum position (inclusive) still merged into the group; non-positive means no limit
     * @return iterator over the tuples to materialize as relations
     */
    private static Iterator<Tuple3<String, String, String>> toMergeTuples(Row row, int cut) {
        final String id = row.getAs("id");
        final String dedupId = IdGenerator.generate(id);
        final String pivot = row.getAs("pivot");
        final String pivotDedupId = IdGenerator.generate(pivot);

        // the record already is the dedup record generated from the pivot: nothing to relate
        if (id.equals(pivotDedupId)) {
            return Collections.emptyIterator();
        }

        final List<Tuple3<String, String, String>> tuples = new ArrayList<>();

        // rows without a groupId were not part of any connected component
        if (row.isNullAt(row.fieldIndex("groupId"))) {
            // only rows that matched the pivoting data get a relation to their own dedup id
            if (!row.isNullAt(row.fieldIndex("collectedfrom"))) {
                tuples.add(new Tuple3<>(id, dedupId, null));
            }
            return tuples.iterator();
        }

        // the row appears in the pivot history but a different pivot was elected for its group:
        // link its own dedup id to the dedup id of the new pivot
        if (!row.isNullAt(row.fieldIndex("lastUsage")) && !pivot.equals(id) && !dedupId.equals(pivotDedupId)) {
            tuples.add(new Tuple3<>(dedupId, pivotDedupId, null));
        }

        if (cut <= 0 || row.<Integer>getAs("position") <= cut) {
            tuples.add(new Tuple3<>(id, pivotDedupId, pivot));
        }
        return tuples.iterator();
    }

    /**
     * Expands an {@code (id, dedupId, pivot)} tuple into its two directed relations:
     * {@code dedupId merges id} and {@code id isMergedIn dedupId}.
     *
     * @param tuple       tuple produced by {@link #toMergeTuples(Row, int)}
     * @param dedupConfig configuration providing relation type and provenance
     * @return iterator over the two relations
     */
    private static Iterator<Relation> toRelations(Tuple3<String, String, String> tuple, DedupConfig dedupConfig) {
        final String id = tuple._1();
        final String dedupId = tuple._2();
        final String pivot = tuple._3();

        final List<Relation> relations = new ArrayList<>(2);
        relations.add(rel(pivot, dedupId, id, ModelConstants.MERGES, dedupConfig));
        relations.add(rel(pivot, id, dedupId, ModelConstants.IS_MERGED_IN, dedupConfig));
        return relations.iterator();
    }

    /**
     * Builds a single inferred dedup relation.
     *
     * @param str         pivot id stored as the {@code pivot} property; no property is set when {@code null}
     * @param str2        source id of the relation
     * @param str3        target id of the relation
     * @param str4        relation class
     * @param dedupConfig configuration supplying the entity type and the configuration id used as
     *                    inference provenance
     * @return the populated relation
     */
    private static Relation rel(String str, String str2, String str3, String str4, DedupConfig dedupConfig) {
        final String entityType = dedupConfig.getWf().getEntityType();
        // rel type is the entity type followed by its capitalized form
        final String capitalizedEntityType = entityType.substring(0, 1).toUpperCase() + entityType.substring(1);

        final Qualifier provenanceAction = new Qualifier();
        provenanceAction.setSchemeid(ModelConstants.DNET_PROVENANCE_ACTIONS);
        provenanceAction.setSchemename(ModelConstants.DNET_PROVENANCE_ACTIONS);
        provenanceAction.setClassid(ModelConstants.PROVENANCE_DEDUP);
        provenanceAction.setClassname(ModelConstants.PROVENANCE_DEDUP);

        final DataInfo info = new DataInfo();
        info.setInferred(true);
        info.setDeletedbyinference(false);
        info.setInvisible(false);
        info.setInferenceprovenance(dedupConfig.getWf().getConfigurationId());
        info.setProvenanceaction(provenanceAction);

        final Relation result = new Relation();
        result.setRelType(entityType + capitalizedEntityType);
        result.setSubRelType(ModelConstants.DEDUP);
        result.setRelClass(str4);
        result.setSource(str2);
        result.setTarget(str3);
        result.setDataInfo(info);

        if (str == null) {
            return result;
        }

        final KeyValue pivotProperty = new KeyValue();
        pivotProperty.setKey("pivot");
        pivotProperty.setValue(str);
        result.setProperties(Arrays.asList(pivotProperty));
        return result;
    }

    /**
     * Maps a string to a {@code long} by hashing it with the 128-bit Murmur3 hash function and
     * taking the {@code asLong()} view of the resulting hash code.
     *
     * <p>{@link #run(ISLookUpService)} uses this value as the numeric vertex id of a record id.
     *
     * @param str string to hash
     * @return the {@code long} value derived from the Murmur3 hash of {@code str}
     */
    public static long hash(String str) {
        // NOTE(review): hashString is called without a charset; presumably it hashes the raw
        // UTF-16 chars - confirm against the bundled (repackaged) Guava version
        return Hashing.murmur3_128().hashString(str).asLong();
    }

    /**
     * Bridge method (marked synthetic by the decompiler) that simply delegates to
     * {@code AbstractSparkAction.getConfigurations}.
     *
     * @param iSLookUpService lookup service passed through to the superclass
     * @param str             second argument passed through unchanged ({@code run} passes the action set id)
     * @return whatever the superclass implementation returns
     */
    // NOTE(review): the raw List return type is decompiler output; presumably the superclass
    // declares a parameterized list of DedupConfig - confirm before relying on this signature.
    @Override // eu.dnetlib.dhp.oa.dedup.AbstractSparkAction
    public /* bridge */ /* synthetic */ List getConfigurations(ISLookUpService iSLookUpService, String str) throws ISLookUpException, DocumentException, IOException, SAXException {
        return super.getConfigurations(iSLookUpService, str);
    }

    /**
     * Synthetic hook that rebuilds one of the serializable lambdas of {@code run} from its
     * {@link SerializedLambda} descriptor.
     *
     * <p>The first switch maps the implementation method name to a selector; the second switch
     * checks the functional interface and implementation signature recorded in the descriptor and
     * returns a lambda with the same body (re-capturing the captured argument where one exists).
     *
     * @param serializedLambda descriptor of the lambda being deserialized
     * @return the reconstructed lambda
     * @throws IllegalArgumentException if the descriptor matches none of the known lambdas
     */
    // NOTE(review): this is decompiler output and not valid Java as written (a boolean initialised
    // with -1, integer assignments to it, repeated "case true" labels). The method is normally
    // generated by javac - confirm whether it should be dropped when recompiling from source.
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: pick a selector from the hash code of the implementation method name.
        switch (implMethodName.hashCode()) {
            case -1812225058:
                if (implMethodName.equals("lambda$run$7eeb475$1")) {
                    z = 2;
                    break;
                }
                break;
            case -1338955361:
                if (implMethodName.equals("lambda$run$af3d2734$1")) {
                    z = 4;
                    break;
                }
                break;
            case -279929540:
                if (implMethodName.equals("lambda$run$3fc20d47$1")) {
                    z = 3;
                    break;
                }
                break;
            case -279057239:
                if (implMethodName.equals("lambda$run$dc267428$1")) {
                    z = true;
                    break;
                }
                break;
            case 1860501542:
                if (implMethodName.equals("lambda$run$fee02a7a$1")) {
                    z = false;
                    break;
                }
                break;
        }
        // Step 2: validate the descriptor (method kind 6 is MethodHandleInfo.REF_invokeStatic) and
        // return the matching lambda.
        switch (z) {
            case false:
                // UDF1: pid type name -> ordinal, capped at the ordinal of PidType.w3id
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/api/java/UDF1") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Object;")) {
                    return str9 -> {
                        return Integer.valueOf(Math.min(PidType.tryValueOf(str9).ordinal(), PidType.w3id.ordinal()));
                    };
                }
                break;
            case true:
                // UDF1: string id -> hash(id)
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/api/java/UDF1") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Object;")) {
                    return str6 -> {
                        return Long.valueOf(hash(str6));
                    };
                }
                break;
            case true:
                // FlatMapFunction: tuple -> the two relations built by rel(); captures the DedupConfig
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/pace/config/DedupConfig;Lscala/Tuple3;)Ljava/util/Iterator;")) {
                    DedupConfig dedupConfig = (DedupConfig) serializedLambda.getCapturedArg(0);
                    return tuple3 -> {
                        String str11 = (String) tuple3._1();
                        String str12 = (String) tuple3._2();
                        String str13 = (String) tuple3._3();
                        ArrayList arrayList = new ArrayList();
                        arrayList.add(rel(str13, str12, str11, ModelConstants.MERGES, dedupConfig));
                        arrayList.add(rel(str13, str11, str12, ModelConstants.IS_MERGED_IN, dedupConfig));
                        return arrayList.iterator();
                    };
                }
                break;
            case true:
                // UDF1: keeps a valid, in-range date; otherwise substitutes a date one week after now
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/api/java/UDF1") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Object;")) {
                    return str10 -> {
                        return (StringUtils.isNotBlank(str10) && str10.matches(DatePicker.DATE_PATTERN) && DatePicker.inRange(str10)) ? str10 : LocalDate.now().plusWeeks(1L).toString();
                    };
                }
                break;
            case true:
                // FlatMapFunction: joined row -> (id, dedupId, pivot) tuples; captures the cut value
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FlatMapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/util/Iterator;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/dedup/SparkCreateMergeRels") && serializedLambda.getImplMethodSignature().equals("(ILorg/apache/spark/sql/Row;)Ljava/util/Iterator;")) {
                    int intValue = ((Integer) serializedLambda.getCapturedArg(0)).intValue();
                    return row -> {
                        String str11 = (String) row.getAs("id");
                        String generate = IdGenerator.generate(str11);
                        String str12 = (String) row.getAs("pivot");
                        String generate2 = IdGenerator.generate(str12);
                        if (str11.equals(generate2)) {
                            return Collections.emptyIterator();
                        }
                        ArrayList arrayList = new ArrayList();
                        if (row.isNullAt(row.fieldIndex("groupId"))) {
                            if (!row.isNullAt(row.fieldIndex("collectedfrom"))) {
                                arrayList.add(new Tuple3(str11, generate, null));
                            }
                            return arrayList.iterator();
                        }
                        if (!row.isNullAt(row.fieldIndex("lastUsage")) && !str12.equals(str11) && !generate.equals(generate2)) {
                            arrayList.add(new Tuple3(generate, generate2, null));
                        }
                        if (intValue <= 0 || ((Integer) row.getAs("position")).intValue() <= intValue) {
                            arrayList.add(new Tuple3(str11, generate2, str12));
                        }
                        return arrayList.iterator();
                    };
                }
                break;
        }
        // No known lambda matched the descriptor.
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
