/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.concurrent.GenericFutureListener;
import java.net.ConnectException;
import java.net.SocketAddress;
import java.time.Clock;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.Function;
import org.neo4j.causalclustering.catchup.CatchUpChannelPool;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
import org.neo4j.causalclustering.catchup.CatchUpResponseHandler;
import org.neo4j.causalclustering.catchup.TimeoutLoop;
import org.neo4j.causalclustering.catchup.TrackingResponseHandler;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.NamedThreadFactory;
import org.neo4j.kernel.lifecycle.LifecycleAdapter;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

/**
 * Sends {@link CatchUpRequest}s to an upstream address over Netty channels obtained from a
 * {@link CatchUpChannelPool}, and blocks the calling thread until the outcome future of the
 * request completes.
 *
 * <p>The Netty event loop group is created in {@link #start()} and shut down in {@link #stop()}.
 */
public class CatchUpClient extends LifecycleAdapter {
    private final Log log;
    private final Clock clock;
    private final long inactivityTimeoutMillis;
    private final Function<CatchUpResponseHandler, ChannelInitializer<SocketChannel>> channelInitializer;
    private final CatchUpChannelPool<CatchUpChannel> pool = new CatchUpChannelPool<>(CatchUpChannel::new);

    /** Assigned by {@link #start()}; {@code null} before that. */
    private NioEventLoopGroup eventLoopGroup;

    /**
     * @param logProvider source of the log used by this client
     * @param clock clock used to measure the time since the last response on a channel
     * @param inactivityTimeoutMillis inactivity timeout handed to {@link TimeoutLoop#waitForCompletion}
     * @param channelInitializer builds the Netty channel initializer for a given response handler
     */
    public CatchUpClient(LogProvider logProvider, Clock clock, long inactivityTimeoutMillis,
            Function<CatchUpResponseHandler, ChannelInitializer<SocketChannel>> channelInitializer) {
        this.log = logProvider.getLog(getClass());
        this.clock = clock;
        this.inactivityTimeoutMillis = inactivityTimeoutMillis;
        this.channelInitializer = channelInitializer;
    }

    /**
     * Sends {@code request} to {@code upstream} and waits for the outcome.
     *
     * <p>A channel is acquired from the pool, the response callback and outcome future are
     * registered on it, and the request is written. When the future completes, the channel is
     * released back to the pool on success or disposed on failure.
     *
     * @param upstream address to send the request to
     * @param request the request to send
     * @param responseHandler callback that is registered on the channel for this request
     * @param <T> type of the request outcome
     * @return the value produced by {@link TimeoutLoop#waitForCompletion} for the outcome future
     * @throws CatchUpClientException if acquiring the channel or sending the request fails, or
     *         if thrown by {@link TimeoutLoop#waitForCompletion}
     */
    public <T> T makeBlockingRequest(AdvertisedSocketAddress upstream, CatchUpRequest request,
            CatchUpResponseCallback<T> responseHandler) throws CatchUpClientException {
        CompletableFuture<T> future = new CompletableFuture<>();
        CatchUpChannel channel = null;
        try {
            channel = pool.acquire(upstream);
            channel.setResponseHandler(responseHandler, future);
            future.whenComplete(new ReleaseOnComplete(channel));
            channel.send(request);
        } catch (Exception e) {
            // The request never made it out; a channel that was already acquired is discarded.
            if (channel != null) {
                pool.dispose(channel);
            }
            throw new CatchUpClientException("Failed to send request", e);
        }
        String operation = String.format(
                "Completed exceptionally when executing operation %s on %s ", request, upstream);
        return TimeoutLoop.waitForCompletion(
                future, operation, channel::millisSinceLastResponse, inactivityTimeoutMillis, log);
    }

    /**
     * Creates the event loop group used by all channels of this client. The thread count
     * argument is 0, which Netty documents as "use the default number of threads".
     */
    public void start() {
        eventLoopGroup = new NioEventLoopGroup(0, new NamedThreadFactory("catch-up-client"));
    }

    /**
     * Closes the channel pool and shuts down the event loop group (zero quiet period, zero
     * timeout), waiting for the shutdown to finish. If the wait is interrupted, a warning is
     * logged and the thread's interrupt status is restored.
     */
    public void stop() {
        log.info("CatchUpClient stopping");
        try {
            pool.close();
            // start() may never have been called, in which case there is no group to shut down.
            if (eventLoopGroup != null) {
                eventLoopGroup.shutdownGracefully(0L, 0L, TimeUnit.MICROSECONDS).sync();
            }
        } catch (InterruptedException e) {
            log.warn("Interrupted while stopping catch up client.");
            // Preserve the interrupt for callers further up the stack.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * A pooled connection to a single destination: a Netty channel plus the
     * {@link TrackingResponseHandler} installed on it.
     */
    private class CatchUpChannel implements CatchUpChannelPool.Channel {
        private final TrackingResponseHandler handler;
        private final AdvertisedSocketAddress destination;
        private final Bootstrap bootstrap;

        /** Set by {@link #connect()}; {@code null} until a connection has been established. */
        private Channel nettyChannel;

        CatchUpChannel(AdvertisedSocketAddress destination) {
            this.destination = destination;
            this.handler = new TrackingResponseHandler(new CatchUpResponseAdaptor(), clock);
            this.bootstrap = new Bootstrap()
                    .group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .handler(channelInitializer.apply(handler));
        }

        /** Registers the callback and outcome future for the next request on this channel. */
        void setResponseHandler(CatchUpResponseCallback responseHandler, CompletableFuture<?> requestOutcomeSignal) {
            handler.setResponseHandler(responseHandler, requestOutcomeSignal);
        }

        /**
         * Writes the request's message type followed by the request itself, then flushes.
         *
         * @throws ConnectException if the channel is not connected or no longer active
         */
        void send(CatchUpRequest request) throws ConnectException {
            if (!isActive()) {
                throw new ConnectException("Channel is not connected");
            }
            nettyChannel.write(request.messageType());
            nettyChannel.writeAndFlush(request);
        }

        /**
         * @return milliseconds elapsed on the client's clock since the handler's last response
         *         time, or empty if the handler reports no last response time
         */
        Optional<Long> millisSinceLastResponse() {
            return handler.lastResponseTime().map(lastResponseMillis -> clock.millis() - lastResponseMillis);
        }

        @Override
        public AdvertisedSocketAddress destination() {
            return destination;
        }

        /**
         * Connects to the destination, blocking until the connect attempt finishes, and arranges
         * for the handler to be notified when the channel closes.
         */
        @Override
        public void connect() throws Exception {
            ChannelFuture channelFuture = bootstrap.connect(destination.socketAddress());
            nettyChannel = channelFuture.sync().channel();
            nettyChannel.closeFuture().addListener((ChannelFutureListener) future -> handler.onClose());
        }

        /** @return {@code true} only if a connection has been established and is still active */
        @Override
        public boolean isActive() {
            // nettyChannel stays null until connect() succeeds; treat that as "not active".
            return nettyChannel != null && nettyChannel.isActive();
        }

        /** Requests closing of the underlying channel, if one exists; does not wait for it. */
        @Override
        public void close() {
            if (nettyChannel != null) {
                nettyChannel.close();
            }
        }
    }

    /**
     * Completion callback for a request's outcome future: releases the channel to the pool when
     * the request succeeded, disposes it when the request failed.
     */
    private class ReleaseOnComplete implements BiConsumer<Object, Throwable> {
        private final CatchUpChannel catchUpChannel;

        ReleaseOnComplete(CatchUpChannel catchUpChannel) {
            this.catchUpChannel = catchUpChannel;
        }

        @Override
        public void accept(Object o, Throwable throwable) {
            if (throwable == null) {
                pool.release(catchUpChannel);
            } else {
                pool.dispose(catchUpChannel);
            }
        }
    }
}

