/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.messaging;

import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufAllocator;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.hamcrest.CoreMatchers;
import org.hamcrest.Matcher;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.causalclustering.messaging.ReconnectingChannel;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.helpers.SocketAddress;
import org.neo4j.logging.Log;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

public class ReconnectingChannelIT {
    private static final int PORT = PortAuthority.allocatePort();
    private static final long DEFAULT_TIMEOUT_MS = 20000L;
    private final Log log = NullLogProvider.getInstance().getLog(this.getClass());
    private final ListenSocketAddress listenAddress = new ListenSocketAddress("localhost", PORT);
    private final Server server = new Server(channel -> {}, this.listenAddress, "test-server");
    private EventLoopGroup elg;
    private ReconnectingChannel channel;
    private AtomicInteger childCount = new AtomicInteger();
    private final ChannelHandler childCounter = new ChannelInitializer<SocketChannel>(){

        protected void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new ChannelHandler[]{new ChannelInboundHandlerAdapter(){

                public void channelActive(ChannelHandlerContext ctx) {
                    ReconnectingChannelIT.this.childCount.incrementAndGet();
                }

                public void channelInactive(ChannelHandlerContext ctx) {
                    ReconnectingChannelIT.this.childCount.decrementAndGet();
                }
            }});
        }
    };

    @Before
    public void before() {
        this.elg = new NioEventLoopGroup(0);
        Bootstrap bootstrap = (Bootstrap)((Bootstrap)((Bootstrap)new Bootstrap().channel(NioSocketChannel.class)).group(this.elg)).handler(this.childCounter);
        this.channel = new ReconnectingChannel(bootstrap, this.elg.next(), (SocketAddress)this.listenAddress, this.log);
    }

    @After
    public void after() throws Throwable {
        this.elg.shutdownGracefully(0L, 20000L, TimeUnit.MILLISECONDS).awaitUninterruptibly();
        this.server.stop();
    }

    @Test
    public void shouldBeAbleToSendMessage() throws Throwable {
        this.server.start();
        this.channel.start();
        Future fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldAllowDeferredSend() throws Throwable {
        this.channel.start();
        this.server.start();
        Future fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
    }

    @Test(expected=ExecutionException.class)
    public void shouldFailSendWhenNoServer() throws Exception {
        this.channel.start();
        Future fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldReconnectAfterServerComesBack() throws Throwable {
        this.server.start();
        this.channel.start();
        Future fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
        this.server.stop();
        fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        try {
            fSend.get(20000L, TimeUnit.MILLISECONDS);
            Assert.fail((String)"Expected failure to send");
        }
        catch (ExecutionException executionException) {
            // empty catch block
        }
        this.server.start();
        fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
    }

    @Test
    public void shouldNotAllowSendingOnDisposedChannel() throws Throwable {
        this.server.start();
        this.channel.start();
        Future fSend = this.channel.writeAndFlush((Object)this.emptyBuffer());
        fSend.get(20000L, TimeUnit.MILLISECONDS);
        org.neo4j.test.assertion.Assert.assertEventually(this.childCount::get, (Matcher)CoreMatchers.equalTo((Object)1), (long)20000L, (TimeUnit)TimeUnit.MILLISECONDS);
        this.channel.dispose();
        try {
            this.channel.writeAndFlush((Object)this.emptyBuffer());
        }
        catch (IllegalStateException illegalStateException) {
        }
        org.neo4j.test.assertion.Assert.assertEventually(this.childCount::get, (Matcher)CoreMatchers.equalTo((Object)0), (long)20000L, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    private ByteBuf emptyBuffer() {
        return ByteBufAllocator.DEFAULT.buffer();
    }
}

