package eu.dnetlib.dhp.oa.graph.raw;

import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.MDStoreInfo;
import eu.dnetlib.dhp.common.MdstoreClient;
import eu.dnetlib.dhp.oa.graph.raw.common.AbstractMigrationApplication;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Consumer;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/oa/graph/raw/MigrateMongoMdstoresApplication.class */
/**
 * Synchronizes the mdstores listed by an {@link MdstoreClient} into per-mdstore paths under an HDFS
 * base directory, laid out as {@code <hdfsPath>/<mdstoreId>/<latestTimestamp>}.
 *
 * <p>An mdstore that has no entry under the base directory is copied; one whose timestamp on the
 * client side is newer than the one found on HDFS is copied again and the older HDFS path is then
 * deleted. The actual writing of records is delegated to {@link AbstractMigrationApplication}
 * (its implementation is not visible from this file).
 */
public class MigrateMongoMdstoresApplication extends AbstractMigrationApplication implements Closeable {
    private static final Logger log = LoggerFactory.getLogger(MigrateMongoMdstoresApplication.class);

    /** Classpath location of the JSON document describing the command line parameters. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/oa/graph/migrate_mongo_mstores_parameters.json";

    private final MdstoreClient mdstoreClient;

    /**
     * Lists the mdstores known to the client for the given format/layout/interpretation triple.
     */
    private static List<MDStoreInfo> snapshotsMDStores(MdstoreClient client, String mdFormat, String mdLayout, String mdInterpretation) {
        return client.mdStoreWithTimestamp(mdFormat, mdLayout, mdInterpretation);
    }

    /**
     * Derives an {@link MDStoreInfo} from a file path of the form {@code .../<mdstoreId>/<timestamp>}.
     *
     * @param path     the path of a file found under the base directory
     * @param basePath the base directory the listing started from
     * @return the mdstore id and timestamp encoded in the last two path segments, or {@code null}
     *         when the path does not contain {@code basePath}, is too short, or does not end with a
     *         numeric timestamp
     */
    private static MDStoreInfo extractPath(String path, String basePath) {
        final int start = path.indexOf(basePath);
        // Index 0 is a valid match: it happens when the base path is given in the same (e.g. fully
        // qualified) form the file system reports. Only "not found" must be rejected.
        if (start < 0) {
            return null;
        }
        final String[] segments = path.substring(start).split("/");
        if (segments.length <= 2) {
            return null;
        }
        final String mdStoreId = segments[segments.length - 2];
        final String timestampSegment = segments[segments.length - 1];
        final long timestamp;
        try {
            timestamp = Long.parseLong(timestampSegment);
        } catch (NumberFormatException e) {
            // A file that is not named after a timestamp does not describe an mdstore snapshot;
            // skip it instead of aborting the whole synchronization.
            log.warn("Skipping path {}: '{}' is not a numeric timestamp", path, timestampSegment);
            return null;
        }
        return new MDStoreInfo(mdStoreId, (String) null, Long.valueOf(timestamp));
    }

    /**
     * Scans {@code basePath} recursively and indexes the mdstores found there by mdstore id.
     *
     * <p>When several timestamps exist for the same mdstore the most recent one is kept, so the
     * result does not depend on the order in which the file system lists the files.
     *
     * @return a map from mdstore id to the info extracted from HDFS; empty when the base path does
     *         not exist
     * @throws IOException if the file system cannot be queried
     */
    private static Map<String, MDStoreInfo> hdfsMDStoreInfo(FileSystem fileSystem, String basePath) throws IOException {
        final Map<String, MDStoreInfo> stores = new HashMap<>();
        final Path root = new Path(basePath);
        if (!fileSystem.exists(root)) {
            return stores;
        }
        final RemoteIterator<LocatedFileStatus> files = fileSystem.listFiles(root, true);
        while (files.hasNext()) {
            final MDStoreInfo found = extractPath(files.next().getPath().toString(), basePath);
            if (found != null) {
                stores.merge(
                    found.getMdstore(),
                    found,
                    (existing, candidate) -> existing.getLatestTimestamp().longValue() >= candidate.getLatestTimestamp().longValue()
                        ? existing
                        : candidate);
            }
        }
        return stores;
    }

    /**
     * Joins a parent path and a child name with exactly one {@code /} between them when the parent
     * already ends with a slash, and inserts one otherwise.
     */
    private static String createMDStoreDir(String basePath, String name) {
        if (basePath.endsWith("/")) {
            return basePath + name;
        }
        return basePath + "/" + name;
    }

    /**
     * Entry point: parses the arguments described by {@link #PARAMETERS_RESOURCE}, lists the mdstores
     * matching the requested format/layout/interpretation and synchronizes each of them to HDFS.
     *
     * @param args command line arguments understood by {@link ArgumentApplicationParser}
     * @throws Exception if the arguments cannot be parsed or the synchronization fails
     */
    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        try (InputStream in = Objects.requireNonNull(
                MigrateMongoMdstoresApplication.class.getResourceAsStream(PARAMETERS_RESOURCE),
                "Missing classpath resource " + PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(in, "UTF-8");
        }

        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String mongoBaseUrl = parser.get("mongoBaseUrl");
        final String mongoDb = parser.get("mongoDb");
        final String mdFormat = parser.get("mdFormat");
        final String mdLayout = parser.get("mdLayout");
        final String mdInterpretation = parser.get("mdInterpretation");
        final String hdfsPath = parser.get("hdfsPath");

        // The FileSystem handle is deliberately not closed here: FileSystem.get may hand out a
        // shared instance.
        final FileSystem fileSystem = FileSystem.get(DHPUtils.getHadoopConfiguration(parser.get("nameNode")));

        final MdstoreClient client = new MdstoreClient(mongoBaseUrl, mongoDb);
        try {
            final Map<String, MDStoreInfo> hdfsStores = hdfsMDStoreInfo(fileSystem, hdfsPath);
            snapshotsMDStores(client, mdFormat, mdLayout, mdInterpretation)
                .stream()
                .filter(info -> info.getLatestTimestamp() != null)
                .forEach(consumeMDStore(mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, hdfsStores));
        } finally {
            // Release the client used for listing; each synchronization opens its own client.
            client.close();
        }
    }

    /**
     * Builds the per-mdstore action applied by {@link #main(String[])}.
     *
     * <ul>
     *   <li>mdstore absent from {@code hdfsStores}: it is synchronized to HDFS;</li>
     *   <li>mdstore present with an older timestamp: the new snapshot is synchronized, then the
     *       path of the old snapshot is deleted;</li>
     *   <li>otherwise nothing is done.</li>
     * </ul>
     *
     * @param hdfsStores the mdstores currently found on HDFS, keyed by mdstore id
     * @return a consumer that throws {@link RuntimeException} when synchronization or deletion fails
     */
    private static Consumer<MDStoreInfo> consumeMDStore(String mdFormat, String mdLayout, String mdInterpretation, String hdfsPath, FileSystem fileSystem, String mongoBaseUrl, String mongoDb, Map<String, MDStoreInfo> hdfsStores) {
        return mongoStore -> {
            final MDStoreInfo hdfsStore = hdfsStores.get(mongoStore.getMdstore());

            if (hdfsStore == null) {
                // No <hdfsPath>/<mdstoreId>/<timestamp> entry exists yet for this mdstore.
                log.info("Adding store {}", mongoStore.getMdstore());
                try {
                    synchMDStoreIntoHDFS(mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, mongoStore);
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
                return;
            }

            if (mongoStore.getLatestTimestamp().longValue() > hdfsStore.getLatestTimestamp().longValue()) {
                log.info("Updating MDStore {}", mongoStore.getMdstore());
                // The path to remove is the one of the OLD snapshot, i.e. the timestamp read from HDFS.
                final String obsoletePath = createMDStoreDir(
                    createMDStoreDir(hdfsPath, mongoStore.getMdstore()),
                    hdfsStore.getLatestTimestamp().toString());
                try {
                    synchMDStoreIntoHDFS(mdFormat, mdLayout, mdInterpretation, hdfsPath, fileSystem, mongoBaseUrl, mongoDb, mongoStore);
                    log.info("deleting {}", obsoletePath);
                    fileSystem.delete(new Path(obsoletePath), true);
                } catch (IOException e) {
                    throw new RuntimeException("Unable to synch and remove path " + obsoletePath, e);
                }
            }
        };
    }

    /**
     * Copies one mdstore into {@code <hdfsPath>/<mdstoreId>/<latestTimestamp>}.
     *
     * @param mdStoreInfo the mdstore to copy; its latest timestamp must not be {@code null}
     * @throws IOException      if the mdstore directory cannot be created
     * @throws RuntimeException if reading the records, emitting them, or closing the writer fails
     */
    private static void synchMDStoreIntoHDFS(String mdFormat, String mdLayout, String mdInterpretation, String hdfsPath, FileSystem fileSystem, String mongoBaseUrl, String mongoDb, MDStoreInfo mdStoreInfo) throws IOException {
        final String mdStoreDir = createMDStoreDir(hdfsPath, mdStoreInfo.getMdstore());
        fileSystem.mkdirs(new Path(mdStoreDir));

        final String targetPath = createMDStoreDir(mdStoreDir, mdStoreInfo.getLatestTimestamp().toString());

        // try-with-resources guarantees the application (and its client) is closed even when
        // execute() fails part-way through.
        try (MigrateMongoMdstoresApplication application = new MigrateMongoMdstoresApplication(mongoBaseUrl, mongoDb, targetPath)) {
            application.execute(mdStoreInfo.getCurrentId(), mdFormat, mdLayout, mdInterpretation);
        } catch (Exception e) {
            throw new RuntimeException(String.format("Error on sync mdstore with ID %s into path %s", mdStoreInfo.getMdstore(), targetPath), e);
        }
        log.info("Synchronized mdStore id : {} into path {}", mdStoreInfo.getMdstore(), targetPath);
    }

    /**
     * Creates an application that reads from the given mdstore database and hands {@code hdfsPath}
     * to {@link AbstractMigrationApplication}.
     *
     * @param mongoBaseUrl base URL passed to {@link MdstoreClient}
     * @param mongoDb      database name passed to {@link MdstoreClient}
     * @param hdfsPath     target path passed to the superclass constructor
     * @throws Exception if the superclass or the client cannot be initialized
     */
    public MigrateMongoMdstoresApplication(String mongoBaseUrl, String mongoDb, String hdfsPath) throws Exception {
        super(hdfsPath);
        this.mdstoreClient = new MdstoreClient(mongoBaseUrl, mongoDb);
    }

    /**
     * Emits every record of the given mdstore, labelled {@code <format>-<layout>-<interpretation>}.
     *
     * @param mdId           identifier of the mdstore whose records are listed
     * @param format         first component of the label
     * @param layout         second component of the label
     * @param interpretation third component of the label
     */
    public void execute(String mdId, String format, String layout, String interpretation) {
        // The label is the same for every record, so build it once.
        final String type = String.format("%s-%s-%s", format, layout, interpretation);
        final Iterator<?> records = this.mdstoreClient.listRecords(mdId).iterator();
        while (records.hasNext()) {
            emit((String) records.next(), type);
        }
    }

    /**
     * Closes the superclass resources and then the mdstore client; the client is closed even when
     * closing the superclass fails.
     *
     * @throws IOException if either close operation fails
     */
    @Override
    public void close() throws IOException {
        try {
            super.close();
        } finally {
            this.mdstoreClient.close();
        }
    }
}
