package eu.dnetlib.dhp.collection.plugin.mongodb;

import eu.dnetlib.dhp.aggregation.common.AggregatorReport;
import eu.dnetlib.dhp.collection.ApiDescriptor;
import eu.dnetlib.dhp.collection.CollectorException;
import eu.dnetlib.dhp.collection.plugin.CollectorPlugin;
import eu.dnetlib.dhp.utils.DHPUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.stream.Stream;
import java.util.zip.GZIPInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/* loaded from: input_file:eu/dnetlib/dhp/collection/plugin/mongodb/MongoDbDumpCollectorPlugin.class */
public class MongoDbDumpCollectorPlugin implements CollectorPlugin {
    public static final String PATH_PARAM = "path";
    public static final String BODY_JSONPATH = "$.body";
    public FileSystem fileSystem;

    public MongoDbDumpCollectorPlugin(FileSystem fileSystem) {
        this.fileSystem = fileSystem;
    }

    @Override // eu.dnetlib.dhp.collection.plugin.CollectorPlugin
    public Stream<String> collect(ApiDescriptor apiDescriptor, AggregatorReport aggregatorReport) throws CollectorException {
        Path path = (Path) Optional.ofNullable(apiDescriptor.getParams().get(PATH_PARAM)).map(Path::new).orElseThrow(() -> {
            return new CollectorException(String.format("missing parameter '%s'", PATH_PARAM));
        });
        try {
            if (this.fileSystem.exists(path)) {
                return new BufferedReader(new InputStreamReader(new GZIPInputStream(this.fileSystem.open(path)), Charset.defaultCharset())).lines().map(str -> {
                    return DHPUtils.getJPathString(BODY_JSONPATH, str);
                });
            }
            throw new CollectorException("path does not exist: " + path.toString());
        } catch (IOException e) {
            throw new CollectorException(e);
        }
    }
}
