package eu.dnetlib.dhp.collection;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.aggregation.AbstractVocabularyTest;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.schema.mdstore.Provenance;
import eu.dnetlib.dhp.transformation.TransformSparkJobNode;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileReader;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.MethodOrderer;
import org.junit.jupiter.api.Order;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.TestMethodOrder;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Tests for {@link GenerateNativeStoreSparkJob} and for
 * {@link TransformSparkJobNode#transformRecords}, run against a local Spark session and a
 * temporary working directory.
 *
 * <p>The three {@code @Order}-annotated tests build on each other's output on disk: the
 * incremental run passes the version used by the refresh run as {@code -readMdStoreVersion},
 * and the transformation test reads the {@code /store} directory of the version used by the
 * incremental run. They therefore have to run in the declared order.
 *
 * <p>{@code setUpVocabulary()}, {@code mockupTrasformationRule(...)} and {@code isLookUpService}
 * are inherited from {@link AbstractVocabularyTest}.
 */
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
@ExtendWith(MockitoExtension.class)
public class GenerateNativeStoreSparkJobTest extends AbstractVocabularyTest {

    private static final Logger log = LoggerFactory.getLogger(GenerateNativeStoreSparkJobTest.class);

    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    private static final String encoding = "XML";

    /** XPath handed to the job (as {@code -xpath}) to locate the record identifier. */
    private static final String xpath = "//*[local-name()='header']/*[local-name()='identifier']";

    /** XPath used when parsing the single {@code record.xml} fixture directly. */
    private static final String RECORD_ID_XPATH =
        "./*[local-name()='record']/*[local-name()='header']/*[local-name()='identifier']";

    /** Classpath location of the sequence file copied into each MDStore version directory. */
    private static final String SEQUENCE_FILE_RESOURCE = "/eu/dnetlib/dhp/collection/sequence_file";

    private static final String dateOfCollection = String.valueOf(System.currentTimeMillis());

    private static SparkSession spark;
    private static Path workingDir;
    private static Encoder<MetadataRecord> encoder;
    private static String provenance;

    /**
     * Loads the provenance fixture, creates the temporary working directory and starts the
     * local Spark session shared by all tests.
     *
     * @throws IOException if the fixture cannot be read or the directory cannot be created
     */
    @BeforeAll
    public static void beforeAll() throws IOException {
        provenance = readResource(
            GenerateNativeStoreSparkJobTest.class, "/eu/dnetlib/dhp/collection/provenance.json");

        workingDir = Files.createTempDirectory(GenerateNativeStoreSparkJobTest.class.getSimpleName());
        log.info("using work dir {}", workingDir);

        SparkConf sparkConf = new SparkConf();
        sparkConf.setAppName(GenerateNativeStoreSparkJobTest.class.getSimpleName());
        sparkConf.setMaster("local[*]");
        sparkConf.set("spark.driver.host", "localhost");
        sparkConf.set("hive.metastore.local", "true");
        sparkConf.set("spark.ui.enabled", "false");
        sparkConf.set("spark.sql.warehouse.dir", workingDir.toString());
        sparkConf.set("hive.metastore.warehouse.dir", workingDir.resolve("warehouse").toString());

        encoder = Encoders.bean(MetadataRecord.class);
        spark = SparkSession
            .builder()
            .appName(GenerateNativeStoreSparkJobTest.class.getSimpleName())
            .config(sparkConf)
            .getOrCreate();
    }

    /**
     * Stops the Spark session and removes the working directory.
     *
     * <p>Spark is stopped first, and the directory is deleted in a {@code finally} block, so a
     * failure in one step does not skip the other. Both fields are null-checked because
     * {@link #beforeAll()} may have failed before assigning them.
     *
     * @throws IOException if the working directory cannot be deleted
     */
    @AfterAll
    public static void afterAll() throws IOException {
        try {
            if (spark != null) {
                spark.stop();
            }
        } finally {
            if (workingDir != null) {
                FileUtils.deleteDirectory(workingDir.toFile());
            }
        }
    }

    /** Runs the job with an empty {@code -readMdStoreVersion} and verifies the produced store. */
    @Test
    @Order(1)
    void testGenerateNativeStoreSparkJobRefresh() throws Exception {
        MDStoreVersion writeVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
        stageSequenceFile(writeVersion);

        runNativeStoreJob(writeVersion, "");

        verify(writeVersion);
    }

    /**
     * Runs the job writing version 2 while passing version 1 as {@code -readMdStoreVersion},
     * then verifies the produced store.
     */
    @Test
    @Order(2)
    void testGenerateNativeStoreSparkJobIncremental() throws Exception {
        MDStoreVersion writeVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        stageSequenceFile(writeVersion);

        MDStoreVersion readVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_1.json");
        runNativeStoreJob(writeVersion, OBJECT_MAPPER.writeValueAsString(readVersion));

        verify(writeVersion);
    }

    /**
     * Transforms the store of version 2 with the {@code simpleTRule} XSLT rule and checks that
     * every output record carries the requested transformation date and a non-blank body.
     */
    @Test
    @Order(3)
    void testTransformSparkJob() throws Exception {
        setUpVocabulary();

        MDStoreVersion sourceVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreVersion_2.json");
        MDStoreVersion cleanedVersion = prepareVersion("/eu/dnetlib/dhp/collection/mdStoreCleanedVersion.json");

        mockupTrasformationRule("simpleTRule", "/eu/dnetlib/dhp/transform/ext_simple.xsl");

        Map<String, String> parameters = Stream
            .of(new String[][] {
                {"dateOfTransformation", "1234"},
                {"transformationPlugin", "XSLT_TRANSFORM"},
                {"transformationRuleId", "simpleTRule"}
            })
            .collect(Collectors.toMap(entry -> entry[0], entry -> entry[1]));

        TransformSparkJobNode.transformRecords(
            parameters,
            isLookUpService,
            spark,
            sourceVersion.getHdfsPath() + "/store",
            cleanedVersion.getHdfsPath(),
            200);

        Dataset<MetadataRecord> transformed = spark
            .read()
            .format("parquet")
            .load(cleanedVersion.getHdfsPath() + "/store")
            .as(Encoders.bean(MetadataRecord.class));

        long total = transformed.count();

        // The explicit functional-interface casts select Dataset's Java overloads; a bare lambda
        // is ambiguous between the Java and the Scala variants of filter().
        long withExpectedDate = transformed
            .filter((FilterFunction<MetadataRecord>) r -> r.getDateOfTransformation().longValue() == 1234L)
            .count();
        long withBody = transformed
            .filter((FilterFunction<MetadataRecord>) r -> !StringUtils.isBlank(r.getBody()))
            .count();

        Assertions.assertEquals(total, withExpectedDate);
        Assertions.assertEquals(total, withBody);
    }

    /** Checks that the version-1 JSON fixture deserializes into an {@link MDStoreVersion}. */
    @Test
    void testJSONSerialization() throws Exception {
        String json = readResource(getClass(), "mdStoreVersion_1.json");
        log.info("s = {}", json);

        Assertions.assertNotNull(OBJECT_MAPPER.readValue(json, MDStoreVersion.class));
    }

    /** Checks that parsing the record fixture yields both an id and an original id. */
    @Test
    void testGenerationMetadataRecord() throws Exception {
        MetadataRecord parsed = parseSampleRecord(readResource(getClass(), "./record.xml"));

        Assertions.assertNotNull(parsed.getId());
        Assertions.assertNotNull(parsed.getOriginalId());
    }

    /**
     * Parses the same record twice, gives the two results different bodies and asserts that
     * they still compare equal, i.e. pins that the body does not take part in
     * {@code MetadataRecord} equality.
     */
    @Test
    void testEquals() throws IOException {
        String recordXml = readResource(getClass(), "./record.xml");

        MetadataRecord first = parseSampleRecord(recordXml);
        MetadataRecord second = parseSampleRecord(recordXml);
        first.setBody("ciao");
        second.setBody("mondo");

        Assertions.assertNotNull(first);
        Assertions.assertNotNull(second);
        Assertions.assertEquals(first, second);
    }

    /**
     * Asserts that the given version directory exists and that four counts agree: the entries
     * of its {@code /sequence_file}, the number stored in its {@code /size} file, the records
     * in its {@code /store} dataset and the distinct record ids in that dataset.
     *
     * @param mDStoreVersion the version whose on-disk output is checked
     * @throws IOException if the {@code /size} file cannot be read
     */
    protected void verify(MDStoreVersion mDStoreVersion) throws IOException {
        String basePath = mDStoreVersion.getHdfsPath();
        Assertions.assertTrue(new File(basePath).exists());

        long sequenceFileCount = JavaSparkContext
            .fromSparkContext(spark.sparkContext())
            .sequenceFile(basePath + "/sequence_file", IntWritable.class, Text.class)
            .count();

        Dataset<MetadataRecord> store = spark.read().load(basePath + "/store").as(encoder);
        long storeCount = store.count();

        long declaredSize;
        try (FileReader sizeReader = new FileReader(basePath + "/size")) {
            declaredSize = Long.parseLong(IOUtils.toString(sizeReader));
        }

        Assertions.assertEquals(sequenceFileCount, declaredSize, "the size must be equal");
        Assertions.assertEquals(sequenceFileCount, storeCount, "the size must be equal");

        // The cast selects the Java MapFunction overload of Dataset.map().
        long distinctIds = store
            .map((MapFunction<MetadataRecord, String>) MetadataRecord::getId, Encoders.STRING())
            .distinct()
            .count();
        Assertions.assertEquals(sequenceFileCount, distinctIds, "the size must be equal");
    }

    /**
     * Reads an {@link MDStoreVersion} from a JSON classpath resource and rebases its path onto
     * the working directory: the {@code hdfsPath} found in the JSON is used as a
     * {@link String#format} pattern with the working directory as its only argument.
     *
     * @param str classpath resource name of the JSON file
     * @return the deserialized version with its {@code hdfsPath} rewritten
     * @throws IOException if the resource is missing or cannot be read or parsed
     */
    public MDStoreVersion prepareVersion(String str) throws IOException {
        MDStoreVersion version = OBJECT_MAPPER.readValue(readResource(getClass(), str), MDStoreVersion.class);
        version.setHdfsPath(String.format(version.getHdfsPath(), workingDir.toString()));
        return version;
    }

    /** Invokes the job's {@code main} with the shared arguments for the given write/read versions. */
    private void runNativeStoreJob(MDStoreVersion writeVersion, String readVersionJson) throws Exception {
        GenerateNativeStoreSparkJob.main(new String[] {
            "-isSparkSessionManaged", Boolean.FALSE.toString(),
            "-encoding", encoding,
            "-dateOfCollection", dateOfCollection,
            "-provenance", provenance,
            "-xpath", xpath,
            "-mdStoreVersion", OBJECT_MAPPER.writeValueAsString(writeVersion),
            "-readMdStoreVersion", readVersionJson,
            "-workflowId", "abc"
        });
    }

    /** Creates the version directory and copies the sequence-file fixture into it, closing both streams. */
    private void stageSequenceFile(MDStoreVersion version) throws IOException {
        FileUtils.forceMkdir(new File(version.getHdfsPath()));
        try (InputStream in = getClass().getResourceAsStream(SEQUENCE_FILE_RESOURCE)) {
            if (in == null) {
                throw new IOException("classpath resource not found: " + SEQUENCE_FILE_RESOURCE);
            }
            try (FileOutputStream out = new FileOutputStream(version.getHdfsPath() + "/sequence_file")) {
                IOUtils.copy(in, out);
            }
        }
    }

    /** Parses the given XML with the fixed test provenance, the current time and no accumulators. */
    private static MetadataRecord parseSampleRecord(String recordXml) {
        return GenerateNativeStoreSparkJob.parseRecord(
            recordXml,
            RECORD_ID_XPATH,
            encoding,
            new Provenance("foo", "bar", "ns_prefix"),
            Long.valueOf(System.currentTimeMillis()),
            (LongAccumulator) null,
            (LongAccumulator) null);
    }

    /**
     * Reads a classpath resource as a UTF-8 string and closes the stream.
     *
     * @param anchor class whose {@code getResourceAsStream} resolves {@code name}
     * @param name resource name, absolute or relative to {@code anchor}'s package
     * @throws IOException if the resource does not exist or cannot be read
     */
    private static String readResource(Class<?> anchor, String name) throws IOException {
        try (InputStream in = anchor.getResourceAsStream(name)) {
            if (in == null) {
                throw new IOException("classpath resource not found: " + name);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }
}
