/*
 * Decompiled with CFR 0.152.
 */
package eu.dnetlib.dhp.actionmanager.migration;

import com.google.common.base.Splitter;
import com.google.common.collect.Lists;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.utils.ISLookupClientFactory;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpException;
import eu.dnetlib.enabling.is.lookup.rmi.ISLookUpService;
import java.io.File;
import java.io.FileOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.tools.DistCp;
import org.apache.hadoop.tools.DistCpOptions;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Command-line job that copies the latest raw action sets from a source name node to a
 * working directory on a target name node using DistCp.
 *
 * <p>The source paths are obtained by running an XQuery against the IS lookup service. For
 * every existing source path whose last segment starts with {@code rawset_}, the directory is
 * copied (unless {@code transform_only} is true) to
 * {@code <targetNameNode><workingDirectory>/<actionSetDirectory>/<rawSet>}. The resulting
 * target paths are written, comma separated, under the key {@code target_paths} into the
 * properties file named by the {@code oozie.action.output.properties} system property.
 */
public class MigrateActionSet {
    private static final Logger log = LoggerFactory.getLogger(MigrateActionSet.class);
    private static final String SEPARATOR = "/";
    private static final String TARGET_PATHS = "target_paths";
    private static final String RAWSET_PREFIX = "rawset_";

    /** Classpath location of the JSON document describing the accepted command-line arguments. */
    private static final String PARAMETERS_RESOURCE =
        "/eu/dnetlib/dhp/actionmanager/migration/migrate_actionsets_parameters.json";

    /** System property naming the properties file this job writes its output to. */
    private static final String OUTPUT_PROPERTIES_KEY = "oozie.action.output.properties";

    /**
     * XQuery returning the distinct values of {@code basePath/setDirectory/latestRawSetId},
     * built from the action manager service profile and the action set profiles.
     */
    private static final String SOURCE_PATHS_XQUERY =
        "distinct-values(\n"
            + "let $basePath := collection('/db/DRIVER/ServiceResources/ActionManagerServiceResourceType')//SERVICE_PROPERTIES/PROPERTY[@key = 'basePath']/@value/string()\n"
            + "for $x in collection('/db/DRIVER/ActionManagerSetDSResources/ActionManagerSetDSResourceType') \n"
            + "let $setDir := $x//SET/@directory/string()\n"
            + "let $rawSet := $x//RAW_SETS/LATEST/@id/string()\n"
            + "return concat($basePath, '/', $setDir, '/', $rawSet))";

    /**
     * Entry point: parses the arguments against the bundled parameter definition and runs the
     * migration.
     *
     * @param args command-line arguments, as described by the bundled JSON parameter file
     * @throws Exception if argument parsing, the lookup, the copy or the output writing fails
     */
    public static void main(String[] args) throws Exception {
        ArgumentApplicationParser parser = new ArgumentApplicationParser(loadParameterDefinition());
        parser.parseArgument(args);
        new MigrateActionSet().run(parser);
    }

    /**
     * Reads the bundled JSON parameter definition as UTF-8 text, closing the stream afterwards.
     *
     * @return the content of the parameter definition resource
     * @throws IllegalStateException if the resource is not on the classpath
     * @throws Exception if the resource cannot be read
     */
    private static String loadParameterDefinition() throws Exception {
        try (InputStream in = MigrateActionSet.class.getResourceAsStream(PARAMETERS_RESOURCE)) {
            if (in == null) {
                throw new IllegalStateException("missing classpath resource: " + PARAMETERS_RESOURCE);
            }
            return IOUtils.toString(in, StandardCharsets.UTF_8);
        }
    }

    /**
     * Performs the migration described in the class documentation.
     *
     * @param parser the already parsed command-line arguments
     * @throws IllegalStateException if the output properties system property is not set
     * @throws Exception if the lookup, a file system operation or a DistCp run fails
     */
    private void run(ArgumentApplicationParser parser) throws Exception {
        String isLookupUrl = parser.get("isLookupUrl");
        String sourceNN = parser.get("sourceNameNode");
        String targetNN = parser.get("targetNameNode");
        String workDir = parser.get("workingDirectory");
        int distcpNumMaps = Integer.parseInt(parser.get("distcp_num_maps"));
        String distcpMemoryMb = parser.get("distcp_memory_mb");
        String distcpTaskTimeout = parser.get("distcp_task_timeout");

        String transformOnlyParam = parser.get("transform_only");
        log.info("transform only param: {}", transformOnlyParam);
        boolean transformOnly = Boolean.parseBoolean(transformOnlyParam);
        log.info("transform only: {}", transformOnly);

        // Fail before any copy is started rather than after all of them have completed.
        File outputFile = resolveOutputFile();

        ISLookUpService isLookUp = ISLookupClientFactory.getLookUpService(isLookupUrl);

        Configuration conf = this.getConfiguration(distcpTaskTimeout, distcpMemoryMb, distcpNumMaps);
        FileSystem targetFS = FileSystem.get(conf);

        // Same settings as the target configuration, but with the source name node as default FS.
        Configuration sourceConf = this.getConfiguration(distcpTaskTimeout, distcpMemoryMb, distcpNumMaps);
        sourceConf.set("fs.defaultFS", sourceNN);
        FileSystem sourceFS = FileSystem.get(sourceConf);

        List<Path> sourcePaths = this.getSourcePaths(sourceNN, isLookUp);
        log.info("paths to process:\n{}", sourcePaths.stream().map(Path::toString).collect(Collectors.joining("\n")));

        List<Path> targetPaths = new ArrayList<>();
        for (Path source : sourcePaths) {
            if (!sourceFS.exists(source)) {
                log.warn("skipping unexisting path: {}", source);
                continue;
            }

            // The last two path segments are <actionSetDirectory>/<rawSet>.
            LinkedList<String> pathQ = Lists.newLinkedList(Splitter.on(SEPARATOR).split(source.toUri().getPath()));
            String rawSet = pathQ.pollLast();
            log.info("got RAWSET: {}", rawSet);
            if (StringUtils.isBlank(rawSet) || !rawSet.startsWith(RAWSET_PREFIX)) {
                log.warn("skipping path without a valid rawset directory: {}", source);
                continue;
            }

            String actionSetDirectory = pathQ.pollLast();
            Path targetPath = new Path(targetNN + workDir + SEPARATOR + actionSetDirectory + SEPARATOR + rawSet);
            log.info("using TARGET PATH: {}", targetPath);

            if (!transformOnly) {
                // Remove any previous copy so that DistCp writes into a clean location.
                if (targetFS.exists(targetPath)) {
                    targetFS.delete(targetPath, true);
                }
                this.runDistcp(distcpNumMaps, distcpMemoryMb, distcpTaskTimeout, conf, source, targetPath);
            }
            targetPaths.add(targetPath);
        }

        Properties props = new Properties();
        props.setProperty(TARGET_PATHS, targetPaths.stream().map(Path::toString).collect(Collectors.joining(",")));
        try (FileOutputStream os = new FileOutputStream(outputFile)) {
            props.store(os, "");
        }
        System.out.println(outputFile.getAbsolutePath());
    }

    /**
     * Resolves the file the output properties must be written to.
     *
     * @return the file named by the {@code oozie.action.output.properties} system property
     * @throws IllegalStateException if that system property is not set or blank
     */
    private static File resolveOutputFile() {
        String outputLocation = System.getProperty(OUTPUT_PROPERTIES_KEY);
        if (StringUtils.isBlank(outputLocation)) {
            throw new IllegalStateException("system property not set: " + OUTPUT_PROPERTIES_KEY);
        }
        return new File(outputLocation);
    }

    /**
     * Runs a DistCp job copying {@code source} to {@code targetPath}, preserving block size,
     * replication and checksum type.
     *
     * @param distcpNumMaps maximum number of map tasks
     * @param distcpMemoryMb value for {@code mapreduce.map.memory.mb}
     * @param distcpTaskTimeout value for {@code mapred.task.timeout}
     * @param conf Hadoop configuration used to create the DistCp tool
     * @param source directory to copy
     * @param targetPath destination directory
     * @throws RuntimeException if DistCp exits with a non-zero code
     * @throws Exception if the DistCp tool cannot be created or run
     */
    private void runDistcp(int distcpNumMaps, String distcpMemoryMb, String distcpTaskTimeout, Configuration conf, Path source, Path targetPath) throws Exception {
        DistCpOptions op = new DistCpOptions(source, targetPath);
        op.setMaxMaps(distcpNumMaps);
        op.preserve(DistCpOptions.FileAttribute.BLOCKSIZE);
        op.preserve(DistCpOptions.FileAttribute.REPLICATION);
        op.preserve(DistCpOptions.FileAttribute.CHECKSUMTYPE);

        // NOTE(review): "-m <n>" is passed as a single argv token, exactly as before; confirm the
        // DistCp option parser of the Hadoop version in use accepts this form.
        String[] distcpArgs = new String[] {
            "-Dmapred.task.timeout=" + distcpTaskTimeout,
            "-Dmapreduce.map.memory.mb=" + distcpMemoryMb,
            "-pb",
            "-m " + distcpNumMaps,
            source.toString(),
            targetPath.toString()
        };

        int res = ToolRunner.run((Tool) new DistCp(conf, op), distcpArgs);
        if (res != 0) {
            throw new RuntimeException(String.format("distcp exited with code %s", res));
        }
    }

    /**
     * Builds a Hadoop configuration carrying the WebHDFS timeouts and the map task settings
     * used by the copy.
     *
     * @param distcpTaskTimeout value used for the WebHDFS connect/read timeouts and for
     *     {@code mapred.task.timeout}
     * @param distcpMemoryMb value for {@code mapreduce.map.memory.mb}
     * @param distcpNumMaps value for {@code mapred.map.tasks}
     * @return a new configuration with the above properties set
     */
    private Configuration getConfiguration(String distcpTaskTimeout, String distcpMemoryMb, int distcpNumMaps) {
        Configuration conf = new Configuration();
        conf.set("dfs.webhdfs.socket.connect-timeout", distcpTaskTimeout);
        conf.set("dfs.webhdfs.socket.read-timeout", distcpTaskTimeout);
        conf.set("dfs.http.client.retry.policy.enabled", "true");
        conf.set("mapred.task.timeout", distcpTaskTimeout);
        conf.set("mapreduce.map.memory.mb", distcpMemoryMb);
        conf.set("mapred.map.tasks", String.valueOf(distcpNumMaps));
        return conf;
    }

    /**
     * Queries the IS lookup service for the raw action set locations and prefixes each of them
     * with the source name node.
     *
     * @param sourceNN source name node, prepended verbatim to every returned location
     * @param isLookUp lookup service the XQuery is run against
     * @return one path per distinct value returned by the query
     * @throws ISLookUpException if the lookup service fails
     */
    private List<Path> getSourcePaths(String sourceNN, ISLookUpService isLookUp) throws ISLookUpException {
        log.info("running xquery:\n{}", SOURCE_PATHS_XQUERY);
        return isLookUp.quickSearchProfile(SOURCE_PATHS_XQUERY).stream()
            .map(p -> sourceNN + p)
            .map(Path::new)
            .collect(Collectors.toList());
    }
}

