package eu.dnetlib.dhp.oa.graph.dump.complete;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.graph.dump.Utils;
import eu.dnetlib.dhp.oa.model.graph.GraphResult;
import eu.dnetlib.dhp.oa.model.graph.Relation;
import java.io.InputStream;
import java.io.Serializable;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/oa/graph/dump/complete/SparkCollectAndSave.class */
/**
 * Spark job that reads the per-type result and relation dumps found under a source path and
 * writes them, gzip-compressed JSON, under an output path.
 *
 * <p>Results are either merged into a single {@code <outputPath>/result} dataset or written one
 * folder per result type, depending on the {@code resultAggregation} argument. Relations are
 * always merged into {@code <outputPath>/relation}.
 */
public class SparkCollectAndSave implements Serializable {
    private static final Logger log = LoggerFactory.getLogger(SparkCollectAndSave.class);

    /** Classpath location of the JSON file describing the accepted command line arguments. */
    private static final String PARAMETERS_RESOURCE =
            "/eu/dnetlib/dhp/oa/graph/dump/input_collect_and_save.json";

    /** Sub-folders of {@code <sourcePath>/result} that are read, in union order. */
    private static final String[] RESULT_TYPES = {
        "publication", "dataset", "otherresearchproduct", "software"
    };

    /** Sub-folders of {@code <sourcePath>/relation} that are read, in union order. */
    private static final String[] RELATION_SOURCES = {
        "publication", "dataset", "orp", "software", "contextOrg", "context", "relation"
    };

    /**
     * Job entry point.
     *
     * <p>Recognised arguments: {@code isSparkSessionManaged} (defaults to {@code true} when
     * absent), {@code sourcePath}, {@code outputPath} and {@code resultAggregation} (defaults to
     * {@code true} when absent).
     *
     * @param strArr command line arguments, parsed against the bundled parameter definition
     * @throws IllegalStateException if the bundled parameter definition is not on the classpath
     * @throws Exception if argument parsing or the Spark job fails
     */
    public static void main(String[] strArr) throws Exception {
        String parametersJson;
        // Close the resource stream deterministically and decode it as UTF-8 instead of
        // relying on the platform default charset.
        try (InputStream in = SparkCollectAndSave.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                // Fail with the resource name rather than an anonymous NullPointerException.
                throw new IllegalStateException("Missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            parametersJson = IOUtils.toString(in, StandardCharsets.UTF_8);
        }

        ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(strArr);

        final Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final Boolean aggregateResult = Optional.ofNullable(parser.get("resultAggregation"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("resultAggregation: {}", aggregateResult);

        SparkSessionSupport.runWithSparkSession(new SparkConf(), isSparkSessionManaged, sparkSession -> {
            // Invoked regardless of the aggregation flag; presumably clears a previous
            // <outputPath>/result before writing -- confirm against Utils.removeOutputDir.
            Utils.removeOutputDir(sparkSession, outputPath + "/result");
            run(sparkSession, inputPath, outputPath, aggregateResult.booleanValue());
        });
    }

    /**
     * Writes the results (merged or per type) and then the merged relations.
     *
     * @param sparkSession active Spark session
     * @param str source path containing the {@code result} and {@code relation} folders
     * @param str2 output path
     * @param z {@code true} to merge all result types into {@code <output>/result},
     *     {@code false} to write one output folder per result type
     */
    private static void run(SparkSession sparkSession, String str, String str2, boolean z) {
        final String resultRoot = str + "/result/";
        if (z) {
            write(readAndUnion(sparkSession, resultRoot, RESULT_TYPES, GraphResult.class), str2 + "/result");
        } else {
            write(Utils.readPath(sparkSession, resultRoot + "publication", GraphResult.class), str2 + "/publication");
            write(Utils.readPath(sparkSession, resultRoot + "dataset", GraphResult.class), str2 + "/dataset");
            // NOTE(review): the output folder is spelled "otheresearchproduct" (single 'r'),
            // unlike the source folder. Kept as-is because downstream consumers may rely on
            // this path -- confirm before renaming.
            write(
                    Utils.readPath(sparkSession, resultRoot + "otherresearchproduct", GraphResult.class),
                    str2 + "/otheresearchproduct");
            write(Utils.readPath(sparkSession, resultRoot + "software", GraphResult.class), str2 + "/software");
        }

        readAndUnion(sparkSession, str + "/relation/", RELATION_SOURCES, Relation.class)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(str2 + "/relation");
    }

    /**
     * Reads every {@code root + name} path and unions the datasets in the given order.
     *
     * @param sparkSession active Spark session
     * @param root common path prefix, ending with a separator
     * @param names non-empty list of sub-folder names appended to {@code root}
     * @param type bean class each record is read as
     * @return the union of all the datasets read
     */
    private static <T> Dataset<T> readAndUnion(SparkSession sparkSession, String root, String[] names, Class<T> type) {
        Dataset<T> merged = Utils.readPath(sparkSession, root + names[0], type);
        for (int i = 1; i < names.length; i++) {
            merged = merged.union(Utils.readPath(sparkSession, root + names[i], type));
        }
        return merged;
    }

    /**
     * Writes a result dataset as gzip-compressed JSON, overwriting any existing output.
     *
     * @param dataset results to write
     * @param str destination path
     */
    private static void write(Dataset<GraphResult> dataset, String str) {
        dataset.write().option("compression", "gzip").mode(SaveMode.Overwrite).json(str);
    }
}
