package eu.dnetlib.dhp.actionmanager.partition;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.actionmanager.ISClient;
import eu.dnetlib.dhp.actionmanager.promote.PromoteActionPayloadForGraphTableJob;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.io.Text;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.types.StructType$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/actionmanager/partition/PartitionActionSetsByPayloadTypeJob.class */
/**
 * Spark job that reads action sets stored as Hadoop sequence files and rewrites them as parquet,
 * partitioned by the {@code clazz} field of each action.
 *
 * <p>The paths to read are obtained from an {@link ISClient} for the requested action set ids;
 * each sequence-file value is parsed as JSON carrying the string fields {@code clazz} and
 * {@code payload}.
 */
public class PartitionActionSetsByPayloadTypeJob {
    private static final Logger logger = LoggerFactory.getLogger(PartitionActionSetsByPayloadTypeJob.class);

    /** Classpath location of the JSON document describing the job's command-line parameters. */
    private static final String INPUT_PARAMETERS_RESOURCE =
            "/eu/dnetlib/dhp/actionmanager/partition/partition_action_sets_by_payload_type_input_parameters.json";

    /** Schema of the raw sequence-file records: two non-nullable string columns. */
    private static final StructType KV_SCHEMA = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("key", DataTypes.StringType, false),
            DataTypes.createStructField("value", DataTypes.StringType, false)));

    /** Schema used to parse the JSON held in the {@code value} column. */
    private static final StructType ATOMIC_ACTION_SCHEMA = DataTypes.createStructType(Arrays.asList(
            DataTypes.createStructField("clazz", DataTypes.StringType, false),
            DataTypes.createStructField("payload", DataTypes.StringType, false)));

    private ISClient isClient;

    /**
     * Creates a job backed by a new {@link ISClient} built from the given string.
     *
     * @param str value handed to the {@link ISClient} constructor ({@code main} passes the
     *            {@code isLookupUrl} argument here)
     */
    public PartitionActionSetsByPayloadTypeJob(String str) {
        this.isClient = new ISClient(str);
    }

    /**
     * Creates a job without an {@link ISClient}; one must be supplied through
     * {@link #setIsClient(ISClient)} before {@link #run(Boolean, String, String)} is invoked.
     */
    public PartitionActionSetsByPayloadTypeJob() {
    }

    /**
     * Command-line entry point: parses the arguments, logs them and runs the job.
     *
     * <p>Arguments read: {@code isSparkSessionManaged} (optional, defaults to {@code true}),
     * {@code inputActionSetIds}, {@code outputPath} and {@code isLookupUrl}.
     *
     * @param args raw command-line arguments
     * @throws Exception if the parameter definition cannot be loaded or argument parsing fails
     */
    public static void main(String[] args) throws Exception {
        String jsonConfiguration;
        // The resource name is absolute, so the package of the anchor class does not affect the lookup.
        try (InputStream parametersStream =
                PromoteActionPayloadForGraphTableJob.class.getResourceAsStream(INPUT_PARAMETERS_RESOURCE)) {
            if (parametersStream == null) {
                // Fail with a descriptive message instead of an opaque NullPointerException.
                throw new IllegalStateException("Missing classpath resource: " + INPUT_PARAMETERS_RESOURCE);
            }
            // Decode explicitly as UTF-8 rather than relying on the platform default charset.
            jsonConfiguration = IOUtils.toString(parametersStream, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        logger.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        String inputActionSetIds = parser.get("inputActionSetIds");
        logger.info("inputActionSetIds: {}", inputActionSetIds);

        String outputPath = parser.get("outputPath");
        logger.info("outputPath: {}", outputPath);

        String isLookupUrl = parser.get("isLookupUrl");
        logger.info("isLookupUrl: {}", isLookupUrl);

        new PartitionActionSetsByPayloadTypeJob(isLookupUrl).run(isSparkSessionManaged, inputActionSetIds, outputPath);
    }

    /**
     * Looks up the input paths for the given action set ids, removes the output directory and
     * writes every existing input action set to it.
     *
     * @param isSparkSessionManaged flag forwarded to {@link SparkSessionSupport#runWithSparkSession}
     * @param inputActionSetIds     action set ids, passed as-is to {@link ISClient#getLatestRawsetPaths}
     * @param outputPath            location the partitioned parquet output is written to
     */
    protected void run(Boolean isSparkSessionManaged, String inputActionSetIds, String outputPath) {
        List<String> inputActionSetPaths = getIsClient().getLatestRawsetPaths(inputActionSetIds);
        logger.info("inputActionSetPaths: {}", String.join(Constants.DEFAULT_DELIMITER, inputActionSetPaths));

        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");

        SparkSessionSupport.runWithSparkSession(sparkConf, isSparkSessionManaged, sparkSession -> {
            // Clear the target first: the writes below use append mode.
            removeOutputDir(sparkSession, outputPath);
            readAndWriteActionSetsFromPaths(sparkSession, inputActionSetPaths, outputPath);
        });
    }

    /** Removes {@code path} through {@link HdfsSupport#remove} using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession sparkSession, String path) {
        HdfsSupport.remove(path, sparkSession.sparkContext().hadoopConfiguration());
    }

    /**
     * Reads every input path reported as existing by {@link HdfsSupport#exists} and appends its
     * actions to {@code outputPath}. Paths that do not exist are skipped with a warning.
     */
    private static void readAndWriteActionSetsFromPaths(
            SparkSession sparkSession, List<String> inputActionSetPaths, String outputPath) {
        for (String inputPath : inputActionSetPaths) {
            if (!HdfsSupport.exists(inputPath, sparkSession.sparkContext().hadoopConfiguration())) {
                logger.warn("Skipping non-existing action set path: {}", inputPath);
                continue;
            }
            saveActions(readActionSetFromPath(sparkSession, inputPath), outputPath);
        }
    }

    /**
     * Loads a {@code Text}/{@code Text} sequence file and parses each value as JSON.
     *
     * @return a dataset exposing the fields of {@link #ATOMIC_ACTION_SCHEMA}
     *         ({@code clazz}, {@code payload}) as top-level columns
     */
    private static Dataset<Row> readActionSetFromPath(SparkSession sparkSession, String path) {
        logger.info("Reading actions from path: {}", path);

        JavaSparkContext javaSparkContext = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
        Dataset<Row> keyValueRows = sparkSession.createDataFrame(
                javaSparkContext
                        .sequenceFile(path, Text.class, Text.class)
                        // Copy the Text contents into Strings so each row holds its own values.
                        .map(pair -> RowFactory.create(pair._1().toString(), pair._2().toString())),
                KV_SCHEMA);

        return keyValueRows
                .withColumn("atomic_action", functions.from_json(functions.col("value"), ATOMIC_ACTION_SCHEMA))
                .select(functions.expr("atomic_action.*"));
    }

    /**
     * Appends the dataset to {@code str} as parquet, partitioned by the {@code clazz} column.
     *
     * @param dataset actions to store; must contain a {@code clazz} column
     * @param str     output location
     */
    public static void saveActions(Dataset<Row> dataset, String str) {
        logger.info("Saving actions to path: {}", str);
        dataset.write().partitionBy("clazz").mode(SaveMode.Append).parquet(str);
    }

    /** Returns the client used to look up the input action set paths. */
    public ISClient getIsClient() {
        return this.isClient;
    }

    /** Replaces the client used to look up the input action set paths. */
    public void setIsClient(ISClient iSClient) {
        this.isClient = iSClient;
    }
}
