package eu.dnetlib.dhp.oa.graph.merge;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.dump.complete.Constants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/**
 * Spark job that merges one graph table read from a BETA path and a PROD path into a single table.
 *
 * <p>Both inputs are read as JSON lines and keyed by {@code ModelSupport.idFn()}; the two keyed
 * datasets are then combined with a full outer join on that key. For every joined row:
 * <ul>
 *   <li>datasources are merged by {@link #mergeDatasource(Optional, Optional)};</li>
 *   <li>any other record is taken from the side selected by the {@code priority} argument
 *       ({@code PROD}, or {@code BETA} for any other value, which is also the default), falling
 *       back to the other side when the preferred one is missing.</li>
 * </ul>
 * Null results and records having a collectedfrom entry whose value is {@code "Datacite"} are
 * dropped, and the result is written to the output path as gzip-compressed JSON, overwriting it.
 */
public class MergeGraphTableSparkJob {

    /** Priority used when the {@code priority} argument is not supplied. */
    private static final String PRIORITY_DEFAULT = "BETA";

    /** The only {@code priority} value that gives precedence to the PROD records. */
    private static final String PRIORITY_PROD = "PROD";

    /** Collectedfrom value whose records are excluded from the merged output. */
    private static final String EXCLUDED_COLLECTED_FROM = "Datacite";

    /** Classpath location of the argument definitions for this job. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json";

    private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Fallback instance used only for type detection: when both sides of a joined row are absent it
     * routes the row to {@link #mergeDatasource(Optional, Optional)}, which returns {@code null} for it.
     */
    private static final Datasource DATASOURCE = new Datasource();

    static {
        Qualifier qualifier = new Qualifier();
        qualifier.setClassid(Constants.UNKNOWN);
        DATASOURCE.setOpenairecompatibility(qualifier);
    }

    /**
     * Job entry point.
     *
     * @param args command-line arguments, parsed against the definitions in
     *     {@code merge_graphs_parameters.json}: {@code priority}, {@code isSparkSessionManaged},
     *     {@code betaInputPath}, {@code prodInputPath}, {@code outputPath}, {@code graphTableClassName}
     * @throws Exception if the parameters resource is missing, argument parsing fails, the graph table
     *     class cannot be loaded or is not an {@link Oaf} subclass, or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        // Close the classpath stream and decode it explicitly instead of with the platform charset.
        try (InputStream in = Objects.requireNonNull(
                MergeGraphTableSparkJob.class.getResourceAsStream(PARAMETERS_RESOURCE),
                "missing classpath resource " + PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String priority = Optional.ofNullable(parser.get("priority")).orElse(PRIORITY_DEFAULT);
        log.info("priority: {}", priority);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String betaInputPath = parser.get("betaInputPath");
        log.info("betaInputPath: {}", betaInputPath);

        final String prodInputPath = parser.get("prodInputPath");
        log.info("prodInputPath: {}", prodInputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass fails fast (ClassCastException) when the configured class is not an Oaf type.
        final Class<? extends Oaf> graphTableClass = Class.forName(graphTableClassName).asSubclass(Oaf.class);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ModelSupport.getOafModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            mergeGraphTable(
                    spark, priority, betaInputPath, prodInputPath, graphTableClass, graphTableClass, outputPath);
        });
    }

    /**
     * Joins the PROD and BETA tables on the record id, merges each pair and writes the result.
     *
     * @param spark the active session
     * @param priority {@code "PROD"} to prefer PROD records; any other value prefers BETA records
     * @param betaInputPath path of the BETA table (JSON lines)
     * @param prodInputPath path of the PROD table (JSON lines)
     * @param prodClass record class of the PROD table, also used as the bean encoder of the output
     * @param betaClass record class of the BETA table
     * @param outputPath destination of the gzip-compressed JSON output (overwritten)
     */
    private static <P extends Oaf, B extends Oaf> void mergeGraphTable(
            SparkSession spark,
            String priority,
            String betaInputPath,
            String prodInputPath,
            Class<P> prodClass,
            Class<B> betaClass,
            String outputPath) {

        final Dataset<Tuple2<String, B>> beta = readTableFromPath(spark, betaInputPath, betaClass);
        final Dataset<Tuple2<String, P>> prod = readTableFromPath(spark, prodInputPath, prodClass);

        prod.joinWith(beta, prod.col("_1").equalTo(beta.col("_1")), "full_outer")
                // The explicit MapFunction/FilterFunction casts select Spark's Java overloads
                // (a bare lambda is ambiguous against the Scala Function1 overloads).
                .map(
                        (MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) row -> mergeRow(row, priority),
                        Encoders.bean(prodClass))
                .filter((FilterFunction<P>) Objects::nonNull)
                .filter((FilterFunction<P>) MergeGraphTableSparkJob::isNotExcludedSource)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(outputPath);
    }

    /**
     * Merges one row of the PROD/BETA full outer join; either side of the row may be {@code null}.
     *
     * @return the merged record, or {@code null} when nothing can be produced for the row
     */
    private static <P extends Oaf, B extends Oaf> P mergeRow(
            Tuple2<Tuple2<String, P>, Tuple2<String, B>> row, String priority) {
        final Optional<P> prod = Optional.ofNullable(row._1()).map(Tuple2::_2);
        final Optional<B> beta = Optional.ofNullable(row._2()).map(Tuple2::_2);

        // Decide the merge strategy from whichever record is present (PROD first).
        final Oaf representative = prod.isPresent() ? prod.get() : beta.isPresent() ? beta.get() : DATASOURCE;
        if (representative instanceof Datasource) {
            return mergeDatasource(prod, beta);
        }
        // Unknown priority values fall back to BETA, exactly like the default.
        return PRIORITY_PROD.equals(priority)
                ? mergeWithPriorityToPROD(prod, beta)
                : mergeWithPriorityToBETA(prod, beta);
    }

    /**
     * Tells whether a record should be kept, i.e. none of its collectedfrom entries has the value
     * {@code "Datacite"}. Records without collectedfrom are kept.
     */
    private static boolean isNotExcludedSource(Oaf oaf) {
        return Optional.ofNullable(oaf.getCollectedfrom())
                .map(sources -> sources.stream().noneMatch(kv -> EXCLUDED_COLLECTED_FROM.equals(kv.getValue())))
                .orElse(true);
    }

    /**
     * Merges a PROD and a BETA datasource; datasources do not follow the priority policy.
     *
     * <p>When only one side is present it is returned as is. When both are present, the PROD record is
     * returned (mutated in place) with its openairecompatibility set to the minimum of the two
     * compatibilities according to {@link DatasourceCompatibilityComparator}.
     * NOTE(review): how the comparator treats {@code null} qualifiers is not visible here.
     *
     * @param prod the PROD record, if any; must be a {@link Datasource} when present
     * @param beta the BETA record, if any; must be a {@link Datasource} when both sides are present
     * @return the merged record, or {@code null} when both sides are absent
     */
    @SuppressWarnings("unchecked")
    protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> prod, Optional<B> beta) {
        if (!prod.isPresent()) {
            // Unchecked: the job always reads both tables with the same record class.
            return beta.isPresent() ? (P) beta.get() : null;
        }
        if (!beta.isPresent()) {
            return prod.get();
        }

        final Datasource prodDatasource = (Datasource) prod.get();
        final Datasource betaDatasource = (Datasource) beta.get();
        prodDatasource.setOpenairecompatibility(Collections.min(
                Arrays.asList(prodDatasource.getOpenairecompatibility(), betaDatasource.getOpenairecompatibility()),
                new DatasourceCompatibilityComparator()));
        return prod.get();
    }

    /**
     * Returns the PROD record when present, otherwise the BETA one, otherwise {@code null}.
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> prod, Optional<B> beta) {
        if (prod.isPresent()) {
            return prod.get();
        }
        // Unchecked: the job always reads both tables with the same record class.
        return beta.isPresent() ? (P) beta.get() : null;
    }

    /**
     * Returns the BETA record when present, otherwise the PROD one, otherwise {@code null}.
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToBETA(Optional<P> prod, Optional<B> beta) {
        if (beta.isPresent()) {
            // Unchecked: the job always reads both tables with the same record class.
            return (P) beta.get();
        }
        return prod.orElse(null);
    }

    /**
     * Reads a graph table stored as JSON lines and keys every record by {@code ModelSupport.idFn()}.
     *
     * @param spark the active session
     * @param inputPath path of the table
     * @param clazz record class used to deserialize each line (and as the Kryo encoder)
     * @return dataset of (id, record) pairs
     */
    private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableFromPath(
            SparkSession spark, String inputPath, Class<T> clazz) {
        log.info("Reading Graph table from: {}", inputPath);
        return spark.read()
                .textFile(inputPath)
                .map(
                        (MapFunction<String, Tuple2<String, T>>) json -> {
                            final T record = OBJECT_MAPPER.readValue(json, clazz);
                            final String id = ModelSupport.idFn().apply(record);
                            return new Tuple2<>(id, record);
                        },
                        Encoders.tuple(Encoders.STRING(), Encoders.kryo(clazz)));
    }

    /** Deletes the output directory before the job writes to it. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }
}
