/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.core.replication;

import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import org.neo4j.causalclustering.core.consensus.LeaderInfo;
import org.neo4j.causalclustering.core.consensus.LeaderListener;
import org.neo4j.causalclustering.core.consensus.LeaderLocator;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.RaftMessages;
import org.neo4j.causalclustering.core.replication.DistributedOperation;
import org.neo4j.causalclustering.core.replication.Progress;
import org.neo4j.causalclustering.core.replication.ProgressTracker;
import org.neo4j.causalclustering.core.replication.ReplicatedContent;
import org.neo4j.causalclustering.core.replication.ReplicationFailureException;
import org.neo4j.causalclustering.core.replication.Replicator;
import org.neo4j.causalclustering.core.replication.Throttler;
import org.neo4j.causalclustering.core.replication.monitoring.LoggingReplicationMonitor;
import org.neo4j.causalclustering.core.replication.monitoring.ReplicationMonitor;
import org.neo4j.causalclustering.core.replication.session.LocalSessionPool;
import org.neo4j.causalclustering.core.replication.session.OperationContext;
import org.neo4j.causalclustering.helper.TimeoutStrategy;
import org.neo4j.causalclustering.identity.MemberId;
import org.neo4j.causalclustering.messaging.Outbound;
import org.neo4j.kernel.AvailabilityGuard;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.logging.Log;
import org.neo4j.logging.LogProvider;

public class RaftReplicator
implements Replicator,
LeaderListener {
    private final MemberId me;
    private final Outbound<MemberId, RaftMessages.RaftMessage> outbound;
    private final ProgressTracker progressTracker;
    private final LocalSessionPool sessionPool;
    private final TimeoutStrategy progressTimeoutStrategy;
    private final AvailabilityGuard availabilityGuard;
    private final LeaderLocator leaderLocator;
    private final TimeoutStrategy leaderTimeoutStrategy;
    private final Log log;
    private final Throttler throttler;
    private final ReplicationMonitor replicationMonitor;
    private final long availabilityTimeoutMillis;

    public RaftReplicator(LeaderLocator leaderLocator, MemberId me, Outbound<MemberId, RaftMessages.RaftMessage> outbound, LocalSessionPool sessionPool, ProgressTracker progressTracker, TimeoutStrategy progressTimeoutStrategy, TimeoutStrategy leaderTimeoutStrategy, long availabilityTimeoutMillis, AvailabilityGuard availabilityGuard, LogProvider logProvider, long replicationLimit, Monitors monitors) {
        this.me = me;
        this.outbound = outbound;
        this.progressTracker = progressTracker;
        this.sessionPool = sessionPool;
        this.progressTimeoutStrategy = progressTimeoutStrategy;
        this.leaderTimeoutStrategy = leaderTimeoutStrategy;
        this.availabilityTimeoutMillis = availabilityTimeoutMillis;
        this.availabilityGuard = availabilityGuard;
        this.throttler = new Throttler(replicationLimit);
        this.leaderLocator = leaderLocator;
        leaderLocator.registerListener(this);
        this.log = logProvider.getLog(this.getClass());
        monitors.addMonitorListener((Object)new LoggingReplicationMonitor(this.log), new String[0]);
        this.replicationMonitor = (ReplicationMonitor)monitors.newMonitor(ReplicationMonitor.class, new String[0]);
    }

    @Override
    public Future<Object> replicate(ReplicatedContent command, boolean trackResult) throws ReplicationFailureException {
        MemberId originalLeader;
        try {
            originalLeader = this.leaderLocator.getLeader();
        }
        catch (NoLeaderFoundException e) {
            throw new ReplicationFailureException("Replication aborted since no leader was available", e);
        }
        if (command.hasSize()) {
            try {
                return (Future)this.throttler.invoke(() -> this.replicate0(command, trackResult, originalLeader), command.size());
            }
            catch (InterruptedException e) {
                throw new ReplicationFailureException("Interrupted while waiting for replication credits", e);
            }
        }
        return this.replicate0(command, trackResult, originalLeader);
    }

    private Future<Object> replicate0(ReplicatedContent command, boolean trackResult, MemberId leader) throws ReplicationFailureException {
        this.replicationMonitor.startReplication(command);
        try {
            this.assertNoLeaderSwitch(leader);
            OperationContext session = this.sessionPool.acquireSession();
            DistributedOperation operation = new DistributedOperation(command, session.globalSession(), session.localOperationId());
            Progress progress = this.progressTracker.start(operation);
            TimeoutStrategy.Timeout progressTimeout = this.progressTimeoutStrategy.newTimeout();
            TimeoutStrategy.Timeout leaderTimeout = this.leaderTimeoutStrategy.newTimeout();
            try {
                do {
                    this.replicationMonitor.replicationAttempt();
                    this.assertDatabaseAvailable();
                    try {
                        this.outbound.send(leader, new RaftMessages.NewEntry.Request(this.me, operation), true);
                        leaderTimeout = this.leaderTimeoutStrategy.newTimeout();
                        progress.awaitReplication(progressTimeout.getMillis());
                        progressTimeout.increment();
                        leader = this.leaderLocator.getLeader();
                    }
                    catch (NoLeaderFoundException e) {
                        this.log.debug("Could not replicate operation " + operation + " because no leader was found. Retrying.", (Throwable)e);
                        Thread.sleep(leaderTimeout.getMillis());
                        leaderTimeout.increment();
                    }
                } while (!progress.isReplicated());
            }
            catch (InterruptedException e) {
                this.progressTracker.abort(operation);
                throw new ReplicationFailureException("Interrupted while replicating", e);
            }
            BiConsumer<Object, Throwable> cleanup = (ignored1, ignored2) -> this.sessionPool.releaseSession(session);
            if (trackResult) {
                progress.futureResult().whenComplete((BiConsumer)cleanup);
            } else {
                cleanup.accept(null, null);
            }
            this.replicationMonitor.successfulReplication();
            return progress.futureResult();
        }
        catch (Throwable t) {
            this.replicationMonitor.failedReplication(t);
            throw t;
        }
    }

    private void assertNoLeaderSwitch(MemberId originalLeader) throws ReplicationFailureException {
        MemberId currentLeader;
        try {
            currentLeader = this.leaderLocator.getLeader();
        }
        catch (NoLeaderFoundException e) {
            throw new ReplicationFailureException("Replication aborted since no leader was available", e);
        }
        if (!currentLeader.equals(originalLeader)) {
            throw new ReplicationFailureException("Replication aborted since a leader switch was detected");
        }
    }

    @Override
    public void onLeaderSwitch(LeaderInfo leaderInfo) {
        this.progressTracker.triggerReplicationEvent();
    }

    private void assertDatabaseAvailable() throws ReplicationFailureException {
        try {
            this.availabilityGuard.await(this.availabilityTimeoutMillis);
        }
        catch (AvailabilityGuard.UnavailableException e) {
            throw new ReplicationFailureException("Database is not available, transaction cannot be replicated.", e);
        }
    }
}

