/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.data.hadoop.action;

import com.google.common.collect.Sets;
import eu.dnetlib.data.hadoop.HadoopClientMap;
import eu.dnetlib.data.hadoop.HadoopJob;
import eu.dnetlib.data.hadoop.JobRegistry;
import eu.dnetlib.data.hadoop.action.AbstractSubmitAction;
import eu.dnetlib.data.hadoop.action.JobCompletion;
import eu.dnetlib.data.hadoop.config.ClusterName;
import eu.dnetlib.data.hadoop.mapred.MapreduceJobMonitor;
import eu.dnetlib.data.hadoop.rmi.HadoopServiceException;
import eu.dnetlib.data.hadoop.utils.JobProfile;
import eu.dnetlib.data.hadoop.utils.ScanFactory;
import eu.dnetlib.data.hadoop.utils.ScanProperties;
import eu.dnetlib.enabling.tools.blackboard.BlackboardJob;
import eu.dnetlib.enabling.tools.blackboard.BlackboardServerHandler;
import eu.dnetlib.miscutils.functional.xml.IndentXmlString;
import java.io.IOException;
import java.io.StringWriter;
import java.io.Writer;
import java.util.Map;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.RunningJob;
import org.springframework.beans.factory.annotation.Autowired;

/**
 * Blackboard action that builds a {@link JobConf} from a job profile and the
 * blackboard parameters, submits it through the job-tracker client of the
 * requested cluster and registers the resulting {@link RunningJob} with the
 * {@link JobRegistry} so that its completion is reported back to the
 * blackboard handler.
 */
public class SubmitMapreduceJobAction
extends AbstractSubmitAction {
    private static final Log log = LogFactory.getLog(SubmitMapreduceJobAction.class);

    /** Configuration key whose value is prepended to non-qualified HDFS paths. */
    private static final String FS_DEFAULT_KEY = "fs.defaultFS";

    @Autowired
    private HadoopClientMap clientMap;
    @Autowired
    private JobRegistry jobRegistry;

    /**
     * Submits the job named by the {@code job.name} blackboard parameter to the
     * cluster named by the {@code cluster} parameter.
     *
     * <p>Any failure raised while preparing, submitting or registering the job
     * (including a missing or unknown {@code cluster} value) is logged and
     * reported through {@code handler.failed}. On success the job is marked as
     * ongoing; the final outcome is delivered later through the registered
     * {@link JobCompletion} callback.
     *
     * @param handler blackboard handler used to report ongoing/done/failed states
     * @param bbJob   blackboard job carrying the submission parameters
     */
    @Override
    public void executeAsync(final BlackboardServerHandler handler, final BlackboardJob bbJob) {
        final String jobName = bbJob.getParameters().get("job.name");
        try {
            // Resolved inside the try block so that an invalid cluster name is
            // reported through the handler like every other failure.
            final ClusterName clusterName = ClusterName.valueOf(bbJob.getParameters().get("cluster"));
            final JobProfile jobProfile = this.loadISJobConfiguration(jobName, bbJob.getParameters());

            if (!bbJob.getParameters().keySet().containsAll(jobProfile.getRequiredParams())) {
                final String msg = "required parameter is missing for job: " + jobName + ", required params: " + jobProfile.getRequiredParams() + "\nmissing params: " + Sets.difference(jobProfile.getRequiredParams(), bbJob.getParameters().keySet());
                log.error(msg);
                handler.failed(bbJob, new HadoopServiceException(msg));
                return;
            }

            final JobConf jobConf = this.prepareJob(this.getConf(clusterName), jobName, jobProfile, bbJob.getParameters());
            this.logJobDetails(jobConf);

            final RunningJob runningJob = this.clientMap.getJtClient(clusterName).submitJob(jobConf);
            final String jobId = this.newJobId(clusterName, runningJob.getID().getId());

            final JobCompletion callback = new JobCompletion() {

                @Override
                public void done(final Map<String, String> properties) {
                    try {
                        bbJob.getParameters().putAll(properties);
                        log.info(jobId + " completed successfully");
                        handler.done(bbJob);
                    } finally {
                        // Always release the running-job slot, even if the handler throws.
                        SubmitMapreduceJobAction.this.decrementRunningJobs(jobName);
                    }
                }

                @Override
                public void failed(final String msg, final Throwable e) {
                    try {
                        // Keep the cause in the log; the message alone loses the stack trace.
                        log.error(msg, e);
                        handler.failed(bbJob, e);
                    } finally {
                        // Always release the running-job slot, even if the handler throws.
                        SubmitMapreduceJobAction.this.decrementRunningJobs(jobName);
                    }
                }
            };

            this.jobRegistry.registerJob(HadoopJob.newInstance(jobId, clusterName, jobProfile, new MapreduceJobMonitor(runningJob, callback)));
            this.updateJobStatus(jobName);
            handler.ongoing(bbJob);
        } catch (Throwable e) {
            log.error("error executing hadoop job: " + jobName, e);
            handler.failed(bbJob, e);
        }
    }

    /**
     * Creates the {@link JobConf} for a submission.
     *
     * <p>The job definition from the profile is applied first and the caller
     * supplied parameters afterwards, so parameters win on key clashes. When the
     * profile requires {@code hbase.mapreduce.inputtable} and carries scan
     * properties, the scan produced by {@link ScanFactory} is stored under
     * {@code hbase.mapreduce.scan}.
     *
     * @param configuration base cluster configuration
     * @param jobName       name assigned to the job
     * @param jobProfile    profile providing description, definition and scan settings
     * @param parameters    submission parameters layered on top of the job definition
     * @return the populated job configuration
     * @throws IOException            propagated from property/scan handling
     * @throws HadoopServiceException if no usable job library path can be determined
     */
    private JobConf prepareJob(Configuration configuration, String jobName, JobProfile jobProfile, Map<String, String> parameters) throws IOException, HadoopServiceException {
        log.info("creating job: " + jobName);

        final JobConf jobConf = new JobConf(configuration);
        jobConf.setJobName(jobName);
        jobConf.set("dnet.mapred.job.description", jobProfile.getDescription());
        jobConf.setJar(new Path(this.getJobLib(configuration, jobProfile)).toString());

        this.set(jobConf, jobProfile.getJobDefinition());
        this.set(jobConf, parameters);

        final ScanProperties scanProperties = jobProfile.getScanProperties();
        if (jobProfile.getRequiredParams().contains("hbase.mapreduce.inputtable") && scanProperties != null) {
            jobConf.set("hbase.mapreduce.scan", ScanFactory.getScan(scanProperties));
        }
        return jobConf;
    }

    /**
     * Determines the job library location.
     *
     * <p>Starts from the default library path derived from {@code fs.defaultFS}
     * and lets a {@code job.lib} entry in the job definition override it. A value
     * that does not start with {@code hdfs://} is prefixed with the configured
     * {@code fs.defaultFS}.
     *
     * @param configuration cluster configuration providing {@code fs.defaultFS}
     * @param jobProfile    profile whose job definition may contain {@code job.lib}
     * @return the library path, prefixed as described above
     * @throws HadoopServiceException if the resolved value is {@code null} or empty
     */
    private String getJobLib(Configuration configuration, JobProfile jobProfile) throws HadoopServiceException {
        final String defaultFs = configuration.get(FS_DEFAULT_KEY);

        String jobLib = this.getDefaultLibPath(defaultFs);
        if (jobProfile.getJobDefinition().containsKey("job.lib")) {
            jobLib = jobProfile.getJobDefinition().get("job.lib");
        }
        if (jobLib == null || jobLib.isEmpty()) {
            throw new HadoopServiceException("job.lib must refer to an absolute or relative HDFS path");
        }
        if (!jobLib.startsWith("hdfs://")) {
            jobLib = defaultFs + jobLib;
        }
        log.info("using job.lib: " + jobLib);
        return jobLib;
    }

    /**
     * Dumps the job configuration as indented XML at debug level.
     *
     * <p>The configuration is only serialised when debug logging is enabled. An
     * {@link IOException} during serialisation is logged as a warning and never
     * interrupts the submission.
     *
     * @param jobConf configuration to dump
     */
    private void logJobDetails(JobConf jobConf) {
        if (!log.isDebugEnabled()) {
            // Avoid serialising the whole configuration when nobody will read it.
            return;
        }
        final StringWriter sw = new StringWriter();
        try {
            jobConf.writeXml(sw);
            log.debug("\n" + IndentXmlString.apply(sw.toString()));
        } catch (IOException e) {
            log.warn("unable to log job details: " + jobConf.getJobName(), e);
        }
    }

    /**
     * Copies every entry of {@code properties} into {@code jobConf}.
     *
     * <p>Entries accepted by {@code checkHdfsProperty} (inherited; presumably it
     * detects non-qualified HDFS paths - verify in the superclass) get the
     * {@code fs.defaultFS} value of {@code jobConf} prepended. NOTE: the
     * rewritten value is also written back into {@code properties} via
     * {@link Map.Entry#setValue}, so the caller's map is modified in place.
     *
     * @param jobConf    configuration receiving the properties
     * @param properties properties to copy; entries may be updated in place
     * @throws IOException declared for the inherited property check
     */
    private void set(JobConf jobConf, Map<String, String> properties) throws IOException {
        for (Map.Entry<String, String> entry : properties.entrySet()) {
            if (this.checkHdfsProperty(entry)) {
                entry.setValue(jobConf.get(FS_DEFAULT_KEY) + entry.getValue());
            }
            jobConf.set(entry.getKey(), entry.getValue());
        }
    }
}

