/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.mapreduce.hbase.broker.enrich;

import com.google.common.base.Function;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.google.protobuf.InvalidProtocolBufferException;
import eu.dnetlib.data.broker.model.openaire.OpenAireEventPayload;
import eu.dnetlib.data.mapreduce.hbase.broker.Topic;
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.EventFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.HighlightFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.mapping.OpenAireEventPayloadFactory;
import eu.dnetlib.data.mapreduce.hbase.broker.model.EventMessage;
import eu.dnetlib.data.mapreduce.util.OafHbaseUtils;
import eu.dnetlib.data.proto.FieldTypeProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.filter.Filter;
import org.apache.hadoop.hbase.filter.FilterList;
import org.apache.hadoop.hbase.filter.PrefixFilter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class EnrichmentReducer
extends Reducer<ImmutableBytesWritable, ImmutableBytesWritable, Text, Text> {
    private static final int LIMIT = 1000;
    private Set<String> pidType;
    private Text tKey;
    private Text tValue;
    private Map<String, String> dsTypeMap = Maps.newHashMap();
    private Set<String> dsWhitelist = Sets.newHashSet();
    private Set<String> dsBlacklist = Sets.newHashSet();
    private Set<String> untrustedOaDsList = Sets.newHashSet();
    private Set<String> dsTypeWhitelist = Sets.newHashSet();

    protected void setup(Reducer.Context context) throws IOException, InterruptedException {
        super.setup(context);
        System.out.println("LIMIT: 1000");
        this.tKey = new Text("");
        this.tValue = new Text();
        this.pidType = Sets.newHashSet((Object[])new String[]{"doi", "pmc", "pmid", "urn", "arxiv"});
        this.dsWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.whitelist"));
        this.dsBlacklist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.id.blacklist"));
        this.dsTypeWhitelist.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.type.whitelist"));
        this.untrustedOaDsList.addAll(OafHbaseUtils.getPropertyValues(context, "broker.datasource.untrusted.oa.list"));
        this.dsTypeMap = this.getDsTypeMap(context, this.dsTypeWhitelist);
        System.out.println("datasource whitelist: " + this.dsWhitelist);
        System.out.println("datasource blacklist: " + this.dsBlacklist);
        System.out.println("datasource OA list: " + this.untrustedOaDsList);
        System.out.println("datasource type whitelist: " + this.dsTypeWhitelist);
    }

    private Map<String, String> getDsTypeMap(Reducer.Context context, Set<String> dsTypeWhitelist) throws IOException {
        System.out.println("loading datasource typology mapping");
        HashMap dsTypeMap = Maps.newHashMap();
        Scan scan = new Scan();
        FilterList fl = new FilterList(FilterList.Operator.MUST_PASS_ALL);
        fl.addFilter((Filter)new PrefixFilter(Bytes.toBytes((String)"10")));
        scan.setFilter((Filter)fl);
        scan.addFamily(Bytes.toBytes((String)"datasource"));
        String tableName = context.getConfiguration().get("hbase.mapred.inputtable");
        System.out.println(String.format("table name: '%s'", tableName));
        HTable table = new HTable(context.getConfiguration(), tableName);
        ResultScanner res = table.getScanner(scan);
        for (Result r : res) {
            byte[] b = r.getValue(Bytes.toBytes((String)"datasource"), Bytes.toBytes((String)"body"));
            if (b == null) continue;
            OafProtos.Oaf oaf = OafProtos.Oaf.parseFrom((byte[])b);
            String dsId = StringUtils.substringAfter((String)oaf.getEntity().getId(), (String)"|");
            String dsType = oaf.getEntity().getDatasource().getMetadata().getDatasourcetype().getClassid();
            if (!dsTypeWhitelist.contains(dsType)) continue;
            System.out.println(String.format("dsId '%s', dsType '%s'", dsId, dsType));
            dsTypeMap.put(dsId, dsType);
        }
        res.close();
        System.out.println("datasource type map size: " + dsTypeMap.size());
        return dsTypeMap;
    }

    protected void reduce(ImmutableBytesWritable key, Iterable<ImmutableBytesWritable> values, Reducer.Context context) throws IOException, InterruptedException {
        ArrayList oafList = Lists.newArrayList((Iterable)Iterables.transform((Iterable)Iterables.limit(values, (int)1000), this.oafDeserialiser()));
        this.generateEvents(oafList, context);
    }

    private void generateEvents(List<OafProtos.Oaf> oafList, Reducer.Context context) throws IOException, InterruptedException {
        for (OafProtos.Oaf current : oafList) {
            String currentId = current.getEntity().getId();
            String currentDsId = StringUtils.substringAfter((String)OafHbaseUtils.getKey(current.getEntity().getCollectedfromList()), (String)"|");
            String currentDsType = this.dsTypeMap.get(currentDsId);
            if (StringUtils.isBlank((String)currentDsType) && !this.dsWhitelist.contains(currentDsId)) {
                context.getCounter("events skipped", "datasource type excluded").increment(1L);
                continue;
            }
            if (this.dsBlacklist.contains(currentDsId)) {
                context.getCounter("events skipped", "datasource blacklisted").increment(1L);
                continue;
            }
            for (OafProtos.Oaf other : oafList) {
                OpenAireEventPayload payload;
                EventMessage event;
                OafProtos.Oaf oaf;
                OafProtos.Oaf.Builder prototype;
                String otherId = other.getEntity().getId();
                if (currentId.equals(otherId)) continue;
                for (final String type : this.pidType) {
                    if (this.hasPid(current, type) || !this.hasPid(other, type)) continue;
                    OafProtos.Oaf.Builder prototype2 = OafProtos.Oaf.newBuilder((OafProtos.Oaf)current);
                    Iterable pids = Iterables.filter((Iterable)other.getEntity().getPidList(), (Predicate)new Predicate<FieldTypeProtos.StructuredProperty>(){

                        public boolean apply(FieldTypeProtos.StructuredProperty pid) {
                            return pid.getQualifier().getClassid().equalsIgnoreCase(type);
                        }
                    });
                    prototype2.getEntityBuilder().addAllPid(pids);
                    OafProtos.Oaf oaf2 = prototype2.build();
                    EventMessage event2 = EventFactory.asEvent(oaf2.getEntity(), Topic.PID, other.getEntity());
                    OpenAireEventPayload payload2 = OpenAireEventPayloadFactory.fromOAF(oaf2.getEntity());
                    event2.setPayload(HighlightFactory.highlightEnrichPid(payload2, Lists.newArrayList((Iterable)pids)).toJSON());
                    this.emit(event2, context);
                    context.getCounter("event", Topic.PID.getValue()).increment(1L);
                }
                String otherDsId = StringUtils.substringAfter((String)OafHbaseUtils.getKey(other.getEntity().getCollectedfromList()), (String)"|");
                if (this.openAccessCheck(current, currentDsId, other, otherDsId)) {
                    prototype = OafProtos.Oaf.newBuilder((OafProtos.Oaf)current);
                    Iterable i = Iterables.filter((Iterable)other.getEntity().getResult().getInstanceList(), (Predicate)new Predicate<ResultProtos.Result.Instance>(){

                        public boolean apply(ResultProtos.Result.Instance i) {
                            return "OPEN".equalsIgnoreCase(i.getLicence().getClassid());
                        }
                    });
                    prototype.getEntityBuilder().getResultBuilder().addAllInstance(i);
                    oaf = prototype.build();
                    event = EventFactory.asEvent(oaf.getEntity(), Topic.OA_STATUS, other.getEntity());
                    payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity());
                    event.setPayload(HighlightFactory.highlightEnrichOa(payload, Lists.newArrayList((Iterable)i)).toJSON());
                    this.emit(event, context);
                    context.getCounter("event", Topic.OA_STATUS.getValue()).increment(1L);
                }
                if (!this.hasAbstract(current) && this.hasAbstract(other)) {
                    prototype = OafProtos.Oaf.newBuilder((OafProtos.Oaf)current);
                    List descriptionList = other.getEntity().getResult().getMetadata().getDescriptionList();
                    prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().addAllDescription((Iterable)descriptionList);
                    oaf = prototype.build();
                    event = EventFactory.asEvent(oaf.getEntity(), Topic.ABSTRACT, other.getEntity());
                    payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity());
                    event.setPayload(HighlightFactory.highlightEnrichAbstract(payload, descriptionList).toJSON());
                    this.emit(event, context);
                    context.getCounter("event", Topic.ABSTRACT.getValue()).increment(1L);
                }
                if (this.hasPubDate(current) || !this.hasPubDate(other)) continue;
                prototype = OafProtos.Oaf.newBuilder((OafProtos.Oaf)current);
                FieldTypeProtos.StringField date = other.getEntity().getResult().getMetadata().getDateofacceptance();
                prototype.getEntityBuilder().getResultBuilder().getMetadataBuilder().setDateofacceptance(date);
                oaf = prototype.build();
                event = EventFactory.asEvent(oaf.getEntity(), Topic.PUBLICATION_DATE, other.getEntity());
                payload = OpenAireEventPayloadFactory.fromOAF(oaf.getEntity());
                event.setPayload(HighlightFactory.highlightEnrichPublicationDate(payload, date).toJSON());
                this.emit(event, context);
                context.getCounter("event", Topic.PUBLICATION_DATE.getValue()).increment(1L);
            }
        }
    }

    private boolean openAccessCheck(OafProtos.Oaf current, String currentDsId, OafProtos.Oaf other, String otherDsId) {
        return (this.untrustedOaDsList.contains(currentDsId) || !this.hasAccess(current, "OPEN", false)) && !this.untrustedOaDsList.contains(otherDsId) && this.hasAccess(other, "OPEN", false);
    }

    private void emit(EventMessage e, Reducer.Context context) throws IOException, InterruptedException {
        this.tValue.set(e.toString());
        context.write((Object)this.tKey, (Object)this.tValue);
    }

    private boolean hasPubDate(OafProtos.Oaf current) {
        ResultProtos.Result.Metadata m = current.getEntity().getResult().getMetadata();
        return StringUtils.isNotBlank((String)m.getDateofacceptance().getValue());
    }

    private boolean hasAbstract(OafProtos.Oaf oaf) {
        return Iterables.all((Iterable)oaf.getEntity().getResult().getMetadata().getDescriptionList(), (Predicate)new Predicate<FieldTypeProtos.StringField>(){

            public boolean apply(FieldTypeProtos.StringField s) {
                return StringUtils.isNotBlank((String)s.getValue());
            }
        });
    }

    private boolean hasAccess(OafProtos.Oaf oaf, final String access, final boolean strict) {
        return Iterables.all((Iterable)oaf.getEntity().getChildrenList(), (Predicate)new Predicate<OafProtos.OafEntity>(){

            public boolean apply(OafProtos.OafEntity entity) {
                Predicate<ResultProtos.Result.Instance> p = new Predicate<ResultProtos.Result.Instance>(){

                    public boolean apply(ResultProtos.Result.Instance i) {
                        return access.equalsIgnoreCase(i.getLicence().getClassid());
                    }
                };
                return strict ? Iterables.all((Iterable)entity.getResult().getInstanceList(), (Predicate)p) : Iterables.any((Iterable)entity.getResult().getInstanceList(), (Predicate)p);
            }
        });
    }

    private boolean hasPid(OafProtos.Oaf oaf, final String type) {
        return Iterables.any((Iterable)oaf.getEntity().getPidList(), (Predicate)new Predicate<FieldTypeProtos.StructuredProperty>(){

            public boolean apply(FieldTypeProtos.StructuredProperty pid) {
                return pid.getQualifier().getClassid().equalsIgnoreCase(type);
            }
        });
    }

    private Function<ImmutableBytesWritable, OafProtos.Oaf> oafDeserialiser() {
        return new Function<ImmutableBytesWritable, OafProtos.Oaf>(){

            public OafProtos.Oaf apply(ImmutableBytesWritable input) {
                try {
                    return OafProtos.Oaf.parseFrom((byte[])input.copyBytes());
                }
                catch (InvalidProtocolBufferException e) {
                    throw new IllegalArgumentException(e);
                }
            }
        };
    }
}

