/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.broker.oa;

import com.google.gson.Gson;
import eu.dnetlib.broker.objects.OaBrokerEventPayload;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.broker.model.Event;
import eu.dnetlib.dhp.broker.model.ShortEventMessageWithGroupId;
import eu.dnetlib.dhp.broker.oa.util.ClusterUtils;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import java.io.IOException;
import java.io.InputStream;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PartitionEventsByDsIdJob {
    private static final Logger log = LoggerFactory.getLogger(PartitionEventsByDsIdJob.class);
    private static final String OPENDOAR_NSPREFIX = "opendoar____::";

    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(IOUtils.toString((InputStream)PartitionEventsByDsIdJob.class.getResourceAsStream("/eu/dnetlib/dhp/broker/oa/od_partitions_params.json")));
        parser.parseArgument(args);
        Boolean isSparkSessionManaged = Optional.ofNullable(parser.get("isSparkSessionManaged")).map(Boolean::valueOf).orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", (Object)isSparkSessionManaged);
        SparkConf conf = new SparkConf();
        String eventsPath = parser.get("outputDir") + "/events";
        log.info("eventsPath: {}", (Object)eventsPath);
        String partitionPath = parser.get("outputDir") + "/eventsByOpendoarId";
        log.info("partitionPath: {}", (Object)partitionPath);
        String opendoarIds = parser.get("opendoarIds");
        log.info("opendoarIds: {}", (Object)opendoarIds);
        HashSet validOpendoarIds = new HashSet();
        if (!opendoarIds.trim().equals("-")) {
            validOpendoarIds.addAll(Arrays.stream(opendoarIds.split(",")).map(String::trim).filter(StringUtils::isNotBlank).map(s -> OPENDOAR_NSPREFIX + DigestUtils.md5Hex((String)s)).collect(Collectors.toSet()));
        }
        log.info("validOpendoarIds: {}", validOpendoarIds);
        SparkSessionSupport.runWithSparkSession((SparkConf)conf, (Boolean)isSparkSessionManaged, spark -> ClusterUtils.readPath(spark, eventsPath, Event.class).filter((FilterFunction & Serializable)e -> StringUtils.isNotBlank((CharSequence)e.getMap().getTargetDatasourceId())).filter((FilterFunction & Serializable)e -> e.getMap().getTargetDatasourceId().startsWith(OPENDOAR_NSPREFIX)).filter((FilterFunction & Serializable)e -> validOpendoarIds.contains(e.getMap().getTargetDatasourceId())).map((MapFunction & Serializable)e -> PartitionEventsByDsIdJob.messageFromNotification(e), Encoders.bean(ShortEventMessageWithGroupId.class)).coalesce(1).write().partitionBy(new String[]{"group"}).mode(SaveMode.Overwrite).option("compression", "gzip").json(partitionPath));
        PartitionEventsByDsIdJob.renameSubDirs(partitionPath);
    }

    private static void renameSubDirs(String path) throws IOException {
        FileSystem fs = FileSystem.get((Configuration)new Configuration());
        log.info("** Renaming subdirs of {}", (Object)path);
        for (FileStatus fileStatus : fs.listStatus(new Path(path))) {
            Path oldPath;
            String oldName;
            if (!fileStatus.isDirectory() || !(oldName = (oldPath = fileStatus.getPath()).getName()).contains("=")) continue;
            Path newPath = new Path(path + "/" + StringUtils.substringAfter((String)oldName, (String)"="));
            log.info(" * {} -> {}", (Object)oldPath.getName(), (Object)newPath.getName());
            fs.rename(oldPath, newPath);
        }
    }

    private static ShortEventMessageWithGroupId messageFromNotification(Event e) {
        Gson gson = new Gson();
        OaBrokerEventPayload payload = (OaBrokerEventPayload)gson.fromJson(e.getPayload(), OaBrokerEventPayload.class);
        ShortEventMessageWithGroupId res = new ShortEventMessageWithGroupId();
        res.setEventId(e.getEventId());
        res.setOriginalId(payload.getResult().getOriginalId());
        res.setTitle(payload.getResult().getTitles().stream().filter(StringUtils::isNotBlank).findFirst().orElse(null));
        res.setTopic(e.getTopic());
        res.setTrust(payload.getTrust());
        res.generateMessageFromObject(payload.getHighlight());
        res.setGroup(StringUtils.substringAfter((String)e.getMap().getTargetDatasourceId(), (String)OPENDOAR_NSPREFIX));
        return res;
    }
}

