package eu.dnetlib.dhp.transformation;

import eu.dnetlib.dhp.aggregation.common.AggregationCounter;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.GenerateNativeStoreSparkJob;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.aggregation.AggregatorReport;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.message.MessageSender;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.mdstore.MetadataRecord;
import eu.dnetlib.dhp.utils.DHPUtils;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.IOException;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/transformation/TransformSparkJobNode.class */
/**
 * Spark job that reads the records of an input MDStore version, runs each of them through the
 * transformation plugin selected by {@link TransformationFactory}, and saves the non-null results
 * (plus their count) under the output MDStore version.
 */
public class TransformSparkJobNode {
    private static final Logger log = LoggerFactory.getLogger(TransformSparkJobNode.class);

    /** Records-per-task value used when {@code recordsPerTask} is absent or not usable. */
    private static final int RECORDS_PER_TASK = 200;

    /** Classpath location of the JSON descriptor handed to {@link ArgumentApplicationParser}. */
    private static final String INPUT_PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/transformation/transformation_input_parameters.json";

    /** Suffix appended to an MDStore version's HDFS path to address its records. */
    private static final String MDSTORE_DATA_PATH = "/store";

    /** Suffix appended to the output base path for the file holding the stored record count. */
    private static final String MDSTORE_SIZE_PATH = "/size";

    /**
     * Command-line entry point: parses the arguments, resolves the input/output paths from the
     * JSON-serialised {@link MDStoreVersion} values, and runs {@link #transformRecords} inside a
     * Spark session.
     *
     * @param strArr the command-line arguments, parsed according to the bundled parameter descriptor
     * @throws Exception if argument parsing, the lookup-service calls or the Spark job fail
     */
    public static void main(String[] strArr) throws Exception {
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(loadParameterDescriptor());
        parser.parseArgument(strArr);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String mdstoreInputVersion = parser.get("mdstoreInputVersion");
        final String mdstoreOutputVersion = parser.get("mdstoreOutputVersion");

        final String inputPath = DHPUtils.MAPPER.readValue(mdstoreInputVersion, MDStoreVersion.class).getHdfsPath() + MDSTORE_DATA_PATH;
        log.info("inputPath: {}", inputPath);

        final String outputBasePath = DHPUtils.MAPPER.readValue(mdstoreOutputVersion, MDStoreVersion.class).getHdfsPath();
        log.info("outputBasePath: {}", outputBasePath);

        final String isLookupUrl = parser.get("isLookupUrl");
        log.info("isLookupUrl: {}", isLookupUrl);
        log.info("dateOfTransformation: {}", parser.get("dateOfTransformation"));

        final Integer recordsPerTask = Optional.ofNullable(parser.get("recordsPerTask"))
            .map(Integer::valueOf)
            .orElse(RECORDS_PER_TASK);

        final ISLookUpService lookUpService = ISLookupClientFactory.getLookUpService(isLookupUrl);
        final VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS(lookUpService);
        log.info("Retrieved {} vocabularies", vocabularies.vocabularyNames().size());

        SparkSessionSupport.runWithSparkSession(new SparkConf(), isSparkSessionManaged, sparkSession -> {
            transformRecords(parser.getObjectMap(), lookUpService, sparkSession, inputPath, outputBasePath, recordsPerTask);
        });
    }

    /**
     * Transforms every record found at the input path and stores the outcome.
     *
     * <p>The input parquet data is given an empty-map validation-results column when the schema
     * lacks one, so that it can be decoded as {@link MetadataRecord} beans. Records for which the
     * plugin returns {@code null} are dropped. The result is saved under {@code str2 + "/store"}
     * and the number of stored records is written to {@code str2 + "/size"}.
     *
     * @param map             job arguments; {@code dnetMessageManagerURL} and {@code workflowId} are read
     *                        here, and the whole map is handed to the transformation factory
     * @param iSLookUpService lookup service handed to the transformation factory
     * @param sparkSession    the Spark session used for reading, transforming and writing
     * @param str             path of the input parquet data
     * @param str2            output base path
     * @param num             desired number of records per Spark task, see {@link #getRepartitionNumber}
     * @throws DnetTransformationException if the transformation plugin cannot be created
     * @throws IOException                 if writing the results or closing the report fails
     */
    public static void transformRecords(Map<String, String> map, ISLookUpService iSLookUpService, SparkSession sparkSession, String str, String str2, Integer num) throws DnetTransformationException, IOException {
        final AggregationCounter counters = new AggregationCounter(
            sparkSession.sparkContext().longAccumulator("TotalItems"),
            sparkSession.sparkContext().longAccumulator("InvalidRecords"),
            sparkSession.sparkContext().longAccumulator("transformedItems"));
        final Encoder<MetadataRecord> encoder = Encoders.bean(MetadataRecord.class);

        final String dnetMessageManagerURL = map.get("dnetMessageManagerURL");
        log.info("dnetMessageManagerURL is {}", dnetMessageManagerURL);
        final String workflowId = map.get("workflowId");
        log.info("workflowId is {}", workflowId);

        final MapFunction<MetadataRecord, MetadataRecord> transformationPlugin = TransformationFactory.getTransformationPlugin(map, counters, iSLookUpService);

        final Dataset<Row> rawStore = sparkSession.read().parquet(str);
        final boolean hasValidationResults = ArrayUtils.contains(rawStore.schema().fieldNames(), GenerateNativeStoreSparkJob.VALIDATION_RESULTS_FIELD);
        // Add an empty map column when the input lacks the validation results field.
        final Dataset<Row> completeStore = hasValidationResults
            ? rawStore
            : rawStore.withColumn(GenerateNativeStoreSparkJob.VALIDATION_RESULTS_FIELD, functions.map(new Column[0]));
        final Dataset<MetadataRecord> inputStore = completeStore.as(encoder);
        final long totalInput = inputStore.count();

        final String outputStorePath = str2 + MDSTORE_DATA_PATH;

        // The report is only opened so that it is closed (success or failure) when the job ends.
        try (AggregatorReport report = new AggregatorReport(new MessageSender(dnetMessageManagerURL, workflowId))) {
            final JavaRDD<MetadataRecord> transformed = inputStore.javaRDD()
                .repartition(getRepartitionNumber(totalInput, num))
                .map(transformationPlugin::call)
                .filter(Objects::nonNull);
            DHPUtils.saveDataset(sparkSession.createDataset(transformed.rdd(), encoder), outputStorePath);

            log.info("Transformed item {}", counters.getProcessedItems().count());
            log.info("Total item {}", counters.getTotalItems().count());
            log.info("Transformation Error item {}", counters.getErrorItems().count());

            final long storedRecords = sparkSession.read().load(outputStorePath).count();
            DHPUtils.writeHdfsFile(sparkSession.sparkContext().hadoopConfiguration(), String.valueOf(storedRecords), str2 + MDSTORE_SIZE_PATH);
        }
    }

    /**
     * Reads the bundled parameter descriptor as UTF-8 text.
     *
     * @return the JSON descriptor content
     * @throws IOException           if the resource cannot be read
     * @throws IllegalStateException if the resource is not on the classpath
     */
    private static String loadParameterDescriptor() throws IOException {
        try (InputStream in = TransformSparkJobNode.class.getResourceAsStream(INPUT_PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + INPUT_PARAMETERS_RESOURCE);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Computes how many partitions to use so that each one holds roughly {@code recordsPerTask}
     * records.
     *
     * <p>A {@code null} or non-positive {@code recordsPerTask} falls back to
     * {@link #RECORDS_PER_TASK} instead of failing with a division by zero, and the result is kept
     * within {@code [1, Integer.MAX_VALUE]}.
     *
     * @param totalInput     total number of input records
     * @param recordsPerTask desired number of records per partition
     * @return the partition count, never less than 1
     */
    private static int getRepartitionNumber(long totalInput, Integer recordsPerTask) {
        final int perTask = (recordsPerTask != null && recordsPerTask > 0) ? recordsPerTask : RECORDS_PER_TASK;
        final long partitions = totalInput / perTask;
        return (int) Math.max(1L, Math.min((long) Integer.MAX_VALUE, partitions));
    }
}
