package eu.dnetlib.data.mapreduce.hbase.dedup;

import com.google.common.collect.Lists;
import eu.dnetlib.data.mapreduce.hbase.dedup.config.DedupConfig;
import eu.dnetlib.data.mapreduce.hbase.dedup.config.DedupConfigLoader;
import eu.dnetlib.data.proto.RelTypeProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.pace.clustering.NGramUtils;
import eu.dnetlib.pace.config.Config;
import eu.dnetlib.pace.config.DynConf;
import eu.dnetlib.pace.distance.PaceDocumentDistance;
import eu.dnetlib.pace.model.Field;
import eu.dnetlib.pace.model.MapDocument;
import eu.dnetlib.pace.model.MapDocumentComparator;
import eu.dnetlib.pace.model.MapDocumentSerializer;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Queue;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.mapreduce.TableReducer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapreduce.Reducer;

/* loaded from: input_file:eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer.class */
public class DedupReducer extends TableReducer<Text, ImmutableBytesWritable, ImmutableBytesWritable> {
    private static final boolean WRITE_TO_WAL = false;
    private static final int LIMIT = 2000;
    private static final int FIELD_LIMIT = 10;
    private static final int WINDOW_SIZE = 200;
    private Config paceConf;
    private DedupConfig dedupConf;

    /* JADX INFO: Access modifiers changed from: package-private */
    /* renamed from: eu.dnetlib.data.mapreduce.hbase.dedup.DedupReducer$1, reason: invalid class name */
    /* loaded from: input_file:eu/dnetlib/data/mapreduce/hbase/dedup/DedupReducer$1.class */
    public static /* synthetic */ class AnonymousClass1 {
        static final /* synthetic */ int[] $SwitchMap$eu$dnetlib$data$proto$TypeProtos$Type = new int[TypeProtos.Type.values().length];

        static {
            try {
                $SwitchMap$eu$dnetlib$data$proto$TypeProtos$Type[TypeProtos.Type.person.ordinal()] = 1;
            } catch (NoSuchFieldError e) {
            }
            try {
                $SwitchMap$eu$dnetlib$data$proto$TypeProtos$Type[TypeProtos.Type.result.ordinal()] = 2;
            } catch (NoSuchFieldError e2) {
            }
            try {
                $SwitchMap$eu$dnetlib$data$proto$TypeProtos$Type[TypeProtos.Type.organization.ordinal()] = 3;
            } catch (NoSuchFieldError e3) {
            }
        }
    }

    protected void setup(Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context) throws IOException, InterruptedException {
        this.paceConf = DynConf.load(context.getConfiguration().get("dedup.pace.conf"));
        this.dedupConf = DedupConfigLoader.load(context.getConfiguration().get("dedup.wf.conf"));
        System.out.println("dedup reduce phase \npace conf: " + this.paceConf.fields() + "\nwf conf: " + this.dedupConf.toString());
    }

    protected void reduce(Text text, Iterable<ImmutableBytesWritable> iterable, Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context) throws IOException, InterruptedException {
        System.out.println("\nReducing key: '" + text + "'");
        Queue<MapDocument> prepare = prepare(context, text, iterable);
        switch (AnonymousClass1.$SwitchMap$eu$dnetlib$data$proto$TypeProtos$Type[this.dedupConf.getEntityType().ordinal()]) {
            case 1:
                process(prepare, context);
                return;
            case 2:
                process(simplifyQueue(prepare, text.toString(), context), context);
                return;
            case 3:
                process(prepare, context);
                return;
            default:
                throw new IllegalArgumentException("dedup not implemented for type: " + this.dedupConf.getEntityName());
        }
    }

    private Queue<MapDocument> prepare(Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context, Text text, Iterable<ImmutableBytesWritable> iterable) {
        PriorityQueue priorityQueue = new PriorityQueue(100, new MapDocumentComparator(this.dedupConf.getOrderField()));
        HashSet hashSet = new HashSet();
        Iterator<ImmutableBytesWritable> it = iterable.iterator();
        while (it.hasNext()) {
            try {
                MapDocument decode = MapDocumentSerializer.decode(it.next().copyBytes());
                String identifier = decode.getIdentifier();
                if (!hashSet.contains(identifier)) {
                    hashSet.add(identifier);
                    priorityQueue.add(decode);
                }
            } catch (Throwable th) {
                System.out.println("Got exception: " + th);
            }
            if (priorityQueue.size() > LIMIT) {
                context.getCounter("ngram size > 2000", "N").increment(1L);
                System.out.println("breaking out after limit (2000) for ngram '" + text);
                break;
            }
            continue;
        }
        return priorityQueue;
    }

    private Queue<MapDocument> simplifyQueue(Queue<MapDocument> queue, String str, Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context) {
        LinkedList linkedList = new LinkedList();
        String str2 = "";
        ArrayList newArrayList = Lists.newArrayList();
        while (!queue.isEmpty()) {
            MapDocument remove = queue.remove();
            if (remove.values(this.dedupConf.getOrderField()).isEmpty()) {
                context.getCounter("Records", "Without " + this.dedupConf.getOrderField()).increment(1L);
            } else {
                String cleanupForOrdering = NGramUtils.cleanupForOrdering(((Field) remove.values(this.dedupConf.getOrderField()).get(WRITE_TO_WAL)).getValue().toString());
                if (cleanupForOrdering.equals(str2)) {
                    newArrayList.add(remove);
                } else {
                    populateSimplifiedQueue(linkedList, newArrayList, context, str2, str);
                    newArrayList.clear();
                    newArrayList.add(remove);
                    str2 = cleanupForOrdering;
                }
            }
        }
        populateSimplifiedQueue(linkedList, newArrayList, context, str2, str);
        return linkedList;
    }

    private void populateSimplifiedQueue(Queue<MapDocument> queue, List<MapDocument> list, Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context, String str, String str2) {
        if (list.size() < FIELD_LIMIT) {
            queue.addAll(list);
        } else {
            context.getCounter(getClass().getSimpleName(), "Skipped records for count(" + this.dedupConf.getOrderField() + ") >= " + FIELD_LIMIT).increment(list.size());
            System.out.println("Skipped field: " + str + " - size: " + list.size() + " - ngram: " + str2);
        }
    }

    private void process(Queue<MapDocument> queue, Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context) throws IOException, InterruptedException {
        PaceDocumentDistance paceDocumentDistance = new PaceDocumentDistance();
        while (!queue.isEmpty()) {
            MapDocument remove = queue.remove();
            String identifier = remove.getIdentifier();
            List values = remove.values(this.dedupConf.getOrderField());
            String obj = (values == null || values.isEmpty()) ? null : ((Field) values.get(WRITE_TO_WAL)).getValue().toString();
            if (obj != null) {
                System.out.println(identifier + " --> " + obj);
                int i = WRITE_TO_WAL;
                Iterator<MapDocument> it = queue.iterator();
                while (true) {
                    if (it.hasNext()) {
                        MapDocument next = it.next();
                        String identifier2 = next.getIdentifier();
                        if (mustSkip(identifier2)) {
                            context.getCounter(this.dedupConf.getEntityName(), "skip list").increment(1L);
                            break;
                        }
                        if (i > WINDOW_SIZE) {
                            break;
                        }
                        List values2 = next.values(this.dedupConf.getOrderField());
                        String obj2 = (values2 == null || values2.isEmpty()) ? null : ((Field) values2.get(WRITE_TO_WAL)).getValue().toString();
                        if (!identifier2.equals(identifier) && obj2 != null) {
                            if (paceDocumentDistance.between(remove, next, this.paceConf) >= this.dedupConf.getThreshold()) {
                                writeSimilarity(context, identifier, identifier2);
                                context.getCounter(this.dedupConf.getEntityName(), "similarRel (x2)").increment(1L);
                            } else {
                                context.getCounter(this.dedupConf.getEntityName(), "d < " + this.dedupConf.getThreshold()).increment(1L);
                            }
                            i++;
                        }
                    }
                }
            }
        }
    }

    private boolean mustSkip(String str) {
        return this.dedupConf.getSkipList().contains(getNsPrefix(str));
    }

    private String getNsPrefix(String str) {
        return StringUtils.substringBetween(str, "|", "::");
    }

    private void writeSimilarity(Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context, String str, String str2) throws IOException, InterruptedException {
        byte[] bytes = Bytes.toBytes(str);
        byte[] bytes2 = Bytes.toBytes(str2);
        emitRel(context, bytes, bytes2);
        emitRel(context, bytes2, bytes);
    }

    private void emitRel(Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context context, byte[] bArr, byte[] bArr2) throws IOException, InterruptedException {
        Put add = new Put(bArr).add(Bytes.toBytes(RelTypeProtos.RelType.similarRel.toString()), bArr2, Bytes.toBytes(""));
        add.setWriteToWAL(false);
        context.write(new ImmutableBytesWritable(bArr), add);
    }

    protected /* bridge */ /* synthetic */ void reduce(Object obj, Iterable iterable, Reducer.Context context) throws IOException, InterruptedException {
        reduce((Text) obj, (Iterable<ImmutableBytesWritable>) iterable, (Reducer<Text, ImmutableBytesWritable, ImmutableBytesWritable, Writable>.Context) context);
    }
}
