package eu.dnetlib.dhp.incremental;

import eu.dnetlib.dhp.aggregation.mdstore.MDStoreActionNode;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.Constants;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.rest.DNetRestClient;
import eu.dnetlib.dhp.oa.graph.raw.CopyHdfsOafSparkApplication;
import eu.dnetlib.dhp.oozie.RunSQLSparkJob;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.eclipse.persistence.sdo.SDOConstants;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.JavaConversions;

/* loaded from: input_file:eu/dnetlib/dhp/incremental/CollectNewOafResults.class */
public class CollectNewOafResults {
    private static final Logger log = LoggerFactory.getLogger(RunSQLSparkJob.class);
    private final ArgumentApplicationParser parser;

    public CollectNewOafResults(ArgumentApplicationParser argumentApplicationParser) {
        this.parser = argumentApplicationParser;
    }

    public static void main(String[] strArr) throws Exception {
        ArgumentApplicationParser argumentApplicationParser = new ArgumentApplicationParser(IOUtils.toString(CollectNewOafResults.class.getResourceAsStream("/eu/dnetlib/dhp/oa/graph/incremental/collect/collectnewresults_input_parameters.json")));
        argumentApplicationParser.parseArgument(strArr);
        Boolean bool = (Boolean) Optional.ofNullable(argumentApplicationParser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", bool);
        String str = argumentApplicationParser.get("workingDir");
        log.info("workingDir is {}", str);
        String str2 = argumentApplicationParser.get("outputPath");
        log.info("outputPath is {}", str2);
        String str3 = argumentApplicationParser.get("mdStoreManagerURI");
        log.info("mdStoreManagerURI is {}", str3);
        String str4 = argumentApplicationParser.get("mdStoreID");
        if (StringUtils.isBlank(str4)) {
            throw new IllegalArgumentException("missing or empty argument mdStoreID");
        }
        String str5 = argumentApplicationParser.get("hiveDbName");
        log.info("hiveDbName is {}", str5);
        MDStoreVersion mDStoreVersion = (MDStoreVersion) DNetRestClient.doGET(String.format(MDStoreActionNode.READ_LOCK_URL, str3, str4), MDStoreVersion.class);
        log.info("mdstore data is {}", mDStoreVersion.toString());
        try {
            SparkConf sparkConf = new SparkConf();
            sparkConf.set("hive.metastore.uris", argumentApplicationParser.get("hiveMetastoreUris"));
            SparkSessionSupport.runWithSparkHiveSession(sparkConf, bool, sparkSession -> {
                sparkSession.read().text(mDStoreVersion.getHdfsPath() + Constants.MDSTORE_DATA_PATH).selectExpr(new String[]{"value", "get_json_object(value, '$.id') AS id"}).where("id IS NOT NULL").join(sparkSession.table(str5 + ".result").select("id", new String[0]).union(sparkSession.table(str5 + ".relation").where("relClass = 'merges'").selectExpr(new String[]{"target as id"})).distinct(), JavaConversions.asScalaBuffer(Collections.singletonList("id")), "left_anti").withColumn("oaftype", functions.udf(str6 -> {
                    return CopyHdfsOafSparkApplication.getOafType(str6);
                }, DataTypes.StringType).apply(new Column[]{new Column("value")})).write().partitionBy(new String[]{"oaftype"}).mode(SaveMode.Overwrite).option("compression", "gzip").parquet(str + "/entities");
                ModelSupport.oafTypes.keySet().forEach(str7 -> {
                    sparkSession.read().parquet(str + "/entities").filter("oaftype = '" + str7 + "'").select("value", new String[0]).write().option("compression", "gzip").mode(SaveMode.Append).text(str2 + "/" + str7);
                });
                Dataset select = sparkSession.read().parquet(str + "/entities").select("id", new String[0]);
                Dataset where = sparkSession.read().text(mDStoreVersion.getHdfsPath() + Constants.MDSTORE_DATA_PATH).selectExpr(new String[]{"value", "get_json_object(value, '$.source') AS source", "get_json_object(value, '$.target') AS target"}).where("source IS NOT NULL AND target IS NOT NULL");
                where.join(select.selectExpr(new String[]{"id as source"}), JavaConversions.asScalaBuffer(Collections.singletonList(SDOConstants.APPINFO_SOURCE_ATTRIBUTE)), "left_semi").union(where.join(select.selectExpr(new String[]{"id as target"}), JavaConversions.asScalaBuffer(Collections.singletonList("target")), "left_semi")).distinct().select("value", new String[0]).write().option("compression", "gzip").mode(SaveMode.Append).text(str2 + "/relation");
            });
            DNetRestClient.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, str3, mDStoreVersion.getId()));
        } catch (Throwable th) {
            DNetRestClient.doGET(String.format(MDStoreActionNode.READ_UNLOCK_URL, str3, mDStoreVersion.getId()));
            throw th;
        }
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case -75002618:
                if (implMethodName.equals("lambda$main$684d9640$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/sql/api/java/UDF1") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/incremental/CollectNewOafResults") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;)Ljava/lang/Object;")) {
                    return str6 -> {
                        return CopyHdfsOafSparkApplication.getOafType(str6);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
