package eu.dnetlib.dhp.collection.worker;

import com.fasterxml.jackson.databind.ObjectMapper;
import eu.dnetlib.collector.worker.model.ApiDescriptor;
import eu.dnetlib.dhp.application.ArgumentApplicationParser;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.collection.worker.utils.CollectorPluginFactory;
import eu.dnetlib.message.Message;
import eu.dnetlib.message.MessageManager;
import eu.dnetlib.message.MessageType;
import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:eu/dnetlib/dhp/collection/worker/DnetCollectorWorker.class */
/**
 * Runs a single collection job: resolves the collector plugin for the protocol named in the
 * API descriptor, writes every collected record into a Hadoop {@link SequenceFile}
 * ({@link IntWritable} key, {@link Text} value) and reports progress through the
 * {@link MessageManager}.
 *
 * <p>All job parameters are read from the {@link ArgumentApplicationParser}; the keys used are
 * {@code apidescriptor}, {@code namenode}, {@code userHDFS}, {@code hdfsPath},
 * {@code workflowId} and {@code rabbitOngoingQueue}.
 */
public class DnetCollectorWorker {
    private static final Logger log = LoggerFactory.getLogger(DnetCollectorWorker.class);

    private final CollectorPluginFactory collectorPluginFactory;
    private final ArgumentApplicationParser argumentParser;
    private final MessageManager manager;

    /**
     * Creates a worker bound to its collaborators.
     *
     * @param collectorPluginFactory    factory used to look up the plugin for the API protocol
     * @param argumentApplicationParser source of the job parameters
     * @param messageManager            channel used to send progress and report messages
     * @throws DnetCollectorException declared for callers; this constructor only stores its arguments
     */
    public DnetCollectorWorker(CollectorPluginFactory collectorPluginFactory, ArgumentApplicationParser argumentApplicationParser, MessageManager messageManager) throws DnetCollectorException {
        this.collectorPluginFactory = collectorPluginFactory;
        this.argumentParser = argumentApplicationParser;
        this.manager = messageManager;
    }

    /**
     * Collects all records exposed by the configured API and stores them in a sequence file at
     * the {@code hdfsPath} argument.
     *
     * <p>Each record is keyed by a zero-based counter. Whenever the counter reaches a multiple of
     * ten an {@code ONGOING} message is sent; a failure to send that message is logged and does
     * not interrupt the collection. Once the writer has been closed, a final {@code ONGOING}
     * message and a {@code REPORT} message carrying the total are sent and the message manager
     * is closed.
     *
     * @throws DnetCollectorException wrapping any failure raised while collecting, writing or
     *                                sending the final messages
     */
    public void collect() throws DnetCollectorException {
        try {
            final ApiDescriptor api = new ObjectMapper().readValue(argumentParser.get("apidescriptor"), ApiDescriptor.class);
            final CollectorPlugin plugin = collectorPluginFactory.getPluginByProtocol(api.getProtocol());

            final String nameNode = argumentParser.get("namenode");
            final Configuration conf = new Configuration();
            conf.set("fs.defaultFS", nameNode);
            conf.set("fs.hdfs.impl", DistributedFileSystem.class.getName());
            conf.set("fs.file.impl", LocalFileSystem.class.getName());
            System.setProperty("HADOOP_USER_NAME", argumentParser.get("userHDFS"));
            System.setProperty("hadoop.home.dir", "/");
            // The returned FileSystem is not used here; the call is kept as in the original.
            // NOTE(review): presumably serves as an early connectivity/initialisation step - confirm.
            FileSystem.get(URI.create(nameNode), conf);

            final Path hdfsPath = new Path(argumentParser.get("hdfsPath"));
            log.info("Created path {}", hdfsPath);

            final String workflowId = argumentParser.get("workflowId");
            final String ongoingQueue = argumentParser.get("rabbitOngoingQueue");
            final HashMap<String, String> ongoingBody = new HashMap<>();
            final HashMap<String, String> reportBody = new HashMap<>();
            final AtomicInteger counter = new AtomicInteger(0);

            // try-with-resources guarantees the writer is closed even when the plugin stream
            // or an append fails part-way through.
            try (SequenceFile.Writer writer = SequenceFile.createWriter(
                    conf,
                    SequenceFile.Writer.file(hdfsPath),
                    SequenceFile.Writer.keyClass(IntWritable.class),
                    SequenceFile.Writer.valueClass(Text.class))) {
                // Key and value holders are reused for every record.
                final IntWritable key = new IntWritable(counter.get());
                final Text value = new Text();
                plugin.collect(api).forEach(record -> {
                    key.set(counter.getAndIncrement());
                    value.set(record);
                    if (counter.get() % 10 == 0) {
                        // Progress notifications are best effort: log and keep collecting.
                        try {
                            ongoingBody.put("ongoing", "" + counter.get());
                            // The message is sent as part of evaluating the log argument,
                            // regardless of the configured log level.
                            log.debug("Sending message: {}", manager.sendMessage(new Message(workflowId, "Collection", MessageType.ONGOING, ongoingBody), ongoingQueue, true, false));
                        } catch (Exception e) {
                            log.error("Error on sending message ", e);
                        }
                    }
                    try {
                        writer.append(key, value);
                    } catch (IOException e) {
                        // forEach cannot propagate checked exceptions; the outer catch wraps this.
                        throw new RuntimeException(e);
                    }
                });
            }

            ongoingBody.put("ongoing", "" + counter.get());
            manager.sendMessage(new Message(workflowId, "Collection", MessageType.ONGOING, ongoingBody), ongoingQueue, true, false);
            reportBody.put("collected", "" + counter.get());
            manager.sendMessage(new Message(workflowId, "Collection", MessageType.REPORT, reportBody), ongoingQueue, true, false);
            manager.close();
        } catch (Throwable e) {
            throw new DnetCollectorException("Error on collecting ", e);
        }
    }
}
