/*
 * Decompiled with CFR 0.152.
 */
package io.vertx.core.eventbus.impl;

import io.vertx.core.AsyncResult;
import io.vertx.core.Context;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.eventbus.Message;
import io.vertx.core.eventbus.MessageConsumer;
import io.vertx.core.eventbus.ReplyException;
import io.vertx.core.eventbus.ReplyFailure;
import io.vertx.core.eventbus.impl.BodyReadStream;
import io.vertx.core.eventbus.impl.EventBusImpl;
import io.vertx.core.eventbus.impl.clustered.ClusteredMessage;
import io.vertx.core.impl.Arguments;
import io.vertx.core.logging.Logger;
import io.vertx.core.logging.LoggerFactory;
import io.vertx.core.spi.metrics.EventBusMetrics;
import io.vertx.core.streams.ReadStream;
import java.util.ArrayDeque;
import java.util.Objects;
import java.util.Queue;

/**
 * Registration of a message handler on a single event-bus address.
 *
 * <p>The registration is both the {@link MessageConsumer} handed to user code and the
 * {@link Handler} the event bus delivers messages to. While the consumer is paused, incoming
 * messages are buffered in {@code pending} (up to {@code maxBufferedMessages}); messages beyond
 * that limit are passed to the discard handler if one is set, otherwise dropped with a warning.
 *
 * <p>Mutable state ({@code registered}, {@code handler}, {@code paused}, {@code pending}, ...) is
 * guarded by this object's monitor.
 *
 * @param <T> the type of the message body
 */
public class HandlerRegistration<T> implements MessageConsumer<T>, Handler<Message<T>> {
    private static final Logger log = LoggerFactory.getLogger(HandlerRegistration.class);

    /** Default upper bound on the number of messages buffered while the consumer is paused. */
    public static final int DEFAULT_MAX_BUFFERED_MESSAGES = 1000;

    private final Vertx vertx;
    private final EventBusMetrics metrics;
    private final EventBusImpl eventBus;
    private final String address;
    private final String repliedAddress;
    private final boolean localOnly;
    private final Handler<AsyncResult<Message<T>>> asyncResultHandler;
    // -1 means "no timeout timer was set".
    private long timeoutID = -1L;
    private boolean registered;
    private Handler<Message<T>> handler;
    private AsyncResult<Void> result;
    private Handler<AsyncResult<Void>> completionHandler;
    private Handler<Void> endHandler;
    private Handler<Message<T>> discardHandler;
    private int maxBufferedMessages = DEFAULT_MAX_BUFFERED_MESSAGES;
    private final Queue<Message<T>> pending = new ArrayDeque<Message<T>>(8);
    private boolean paused;
    private Object metric;

    /**
     * Creates a registration; nothing is registered with the event bus until
     * {@link #handler(Handler)} is called with a non-null handler.
     *
     * @param vertx              used to schedule timers and run callbacks on a context
     * @param metrics            event-bus metrics sink
     * @param eventBus           the event bus this registration is added to / removed from
     * @param address            the address being consumed
     * @param repliedAddress     passed to the metrics on registration; a non-null value also marks
     *                           the registration as a reply handler when it is added to the bus
     * @param localOnly          forwarded to {@code EventBusImpl.addRegistration}
     * @param asyncResultHandler receives the failure produced by {@link #sendAsyncResultFailure}
     * @param timeout            delay handed to {@code Vertx.setTimer}; {@code -1} disables the
     *                           timer. When the timer fires, a {@link ReplyFailure#TIMEOUT} is
     *                           reported to the metrics and to {@code asyncResultHandler}.
     */
    public HandlerRegistration(Vertx vertx, EventBusMetrics metrics, EventBusImpl eventBus, String address, String repliedAddress, boolean localOnly, Handler<AsyncResult<Message<T>>> asyncResultHandler, long timeout) {
        this.vertx = vertx;
        this.metrics = metrics;
        this.eventBus = eventBus;
        this.address = address;
        this.repliedAddress = repliedAddress;
        this.localOnly = localOnly;
        this.asyncResultHandler = asyncResultHandler;
        if (timeout != -1L) {
            this.timeoutID = vertx.setTimer(timeout, tid -> {
                metrics.replyFailure(address, ReplyFailure.TIMEOUT);
                this.sendAsyncResultFailure(ReplyFailure.TIMEOUT, "Timed out waiting for a reply");
            });
        }
    }

    /**
     * Sets the maximum number of messages buffered while paused. If more messages than the new
     * limit are currently buffered, the oldest ones are removed; each removed message is passed to
     * the discard handler when one is set (the same treatment overflowing messages get in
     * {@link #handle(Message)}).
     *
     * @param maxBufferedMessages the new limit, must be {@code >= 0}
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> setMaxBufferedMessages(int maxBufferedMessages) {
        Arguments.require(maxBufferedMessages >= 0, "Max buffered messages cannot be negative");
        this.maxBufferedMessages = maxBufferedMessages;
        Handler<Message<T>> onDiscard = this.discardHandler;
        while (this.pending.size() > maxBufferedMessages) {
            Message<T> dropped = this.pending.poll();
            if (onDiscard != null) {
                onDiscard.handle(dropped);
            }
        }
        return this;
    }

    /** @return the current limit on messages buffered while paused */
    @Override
    public synchronized int getMaxBufferedMessages() {
        return this.maxBufferedMessages;
    }

    /** @return the address this registration consumes from */
    @Override
    public String address() {
        return this.address;
    }

    /**
     * Sets the handler notified with the registration result. If the result is already known the
     * handler is invoked asynchronously via {@code Vertx.runOnContext}; otherwise it is stored
     * and invoked from {@link #setResult(AsyncResult)}.
     *
     * @param completionHandler the handler, must not be {@code null}
     */
    @Override
    public synchronized void completionHandler(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        AsyncResult<Void> known = this.result;
        if (known == null) {
            this.completionHandler = completionHandler;
            return;
        }
        this.vertx.runOnContext(v -> completionHandler.handle(known));
    }

    /** Unregisters without invoking the end handler and without a completion callback. */
    @Override
    public synchronized void unregister() {
        this.unregister(false);
    }

    /**
     * Unregisters and notifies {@code completionHandler} when done. The end handler is not called.
     *
     * @param completionHandler the handler, must not be {@code null}
     */
    @Override
    public synchronized void unregister(Handler<AsyncResult<Void>> completionHandler) {
        Objects.requireNonNull(completionHandler);
        this.doUnregister(completionHandler, false);
    }

    /**
     * Unregisters, optionally invoking the end handler once the unregistration completes.
     * Synchronized like the other {@code unregister} overloads because it mutates
     * {@code registered}.
     *
     * @param callEndHandler whether the end handler (if any) should be invoked
     */
    public synchronized void unregister(boolean callEndHandler) {
        this.doUnregister(null, callEndHandler);
    }

    /**
     * Unregisters this handler and fails {@code asyncResultHandler} with a {@link ReplyException}.
     *
     * @param failure the failure type
     * @param msg     the failure message
     */
    public void sendAsyncResultFailure(ReplyFailure failure, String msg) {
        this.unregister();
        this.asyncResultHandler.handle(Future.failedFuture(new ReplyException(failure, msg)));
    }

    /**
     * Cancels the timeout timer (if any) and removes the registration from the event bus when it
     * is currently registered; otherwise just completes {@code completionHandler} asynchronously.
     */
    private void doUnregister(Handler<AsyncResult<Void>> completionHandler, boolean callEndHandler) {
        if (this.timeoutID != -1L) {
            this.vertx.cancelTimer(this.timeoutID);
        }
        Handler<AsyncResult<Void>> onDone = completionHandler;
        if (this.endHandler != null && callEndHandler) {
            // Run the end handler first, then the caller's completion handler (if any).
            Handler<Void> theEndHandler = this.endHandler;
            onDone = ar -> {
                theEndHandler.handle(null);
                if (completionHandler != null) {
                    completionHandler.handle(ar);
                }
            };
        }
        if (this.registered) {
            this.registered = false;
            this.eventBus.removeRegistration(this.address, this, onDone);
        } else {
            this.callCompletionHandlerAsync(onDone);
        }
    }

    /** Invokes a non-null completion handler with a succeeded future via {@code runOnContext}. */
    private void callCompletionHandlerAsync(Handler<AsyncResult<Void>> completionHandler) {
        if (completionHandler != null) {
            this.vertx.runOnContext(v -> completionHandler.handle(Future.succeededFuture()));
        }
    }

    /**
     * Records the outcome of the registration. On success the handler is registered with the
     * metrics. A stored completion handler is notified asynchronously; if there is none and the
     * registration failed, the failure is logged together with its cause.
     *
     * @param result the registration result
     */
    public synchronized void setResult(AsyncResult<Void> result) {
        this.result = result;
        if (result.succeeded()) {
            this.metric = this.metrics.handlerRegistered(this.address, this.repliedAddress);
        }
        Handler<AsyncResult<Void>> callback = this.completionHandler;
        if (callback != null) {
            this.vertx.runOnContext(v -> callback.handle(result));
        } else if (result.failed()) {
            log.error("Failed to propagate registration for handler " + this.handler + " and address " + this.address, result.cause());
        }
    }

    /**
     * Entry point for messages delivered by the event bus.
     *
     * <p>When paused, the message is buffered, or discarded once the buffer is full. Otherwise the
     * next buffered message (if any) is scheduled, metrics are notified, and the message is passed
     * to the user handler outside of the monitor. If the message carries a
     * {@code "__vertx.credit"} header, one credit is sent to that address before the handler runs.
     *
     * @param message the incoming message
     */
    @Override
    public void handle(Message<T> message) {
        Handler<Message<T>> theHandler = null;
        synchronized (this) {
            if (this.paused) {
                if (this.pending.size() < this.maxBufferedMessages) {
                    this.pending.add(message);
                } else if (this.discardHandler != null) {
                    this.discardHandler.handle(message);
                } else {
                    log.warn("Discarding message as more than " + this.maxBufferedMessages + " buffered in paused consumer");
                }
            } else {
                this.checkNextTick();
                if (this.metrics.isEnabled()) {
                    // A clustered message that came off the wire is the only non-local case.
                    boolean fromWire = message instanceof ClusteredMessage && ((ClusteredMessage) message).isFromWire();
                    this.metrics.beginHandleMessage(this.metric, !fromWire);
                }
                theHandler = this.handler;
            }
        }
        if (theHandler != null) {
            String creditsAddress = message.headers().get("__vertx.credit");
            if (creditsAddress != null) {
                this.eventBus.send(creditsAddress, 1);
            }
            this.handleMessage(theHandler, message);
        }
    }

    /**
     * Invokes the user handler and reports the end of handling to the metrics. An exception thrown
     * by the handler is logged, reported to the metrics and rethrown.
     */
    private void handleMessage(Handler<Message<T>> theHandler, Message<T> message) {
        try {
            theHandler.handle(message);
            this.metrics.endHandleMessage(this.metric, null);
        } catch (Exception e) {
            log.error("Failed to handleMessage", e);
            this.metrics.endHandleMessage(this.metric, e);
            throw e;
        }
    }

    /**
     * Sets the handler that receives messages which cannot be buffered while paused.
     *
     * @param handler the discard handler, may be {@code null}
     */
    public synchronized void discardHandler(Handler<Message<T>> handler) {
        this.discardHandler = handler;
    }

    /**
     * Sets the message handler. A non-null handler registers this object with the event bus if it
     * is not registered yet; a {@code null} handler unregisters it if it currently is.
     *
     * @param handler the message handler, or {@code null} to unregister
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> handler(Handler<Message<T>> handler) {
        this.handler = handler;
        if (handler != null && !this.registered) {
            this.registered = true;
            this.eventBus.addRegistration(this.address, this, this.repliedAddress != null, this.localOnly);
        } else if (handler == null && this.registered) {
            this.unregister();
        }
        return this;
    }

    /** @return a new {@link BodyReadStream} wrapping this consumer */
    @Override
    public ReadStream<T> bodyStream() {
        return new BodyReadStream<>(this);
    }

    /** @return whether this registration is currently marked as registered */
    @Override
    public synchronized boolean isRegistered() {
        return this.registered;
    }

    /**
     * Pauses delivery; subsequent messages are buffered by {@link #handle(Message)}.
     *
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> pause() {
        this.paused = true;
        return this;
    }

    /**
     * Resumes delivery and, if the consumer was paused, schedules processing of buffered messages.
     *
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> resume() {
        if (this.paused) {
            this.paused = false;
            this.checkNextTick();
        }
        return this;
    }

    /**
     * Sets the end handler. A non-null handler is wrapped so that it is executed on the context
     * obtained from {@code Vertx.getOrCreateContext()} at the time of this call.
     *
     * @param endHandler the end handler, or {@code null} to clear it
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> endHandler(Handler<Void> endHandler) {
        if (endHandler == null) {
            this.endHandler = null;
            return this;
        }
        Context endCtx = this.vertx.getOrCreateContext();
        this.endHandler = v1 -> endCtx.runOnContext(v2 -> endHandler.handle(null));
        return this;
    }

    /**
     * No-op: the supplied handler is ignored.
     *
     * @param handler ignored
     * @return this consumer
     */
    @Override
    public synchronized MessageConsumer<T> exceptionHandler(Handler<Throwable> handler) {
        return this;
    }

    /**
     * If messages are buffered, schedules a task that delivers the next one unless the consumer
     * has been paused in the meantime. Callers hold this object's monitor; the scheduled task
     * re-acquires it because {@code pending} (an {@link ArrayDeque}) and {@code paused} are only
     * safe to access under that monitor.
     */
    private void checkNextTick() {
        if (this.pending.isEmpty()) {
            return;
        }
        this.vertx.runOnContext(v -> {
            Message<T> next;
            synchronized (this) {
                if (this.paused) {
                    return;
                }
                next = this.pending.poll();
            }
            if (next != null) {
                // Delivered outside the monitor; handle() takes it again as needed.
                this.handle(next);
            }
        });
    }

    /** @return the currently set message handler, possibly {@code null} */
    public Handler<Message<T>> getHandler() {
        return this.handler;
    }

    /** @return the metric object obtained when the handler was registered with the metrics */
    public Object getMetric() {
        return this.metric;
    }
}

