package org.xtreemfs.foundation.flease;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.DatagramChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.xtreemfs.foundation.LifeCycleThread;
import org.xtreemfs.foundation.buffer.BufferPool;
import org.xtreemfs.foundation.buffer.ReusableBuffer;
import org.xtreemfs.foundation.flease.comm.FleaseMessage;
import org.xtreemfs.foundation.logging.Logging;

/* loaded from: input_file:WEB-INF/lib/BabuDB-0.4.5.jar:org/xtreemfs/foundation/flease/UDPFleaseCommunicator.class */
public class UDPFleaseCommunicator extends LifeCycleThread implements FleaseMessageSenderInterface {
    private final FleaseStage stage;
    private final int port;
    private DatagramChannel channel;
    private Selector selector;
    private volatile boolean quit;
    private final AtomicBoolean sendMode;
    private final LinkedBlockingQueue<FleaseMessage> q;
    private static final int MAX_UDP_SIZE = 16384;
    private long numTx;
    private long numRx;

    public UDPFleaseCommunicator(FleaseConfig fleaseConfig, String str, boolean z, FleaseViewChangeListenerInterface fleaseViewChangeListenerInterface) throws Exception {
        super("FlUDPCom");
        this.stage = new FleaseStage(fleaseConfig, str, this, z, fleaseViewChangeListenerInterface, null);
        this.port = fleaseConfig.getEndpoint().getPort();
        this.q = new LinkedBlockingQueue<>();
        this.sendMode = new AtomicBoolean(false);
        this.numTx = 0L;
        this.numRx = 0L;
    }

    public FleaseStage getStage() {
        return this.stage;
    }

    @Override // org.xtreemfs.foundation.flease.FleaseMessageSenderInterface
    public void sendMessage(FleaseMessage fleaseMessage, InetSocketAddress inetSocketAddress) {
        FleaseMessage m8828clone = fleaseMessage.m8828clone();
        m8828clone.setSender(inetSocketAddress);
        send(m8828clone);
    }

    public void send(FleaseMessage fleaseMessage) {
        this.q.add(fleaseMessage);
        if (this.q.size() == 1) {
            this.selector.wakeup();
        }
    }

    public void shutdown() {
        this.quit = true;
        interrupt();
    }

    @Override // java.lang.Thread, java.lang.Runnable
    public void run() {
        FleaseMessage fleaseMessage;
        long j = 0;
        long j2 = 0;
        long j3 = 0;
        try {
            this.selector = Selector.open();
            this.channel = DatagramChannel.open();
            this.channel.socket().bind(new InetSocketAddress(this.port));
            this.channel.socket().setReceiveBufferSize(1073741824);
            this.channel.socket().setSendBufferSize(1073741824);
            Logging.logMessage(7, Logging.Category.net, this, "sendbuffer size: " + this.channel.socket().getSendBufferSize(), new Object[0]);
            Logging.logMessage(7, Logging.Category.net, this, "recv       size: " + this.channel.socket().getReceiveBufferSize(), new Object[0]);
            this.channel.configureBlocking(false);
            this.channel.register(this.selector, 1);
            if (Logging.isInfo()) {
                Logging.logMessage(6, Logging.Category.net, this, "UDP socket on port %d ready", Integer.valueOf(this.port));
            }
            this.stage.start();
            this.stage.waitForStartup();
            notifyStarted();
            boolean z = true;
            ArrayList arrayList = new ArrayList(5000);
            ReusableBuffer allocate = BufferPool.allocate(16384);
            while (!this.quit) {
                if (this.q.size() == 0) {
                    if (!z) {
                        this.channel.keyFor(this.selector).interestOps(1);
                        z = true;
                    }
                } else if (z) {
                    this.channel.keyFor(this.selector).interestOps(5);
                    z = false;
                }
                int select = this.selector.select();
                if (this.q.size() == 0) {
                    if (!z) {
                        this.channel.keyFor(this.selector).interestOps(1);
                        z = true;
                    }
                } else if (z) {
                    this.channel.keyFor(this.selector).interestOps(5);
                    z = false;
                }
                if (select != 0) {
                    if (this.q.size() > 10000) {
                        System.out.println("QS!!!!! " + this.q.size());
                        System.out.println("is readOnly: " + z);
                    }
                    Iterator<SelectionKey> it2 = this.selector.selectedKeys().iterator();
                    while (it2.hasNext()) {
                        SelectionKey next = it2.next();
                        it2.remove();
                        if (next.isWritable()) {
                            this.q.drainTo(arrayList, 50);
                            while (true) {
                                if (arrayList.isEmpty() || (fleaseMessage = (FleaseMessage) arrayList.remove(arrayList.size() - 1)) == null) {
                                    break;
                                }
                                if (Logging.isDebug()) {
                                    Logging.logMessage(7, Logging.Category.net, this, "sent packet to %s", fleaseMessage.getSender().toString());
                                }
                                allocate.clear();
                                fleaseMessage.serialize(allocate);
                                allocate.flip();
                                if (this.channel.send(allocate.getBuffer(), fleaseMessage.getSender()) == 0) {
                                    System.out.println("cannot send anymore!");
                                    this.q.addAll(arrayList);
                                    arrayList.clear();
                                    break;
                                }
                                this.numTx++;
                            }
                        }
                        if (next.isReadable()) {
                            int i = 0;
                            while (true) {
                                allocate.clear();
                                InetSocketAddress inetSocketAddress = (InetSocketAddress) this.channel.receive(allocate.getBuffer());
                                if (inetSocketAddress != null) {
                                    i++;
                                    if (Logging.isDebug()) {
                                        Logging.logMessage(7, Logging.Category.net, this, "read data from %s", inetSocketAddress.toString());
                                    }
                                    try {
                                        allocate.flip();
                                        FleaseMessage fleaseMessage2 = new FleaseMessage(allocate);
                                        fleaseMessage2.setSender(inetSocketAddress);
                                        this.numRx++;
                                        this.stage.receiveMessage(fleaseMessage2);
                                    } catch (Throwable th) {
                                        th.printStackTrace();
                                        Logging.logMessage(4, Logging.Category.net, this, "received invalid UPD message: " + th, new Object[0]);
                                    }
                                    if (inetSocketAddress == null) {
                                        break;
                                    }
                                } else if (Logging.isDebug()) {
                                    Logging.logMessage(7, Logging.Category.net, this, "read key for empty read", new Object[0]);
                                }
                            }
                            j++;
                            j3 += i;
                            if (i > j2) {
                                j2 = i;
                            }
                        }
                    }
                }
            }
            this.stage.shutdown();
            this.selector.close();
            this.channel.close();
        } catch (ClosedByInterruptException e) {
        } catch (IOException e2) {
            Logging.logError(3, this, e2);
        } catch (Throwable th2) {
            notifyCrashed(th2 instanceof Exception ? (Exception) th2 : new Exception(th2));
            return;
        }
        Logging.logMessage(3, Logging.Category.net, this, "num packets tranferred: %d tx    %d rx", Long.valueOf(this.numTx), Long.valueOf(this.numRx));
        Logging.logMessage(3, Logging.Category.net, this, "numRxCycles %d, maxPkgPerCycle %d, avg/Cycle %d", Long.valueOf(j), Long.valueOf(j2), Long.valueOf(j3 / j));
        notifyStopped();
    }
}
