/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl.clustered;

import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.buffer.Buffer;
import io.vertx.core.eventbus.impl.clustered.ClusteredEventBus;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.eventbus.impl.codecs.PingMessageCodec;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.net.NetClient;
import io.vertx.core.net.NetClientOptions;
import io.vertx.core.net.NetSocket;
import io.vertx.core.net.impl.NetClientImpl;
import io.vertx.core.net.impl.ServerID;
import io.vertx.core.spi.metrics.EventBusMetrics;
import java.util.ArrayDeque;
import java.util.Queue;

/**
 * Holds the outbound connection from this node's clustered event bus to one
 * remote server, identified by a {@link ServerID}.
 *
 * <p>Messages handed to {@link #writeMessage(ClusteredMessage)} before the
 * socket is available are queued and flushed, in order, once the connection
 * succeeds. After the connection is up, a ping is sent periodically; if no
 * data arrives from the remote side within the configured reply interval the
 * holder closes itself.
 *
 * <p>{@link #connect()}, {@link #writeMessage(ClusteredMessage)} and the
 * internal connected callback are {@code synchronized} on this instance;
 * {@link #close()} and the ping scheduling are not.
 */
class ConnectionHolder {
    private static final Logger log = LoggerFactory.getLogger(ConnectionHolder.class);
    private static final String PING_ADDRESS = "__vertx_ping";
    private final ClusteredEventBus eventBus;
    private final NetClient client;
    private final ServerID serverID;
    private final Vertx vertx;
    private final EventBusMetrics metrics;
    /** Messages queued while not yet connected; lazily created, {@code null} when nothing is queued. */
    private Queue<ClusteredMessage> pending;
    private NetSocket socket;
    private boolean connected;
    /** Timer waiting for data after a ping was sent; {@code -1} until the first ping is sent. */
    private long timeoutID = -1L;
    /** Timer that triggers the next ping; {@code -1} until the first ping is scheduled. */
    private long pingTimeoutID = -1L;

    /**
     * Creates a holder for the given remote server. No connection is attempted
     * until {@link #connect()} is called.
     *
     * @param eventBus the owning clustered event bus; supplies the Vert.x instance,
     *                 metrics, options and the connection map
     * @param serverID host and port of the remote server
     */
    ConnectionHolder(ClusteredEventBus eventBus, ServerID serverID) {
        this.eventBus = eventBus;
        this.serverID = serverID;
        this.vertx = eventBus.vertx();
        this.metrics = eventBus.getMetrics();
        // NOTE(review): connect timeout of 60000 is presumably milliseconds; the meaning of the
        // trailing 'false' flag is defined by NetClientImpl and is not visible here - confirm.
        this.client = new NetClientImpl(eventBus.vertx(), new NetClientOptions().setConnectTimeout(60000), false);
    }

    /**
     * Starts an asynchronous connection attempt to the remote server. On success
     * the socket is installed and queued messages are flushed; on failure the
     * holder is closed.
     *
     * @throws IllegalStateException if the holder is already connected
     */
    synchronized void connect() {
        if (this.connected) {
            throw new IllegalStateException("Already connected");
        }
        this.client.connect(this.serverID.port, this.serverID.host, res -> {
            if (res.succeeded()) {
                this.connected(res.result());
            } else {
                this.close();
            }
        });
    }

    /**
     * Sends a message to the remote server, or queues it if the connection has
     * not been established yet.
     *
     * @param message the message to send
     */
    synchronized void writeMessage(ClusteredMessage message) {
        if (this.connected) {
            this.writeToSocket(message);
            return;
        }
        if (this.pending == null) {
            this.pending = new ArrayDeque<ClusteredMessage>();
        }
        this.pending.add(message);
    }

    /**
     * Cancels any outstanding ping timers, closes the underlying client and
     * removes this holder from the event bus connection map (only if the map
     * still maps the server to this exact holder).
     */
    void close() {
        if (this.timeoutID != -1L) {
            this.vertx.cancelTimer(this.timeoutID);
        }
        if (this.pingTimeoutID != -1L) {
            this.vertx.cancelTimer(this.pingTimeoutID);
        }
        try {
            this.client.close();
        }
        catch (Exception ignored) {
            // Closing is best-effort: a failure here must not prevent the holder
            // from being removed from the connection map below.
            log.debug("Failed to close client for " + this.serverID, ignored);
        }
        if (this.eventBus.connections().remove(this.serverID, this)) {
            log.debug("Cluster connection closed: " + this.serverID + " holder " + this);
        }
    }

    /**
     * Schedules the next ping. When the ping interval elapses, a reply timer is
     * armed (closing the holder if it fires) and a ping message is written to
     * the socket.
     */
    private void schedulePing() {
        VertxOptions options = this.eventBus.options();
        this.pingTimeoutID = this.vertx.setTimer(options.getClusterPingInterval(), id1 -> {
            // Arm the reply timer before sending the ping.
            this.timeoutID = this.vertx.setTimer(options.getClusterPingReplyInterval(), id2 -> {
                log.warn("No pong from server " + this.serverID + " - will consider it dead");
                this.close();
            });
            ClusteredMessage<String, String> pingMessage = new ClusteredMessage<String, String>(this.serverID, PING_ADDRESS, null, null, null, new PingMessageCodec(), true, this.eventBus);
            // Pings are written directly and are not reported to the metrics.
            this.socket.write(pingMessage.encodeToWire());
        });
    }

    /**
     * Installs the freshly connected socket, starts the ping cycle and flushes
     * any messages that were queued while the connection was being established.
     *
     * @param socket the connected socket
     */
    private synchronized void connected(NetSocket socket) {
        this.socket = socket;
        this.connected = true;
        socket.exceptionHandler(t -> this.close());
        socket.closeHandler(v -> this.close());
        socket.handler(reply -> {
            // Any inbound data (presumably the pong) cancels the reply timer and
            // schedules the next ping.
            this.vertx.cancelTimer(this.timeoutID);
            this.schedulePing();
        });
        this.schedulePing();
        // 'pending' is only allocated when a message was queued before the connection
        // completed, so it may still be null here.
        if (this.pending != null) {
            for (ClusteredMessage message : this.pending) {
                this.writeToSocket(message);
            }
            // Once connected, writeMessage bypasses the queue, so it can be released.
            this.pending = null;
        }
    }

    /** Encodes the message, reports its size to the metrics and writes it to the socket. */
    private void writeToSocket(ClusteredMessage message) {
        Buffer data = message.encodeToWire();
        this.metrics.messageWritten(message.address(), data.length());
        this.socket.write(data);
    }
}

