/*
 * Decompiled with CFR 0.152.
 */
package org.gcube.documentstore.records.aggregation;

import java.lang.reflect.Constructor;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Calendar;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.ServiceLoader;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import org.gcube.documentstore.exception.NotAggregatableRecordsExceptions;
import org.gcube.documentstore.persistence.ExecutorUtils;
import org.gcube.documentstore.persistence.PersistenceBackend;
import org.gcube.documentstore.persistence.PersistenceBackendConfiguration;
import org.gcube.documentstore.persistence.PersistenceExecutor;
import org.gcube.documentstore.records.AggregatedRecord;
import org.gcube.documentstore.records.Record;
import org.gcube.documentstore.records.RecordUtility;
import org.gcube.documentstore.records.aggregation.BufferAggregationScheduler;
import org.gcube.documentstore.records.implementation.ConfigurationGetPropertyValues;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Buffers {@link Record}s in memory, merges compatible ones into
 * {@link AggregatedRecord}s and hands the buffer to a {@link PersistenceExecutor}
 * when the concrete subclass decides it is time to persist
 * (see {@link #isTimeToPersist(int, long)}) or when a flush is forced.
 *
 * <p>Each instance registers itself with {@code ExecutorUtils.scheduler} at
 * construction time, so {@link #run()} (a forced flush) is invoked periodically.
 *
 * <p>The buffer itself is only guarded by the {@code synchronized}
 * {@link #aggregate(Record, PersistenceExecutor, boolean)} entry point. The tuning
 * parameters are static, i.e. shared by every instance in the JVM.
 */
public abstract class AggregationScheduler
implements Runnable {
    public static Logger logger = LoggerFactory.getLogger(AggregationScheduler.class);

    /** Number of records currently held in {@link #bufferedRecords}. */
    protected int totalBufferedRecords = 0;
    /** Buffered records, grouped by {@link Record#getRecordType()}. */
    protected Map<String, List<Record>> bufferedRecords = new HashMap<String, List<Record>>();
    /** Executor used by the periodic {@link #run()} flush. */
    protected final PersistenceExecutor persistenceExecutor;

    // The settings below are written by the configuration-reload thread and read by
    // whichever thread calls aggregate(...); volatile makes those writes visible.

    /** Delay before the first scheduled flush, expressed in {@link #TIME_UNIT}. */
    public static volatile int INITIAL_DELAY = 30;
    /** Period between scheduled flushes, expressed in {@link #TIME_UNIT}. */
    public static volatile int DELAY = 30;
    public static final TimeUnit TIME_UNIT = TimeUnit.MINUTES;

    /** Service end point property holding the scheduler delay. */
    public static final String AGGREGATION_SCHEDULER_TIME = "AggregationSchedulerTime";
    /** Service end point property holding the maximum buffering time. */
    public static final String BUFFER_RECORD_TIME = "BufferRecordTime";
    /** Service end point property holding the maximum number of buffered records. */
    public static final String BUFFER_RECORD_NUMBER = "BufferRecordNumber";

    /** Minimum age, in milliseconds, of the loaded configuration before it is reloaded (12 hours). */
    public static long TIME_RELOAD_CONFIGURATION = 43200000L;
    /** Epoch milliseconds of the last configuration load (or load attempt); 0 means never loaded. */
    public static volatile long TIME_LOAD_CONFIGURATION = 0L;
    /** Record-count threshold passed to {@link #isTimeToPersist(int, long)}. */
    protected static volatile int MAX_RECORDS_NUMBER = 100;
    /** Elapsed-time threshold, in milliseconds, passed to {@link #isTimeToPersist(int, long)}. */
    protected static volatile long OLD_RECORD_MAX_TIME_ELAPSED = 1800000L;

    /**
     * Creates a scheduler that uses the currently active static settings.
     *
     * @param persistenceExecutor executor used to persist flushed records
     * @return a new scheduler instance
     */
    public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor) {
        return new BufferAggregationScheduler(persistenceExecutor);
    }

    /**
     * Loads the tuning parameters and then creates a scheduler.
     *
     * <p>The parameters are read from the properties returned by
     * {@link ConfigurationGetPropertyValues#getPropValues()} when available, otherwise
     * from {@code configuration}. A value of 0 leaves the corresponding setting untouched.
     *
     * @param persistenceExecutor executor used to persist flushed records
     * @param configuration fallback source used when no properties are available
     * @return a new scheduler instance
     * @throws NumberFormatException if a required property is missing or not an integer
     * @throws Exception if the configuration cannot be read
     */
    public static AggregationScheduler newInstance(PersistenceExecutor persistenceExecutor, PersistenceBackendConfiguration configuration) throws NumberFormatException, Exception {
        Properties prop = new ConfigurationGetPropertyValues().getPropValues();
        int delay;
        int maxRecordNumber;
        int maxRecordTime;
        if (prop == null) {
            logger.trace("Configuration from service end point");
            delay = Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME));
            maxRecordTime = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME));
            maxRecordNumber = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER));
        } else {
            logger.trace("Configuration from properties file");
            delay = Integer.parseInt(prop.getProperty("delay"));
            maxRecordNumber = Integer.parseInt(prop.getProperty("maxrecordnumber"));
            maxRecordTime = Integer.parseInt(prop.getProperty("maxtimenumber"));
        }
        applyConfiguration(delay, maxRecordNumber, maxRecordTime);
        logger.trace("Start Instance for time load configuration {}", Long.valueOf(TIME_LOAD_CONFIGURATION));
        return new BufferAggregationScheduler(persistenceExecutor);
    }

    /**
     * Stores the given settings in the static fields and stamps the load time.
     * A value of 0 means "keep the current setting".
     *
     * @param delay scheduler delay, in {@link #TIME_UNIT}
     * @param maxRecordNumber record-count threshold
     * @param maxRecordTime buffering-time threshold; converted from minutes to milliseconds
     */
    private static void applyConfiguration(int delay, int maxRecordNumber, int maxRecordTime) {
        if (delay != 0) {
            DELAY = delay;
            INITIAL_DELAY = delay;
        }
        if (maxRecordNumber != 0) {
            MAX_RECORDS_NUMBER = maxRecordNumber;
        }
        if (maxRecordTime != 0) {
            // Convert in long arithmetic: "maxRecordTime * 1000 * 60" would be computed
            // as an int and silently overflow for values above ~35791 minutes.
            OLD_RECORD_MAX_TIME_ELAPSED = TimeUnit.MINUTES.toMillis(maxRecordTime);
        }
        TIME_LOAD_CONFIGURATION = System.currentTimeMillis();
    }

    /**
     * Stores the executor and schedules this instance for periodic execution using
     * the {@link #INITIAL_DELAY} and {@link #DELAY} values current at construction time.
     *
     * <p>NOTE(review): {@code this} is handed to the scheduler before subclass
     * constructors have completed; with a non-trivial initial delay this is harmless,
     * but a very small delay could run {@link #run()} on a partially built object.
     *
     * @param persistenceExecutor executor used by the scheduled flush
     */
    protected AggregationScheduler(PersistenceExecutor persistenceExecutor) {
        this.persistenceExecutor = persistenceExecutor;
        ExecutorUtils.scheduler.scheduleAtFixedRate(this, INITIAL_DELAY, DELAY, TIME_UNIT);
        logger.trace("Thread scheduler created in {} ", this.toString());
        logger.trace("Reload configuration every {}", Long.valueOf(TIME_RELOAD_CONFIGURATION));
        logger.trace("Aggregated for max record {}", Integer.valueOf(MAX_RECORDS_NUMBER));
        logger.trace("Aggregated for max time {}", Long.valueOf(OLD_RECORD_MAX_TIME_ELAPSED));
    }

    /**
     * Builds the aggregated counterpart of {@code record} by reflectively invoking the
     * constructor of the aggregated class (looked up through {@link RecordUtility})
     * whose single parameter is exactly the runtime class of {@code record}.
     *
     * @param record the record to wrap
     * @return the newly created aggregated record
     * @throws Exception if the class or a matching constructor cannot be found, or the
     *         constructor invocation fails
     */
    @SuppressWarnings("rawtypes")
    protected static AggregatedRecord instantiateAggregatedRecord(Record record) throws Exception {
        String recordType = record.getRecordType();
        Class<AggregatedRecord<?, ?>> clz = RecordUtility.getAggregatedRecordClass(recordType);
        Class<?>[] argTypes = new Class<?>[]{record.getClass()};
        Constructor<AggregatedRecord<?, ?>> constructor = clz.getDeclaredConstructor(argTypes);
        Object[] arguments = new Object[]{record};
        return constructor.newInstance(arguments);
    }

    /**
     * Returns {@code record} itself when it already is an {@link AggregatedRecord},
     * otherwise a new aggregated record created from it.
     *
     * @param record the record to convert
     * @return an aggregated view of {@code record}
     * @throws Exception if the aggregated record cannot be instantiated
     */
    @SuppressWarnings("rawtypes")
    public static AggregatedRecord getAggregatedRecord(Record record) throws Exception {
        if (record instanceof AggregatedRecord) {
            return (AggregatedRecord)record;
        }
        return AggregationScheduler.instantiateAggregatedRecord(record);
    }

    /**
     * Adds {@code record} to the buffer: it is merged into the first buffered
     * aggregated record of the same type that accepts it, or appended as a new entry
     * (incrementing {@link #totalBufferedRecords}) when none does.
     *
     * @param record the record to buffer; must not be {@code null}
     */
    protected void madeAggregation(Record record) {
        String recordType = record.getRecordType();
        List<Record> records = this.bufferedRecords.get(recordType);
        if (records == null) {
            records = new ArrayList<Record>();
            this.bufferedRecords.put(recordType, records);
        } else if (AggregationScheduler.mergeIntoBuffered(records, record)) {
            // Merged into an existing entry: the number of buffered records is unchanged.
            return;
        }
        records.add(AggregationScheduler.toBufferable(record));
        ++this.totalBufferedRecords;
    }

    /**
     * Tries to merge {@code record} into one of the aggregated records in {@code records}.
     *
     * @return {@code true} if some buffered record accepted the merge
     */
    @SuppressWarnings({"rawtypes", "unchecked"})
    private static boolean mergeIntoBuffered(List<Record> records, Record record) {
        for (Record bufferedRecord : records) {
            if (!(bufferedRecord instanceof AggregatedRecord)) {
                continue;
            }
            try {
                AggregatedRecord bufferedAggregatedRecord = (AggregatedRecord)bufferedRecord;
                if (record instanceof AggregatedRecord) {
                    bufferedAggregatedRecord.aggregate((AggregatedRecord)record);
                } else {
                    bufferedAggregatedRecord.aggregate(record);
                }
                logger.trace("Aggregated Record is {}", (Object)bufferedAggregatedRecord);
                return true;
            }
            catch (NotAggregatableRecordsExceptions e) {
                // Expected when the two records are not compatible: try the next candidate.
                logger.trace("{} is not usable for aggregation", (Object)bufferedRecord);
            }
        }
        return false;
    }

    /**
     * Returns the aggregated form of {@code record}, or {@code record} itself when the
     * aggregated form cannot be built (best effort: the record must not be lost).
     */
    private static Record toBufferable(Record record) {
        try {
            return AggregationScheduler.getAggregatedRecord(record);
        }
        catch (Exception e) {
            logger.trace("Unable to create an aggregated record, buffering the original record as is", e);
            return record;
        }
    }

    /**
     * Persists everything currently buffered, regardless of the thresholds.
     *
     * @param persistenceExecutor executor receiving the buffered records
     * @throws Exception if persisting fails
     */
    public void flush(PersistenceExecutor persistenceExecutor) throws Exception {
        this.aggregate(null, persistenceExecutor, true);
    }

    /** Resets any state the concrete scheduler keeps in addition to the buffer. */
    protected abstract void schedulerSpecificClear();

    /** Empties the buffer, resets the counter and clears subclass-specific state. */
    protected void clear() {
        this.totalBufferedRecords = 0;
        this.bufferedRecords.clear();
        this.schedulerSpecificClear();
    }

    /**
     * Buffers {@code record} (when not {@code null}), flushes the buffer if the subclass
     * says it is time or {@code forceFlush} is set, and finally triggers a configuration
     * reload when the loaded configuration is older than {@link #TIME_RELOAD_CONFIGURATION}.
     *
     * @param record record to buffer, or {@code null} to only evaluate the flush
     * @param persistenceExecutor executor receiving the records on flush
     * @param forceFlush {@code true} to flush regardless of the thresholds
     * @throws Exception if persisting or triggering the reload fails
     */
    protected synchronized void aggregate(Record record, PersistenceExecutor persistenceExecutor, boolean forceFlush) throws Exception {
        if (record != null) {
            this.madeAggregation(record);
        }
        // isTimeToPersist is always evaluated first, even on a forced flush.
        if (this.isTimeToPersist(MAX_RECORDS_NUMBER, OLD_RECORD_MAX_TIME_ELAPSED) || forceFlush) {
            this.reallyFlush(persistenceExecutor);
        }
        long now = System.currentTimeMillis();
        if (now - TIME_LOAD_CONFIGURATION >= TIME_RELOAD_CONFIGURATION) {
            this.ReloadConfiguration();
        }
    }

    /**
     * Reloads the tuning parameters on a separate thread so the caller is not blocked
     * while the configuration is retrieved. The properties returned by
     * {@link ConfigurationGetPropertyValues#getPropValues()} are preferred; otherwise
     * the configuration of a discovered {@link PersistenceBackend} is used. Values that
     * cannot be read are left unchanged.
     *
     * @throws Exception declared for overriding implementations; not thrown here
     */
    protected void ReloadConfiguration() throws Exception {
        // Stamp the attempt before starting the thread: otherwise every aggregate(...)
        // call made while the reload is still running would see the stale timestamp
        // and spawn yet another reload thread.
        TIME_LOAD_CONFIGURATION = System.currentTimeMillis();
        new Thread(){

            @Override
            public void run() {
                // 0 means "keep the current setting" for any value that cannot be read.
                int delay = 0;
                int maxRecordNumber = 0;
                int maxRecordTime = 0;
                try {
                    Properties prop = new ConfigurationGetPropertyValues().getPropValues();
                    if (prop != null) {
                        logger.trace("Reload Configuration from properties file");
                        delay = Integer.parseInt(prop.getProperty("delay"));
                        maxRecordNumber = Integer.parseInt(prop.getProperty("maxrecordnumber"));
                        maxRecordTime = Integer.parseInt(prop.getProperty("maxtimenumber"));
                    } else {
                        PersistenceBackendConfiguration configuration = AggregationScheduler.discoverBackendConfiguration();
                        if (configuration != null) {
                            logger.trace("Reload Configuration from service end point");
                            delay = Integer.parseInt(configuration.getProperty(AGGREGATION_SCHEDULER_TIME));
                            maxRecordTime = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_TIME));
                            maxRecordNumber = Integer.parseInt(configuration.getProperty(BUFFER_RECORD_NUMBER));
                        }
                    }
                }
                catch (Exception e) {
                    logger.error("Unable to reload the aggregation configuration. Values not read keep their current setting.", e);
                }
                AggregationScheduler.applyConfiguration(delay, maxRecordNumber, maxRecordTime);
                logger.trace("Aggregated for max record {}", Integer.valueOf(MAX_RECORDS_NUMBER));
                logger.trace("Aggregated for max time {}", Long.valueOf(OLD_RECORD_MAX_TIME_ELAPSED));
            }
        }.start();
    }

    /**
     * Looks up the configuration of the {@link PersistenceBackend}s published through
     * {@link ServiceLoader}. When several backends provide a configuration the last one
     * wins; a backend without configuration never discards one found earlier.
     *
     * @return the discovered configuration, or {@code null} if no backend provides one
     */
    private static PersistenceBackendConfiguration discoverBackendConfiguration() {
        PersistenceBackendConfiguration configuration = null;
        for (PersistenceBackend found : ServiceLoader.load(PersistenceBackend.class)) {
            Class<? extends PersistenceBackend> foundClass = found.getClass();
            try {
                String foundClassName = foundClass.getSimpleName();
                logger.trace("Testing {}", (Object)foundClassName);
                PersistenceBackendConfiguration candidate = PersistenceBackendConfiguration.getInstance(foundClass);
                if (candidate == null) {
                    continue;
                }
                configuration = candidate;
                logger.debug("{} will be used.", (Object)foundClassName);
            }
            catch (Exception e) {
                logger.error(String.format("%s not initialized correctly. It will not be used. Trying the next one if any.", foundClass.getSimpleName()), e);
            }
        }
        return configuration;
    }

    /**
     * Persists every buffered record and then clears the buffer. Does nothing when the
     * buffer counter is 0. The buffer is left untouched if persisting throws.
     *
     * @param persistenceExecutor executor receiving the buffered records
     * @throws Exception if persisting fails
     */
    protected void reallyFlush(PersistenceExecutor persistenceExecutor) throws Exception {
        if (this.totalBufferedRecords == 0) {
            return;
        }
        // Size the array from the lists themselves rather than from the counter, so a
        // counter that drifted from the real content cannot cause an out-of-bounds write
        // or null entries in the persisted array.
        List<Record> collected = new ArrayList<Record>();
        for (List<Record> records : this.bufferedRecords.values()) {
            collected.addAll(records);
        }
        Record[] recordToPersist = collected.toArray(new Record[collected.size()]);
        if (logger.isTraceEnabled()) {
            logger.trace("reallyFlush It is time to persist buffered records {}", (Object)Arrays.toString(recordToPersist));
        }
        persistenceExecutor.persist(recordToPersist);
        this.clear();
    }

    /**
     * Buffers {@code record} and persists the buffer if the thresholds are reached.
     *
     * @param record the record to buffer
     * @param persistenceExecutor executor receiving the records on flush
     * @throws Exception if persisting fails
     */
    public void aggregate(Record record, PersistenceExecutor persistenceExecutor) throws Exception {
        this.aggregate(record, persistenceExecutor, false);
    }

    /**
     * Decides whether the buffer must be persisted now.
     *
     * @param var1 record-count threshold ({@link #MAX_RECORDS_NUMBER})
     * @param var2 elapsed-time threshold in milliseconds ({@link #OLD_RECORD_MAX_TIME_ELAPSED})
     * @return {@code true} if the buffer should be flushed
     */
    protected abstract boolean isTimeToPersist(int var1, long var2);

    /**
     * Periodic task body: forces a flush through {@link #persistenceExecutor}. Failures
     * are logged and not propagated, so the periodic schedule is not cancelled.
     */
    @Override
    public void run() {
        try {
            this.flush(this.persistenceExecutor);
        }
        catch (Exception e) {
            logger.error("Error flushing Buffered Records", e);
        }
    }
}

