/*
 * Decompiled with CFR 0.152.
 */
package gr.uoa.di.madgik.searchlibrary.operatorlibrary.test.rsfromfilegenerator;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.events.BufferEvent;
import gr.uoa.di.madgik.grs.events.KeyValueEvent;
import gr.uoa.di.madgik.grs.proxy.IProxy;
import gr.uoa.di.madgik.grs.proxy.IWriterProxy;
import gr.uoa.di.madgik.grs.proxy.http.HTTPWriterProxy;
import gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy;
import gr.uoa.di.madgik.grs.proxy.tcp.TCPWriterProxy;
import gr.uoa.di.madgik.grs.record.GenericRecord;
import gr.uoa.di.madgik.grs.record.GenericRecordDefinition;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.record.RecordDefinition;
import gr.uoa.di.madgik.grs.record.field.Field;
import gr.uoa.di.madgik.grs.record.field.FieldDefinition;
import gr.uoa.di.madgik.grs.record.field.StringField;
import gr.uoa.di.madgik.grs.record.field.StringFieldDefinition;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.net.URI;
import java.util.Calendar;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RSFromFileGeneratorWorker
extends Thread {
    private static Logger logger = LoggerFactory.getLogger((String)RSFromFileGeneratorWorker.class.getName());
    private RecordWriter<Record> writer = null;
    private URI outLocator = null;
    private boolean onlyFinalEvent = false;
    private int id;
    private long timeout;
    private IProxy.ProxyType proxyType;
    private TimeUnit timeUnit;
    private Float threshold;
    private String inFile;
    private Object synchWriter = null;

    public RSFromFileGeneratorWorker(String inFile, boolean onlyFinalEvent, int id, IProxy.ProxyType proxyType, long timeout, TimeUnit timeUnit, Float threshold, Object synchWriter) throws Exception {
        this.inFile = inFile;
        this.onlyFinalEvent = onlyFinalEvent;
        this.id = id;
        this.proxyType = proxyType;
        this.timeout = timeout;
        this.timeUnit = timeUnit;
        this.threshold = threshold;
        if (this.threshold != null && (this.threshold.floatValue() < 0.0f || this.threshold.floatValue() > 1.0f)) {
            throw new Exception("Invalid threshold value");
        }
        this.synchWriter = synchWriter;
    }

    public URI getLocator() {
        return this.outLocator;
    }

    /*
     * WARNING - Removed try catching itself - possible behaviour change.
     */
    @Override
    public void run() {
        this.setName("RSFromFileWorker #" + this.id);
        int rc = 0;
        int eventsEmitted = 0;
        long now = Calendar.getInstance().getTimeInMillis();
        boolean finalEmitted = false;
        try {
            LocalWriterProxy producerProxy = null;
            switch (this.proxyType) {
                case Local: {
                    producerProxy = new LocalWriterProxy();
                    break;
                }
                case TCP: {
                    producerProxy = new TCPWriterProxy();
                    break;
                }
                case HTTP: {
                    producerProxy = new HTTPWriterProxy();
                }
            }
            BufferedReader tmpReader = new BufferedReader(new FileReader(new File(this.inFile)));
            int count = 0;
            String line = null;
            while ((line = tmpReader.readLine()) != null) {
                ++count;
            }
            tmpReader.close();
            BufferedReader fileReader = new BufferedReader(new FileReader(new File(this.inFile)));
            line = null;
            while ((line = fileReader.readLine()) != null) {
                int i;
                String[] fieldEntries = line.split(" ");
                LinkedHashMap<String, String> fieldValues = new LinkedHashMap<String, String>();
                for (String fieldEntry : fieldEntries) {
                    String[] fv = fieldEntry.split("=");
                    if (fv.length != 2) {
                        throw new Exception("Invalid field format: " + fieldEntry);
                    }
                    fieldValues.put(fv[0], fv[1]);
                }
                if (rc == 0) {
                    Object baseWriter;
                    RecordDefinition[] defs = null;
                    StringFieldDefinition[] fieldDefs = new StringFieldDefinition[fieldValues.size()];
                    i = 0;
                    for (Map.Entry fieldValue : fieldValues.entrySet()) {
                        fieldDefs[i] = new StringFieldDefinition((String)fieldValue.getKey());
                        ++i;
                    }
                    defs = new RecordDefinition[]{new GenericRecordDefinition((FieldDefinition[])fieldDefs)};
                    this.writer = this.threshold == null ? (baseWriter = new RecordWriter((IWriterProxy)producerProxy, defs, 50, RecordWriter.DefaultConcurrentPartialCapacity, RecordWriter.DefaultMirrorBufferFactor)) : (baseWriter = new RecordWriter((IWriterProxy)producerProxy, defs, 50, RecordWriter.DefaultConcurrentPartialCapacity, RecordWriter.DefaultMirrorBufferFactor));
                    this.outLocator = producerProxy.getLocator();
                    baseWriter = this.synchWriter;
                    synchronized (baseWriter) {
                        this.synchWriter.notify();
                    }
                }
                if (this.writer.getStatus() == IBuffer.Status.Close || this.writer.getStatus() == IBuffer.Status.Dispose) {
                    logger.info("Consumer side stopped consumption. Random generator #" + this.id + " stopping prematurely");
                    System.out.println("Consumer side stopped consumption. Random generator #" + this.id + " stopping prematurely");
                    break;
                }
                if (rc == 1) {
                    System.out.println("Time to first input: " + (Calendar.getInstance().getTimeInMillis() - now));
                }
                GenericRecord outRec = new GenericRecord();
                Field[] outFields = new Field[fieldValues.size()];
                i = 0;
                for (Map.Entry fieldValue : fieldValues.entrySet()) {
                    String outPayload = (String)fieldValue.getValue();
                    outPayload = outPayload.replaceAll("%nl", "\n");
                    outPayload = outPayload.replace("%%nl", "%nl");
                    outFields[i++] = new StringField(outPayload);
                }
                outRec.setFields(outFields);
                if (!this.onlyFinalEvent || eventsEmitted < 10) {
                    if (rc % 100 == 0 && rc > 0) {
                        this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumber", "" + rc));
                        ++eventsEmitted;
                    }
                } else if (!finalEmitted) {
                    this.writer.emit((BufferEvent)new KeyValueEvent("resultsNumberFinal", "" + count));
                    finalEmitted = true;
                }
                if (!this.writer.put((Record)outRec, this.timeout, this.timeUnit)) {
                    if (this.writer.getStatus() == IBuffer.Status.Open) {
                        logger.warn("Could not write record " + rc + ". Skipping. Available Records = " + this.writer.availableRecords());
                        ++rc;
                        continue;
                    }
                    System.out.println("Consumer side stopped consumption. Random generator #" + this.id + " stopping prematurely");
                    break;
                }
                ++rc;
            }
        }
        catch (Exception e) {
            logger.warn("Error while generating random records", (Throwable)e);
        }
        finally {
            try {
                this.writer.close();
            }
            catch (Exception e) {}
        }
        long closestop = Calendar.getInstance().getTimeInMillis();
        logger.info("Data generation took " + (closestop - now));
        logger.info("Produced " + rc + " records");
        logger.info("Production rate was " + (float)rc / (float)(closestop - now) * 1000.0f);
    }
}

