package org.gcube.contentmanagement.contentmanager.state;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import java.net.URI;
import java.util.Calendar;
import java.util.concurrent.TimeUnit;
import org.gcube.common.UpdateNotificationType;
import org.gcube.common.UpdateNotificationTypeWrapper;
import org.gcube.common.core.contexts.GHNContext;
import org.gcube.common.core.state.GCUBEWSLiteResource;
import org.gcube.common.core.utils.events.GCUBEConsumer;
import org.gcube.common.core.utils.events.GCUBEEvent;
import org.gcube.contentmanagement.contentmanager.context.ServiceContext;
import org.gcube.contentmanagement.contentmanager.plugin.delegates.Collection;
import org.gcube.contentmanagement.contentmanager.plugin.delegates.ManagerDelegate;
import org.gcube.contentmanagement.contentmanager.state.ResultSerialisers;
import org.gcube.contentmanagement.contentmanager.stubs.calls.Constants;
import org.gcube.contentmanagement.contentmanager.stubs.calls.iterators.RemoteIterator;
import org.globus.wsrf.ResourceException;
import org.globus.wsrf.impl.ReflectionResourceProperty;
import org.globus.wsrf.impl.SimpleTopic;

/* loaded from: input_file:org/gcube/contentmanagement/contentmanager/state/CollectionManager.class */
public abstract class CollectionManager extends GCUBEWSLiteResource<CollectionResource> implements GCUBEConsumer<Collection.ChangeTopic, Void> {
    protected static final int PART_SIZE = 10;
    private static final int DEFAULT_CHANGE_COALESCING_DELAY = 60;
    private Thread notifier;
    private SimpleTopic updateTopic = new SimpleTopic(Constants.UPDATETOPIC_QNAME);

    protected void initialise(Object... objArr) throws Exception {
        subscribeForChange();
    }

    protected abstract ManagerDelegate getDelegate() throws ResourceException;

    protected String[] getPropertyNames() {
        return new String[0];
    }

    protected void initialiseContainers() throws Exception {
        super.initialiseContainers();
        getResourcePropertySet().add(new ReflectionResourceProperty(Constants.COLLECTIONID_RP, this));
        getResourcePropertySet().add(new ReflectionResourceProperty(Constants.TYPE_RP, this));
        getResourcePropertySet().add(new ReflectionResourceProperty(Constants.PLUGIN_RP, this));
        getResourcePropertySet().add(new ReflectionResourceProperty(Constants.CARDINALITY_RP, this));
        getResourcePropertySet().add(new ReflectionResourceProperty(Constants.LAST_UPDATE_RP, this));
        getTopicList().addTopic(this.updateTopic);
    }

    public String getPlugin() throws ResourceException {
        return ((CollectionResource) getLocalResource()).getPluginName();
    }

    public String getCollectionID() throws ResourceException {
        return ((CollectionResource) getLocalResource()).getCollection().getID();
    }

    public long getCardinality() throws ResourceException {
        return ((CollectionResource) getLocalResource()).getCollection().getCardinality();
    }

    void onReuse() {
    }

    public Calendar getLastUpdate() throws ResourceException {
        return ((CollectionResource) getLocalResource()).getCollection().getLastUpdate();
    }

    public abstract String getType();

    /* JADX INFO: Access modifiers changed from: protected */
    public void subscribeForChange() throws ResourceException {
        this.logger.trace(getID() + " is subscribing for change with delegate");
        ((CollectionResource) getLocalResource()).getCollection().subscribe(this);
    }

    public <T1 extends Collection.ChangeTopic, P1 extends Void> void onEvent(GCUBEEvent<T1, P1>... gCUBEEventArr) {
        if (this.notifier == null || !this.notifier.isAlive()) {
            this.notifier = new Thread(getClass().getSimpleName() + "-delayed") { // from class: org.gcube.contentmanagement.contentmanager.state.CollectionManager.1
                @Override // java.lang.Thread, java.lang.Runnable
                public void run() {
                    try {
                        CollectionManager.this.logger.trace("will notify change in 60 second/s");
                        TimeUnit.SECONDS.sleep(60L);
                        CollectionManager.this.logger.trace("sending change notification (" + CollectionManager.this.getID() + ")");
                        if (GHNContext.getContext().getMode() == GHNContext.Mode.CONNECTED) {
                            CollectionManager.this.updateTopic.notify(new UpdateNotificationTypeWrapper(new UpdateNotificationType(CollectionManager.this.getCardinality(), CollectionManager.this.getCollectionID(), CollectionManager.this.getLastUpdate())));
                        }
                    } catch (InterruptedException e) {
                        CollectionManager.this.logger.error("unexpected interruption", e);
                    } catch (Exception e2) {
                        CollectionManager.this.logger.warn(CollectionManager.this.getID() + " could not send change notification", e2);
                    }
                }
            };
            this.notifier.start();
        }
    }

    /* JADX INFO: Access modifiers changed from: protected */
    public <T> URI toRS(final RemoteIterator<T> remoteIterator, final ResultSerialisers.ResultSerialiser<T> resultSerialiser, int i) throws Exception {
        final RecordWriter recordWriter = new RecordWriter(new TCPWriterProxy(), Constants.UNTYPED_RECORD, i, 2, 0.5f, 60L, TimeUnit.SECONDS);
        URI locator = recordWriter.getLocator();
        ServiceContext.getContext().newServiceThread(new Runnable() { // from class: org.gcube.contentmanagement.contentmanager.state.CollectionManager.2
            @Override // java.lang.Runnable
            public void run() {
                int i2 = 1;
                while (remoteIterator.hasNext()) {
                    try {
                    } catch (GRS2WriterException e) {
                        CollectionManager.this.logger.warn("error adding node to the resultset (stopping)", e);
                    } catch (Exception e2) {
                        CollectionManager.this.logger.warn("error adding node to the resultset (continuing) ", e2);
                    }
                    if (recordWriter.getStatus() != IBuffer.Status.Open) {
                        break;
                    }
                    Object next = remoteIterator.next();
                    GenericRecord genericRecord = new GenericRecord();
                    genericRecord.setFields(new Field[]{new StringField(resultSerialiser.serialise(next))});
                    CollectionManager.this.logger.trace("writing element n. " + i2 + " in the RS");
                    if (!recordWriter.put(genericRecord, 60L, TimeUnit.SECONDS)) {
                        CollectionManager.this.logger.warn("the resultset buffer is full and the timeout has expired");
                        break;
                    }
                    i2++;
                }
                if (recordWriter.getStatus() != IBuffer.Status.Dispose) {
                    try {
                        CollectionManager.this.logger.warn("closing resultset writer");
                        recordWriter.close();
                    } catch (Exception e3) {
                        CollectionManager.this.logger.warn("error closing the resultset", e3);
                    }
                }
                remoteIterator.close();
            }
        }).start();
        return locator;
    }
}
