package eu.dnetlib.parthenos.workflows.nodes;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.gson.Gson;
import eu.dnetlib.enabling.resultset.client.ResultSetClient;
import eu.dnetlib.msro.workflows.graph.Arc;
import eu.dnetlib.msro.workflows.nodes.AsyncJobNode;
import eu.dnetlib.msro.workflows.procs.Env;
import eu.dnetlib.rmi.common.ResultSet;
import eu.dnetlib.rmi.manager.MSROException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:eu/dnetlib/parthenos/workflows/nodes/PublishAbstractJobNode.class */
public abstract class PublishAbstractJobNode extends AsyncJobNode {
    private static final Log log = LogFactory.getLog(PublishAbstractJobNode.class);
    private String inputEprParam;

    @Autowired
    private ResultSetClient resultSetClient;
    private String publisherEndpoint;
    private int nThreads = 5;
    private int nLatch = 15;
    private ExecutorService executorService = Executors.newFixedThreadPool(this.nThreads);
    private List<Future<Integer>> resList = Lists.newArrayList();

    protected String execute(Env env) throws Exception {
        ResultSet resultSet = (ResultSet) env.getAttribute(getInputEprParam(), ResultSet.class);
        if (resultSet == null) {
            throw new MSROException("InputEprParam (" + getInputEprParam() + ") not found in ENV");
        }
        int i = 0;
        int i2 = 0;
        HashMap newHashMap = Maps.newHashMap();
        log.info("Publisher endpoint: " + getPublisherEndpoint());
        CountDownLatch countDownLatch = new CountDownLatch(this.nLatch);
        for (String str : getResultSetClient().iter(resultSet, String.class)) {
            i++;
            if (i >= this.nLatch + 1 && i % this.nLatch == 1) {
                log.debug("Waiting for tasks to complete before resubmitting to executor (countAll = " + i + ") . . . ");
                long currentTimeMillis = System.currentTimeMillis();
                countDownLatch.await();
                log.debug(". . . Ready to submit again after " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
            }
            this.resList.add(this.executorService.submit(() -> {
                try {
                    try {
                        HttpPost httpPost = new HttpPost(getPublisherEndpoint());
                        ArrayList newArrayList = Lists.newArrayList();
                        newArrayList.add(new BasicNameValuePair("record", str));
                        newArrayList.add(new BasicNameValuePair("parthenosTarget", getTarget()));
                        httpPost.setEntity(new UrlEncodedFormEntity(newArrayList, "UTF-8"));
                        HttpResponse execute = HttpClients.createDefault().execute(httpPost);
                        int statusCode = execute.getStatusLine().getStatusCode();
                        switch (statusCode) {
                            case 200:
                                Integer valueOf = Integer.valueOf(statusCode);
                                countDownLatch.countDown();
                                return valueOf;
                            default:
                                log.error(execute.getStatusLine().getStatusCode() + ": " + execute.getStatusLine().getReasonPhrase());
                                log.error("Source record causing error: " + str);
                                newHashMap.merge(Integer.valueOf(statusCode), 1, (v0, v1) -> {
                                    return Integer.sum(v0, v1);
                                });
                                Integer valueOf2 = Integer.valueOf(statusCode);
                                countDownLatch.countDown();
                                return valueOf2;
                        }
                    } catch (IOException e) {
                        log.error(e.getMessage());
                        newHashMap.merge(-1, 1, (v0, v1) -> {
                            return Integer.sum(v0, v1);
                        });
                        countDownLatch.countDown();
                        return -1;
                    }
                } catch (Throwable th) {
                    countDownLatch.countDown();
                    throw th;
                }
            }));
        }
        this.executorService.shutdown();
        log.info("Waiting for responses");
        Iterator<Future<Integer>> it = this.resList.iterator();
        while (it.hasNext()) {
            if (it.next().get().intValue() == 200) {
                i2++;
            }
        }
        log.info(String.format("Got all responses. Ok %s/%s", Integer.valueOf(i2), Integer.valueOf(i)));
        env.setAttribute("mainlog:countOk", Integer.valueOf(i2));
        env.setAttribute("mainlog:countAll", Integer.valueOf(i));
        env.setAttribute("mainlog:errorsMap", new Gson().toJson(newHashMap));
        log.info("publishing completed");
        if (!newHashMap.isEmpty()) {
            log.warn("Problems in publishing on " + getTarget() + ": " + i2 + "/" + i + " see error maps for details");
        }
        if (i == 0) {
            log.warn("0 resources to publish");
        }
        return Arc.DEFAULT_ARC;
    }

    public abstract String getTarget();

    public String getInputEprParam() {
        return this.inputEprParam;
    }

    public void setInputEprParam(String str) {
        this.inputEprParam = str;
    }

    public String getPublisherEndpoint() {
        return this.publisherEndpoint;
    }

    public void setPublisherEndpoint(String str) {
        this.publisherEndpoint = str;
    }

    public ResultSetClient getResultSetClient() {
        return this.resultSetClient;
    }

    public void setResultSetClient(ResultSetClient resultSetClient) {
        this.resultSetClient = resultSetClient;
    }
}
