package org.gcube.data.streams.publishers;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import java.net.URI;
import java.util.concurrent.TimeUnit;
import org.gcube.data.streams.Stream;
import org.gcube.data.streams.Utils;
import org.gcube.data.streams.exceptions.StreamPublishException;
import org.gcube.data.streams.exceptions.StreamSkipSignal;
import org.gcube.data.streams.exceptions.StreamStopSignal;
import org.gcube.data.streams.generators.Generator;
import org.gcube.data.streams.handlers.FaultHandler;
import org.gcube.data.streams.handlers.RethrowHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:WEB-INF/lib/streams-2.0.1-SNAPSHOT.jar:org/gcube/data/streams/publishers/RsPublisher.class */
/**
 * Publishes the elements of a {@link Stream} as a gRS2 resultset.
 *
 * <p>{@link #publish()} creates a {@link RecordWriter}, returns its locator and starts a
 * thread (obtained from the configured {@link ThreadProvider}) that converts stream elements
 * into records with the configured {@link RecordFactory} and puts them into the writer.
 *
 * <p>Buffer size, timeout, transport, on-demand behaviour, thread provider and fault handler
 * can be configured through the setters before {@link #publish()} is invoked.
 *
 * @param <E> the type of the stream elements
 */
public class RsPublisher<E> implements StreamPublisher {
    private static final Logger log = LoggerFactory.getLogger(RsPublisher.class);
    private final Stream<E> stream;
    private final RecordFactory<E> factory;
    // resolved lazily: publish() falls back to RsTransport.TCP when no transport has been set
    private RsTransport transport;
    private int bufferSize;
    private long timeout;
    private TimeUnit timeoutUnit;
    private boolean onDemand;
    private ThreadProvider provider;
    private FaultHandler handler;

    /**
     * Creates an instance that publishes the stream as string records produced by a generator.
     *
     * @param stream the stream to publish
     * @param generator the generator handed to the {@link RsStringRecordFactory}
     * @throws IllegalArgumentException if the stream is {@code null} or the resulting factory
     *     has no record definitions
     */
    public RsPublisher(Stream<E> stream, Generator<E, String> generator) {
        this(stream, new RsStringRecordFactory(generator));
    }

    /**
     * Creates an instance with the writer's default buffer capacity and inactivity timeout,
     * on-demand publication, a provider of plain {@link Thread}s and a {@link RethrowHandler}.
     *
     * @param stream the stream to publish
     * @param recordFactory the factory that turns stream elements into records
     * @throws IllegalArgumentException if the stream, the factory, or the factory's record
     *     definitions are {@code null}
     */
    public RsPublisher(Stream<E> stream, RecordFactory<E> recordFactory) {
        this.bufferSize = RecordWriter.DefaultBufferCapacity;
        this.timeout = RecordWriter.DefaultInactivityTimeout;
        this.timeoutUnit = RecordWriter.DefaultInactivityTimeUnit;
        this.onDemand = true;
        this.provider = new ThreadProvider() {
            @Override
            public Thread newThread(Runnable runnable) {
                return new Thread(runnable);
            }
        };
        this.handler = new RethrowHandler();
        if (stream == null || recordFactory == null || recordFactory.definitions() == null) {
            throw new IllegalArgumentException("invalid or null inputs");
        }
        this.stream = stream;
        this.factory = recordFactory;
    }

    /**
     * Sets the buffer size passed to the {@link RecordWriter}.
     *
     * @param i the buffer size, must be strictly positive
     * @throws IllegalArgumentException if the size is zero or negative
     */
    public void setBufferSize(int i) throws IllegalArgumentException {
        if (i <= 0) {
            throw new IllegalArgumentException("invalid empty buffer");
        }
        this.bufferSize = i;
    }

    /**
     * Sets the timeout used both when creating the {@link RecordWriter} and when putting
     * records or failures into it.
     *
     * @param j the timeout, must be strictly positive
     * @param timeUnit the unit of the timeout
     * @throws IllegalArgumentException if the timeout is not positive or the unit is
     *     {@code null}
     */
    public void setTimeout(long j, TimeUnit timeUnit) throws IllegalArgumentException {
        if (j <= 0 || timeUnit == null) {
            throw new IllegalArgumentException("invalid  timeout or time unit");
        }
        this.timeout = j;
        this.timeoutUnit = timeUnit;
    }

    /**
     * Sets the transport whose proxy is used to create the {@link RecordWriter}.
     *
     * @param rsTransport the transport
     * @throws IllegalArgumentException if the transport is {@code null}
     */
    public void setTransport(RsTransport rsTransport) {
        if (rsTransport == null) {
            throw new IllegalArgumentException("invalid null transport");
        }
        this.transport = rsTransport;
    }

    /**
     * Configures how the feeder reacts to a failure raised while publishing an element.
     *
     * @param z if {@code true}, the feeder stops consuming the stream at the first such
     *     failure; if {@code false}, it closes the writer but keeps consuming the stream until
     *     it is exhausted
     */
    public void setOnDemand(boolean z) {
        this.onDemand = z;
    }

    /**
     * Sets the provider of the thread in which the stream is consumed and published.
     *
     * @param threadProvider the provider
     * @throws IllegalArgumentException if the provider is {@code null}
     */
    public void setThreadProvider(ThreadProvider threadProvider) {
        if (threadProvider == null) {
            throw new IllegalArgumentException("invalid null provider");
        }
        this.provider = threadProvider;
    }

    /**
     * Sets the handler consulted when producing the next record fails.
     *
     * @param faultHandler the handler
     * @throws IllegalArgumentException if the handler is {@code null}
     */
    public void setFaultHandler(FaultHandler faultHandler) {
        if (faultHandler == null) {
            throw new IllegalArgumentException("invalid null handler");
        }
        this.handler = faultHandler;
    }

    /**
     * Creates the record writer, starts the feeder thread and returns the writer's locator.
     *
     * <p>If no transport has been configured, {@link RsTransport#TCP} is used.
     *
     * @return the locator of the record writer
     * @throws StreamPublishException if the record writer cannot be created
     */
    @Override
    public URI publish() throws StreamPublishException {
        Utils.initialiseRS();
        if (this.transport == null) {
            this.transport = RsTransport.TCP;
        }
        try {
            RecordWriter<Record> recordWriter = new RecordWriter<>(this.transport.proxy(), this.factory.definitions(), this.bufferSize, RecordWriter.DefaultConcurrentPartialCapacity, RecordWriter.DefaultMirrorBufferFactor, this.timeout, this.timeoutUnit);
            URI locator = recordWriter.getLocator();
            try {
                this.provider.newThread(newFeeder(recordWriter, locator)).start();
            } catch (RuntimeException e) {
                // no feeder is running (e.g. the provider returned null or the thread could
                // not start): do not leave an open writer that nothing will ever feed
                close(recordWriter, locator);
                throw e;
            }
            return locator;
        } catch (GRS2WriterException e) {
            throw new StreamPublishException("cannot publish stream as resultset", e);
        }
    }

    /**
     * Builds the task that consumes the stream and feeds the writer.
     *
     * <p>The writer and the stream are closed when the task ends, whether it ends normally or
     * because of a failure.
     */
    private Runnable newFeeder(final RecordWriter<Record> recordWriter, final URI uri) {
        return new Runnable() {
            @Override
            public void run() {
                try {
                    while (RsPublisher.this.stream.hasNext()) {
                        try {
                            RsPublisher.this.publishNextElementOrFailure(recordWriter);
                        } catch (RuntimeException e) {
                            log.trace("failure while feeding resultset at " + uri, e);
                            if (RsPublisher.this.onDemand) {
                                break;
                            }
                            // not on demand: stop writing but keep consuming the stream
                            RsPublisher.this.close(recordWriter, uri);
                        }
                    }
                } finally {
                    // release both resources even if hasNext() or closing the writer fails
                    try {
                        RsPublisher.this.close(recordWriter, uri);
                    } finally {
                        RsPublisher.this.stream.close();
                    }
                }
            }
        };
    }

    /**
     * Publishes the record for the next stream element or, if producing or publishing it
     * fails, the failure itself.
     *
     * <p>A {@link StreamSkipSignal} silently skips the element. Any other runtime failure is
     * put into the writer and then rethrown unless {@link Utils#isContingency(Throwable)}
     * reports it as a contingency.
     *
     * @param recordWriter the writer to publish into
     * @throws RuntimeException if the failure is not a contingency, or wrapping a
     *     {@link GRS2WriterException} raised while writing
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void publishNextElementOrFailure(RecordWriter<Record> recordWriter) {
        try {
            try {
                publish(recordWriter, nextRecord());
            } catch (StreamSkipSignal skip) {
                // the fault handler asked to skip this element: nothing to publish
            } catch (RuntimeException failure) {
                publish(recordWriter, failure);
                if (!Utils.isContingency(failure)) {
                    throw failure;
                }
            }
        } catch (GRS2WriterException e) {
            throw new RuntimeException(e);
        }
    }

    /**
     * Produces the record for the next element of the stream.
     *
     * <p>On failure the fault handler is consulted: if it returns normally a
     * {@link StreamSkipSignal} is thrown; if it raises a {@link StreamStopSignal} the original
     * failure is rethrown; any other exception raised by the handler propagates as is.
     */
    private Record nextRecord() {
        try {
            return this.factory.mo40newRecord(this.stream.next());
        } catch (RuntimeException e) {
            try {
                this.handler.handle(e);
                throw new StreamSkipSignal();
            } catch (StreamStopSignal stop) {
                // propagate the original failure rather than the stop signal
                throw e;
            }
        }
    }

    /**
     * Puts a record into the writer, unless the writer is no longer open.
     *
     * @throws GRS2WriterException if the record is not accepted by the writer
     */
    private void publish(RecordWriter<Record> recordWriter, Record record) throws GRS2WriterException {
        if (recordWriter.getStatus() != IBuffer.Status.Open || recordWriter.put(record, this.timeout, this.timeoutUnit)) {
            return;
        }
        log.trace("client is not consuming resulset, stop publishing");
        throw new GRS2WriterException();
    }

    /**
     * Puts a failure into the writer, unless the writer is no longer open.
     *
     * @throws GRS2WriterException if the failure is not accepted by the writer
     */
    private void publish(RecordWriter<Record> recordWriter, Throwable th) throws GRS2WriterException {
        if (recordWriter.getStatus() != IBuffer.Status.Open || recordWriter.put(th, this.timeout, this.timeoutUnit)) {
            return;
        }
        log.trace("client is not consuming resulset, stop publishing");
        throw new GRS2WriterException();
    }

    /**
     * Closes the writer if it is still open, logging (not propagating) a
     * {@link GRS2WriterException} raised while closing.
     *
     * @param recordWriter the writer to close
     * @param uri the writer's locator, used only in the error message
     */
    /* JADX INFO: Access modifiers changed from: private */
    public void close(RecordWriter<Record> recordWriter, URI uri) {
        if (recordWriter.getStatus() == IBuffer.Status.Open) {
            try {
                recordWriter.close();
            } catch (GRS2WriterException e) {
                log.error("error closing resultset at " + uri, e);
            }
        }
    }
}
