package eu.dnetlib.dhp.actionmanager.promote;

import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.actionmanager.promote.MergeAndGet;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import java.io.IOException;
import java.lang.invoke.SerializedLambda;
import java.net.URL;
import java.nio.file.Files;
import java.nio.file.LinkOption;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.attribute.FileAttribute;
import java.util.Comparator;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;

/* loaded from: input_file:eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest.class */
public class PromoteActionPayloadForGraphTableJobTest {
    private static SparkSession spark;
    private Path workingDir;
    private Path inputDir;
    private Path inputGraphRootDir;
    private Path inputActionPayloadRootDir;
    private Path outputDir;
    private static final ClassLoader cl = PromoteActionPayloadForGraphTableJobTest.class.getClassLoader();
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper().configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false);

    @DisplayName("Job")
    @Nested
    /* loaded from: input_file:eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest$Main.class */
    class Main {
        Main() {
        }

        @Test
        void shouldThrowWhenGraphTableClassIsNotASubClassOfActionPayloadClass() {
            Class<Relation> cls = Relation.class;
            Class<OafEntity> cls2 = OafEntity.class;
            Assertions.assertTrue(((RuntimeException) Assertions.assertThrows(RuntimeException.class, () -> {
                PromoteActionPayloadForGraphTableJob.main(new String[]{"-isSparkSessionManaged", Boolean.FALSE.toString(), "-inputGraphTablePath", "", "-graphTableClassName", cls.getCanonicalName(), "-inputActionPayloadPath", "", "-actionPayloadClassName", cls2.getCanonicalName(), "-outputGraphTablePath", "", "-mergeAndGetStrategy", MergeAndGet.Strategy.SELECT_NEWER_AND_GET.name(), "--shouldGroupById", "true"});
            })).getMessage().contains(String.format("graph table class is not a subclass of action payload class: graph=%s, action=%s", Relation.class.getCanonicalName(), OafEntity.class.getCanonicalName())));
        }

        @MethodSource({"eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJobTest#promoteJobTestParams"})
        @ParameterizedTest(name = "strategy: {0}, graph table: {1}, action payload: {2}")
        void shouldPromoteActionPayloadForGraphTable(MergeAndGet.Strategy strategy, Class<? extends Oaf> cls, Class<? extends Oaf> cls2) throws Exception {
            Path createGraphTable = PromoteActionPayloadForGraphTableJobTest.createGraphTable(PromoteActionPayloadForGraphTableJobTest.this.inputGraphRootDir, cls);
            Path createActionPayload = PromoteActionPayloadForGraphTableJobTest.createActionPayload(PromoteActionPayloadForGraphTableJobTest.this.inputActionPayloadRootDir, cls, cls2);
            Path resolve = PromoteActionPayloadForGraphTableJobTest.this.outputDir.resolve("graph").resolve(cls.getSimpleName().toLowerCase());
            PromoteActionPayloadForGraphTableJob.main(new String[]{"-isSparkSessionManaged", Boolean.FALSE.toString(), "-inputGraphTablePath", createGraphTable.toString(), "-graphTableClassName", cls.getCanonicalName(), "-inputActionPayloadPath", createActionPayload.toString(), "-actionPayloadClassName", cls2.getCanonicalName(), "-outputGraphTablePath", resolve.toString(), "-mergeAndGetStrategy", strategy.name(), "--shouldGroupById", "true"});
            Assertions.assertTrue(Files.exists(resolve, new LinkOption[0]));
            Assertions.assertIterableEquals((List) PromoteActionPayloadForGraphTableJobTest.readGraphTableFromJsonDump(Paths.get(((URL) Objects.requireNonNull(PromoteActionPayloadForGraphTableJobTest.cl.getResource(PromoteActionPayloadForGraphTableJobTest.resultFileLocation(strategy, cls, cls2)))).getFile(), new String[0]).toString(), cls).collectAsList().stream().map(oaf -> {
                oaf.setLastupdatetimestamp(0L);
                return oaf;
            }).sorted(Comparator.comparingInt((v0) -> {
                return v0.hashCode();
            })).collect(Collectors.toList()), (List) PromoteActionPayloadForGraphTableJobTest.readGraphTableFromJobOutput(resolve.toString(), cls).collectAsList().stream().map(oaf2 -> {
                oaf2.setLastupdatetimestamp(0L);
                return oaf2;
            }).sorted(Comparator.comparingInt((v0) -> {
                return v0.hashCode();
            })).collect(Collectors.toList()));
        }
    }

    @BeforeAll
    public static void beforeAll() {
        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName());
        sparkConf.setMaster("local");
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        sparkConf.registerKryoClasses(ModelSupport.getOafModelClasses());
        spark = SparkSession.builder().config(sparkConf).getOrCreate();
    }

    @BeforeEach
    public void beforeEach() throws IOException {
        this.workingDir = Files.createTempDirectory(PromoteActionPayloadForGraphTableJobTest.class.getSimpleName(), new FileAttribute[0]);
        this.inputDir = this.workingDir.resolve("input");
        this.inputGraphRootDir = this.inputDir.resolve("graph");
        this.inputActionPayloadRootDir = this.inputDir.resolve("action_payload");
        this.outputDir = this.workingDir.resolve("output");
    }

    @AfterEach
    public void afterEach() throws IOException {
        FileUtils.deleteDirectory(this.workingDir.toFile());
    }

    @AfterAll
    public static void afterAll() {
        spark.stop();
    }

    @Test
    void shouldPromoteActionPayload_custom() throws Exception {
        MergeAndGet.Strategy strategy = MergeAndGet.Strategy.MERGE_FROM_AND_GET;
        Path createGraphTable = createGraphTable(this.inputGraphRootDir, Publication.class);
        Path createActionPayload = createActionPayload(this.inputActionPayloadRootDir, Publication.class, Result.class);
        Path resolve = this.outputDir.resolve("graph").resolve(Publication.class.getSimpleName().toLowerCase());
        PromoteActionPayloadForGraphTableJob.main(new String[]{"-isSparkSessionManaged", Boolean.FALSE.toString(), "-inputGraphTablePath", createGraphTable.toString(), "-graphTableClassName", Publication.class.getCanonicalName(), "-inputActionPayloadPath", createActionPayload.toString(), "-actionPayloadClassName", Result.class.getCanonicalName(), "-outputGraphTablePath", resolve.toString(), "-mergeAndGetStrategy", strategy.name(), "--shouldGroupById", "true"});
        Assertions.assertTrue(Files.exists(resolve, new LinkOption[0]));
        Publication publication = (Publication) ((List) readGraphTableFromJobOutput(resolve.toString(), Publication.class).collectAsList().stream().sorted(Comparator.comparingInt((v0) -> {
            return v0.hashCode();
        })).collect(Collectors.toList())).stream().map(oaf -> {
            return (Publication) oaf;
        }).filter(publication2 -> {
            return "50|4ScienceCRIS::6a67ed3daba1c380bf9de3c13ed9c879".equals(publication2.getId());
        }).findFirst().get();
        Assertions.assertNotNull(publication.getMeasures());
        Assertions.assertTrue(publication.getMeasures().size() > 0);
    }

    public static Stream<Arguments> promoteJobTestParams() {
        return Stream.of((Object[]) new Arguments[]{Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Dataset.class, Dataset.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Dataset.class, Result.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Datasource.class, Datasource.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Organization.class, Organization.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, OtherResearchProduct.class, OtherResearchProduct.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, OtherResearchProduct.class, Result.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Project.class, Project.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Publication.class, Publication.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Publication.class, Result.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Relation.class, Relation.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Software.class, Software.class}), Arguments.arguments(new Object[]{MergeAndGet.Strategy.MERGE_FROM_AND_GET, Software.class, Result.class})});
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <G extends Oaf> Path createGraphTable(Path path, Class<G> cls) {
        org.apache.spark.sql.Dataset readGraphTableFromJsonDump = readGraphTableFromJsonDump(Paths.get(((URL) Objects.requireNonNull(cl.getResource(inputGraphTableJsonDumpLocation(cls)))).getFile(), new String[0]).toString(), cls);
        Path resolve = path.resolve(cls.getSimpleName().toLowerCase());
        writeGraphTableAaJobInput(readGraphTableFromJsonDump, resolve.toString());
        return resolve;
    }

    private static String inputGraphTableJsonDumpLocation(Class<? extends Oaf> cls) {
        return String.format("%s/%s.json", "eu/dnetlib/dhp/actionmanager/promote/input/graph", cls.getSimpleName().toLowerCase());
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTableFromJsonDump(String str, Class<G> cls) {
        return spark.read().textFile(str).map(str2 -> {
            return (Oaf) OBJECT_MAPPER.readValue(str2, cls);
        }, Encoders.bean(cls));
    }

    private static <G extends Oaf> void writeGraphTableAaJobInput(org.apache.spark.sql.Dataset<G> dataset, String str) {
        dataset.write().option("compression", "gzip").json(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <G extends Oaf, A extends Oaf> Path createActionPayload(Path path, Class<G> cls, Class<A> cls2) {
        org.apache.spark.sql.Dataset<String> readActionPayloadFromJsonDump = readActionPayloadFromJsonDump(Paths.get(((URL) Objects.requireNonNull(cl.getResource(inputActionPayloadJsonDumpLocation(cls, cls2)))).getFile(), new String[0]).toString());
        Path resolve = path.resolve(cls2.getSimpleName().toLowerCase());
        writeActionPayloadAsJobInput(readActionPayloadFromJsonDump, resolve.toString());
        return resolve;
    }

    private static String inputActionPayloadJsonDumpLocation(Class<? extends Oaf> cls, Class<? extends Oaf> cls2) {
        return String.format("eu/dnetlib/dhp/actionmanager/promote/input/action_payload/%s_table/%s.json", cls.getSimpleName().toLowerCase(), cls2.getSimpleName().toLowerCase());
    }

    private static org.apache.spark.sql.Dataset<String> readActionPayloadFromJsonDump(String str) {
        return spark.read().textFile(str);
    }

    private static void writeActionPayloadAsJobInput(org.apache.spark.sql.Dataset<String> dataset, String str) {
        dataset.withColumnRenamed("value", "payload").write().parquet(str);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static <G extends Oaf> org.apache.spark.sql.Dataset<G> readGraphTableFromJobOutput(String str, Class<G> cls) {
        return spark.read().textFile(str).map(str2 -> {
            return (Oaf) OBJECT_MAPPER.readValue(str2, cls);
        }, Encoders.bean(cls));
    }

    /* JADX INFO: Access modifiers changed from: private */
    public static String resultFileLocation(MergeAndGet.Strategy strategy, Class<? extends Oaf> cls, Class<? extends Oaf> cls2) {
        return String.format("eu/dnetlib/dhp/actionmanager/promote/output/graph/%s/%s/%s_action_payload/result.json", strategy.name().toLowerCase(), cls.getSimpleName().toLowerCase(), cls2.getSimpleName().toLowerCase());
    }

    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        switch (implMethodName.hashCode()) {
            case 845785239:
                if (implMethodName.equals("lambda$readGraphTableFromJsonDump$ac26491e$1")) {
                    z = true;
                    break;
                }
                break;
            case 1829049935:
                if (implMethodName.equals("lambda$readGraphTableFromJobOutput$ac26491e$1")) {
                    z = false;
                    break;
                }
                break;
        }
        switch (z) {
            case false:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/String;)Leu/dnetlib/dhp/schema/oaf/Oaf;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return str2 -> {
                        return (Oaf) OBJECT_MAPPER.readValue(str2, cls);
                    };
                }
                break;
            case true:
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/actionmanager/promote/PromoteActionPayloadForGraphTableJobTest") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/String;)Leu/dnetlib/dhp/schema/oaf/Oaf;")) {
                    Class cls2 = (Class) serializedLambda.getCapturedArg(0);
                    return str22 -> {
                        return (Oaf) OBJECT_MAPPER.readValue(str22, cls2);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
