/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.catchup.tx;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.ArgumentMatchers;
import org.mockito.Mockito;
import org.mockito.verification.VerificationMode;
import org.neo4j.causalclustering.catchup.CatchUpClient;
import org.neo4j.causalclustering.catchup.CatchUpResponseCallback;
import org.neo4j.causalclustering.catchup.CatchupAddressProvider;
import org.neo4j.causalclustering.catchup.CatchupResult;
import org.neo4j.causalclustering.catchup.storecopy.LocalDatabase;
import org.neo4j.causalclustering.catchup.storecopy.StoreCopyProcess;
import org.neo4j.causalclustering.catchup.tx.BatchingTxApplier;
import org.neo4j.causalclustering.catchup.tx.CatchupPollingProcess;
import org.neo4j.causalclustering.catchup.tx.TxPullRequest;
import org.neo4j.causalclustering.catchup.tx.TxPullResponse;
import org.neo4j.causalclustering.catchup.tx.TxStreamFinishedResponse;
import org.neo4j.causalclustering.core.consensus.schedule.CountingTimerService;
import org.neo4j.causalclustering.core.consensus.schedule.Timer;
import org.neo4j.causalclustering.core.consensus.schedule.TimerService;
import org.neo4j.causalclustering.discovery.TopologyService;
import org.neo4j.causalclustering.helper.Suspendable;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.identity.StoreId;
import org.neo4j.causalclustering.messaging.CatchUpRequest;
import org.neo4j.causalclustering.upstream.UpstreamDatabaseStrategySelector;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.kernel.impl.transaction.log.TransactionIdStore;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.LogProvider;
import org.neo4j.logging.NullLogProvider;
import org.neo4j.scheduler.JobScheduler;
import org.neo4j.test.FakeClockJobScheduler;

public class CatchupPollingProcessTest {
    private final CatchUpClient catchUpClient = (CatchUpClient)Mockito.mock(CatchUpClient.class);
    private final UpstreamDatabaseStrategySelector strategyPipeline = (UpstreamDatabaseStrategySelector)Mockito.mock(UpstreamDatabaseStrategySelector.class);
    private final MemberId coreMemberId = (MemberId)Mockito.mock(MemberId.class);
    private final TransactionIdStore idStore = (TransactionIdStore)Mockito.mock(TransactionIdStore.class);
    private final BatchingTxApplier txApplier = (BatchingTxApplier)Mockito.mock(BatchingTxApplier.class);
    private final FakeClockJobScheduler scheduler = new FakeClockJobScheduler();
    private final CountingTimerService timerService = new CountingTimerService((JobScheduler)this.scheduler, (LogProvider)NullLogProvider.getInstance());
    private final long txPullIntervalMillis = 100L;
    private final StoreCopyProcess storeCopyProcess = (StoreCopyProcess)Mockito.mock(StoreCopyProcess.class);
    private final StoreId storeId = new StoreId(1L, 2L, 3L, 4L);
    private final LocalDatabase localDatabase = (LocalDatabase)Mockito.mock(LocalDatabase.class);
    private final TopologyService topologyService = (TopologyService)Mockito.mock(TopologyService.class);
    private final AdvertisedSocketAddress coreMemberAddress = new AdvertisedSocketAddress("hostname", 1234);
    private final CatchupAddressProvider catchupAddressProvider = CatchupAddressProvider.fromSingleAddress((AdvertisedSocketAddress)this.coreMemberAddress);
    private final Suspendable startStopOnStoreCopy;
    private final CatchupPollingProcess txPuller;

    public CatchupPollingProcessTest() {
        Mockito.when((Object)this.localDatabase.storeId()).thenReturn((Object)this.storeId);
        Mockito.when((Object)this.topologyService.findCatchupAddress(this.coreMemberId)).thenReturn(Optional.of(this.coreMemberAddress));
        this.startStopOnStoreCopy = (Suspendable)Mockito.mock(Suspendable.class);
        this.txPuller = new CatchupPollingProcess((LogProvider)NullLogProvider.getInstance(), this.localDatabase, this.startStopOnStoreCopy, this.catchUpClient, this.strategyPipeline, (TimerService)this.timerService, 100L, this.txApplier, new Monitors(), this.storeCopyProcess, () -> (DatabaseHealth)Mockito.mock(DatabaseHealth.class), this.topologyService);
    }

    @Before
    public void before() throws Throwable {
        Mockito.when((Object)this.idStore.getLastCommittedTransactionId()).thenReturn((Object)1L);
        Mockito.when((Object)this.strategyPipeline.bestUpstreamDatabase()).thenReturn((Object)this.coreMemberId);
    }

    @Test
    public void shouldSendPullRequestOnTick() throws Throwable {
        this.txPuller.start();
        long lastAppliedTxId = 99L;
        Mockito.when((Object)this.txApplier.lastQueuedTxId()).thenReturn((Object)lastAppliedTxId);
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        ((CatchUpClient)Mockito.verify((Object)this.catchUpClient)).makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class));
    }

    @Test
    public void shouldKeepMakingPullRequestsUntilEndOfStream() throws Throwable {
        this.txPuller.start();
        long lastAppliedTxId = 99L;
        Mockito.when((Object)this.txApplier.lastQueuedTxId()).thenReturn((Object)lastAppliedTxId);
        Mockito.when((Object)this.catchUpClient.makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class))).thenReturn((Object)new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_BATCH, 10L), (Object[])new TxStreamFinishedResponse[]{new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 10L)});
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        ((CatchUpClient)Mockito.verify((Object)this.catchUpClient, (VerificationMode)Mockito.times((int)2))).makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class));
    }

    @Test
    public void shouldRenewTxPullTimeoutOnSuccessfulTxPulling() throws Throwable {
        this.txPuller.start();
        Mockito.when((Object)this.catchUpClient.makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class))).thenReturn((Object)new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 0L));
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertEquals((long)1L, (long)this.timerService.invocationCount((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER));
    }

    @Test
    public void nextStateShouldBeStoreCopyingIfRequestedTransactionHasBeenPrunedAway() throws Throwable {
        this.txPuller.start();
        Mockito.when((Object)this.catchUpClient.makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class))).thenReturn((Object)new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L));
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertEquals((Object)CatchupPollingProcess.State.STORE_COPYING, (Object)this.txPuller.state());
    }

    @Test
    public void nextStateShouldBeTxPullingAfterASuccessfulStoreCopy() throws Throwable {
        this.txPuller.start();
        Mockito.when((Object)this.catchUpClient.makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class))).thenReturn((Object)new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L));
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        ((LocalDatabase)Mockito.verify((Object)this.localDatabase)).stopForStoreCopy();
        ((Suspendable)Mockito.verify((Object)this.startStopOnStoreCopy)).disable();
        ((StoreCopyProcess)Mockito.verify((Object)this.storeCopyProcess)).replaceWithStoreFrom((CatchupAddressProvider)ArgumentMatchers.any(CatchupAddressProvider.class), (StoreId)ArgumentMatchers.eq((Object)this.storeId));
        ((LocalDatabase)Mockito.verify((Object)this.localDatabase)).start();
        ((Suspendable)Mockito.verify((Object)this.startStopOnStoreCopy)).enable();
        ((BatchingTxApplier)Mockito.verify((Object)this.txApplier)).refreshFromNewStore();
        Assert.assertEquals((Object)CatchupPollingProcess.State.TX_PULLING, (Object)this.txPuller.state());
    }

    @Test
    public void shouldNotRenewTheTimeoutIfInPanicState() {
        this.txPuller.start();
        CatchUpResponseCallback callback = (CatchUpResponseCallback)Mockito.mock(CatchUpResponseCallback.class);
        ((CatchUpResponseCallback)Mockito.doThrow((Throwable[])new Throwable[]{new RuntimeException("Panic all the things")}).when((Object)callback)).onTxPullResponse((CompletableFuture)ArgumentMatchers.any(CompletableFuture.class), (TxPullResponse)ArgumentMatchers.any(TxPullResponse.class));
        Timer timer = (Timer)Mockito.spy((Object)Iterables.single((Iterable)this.timerService.getTimers((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER)));
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertEquals((Object)CatchupPollingProcess.State.PANIC, (Object)this.txPuller.state());
        ((Timer)Mockito.verify((Object)timer, (VerificationMode)Mockito.never())).reset();
    }

    @Test
    public void shouldNotSignalOperationalUntilPulling() throws Throwable {
        Mockito.when((Object)this.catchUpClient.makeBlockingRequest((AdvertisedSocketAddress)ArgumentMatchers.any(AdvertisedSocketAddress.class), (CatchUpRequest)ArgumentMatchers.any(TxPullRequest.class), (CatchUpResponseCallback)ArgumentMatchers.any(CatchUpResponseCallback.class))).thenReturn((Object)new TxStreamFinishedResponse(CatchupResult.E_TRANSACTION_PRUNED, 0L), (Object[])new TxStreamFinishedResponse[]{new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_BATCH, 10L), new TxStreamFinishedResponse(CatchupResult.SUCCESS_END_OF_STREAM, 15L)});
        this.txPuller.start();
        Future operationalFuture = this.txPuller.upToDateFuture();
        Assert.assertFalse((boolean)operationalFuture.isDone());
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertFalse((boolean)operationalFuture.isDone());
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertFalse((boolean)operationalFuture.isDone());
        this.timerService.invoke((TimerService.TimerName)CatchupPollingProcess.Timers.TX_PULLER_TIMER);
        Assert.assertTrue((boolean)operationalFuture.isDone());
        Assert.assertTrue((boolean)((Boolean)operationalFuture.get()));
        Assert.assertEquals((Object)CatchupPollingProcess.State.TX_PULLING, (Object)this.txPuller.state());
    }
}

