package eu.dnetlib.dhp.collection.orcid;

import eu.dnetlib.dhp.actionmanager.Constants;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.common.collection.HttpClientParams;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.BufferedInputStream;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.compressors.gzip.GzipCompressorInputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/collection/orcid/OrcidGetUpdatesFile.class */
/**
 * Collects the ORCID ids whose records changed after the latest {@code lastModifiedDate} already
 * present in the graph, and hands them to a pool of {@link ORCIDWorker}s.
 *
 * <p>Steps:
 * <ol>
 *   <li>Read {@code max(lastModifiedDate)} from {@code <graphPath>/Authors}.</li>
 *   <li>Download the gzipped tar served at {@code apiURL} to a staging file on HDFS.</li>
 *   <li>Scan every regular-file entry of the archive. Each line is split on
 *       {@link Constants#DEFAULT_DELIMITER}, and the value of column 0 is enqueued when the
 *       first 10 characters of column 3 sort strictly after that date.</li>
 *   <li>The workers consume the queue. Each owns {@code employments_N}, {@code summary_N} and
 *       {@code works_N} sequence files under {@code targetPath}.</li>
 * </ol>
 */
public class OrcidGetUpdatesFile {
    private static final Logger log = LoggerFactory.getLogger(OrcidGetUpdatesFile.class);

    /** Classpath resource describing the command-line arguments of this job. */
    private static final String PARAMETERS_RESOURCE = "/eu/dnetlib/dhp/collection/orcid/download_orcid_update_parameter.json";

    /** HDFS location where the downloaded archive is staged before being read. */
    private static final String STAGING_PATH = "/tmp/orcid_updates.tar.gz";

    /** Number of worker threads consuming the id queue. */
    private static final int NUM_WORKERS = 22;

    /** Capacity of the queue between the archive reader and the workers (bounds memory use). */
    private static final int QUEUE_CAPACITY = 3000;

    /** Column holding the id that is handed to the workers. */
    private static final int ID_COLUMN = 0;

    /** Column whose leading characters are compared against the latest graph date. */
    private static final int DATE_COLUMN = 3;

    /** Number of leading characters of {@link #DATE_COLUMN} used in the comparison. */
    private static final int DATE_PREFIX_LENGTH = 10;

    public static void main(String[] args) throws Exception {
        final String jsonConfiguration;
        try (InputStream in = Objects.requireNonNull(
            OrcidGetUpdatesFile.class.getResourceAsStream(PARAMETERS_RESOURCE),
            "missing classpath resource " + PARAMETERS_RESOURCE)) {
            jsonConfiguration = IOUtils.toString(in, StandardCharsets.UTF_8);
        }
        final ArgumentApplicationParser parser = new ArgumentApplicationParser(jsonConfiguration);
        parser.parseArgument(args);

        final String namenode = parser.get("namenode");
        log.info("got variable namenode: {}", namenode);
        final String master = parser.get("master");
        log.info("got variable master: {}", master);
        final String targetPath = parser.get("targetPath");
        log.info("got variable targetPath: {}", targetPath);
        final String apiURL = parser.get("apiURL");
        log.info("got variable apiURL: {}", apiURL);
        final String accessToken = parser.get("accessToken");
        // Never write the credential itself to the logs.
        log.info("got variable accessToken: [REDACTED]");
        final String graphPath = parser.get("graphPath");
        log.info("got variable graphPath: {}", graphPath);

        final SparkSession spark = SparkSession
            .builder()
            .appName(OrcidGetUpdatesFile.class.getName())
            .master(master)
            .getOrCreate();
        final Row latest = spark
            .read()
            .load(graphPath + "/Authors")
            .selectExpr("max(lastModifiedDate)")
            .first();
        final String latestDate = latest.getString(0);
        log.info("latest date is {}", latestDate);

        final FileSystem fileSystem = FileSystem.get(DHPUtils.getHadoopConfiguration(namenode));
        new OrcidGetUpdatesFile().readTar(fileSystem, accessToken, apiURL, targetPath, latestDate);
    }

    /** Creates a Text/Text sequence file writer at {@code path}. */
    private SequenceFile.Writer createFile(Path path, FileSystem fileSystem) throws IOException {
        return SequenceFile
            .createWriter(
                fileSystem.getConf(),
                SequenceFile.Writer.file(path),
                SequenceFile.Writer.keyClass(Text.class),
                SequenceFile.Writer.valueClass(Text.class));
    }

    /**
     * Builds worker {@code id}, which writes to {@code employments_<id>}, {@code summary_<id>} and
     * {@code works_<id>} under {@code targetPath} and takes ids from {@code queue}.
     */
    private ORCIDWorker createWorker(String id, String targetPath, BlockingQueue<String> queue, String accessToken,
        FileSystem fileSystem) throws Exception {
        return ORCIDWorker
            .builder()
            .withId(id)
            .withEmployments(createFile(new Path(String.format("%s/employments_%s", targetPath, id)), fileSystem))
            .withSummary(createFile(new Path(String.format("%s/summary_%s", targetPath, id)), fileSystem))
            .withWorks(createFile(new Path(String.format("%s/works_%s", targetPath, id)), fileSystem))
            .withAccessToken(accessToken)
            .withBlockingQueue(queue)
            .build();
    }

    /**
     * Downloads the archive at {@code str2} and dispatches the ids updated after {@code str4} to
     * the workers. Returns only after all workers have been told to stop and have been joined.
     *
     * @param fileSystem HDFS used for the staging file and the worker output
     * @param str access token handed to every worker
     * @param str2 URL of the gzipped tar to download
     * @param str3 directory under which the workers create their sequence files
     * @param str4 date rows must sort strictly after in order to be enqueued
     * @throws Exception on I/O failures or worker creation errors
     */
    public void readTar(FileSystem fileSystem, String str, String str2, String str3, String str4) throws Exception {
        final String accessToken = str;
        final String apiURL = str2;
        final String targetPath = str3;
        final String startDate = str4;

        final Path stagingPath = new Path(STAGING_PATH);
        if (!downloadToHdfs(fileSystem, apiURL, stagingPath)) {
            return;
        }

        try (TarArchiveInputStream tar = new TarArchiveInputStream(
            new GzipCompressorInputStream(new BufferedInputStream(fileSystem.open(stagingPath))))) {
            final BlockingQueue<String> queue = new ArrayBlockingQueue<>(QUEUE_CAPACITY);
            final List<ORCIDWorker> workers = new ArrayList<>(NUM_WORKERS);
            for (int i = 0; i < NUM_WORKERS; i++) {
                workers.add(createWorker(String.valueOf(i), targetPath, queue, accessToken, fileSystem));
            }
            try {
                workers.forEach(ORCIDWorker::start);
                enqueueUpdatedIds(tar, startDate, queue);
            } finally {
                // Always send the stop signals, otherwise a failed scan leaves the workers blocked forever.
                stopWorkers(workers, queue);
            }
        }
    }

    /**
     * Copies the body served at {@code apiURL} to {@code target}, overwriting it.
     *
     * @return {@code false} (after logging) when the server answers with a non-2xx status; redirects
     *     are not followed, so a 3xx also yields {@code false}
     */
    private static boolean downloadToHdfs(FileSystem fileSystem, String apiURL, Path target) throws IOException {
        final HttpURLConnection connection = (HttpURLConnection) new URL(apiURL).openConnection();
        try {
            final HttpClientParams clientParams = new HttpClientParams();
            connection.setInstanceFollowRedirects(false);
            connection.setReadTimeout(clientParams.getReadTimeOut() * 1000);
            connection.setConnectTimeout(clientParams.getConnectTimeOut() * 1000);

            final int status = connection.getResponseCode();
            if (status < 200 || status >= 300) {
                log.warn("unable to download ORCID updates from {}: HTTP status {}", apiURL, status);
                return false;
            }
            try (InputStream input = connection.getInputStream();
                FSDataOutputStream output = fileSystem.create(target, true)) {
                IOUtils.copy(input, output);
                output.flush();
            }
            return true;
        } finally {
            connection.disconnect();
        }
    }

    /**
     * Scans every regular-file entry of {@code tar}, skips its first line, and puts on {@code queue}
     * the id of each remaining line accepted by {@link #updatedId}.
     */
    private static void enqueueUpdatedIds(TarArchiveInputStream tar, String startDate, BlockingQueue<String> queue)
        throws IOException, InterruptedException {
        TarArchiveEntry entry;
        while ((entry = tar.getNextTarEntry()) != null) {
            if (!entry.isFile()) {
                continue;
            }
            // Intentionally not closed: closing the reader would close the shared tar stream.
            // The tar stream reports end-of-data at the end of the current entry.
            final BufferedReader reader = new BufferedReader(new InputStreamReader(tar, StandardCharsets.UTF_8));
            final String header = reader.readLine();
            log.info("reading tar entry {}, first line: {}", entry.getName(), header);

            long skipped = 0;
            String line;
            while ((line = reader.readLine()) != null) {
                final String[] columns = line.split(Constants.DEFAULT_DELIMITER);
                if (columns.length <= DATE_COLUMN || columns[DATE_COLUMN].length() < DATE_PREFIX_LENGTH) {
                    skipped++;
                    log.debug("skipping malformed line: {}", line);
                    continue;
                }
                final String id = updatedId(columns, startDate);
                if (id != null) {
                    queue.put(id);
                }
            }
            if (skipped > 0) {
                log.warn("skipped {} malformed lines in tar entry {}", skipped, entry.getName());
            }
        }
    }

    /**
     * Returns the id column of a well-formed row when its date prefix sorts strictly after
     * {@code startDate}, otherwise {@code null}.
     */
    private static String updatedId(String[] columns, String startDate) {
        final String datePrefix = columns[DATE_COLUMN].substring(0, DATE_PREFIX_LENGTH);
        return StringUtils.compare(datePrefix, startDate) > 0 ? columns[ID_COLUMN] : null;
    }

    /**
     * Sends one {@link ORCIDWorker#JOB_COMPLETE} marker per worker, then waits for every worker to
     * finish.
     */
    private static void stopWorkers(List<ORCIDWorker> workers, BlockingQueue<String> queue)
        throws InterruptedException {
        for (int i = 0; i < workers.size(); i++) {
            queue.put(ORCIDWorker.JOB_COMPLETE);
        }
        for (ORCIDWorker worker : workers) {
            worker.join();
        }
    }
}
