package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import com.google.common.base.Function;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.mapreduce.Algorithms;
import eu.dnetlib.data.mapreduce.JobParams;
import eu.dnetlib.data.mapreduce.hbase.broker.AbstractEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.OAVersionEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.PIDEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.PublicationDateEventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.SubjectEventFactory;
import eu.dnetlib.data.mapreduce.util.DedupUtils;
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.pace.config.DedupConfig;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.ProtoDocumentBuilder;
import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math.util.MathUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.dom4j.DocumentException;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/hbase/broker/enrich/EnrichmentReducer.class */
public class EnrichmentReducer extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
    private static final int LIMIT = 1000;
    private static final int SCORE_DECIMALS = 2;
    private Map<String, String> dsTypeMap = Maps.newHashMap();
    private Set<String> dsWhitelist = Sets.newHashSet();
    private Set<String> dsBlacklist = Sets.newHashSet();
    private Set<String> untrustedOaDsList = Sets.newHashSet();
    private Set<String> dsTypeWhitelist = Sets.newHashSet();
    private DedupConfig dedupConf;
    private double scaleLB;

    protected void setup(Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
        super.setup(context);
        System.out.println("LIMIT: 1000");
        this.dsWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.whitelist"));
        this.dsBlacklist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.blacklist"));
        this.dsTypeWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.type.whitelist"));
        this.untrustedOaDsList.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
        this.dsTypeMap = getDsTypeMap(context, this.dsTypeWhitelist);
        System.out.println("datasource whitelist: " + this.dsWhitelist);
        System.out.println("datasource blacklist: " + this.dsBlacklist);
        System.out.println("datasource OA list: " + this.untrustedOaDsList);
        System.out.println("datasource type whitelist: " + this.dsTypeWhitelist);
        String str = context.getConfiguration().get(JobParams.DEDUP_CONF);
        System.out.println("got dedup conf: " + str);
        this.dedupConf = DedupConfig.load(str);
        System.out.println("parsed dedup conf: " + this.dedupConf.toString());
        this.scaleLB = this.dedupConf.getWf().getThreshold() - 0.01d;
    }

    private Map<String, String> getDsTypeMap(Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text>.Context context, Set<String> set) throws IOException {
        System.out.println("loading datasource typology mapping");
        HashMap newHashMap = Maps.newHashMap();
        Scan scan = new Scan();
        FilterList filterList = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        filterList.addFilter(new PrefixFilter(Bytes.toBytes("10")));
        scan.setFilter(filterList);
        scan.addFamily(Bytes.toBytes("datasource"));
        String str = context.getConfiguration().get("hbase.mapred.inputtable");
        System.out.println(String.format("table name: '%s'", str));
        HTable hTable = new HTable(context.getConfiguration(), str);
        Throwable th = null;
        try {
            ResultScanner scanner = hTable.getScanner(scan);
            Throwable th2 = null;
            try {
                try {
                    Iterator it = scanner.iterator();
                    while (it.hasNext()) {
                        byte[] value = ((Result) it.next()).getValue(Bytes.toBytes("datasource"), Bytes.toBytes("body"));
                        if (value != null) {
                            OafProtos.Oaf parseFrom = OafProtos.Oaf.parseFrom(value);
                            String substringAfter = StringUtils.substringAfter(parseFrom.getEntity().getId(), "|");
                            String classid = parseFrom.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
                            if (set.contains(classid)) {
                                System.out.println(String.format("dsId '%s', dsType '%s'", substringAfter, classid));
                                newHashMap.put(substringAfter, classid);
                            }
                        }
                    }
                    if (scanner != null) {
                        if (0 != 0) {
                            try {
                                scanner.close();
                            } catch (Throwable th3) {
                                th2.addSuppressed(th3);
                            }
                        } else {
                            scanner.close();
                        }
                    }
                    System.out.println("datasource type map size: " + newHashMap.size());
                    return newHashMap;
                } finally {
                }
            } catch (Throwable th4) {
                if (scanner != null) {
                    if (th2 != null) {
                        try {
                            scanner.close();
                        } catch (Throwable th5) {
                            th2.addSuppressed(th5);
                        }
                    } else {
                        scanner.close();
                    }
                }
                throw th4;
            }
        } finally {
            if (hTable != null) {
                if (0 != 0) {
                    try {
                        hTable.close();
                    } catch (Throwable th6) {
                        th.addSuppressed(th6);
                    }
                } else {
                    hTable.close();
                }
            }
        }
    }

    protected void reduce(ImmutableBytesWritable immutableBytesWritable, Iterable<ImmutableBytesWritable> iterable, Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text>.Context context) throws IOException, InterruptedException {
        try {
            generateEvents(Lists.newArrayList(Iterables.transform(Iterables.limit(iterable, LIMIT), oafDeserialiser())), context);
        } catch (DocumentException e) {
            throw new RuntimeException((Throwable) e);
        }
    }

    private void generateEvents(List<OafProtos.Oaf> list, Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text>.Context context) throws IOException, InterruptedException, DocumentException {
        for (OafProtos.Oaf oaf : list) {
            String id = oaf.getEntity().getId();
            String substringAfter = StringUtils.substringAfter(OafHbaseUtils.getKey((Iterable) oaf.getEntity().getCollectedfromList()), "|");
            if (StringUtils.isBlank(this.dsTypeMap.get(substringAfter)) && !this.dsWhitelist.contains(substringAfter)) {
                context.getCounter("events skipped", "datasource type excluded").increment(1L);
            } else if (this.dsBlacklist.contains(substringAfter)) {
                context.getCounter("events skipped", "datasource blacklisted").increment(1L);
            } else {
                for (OafProtos.Oaf oaf2 : list) {
                    if (!id.equals(oaf2.getEntity().getId())) {
                        float similarity = similarity(oaf, oaf2);
                        if (!DedupUtils.isRoot(oaf.getEntity().getId())) {
                            PIDEventFactory.process(context, oaf, oaf2, similarity);
                            OAVersionEventFactory.process(context, oaf, oaf2, similarity, this.untrustedOaDsList);
                            AbstractEventFactory.process(context, oaf, oaf2, similarity);
                            PublicationDateEventFactory.process(context, oaf, oaf2, similarity);
                        }
                        SubjectEventFactory.process(context, oaf, oaf2, similarity);
                    } else if (list.size() == 1) {
                        SubjectEventFactory.process(context, oaf);
                    }
                }
            }
        }
    }

    private float similarity(OafProtos.Oaf oaf, OafProtos.Oaf oaf2) {
        return MathUtils.round((float) Algorithms.scale(new PaceDocumentDistance().between(ProtoDocumentBuilder.newInstance(oaf.getEntity().getId(), oaf.getEntity(), this.dedupConf.getPace().getModel()), ProtoDocumentBuilder.newInstance(oaf2.getEntity().getId(), oaf2.getEntity(), this.dedupConf.getPace().getModel()), this.dedupConf).getScore(), this.scaleLB, 1.0d, 0.0d, 1.0d), SCORE_DECIMALS);
    }

    private Function<ImmutableBytesWritable, OafProtos.Oaf> oafDeserialiser() {
        return new Function<ImmutableBytesWritable, OafProtos.Oaf>() { // from class: eu.dnetlib.data.mapreduce.hbase.broker.enrich.EnrichmentReducer.1
            public OafProtos.Oaf apply(ImmutableBytesWritable immutableBytesWritable) {
                try {
                    return OafProtos.Oaf.parseFrom(immutableBytesWritable.copyBytes());
                } catch (InvalidProtocolBufferException e) {
                    throw new IllegalArgumentException((Throwable) e);
                }
            }
        };
    }

    protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
        reduce((ImmutableBytesWritable) obj, (Iterable<ImmutableBytesWritable>) iterable, (Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text>.Context) context);
    }
}
