/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.msro.openaireplus.workflows.nodes.claims;

import com.googlecode.sarasvati.Arc;
import com.googlecode.sarasvati.NodeToken;
import eu.dnetlib.data.hadoop.rmi.HadoopService;
import eu.dnetlib.data.proto.KindProtos;
import eu.dnetlib.data.proto.OafProtos;
import eu.dnetlib.data.proto.ResultProtos;
import eu.dnetlib.data.proto.TypeProtos;
import eu.dnetlib.msro.openaireplus.workflows.nodes.claims.AbstractClaimsToHBASE;
import eu.dnetlib.msro.rmi.MSROException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.codec.binary.Base64;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;

public class ApplyClaimUpdatesJobNode
extends AbstractClaimsToHBASE {
    private static final Log log = LogFactory.getLog(ApplyClaimUpdatesJobNode.class);

    protected String execute(NodeToken token) throws Exception {
        long timestamp = System.currentTimeMillis();
        this.setTotal(this.getClaimDatabaseUtils().count(this.getCountQuery()));
        AtomicInteger discardedClaims = new AtomicInteger(0);
        HadoopService hadoopService = (HadoopService)this.getServiceLocator().getService(HadoopService.class);
        this.getClaimDatabaseUtils().query(this.getSql()).forEach(claim -> {
            try {
                log.debug(claim);
                String contextId = claim.getSource();
                String rowKey = this.getFullId(this.getOpenAIREType(claim.getTargetType()), claim.getTarget());
                String value = this.getValue(rowKey, contextId, timestamp);
                hadoopService.addHBaseColumn(this.getClusterName(), this.getTableName(), rowKey, "result", "update_" + System.nanoTime(), value);
                this.incrementProcessed();
            }
            catch (Exception e) {
                log.error((Object)("Discarding claim " + claim + ". Cause: " + e.getMessage()));
                discardedClaims.incrementAndGet();
            }
        });
        log.info((Object)("Total Claim Updates: " + this.getTotal()));
        token.getEnv().setAttribute("claimUpdatesSize", (Object)this.getTotal());
        log.info((Object)("Claim updates: " + this.getProcessed()));
        token.getEnv().setAttribute("claimUpdates", (Object)this.getProcessed());
        log.info((Object)("Discarded Claim Updates: " + discardedClaims.intValue()));
        token.getEnv().setAttribute("discardedClaimUpdates", (Object)discardedClaims.intValue());
        return Arc.DEFAULT_ARC;
    }

    protected String getValue(String rowkey, String contextid, long timestamp) throws MSROException {
        log.debug((Object)StringUtils.format((String)"%s --  %s", (Object[])new Object[]{rowkey, contextid}));
        ResultProtos.Result.Builder resultBuilder = ResultProtos.Result.newBuilder().setMetadata(ResultProtos.Result.Metadata.newBuilder().addContext(this.getContext(contextid)));
        OafProtos.OafEntity.Builder entityBuilder = OafProtos.OafEntity.newBuilder().setId(rowkey).setType(TypeProtos.Type.result).setResult(resultBuilder);
        OafProtos.Oaf.Builder builder = OafProtos.Oaf.newBuilder().setKind(KindProtos.Kind.entity).setLastupdatetimestamp(timestamp).setEntity(entityBuilder);
        return Base64.encodeBase64String((byte[])builder.build().toByteArray());
    }

    private ResultProtos.Result.Context getContext(String sourceId) {
        return ResultProtos.Result.Context.newBuilder().setDataInfo(ApplyClaimUpdatesJobNode.getDataInfo()).setId(sourceId).build();
    }
}

