package eu.dnetlib.dhp.oa.graph.merge;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Qualifier;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.TypedColumn;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob.class */
public class MergeGraphTableSparkJob {
    /** Priority used by {@code main} when the {@code priority} argument is not supplied. */
    private static final String PRIORITY_DEFAULT = "BETA";
    private static final Logger log = LoggerFactory.getLogger(MergeGraphTableSparkJob.class);
    /** Shared JSON mapper: parses every input line and serialises every merged record. */
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
    /**
     * Fallback record for the "is this a Datasource?" type check in the merge step, used when
     * neither side of the join carries a value. The static initialiser at the bottom of the class
     * sets its OpenAIRE compatibility qualifier to class id {@code UNKNOWN}.
     */
    private static final Datasource DATASOURCE = new Datasource();

    /* loaded from: input_file:eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob$GroupingAggregator.class */
    public static class GroupingAggregator<T extends Oaf> extends Aggregator<T, T, T> {
        private Class<T> clazz;

        public GroupingAggregator(Class<T> cls) {
            this.clazz = cls;
        }

        /* renamed from: zero, reason: merged with bridge method [inline-methods] */
        public T m21zero() {
            return null;
        }

        public T reduce(T t, T t2) {
            return mergeAndGet(t, t2);
        }

        private T mergeAndGet(T t, T t2) {
            if (Objects.nonNull(t2) && Objects.nonNull(t)) {
                if (ModelSupport.isSubClass(t2, OafEntity.class).booleanValue() && ModelSupport.isSubClass(t, OafEntity.class).booleanValue()) {
                    return (T) MergeUtils.merge(t, t2);
                }
                if ((t2 instanceof Relation) && (t instanceof Relation)) {
                    return MergeUtils.mergeRelation((Relation) t2, (Relation) t);
                }
            }
            return Objects.isNull(t2) ? t : t2;
        }

        public T merge(T t, T t2) {
            return mergeAndGet(t, t2);
        }

        public T finish(T t) {
            return t;
        }

        public Encoder<T> bufferEncoder() {
            return Encoders.kryo(this.clazz);
        }

        public Encoder<T> outputEncoder() {
            return Encoders.kryo(this.clazz);
        }
    }

    /**
     * Job entry point: parses the command line against the bundled
     * {@code merge_graphs_parameters.json} definition, configures Kryo serialization for the
     * OAF model classes, removes any previous output and merges one graph table.
     *
     * <p>Arguments read: {@code priority} (defaults to {@code BETA}),
     * {@code isSparkSessionManaged} (defaults to {@code true}), {@code betaInputPath},
     * {@code prodInputPath}, {@code outputPath} and {@code graphTableClassName}.
     *
     * @param strArr raw command-line arguments
     * @throws ClassCastException if {@code graphTableClassName} does not name an {@link Oaf} subtype
     * @throws Exception on any argument-parsing, class-loading or Spark failure
     */
    public static void main(String[] strArr) throws Exception {
        final String jsonConfiguration;
        // try-with-resources closes the classpath stream; the charset is pinned so parsing does
        // not depend on the platform default.
        try (InputStream parameters = Objects.requireNonNull(
                MergeGraphTableSparkJob.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/merge_graphs_parameters.json"),
                "parameter definition resource not found on the classpath")) {
            jsonConfiguration = IOUtils.toString(parameters, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(strArr);

        final String priority = Optional.ofNullable(parser.get("priority")).orElse(PRIORITY_DEFAULT);
        log.info("priority: {}", priority);
        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);
        final String betaInputPath = parser.get("betaInputPath");
        log.info("betaInputPath: {}", betaInputPath);
        final String prodInputPath = parser.get("prodInputPath");
        log.info("prodInputPath: {}", prodInputPath);
        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);
        final String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass fails fast here, before any Spark work, when the class is not an Oaf type.
        final Class<? extends Oaf> graphTableClass = Class.forName(graphTableClassName).asSubclass(Oaf.class);

        final SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.registerKryoClasses(ModelSupport.getOafModelClasses());
        SparkSessionSupport.runWithSparkSession(sparkConf, isSparkSessionManaged, sparkSession -> {
            removeOutputDir(sparkSession, outputPath);
            mergeGraphTable(sparkSession, priority, betaInputPath, prodInputPath, graphTableClass, graphTableClass, outputPath);
        });
    }

    /**
     * Full-outer-joins the PROD and BETA versions of one graph table on their grouping key,
     * resolves each pair into a single record and writes the result as gzip-compressed JSON lines.
     *
     * <p>Pairs whose first available record is a {@link Datasource} are merged field-by-field via
     * {@link #mergeDatasource}; all other pairs are resolved by {@code str}: {@code "PROD"} prefers
     * the PROD record, anything else (including {@code "BETA"}) prefers the BETA record.
     * Pairs that resolve to {@code null} are dropped.
     *
     * @param sparkSession active Spark session
     * @param str          merge priority, {@code "BETA"} or {@code "PROD"}
     * @param str2         input path of the BETA table
     * @param str3         input path of the PROD table
     * @param cls          model class of the PROD records (also the output type)
     * @param cls2         model class of the BETA records
     * @param str4         output path
     */
    private static <P extends Oaf, B extends Oaf> void mergeGraphTable(SparkSession sparkSession, String str, String str2, String str3, Class<P> cls, Class<B> cls2, String str4) {
        final String priority = str;
        final Dataset<Tuple2<String, B>> beta = readTableAndGroupById(sparkSession, str2, cls2);
        final Dataset<Tuple2<String, P>> prod = readTableAndGroupById(sparkSession, str3, cls);

        prod
            .joinWith(beta, prod.col("value").equalTo(beta.col("value")), "full_outer")
            .map((MapFunction<Tuple2<Tuple2<String, P>, Tuple2<String, B>>, P>) pair -> {
                // Either side of a full outer join may be null.
                final Optional<P> p = Optional.ofNullable(pair._1()).map(Tuple2::_2);
                final Optional<B> b = Optional.ofNullable(pair._2()).map(Tuple2::_2);

                // Type check on the first available record; DATASOURCE is the fallback when both are absent.
                final Oaf sample = p.isPresent() ? p.get() : (b.isPresent() ? b.get() : DATASOURCE);
                if (sample instanceof Datasource) {
                    return mergeDatasource(p, b);
                }
                switch (priority) {
                    case "PROD":
                        return mergeWithPriorityToPROD(p, b);
                    case PRIORITY_DEFAULT:
                    default:
                        return mergeWithPriorityToBETA(p, b);
                }
            }, Encoders.kryo(cls))
            .filter((FilterFunction<P>) Objects::nonNull)
            .map((MapFunction<P, String>) OBJECT_MAPPER::writeValueAsString, Encoders.STRING())
            .write()
            .mode(SaveMode.Overwrite)
            .option("compression", "gzip")
            .text(str4);
    }

    /**
     * Merges the PROD and BETA versions of a datasource.
     *
     * <p>When only one side is present it is returned unchanged; when neither is present the
     * result is {@code null}. When both are present the PROD instance is updated in place and
     * returned: its OpenAIRE compatibility becomes the minimum of the two according to
     * {@code DatasourceCompatibilityComparator}, and its {@code collectedfrom},
     * {@code originalId} and {@code pid} lists become the de-duplicated union of both sides.
     *
     * @param optional  the PROD record, if any; must hold a {@link Datasource} when both sides are present
     * @param optional2 the BETA record, if any; must hold a {@link Datasource} when both sides are present
     * @return the merged record, or {@code null} when both inputs are empty
     */
    @SuppressWarnings("unchecked")
    protected static <P extends Oaf, B extends Oaf> P mergeDatasource(Optional<P> optional, Optional<B> optional2) {
        if (!optional.isPresent()) {
            // Only BETA is available, or neither side is (in which case this yields null).
            return (P) optional2.orElse(null);
        }
        if (!optional2.isPresent()) {
            return optional.get();
        }

        final Datasource prod = (Datasource) optional.get();
        final Datasource beta = (Datasource) optional2.get();

        final List<Qualifier> compatibilities = Arrays.asList(prod.getOpenairecompatibility(), beta.getOpenairecompatibility());
        prod.setOpenairecompatibility(Collections.min(compatibilities, new DatasourceCompatibilityComparator()));

        // Unlike mergeLists, null elements are kept here; de-duplication relies on equals().
        prod.setCollectedfrom(
            Stream.concat(
                    Optional.ofNullable(prod.getCollectedfrom()).map(List::stream).orElse(Stream.empty()),
                    Optional.ofNullable(beta.getCollectedfrom()).map(List::stream).orElse(Stream.empty()))
                .distinct()
                .collect(Collectors.toList()));

        prod.setOriginalId(mergeLists(prod.getOriginalId(), beta.getOriginalId()));
        prod.setPid(mergeLists(prod.getPid(), beta.getPid()));
        return (P) prod;
    }

    /**
     * Concatenates the given lists into one list of distinct, non-null elements,
     * preserving first-occurrence order. {@code null} lists are skipped.
     *
     * @param listArr the lists to merge; individual entries may be {@code null}
     * @return a new list holding every distinct non-null element
     */
    @SafeVarargs // the varargs array is only read, never stored or written to
    private static <T> List<T> mergeLists(List<T>... listArr) {
        return Arrays.stream(listArr)
            .filter(Objects::nonNull)
            .flatMap(List::stream)
            .filter(Objects::nonNull)
            .distinct()
            .collect(Collectors.toList());
    }

    /**
     * Picks the PROD record when it exists, otherwise falls back to the BETA record.
     *
     * @param optional  the PROD record, if any
     * @param optional2 the BETA record, if any
     * @return the chosen record, or {@code null} when both are absent
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToPROD(Optional<P> optional, Optional<B> optional2) {
        if (optional.isPresent()) {
            return optional.get();
        }
        // The BETA value is returned under the PROD type parameter, hence the unchecked cast.
        return (P) optional2.orElse(null);
    }

    /**
     * Picks the BETA record when it exists, otherwise falls back to the PROD record.
     *
     * @param optional  the PROD record, if any
     * @param optional2 the BETA record, if any
     * @return the chosen record, or {@code null} when both are absent
     */
    @SuppressWarnings("unchecked")
    private static <P extends Oaf, B extends Oaf> P mergeWithPriorityToBETA(Optional<P> optional, Optional<B> optional2) {
        if (optional2.isPresent()) {
            // The BETA value is returned under the PROD type parameter, hence the unchecked cast.
            return (P) optional2.get();
        }
        return optional.orElse(null);
    }

    /**
     * Reads a graph table stored as JSON lines, groups the records by the id computed by
     * {@code ModelSupport.idFn()} and collapses each group into one record with
     * {@link GroupingAggregator}.
     *
     * @param sparkSession active Spark session
     * @param str          path of the table to read
     * @param cls          model class each JSON line is deserialised into
     * @return a dataset of (id, aggregated record) pairs
     */
    private static <T extends Oaf> Dataset<Tuple2<String, T>> readTableAndGroupById(SparkSession sparkSession, String str, Class<T> cls) {
        final TypedColumn<T, T> aggregator = new GroupingAggregator<T>(cls).toColumn();
        log.info("Reading Graph table from: {}", str);
        // The MapFunction casts select the Java overloads and keep the element type as T.
        return sparkSession
            .read()
            .textFile(str)
            .map((MapFunction<String, T>) json -> OBJECT_MAPPER.readValue(json, cls), Encoders.kryo(cls))
            .groupByKey((MapFunction<T, String>) oaf -> (String) ModelSupport.idFn().apply(oaf), Encoders.STRING())
            .agg(aggregator);
    }

    /**
     * Deletes the given path through {@link HdfsSupport#remove}, using the Hadoop
     * configuration of the session's Spark context.
     *
     * @param sparkSession active Spark session
     * @param str          path to remove
     */
    private static void removeOutputDir(SparkSession sparkSession, String str) {
        HdfsSupport.remove(
            str,
            sparkSession.sparkContext().hadoopConfiguration());
    }

    /*
     * Decompiler rendering of the lambda-deserialization hook (marked synthetic below).
     * It looks at the implementation-method name of the SerializedLambda, verifies the remaining
     * descriptor fields (method kind, functional interface, signatures, implementing class) and
     * returns a lambda equivalent to the one that was serialized; if nothing matches it throws
     * IllegalArgumentException.
     *
     * NOTE(review): this listing is not valid Java as written (`boolean z = -1`, repeated
     * `case true` labels, the undeclared `r0`). Presumably the method is emitted by the compiler
     * for the serializable lambdas of this class and should be dropped from hand-maintained
     * source rather than repaired - confirm before relying on it.
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        // First switch: map the implementation-method name to a selector (hash first, then equals).
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -484196909:
                if (implMethodName.equals("lambda$mergeGraphTable$ba8b62a9$1")) {
                    z = false;
                    break;
                }
                break;
            case 681086541:
                if (implMethodName.equals("lambda$readTableAndGroupById$9c3bdbf$1")) {
                    z = 2;
                    break;
                }
                break;
            case 1097149990:
                if (implMethodName.equals("lambda$readTableAndGroupById$57b16ce1$1")) {
                    z = true;
                    break;
                }
                break;
            case 1533589973:
                if (implMethodName.equals("writeValueAsString")) {
                    z = 3;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = 4;
                    break;
                }
                break;
        }
        // Second switch: validate the full descriptor and rebuild the matching lambda.
        switch (z) {
            case false:
                // Pair-resolution lambda of mergeGraphTable; captures the priority string.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Lscala/Tuple2;)Leu/dnetlib/dhp/schema/oaf/Oaf;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    return tuple2 -> {
                        Optional map = Optional.ofNullable((Tuple2) tuple2._1()).map((v0) -> {
                            return v0._2();
                        });
                        Optional map2 = Optional.ofNullable((Tuple2) tuple2._2()).map((v0) -> {
                            return v0._2();
                        });
                        if (map.orElse((Oaf) map2.orElse(DATASOURCE)) instanceof Datasource) {
                            return mergeDatasource(map, map2);
                        }
                        boolean z2 = -1;
                        switch (str.hashCode()) {
                            case 2035184:
                                if (str.equals(PRIORITY_DEFAULT)) {
                                    z2 = true;
                                    break;
                                }
                                break;
                            case 2464599:
                                if (str.equals("PROD")) {
                                    z2 = 2;
                                    break;
                                }
                                break;
                        }
                        switch (z2) {
                            case true:
                            default:
                                return mergeWithPriorityToBETA(map, map2);
                            case true:
                                return mergeWithPriorityToPROD(map, map2);
                        }
                    };
                }
                break;
            case true:
                // JSON-parsing lambda of readTableAndGroupById; captures the target class.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/String;)Leu/dnetlib/dhp/schema/oaf/Oaf;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return str2 -> {
                        return (Oaf) OBJECT_MAPPER.readValue(str2, cls);
                    };
                }
                break;
            case true:
                // Grouping-key lambda of readTableAndGroupById.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/oa/graph/merge/MergeGraphTableSparkJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/schema/oaf/Oaf;)Ljava/lang/String;")) {
                    return oaf -> {
                        return (String) ModelSupport.idFn().apply(oaf);
                    };
                }
                break;
            case true:
                // ObjectMapper writeValueAsString method reference; captures the mapper instance.
                if (serializedLambda.getImplMethodKind() == 5 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("com/fasterxml/jackson/databind/ObjectMapper") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/String;")) {
                    ObjectMapper objectMapper = (ObjectMapper) serializedLambda.getCapturedArg(0);
                    return (v1) -> {
                        return r0.writeValueAsString(v1);
                    };
                }
                break;
            case true:
                // Objects.nonNull method reference used as a FilterFunction.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }

    static {
        Qualifier qualifier = new Qualifier();
        qualifier.setClassid("UNKNOWN");
        DATASOURCE.setOpenairecompatibility(qualifier);
    }
}
