package gr.uoa.di.madgik.searchlibrary.operatorlibrary.gmerge;

import gr.uoa.di.madgik.grs.buffer.IBuffer;
import gr.uoa.di.madgik.grs.proxy.local.LocalWriterProxy;
import gr.uoa.di.madgik.grs.reader.GRS2ReaderException;
import gr.uoa.di.madgik.grs.record.Record;
import gr.uoa.di.madgik.grs.writer.GRS2WriterException;
import gr.uoa.di.madgik.grs.writer.RecordWriter;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.merge.ReaderHolder;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.merge.ReaderScan;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.merge.RecordBufferEntry;
import gr.uoa.di.madgik.searchlibrary.operatorlibrary.stats.StatsContainer;
import java.net.URI;
import java.util.LinkedList;
import java.util.Vector;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/* loaded from: input_file:gr/uoa/di/madgik/searchlibrary/operatorlibrary/gmerge/GradualMergeWorker.class */
/**
 * Background thread that drains {@link RecordBufferEntry} items from a bounded queue and imports
 * their records into a single {@link RecordWriter}.
 *
 * <p>On start-up the worker launches a {@code FirstAvailableGradualScanDispatcher}, handing it the
 * queue, the reader holders and the monitors it shares with this worker. The writer is created
 * lazily, when the first record arrives, from the record definitions of the first reader holder
 * that is no longer waiting for initialisation.
 *
 * <p>Whatever way {@link #run()} ends (normal completion, exception or error), the queue is
 * cleared, all readers and the writer are closed, and both {@code synchDispatcher} and the
 * internal writer-init monitor are notified.
 */
public class GradualMergeWorker extends Thread {
    private static final Logger log = LoggerFactory.getLogger(GradualMergeWorker.class.getName());

    /** Capacity of the queue shared with the scan dispatcher. */
    private static final int QUEUE_CAPACITY = 100;
    /** Poll timeout used right after a record has been received. */
    private static final long INITIAL_POLL_MILLIS = 10;
    /** Upper bound of the poll timeout while the queue stays empty. */
    private static final long MAX_POLL_MILLIS = 500;

    private final Vector<ReaderHolder> readers;
    private final Object synchDispatcher;
    private final String uid;
    private final long timeout;
    private final TimeUnit timeUnit;
    private final GradualLocatorReader inputRetriever;
    private final StatsContainer stats;
    private final Object synchMergingStart;
    private final Object synchWriterInit = new Object();
    private final SynchFinished synchFinished = new SynchFinished();
    private RecordWriter<Record> writer = null;
    private int count = 0;
    private long firststop = 0;

    /**
     * Creates a worker; nothing is started until {@link #start()} is called.
     *
     * @param vector holders of the readers to merge; also handed to the scan dispatcher
     * @param statsContainer receives the time-to-first-input measurement
     * @param j timeout used when creating the writer and when importing each record
     * @param timeUnit unit of {@code j}
     * @param str identifier of this merge operation, used in log messages
     * @param gradualLocatorReader consulted (via {@code hasFinished()}) to decide when no further
     *     input can arrive
     * @param obj monitor handed to the scan dispatcher and notified when this worker terminates
     * @param obj2 monitor notified once the writer has been created
     */
    public GradualMergeWorker(Vector<ReaderHolder> vector, StatsContainer statsContainer, long j, TimeUnit timeUnit, String str, GradualLocatorReader gradualLocatorReader, Object obj, Object obj2) {
        this.readers = vector;
        this.stats = statsContainer;
        this.timeout = j;
        this.timeUnit = timeUnit;
        this.uid = str;
        this.inputRetriever = gradualLocatorReader;
        this.synchDispatcher = obj;
        this.synchMergingStart = obj2;
    }

    /**
     * Runs the merge: starts the scan dispatcher, forwards queued records to the writer until the
     * input is exhausted or the writer is closed/disposed, then stops the readers, joins the scan
     * threads and releases all resources.
     */
    // Collection element types are left raw because the dispatcher's constructor signature is not
    // visible from this file; tightening them could break the call below.
    @SuppressWarnings({"rawtypes", "unchecked"})
    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        Thread.currentThread().setName(GradualMergeWorker.class.getName());
        final long start = System.currentTimeMillis();
        long lastStop = System.currentTimeMillis();
        setName("Merge Worker");
        ArrayBlockingQueue queue = null;
        boolean interrupted = false;
        try {
            log.info("Queue capacity for " + this.uid + " is " + QUEUE_CAPACITY);
            queue = new ArrayBlockingQueue(QUEUE_CAPACITY);
            Vector scans = new Vector(this.readers.size());
            new FirstAvailableGradualScanDispatcher(scans, this.readers, queue, new LinkedList(), this.uid, this.synchDispatcher, this.synchWriterInit, this.synchFinished).start();

            // Block until at least one scan has been registered or the operation was flagged finished.
            synchronized (this.synchWriterInit) {
                while (scans.isEmpty() && !this.synchFinished.isFinished()) {
                    this.synchWriterInit.wait();
                }
            }

            long pollMillis = INITIAL_POLL_MILLIS;
            boolean done = false;
            while (!done) {
                if (this.count == 0) {
                    // NOTE(review): this is re-recorded on every pass until the first record has
                    // been imported, so the stats container is updated repeatedly - confirm intended.
                    this.firststop = System.currentTimeMillis();
                    this.stats.timeToFirstInput(this.firststop - start);
                    log.debug("First stop: " + (this.firststop - start));
                }
                RecordBufferEntry entry = (RecordBufferEntry) queue.poll(pollMillis, TimeUnit.MILLISECONDS);
                if (entry == null) {
                    if (this.inputRetriever.hasFinished()) {
                        // Only the holders at the indices of already registered scans are checked.
                        boolean allFinished = true;
                        for (int i = 0; i < scans.size(); i++) {
                            if (!this.readers.get(i).hasFinished()) {
                                allFinished = false;
                                break;
                            }
                        }
                        done = allFinished;
                    }
                    // Capped exponential back-off. The cap is applied to the stored value so that it
                    // cannot overflow to zero and turn this loop into a busy spin during long idle periods.
                    pollMillis = Math.min(pollMillis * 2, MAX_POLL_MILLIS);
                    continue;
                }

                pollMillis = INITIAL_POLL_MILLIS;
                if (this.writer == null) {
                    createWriterFromFirstInitializedReader();
                }
                if (this.writer == null) {
                    throw new IllegalStateException("Received a record for " + this.uid + " but no reader has been initialized, so no writer could be created");
                }
                IBuffer.Status status = this.writer.getStatus();
                if (status == IBuffer.Status.Close || status == IBuffer.Status.Dispose) {
                    // The consumer went away; stop producing.
                    break;
                }
                if (this.writer.importRecord(entry.record, 0, this.timeout, this.timeUnit)) {
                    this.count++;
                } else if (this.writer.getStatus() == IBuffer.Status.Open) {
                    log.warn("Writer of " + this.uid + " has timed out");
                }
            }

            log.info("Consumer side of " + this.uid + " stopped consumption. Notifying all " + this.readers.size() + " readers to stop.");
            for (int i = 0; i < this.readers.size(); i++) {
                ReaderHolder holder = this.readers.get(i);
                holder.setFinished(true);
                try {
                    holder.getReader().close();
                } catch (GRS2ReaderException e) {
                    log.warn("Could not close reader #" + i, e);
                }
            }
            queue.clear();
            this.synchFinished.setFinished(true);
            lastStop = System.currentTimeMillis();

            // Wait for every scan thread; an interrupt must not cut the join short, but it is
            // remembered and restored once clean-up is done.
            for (int i = 0; i < scans.size(); i++) {
                boolean joined = false;
                while (!joined) {
                    try {
                        ((ReaderScan) scans.get(i)).join();
                        joined = true;
                    } catch (InterruptedException e) {
                        interrupted = true;
                    }
                }
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                interrupted = true;
            }
            log.error("Could not complete background merging for " + this.uid + ". Closing", e);
        } finally {
            // Single clean-up path for success, failure and errors. The queue may still be null
            // if the failure happened before it was created.
            if (queue != null) {
                queue.clear();
            }
            closeReaders();
            closeWriter();
            notifyWaiters();
            logSummary(start, lastStop);
            if (interrupted) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /**
     * Creates the writer from the record definitions of the first reader holder that is not
     * waiting for initialisation and notifies {@code synchMergingStart}. Leaves the writer
     * {@code null} if every holder is still waiting.
     */
    private void createWriterFromFirstInitializedReader() throws Exception {
        for (int i = 0; i < this.readers.size(); i++) {
            ReaderHolder holder = this.readers.get(i);
            if (!holder.getWaitingForInit()) {
                this.writer = new RecordWriter<>(new LocalWriterProxy(), holder.getReader().getRecordDefinitions(), RecordWriter.DefaultBufferCapacity, RecordWriter.DefaultConcurrentPartialCapacity, RecordWriter.DefaultMirrorBufferFactor, this.timeout, this.timeUnit);
                synchronized (this.synchMergingStart) {
                    this.synchMergingStart.notify();
                }
                return;
            }
        }
    }

    /** Flags every reader holder as finished and closes its reader unless already disposed. */
    private void closeReaders() {
        for (int i = 0; i < this.readers.size(); i++) {
            ReaderHolder holder = this.readers.get(i);
            holder.setFinished(true);
            try {
                if (holder.getReader().getStatus() != IBuffer.Status.Dispose) {
                    holder.getReader().close();
                }
            } catch (Exception e) {
                log.warn("Could not close reader #" + i, e);
            }
        }
    }

    /** Closes the writer if one was created; failures are logged and otherwise ignored. */
    private void closeWriter() {
        if (this.writer == null) {
            return;
        }
        try {
            this.writer.close();
        } catch (Exception e) {
            log.warn("Could not close writer of " + this.uid, e);
        }
    }

    /**
     * Notifies one waiter on each of the dispatcher and writer-init monitors. The monitors are
     * taken one after the other, never nested.
     */
    private void notifyWaiters() {
        synchronized (this.synchDispatcher) {
            this.synchDispatcher.notify();
        }
        synchronized (this.synchWriterInit) {
            this.synchWriterInit.notify();
        }
    }

    /** Logs timing and throughput figures for this merge run. */
    private void logSummary(long start, long lastStop) {
        log.info("MERGE OPERATOR " + this.uid + ":\nProduced first result in " + (this.firststop - start) + " milliseconds\nProduced last result in " + (lastStop - start) + " milliseconds\nProduced " + this.count + " records\nProduction rate was " + ((this.count / ((float) (lastStop - start))) * 1000.0f) + " records per second");
    }

    /**
     * Returns the locator of the output writer.
     *
     * @return the writer's locator, or {@code null} if the writer has not been created yet or its
     *     locator could not be retrieved
     */
    public URI getWriterLocator() {
        if (this.writer == null) {
            return null;
        }
        try {
            return this.writer.getLocator();
        } catch (GRS2WriterException e) {
            return null;
        }
    }
}
