package eu.dnetlib.dhp.oa.graph.raw;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import java.io.InputStream;
import java.io.StringReader;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.GzipCodec;
import org.apache.hadoop.mapred.SequenceFileOutputFormat;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.dom4j.Document;
import org.dom4j.Element;
import org.dom4j.Namespace;
import org.dom4j.QName;
import org.dom4j.io.SAXReader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;

/* loaded from: input_file:eu/dnetlib/dhp/oa/graph/raw/MigrateHdfsMdstoresApplication.class */
/**
 * Spark application that reads metadata records stored as parquet under a set of mdstore paths,
 * adds {@code dri:objIdentifier}, {@code dri:dateOfCollection} and {@code dri:dateOfTransformation}
 * elements to the header of each record, and writes the result as a gzip-compressed sequence file
 * of {@code Text} key/value pairs.
 */
public class MigrateHdfsMdstoresApplication extends AbstractMigrationApplication {
    private static final Logger log = LoggerFactory.getLogger(MigrateHdfsMdstoresApplication.class);
    private static final Namespace DRI_NS_PREFIX = new Namespace("dri", "http://www.driver-repository.eu/namespace/dri");

    /** Classpath location of the JSON document describing the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/migrate_hdfs_mstores_parameters.json";

    /**
     * Entry point: parses the command-line arguments, resolves the mdstore paths matching the
     * requested format/layout/interpretation, removes the target path and regenerates it from
     * those mdstores.
     *
     * @param args command-line arguments, described by the bundled parameters JSON resource
     * @throws Exception if the arguments cannot be parsed or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String parametersJson;
        // Read the argument specification as UTF-8 and always close the stream; fail with a clear
        // message when the resource is not on the classpath.
        try (InputStream in = MigrateHdfsMdstoresApplication.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            parametersJson = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(args);

        // Defaults to a managed session when the argument is absent.
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String mdstoreManagerUrl = parser.get("mdstoreManagerUrl");
        String mdFormat = parser.get("mdFormat");
        String mdLayout = parser.get("mdLayout");
        String mdInterpretation = parser.get("mdInterpretation");
        String hdfsPath = parser.get("hdfsPath");

        Set<String> paths = mdstorePaths(mdstoreManagerUrl, mdFormat, mdLayout, mdInterpretation);

        SparkSessionSupport.runWithSparkSession(new SparkConf(), isSparkSessionManaged, spark -> {
            // The output is regenerated from scratch on every run.
            HdfsSupport.remove(hdfsPath, spark.sparkContext().hadoopConfiguration());
            processPaths(spark, hdfsPath, paths, String.format("%s-%s-%s", mdFormat, mdLayout, mdInterpretation));
        });
    }

    /**
     * Reads the records found under the given mdstore paths, enriches each of them and stores the
     * result at {@code hdfsPath} as a gzip-compressed sequence file. Paths that do not exist are
     * skipped. Each key is a random UUID followed by {@code ":"} and {@code type}; each value is
     * the enriched record XML.
     *
     * @param spark the Spark session used to read and write the data
     * @param hdfsPath the output path of the sequence file
     * @param paths candidate mdstore paths holding parquet data
     * @param type suffix appended to every generated record key
     */
    public static void processPaths(SparkSession spark, String hdfsPath, Set<String> paths, String type) {
        JavaSparkContext sc = JavaSparkContext.fromSparkContext(spark.sparkContext());

        log.info("Found {} not empty mdstores", paths.size());
        paths.forEach(log::info);

        String[] validPaths = paths.stream()
                .filter(path -> HdfsSupport.exists(path, sc.hadoopConfiguration()))
                .toArray(String[]::new);

        if (validPaths.length > 0) {
            spark.read()
                    .parquet(validPaths)
                    // Explicit target type keeps the call unambiguous between the MapFunction and
                    // Scala Function1 overloads of Dataset.map.
                    .map((MapFunction<Row, String>) MigrateHdfsMdstoresApplication::enrichRecord, Encoders.STRING())
                    .toJavaRDD()
                    .mapToPair(xml -> new Tuple2<>(new Text(UUID.randomUUID() + ":" + type), new Text(xml)))
                    .saveAsHadoopFile(hdfsPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
        } else {
            // No readable mdstore: still write an (empty) output at hdfsPath.
            // NOTE(review): presumably so that downstream steps always find the path - confirm.
            spark.emptyDataFrame()
                    .toJavaRDD()
                    .mapToPair(row -> new Tuple2<>(new Text(), new Text()))
                    .saveAsHadoopFile(hdfsPath, Text.class, Text.class, SequenceFileOutputFormat.class, GzipCodec.class);
        }
    }

    /**
     * Adds the {@code dri:objIdentifier}, {@code dri:dateOfCollection} and
     * {@code dri:dateOfTransformation} elements to the first element named {@code header} of the
     * XML held in the {@code body} column of the row.
     *
     * @param row a row exposing the {@code body}, {@code id}, {@code dateOfCollection} and
     *     {@code dateOfTransformation} columns; the two dates are read as epoch milliseconds
     * @return the serialized XML of the patched record
     * @throws RuntimeException if the row cannot be read, the XML cannot be parsed, or it has no
     *     header element
     */
    private static String enrichRecord(Row row) {
        String xml = row.getAs("body");
        try {
            // SimpleDateFormat is not thread-safe, hence one instance per invocation.
            SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ssZ");
            Long collectedAt = row.getAs("dateOfCollection");
            Long transformedAt = row.getAs("dateOfTransformation");
            String collectionDate = dateFormat.format(new Date(collectedAt));
            String transformationDate = dateFormat.format(new Date(transformedAt));

            SAXReader reader = new SAXReader();
            // Reject DOCTYPE declarations to prevent XXE on the stored records.
            reader.setFeature("http://apache.org/xml/features/disallow-doctype-decl", true);
            Document doc = reader.read(new StringReader(xml));

            Element header = (Element) doc.selectSingleNode("//*[local-name() = 'header']");
            if (header == null) {
                throw new IllegalStateException("No header element found in record");
            }
            header.addElement(new QName("objIdentifier", DRI_NS_PREFIX)).addText(row.<String>getAs("id"));
            header.addElement(new QName("dateOfCollection", DRI_NS_PREFIX)).addText(collectionDate);
            header.addElement(new QName("dateOfTransformation", DRI_NS_PREFIX)).addText(transformationDate);
            return doc.asXML();
        } catch (Exception e) {
            log.error("Error patching record: {}", xml, e);
            throw new RuntimeException("Error patching record: " + xml, e);
        }
    }
}
