package eu.dnetlib.msro.openaireplus.workflows.nodes.hadoop;

import com.googlecode.sarasvati.Engine;
import com.googlecode.sarasvati.NodeToken;
import com.googlecode.sarasvati.env.Env;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.message.Message;
import eu.dnetlib.msro.message.DnetMessageManager;
import eu.dnetlib.msro.workflows.hadoop.SubmitHadoopJobNode;
import eu.dnetlib.msro.workflows.nodes.ProgressJobNode;
import eu.dnetlib.msro.workflows.nodes.blackboard.BlackboardWorkflowJobListener;
import eu.dnetlib.msro.workflows.util.ProgressProvider;
import java.util.List;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Autowired;

/* loaded from: input_file:eu/dnetlib/msro/openaireplus/workflows/nodes/hadoop/SubmitDnetHadoopJobNode.class */
/**
 * Hadoop job submission node that also reports progress.
 *
 * <p>When the job is prepared, a background thread starts polling the
 * {@link DnetMessageManager} for "ongoing" messages addressed to the current
 * workflow process and stores the last numeric value found under the
 * {@code "ongoing"} key. That value is exposed through {@link ProgressProvider}.
 * Polling stops as soon as the blackboard listener is notified of a failure or
 * is asked to populate the environment, or if job submission itself fails.
 *
 * <p>The polling state lives in instance fields, so one instance tracks one
 * job execution at a time.
 */
public class SubmitDnetHadoopJobNode extends SubmitHadoopJobNode implements ProgressProvider, ProgressJobNode {
    private static final Log log = LogFactory.getLog(SubmitDnetHadoopJobNode.class);

    /** Pause between two consecutive polls of the message manager, in milliseconds. */
    private static final long POLL_INTERVAL_MS = 1000L;

    /** Key of the message body entry that carries the progress counter. */
    private static final String ONGOING_KEY = "ongoing";

    @Autowired
    DnetMessageManager dnetMessageManager;

    // The three fields below are written by one thread and read by another
    // (workflow thread, poller thread, blackboard listener callbacks), hence volatile.
    private volatile boolean ongoing = true;
    private volatile int currentValue;
    private volatile String wfId;

    /**
     * Records the workflow process id, starts the progress poller and then delegates
     * to the parent implementation.
     *
     * @param blackboardJob the blackboard job being prepared
     * @param nodeToken the token of the workflow node under execution
     * @throws Exception whatever the parent implementation throws; in that case the
     *         poller is stopped because no listener callback will do it
     */
    protected void prepareJob(BlackboardJob blackboardJob, NodeToken nodeToken) throws Exception {
        this.wfId = nodeToken.getProcess().getEnv().getAttribute("system:processId");

        Thread poller = new Thread(this::pollProgress, "dnet-progress-" + this.wfId);
        // A progress poller must never keep the JVM alive on its own.
        poller.setDaemon(true);
        poller.start();

        boolean prepared = false;
        try {
            super.prepareJob(blackboardJob, nodeToken);
            prepared = true;
        } finally {
            if (!prepared) {
                // Submission failed: neither onFailed nor populateEnv will run, so stop here.
                this.ongoing = false;
            }
        }
    }

    /**
     * Poller loop: reads the latest "ongoing" message for the current workflow and
     * stores its counter, sleeping {@link #POLL_INTERVAL_MS} after every iteration
     * (including iterations that found nothing) until {@link #ongoing} is cleared
     * or the thread is interrupted.
     */
    private void pollProgress() {
        while (this.ongoing) {
            try {
                Message onGoingMessages = this.dnetMessageManager.getOnGoingMessages(this.wfId);
                if (onGoingMessages != null
                        && onGoingMessages.getBody() != null
                        && onGoingMessages.getBody().containsKey(ONGOING_KEY)) {
                    this.currentValue = Integer.parseInt((String) onGoingMessages.getBody().get(ONGOING_KEY));
                }
            } catch (RuntimeException e) {
                // Best effort: a malformed counter or a transient manager error must not stop polling.
                log.error("Error ono receiving messages ", e);
            }

            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                // Preserve the interrupt status and stop polling.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /**
     * Builds the blackboard listener for this node. Both callbacks stop the progress
     * poller; on failure the error is stored in the token environment and the job is
     * completed with "abort", on success the reported message bodies are copied into
     * the environment.
     *
     * @param engine the workflow engine
     * @param nodeToken the token of the workflow node under execution
     * @return the listener bound to the given engine and token
     */
    protected BlackboardWorkflowJobListener generateBlackboardListener(Engine engine, final NodeToken nodeToken) {
        return new BlackboardWorkflowJobListener(engine, nodeToken) {
            protected void onFailed(BlackboardJob blackboardJob) {
                SubmitDnetHadoopJobNode.this.ongoing = false;
                SubmitDnetHadoopJobNode.log.warn("Blackboard workflow node FAILED: " + blackboardJob.getError());
                nodeToken.getEnv().setAttribute("system:hasFailed", true);
                nodeToken.getEnv().setAttribute("system:error", blackboardJob.getError());
                complete(blackboardJob, "abort");
            }

            protected void populateEnv(Env env, Map<String, String> map) {
                SubmitDnetHadoopJobNode.this.ongoing = false;
                List<Message> report = SubmitDnetHadoopJobNode.this.dnetMessageManager.getReport(SubmitDnetHadoopJobNode.this.wfId);
                if (report == null) {
                    return;
                }
                for (Message message : report) {
                    // Same defensive checks the poller applies: skip messages without a body.
                    if (message != null && message.getBody() != null) {
                        message.getBody().forEach(env::setAttribute);
                    }
                }
            }
        };
    }

    /**
     * @return always 0; this node does not know the total amount of work
     */
    public int getTotalValue() {
        return 0;
    }

    /**
     * @return the last counter read from an "ongoing" message, or 0 if none was read yet
     */
    public int getCurrentValue() {
        return this.currentValue;
    }

    /**
     * @return always {@code false}
     */
    public boolean isInaccurate() {
        return false;
    }

    /**
     * @return this node, which acts as its own progress provider
     */
    public ProgressProvider getProgressProvider() {
        return this;
    }
}
