package eu.dnetlib.dhp.oa.provision;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.HdfsSupport;
import eu.dnetlib.dhp.common.SparkSessionSupport;
import eu.dnetlib.dhp.oa.provision.model.JoinedEntity;
import eu.dnetlib.dhp.oa.provision.model.ProvisionModelSupport;
import eu.dnetlib.dhp.oa.provision.model.RelatedEntityWrapper;
import eu.dnetlib.dhp.schema.common.ModelSupport;
import eu.dnetlib.dhp.schema.oaf.Author;
import eu.dnetlib.dhp.schema.oaf.OafEntity;
import eu.dnetlib.dhp.schema.oaf.Result;
import java.io.InputStream;
import java.lang.invoke.SerializedLambda;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FilterFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import scala.collection.Iterator;
import scala.collection.JavaConverters;
import scala.collection.Seq;

/**
 * Spark job that joins every entity of one graph table with the related-entity records whose relation
 * source id belongs to that entity type, and writes one {@link JoinedEntity} per entity id (the entity
 * plus the list of its {@link RelatedEntityWrapper} links) to parquet.
 *
 * <p>Before the join, invisible entities are dropped and {@link Result} entities are pruned of
 * outlier-sized fields (see {@link #pruneOutliers(Class, OafEntity)}).
 */
public class CreateRelatedEntitiesJob_phase2 {
    private static final Logger log = LoggerFactory.getLogger(CreateRelatedEntitiesJob_phase2.class);
    private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

    /** Classpath location of the job's argument definitions (the file name is spelled this way on disk). */
    private static final String PARAMS_PATH =
        "/eu/dnetlib/dhp/oa/provision/input_params_related_entities_pahase2.json";

    /** Maximum number of external references kept on a {@link Result}. */
    private static final int MAX_EXTERNAL_ENTITIES = 50;

    /** Number of authors always kept; beyond this only authors with an ORCID pid are retained. */
    private static final int MAX_AUTHORS = 200;

    /** Maximum number of titles kept on a {@link Result}. */
    private static final int MAX_TITLES = 10;

    /**
     * Typed aggregator that folds all {@link JoinedEntity} rows sharing the same key into one: the entity
     * is taken from whichever input carries one, and the link lists are concatenated.
     */
    public static class AdjacencyListAggregator extends Aggregator<JoinedEntity, JoinedEntity, JoinedEntity> {

        /** Initial aggregation buffer: a {@link JoinedEntity} built with its no-arg constructor. */
        @Override
        public JoinedEntity zero() {
            return new JoinedEntity();
        }

        @Override
        public JoinedEntity reduce(JoinedEntity buffer, JoinedEntity input) {
            return mergeAndGet(buffer, input);
        }

        @Override
        public JoinedEntity merge(JoinedEntity left, JoinedEntity right) {
            return mergeAndGet(left, right);
        }

        /**
         * Merges {@code other} into {@code acc} in place and returns {@code acc}.
         *
         * <p>The entity of {@code other} wins when non-null; otherwise {@code acc} keeps its own. All links of
         * {@code other} are appended to the links of {@code acc}.
         */
        private static JoinedEntity mergeAndGet(JoinedEntity acc, JoinedEntity other) {
            final OafEntity entity = other.getEntity() != null ? other.getEntity() : acc.getEntity();
            acc.setEntity(entity);
            acc.getLinks().addAll(other.getLinks());
            return acc;
        }

        @Override
        public JoinedEntity finish(JoinedEntity reduction) {
            return reduction;
        }

        @Override
        public Encoder<JoinedEntity> bufferEncoder() {
            return Encoders.kryo(JoinedEntity.class);
        }

        @Override
        public Encoder<JoinedEntity> outputEncoder() {
            return Encoders.kryo(JoinedEntity.class);
        }
    }

    /**
     * Entry point: parses the job arguments, then (re)creates the output directory with the joined entities.
     *
     * @param args command-line arguments, described by the JSON resource at {@link #PARAMS_PATH}
     * @throws Exception if argument parsing, class loading (e.g. a {@code graphTableClassName} that is not an
     *     {@link OafEntity}) or the Spark job fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        try (InputStream params = CreateRelatedEntitiesJob_phase2.class.getResourceAsStream(PARAMS_PATH)) {
            jsonConfiguration = IOUtils
                .toString(Objects.requireNonNull(params, "missing resource " + PARAMS_PATH), StandardCharsets.UTF_8);
        }

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final Boolean isSparkSessionManaged = Optional
            .ofNullable(parser.get("isSparkSessionManaged"))
            .map(Boolean::valueOf)
            .orElse(Boolean.TRUE);
        log.info("isSparkSessionManaged: {}", isSparkSessionManaged);

        final String inputRelatedEntitiesPath = parser.get("inputRelatedEntitiesPath");
        log.info("inputRelatedEntitiesPath: {}", inputRelatedEntitiesPath);

        final String inputEntityPath = parser.get("inputEntityPath");
        log.info("inputEntityPath: {}", inputEntityPath);

        final String outputPath = parser.get("outputPath");
        log.info("outputPath: {}", outputPath);

        final int numPartitions = Integer.parseInt(parser.get("numPartitions"));
        log.info("numPartitions: {}", numPartitions);

        final String graphTableClassName = parser.get("graphTableClassName");
        log.info("graphTableClassName: {}", graphTableClassName);

        // asSubclass gives the bounded type the generic pipeline needs and fails fast (ClassCastException)
        // when the configured class is not an OafEntity.
        final Class<? extends OafEntity> entityClazz = Class.forName(graphTableClassName).asSubclass(OafEntity.class);

        final SparkConf conf = new SparkConf();
        conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer");
        conf.registerKryoClasses(ProvisionModelSupport.getModelClasses());

        SparkSessionSupport.runWithSparkSession(conf, isSparkSessionManaged, spark -> {
            removeOutputDir(spark, outputPath);
            joinEntityWithRelatedEntities(
                spark, inputRelatedEntitiesPath, inputEntityPath, outputPath, numPartitions, entityClazz);
        });
    }

    /**
     * Left-joins the entities with their related entities on the entity id, groups by entity id, aggregates the
     * links with {@link AdjacencyListAggregator}, and overwrites {@code outputPath} with the result as parquet.
     *
     * @param spark the active session
     * @param relatedEntitiesPath directory holding the related-entity records
     * @param entityPath path of the JSON-lines graph table
     * @param outputPath destination directory (written with {@link SaveMode#Overwrite})
     * @param numPartitions currently unused by this method
     * @param entityClazz concrete entity type of the graph table
     */
    private static <E extends OafEntity> void joinEntityWithRelatedEntities(
        SparkSession spark,
        String relatedEntitiesPath,
        String entityPath,
        String outputPath,
        int numPartitions,
        Class<E> entityClazz) {

        final Dataset<Tuple2<String, E>> entities = readPathEntity(spark, entityPath, entityClazz);
        final Dataset<Tuple2<String, RelatedEntityWrapper>> relatedEntities = readRelatedEntities(
            spark, relatedEntitiesPath, entityClazz);

        entities
            .joinWith(relatedEntities, entities.col("_1").equalTo(relatedEntities.col("_1")), "left")
            .map(
                (MapFunction<Tuple2<Tuple2<String, E>, Tuple2<String, RelatedEntityWrapper>>, JoinedEntity>) pair -> {
                    final JoinedEntity joined = new JoinedEntity(pair._1()._2());
                    // the right side of the left join is null when the entity has no related entities
                    Optional
                        .ofNullable(pair._2())
                        .map(Tuple2::_2)
                        .ifPresent(link -> joined.getLinks().add(link));
                    return joined;
                },
                Encoders.kryo(JoinedEntity.class))
            // guards the getEntity().getId() key extraction below
            .filter(filterEmptyEntityFn())
            .groupByKey(
                (MapFunction<JoinedEntity, String>) joined -> joined.getEntity().getId(),
                Encoders.STRING())
            .agg(new AdjacencyListAggregator().toColumn())
            .map(
                (MapFunction<Tuple2<String, JoinedEntity>, JoinedEntity>) grouped -> grouped._2(),
                Encoders.kryo(JoinedEntity.class))
            .write()
            .mode(SaveMode.Overwrite)
            .parquet(outputPath);
    }

    /**
     * Loads every file under {@code inputRelatedEntitiesPath} as Kryo-encoded {@link RelatedEntityWrapper}s, keeps
     * those whose relation source id starts with the id prefix of {@code entityClazz}, and keys them by that source.
     */
    private static <E extends OafEntity> Dataset<Tuple2<String, RelatedEntityWrapper>> readRelatedEntities(
        SparkSession spark, String inputRelatedEntitiesPath, Class<E> entityClazz) {

        log.info("Reading related entities from: {}", inputRelatedEntitiesPath);

        final List<String> paths = HdfsSupport
            .listFiles(inputRelatedEntitiesPath, spark.sparkContext().hadoopConfiguration());
        log.info("Found paths: {}", String.join(",", paths));

        final String idPrefix = ModelSupport.getIdPrefix(entityClazz);

        return spark
            .read()
            .load(toSeq(paths))
            .as(Encoders.kryo(RelatedEntityWrapper.class))
            .filter(
                (FilterFunction<RelatedEntityWrapper>) wrapper -> wrapper
                    .getRelation()
                    .getSource()
                    .startsWith(idPrefix))
            .map(
                (MapFunction<RelatedEntityWrapper, Tuple2<String, RelatedEntityWrapper>>) wrapper -> new Tuple2<>(
                    wrapper.getRelation().getSource(), wrapper),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(RelatedEntityWrapper.class)));
    }

    /**
     * Reads a JSON-lines graph table into {@code entityClazz} instances, drops rows whose
     * {@code dataInfo.invisible} is not {@code false} (under SQL null semantics this also drops null values),
     * prunes outliers and keys each entity by its id.
     */
    private static <E extends OafEntity> Dataset<Tuple2<String, E>> readPathEntity(
        SparkSession spark, String inputEntityPath, Class<E> entityClazz) {

        log.info("Reading Graph table from: {}", inputEntityPath);
        return spark
            .read()
            .textFile(inputEntityPath)
            .map(
                (MapFunction<String, E>) json -> OBJECT_MAPPER.readValue(json, entityClazz),
                Encoders.bean(entityClazz))
            .filter("dataInfo.invisible == false")
            .map(
                (MapFunction<E, E>) entity -> pruneOutliers(entityClazz, entity),
                Encoders.bean(entityClazz))
            .map(
                (MapFunction<E, Tuple2<String, E>>) entity -> new Tuple2<>(entity.getId(), entity),
                Encoders.tuple(Encoders.STRING(), Encoders.kryo(entityClazz)));
    }

    /**
     * Trims outlier-sized fields of {@link Result} entities in place and returns the same instance. Entities of
     * other types are returned untouched.
     *
     * <p>For results, this keeps at most {@value #MAX_EXTERNAL_ENTITIES} external references. It keeps the
     * first {@value #MAX_AUTHORS} authors and, after those, only authors carrying an ORCID pid. Author full
     * names, descriptions and titles are truncated to the {@code ProvisionConstants} length limits. Null
     * authors, descriptions and titles are dropped, and at most {@value #MAX_TITLES} titles are kept.
     *
     * @param entityClazz runtime type of {@code entity}
     * @param entity the entity to prune (mutated)
     * @return {@code entity}
     */
    public static <E extends OafEntity> E pruneOutliers(Class<E> entityClazz, E entity) {
        if (!ModelSupport.isSubClass(entityClazz, Result.class)) {
            return entity;
        }
        final Result result = (Result) entity;

        if (result.getExternalReference() != null) {
            result
                .setExternalReference(
                    result.getExternalReference().stream().limit(MAX_EXTERNAL_ENTITIES).collect(Collectors.toList()));
        }

        if (result.getAuthor() != null) {
            final List<Author> authors = new ArrayList<>();
            for (Author author : result.getAuthor()) {
                if (author == null) {
                    // skip null entries, consistent with descriptions/titles below
                    continue;
                }
                author.setFullname(StringUtils.left(author.getFullname(), ProvisionConstants.MAX_AUTHOR_FULLNAME_LENGTH));
                if (authors.size() < MAX_AUTHORS || hasORCID(author)) {
                    authors.add(author);
                }
            }
            result.setAuthor(authors);
        }

        if (result.getDescription() != null) {
            result
                .setDescription(
                    result
                        .getDescription()
                        .stream()
                        .filter(Objects::nonNull)
                        .map(description -> {
                            description
                                .setValue(
                                    StringUtils.left(description.getValue(), ProvisionConstants.MAX_ABSTRACT_LENGTH));
                            return description;
                        })
                        .collect(Collectors.toList()));
        }

        if (result.getTitle() != null) {
            result
                .setTitle(
                    result
                        .getTitle()
                        .stream()
                        .filter(Objects::nonNull)
                        .map(title -> {
                            title.setValue(StringUtils.left(title.getValue(), ProvisionConstants.MAX_TITLE_LENGTH));
                            return title;
                        })
                        .limit(MAX_TITLES)
                        .collect(Collectors.toList()));
        }
        return entity;
    }

    /**
     * Tells whether the author has at least one pid whose qualifier classid is "orcid", compared
     * case-insensitively and independently of the default locale.
     */
    private static boolean hasORCID(Author author) {
        return author.getPid() != null
            && author
                .getPid()
                .stream()
                .filter(Objects::nonNull)
                .map(pid -> pid.getQualifier())
                .filter(Objects::nonNull)
                .map(qualifier -> qualifier.getClassid())
                .filter(StringUtils::isNotBlank)
                .anyMatch(classid -> "orcid".equalsIgnoreCase(classid));
    }

    /** Filter that keeps only {@link JoinedEntity} rows carrying a non-null entity. */
    private static FilterFunction<JoinedEntity> filterEmptyEntityFn() {
        return joined -> Objects.nonNull(joined.getEntity());
    }

    /** Removes {@code path} using the session's Hadoop configuration. */
    private static void removeOutputDir(SparkSession spark, String path) {
        HdfsSupport.remove(path, spark.sparkContext().hadoopConfiguration());
    }

    /** Converts a Java list of paths into the Scala {@link Seq} expected by {@code DataFrameReader.load}. */
    private static Seq<String> toSeq(List<String> list) {
        return JavaConverters.asScalaIteratorConverter(list.iterator()).asScala().toSeq();
    }
}
