/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup;

import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.ByteToMessageDecoder;
import io.netty.handler.codec.MessageToByteEncoder;
import java.nio.channels.ClosedChannelException;
import java.time.Clock;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpClientException;
import org.neo4j.causalclustering.catchup.CatchUpResponseAdaptor;
import org.neo4j.causalclustering.catchup.storecopy.GetStoreIdRequest;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.net.Server;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.ListenSocketAddress;
import org.neo4j.kernel.lifecycle.LifeSupport;
import org.neo4j.kernel.lifecycle.Lifecycle;
import org.neo4j.kernel.lifecycle.LifecycleException;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.ports.allocation.PortAuthority;

public class CatchUpClientIT {
    private LifeSupport lifeSupport;

    @Before
    public void initLifeCycles() {
        this.lifeSupport = new LifeSupport();
    }

    @After
    public void shutdownLifeSupport() {
        this.lifeSupport.stop();
        this.lifeSupport.shutdown();
    }

    @Test
    public void shouldCloseHandlerIfChannelIsClosedInClient() throws LifecycleException {
        String hostname = "localhost";
        int port = PortAuthority.allocatePort();
        ListenSocketAddress listenSocketAddress = new ListenSocketAddress(hostname, port);
        AtomicBoolean wasClosedByClient = new AtomicBoolean(false);
        Server emptyServer = this.catchupServer(listenSocketAddress, new ChannelHandler[0]);
        CatchUpClient closingClient = this.closingChannelCatchupClient(wasClosedByClient);
        this.lifeSupport.add((Lifecycle)emptyServer);
        this.lifeSupport.add((Lifecycle)closingClient);
        this.lifeSupport.init();
        this.lifeSupport.start();
        this.assertClosedChannelException(hostname, port, closingClient);
        Assert.assertTrue((boolean)wasClosedByClient.get());
    }

    @Test
    public void shouldCloseHandlerIfChannelIsClosedOnServer() {
        String hostname = "localhost";
        int port = PortAuthority.allocatePort();
        ListenSocketAddress listenSocketAddress = new ListenSocketAddress(hostname, port);
        AtomicBoolean wasClosedByServer = new AtomicBoolean(false);
        Server closingChannelServer = this.closingChannelCatchupServer(listenSocketAddress, wasClosedByServer);
        CatchUpClient emptyClient = this.emptyClient();
        this.lifeSupport.add((Lifecycle)closingChannelServer);
        this.lifeSupport.add((Lifecycle)emptyClient);
        this.lifeSupport.init();
        this.lifeSupport.start();
        this.assertClosedChannelException(hostname, port, emptyClient);
        Assert.assertTrue((boolean)wasClosedByServer.get());
    }

    private CatchUpClient emptyClient() {
        return this.catchupClient(new ChannelHandler[]{new MessageToByteEncoder<GetStoreIdRequest>(){

            protected void encode(ChannelHandlerContext channelHandlerContext, GetStoreIdRequest getStoreIdRequest, ByteBuf byteBuf) {
                byteBuf.writeByte(1);
            }
        }});
    }

    private void assertClosedChannelException(String hostname, int port, CatchUpClient closingClient) {
        try {
            closingClient.makeBlockingRequest(new AdvertisedSocketAddress(hostname, port), (CatchUpRequest)new GetStoreIdRequest(), this.neverCompletingAdaptor());
            Assert.fail();
        }
        catch (CatchUpClientException e) {
            Throwable cause = e.getCause();
            Assert.assertEquals(cause.getClass(), ExecutionException.class);
            Throwable actualCause = cause.getCause();
            Assert.assertEquals(actualCause.getClass(), ClosedChannelException.class);
        }
    }

    private CatchUpResponseAdaptor<Object> neverCompletingAdaptor() {
        return new CatchUpResponseAdaptor();
    }

    private CatchUpClient closingChannelCatchupClient(final AtomicBoolean wasClosedByClient) {
        return this.catchupClient(new ChannelHandler[]{new MessageToByteEncoder(){

            protected void encode(ChannelHandlerContext ctx, Object msg, ByteBuf out) {
                wasClosedByClient.set(true);
                ctx.channel().close();
            }
        }});
    }

    private Server closingChannelCatchupServer(ListenSocketAddress listenSocketAddress, final AtomicBoolean wasClosedByServer) {
        return this.catchupServer(listenSocketAddress, new ChannelHandler[]{new ByteToMessageDecoder(){

            protected void decode(ChannelHandlerContext ctx, ByteBuf byteBuf, List<Object> list) {
                wasClosedByServer.set(true);
                ctx.channel().close();
            }
        }});
    }

    private CatchUpClient catchupClient(final ChannelHandler ... channelHandlers) {
        return new CatchUpClient((LogProvider)NullLogProvider.getInstance(), Clock.systemUTC(), 10000L, catchUpResponseHandler -> new ChannelInitializer<SocketChannel>(){

            protected void initChannel(SocketChannel ch) {
                ch.pipeline().addLast(channelHandlers);
            }
        });
    }

    private Server catchupServer(ListenSocketAddress listenSocketAddress, ChannelHandler ... channelHandlers) {
        return new Server(channel -> channel.pipeline().addLast(channelHandlers), listenSocketAddress, "empty-test-server");
    }
}

