package eu.dnetlib.dhp.sx.graph;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.oaf.Dataset;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.OtherResearchProduct;
import eu.dnetlib.dhp.schema.oaf.Publication;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.Software;
import eu.dnetlib.dhp.schema.oaf.utils.MergeUtils;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;
import org.apache.commons.io.IOUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.api.java.function.MapGroupsFunction;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SparkSession$;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.JavaConverters$;
import scala.collection.immutable.C$colon$colon;
import scala.collection.immutable.Nil$;
import scala.runtime.BoxedUnit;
import scala.runtime.BoxesRunTime;

/* compiled from: SparkCreateInputGraph.scala */
/* loaded from: input_file:eu/dnetlib/dhp/sx/graph/SparkCreateInputGraph$.class */
public final class SparkCreateInputGraph$ {
    public static SparkCreateInputGraph$ MODULE$;

    static {
        new SparkCreateInputGraph$();
    }

    public void main(String[] strArr) {
        Logger logger = LoggerFactory.getLogger(getClass());
        SparkConf sparkConf = new SparkConf();
        ArgumentApplicationParser argumentApplicationParser = new ArgumentApplicationParser(IOUtils.toString(getClass().getResourceAsStream("/eu/dnetlib/dhp/sx/graph/extract_entities_params.json")));
        argumentApplicationParser.parseArgument(strArr);
        SparkSession orCreate = SparkSession$.MODULE$.builder().config(sparkConf).appName(getClass().getSimpleName()).master(argumentApplicationParser.get("master")).getOrCreate();
        C$colon$colon c$colon$colon = new C$colon$colon(new Tuple2(ModelConstants.PUBLICATION_RESULTTYPE_CLASSID, Publication.class), new C$colon$colon(new Tuple2(ModelConstants.DATASET_RESULTTYPE_CLASSID, Dataset.class), new C$colon$colon(new Tuple2(ModelConstants.SOFTWARE_RESULTTYPE_CLASSID, Software.class), new C$colon$colon(new Tuple2("otherResearchProduct", OtherResearchProduct.class), Nil$.MODULE$))));
        Encoder kryo = Encoders$.MODULE$.kryo(Oaf.class);
        Encoder kryo2 = Encoders$.MODULE$.kryo(Publication.class);
        Encoder kryo3 = Encoders$.MODULE$.kryo(Dataset.class);
        Encoder kryo4 = Encoders$.MODULE$.kryo(Software.class);
        Encoder kryo5 = Encoders$.MODULE$.kryo(OtherResearchProduct.class);
        Encoder kryo6 = Encoders$.MODULE$.kryo(Relation.class);
        String str = argumentApplicationParser.get("sourcePath");
        logger.info(new StringBuilder(15).append("sourcePath  -> ").append(str).toString());
        String str2 = argumentApplicationParser.get("targetPath");
        logger.info(new StringBuilder(15).append("targetPath  -> ").append(str2).toString());
        org.apache.spark.sql.Dataset as = orCreate.read().load(new StringBuilder(2).append(str).append("/*").toString()).as(kryo);
        logger.info("Extract Publication");
        as.filter(oaf -> {
            return BoxesRunTime.boxToBoolean($anonfun$main$1(oaf));
        }).map(oaf2 -> {
            return (Publication) oaf2;
        }, kryo2).write().mode(SaveMode.Overwrite).save(new StringBuilder(22).append(str2).append("/extracted/publication").toString());
        logger.info("Extract dataset");
        as.filter(oaf3 -> {
            return BoxesRunTime.boxToBoolean($anonfun$main$3(oaf3));
        }).map(oaf4 -> {
            return (Dataset) oaf4;
        }, kryo3).write().mode(SaveMode.Overwrite).save(new StringBuilder(18).append(str2).append("/extracted/dataset").toString());
        logger.info("Extract software");
        as.filter(oaf5 -> {
            return BoxesRunTime.boxToBoolean($anonfun$main$5(oaf5));
        }).map(oaf6 -> {
            return (Software) oaf6;
        }, kryo4).write().mode(SaveMode.Overwrite).save(new StringBuilder(19).append(str2).append("/extracted/software").toString());
        logger.info("Extract otherResearchProduct");
        as.filter(oaf7 -> {
            return BoxesRunTime.boxToBoolean($anonfun$main$7(oaf7));
        }).map(oaf8 -> {
            return (OtherResearchProduct) oaf8;
        }, kryo5).write().mode(SaveMode.Overwrite).save(new StringBuilder(31).append(str2).append("/extracted/otherResearchProduct").toString());
        logger.info("Extract Relation");
        as.filter(oaf9 -> {
            return BoxesRunTime.boxToBoolean($anonfun$main$9(oaf9));
        }).map(oaf10 -> {
            return (Relation) oaf10;
        }, kryo6).write().mode(SaveMode.Overwrite).save(new StringBuilder(19).append(str2).append("/extracted/relation").toString());
        c$colon$colon.foreach(tuple2 -> {
            $anonfun$main$11(logger, str2, orCreate, tuple2);
            return BoxedUnit.UNIT;
        });
    }

    public <T extends Oaf> void extractEntities(org.apache.spark.sql.Dataset<Oaf> dataset, String str, Class<T> cls, Logger logger) {
        Encoder kryo = Encoders$.MODULE$.kryo(cls);
        logger.info(new StringBuilder(8).append("Extract ").append(cls.getSimpleName()).toString());
        dataset.filter(oaf -> {
            return BoxesRunTime.boxToBoolean($anonfun$extractEntities$1(oaf));
        }).map(oaf2 -> {
            return oaf2;
        }, kryo).write().mode(SaveMode.Overwrite).save(str);
    }

    public <T extends Result> void makeDatasetUnique(String str, String str2, SparkSession sparkSession, Class<T> cls) {
        Encoder kryo = Encoders$.MODULE$.kryo(cls);
        sparkSession.read().load(str).as(kryo).groupByKey(result -> {
            return result.getId();
        }, sparkSession.implicits().newStringEncoder()).mapGroups((str3, iterator) -> {
            return (Result) MergeUtils.mergeGroup((Iterator) JavaConverters$.MODULE$.asJavaIteratorConverter(iterator).asJava());
        }, kryo).write().mode(SaveMode.Overwrite).save(str2);
    }

    public static final /* synthetic */ boolean $anonfun$main$1(Oaf oaf) {
        return oaf instanceof Publication;
    }

    public static final /* synthetic */ boolean $anonfun$main$3(Oaf oaf) {
        return oaf instanceof Dataset;
    }

    public static final /* synthetic */ boolean $anonfun$main$5(Oaf oaf) {
        return oaf instanceof Software;
    }

    public static final /* synthetic */ boolean $anonfun$main$7(Oaf oaf) {
        return oaf instanceof OtherResearchProduct;
    }

    public static final /* synthetic */ boolean $anonfun$main$9(Oaf oaf) {
        return oaf instanceof Relation;
    }

    public static final /* synthetic */ void $anonfun$main$11(Logger logger, String str, SparkSession sparkSession, Tuple2 tuple2) {
        logger.info(new StringBuilder(12).append("Make ").append(tuple2.mo9986_1()).append(" unique").toString());
        MODULE$.makeDatasetUnique(new StringBuilder(11).append(str).append("/extracted/").append(tuple2.mo9986_1()).toString(), new StringBuilder(12).append(str).append("/preprocess/").append(tuple2.mo9986_1()).toString(), sparkSession, (Class) tuple2.mo9985_2());
    }

    public static final /* synthetic */ boolean $anonfun$extractEntities$1(Oaf oaf) {
        return oaf instanceof Oaf;
    }

    private SparkCreateInputGraph$() {
        MODULE$ = this;
    }
}
