package eu.dnetlib.data.hadoop.oozie;

import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.action.JobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import java.io.IOException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
import org.apache.oozie.client.WorkflowAction;
import org.apache.oozie.client.WorkflowJob;

/* loaded from: input_file:eu/dnetlib/data/hadoop/oozie/OozieJobMonitor.class */
public class OozieJobMonitor extends JobMonitor {
    private static final Log log = LogFactory.getLog(JobMonitor.class);
    private final OozieClient oozieClient;
    private final String jobId;
    public static final String ACTION_TYPE_SUBWORKFLOW = "sub-workflow";
    private Set<String> workflowActions;

    @Deprecated
    public OozieJobMonitor(OozieClient oozieClient, String str, JobCompletion jobCompletion) {
        super(jobCompletion);
        this.workflowActions = Sets.newHashSet();
        this.oozieClient = oozieClient;
        this.jobId = str;
    }

    public OozieJobMonitor(OozieClient oozieClient, String str, JobCompletion jobCompletion, Set<String> set) {
        super(jobCompletion);
        this.workflowActions = Sets.newHashSet();
        this.oozieClient = oozieClient;
        this.jobId = str;
        this.workflowActions = set;
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor, java.lang.Runnable
    public void run() {
        try {
            log.info("waiting for oozie job completion: " + getHadoopId());
            WorkflowJob.Status status = WorkflowJob.Status.PREP;
            while (true) {
                if (!status.equals(WorkflowJob.Status.PREP) && !status.equals(WorkflowJob.Status.RUNNING) && !status.equals(WorkflowJob.Status.SUSPENDED)) {
                    break;
                }
                Thread.sleep(10000L);
                try {
                    WorkflowJob.Status doGetStatus = doGetStatus();
                    if (!doGetStatus.equals(status)) {
                        log.debug(String.format("jobId %s status changed from %s to %s", this.jobId, status.toString(), doGetStatus.toString()));
                        status = doGetStatus;
                        this.lastActivity = new Date();
                    }
                } catch (Throwable th) {
                    log.warn(String.format("error polling status for job %s", this.jobId), th);
                }
            }
            log.info(String.format("looking for oozie job(%s) output values: %s", getHadoopId(), this.workflowActions));
            Map<String, String> report = getReport(getOozieClient(), getHadoopId(), this.workflowActions);
            log.debug(String.format("job %s finished with status: %s", this.jobId, status));
            if (WorkflowJob.Status.SUCCEEDED.equals(status)) {
                getCallback().done(report);
            } else {
                String format = String.format("hadoop job: %s failed with status: %s, oozie log:\n %s\n", getHadoopId(), getStatus(), getOozieClient().getJobLog(getHadoopId()));
                getCallback().failed(report, format, new HadoopServiceException(format));
            }
        } catch (Throwable th2) {
            getCallback().failed(Maps.newHashMap(), getHadoopId(), th2);
        }
    }

    private static Map<String, String> getReport(OozieClient oozieClient, String str, Set<String> set) throws OozieClientException, IOException {
        HashMap newHashMap = Maps.newHashMap();
        WorkflowJob jobInfo = oozieClient.getJobInfo(str);
        for (WorkflowAction workflowAction : jobInfo.getActions()) {
            log.info(String.format("looking for workflow actions to report, current: '%s'", workflowAction.getName()));
            if (set.contains(workflowAction.getName())) {
                log.info(String.format("found workflow action %s", workflowAction.getName()));
                if (ACTION_TYPE_SUBWORKFLOW.equals(workflowAction.getType())) {
                    log.info(String.format("looking for sub-workflow actions external id: %s", workflowAction.getExternalId()));
                    Map<String, String> report = getReport(oozieClient, workflowAction.getExternalId(), set);
                    if (report != null) {
                        return report;
                    }
                } else if (StringUtils.isNotBlank(workflowAction.getData())) {
                    Properties properties = new Properties();
                    properties.load(IOUtils.toInputStream(workflowAction.getData()));
                    properties.forEach((obj, obj2) -> {
                    });
                    newHashMap.entrySet().forEach(entry -> {
                        log.info(((String) entry.getKey()) + " - " + ((String) entry.getValue()));
                    });
                    log.info(String.format("found workflow action(%s) properties size %s", workflowAction.getName(), Integer.valueOf(properties.values().size())));
                }
            } else {
                log.info(String.format("cannot find workflow action(%s) properties", workflowAction.getName()));
            }
        }
        log.info(String.format("found workflow (%s) properties size %s", jobInfo.getAppName(), Integer.valueOf(newHashMap.values().size())));
        return newHashMap;
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public String getHadoopId() {
        return this.jobId;
    }

    public OozieClient getOozieClient() {
        return this.oozieClient;
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public String getStatus() {
        try {
            return doGetStatus().toString();
        } catch (OozieClientException e) {
            log.error("error accessing job status", e);
            return "UNKNOWN";
        }
    }

    private WorkflowJob.Status doGetStatus() throws OozieClientException {
        return getOozieClient().getJobInfo(getHadoopId()).getStatus();
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public Date getLastActivity() {
        return this.lastActivity;
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public Date getStartTime() throws HadoopServiceException {
        try {
            return getOozieClient().getJobInfo(getHadoopId()).getStartTime();
        } catch (OozieClientException e) {
            throw new HadoopServiceException("unable to read job start time", e);
        }
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public String getTrackerUrl() {
        return getOozieClient().getOozieUrl();
    }

    @Override // eu.dnetlib.data.hadoop.action.JobMonitor
    public void kill() {
        try {
            getOozieClient().kill(getHadoopId());
        } catch (OozieClientException e) {
            log.error("unable to kill job: " + getHadoopId(), e);
        }
    }
}
