/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core;

import java.time.Instant;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.causalclustering.core.BatchingMessageHandler;
import org.neo4j.causalclustering.core.consensus.ContinuousJob;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.consensus.ReplicatedString;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.identity.ClusterId;
import org.neo4j.causalclustering.messaging.LifecycleMessageHandler;
import org.neo4j.causalclustering.messaging.Message;
import org.neo4j.logging.AssertableLogProvider;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;

public class BatchingMessageHandlerTest {
    private static final int MAX_BATCH = 16;
    private static final int QUEUE_SIZE = 64;
    private final Instant now = Instant.now();
    private LifecycleMessageHandler<RaftMessages.ReceivedInstantClusterIdAwareMessage<?>> downstreamHandler = (LifecycleMessageHandler)Mockito.mock(LifecycleMessageHandler.class);
    private ClusterId localClusterId = new ClusterId(UUID.randomUUID());
    private ContinuousJob mockJob = (ContinuousJob)Mockito.mock(ContinuousJob.class);
    private Function<Runnable, ContinuousJob> jobSchedulerFactory = ignored -> this.mockJob;

    @Test
    public void shouldInvokeInnerHandlerWhenRun() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage message = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.handle(message);
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)message);
    }

    @Test
    public void shouldInvokeHandlerOnQueuedMessage() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage message = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        ExecutorService executor = Executors.newCachedThreadPool();
        Future<?> future = executor.submit((Runnable)batchHandler);
        Thread.sleep(50L);
        batchHandler.handle(message);
        future.get();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)message);
    }

    @Test
    public void shouldBatchRequests() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentB = new ReplicatedString("B");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA);
        RaftMessages.NewEntry.Request messageB = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentB);
        batchHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)messageA));
        batchHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)messageB));
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add((ReplicatedContent)contentA);
        batchRequest.add((ReplicatedContent)contentB);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)batchRequest));
    }

    @Test
    public void shouldBatchUsingReceivedInstantOfFirstReceivedMessage() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString content = new ReplicatedString("A");
        RaftMessages.NewEntry.Request messageA = new RaftMessages.NewEntry.Request(null, (ReplicatedContent)content);
        Instant firstReceived = Instant.ofEpochMilli(1L);
        Instant secondReceived = firstReceived.plusMillis(1L);
        batchHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)firstReceived, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)messageA));
        batchHandler.handle(RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)secondReceived, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)messageA));
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add((ReplicatedContent)content);
        batchRequest.add((ReplicatedContent)content);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)firstReceived, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)batchRequest));
    }

    @Test
    public void shouldBatchNewEntriesAndHandleOtherMessagesSingularly() {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ReplicatedString contentA = new ReplicatedString("A");
        ReplicatedString contentC = new ReplicatedString("C");
        RaftMessages.ReceivedInstantClusterIdAwareMessage messageA = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentA));
        RaftMessages.ReceivedInstantClusterIdAwareMessage messageB = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.Heartbeat(null, 0L, 0L, 0L));
        RaftMessages.ReceivedInstantClusterIdAwareMessage messageC = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, (ReplicatedContent)contentC));
        RaftMessages.ReceivedInstantClusterIdAwareMessage messageD = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.Heartbeat(null, 1L, 1L, 1L));
        batchHandler.handle(messageA);
        batchHandler.handle(messageB);
        batchHandler.handle(messageC);
        batchHandler.handle(messageD);
        Mockito.verifyZeroInteractions((Object[])new Object[]{this.downstreamHandler});
        batchHandler.run();
        RaftMessages.NewEntry.BatchRequest batchRequest = new RaftMessages.NewEntry.BatchRequest(2);
        batchRequest.add((ReplicatedContent)contentA);
        batchRequest.add((ReplicatedContent)contentC);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)batchRequest));
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)messageB);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).handle((Message)messageD);
    }

    @Test
    public void shouldDropMessagesAfterBeingStopped() throws Throwable {
        AssertableLogProvider logProvider = new AssertableLogProvider();
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)logProvider);
        RaftMessages.ReceivedInstantClusterIdAwareMessage message = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.stop();
        batchHandler.handle(message);
        batchHandler.run();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler, (VerificationMode)Mockito.never())).handle((Message)ArgumentMatchers.any(RaftMessages.ReceivedInstantClusterIdAwareMessage.class));
        logProvider.assertAtLeastOnce(new AssertableLogProvider.LogMatcher[]{AssertableLogProvider.inLog(BatchingMessageHandler.class).debug("This handler has been stopped, dropping the message: %s", new Object[]{message})});
    }

    @Test(timeout=5000L)
    public void shouldGiveUpAddingMessagesInTheQueueIfTheHandlerHasBeenStopped() throws Throwable {
        int queueSize = 1;
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, queueSize, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        RaftMessages.ReceivedInstantClusterIdAwareMessage message = RaftMessages.ReceivedInstantClusterIdAwareMessage.of((Instant)this.now, (ClusterId)this.localClusterId, (RaftMessages.RaftMessage)new RaftMessages.NewEntry.Request(null, null));
        batchHandler.handle(message);
        CountDownLatch latch = new CountDownLatch(1);
        Thread thread = new Thread(() -> {
            latch.countDown();
            batchHandler.handle(message);
        });
        thread.start();
        latch.await();
        batchHandler.stop();
        thread.join();
    }

    @Test
    public void shouldDelegateStart() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        batchHandler.start(clusterId);
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).start(clusterId);
    }

    @Test
    public void shouldDelegateStop() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        batchHandler.stop();
        ((LifecycleMessageHandler)Mockito.verify(this.downstreamHandler)).stop();
    }

    @Test
    public void shouldStartJob() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        ClusterId clusterId = new ClusterId(UUID.randomUUID());
        batchHandler.start(clusterId);
        ((ContinuousJob)Mockito.verify((Object)this.mockJob)).start();
    }

    @Test
    public void shouldStopJob() throws Throwable {
        BatchingMessageHandler batchHandler = new BatchingMessageHandler(this.downstreamHandler, 64, 16, this.jobSchedulerFactory, (LogProvider)NullLogProvider.getInstance());
        batchHandler.stop();
        ((ContinuousJob)Mockito.verify((Object)this.mockJob)).stop();
    }
}

