package eu.dnetlib.dhp.bulktag;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.gson.Gson;
import eu.dnetlib.dhp.PropagationConstant;
import eu.dnetlib.dhp.api.Utils;
import eu.dnetlib.dhp.api.model.CommunityEntityMap;
import eu.dnetlib.dhp.api.model.EntityCommunities;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.bulktag.community.Community;
import eu.dnetlib.dhp.bulktag.community.CommunityConfiguration;
import eu.dnetlib.dhp.bulktag.community.CommunityConfigurationFactory;
import eu.dnetlib.dhp.bulktag.community.Pair;
import eu.dnetlib.dhp.bulktag.community.ProtoMap;
import eu.dnetlib.dhp.bulktag.community.ResultTagger;
import eu.dnetlib.dhp.bulktag.community.SelectionConstraints;
import eu.dnetlib.dhp.bulktag.community.TaggingConstants;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.common.action.ReadDatasourceMasterDuplicateFromDB;
import eu.dnetlib.dhp.common.action.model.MasterDuplicate;
import eu.dnetlib.dhp.schema.common.ModelConstants;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Context;
import eu.dnetlib.dhp.schema.oaf.Datasource;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Organization;
import eu.dnetlib.dhp.schema.oaf.Project;
import eu.dnetlib.dhp.schema.oaf.Relation;
import eu.dnetlib.dhp.schema.oaf.Result;
import eu.dnetlib.dhp.schema.oaf.utils.OafMapperUtils;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.eclipse.persistence.internal.oxm.schema.model.Occurs;
import org.eclipse.persistence.sdo.SDOConstants;
import org.postgresql.jdbc.EscapedFunctions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/bulktag/SparkBulkTagJob.class */
public class SparkBulkTagJob {
    /** Compatibility class id accepted (ignoring case) by {@link #isOKDatasource(Datasource)}. */
    private static final String OPENAIRE_3 = "openaire3.0";

    /** Compatibility class id accepted (ignoring case) by {@link #isOKDatasource(Datasource)}. */
    private static final String OPENAIRE_4 = "openaire-pub_4.0";

    /** Compatibility class id accepted (ignoring case) by {@link #isOKDatasource(Datasource)}. */
    private static final String OPENAIRE_CRIS = "openaire-cris_1.1";

    /** Compatibility class id accepted (ignoring case) by {@link #isOKDatasource(Datasource)}. */
    private static final String OPENAIRE_DATA = "openaire2.0_data";

    /**
     * Key that at least one {@code collectedfrom} entry of a datasource must carry for
     * {@link #isOKDatasource(Datasource)} to accept it.
     */
    private static final String EOSC = "10|openaire____::2e06c1122c7df43765fdcf91080824fa";

    private static final Logger log = LoggerFactory.getLogger(SparkBulkTagJob.class);

    /** Jackson mapper used by {@link #readPath(SparkSession, String, Class)} to parse one JSON document per line. */
    public static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /**
     * Entry point of the bulk tagging job.
     *
     * <p>Reads the job parameters, loads the proto map from the file system identified by
     * {@code nameNode}, obtains the community configuration (from the {@code taggingConf}
     * argument when present, otherwise from the service at {@code baseURL}, in which case it is
     * also written under {@code configurationPath}), and then, inside a Spark session:
     * <ol>
     *   <li>extends the configuration with the EOSC datasources found under {@code sourcePath},</li>
     *   <li>dumps the datasource master/duplicate pairs to {@code outputPath + "masterDuplicate"},</li>
     *   <li>tags results, organizations, projects and datasources.</li>
     * </ol>
     *
     * @param strArr command line arguments, parsed against {@code input_bulkTag_parameters.json}
     * @throws Exception if parameter parsing, file system access or any Spark step fails
     */
    public static void main(String[] strArr) throws Exception {
        final String parametersJson;
        try (InputStream parametersStream = SparkBulkTagJob.class
                .getResourceAsStream("/eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json")) {
            // Fail with an explicit message instead of an anonymous NPE deep inside IOUtils.
            parametersJson = IOUtils.toString(
                    Objects.requireNonNull(
                            parametersStream,
                            "classpath resource /eu/dnetlib/dhp/wf/subworkflows/bulktag/input_bulkTag_parameters.json not found"),
                    StandardCharsets.UTF_8);
        }

        // Only the argument count is logged: String[].toString() prints an identity hash, and the
        // argument values include the database password.
        log.info("received {} command line arguments", strArr.length);

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(parametersJson);
        parser.parseArgument(strArr);

        final Boolean isSparkSessionManaged = Optional
                .ofNullable(parser.get("isSparkSessionManaged"))
                .map(Boolean::valueOf)
                .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputPath = parser.get("sourcePath");
        log.info("inputPath: {}", inputPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final String baseUrl = parser.get("baseURL");
        log.info("baseURL: {}", baseUrl);

        final String pathMap = parser.get("pathMap");
        log.info("pathMap: {}", pathMap);

        final String nameNode = parser.get("nameNode");
        log.info("nameNode: {}", nameNode);

        final Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", nameNode);

        // The stream is closed as soon as the JSON has been read; the FileSystem itself is left
        // open on purpose (this method does not own it).
        final String protoMapJson;
        try (InputStream protoMapStream = FileSystem.get(hadoopConf).open(new Path(pathMap))) {
            protoMapJson = IOUtils.toString(protoMapStream, StandardCharsets.UTF_8);
        }
        log.info("protoMap: {}", protoMapJson);

        final Gson gson = new Gson();
        final ProtoMap protoMap = gson.fromJson(protoMapJson, ProtoMap.class);
        log.info("pathMap: {}", gson.toJson(protoMap));

        final String dbUrl = parser.get("dbUrl");
        log.info("dbUrl: {}", dbUrl);

        final String dbUser = parser.get("dbUser");
        log.info("dbUser: {}", dbUser);

        final String dbPassword = parser.get("dbPassword");
        // Never write the secret itself to the log.
        log.info("dbPassword: {}", dbPassword == null ? "<not set>" : "<hidden>");

        final String hdfsPath = outputPath + "masterDuplicate";
        log.info("hdfsPath: {}", hdfsPath);

        final String configurationPath = parser.get("configurationPath");
        final SparkConf sparkConf = new SparkConf();

        final String taggingConf = parser.get("taggingConf");
        final CommunityConfiguration communityConfiguration;
        if (taggingConf != null) {
            communityConfiguration = CommunityConfigurationFactory.newInstance(taggingConf);
        } else {
            communityConfiguration = Utils.getCommunityConfiguration(baseUrl);
            writeCommunityConfiguration(configurationPath, nameNode, communityConfiguration);
        }

        SparkSessionSupport.runWithSparkSession(sparkConf, isSparkSessionManaged, sparkSession -> {
            extendCommunityConfigurationForEOSC(sparkSession, inputPath, communityConfiguration);
            ReadDatasourceMasterDuplicateFromDB.execute(dbUrl, dbUser, dbPassword, hdfsPath, nameNode);
            execBulkTag(sparkSession, inputPath, outputPath, protoMap, communityConfiguration);
            execEntityTag(
                    sparkSession,
                    inputPath + "organization",
                    outputPath + "organization",
                    mapWithRepresentativeOrganization(
                            sparkSession, inputPath + "relation", Utils.getOrganizationCommunityMap(baseUrl)),
                    Organization.class,
                    TaggingConstants.CLASS_ID_ORGANIZATION,
                    TaggingConstants.CLASS_NAME_BULKTAG_ORGANIZATION);
            execEntityTag(
                    sparkSession,
                    inputPath + "project",
                    outputPath + "project",
                    Utils.getProjectCommunityMap(baseUrl),
                    Project.class,
                    TaggingConstants.CLASS_ID_PROJECT,
                    TaggingConstants.CLASS_NAME_BULKTAG_PROJECT);
            execEntityTag(
                    sparkSession,
                    inputPath + "datasource",
                    outputPath + "datasource",
                    mapWithMasterDatasource(sparkSession, hdfsPath, Utils.getDatasourceCommunities(baseUrl)),
                    Datasource.class,
                    TaggingConstants.CLASS_ID_DATASOURCE,
                    TaggingConstants.CLASS_NAME_BULKTAG_DATASOURCE);
        });
    }

    /**
     * Writes every community of the given configuration as one JSON document per line.
     *
     * <p>The target file is {@code str} immediately followed by today's date formatted as
     * {@code yyyy-MM-dd} (no separator is inserted), on the file system identified by
     * {@code str2}.
     *
     * @param str path prefix of the file to create
     * @param str2 value used for {@code fs.defaultFS}
     * @param communityConfiguration configuration whose communities are serialized
     * @throws IOException if the file cannot be created or written
     */
    private static void writeCommunityConfiguration(String str, String str2, CommunityConfiguration communityConfiguration) throws IOException {
        Configuration hadoopConf = new Configuration();
        hadoopConf.set("fs.defaultFS", str2);
        Path target = new Path(str + LocalDate.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd")));

        // One mapper for the whole file: building an ObjectMapper per community is needlessly expensive.
        ObjectMapper mapper = new ObjectMapper();

        try (BufferedWriter writer = new BufferedWriter(
                new OutputStreamWriter(FileSystem.get(hadoopConf).create(target), StandardCharsets.UTF_8))) {
            for (Community community : communityConfiguration.getCommunities().values()) {
                writer.write(mapper.writeValueAsString(community));
                writer.write("\n");
            }
        }
    }

    /**
     * Re-keys the datasource community map from duplicate ids to their master ids.
     *
     * <p>Loads the master/duplicate pairs stored as JSON at {@code str}, keeps only the pairs
     * whose {@code duplicateId} is a key of the map, and hands them to
     * {@link #remapCommunityEntityMap(CommunityEntityMap, List)} as {@code source}
     * (master) / {@code target} (duplicate) rows.
     *
     * @param sparkSession active Spark session
     * @param str location of the master/duplicate JSON dump
     * @param communityEntityMap map to re-key; it is modified and returned
     * @return the same map instance after re-keying
     */
    private static CommunityEntityMap mapWithMasterDatasource(SparkSession sparkSession, String str, CommunityEntityMap communityEntityMap) {
        Dataset<MasterDuplicate> masterDuplicates = sparkSession
                .read()
                .schema(Encoders.bean(MasterDuplicate.class).schema())
                .json(str)
                .as(Encoders.bean(MasterDuplicate.class));

        Dataset<String> mappedIds = sparkSession.createDataset(entityIdList(communityEntityMap), Encoders.STRING());

        List<Row> masterToDuplicate = masterDuplicates
                .join(mappedIds, mappedIds.col("value").equalTo(masterDuplicates.col("duplicateId")), "left_semi")
                .selectExpr("masterId as source", "duplicateId as target")
                .collectAsList();

        return remapCommunityEntityMap(communityEntityMap, masterToDuplicate);
    }

    /**
     * Returns a fresh list holding the keys (entity ids) of the given map.
     *
     * @param communityEntityMap map whose keys are copied
     * @return a new mutable list of the map's keys
     */
    private static List<String> entityIdList(CommunityEntityMap communityEntityMap) {
        List<String> ids = new ArrayList<>();
        ids.addAll(communityEntityMap.keySet());
        return ids;
    }

    /**
     * Re-keys the organization community map from merged organization ids to the id of the
     * organization that merges them.
     *
     * <p>Reads the relations stored as JSON at {@code str}, keeps the {@code merges} relations
     * that are not deleted by inference and whose {@code target} is a key of the map, and hands
     * their {@code source}/{@code target} columns to
     * {@link #remapCommunityEntityMap(CommunityEntityMap, List)}.
     *
     * @param sparkSession active Spark session
     * @param str location of the relation JSON files
     * @param communityEntityMap map to re-key; it is modified and returned
     * @return the same map instance after re-keying
     */
    private static CommunityEntityMap mapWithRepresentativeOrganization(SparkSession sparkSession, String str, CommunityEntityMap communityEntityMap) {
        // Column names are spelled out here rather than borrowed from an unrelated library's
        // constants, so this method depends only on the relation schema.
        Dataset<Row> mergeRels = sparkSession
                .read()
                .schema(Encoders.bean(Relation.class).schema())
                .json(str)
                .filter("datainfo.deletedbyinference != true and relClass = 'merges'")
                .select("source", "target");

        Dataset<String> mappedIds = sparkSession.createDataset(entityIdList(communityEntityMap), Encoders.STRING());

        List<Row> representativeToMerged = mergeRels
                .join(mappedIds, mappedIds.col("value").equalTo(mergeRels.col("target")), "left_semi")
                .select("source", "target")
                .collectAsList();

        return remapCommunityEntityMap(communityEntityMap, representativeToMerged);
    }

    /**
     * Moves map entries from their {@code target} id to the corresponding {@code source} id.
     *
     * <p>For every row whose {@code target} is currently a key of the map, the entry is removed
     * and its value is stored again under the row's {@code source}.
     *
     * <p>NOTE(review): when several rows share the same {@code source}, or {@code source} is
     * already a key, the value stored last replaces the earlier one instead of being merged with
     * it. Confirm whether the community lists should be combined.
     *
     * @param communityEntityMap map to re-key in place
     * @param list rows exposing the string fields {@code source} and {@code target}
     * @return the same map instance
     */
    private static CommunityEntityMap remapCommunityEntityMap(CommunityEntityMap communityEntityMap, List<Row> list) {
        for (Row row : list) {
            String oldId = row.getAs("target");
            // Field name written literally: it is the alias produced by the callers' selects.
            String newId = row.getAs("source");
            if (communityEntityMap.containsKey(oldId)) {
                communityEntityMap.put(newId, communityEntityMap.remove(oldId));
            }
        }
        return communityEntityMap;
    }

    /**
     * Adds community contexts to the entities stored at {@code str}.
     *
     * <p>The entities are left-joined (on {@code id} = {@code entityId}) with the
     * entity-to-communities pairs built from {@code communityEntityMap}. For each matched entity,
     * every community id that is not already among the entity's context ids is appended as a new
     * {@link Context} carrying bulk-tagging provenance built from {@code str3}/{@code str4}.
     * Unmatched entities pass through unchanged.
     *
     * <p>The result is written gzip-compressed to {@code str2} and then copied back over
     * {@code str}, so the input path is overwritten with the tagged entities.
     *
     * @param sparkSession active Spark session
     * @param str input path of the entities (overwritten at the end)
     * @param str2 working/output path of the tagged entities
     * @param communityEntityMap entity id to community ids
     * @param cls bean class of the entities
     * @param str3 class id of the provenance qualifier
     * @param str4 class name of the provenance qualifier
     * @param <E> entity type
     */
    private static <E extends OafEntity> void execEntityTag(SparkSession sparkSession, String str, String str2, CommunityEntityMap communityEntityMap, Class<E> cls, String str3, String str4) {
        Dataset<E> entities = readPath(sparkSession, str, cls);

        List<EntityCommunities> pairs = communityEntityMap
                .keySet()
                .stream()
                .map(entityId -> EntityCommunities.newInstance(entityId, communityEntityMap.get(entityId)))
                .collect(Collectors.toList());
        Dataset<EntityCommunities> communities = sparkSession.createDataset(pairs, Encoders.bean(EntityCommunities.class));

        entities
                .joinWith(communities, entities.col("id").equalTo(communities.col("entityId")), "left")
                // Explicit MapFunction cast: an untyped lambda is ambiguous between the Scala
                // and Java overloads of Dataset.map.
                .map((MapFunction<scala.Tuple2<E, EntityCommunities>, E>) pair -> {
                    E entity = pair._1();
                    EntityCommunities matched = pair._2();
                    // Left join: the right side is null for entities without communities.
                    if (matched != null) {
                        if (entity.getContext() == null) {
                            entity.setContext(new ArrayList<>());
                        }
                        // Snapshot of the ids present before tagging, used to skip existing contexts.
                        List<String> existingIds = entity
                                .getContext()
                                .stream()
                                .map(Context::getId)
                                .collect(Collectors.toList());
                        matched.getCommunitiesId().forEach(communityId -> {
                            if (existingIds.contains(communityId)) {
                                return;
                            }
                            Context context = new Context();
                            context.setId(communityId);
                            context.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(
                                    false,
                                    TaggingConstants.BULKTAG_DATA_INFO_TYPE,
                                    true,
                                    false,
                                    OafMapperUtils.qualifier(
                                            str3, str4, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS),
                                    "1")));
                            entity.getContext().add(context);
                        });
                    }
                    return entity;
                }, Encoders.bean(cls))
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(str2);

        // Copy the tagged entities back over the input so later steps read the enriched data.
        readPath(sparkSession, str2, cls)
                .write()
                .mode(SaveMode.Overwrite)
                .option("compression", "gzip")
                .json(str);
    }

    /**
     * Registers every datasource accepted by {@link #isOKDatasource(Datasource)} in the EOSC
     * datasource map of the configuration.
     *
     * <p>Datasources are read from {@code str + "datasource"}; ids that are not yet keys of the
     * map are added with an empty constraint list, existing entries are left untouched.
     *
     * @param sparkSession active Spark session
     * @param str base input path
     * @param communityConfiguration configuration whose EOSC datasource map is extended in place
     */
    private static void extendCommunityConfigurationForEOSC(SparkSession sparkSession, String str, CommunityConfiguration communityConfiguration) {
        Dataset<String> acceptedIds = readPath(sparkSession, str + "datasource", Datasource.class)
                .filter((org.apache.spark.api.java.function.FilterFunction<Datasource>) SparkBulkTagJob::isOKDatasource)
                .map((MapFunction<Datasource, String>) Datasource::getId, Encoders.STRING());

        Map<String, List<Pair<String, SelectionConstraints>>> eoscDatasourceMap = communityConfiguration.getEoscDatasourceMap();

        for (String datasourceId : acceptedIds.collectAsList()) {
            if (eoscDatasourceMap.containsKey(datasourceId)) {
                continue;
            }
            eoscDatasourceMap.put(datasourceId, new ArrayList<>());
        }
    }

    /**
     * Tells whether a datasource qualifies for the EOSC datasource map.
     *
     * <p>A datasource qualifies when its compatibility class id equals (ignoring case) one of
     * {@code OPENAIRE_3}, {@code OPENAIRE_4}, {@code OPENAIRE_CRIS} or {@code OPENAIRE_DATA} and
     * at least one of its {@code collectedfrom} entries has the {@code EOSC} key.
     *
     * <p>Missing data (null datasource, compatibility, class id, {@code collectedfrom} list,
     * entry or key) makes the datasource not qualify instead of raising a
     * {@link NullPointerException}, which would otherwise abort the Spark task evaluating this
     * predicate.
     *
     * @param datasource datasource to check, may be null
     * @return {@code true} when both conditions hold
     */
    public static boolean isOKDatasource(Datasource datasource) {
        if (datasource == null
                || datasource.getOpenairecompatibility() == null
                || datasource.getCollectedfrom() == null) {
            return false;
        }
        String classid = datasource.getOpenairecompatibility().getClassid();
        if (classid == null) {
            return false;
        }
        boolean compatible = classid.equalsIgnoreCase(OPENAIRE_3)
                || classid.equalsIgnoreCase(OPENAIRE_4)
                || classid.equalsIgnoreCase(OPENAIRE_CRIS)
                || classid.equalsIgnoreCase(OPENAIRE_DATA);
        if (!compatible) {
            return false;
        }
        // Constant on the left so an entry without a key simply does not match.
        return datasource
                .getCollectedfrom()
                .stream()
                .anyMatch(keyValue -> keyValue != null && EOSC.equals(keyValue.getKey()));
    }

    /**
     * Tags every result type known to {@code ModelSupport.entityTypes}.
     *
     * <p>For each result entity type (processed through a parallel stream) the output directory
     * {@code str2 + type} is removed, the records under {@code str + type} are patched with
     * {@link #patchResult()}, enriched by a {@link ResultTagger} and written gzip-compressed to
     * the output directory; the output is then copied back over the input directory.
     *
     * @param sparkSession active Spark session
     * @param str base input path (its per-type directories are overwritten)
     * @param str2 base output path
     * @param protoMap proto map handed to the tagger
     * @param communityConfiguration community configuration handed to the tagger
     * @param <R> result type
     */
    private static <R extends Result> void execBulkTag(SparkSession sparkSession, String str, String str2, ProtoMap protoMap, CommunityConfiguration communityConfiguration) {
        ModelSupport.entityTypes
                .keySet()
                .parallelStream()
                .filter(ModelSupport::isResult)
                .forEach(type -> {
                    String inputDir = str + type.name();
                    String outputDir = str2 + type.name();

                    PropagationConstant.removeOutputDir(sparkSession, outputDir);
                    ResultTagger tagger = new ResultTagger();
                    @SuppressWarnings("unchecked") // entity types filtered by isResult map to Result subclasses
                    Class<R> resultClass = (Class<R>) ModelSupport.entityTypes.get(type);

                    Dataset<R> patched = readPath(sparkSession, inputDir, resultClass)
                            .map(patchResult(), Encoders.bean(resultClass));

                    patched
                            .filter((org.apache.spark.api.java.function.FilterFunction<R>) Objects::nonNull)
                            .map(
                                    (MapFunction<R, R>) record -> tagger.enrichContextCriteria(record, communityConfiguration, protoMap),
                                    Encoders.bean(resultClass))
                            .write()
                            .mode(SaveMode.Overwrite)
                            .option("compression", "gzip")
                            .json(outputDir);

                    // Copy the tagged records back over the input directory.
                    readPath(sparkSession, outputDir, resultClass)
                            .write()
                            .mode(SaveMode.Overwrite)
                            .option("compression", "gzip")
                            .json(inputDir);
                });
    }

    /**
     * Reads a text dataset and parses each line as a JSON document of the given bean class.
     *
     * @param sparkSession active Spark session
     * @param str path of the text files, one JSON document per line
     * @param cls bean class each line is deserialized into with {@link #OBJECT_MAPPER}
     * @param <R> bean type
     * @return dataset of the parsed beans
     */
    public static <R> Dataset<R> readPath(SparkSession sparkSession, String str, Class<R> cls) {
        Dataset<String> jsonLines = sparkSession.read().textFile(str);
        MapFunction<String, R> parseLine = line -> OBJECT_MAPPER.readValue(line, cls);
        return jsonLines.map(parseLine, Encoders.bean(cls));
    }

    /**
     * Builds a mapper that fills in the fields the tagging step reads on a result.
     *
     * <p>The mapper sets a default data info when none is present, sets
     * {@code deletedbyinference} to {@code false} when the existing data info leaves it null,
     * and replaces a null context list with an empty one. The record is modified in place and
     * returned.
     *
     * @param <R> result type
     * @return the patching function
     */
    private static <R extends Result> MapFunction<R, R> patchResult() {
        return record -> {
            if (record.getDataInfo() == null) {
                record.setDataInfo(
                        OafMapperUtils.dataInfo(false, "", false, false, OafMapperUtils.unknown("", ""), ""));
            } else if (record.getDataInfo().getDeletedbyinference() == null) {
                record.getDataInfo().setDeletedbyinference(false);
            }

            if (record.getContext() == null) {
                record.setContext(new ArrayList<>());
            }
            return record;
        };
    }

    /**
     * Lambda deserialization hook for the serializable lambdas of this class.
     *
     * <p>The first switch maps the serialized lambda's implementation method name (hash code,
     * then {@code equals}) to a selector. The second switch checks that the implementation kind,
     * functional interface, interface method and signature, implementing class and
     * implementation signature all match the expected values for that selector, and then
     * returns an equivalent lambda rebuilt from the captured arguments. Anything that does not
     * match ends in an {@link IllegalArgumentException}.
     *
     * <p>NOTE(review): the method is marked synthetic and repeats the bodies of the lambdas
     * declared in the other methods, so it is presumably compiler-generated code reproduced by
     * the decompiler. As written ({@code boolean z = -1}, repeated {@code case true:} labels,
     * {@code tuple2.mo9966_1()}) it does not look like valid Java source — confirm whether it
     * should be kept in the source tree at all.
     *
     * @param serializedLambda serialized form of the lambda to rebuild
     * @return the rebuilt lambda
     * @throws IllegalArgumentException if the serialized lambda matches none of the known lambdas
     */
    private static /* synthetic */ Object $deserializeLambda$(SerializedLambda serializedLambda) {
        String implMethodName = serializedLambda.getImplMethodName();
        boolean z = -1;
        // Step 1: resolve the implementation method name to a selector.
        switch (implMethodName.hashCode()) {
            case -827641737:
                if (implMethodName.equals("lambda$execEntityTag$54c42dc0$1")) {
                    z = 3;
                    break;
                }
                break;
            case -614738631:
                if (implMethodName.equals("lambda$readPath$f29df2fc$1")) {
                    z = false;
                    break;
                }
                break;
            case 599483728:
                if (implMethodName.equals("lambda$extendCommunityConfigurationForEOSC$8b0cec14$1")) {
                    z = 2;
                    break;
                }
                break;
            case 857149486:
                if (implMethodName.equals("lambda$patchResult$591be984$1")) {
                    z = true;
                    break;
                }
                break;
            case 1306853258:
                if (implMethodName.equals("lambda$extendCommunityConfigurationForEOSC$fe0235a6$1")) {
                    z = 5;
                    break;
                }
                break;
            case 1572813922:
                if (implMethodName.equals("lambda$execBulkTag$94059c45$1")) {
                    z = 4;
                    break;
                }
                break;
            case 2123019764:
                if (implMethodName.equals("nonNull")) {
                    z = 6;
                    break;
                }
                break;
        }
        // Step 2: validate the lambda's metadata for the selected case and rebuild it.
        switch (z) {
            case false:
                // readPath: JSON line -> bean of the captured class.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Class;Ljava/lang/String;)Ljava/lang/Object;")) {
                    Class cls = (Class) serializedLambda.getCapturedArg(0);
                    return str2 -> {
                        return OBJECT_MAPPER.readValue(str2, cls);
                    };
                }
                break;
            case true:
                // patchResult: fills in missing data info / context on a result.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/schema/oaf/Result;)Leu/dnetlib/dhp/schema/oaf/Result;")) {
                    return result -> {
                        if (Objects.isNull(result.getDataInfo())) {
                            result.setDataInfo(OafMapperUtils.dataInfo(false, "", false, false, OafMapperUtils.unknown("", ""), ""));
                        } else if (result.getDataInfo().getDeletedbyinference() == null) {
                            result.getDataInfo().setDeletedbyinference(false);
                        }
                        if (Objects.isNull(result.getContext())) {
                            result.setContext(new ArrayList());
                        }
                        return result;
                    };
                }
                break;
            case true:
                // extendCommunityConfigurationForEOSC: datasource -> id.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/schema/oaf/Datasource;)Ljava/lang/String;")) {
                    return datasource2 -> {
                        return datasource2.getId();
                    };
                }
                break;
            case true:
                // execEntityTag: adds the missing community contexts to a joined entity.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/String;Ljava/lang/String;Lscala/Tuple2;)Leu/dnetlib/dhp/schema/oaf/OafEntity;")) {
                    String str = (String) serializedLambda.getCapturedArg(0);
                    String str3 = (String) serializedLambda.getCapturedArg(1);
                    return tuple2 -> {
                        OafEntity oafEntity = (OafEntity) tuple2.mo9966_1();
                        if (tuple2.mo9965_2() != null) {
                            List list = (List) Optional.ofNullable(oafEntity.getContext()).map(list2 -> {
                                return (List) list2.stream().map(context -> {
                                    return context.getId();
                                }).collect(Collectors.toList());
                            }).orElse(new ArrayList());
                            if (!Optional.ofNullable(oafEntity.getContext()).isPresent()) {
                                oafEntity.setContext(new ArrayList());
                            }
                            ((EntityCommunities) tuple2.mo9965_2()).getCommunitiesId().forEach(str6 -> {
                                if (list.contains(str6)) {
                                    return;
                                }
                                Context context = new Context();
                                context.setId(str6);
                                context.setDataInfo(Arrays.asList(OafMapperUtils.dataInfo(false, TaggingConstants.BULKTAG_DATA_INFO_TYPE, true, false, OafMapperUtils.qualifier(str, str3, ModelConstants.DNET_PROVENANCE_ACTIONS, ModelConstants.DNET_PROVENANCE_ACTIONS), Occurs.ONE)));
                                oafEntity.getContext().add(context);
                            });
                        }
                        return oafEntity;
                    };
                }
                break;
            case true:
                // execBulkTag: enriches a result through the captured tagger, configuration and proto map.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/MapFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Ljava/lang/Object;") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/bulktag/community/ResultTagger;Leu/dnetlib/dhp/bulktag/community/CommunityConfiguration;Leu/dnetlib/dhp/bulktag/community/ProtoMap;Leu/dnetlib/dhp/schema/oaf/Result;)Leu/dnetlib/dhp/schema/oaf/Result;")) {
                    ResultTagger resultTagger = (ResultTagger) serializedLambda.getCapturedArg(0);
                    CommunityConfiguration communityConfiguration = (CommunityConfiguration) serializedLambda.getCapturedArg(1);
                    ProtoMap protoMap = (ProtoMap) serializedLambda.getCapturedArg(2);
                    return result2 -> {
                        return resultTagger.enrichContextCriteria(result2, communityConfiguration, protoMap);
                    };
                }
                break;
            case true:
                // extendCommunityConfigurationForEOSC: datasource filter.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("eu/dnetlib/dhp/bulktag/SparkBulkTagJob") && serializedLambda.getImplMethodSignature().equals("(Leu/dnetlib/dhp/schema/oaf/Datasource;)Z")) {
                    return datasource -> {
                        return isOKDatasource(datasource);
                    };
                }
                break;
            case true:
                // Objects.nonNull used as a filter.
                if (serializedLambda.getImplMethodKind() == 6 && serializedLambda.getFunctionalInterfaceClass().equals("org/apache/spark/api/java/function/FilterFunction") && serializedLambda.getFunctionalInterfaceMethodName().equals("call") && serializedLambda.getFunctionalInterfaceMethodSignature().equals("(Ljava/lang/Object;)Z") && serializedLambda.getImplClass().equals("java/util/Objects") && serializedLambda.getImplMethodSignature().equals("(Ljava/lang/Object;)Z")) {
                    return (v0) -> {
                        return Objects.nonNull(v0);
                    };
                }
                break;
        }
        throw new IllegalArgumentException("Invalid lambda deserialization");
    }
}
