/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.msro.workflows.sarasvati.loader;

import com.googlecode.sarasvati.Graph;
import com.googlecode.sarasvati.GraphProcess;
import com.googlecode.sarasvati.mem.MemEngine;
import com.googlecode.sarasvati.mem.MemGraphProcess;
import eu.dnetlib.enabling.common.Stoppable;
import eu.dnetlib.enabling.common.StoppableDetails;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import eu.dnetlib.enabling.locators.UniqueServiceLocator;
import eu.dnetlib.miscutils.datetime.DateUtils;
import eu.dnetlib.msro.rmi.MSROException;
import eu.dnetlib.msro.workflows.sarasvati.loader.GraphLoader;
import eu.dnetlib.msro.workflows.sarasvati.loader.ProfileToSarasvatiConverter;
import eu.dnetlib.msro.workflows.sarasvati.loader.WfProfileDescriptor;
import eu.dnetlib.msro.workflows.sarasvati.registry.GraphProcessRegistry;
import java.io.File;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import javax.annotation.Resource;
import org.apache.commons.lang.math.NumberUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.springframework.beans.factory.annotation.Required;

/**
 * Queues workflow processes and starts them through a {@link MemEngine}.
 *
 * <p>{@link #startProcess(String, Map)} builds a {@link GraphProcess} from a workflow
 * profile and places it in a priority queue. {@link #init()} schedules a fixed number of
 * periodic consumers that take one process per tick from the queue and start it, as long
 * as the executor is not paused and the number of running workflows reported by the
 * {@link GraphProcessRegistry} is below {@link #getMaxRunningWorkflows()}.
 *
 * <p>The {@code paused} flag is written by {@link #stop()}, {@link #resume()} and
 * {@link #setPaused(boolean)} and read from the consumer threads, hence it is volatile.
 */
public class WorkflowExecutor
implements Stoppable {
    private static final Log log = LogFactory.getLog(WorkflowExecutor.class);

    /** Number of periodic consumers, and size of the thread pool that runs them. */
    private static final int QUEUE_CONSUMERS = 10;
    /** Seconds between two ticks of the same consumer. */
    private static final int CONSUMER_PERIOD_SECONDS = 60;
    /** Seconds between the initial delays of two consecutive consumers. */
    private static final int CONSUMER_STEP_SECONDS = 6;
    /** Size of the pending queue above which new processes are rejected. */
    private static final int MAX_PENDING_PROCS = 100;
    /** Priority used when "system:priority" is missing or not a number. */
    private static final int DEFAULT_PRIORITY = 50;

    private MemEngine engine;
    private GraphLoader graphLoader;
    private GraphProcessRegistry graphProcessRegistry;
    private ProfileToSarasvatiConverter profileToSarasvatiConverter;
    private ScheduledExecutorService queueConsumers;
    // volatile: toggled by callers of stop()/resume() and read by the consumer threads.
    private volatile boolean paused = false;
    private int maxRunningWorkflows = 10;
    @Resource
    private UniqueServiceLocator serviceLocator;

    /**
     * Pending processes ordered by their "system:priority" attribute. A
     * {@link PriorityBlockingQueue} hands out its least element first, so processes with
     * a lower numeric priority value are dequeued first.
     */
    private PriorityBlockingQueue<GraphProcess> pendingProcs = new PriorityBlockingQueue<GraphProcess>(20, new Comparator<GraphProcess>(){

        @Override
        public int compare(GraphProcess p1, GraphProcess p2) {
            int n1 = WorkflowExecutor.priorityOf(p1);
            int n2 = WorkflowExecutor.priorityOf(p2);
            // Plain int comparison: no float conversion, no subtraction overflow.
            return n1 < n2 ? -1 : (n1 > n2 ? 1 : 0);
        }
    });

    /**
     * Reads the "system:priority" attribute of a process.
     *
     * @param process the process whose environment is inspected
     * @return the parsed priority, or {@link #DEFAULT_PRIORITY} when it cannot be parsed
     */
    private static int priorityOf(GraphProcess process) {
        return NumberUtils.toInt((String)process.getEnv().getAttribute("system:priority"), DEFAULT_PRIORITY);
    }

    /**
     * @return {@code true} when the executor is paused
     */
    public boolean isPaused() {
        return this.paused;
    }

    /**
     * @param paused {@code true} to pause the executor, {@code false} to let it run
     */
    public void setPaused(boolean paused) {
        this.paused = paused;
    }

    /**
     * Creates the scheduler and registers the periodic queue consumers. Consumer
     * {@code i} first runs after {@code i * CONSUMER_STEP_SECONDS} seconds and then every
     * {@code CONSUMER_PERIOD_SECONDS} seconds.
     */
    public void init() {
        this.queueConsumers = Executors.newScheduledThreadPool(QUEUE_CONSUMERS);
        for (int i = 0; i < QUEUE_CONSUMERS; ++i) {
            this.queueConsumers.scheduleAtFixedRate(new Runnable(){

                @Override
                public void run() {
                    try {
                        WorkflowExecutor.this.consumeNextProcess();
                    }
                    catch (RuntimeException e) {
                        // scheduleAtFixedRate suppresses every subsequent execution of a
                        // task that throws; swallow and log so the consumer keeps ticking.
                        log.error("Error in workflow queue consumer", e);
                    }
                }
            }, (long)i * CONSUMER_STEP_SECONDS, CONSUMER_PERIOD_SECONDS, TimeUnit.SECONDS);
        }
    }

    /**
     * One consumer tick: does nothing when paused or when the running-workflow limit is
     * reached; otherwise takes the head of the pending queue (if any), stamps its start
     * date attributes and starts it on the engine.
     */
    private void consumeNextProcess() {
        if (this.isPaused()) {
            return;
        }
        int running = this.graphProcessRegistry.countRunningWfs(false);
        if (running >= this.getMaxRunningWorkflows()) {
            if (log.isDebugEnabled()) {
                log.debug("reached max running workflows: " + running);
            }
            return;
        }
        GraphProcess process = this.pendingProcs.poll();
        if (process == null) {
            log.debug("Process queue is empty");
            return;
        }
        log.info("Starting workflow: " + process);
        long now = DateUtils.now();
        process.getEnv().setAttribute("system:startDate", (Object)now);
        process.getEnv().setAttribute("system:startHumanDate", DateUtils.calculate_ISO8601(now));
        this.engine.startProcess(process);
    }

    /**
     * Queues the workflow described by the given profile, without extra parameters.
     *
     * @param profileId identifier of the workflow profile
     * @return the process identifier assigned by the registry
     * @throws Exception see {@link #startProcess(String, Map)}
     */
    public String startProcess(String profileId) throws Exception {
        return this.startProcess(profileId, null);
    }

    /**
     * Builds a process for the given workflow profile, registers it and places it in the
     * pending queue. The process is started later by one of the queue consumers.
     *
     * @param profileId identifier of the workflow profile
     * @param params extra environment attributes copied onto the process; may be
     *        {@code null}
     * @return the process identifier assigned by the registry
     * @throws MSROException if the executor is paused, the workflow is not ready, or the
     *         pending queue holds more than {@code MAX_PENDING_PROCS} processes
     * @throws IllegalArgumentException if loading the graph or preparing the process
     *         fails; the original exception is attached as the cause
     * @throws Exception any failure raised while resolving the profile
     */
    public String startProcess(String profileId, Map<String, Object> params) throws Exception {
        WfProfileDescriptor desc = this.profileToSarasvatiConverter.getSarasvatiWorkflow(profileId);
        if (this.isPaused()) {
            log.warn("Wf " + profileId + " not launched, because WorkflowExecutor is preparing for shutdown");
            throw new MSROException("WorkflowExecutor is preparing for shutdown");
        }
        if (!desc.isReady()) {
            log.warn("Wf " + profileId + " not launched, because it is not ready to start");
            throw new MSROException("Workflow " + profileId + " is not ready to start");
        }
        if (this.pendingProcs.size() > MAX_PENDING_PROCS) {
            log.warn("Wf " + profileId + " not launched, Max number of pending procs reached: " + MAX_PENDING_PROCS);
            throw new MSROException("Max number of pending procs reached: " + MAX_PENDING_PROCS);
        }
        try {
            Graph graph = this.graphLoader.loadGraph(desc.getWorkflowXml());
            MemGraphProcess process = new MemGraphProcess(graph);
            String procId = this.graphProcessRegistry.registerProcess((GraphProcess)process);
            this.graphProcessRegistry.associateProcessWithResource((GraphProcess)process, profileId);
            process.getEnv().setAttribute("system:processId", procId);
            process.getEnv().setAttribute("system:profileId", profileId);
            process.getEnv().setAttribute("system:profileName", desc.getName());
            process.getEnv().setAttribute("system:wfName", graph.getName());
            process.getEnv().setAttribute("system:profileFamily", desc.getType());
            process.getEnv().setAttribute("system:priority", (Object)desc.getPriority());
            if (params != null) {
                for (Map.Entry<String, Object> e : params.entrySet()) {
                    process.getEnv().setAttribute(e.getKey(), e.getValue());
                }
            }
            log.info("Process " + process + " in queue, priority=" + desc.getPriority());
            this.pendingProcs.put(process);
            return procId;
        }
        catch (Exception e) {
            log.error("Error parsing workflow xml: " + desc.getWorkflowXml(), e);
            // Keep the exception type callers see, but preserve the root cause.
            throw new IllegalArgumentException("Error parsing workflow", e);
        }
    }

    /**
     * Queues the workflows referenced by the EXECUTABLE configuration of a metaworkflow.
     * Each workflow is queued when {@code manual} is {@code true} or when its
     * configuration start mode is "auto"; otherwise a warning is logged.
     *
     * <p>NOTE(review): {@code id} and the workflow ids are concatenated into the XQuery
     * strings without escaping; confirm that callers only pass trusted identifiers.
     *
     * @param id resource identifier of the metaworkflow
     * @param manual {@code true} when the launch was requested manually
     * @throws MSROException if the lookup returns no workflow for the metaworkflow
     * @throws Exception any failure raised by the lookup service or by
     *         {@link #startProcess(String)}
     */
    public void startMetaWorkflow(String id, boolean manual) throws Exception {
        String query = "/*[.//RESOURCE_IDENTIFIER/@value='" + id + "']//CONFIGURATION[@status='EXECUTABLE']/WORKFLOW/@id/string()";
        ISLookUpService lookup = (ISLookUpService)this.serviceLocator.getService(ISLookUpService.class);
        List<String> list = lookup.quickSearchProfile(query);
        if (list == null || list.isEmpty()) {
            throw new MSROException("Metaworkflow " + id + " not launched");
        }
        for (String wfId : list) {
            String q = "/*[.//RESOURCE_IDENTIFIER/@value='" + wfId + "']//CONFIGURATION/@start/string()";
            // Literal first, so a null lookup result is treated as "not auto".
            if (manual || "auto".equals(lookup.getResourceProfileByQuery(q))) {
                this.startProcess(wfId);
                continue;
            }
            log.warn("Worflow " + wfId + " can not be launched AUTOMATICALLY");
        }
    }

    /**
     * @return the loader used to turn workflow XML into a {@link Graph}
     */
    public GraphLoader getGraphLoader() {
        return this.graphLoader;
    }

    /**
     * @param graphLoader the loader used to turn workflow XML into a {@link Graph}
     */
    @Required
    public void setGraphLoader(GraphLoader graphLoader) {
        this.graphLoader = graphLoader;
    }

    /**
     * @return the engine on which queued processes are started
     */
    public MemEngine getEngine() {
        return this.engine;
    }

    /**
     * @param engine the engine on which queued processes are started
     */
    @Required
    public void setEngine(MemEngine engine) {
        this.engine = engine;
    }

    /**
     * @return the registry used to register processes and count running workflows
     */
    public GraphProcessRegistry getGraphProcessRegistry() {
        return this.graphProcessRegistry;
    }

    /**
     * @param graphProcessRegistry the registry used to register processes and count
     *        running workflows
     */
    @Required
    public void setGraphProcessRegistry(GraphProcessRegistry graphProcessRegistry) {
        this.graphProcessRegistry = graphProcessRegistry;
    }

    /**
     * @return the converter that resolves a profile id into a workflow descriptor
     */
    public ProfileToSarasvatiConverter getProfileToSarasvatiConverter() {
        return this.profileToSarasvatiConverter;
    }

    /**
     * @param profileToSarasvatiConverter the converter that resolves a profile id into a
     *        workflow descriptor
     */
    @Required
    public void setProfileToSarasvatiConverter(ProfileToSarasvatiConverter profileToSarasvatiConverter) {
        this.profileToSarasvatiConverter = profileToSarasvatiConverter;
    }

    /**
     * Pauses the executor: consumers stop dequeuing and {@link #startProcess(String, Map)}
     * rejects new requests. Workflows already started are not touched by this method.
     */
    public void stop() {
        this.paused = true;
    }

    /**
     * Clears the paused flag set by {@link #stop()}.
     */
    public void resume() {
        this.paused = false;
    }

    /**
     * Reports the stop status: RUNNING when not paused; when paused, STOPPED if the
     * registry counts no running workflow and STOPPING otherwise.
     *
     * @return the details, including the number of running workflows in the description
     */
    public StoppableDetails getStopDetails() {
        int count = this.graphProcessRegistry.countRunningWfs();
        StoppableDetails.StopStatus status = StoppableDetails.StopStatus.RUNNING;
        if (this.isPaused()) {
            status = count == 0 ? StoppableDetails.StopStatus.STOPPED : StoppableDetails.StopStatus.STOPPING;
        }
        // NOTE(review): the result is discarded; kept because side effects of
        // listIdentifiers() cannot be ruled out from this file - confirm and remove.
        this.graphProcessRegistry.listIdentifiers();
        return new StoppableDetails("D-NET workflow manager", "Running workflows: " + count, status);
    }

    /**
     * @return the running-workflow count at or above which consumers stop dequeuing
     */
    public int getMaxRunningWorkflows() {
        return this.maxRunningWorkflows;
    }

    /**
     * @param maxRunningWorkflows the running-workflow count at or above which consumers
     *        stop dequeuing
     */
    public void setMaxRunningWorkflows(int maxRunningWorkflows) {
        this.maxRunningWorkflows = maxRunningWorkflows;
    }
}

