/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.causalclustering.discovery;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.function.IntFunction;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.neo4j.causalclustering.core.CausalClusteringSettings;
import org.neo4j.causalclustering.core.CoreGraphDatabase;
import org.neo4j.causalclustering.core.consensus.NoLeaderFoundException;
import org.neo4j.causalclustering.core.consensus.roles.Role;
import org.neo4j.causalclustering.core.state.machines.id.IdGenerationException;
import org.neo4j.causalclustering.discovery.ClusterMember;
import org.neo4j.causalclustering.discovery.CoreClusterMember;
import org.neo4j.causalclustering.discovery.CoreTopologyService;
import org.neo4j.causalclustering.discovery.DiscoveryServiceFactory;
import org.neo4j.causalclustering.discovery.IpFamily;
import org.neo4j.causalclustering.discovery.ReadReplica;
import org.neo4j.causalclustering.helper.ErrorHandler;
import org.neo4j.causalclustering.readreplica.ReadReplicaGraphDatabase;
import org.neo4j.concurrent.Futures;
import org.neo4j.function.Predicates;
import org.neo4j.function.ThrowingSupplier;
import org.neo4j.graphdb.DatabaseShutdownException;
import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.Transaction;
import org.neo4j.graphdb.security.WriteOperationsNotAllowedException;
import org.neo4j.helpers.AdvertisedSocketAddress;
import org.neo4j.helpers.collection.Iterables;
import org.neo4j.internal.kernel.api.exceptions.TransactionFailureException;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.internal.DatabaseHealth;
import org.neo4j.kernel.internal.GraphDatabaseAPI;
import org.neo4j.kernel.monitoring.Monitors;
import org.neo4j.ports.allocation.PortAuthority;
import org.neo4j.storageengine.api.lock.AcquireLockTimeoutException;
import org.neo4j.test.DbRepresentation;

public class Cluster {
    private static final int DEFAULT_TIMEOUT_MS = 120000;
    private static final int DEFAULT_CLUSTER_SIZE = 3;
    protected final File parentDir;
    private final Map<String, String> coreParams;
    private final Map<String, IntFunction<String>> instanceCoreParams;
    private final Map<String, String> readReplicaParams;
    private final Map<String, IntFunction<String>> instanceReadReplicaParams;
    private final String recordFormat;
    protected final DiscoveryServiceFactory discoveryServiceFactory;
    protected final String listenAddress;
    protected final String advertisedAddress;
    private final Set<String> dbNames;
    private Map<Integer, CoreClusterMember> coreMembers = new ConcurrentHashMap<Integer, CoreClusterMember>();
    private Map<Integer, ReadReplica> readReplicas = new ConcurrentHashMap<Integer, ReadReplica>();
    private int highestCoreServerId;
    private int highestReplicaServerId;

    public Cluster(File parentDir, int noOfCoreMembers, int noOfReadReplicas, DiscoveryServiceFactory discoveryServiceFactory, Map<String, String> coreParams, Map<String, IntFunction<String>> instanceCoreParams, Map<String, String> readReplicaParams, Map<String, IntFunction<String>> instanceReadReplicaParams, String recordFormat, IpFamily ipFamily, boolean useWildcard) {
        this(parentDir, noOfCoreMembers, noOfReadReplicas, discoveryServiceFactory, coreParams, instanceCoreParams, readReplicaParams, instanceReadReplicaParams, recordFormat, ipFamily, useWildcard, Collections.singleton(CausalClusteringSettings.database.getDefaultValue()));
    }

    public Cluster(File parentDir, int noOfCoreMembers, int noOfReadReplicas, DiscoveryServiceFactory discoveryServiceFactory, Map<String, String> coreParams, Map<String, IntFunction<String>> instanceCoreParams, Map<String, String> readReplicaParams, Map<String, IntFunction<String>> instanceReadReplicaParams, String recordFormat, IpFamily ipFamily, boolean useWildcard, Set<String> dbNames) {
        this.discoveryServiceFactory = discoveryServiceFactory;
        this.parentDir = parentDir;
        this.coreParams = coreParams;
        this.instanceCoreParams = instanceCoreParams;
        this.readReplicaParams = readReplicaParams;
        this.instanceReadReplicaParams = instanceReadReplicaParams;
        this.recordFormat = recordFormat;
        this.listenAddress = useWildcard ? ipFamily.wildcardAddress() : ipFamily.localhostAddress();
        this.advertisedAddress = ipFamily.localhostName();
        List<AdvertisedSocketAddress> initialHosts = this.initialHosts(noOfCoreMembers);
        this.createCoreMembers(noOfCoreMembers, initialHosts, coreParams, instanceCoreParams, recordFormat);
        this.createReadReplicas(noOfReadReplicas, initialHosts, readReplicaParams, instanceReadReplicaParams, recordFormat);
        this.dbNames = dbNames;
    }

    private List<AdvertisedSocketAddress> initialHosts(int noOfCoreMembers) {
        return IntStream.range(0, noOfCoreMembers).mapToObj(ignored -> PortAuthority.allocatePort()).map(port -> new AdvertisedSocketAddress(this.advertisedAddress, port.intValue())).collect(Collectors.toList());
    }

    public void start() throws InterruptedException, ExecutionException {
        this.startCoreMembers();
        this.startReadReplicas();
    }

    public Set<CoreClusterMember> healthyCoreMembers() {
        return this.coreMembers.values().stream().filter(db -> ((DatabaseHealth)db.database().getDependencyResolver().resolveDependency(DatabaseHealth.class)).isHealthy()).collect(Collectors.toSet());
    }

    public CoreClusterMember getCoreMemberById(int memberId) {
        return this.coreMembers.get(memberId);
    }

    public ReadReplica getReadReplicaById(int memberId) {
        return this.readReplicas.get(memberId);
    }

    public CoreClusterMember addCoreMemberWithId(int memberId) {
        return this.addCoreMemberWithId(memberId, this.coreParams, this.instanceCoreParams, this.recordFormat);
    }

    public CoreClusterMember newCoreMember() {
        int newCoreServerId = ++this.highestCoreServerId;
        return this.addCoreMemberWithId(newCoreServerId);
    }

    public ReadReplica newReadReplica() {
        int newReplicaServerId = ++this.highestReplicaServerId;
        return this.addReadReplicaWithId(newReplicaServerId);
    }

    private CoreClusterMember addCoreMemberWithId(int memberId, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        List<AdvertisedSocketAddress> initialHosts = this.extractInitialHosts(this.coreMembers);
        CoreClusterMember coreClusterMember = this.createCoreClusterMember(memberId, PortAuthority.allocatePort(), 3, initialHosts, recordFormat, extraParams, instanceExtraParams);
        this.coreMembers.put(memberId, coreClusterMember);
        return coreClusterMember;
    }

    public ReadReplica addReadReplicaWithIdAndRecordFormat(int memberId, String recordFormat) {
        return this.addReadReplica(memberId, recordFormat, new Monitors());
    }

    public ReadReplica addReadReplicaWithId(int memberId) {
        return this.addReadReplicaWithIdAndRecordFormat(memberId, this.recordFormat);
    }

    public ReadReplica addReadReplicaWithIdAndMonitors(int memberId, Monitors monitors) {
        return this.addReadReplica(memberId, this.recordFormat, monitors);
    }

    private ReadReplica addReadReplica(int memberId, String recordFormat, Monitors monitors) {
        List<AdvertisedSocketAddress> initialHosts = this.extractInitialHosts(this.coreMembers);
        ReadReplica member = this.createReadReplica(memberId, initialHosts, this.readReplicaParams, this.instanceReadReplicaParams, recordFormat, monitors);
        this.readReplicas.put(memberId, member);
        return member;
    }

    public void shutdown() {
        try (ErrorHandler errorHandler = new ErrorHandler("Error when trying to shutdown cluster");){
            this.shutdownCoreMembers(this.coreMembers(), errorHandler);
            this.shutdownReadReplicas(errorHandler);
        }
    }

    private void shutdownCoreMembers(Collection<CoreClusterMember> members, ErrorHandler errorHandler) {
        this.shutdownMembers(members, errorHandler);
    }

    public void shutdownCoreMembers() {
        this.shutdownCoreMembers(this.coreMembers());
    }

    public void shutdownCoreMember(CoreClusterMember member) {
        this.shutdownCoreMembers(Collections.singleton(member));
    }

    public void shutdownCoreMembers(Collection<CoreClusterMember> members) {
        try (ErrorHandler errorHandler = new ErrorHandler("Error when trying to shutdown core members");){
            this.shutdownCoreMembers(members, errorHandler);
        }
    }

    private void shutdownMembers(Collection<? extends ClusterMember> clusterMembers, ErrorHandler errorHandler) {
        try {
            Futures.combine(this.invokeAll("cluster-shutdown", clusterMembers, cm -> {
                cm.shutdown();
                return null;
            })).get();
        }
        catch (Exception e) {
            errorHandler.add((Throwable)e);
        }
    }

    private <X extends GraphDatabaseAPI, T extends ClusterMember<X>, R> List<Future<R>> invokeAll(String threadName, Collection<T> members, Function<T, R> call) {
        ArrayList<Future<R>> list = new ArrayList<Future<R>>(members.size());
        int threadNumber = 0;
        for (ClusterMember member : members) {
            FutureTask<Object> task = new FutureTask<Object>(() -> call.apply(member));
            ThreadGroup threadGroup = member.threadGroup();
            Thread thread = new Thread(threadGroup, task, threadName + "-" + threadNumber);
            thread.start();
            ++threadNumber;
            list.add(task);
        }
        return list;
    }

    public void removeCoreMemberWithServerId(int serverId) {
        CoreClusterMember memberToRemove = this.getCoreMemberById(serverId);
        if (memberToRemove == null) {
            throw new RuntimeException("Could not remove core member with id " + serverId);
        }
        memberToRemove.shutdown();
        this.removeCoreMember(memberToRemove);
    }

    public void removeCoreMember(CoreClusterMember memberToRemove) {
        memberToRemove.shutdown();
        this.coreMembers.values().remove(memberToRemove);
    }

    public void removeReadReplicaWithMemberId(int memberId) {
        ReadReplica memberToRemove = this.getReadReplicaById(memberId);
        if (memberToRemove == null) {
            throw new RuntimeException("Could not remove core member with member id " + memberId);
        }
        this.removeReadReplica(memberToRemove);
    }

    private void removeReadReplica(ReadReplica memberToRemove) {
        memberToRemove.shutdown();
        this.readReplicas.values().remove(memberToRemove);
    }

    public Collection<CoreClusterMember> coreMembers() {
        return this.coreMembers.values();
    }

    public Collection<ReadReplica> readReplicas() {
        return this.readReplicas.values();
    }

    public ReadReplica findAnyReadReplica() {
        return (ReadReplica)Iterables.firstOrNull(this.readReplicas.values());
    }

    private void ensureDBName(String dbName) throws IllegalArgumentException {
        if (!this.dbNames.contains(dbName)) {
            throw new IllegalArgumentException("Database name " + dbName + " does not exist in this cluster.");
        }
    }

    public CoreClusterMember getMemberWithRole(Role role) {
        return this.getMemberWithAnyRole(role);
    }

    public List<CoreClusterMember> getAllMembersWithRole(Role role) {
        return this.getAllMembersWithAnyRole(role);
    }

    public CoreClusterMember getMemberWithRole(String dbName, Role role) {
        return this.getMemberWithAnyRole(dbName, role);
    }

    public List<CoreClusterMember> getAllMembersWithRole(String dbName, Role role) {
        return this.getAllMembersWithAnyRole(dbName, role);
    }

    public CoreClusterMember getMemberWithAnyRole(Role ... roles) {
        String dbName = CausalClusteringSettings.database.getDefaultValue();
        return this.getMemberWithAnyRole(dbName, roles);
    }

    public List<CoreClusterMember> getAllMembersWithAnyRole(Role ... roles) {
        String dbName = CausalClusteringSettings.database.getDefaultValue();
        return this.getAllMembersWithAnyRole(dbName, roles);
    }

    public CoreClusterMember getMemberWithAnyRole(String dbName, Role ... roles) {
        return this.getAllMembersWithAnyRole(dbName, roles).stream().findFirst().orElse(null);
    }

    public List<CoreClusterMember> getAllMembersWithAnyRole(String dbName, Role ... roles) {
        this.ensureDBName(dbName);
        Set roleSet = Arrays.stream(roles).collect(Collectors.toSet());
        ArrayList<CoreClusterMember> list = new ArrayList<CoreClusterMember>();
        for (CoreClusterMember m : this.coreMembers.values()) {
            CoreGraphDatabase database = m.database();
            if (database == null || !m.dbName().equals(dbName) || !roleSet.contains(database.getRole())) continue;
            list.add(m);
        }
        return list;
    }

    public CoreClusterMember awaitLeader() throws TimeoutException {
        return this.awaitCoreMemberWithRole(Role.LEADER, 120000L, TimeUnit.MILLISECONDS);
    }

    public CoreClusterMember awaitLeader(String dbName) throws TimeoutException {
        return this.awaitCoreMemberWithRole(dbName, Role.LEADER, 120000L, TimeUnit.MILLISECONDS);
    }

    public CoreClusterMember awaitLeader(String dbName, long timeout, TimeUnit timeUnit) throws TimeoutException {
        return this.awaitCoreMemberWithRole(dbName, Role.LEADER, timeout, timeUnit);
    }

    public CoreClusterMember awaitLeader(long timeout, TimeUnit timeUnit) throws TimeoutException {
        return this.awaitCoreMemberWithRole(Role.LEADER, timeout, timeUnit);
    }

    public CoreClusterMember awaitCoreMemberWithRole(Role role, long timeout, TimeUnit timeUnit) throws TimeoutException {
        return (CoreClusterMember)Predicates.await(() -> this.getMemberWithRole(role), (Predicate)Predicates.notNull(), (long)timeout, (TimeUnit)timeUnit);
    }

    public CoreClusterMember awaitCoreMemberWithRole(String dbName, Role role, long timeout, TimeUnit timeUnit) throws TimeoutException {
        return (CoreClusterMember)Predicates.await(() -> this.getMemberWithRole(dbName, role), (Predicate)Predicates.notNull(), (long)timeout, (TimeUnit)timeUnit);
    }

    public int numberOfCoreMembersReportedByTopology() {
        CoreClusterMember aCoreGraphDb = this.coreMembers.values().stream().filter(member -> member.database() != null).findAny().orElseThrow(IllegalArgumentException::new);
        CoreTopologyService coreTopologyService = (CoreTopologyService)aCoreGraphDb.database().getDependencyResolver().resolveDependency(CoreTopologyService.class);
        return coreTopologyService.localCoreServers().members().size();
    }

    public CoreClusterMember coreTx(BiConsumer<CoreGraphDatabase, Transaction> op) throws Exception {
        String dbName = CausalClusteringSettings.database.getDefaultValue();
        return this.coreTx(dbName, op);
    }

    public CoreClusterMember coreTx(String dbName, BiConsumer<CoreGraphDatabase, Transaction> op) throws Exception {
        this.ensureDBName(dbName);
        return this.leaderTx(dbName, op, 120000, TimeUnit.MILLISECONDS);
    }

    private CoreClusterMember leaderTx(String dbName, BiConsumer<CoreGraphDatabase, Transaction> op, int timeout, TimeUnit timeUnit) throws Exception {
        ThrowingSupplier supplier = () -> {
            CoreClusterMember member = this.awaitLeader(dbName, timeout, timeUnit);
            CoreGraphDatabase db = member.database();
            if (db == null) {
                throw new DatabaseShutdownException();
            }
            try (Transaction tx = db.beginTx();){
                op.accept(db, tx);
                CoreClusterMember coreClusterMember = member;
                return coreClusterMember;
            }
            catch (Throwable e) {
                if (!Cluster.isTransientFailure(e)) throw e;
                return null;
            }
        };
        return (CoreClusterMember)Predicates.awaitEx((ThrowingSupplier)supplier, Predicates.notNull()::test, (long)timeout, (TimeUnit)timeUnit);
    }

    private static boolean isTransientFailure(Throwable e) {
        return e instanceof IdGenerationException || Cluster.isLockExpired(e) || Cluster.isLockOnFollower(e) || Cluster.isWriteNotOnLeader(e) || Cluster.isUnableToReplicate(e);
    }

    private static boolean isWriteNotOnLeader(Throwable e) {
        return e instanceof WriteOperationsNotAllowedException && e.getMessage().startsWith(String.format("No write operations are allowed directly on this database. Writes must pass through the leader. The role of this server is: %s", ""));
    }

    private static boolean isLockOnFollower(Throwable e) {
        return e instanceof AcquireLockTimeoutException && (e.getMessage().equals("Should only attempt to take locks when leader.") || e.getCause() instanceof NoLeaderFoundException);
    }

    private static boolean isUnableToReplicate(Throwable e) {
        return e instanceof TransactionFailureException && ((TransactionFailureException)e).status().equals(Status.Cluster.ReplicationFailure);
    }

    private static boolean isLockExpired(Throwable e) {
        return e instanceof org.neo4j.graphdb.TransactionFailureException && e.getCause() instanceof TransactionFailureException && ((TransactionFailureException)e.getCause()).status() == Status.Transaction.LockSessionExpired;
    }

    private List<AdvertisedSocketAddress> extractInitialHosts(Map<Integer, CoreClusterMember> coreMembers) {
        return coreMembers.values().stream().map(CoreClusterMember::discoveryPort).map(port -> new AdvertisedSocketAddress(this.advertisedAddress, port.intValue())).collect(Collectors.toList());
    }

    private void createCoreMembers(int noOfCoreMembers, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        for (int i = 0; i < initialHosts.size(); ++i) {
            int discoveryListenAddress = initialHosts.get(i).getPort();
            CoreClusterMember coreClusterMember = this.createCoreClusterMember(i, discoveryListenAddress, noOfCoreMembers, initialHosts, recordFormat, extraParams, instanceExtraParams);
            this.coreMembers.put(i, coreClusterMember);
        }
        this.highestCoreServerId = noOfCoreMembers - 1;
    }

    protected CoreClusterMember createCoreClusterMember(int serverId, int hazelcastPort, int clusterSize, List<AdvertisedSocketAddress> initialHosts, String recordFormat, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams) {
        int txPort = PortAuthority.allocatePort();
        int raftPort = PortAuthority.allocatePort();
        int boltPort = PortAuthority.allocatePort();
        int httpPort = PortAuthority.allocatePort();
        int backupPort = PortAuthority.allocatePort();
        return new CoreClusterMember(serverId, hazelcastPort, txPort, raftPort, boltPort, httpPort, backupPort, clusterSize, initialHosts, this.discoveryServiceFactory, recordFormat, this.parentDir, extraParams, instanceExtraParams, this.listenAddress, this.advertisedAddress);
    }

    protected ReadReplica createReadReplica(int serverId, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat, Monitors monitors) {
        int boltPort = PortAuthority.allocatePort();
        int httpPort = PortAuthority.allocatePort();
        int txPort = PortAuthority.allocatePort();
        int backupPort = PortAuthority.allocatePort();
        return new ReadReplica(this.parentDir, serverId, boltPort, httpPort, txPort, backupPort, this.discoveryServiceFactory, initialHosts, extraParams, instanceExtraParams, recordFormat, monitors, this.advertisedAddress, this.listenAddress);
    }

    public void startCoreMembers() throws InterruptedException, ExecutionException {
        this.startCoreMembers(this.coreMembers.values());
    }

    public void startCoreMember(CoreClusterMember member) throws InterruptedException, ExecutionException {
        this.startCoreMembers(Collections.singleton(member));
    }

    public void startCoreMembers(Collection<CoreClusterMember> members) throws InterruptedException, ExecutionException {
        List<Future<CoreGraphDatabase>> futures = this.invokeAll("cluster-starter", members, cm -> {
            cm.start();
            return cm.database();
        });
        for (Future<CoreGraphDatabase> future : futures) {
            future.get();
        }
    }

    private void startReadReplicas() throws InterruptedException, ExecutionException {
        Collection<ReadReplica> members = this.readReplicas.values();
        List<Future<ReadReplicaGraphDatabase>> futures = this.invokeAll("cluster-starter", members, cm -> {
            cm.start();
            return cm.database();
        });
        for (Future<ReadReplicaGraphDatabase> future : futures) {
            future.get();
        }
    }

    private void createReadReplicas(int noOfReadReplicas, List<AdvertisedSocketAddress> initialHosts, Map<String, String> extraParams, Map<String, IntFunction<String>> instanceExtraParams, String recordFormat) {
        for (int i = 0; i < noOfReadReplicas; ++i) {
            ReadReplica readReplica = this.createReadReplica(i, initialHosts, extraParams, instanceExtraParams, recordFormat, new Monitors());
            this.readReplicas.put(i, readReplica);
        }
        this.highestReplicaServerId = noOfReadReplicas - 1;
    }

    private void shutdownReadReplicas(ErrorHandler errorHandler) {
        this.shutdownMembers(this.readReplicas(), errorHandler);
    }

    public static void dataOnMemberEventuallyLooksLike(CoreClusterMember memberThatChanges, CoreClusterMember memberToLookLike) throws TimeoutException {
        Predicates.await(() -> {
            try {
                DbRepresentation representationToLookLike = DbRepresentation.of((GraphDatabaseService)memberToLookLike.database());
                DbRepresentation representationThatChanges = DbRepresentation.of((GraphDatabaseService)memberThatChanges.database());
                return representationToLookLike.equals((Object)representationThatChanges);
            }
            catch (DatabaseShutdownException databaseShutdownException) {
                return false;
            }
        }, (long)120000L, (TimeUnit)TimeUnit.MILLISECONDS);
    }

    public static <T extends ClusterMember> void dataMatchesEventually(ClusterMember source, Collection<T> targets) throws TimeoutException {
        Cluster.dataMatchesEventually(DbRepresentation.of(source.database()), targets);
    }

    public static <T extends ClusterMember> void dataMatchesEventually(DbRepresentation source, Collection<T> targets) throws TimeoutException {
        for (ClusterMember targetDB : targets) {
            Predicates.await(() -> {
                DbRepresentation representation = DbRepresentation.of(targetDB.database());
                return source.equals((Object)representation);
            }, (long)120000L, (TimeUnit)TimeUnit.MILLISECONDS);
        }
    }

    public ClusterMember getMemberByBoltAddress(AdvertisedSocketAddress advertisedSocketAddress) {
        for (CoreClusterMember coreClusterMember : this.coreMembers.values()) {
            if (!coreClusterMember.boltAdvertisedAddress().equals(advertisedSocketAddress.toString())) continue;
            return coreClusterMember;
        }
        for (ReadReplica readReplica : this.readReplicas.values()) {
            if (!readReplica.boltAdvertisedAddress().equals(advertisedSocketAddress.toString())) continue;
            return readReplica;
        }
        throw new RuntimeException("Could not find a member for bolt address " + advertisedSocketAddress);
    }

    public Optional<ClusterMember> randomMember(boolean mustBeStarted) {
        Stream<ClusterMember<ReadReplicaGraphDatabase>> members = Stream.concat(this.coreMembers().stream(), this.readReplicas().stream());
        if (mustBeStarted) {
            members = members.filter(m -> !m.isShutdown());
        }
        List eligible = members.collect(Collectors.toList());
        return Cluster.random(eligible);
    }

    public Optional<CoreClusterMember> randomCoreMember(boolean mustBeStarted) {
        Stream<CoreClusterMember> members = this.coreMembers().stream();
        if (mustBeStarted) {
            members = members.filter(m -> !m.isShutdown());
        }
        List eligible = members.collect(Collectors.toList());
        return Cluster.random(eligible);
    }

    private static <T> Optional<T> random(List<T> list) {
        if (list.size() == 0) {
            return Optional.empty();
        }
        int ordinal = ThreadLocalRandom.current().nextInt(list.size());
        return Optional.of(list.get(ordinal));
    }
}

