/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.collection.crossref;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.dhp.application.AbstractScalaApplication;
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf$;
import eu.dnetlib.dhp.collection.crossref.Crossref2Oaf$TransformationType$;
import eu.dnetlib.dhp.collection.crossref.SparkMapDumpIntoOAF$;
import eu.dnetlib.dhp.collection.crossref.UnpayWall;
import eu.dnetlib.dhp.common.vocabulary.VocabularyGroup;
import eu.dnetlib.dhp.schema.mdstore.MDStoreVersion;
import eu.dnetlib.dhp.schema.oaf.Oaf;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.StructuredProperty;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.Serializable;
import org.apache.spark.sql.Column;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders$;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.functions$;
import org.apache.spark.sql.types.BooleanType$;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.StringType$;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructField$;
import org.apache.spark.sql.types.StructType;
import org.slf4j.Logger;
import scala.Function1;
import scala.Predef$;
import scala.Tuple2;
import scala.collection.immutable.Seq;
import scala.reflect.ScalaSignature;
import scala.reflect.api.JavaUniverse;
import scala.reflect.api.Mirror;
import scala.reflect.api.TypeCreator;
import scala.reflect.api.TypeTags;
import scala.reflect.api.Types;
import scala.reflect.api.Universe;
import scala.reflect.runtime.package$;
import scala.runtime.ScalaRunTime$;

@ScalaSignature(bytes="\u0006\u0005q4A\u0001D\u0007\u00011!Iq\u0004\u0001B\u0001B\u0003%\u0001%\f\u0005\n]\u0001\u0011\t\u0011)A\u0005_MB\u0001\u0002\u000e\u0001\u0003\u0002\u0003\u0006I!\u000e\u0005\u0006{\u0001!\tA\u0010\u0005\u0006\t\u0002!\t%\u0012\u0005\u0006\u0013\u0002!\tA\u0013\u0005\u0006A\u0002!\t!Y\u0004\u0006e6A\ta\u001d\u0004\u0006\u00195A\t\u0001\u001e\u0005\u0006{%!\t\u0001\u001f\u0005\u0006s&!\tA\u001f\u0002\u0014'B\f'o['ba\u0012+X\u000e]%oi>|\u0015I\u0012\u0006\u0003\u001d=\t\u0001b\u0019:pgN\u0014XM\u001a\u0006\u0003!E\t!bY8mY\u0016\u001cG/[8o\u0015\t\u00112#A\u0002eQBT!\u0001F\u000b\u0002\u000f\u0011tW\r\u001e7jE*\ta#\u0001\u0002fk\u000e\u00011C\u0001\u0001\u001a!\tQR$D\u0001\u001c\u0015\ta\u0012#A\u0006baBd\u0017nY1uS>t\u0017B\u0001\u0010\u001c\u0005a\t%m\u001d;sC\u000e$8kY1mC\u0006\u0003\b\u000f\\5dCRLwN\\\u0001\raJ|\u0007/\u001a:usB\u000bG\u000f\u001b\t\u0003C)r!A\t\u0015\u0011\u0005\r2S\"\u0001\u0013\u000b\u0005\u0015:\u0012A\u0002\u001fs_>$hHC\u0001(\u0003\u0015\u00198-\u00197b\u0013\tIc%\u0001\u0004Qe\u0016$WMZ\u0005\u0003W1\u0012aa\u0015;sS:<'BA\u0015'\u0013\tyR$\u0001\u0003be\u001e\u001c\bc\u0001\u00192A5\ta%\u0003\u00023M\t)\u0011I\u001d:bs&\u0011a&H\u0001\u0004Y><\u0007C\u0001\u001c<\u001b\u00059$B\u0001\u001d:\u0003\u0015\u0019HN\u001a\u001bk\u0015\u0005Q\u0014aA8sO&\u0011Ah\u000e\u0002\u0007\u0019><w-\u001a:\u0002\rqJg.\u001b;?)\u0011y\u0014IQ\"\u0011\u0005\u0001\u0003Q\"A\u0007\t\u000b}!\u0001\u0019\u0001\u0011\t\u000b9\"\u0001\u0019A\u0018\t\u000bQ\"\u0001\u0019A\u001b\u0002\u0007I,h\u000eF\u0001G!\t\u0001t)\u0003\u0002IM\t!QK\\5u\u0003I!(/\u00198tM>\u0014X.\u00168qCf<\u0016\r\u001c7\u0015\t-CFL\u0018\t\u0004\u0019N+V\"A'\u000b\u00059{\u0015aA:rY*\u0011\u0001+U\u0001\u0006gB\f'o\u001b\u0006\u0003%f\na!\u00199bG\",\u0017B\u0001+N\u0005\u001d!\u0015\r^1tKR\u0004\"\u0001\u0011,\n\u0005]k!!C+oa\u0006Lx+\u00197m\u0011\u0015\u0001f\u00011\u0001Z!\ta%,\u0003\u0002\\\u001b\na1\u000b]1sWN+7o]5p]\")QL\u0002a\u0001A\u0005iQO\u001c9bs^\fG\u000e\
u001c)bi\"DQa\u0018\u0004A\u0002\u0001\nAb\u0019:pgN\u0014XM\u001a)bi\"\f\u0011\u0003\u001e:b]N4wN]7De>\u001c8O]3g)\u00191%mY3hQ\")\u0001k\u0002a\u00013\")Am\u0002a\u0001A\u0005Q1o\\;sG\u0016\u0004\u0016\r\u001e5\t\u000b\u0019<\u0001\u0019\u0001\u0011\u0002\u0015Q\f'oZ3u!\u0006$\b\u000eC\u0003^\u000f\u0001\u0007\u0001\u0005C\u0003j\u000f\u0001\u0007!.\u0001\u0007w_\u000e\f'-\u001e7be&,7\u000f\u0005\u0002la6\tAN\u0003\u0002n]\u0006Qao\\2bEVd\u0017M]=\u000b\u0005=\f\u0012AB2p[6|g.\u0003\u0002rY\nyak\\2bEVd\u0017M]=He>,\b/A\nTa\u0006\u00148.T1q\tVl\u0007/\u00138u_>\u000be\t\u0005\u0002A\u0013M\u0011\u0011\"\u001e\t\u0003aYL!a\u001e\u0014\u0003\r\u0005s\u0017PU3g)\u0005\u0019\u0018\u0001B7bS:$\"AR>\t\u000b9Z\u0001\u0019A\u0018")
/*
 * Spark application that maps a Crossref dump into OAF records and writes them as
 * gzip-compressed text (one JSON/string record per line) under a target path.
 *
 * Output is produced in three passes by transformCrossref():
 *   1. relations extracted by Crossref2Oaf (TransformationType OnlyRelation), written with Overwrite;
 *   2. results extracted by Crossref2Oaf (TransformationType OnlyResult), left-joined with
 *      Unpaywall open-access rows and merged via Crossref2Oaf.mergeUnpayWall, appended;
 *   3. strings produced by Crossref2Oaf.generateAffliation for author affiliations whose
 *      id-type is "ROR", appended.
 *
 * NOTE(review): this file is CFR output decompiled from Scala bytecode (see the
 * @ScalaSignature annotation). Some constructs below are not legal Java source (a
 * `public final` local class inside a method, a field write before super(...), checked
 * exceptions from ObjectMapper.readValue that are not declared). Edit the Scala
 * original rather than this file.
 */
public class SparkMapDumpIntoOAF
extends AbstractScalaApplication {
    // Logger handed in through the constructor; used by run() to echo the job parameters.
    private final Logger log;

    /**
     * Static entry point. Scala emits this forwarder for the companion object; it just
     * delegates to SparkMapDumpIntoOAF$.MODULE$.main.
     *
     * @param args command-line arguments, passed through unchanged
     */
    public static void main(String[] args) {
        SparkMapDumpIntoOAF$.MODULE$.main(args);
    }

    /**
     * Reads the job parameters, loads the vocabularies and runs the Crossref transformation.
     *
     * <p>Parameters read from the argument parser: "sourcePath", "unpaywallPath",
     * "isLookupUrl", "mdstoreOutputVersion". The latter is a JSON-serialized MDStoreVersion
     * whose HDFS path is the output base; records are written under {@code <hdfsPath>/store}.
     * After the transformation, reportTotalSize(targetPath, outputBasePath) is invoked.
     */
    public void run() {
        String sourcePath = this.parser().get("sourcePath");
        this.log.info("sourcePath: {}", (Object)sourcePath);
        String unpaywallPath = this.parser().get("unpaywallPath");
        this.log.info("unpaywallPath: {}", (Object)unpaywallPath);
        String isLookupUrl = this.parser().get("isLookupUrl");
        this.log.info("isLookupUrl: {}", (Object)isLookupUrl);
        ISLookUpService isLookupService = ISLookupClientFactory.getLookUpService((String)isLookupUrl);
        VocabularyGroup vocabularies = VocabularyGroup.loadVocsFromIS((ISLookUpService)isLookupService);
        // Scala `require(...)`: fails the job if the vocabularies could not be loaded.
        Predef$.MODULE$.require(vocabularies != null);
        String mdstoreOutputVersion = this.parser().get("mdstoreOutputVersion");
        this.log.info("mdstoreOutputVersion is '" + mdstoreOutputVersion + "'");
        ObjectMapper mapper = new ObjectMapper();
        // NOTE(review): readValue throws checked Jackson/IO exceptions; in the Scala original
        // they propagate unchecked and abort the job on malformed input.
        MDStoreVersion cleanedMdStoreVersion = (MDStoreVersion)mapper.readValue(mdstoreOutputVersion, MDStoreVersion.class);
        String outputBasePath = cleanedMdStoreVersion.getHdfsPath();
        this.log.info("outputBasePath is '" + outputBasePath + "'");
        String targetPath = outputBasePath + "/store";
        this.log.info("targetPath is '" + targetPath + "'");
        this.transformCrossref(this.spark(), sourcePath, targetPath, unpaywallPath, vocabularies);
        this.reportTotalSize(targetPath, outputBasePath);
    }

    /**
     * Loads Unpaywall records and keeps only those whose DOI occurs in the Crossref dump.
     *
     * <p>Unpaywall rows are read with an explicit schema (doi, is_oa, best_oa_location{host_type,
     * license, url}, oa_status), their "doi" lowercased, and filtered to
     * {@code is_oa = true and best_oa_location.url is not null}. A left-semi join against the
     * lowercased Crossref "DOI" column then keeps only matching Unpaywall rows (no Crossref
     * columns are added).
     *
     * @param spark         active Spark session
     * @param unpaywallPath JSON input path of the Unpaywall data
     * @param crossrefPath  JSON input path of the Crossref dump (only its "DOI" field is read)
     * @return the filtered rows as a cached Dataset of UnpayWall
     */
    public Dataset<UnpayWall> transformUnpayWall(SparkSession spark, String unpaywallPath, String crossrefPath) {
        // apply$default$3 / apply$default$4 are the Scala default values of StructField's
        // 3rd and 4th parameters (nullable / metadata in Spark's API).
        StructType schema = new StructType().add(new StructField("doi", (DataType)StringType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add(new StructField("is_oa", (DataType)BooleanType$.MODULE$, StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add(new StructField("best_oa_location", (DataType)new StructType().add("host_type", (DataType)StringType$.MODULE$).add("license", (DataType)StringType$.MODULE$).add("url", (DataType)StringType$.MODULE$), StructField$.MODULE$.apply$default$3(), StructField$.MODULE$.apply$default$4())).add("oa_status", (DataType)StringType$.MODULE$);
        // NOTE(review): presumably, with Spark's default case-insensitive column resolution,
        // withColumn("doi", ...) replaces the existing "DOI" column instead of adding a second one.
        Dataset cId = spark.read().schema(new StructType().add("DOI", (DataType)StringType$.MODULE$)).json(crossrefPath).withColumn("doi", functions$.MODULE$.lower(functions$.MODULE$.col("DOI")));
        Dataset uw = spark.read().schema(schema).json(unpaywallPath).withColumn("doi", functions$.MODULE$.lower(functions$.MODULE$.col("doi"))).where("is_oa = true and best_oa_location.url is not null");
        // Scala runtime-reflection universe and mirror used to build the TypeTag for UnpayWall,
        // which the product encoder below needs (compiler-generated, not hand-written).
        JavaUniverse $u = package$.MODULE$.universe();
        JavaUniverse.JavaMirror $m = package$.MODULE$.universe().runtimeMirror(SparkMapDumpIntoOAF.class.getClassLoader());
        // Compiler-synthesized TypeCreator resolving the static class
        // eu.dnetlib.dhp.collection.crossref.UnpayWall for the TypeTag.
        public final class Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator5$1
        extends TypeCreator {
            public <U extends Universe> Types.TypeApi apply(Mirror<U> $m$untyped) {
                Universe $u = $m$untyped.universe();
                Mirror<U> $m = $m$untyped;
                return $m.staticClass("eu.dnetlib.dhp.collection.crossref.UnpayWall").asType().toTypeConstructor();
            }

            public Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator5$1(SparkMapDumpIntoOAF $outer) {
            }
        }
        // Left-semi join: keep Unpaywall rows with a matching Crossref DOI, then cache.
        // NOTE(review): no unpersist() is visible in this file for this cached Dataset.
        return uw.join(cId, uw.apply("doi").$eq$eq$eq((Object)cId.apply("doi")), "leftsemi").as(spark.implicits().newProductEncoder(((TypeTags)$u).TypeTag().apply((Mirror)$m, (TypeCreator)new Eu_dnetlib_dhp_collection_crossref_SparkMapDumpIntoOAF$$typecreator5$1(null)))).cache();
    }

    /**
     * Converts the Crossref dump and writes relations, Unpaywall-enriched results and ROR
     * affiliation records as gzip text under {@code targetPath}.
     *
     * @param spark         active Spark session
     * @param sourcePath    Crossref dump path; read both as text lines and as JSON
     * @param targetPath    output directory; overwritten by the first pass, appended by the others
     * @param unpaywallPath Unpaywall JSON path, forwarded to transformUnpayWall
     * @param vocabularies  vocabularies passed to Crossref2Oaf.convert
     */
    public void transformCrossref(SparkSession spark, String sourcePath, String targetPath, String unpaywallPath, VocabularyGroup vocabularies) {
        // ObjectMapper is captured by the Spark lambdas below to serialize records to JSON.
        ObjectMapper mapper = new ObjectMapper();
        Encoder oafEncoder = Encoders$.MODULE$.kryo(Oaf.class);
        Encoder resultEncoder = Encoders$.MODULE$.kryo(Result.class);
        // One record per text line; presumably each line is one Crossref JSON document (assumption).
        // NOTE(review): `dump` is not cached, so it is recomputed for each of the two passes that use it.
        Dataset dump = spark.read().text(sourcePath).as(spark.implicits().newStringEncoder());
        // Pass 1: relations only -> JSON -> gzip text, overwriting targetPath.
        dump.flatMap((Function1 & Serializable)s -> Crossref2Oaf$.MODULE$.convert((String)s, vocabularies, Crossref2Oaf$TransformationType$.MODULE$.OnlyRelation()), oafEncoder).as(oafEncoder).map((Function1 & Serializable)r -> mapper.writeValueAsString(r), spark.implicits().newStringEncoder()).write().mode(SaveMode.Overwrite).option("compression", "gzip").text(targetPath);
        Dataset<UnpayWall> uw = this.transformUnpayWall(spark, unpaywallPath, sourcePath);
        // Pass 2a: results only, keyed by the value of their first pid.
        // NOTE(review): getPid().get(0) throws if a Result has a null/empty pid list.
        // NOTE(review): the key is not lowercased here, while uw.doi is lowercased in
        // transformUnpayWall; matches are only found if this pid value is already lowercase — verify.
        Dataset resultCrossref = dump.flatMap((Function1 & Serializable)s -> Crossref2Oaf$.MODULE$.convert((String)s, vocabularies, Crossref2Oaf$TransformationType$.MODULE$.OnlyResult()), oafEncoder).as(oafEncoder).map((Function1 & Serializable)r -> (Result)r, resultEncoder).map((Function1 & Serializable)r -> new Tuple2((Object)((StructuredProperty)r.getPid().get(0)).getValue(), r), Encoders$.MODULE$.tuple(Encoders$.MODULE$.STRING(), resultEncoder));
        // Pass 2b: left join with Unpaywall, merge, serialize to JSON, append.
        // NOTE(review): with a left join, k._2() is null for results without an Unpaywall match;
        // mergeUnpayWall is presumably null-tolerant — confirm.
        // NOTE(review): .as(resultEncoder) after mapping to String looks redundant; the write
        // still emits the serialized string column.
        resultCrossref.joinWith(uw, resultCrossref.apply("_1").equalTo((Object)uw.apply("doi")), "left").map((Function1 & Serializable)k -> Crossref2Oaf$.MODULE$.mergeUnpayWall((Result)((Tuple2)k._1())._2(), (UnpayWall)k._2()), resultEncoder).map((Function1 & Serializable)r -> mapper.writeValueAsString(r), spark.implicits().newStringEncoder()).as(resultEncoder).write().mode(SaveMode.Append).option("compression", "gzip").text(String.valueOf(targetPath));
        // Pass 3: re-read the dump as JSON, explode author.affiliation -> affiliations.id -> aids,
        // project DOI / aff.id / aff.id-type, keep idType LIKE "ROR" (no wildcard, so an exact,
        // case-sensitive match), and append the strings from generateAffliation.
        // NOTE(review): json(sourcePath) without a schema makes Spark infer it from the full dump.
        spark.read().json(sourcePath).select((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("author.affiliation")).alias("affiliations")})).select((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("affiliations.id")).alias("aids")})).where("aids is not null").select((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.explode(functions$.MODULE$.col("aids")).alias("aff")})).select((Seq)ScalaRunTime$.MODULE$.wrapRefArray((Object[])new Column[]{functions$.MODULE$.col("DOI"), functions$.MODULE$.col("aff.id").alias("id"), functions$.MODULE$.col("aff.id-type").alias("idType")})).where(functions$.MODULE$.col("idType").like("ROR")).flatMap((Function1 & Serializable)r -> Crossref2Oaf$.MODULE$.generateAffliation((Row)r), spark.implicits().newStringEncoder()).write().mode(SaveMode.Append).option("compression", "gzip").text(String.valueOf(targetPath));
    }

    /**
     * Creates the application.
     *
     * @param propertyPath forwarded to AbstractScalaApplication
     * @param args         forwarded to AbstractScalaApplication
     * @param log          logger stored in {@code this.log} and also forwarded to the superclass
     */
    public SparkMapDumpIntoOAF(String propertyPath, String[] args, Logger log) {
        // Decompiler artifact: Scala stores the constructor-parameter field before the
        // super-constructor call; this ordering is not legal Java source.
        this.log = log;
        super(propertyPath, args, log);
    }
}

