package eu.dnetlib.iis.common.lock;

import eu.dnetlib.iis.core.java.PortBindings;
import eu.dnetlib.iis.core.java.Process;
import eu.dnetlib.iis.core.java.porttype.PortType;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.Semaphore;
import org.apache.hadoop.conf.Configuration;
import org.apache.log4j.Logger;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

/* loaded from: input_file:eu/dnetlib/iis/common/lock/LockManagingProcess.class */
public class LockManagingProcess implements Process {
    public static final String DEFAULT_ROOT_NODE = "/cache";
    public static final String NODE_SEPARATOR = "/";
    public static final String PARAM_ZK_SESSION_TIMEOUT = "zk_session_timeout";
    public static final String PARAM_ROOT_NODE = "root_node";
    public static final String PARAM_NODE_ID = "node_id";
    public static final String PARAM_LOCK_MODE = "mode";
    public static final int DEFAULT_SESSION_TIMEOUT = 60000;
    public static final Logger log = Logger.getLogger(LockManagingProcess.class);

    /* loaded from: input_file:eu/dnetlib/iis/common/lock/LockManagingProcess$LockMode.class */
    public enum LockMode {
        obtain,
        release
    }

    public Map<String, PortType> getInputPorts() {
        return Collections.emptyMap();
    }

    public Map<String, PortType> getOutputPorts() {
        return Collections.emptyMap();
    }

    public void run(PortBindings portBindings, Configuration configuration, Map<String, String> map) throws Exception {
        if (!map.containsKey(PARAM_NODE_ID)) {
            throw new Exception("node id not provided!");
        }
        if (!map.containsKey("mode")) {
            throw new Exception("lock mode not provided!");
        }
        String str = configuration.get("ha.zookeeper.quorum");
        if (str == null || str.isEmpty()) {
            throw new Exception("zookeeper quorum is unknown, invalid ha.zookeeper.quorum property value: " + str);
        }
        final ZooKeeper zooKeeper = new ZooKeeper(str, map.containsKey(PARAM_ZK_SESSION_TIMEOUT) ? Integer.valueOf(map.get(PARAM_ZK_SESSION_TIMEOUT)).intValue() : 60000, new Watcher() { // from class: eu.dnetlib.iis.common.lock.LockManagingProcess.1
            public void process(WatchedEvent watchedEvent) {
            }
        });
        String str2 = map.containsKey(PARAM_ROOT_NODE) ? map.get(PARAM_ROOT_NODE) : DEFAULT_ROOT_NODE;
        if (zooKeeper.exists(str2, false) == null) {
            log.warn("initializing root node: " + str2);
            zooKeeper.create(str2, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            log.warn("root node initialized");
        }
        final String generatePath = generatePath(map.get(PARAM_NODE_ID), str2);
        LockMode valueOf = LockMode.valueOf(map.get("mode"));
        final Semaphore semaphore = new Semaphore(1);
        semaphore.acquire();
        if (!LockMode.obtain.equals(valueOf)) {
            if (!LockMode.release.equals(valueOf)) {
                throw new Exception("unsupported lock mode: " + valueOf);
            }
            log.warn("removing lock" + generatePath + "...");
            zooKeeper.delete(generatePath, -1);
            log.warn("lock" + generatePath + " removed");
            return;
        }
        log.warn("trying to obtain lock: " + generatePath);
        if (zooKeeper.exists(generatePath, new Watcher() { // from class: eu.dnetlib.iis.common.lock.LockManagingProcess.2
            public void process(WatchedEvent watchedEvent) {
                if (Watcher.Event.EventType.NodeDeleted == watchedEvent.getType()) {
                    LockManagingProcess.log.warn(generatePath + " lock release detected");
                    LockManagingProcess.log.warn("creating new lock instance: " + generatePath + "...");
                    try {
                        zooKeeper.create(generatePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
                        LockManagingProcess.log.warn("lock" + generatePath + " created");
                        semaphore.release();
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    } catch (KeeperException e2) {
                        throw new RuntimeException((Throwable) e2);
                    }
                }
            }
        }) == null) {
            log.warn("lock not found, creating new lock instance: " + generatePath);
            zooKeeper.create(generatePath, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
            log.warn("lock" + generatePath + " created");
            semaphore.release();
            return;
        }
        log.warn("waiting until lock is released");
        long currentTimeMillis = System.currentTimeMillis();
        semaphore.acquire();
        log.warn("lock released, waited for " + (System.currentTimeMillis() - currentTimeMillis) + " ms");
        semaphore.release();
    }

    public static final String generatePath(String str, String str2) {
        if (str != null) {
            return str2 + NODE_SEPARATOR + str.replace('/', '_');
        }
        return null;
    }
}
