/*
 * Decompiled with CFR 0.152.
 */
package org.neo4j.bolt.runtime;

import io.netty.channel.Channel;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.neo4j.bolt.BoltChannel;
import org.neo4j.bolt.BoltKernelExtension;
import org.neo4j.bolt.runtime.BoltConnection;
import org.neo4j.bolt.runtime.BoltConnectionLifetimeListener;
import org.neo4j.bolt.runtime.BoltConnectionQueueMonitor;
import org.neo4j.bolt.v1.packstream.PackOutput;
import org.neo4j.bolt.v1.runtime.BoltConnectionAuthFatality;
import org.neo4j.bolt.v1.runtime.BoltProtocolBreachFatality;
import org.neo4j.bolt.v1.runtime.BoltStateMachine;
import org.neo4j.bolt.v1.runtime.Job;
import org.neo4j.bolt.v1.runtime.Neo4jError;
import org.neo4j.kernel.api.exceptions.Status;
import org.neo4j.kernel.impl.logging.LogService;
import org.neo4j.logging.Log;
import org.neo4j.util.FeatureToggles;

/**
 * Default {@link BoltConnection} implementation: incoming {@link Job}s are put on an
 * internal queue and later executed, in batches, against the connection's
 * {@link BoltStateMachine} by whichever thread calls {@link #processNextBatch()}.
 *
 * <p>The job queue is a {@link LinkedBlockingQueue} and the two lifecycle flags are
 * {@link AtomicBoolean}s. The {@code batch} scratch list is a plain {@link ArrayList};
 * NOTE(review): this presumably relies on only one thread running
 * {@code processNextBatch} for a given connection at a time - confirm with the scheduler.
 */
public class DefaultBoltConnection implements BoltConnection {
    /**
     * Batch size used by the constructor that does not take an explicit one. Read from the
     * {@code max_batch_size} feature toggle of {@link BoltKernelExtension}, with 100 passed
     * as the fallback value.
     */
    protected static final int DEFAULT_MAX_BATCH_SIZE = FeatureToggles.getInteger(BoltKernelExtension.class, "max_batch_size", 100);

    private final String id;
    private final BoltChannel channel;
    private final BoltStateMachine machine;
    // May be null; every notification helper below checks for that.
    private final BoltConnectionLifetimeListener listener;
    // May be null; every notification helper below checks for that.
    private final BoltConnectionQueueMonitor queueMonitor;
    private final PackOutput output;
    private final Log log;
    private final Log userLog;
    private final int maxBatchSize;
    // Scratch list the queue is drained into; reused across processNextBatch calls.
    private final List<Job> batch;
    private final LinkedBlockingQueue<Job> queue = new LinkedBlockingQueue<>();
    // Set once the connection has been asked to close (stop() or a fatal error).
    private final AtomicBoolean shouldClose = new AtomicBoolean();
    // Set once close() has actually run; guards against closing twice.
    private final AtomicBoolean closed = new AtomicBoolean();

    /**
     * Creates a connection that uses {@link #DEFAULT_MAX_BATCH_SIZE} as its batch size.
     *
     * @param channel      the bolt channel this connection wraps; supplies id and addresses
     * @param output       the pack output that is flushed after processing and closed on close
     * @param machine      the state machine jobs are performed against
     * @param logService   source of the internal and user logs
     * @param listener     lifetime listener, may be {@code null}
     * @param queueMonitor queue monitor, may be {@code null}
     */
    public DefaultBoltConnection(BoltChannel channel, PackOutput output, BoltStateMachine machine, LogService logService, BoltConnectionLifetimeListener listener, BoltConnectionQueueMonitor queueMonitor) {
        this(channel, output, machine, logService, listener, queueMonitor, DEFAULT_MAX_BATCH_SIZE);
    }

    /**
     * Creates a connection with an explicit batch size.
     *
     * @param channel      the bolt channel this connection wraps; supplies id and addresses
     * @param output       the pack output that is flushed after processing and closed on close
     * @param machine      the state machine jobs are performed against
     * @param logService   source of the internal and user logs
     * @param listener     lifetime listener, may be {@code null}
     * @param queueMonitor queue monitor, may be {@code null}
     * @param maxBatchSize maximum number of jobs drained from the queue per batch by
     *                     {@link #processNextBatch()}; also the initial capacity of the batch list
     */
    public DefaultBoltConnection(BoltChannel channel, PackOutput output, BoltStateMachine machine, LogService logService, BoltConnectionLifetimeListener listener, BoltConnectionQueueMonitor queueMonitor, int maxBatchSize) {
        this.id = channel.id();
        this.channel = channel;
        this.output = output;
        this.machine = machine;
        this.listener = listener;
        this.queueMonitor = queueMonitor;
        this.log = logService.getInternalLog(this.getClass());
        this.userLog = logService.getUserLog(this.getClass());
        this.maxBatchSize = maxBatchSize;
        this.batch = new ArrayList<>(maxBatchSize);
    }

    /** Returns the id captured from the underlying {@link BoltChannel} at construction time. */
    @Override
    public String id() {
        return this.id;
    }

    /** Returns the server-side address reported by the underlying bolt channel. */
    @Override
    public SocketAddress localAddress() {
        return this.channel.serverAddress();
    }

    /** Returns the client-side address reported by the underlying bolt channel. */
    @Override
    public SocketAddress remoteAddress() {
        return this.channel.clientAddress();
    }

    /** Returns the raw netty channel behind the bolt channel. */
    @Override
    public Channel channel() {
        return this.channel.rawChannel();
    }

    /** Returns the pack output this connection writes to. */
    @Override
    public PackOutput output() {
        return this.output;
    }

    /** Returns {@code true} when at least one job is waiting in the queue. */
    @Override
    public boolean hasPendingJobs() {
        return !this.queue.isEmpty();
    }

    /** Starts the connection by notifying the lifetime listener (if any) of its creation. */
    @Override
    public void start() {
        this.notifyCreated();
    }

    /**
     * Adds a job to the queue and notifies the queue monitor (if any).
     *
     * @param job the job to run on a later {@link #processNextBatch()} call
     */
    @Override
    public void enqueue(Job job) {
        this.enqueueInternal(job);
    }

    /**
     * Processes the next batch of up to {@code maxBatchSize} queued jobs.
     *
     * @return {@code true} while the connection is still open, {@code false} once it has been closed
     */
    @Override
    public boolean processNextBatch() {
        return this.processNextBatch(this.maxBatchSize, false);
    }

    /**
     * Drains up to {@code batchCount} jobs from the queue and performs them against the state
     * machine, repeating for as long as the state machine reports that it must stick to the
     * current thread. Any failure marks the connection for closing, and the connection is
     * closed before returning whenever it is marked for closing.
     *
     * @param batchCount            maximum number of jobs drained from the queue per round
     * @param exitIfNoJobsAvailable when {@code false}, a round that drained nothing blocks
     *                              (polling in 10 second slices and validating the transaction
     *                              between slices) until a job arrives or the connection is
     *                              marked for closing; when {@code true} it does not block
     * @return {@code true} while the connection is still open, {@code false} once it has been closed
     */
    protected boolean processNextBatch(int batchCount, boolean exitIfNoJobsAvailable) {
        try {
            boolean waitForMessage = false;
            boolean loop = false;
            do {
                // Stop right away once the connection has been marked for closing.
                if (this.willClose()) {
                    break;
                }

                // Either there is pending work, or the previous round asked us to stay on
                // this thread and wait for the next message.
                if (waitForMessage || !this.queue.isEmpty()) {
                    this.queue.drainTo(this.batch, batchCount);

                    // Nothing was drained (the job may already have been processed): unless
                    // told to exit, wait for the next job.
                    if (this.batch.isEmpty() && !exitIfNoJobsAvailable) {
                        while (!this.willClose()) {
                            Job nextJob = this.queue.poll(10L, TimeUnit.SECONDS);
                            if (nextJob != null) {
                                this.batch.add(nextJob);
                                break;
                            }
                            // Timed out without a job; let the state machine check its transaction.
                            this.machine.validateTransaction();
                        }
                    }
                    this.notifyDrained(this.batch);

                    // Jobs are removed one at a time so a job that throws is not left in the batch.
                    while (!this.batch.isEmpty()) {
                        Job current = this.batch.remove(0);
                        current.perform(this.machine);
                    }

                    // Keep going on this thread, and block for the next message, only while
                    // the state machine requires thread stickiness.
                    loop = this.machine.shouldStickOnThread();
                    waitForMessage = loop;
                }

                // Flush once everything pending has been handled (or after every job when
                // batching is effectively disabled).
                if (this.queue.isEmpty() || this.maxBatchSize == 1) {
                    this.output.flush();
                }
            } while (loop);

            // Only meaningful if the connection stays alive.
            if (!this.willClose()) {
                assert !this.machine.hasOpenStatement();
            }
        }
        catch (BoltConnectionAuthFatality ex) {
            // Authentication failure: close the connection without logging here.
            this.shouldClose.set(true);
        }
        catch (BoltProtocolBreachFatality ex) {
            this.shouldClose.set(true);
            this.log.error(String.format("Protocol breach detected in bolt session '%s'.", this.id()), ex);
        }
        catch (InterruptedException ex) {
            // NOTE(review): the thread's interrupt flag is not restored here; restoring it
            // before close() runs in the finally block could affect the close path - confirm
            // before changing.
            this.shouldClose.set(true);
            this.log.info("Bolt session '%s' is interrupted probably due to server shutdown.", this.id());
        }
        catch (Throwable t) {
            this.shouldClose.set(true);
            this.userLog.error(String.format("Unexpected error detected in bolt session '%s'.", this.id()), t);
        }
        finally {
            if (this.willClose()) {
                this.close();
            }
        }
        return !this.closed.get();
    }

    /**
     * Handles a failure to schedule this connection for execution: logs it, marks the state
     * machine as failed with a matching error, and then processes a single job without
     * blocking when the queue is empty.
     *
     * @param t the scheduling failure; when a {@link RejectedExecutionException} is in its
     *          cause chain it is reported as {@code Status.Request.NoThreadsAvailable},
     *          otherwise as a fatal error built from {@code t}
     */
    @Override
    public void handleSchedulingError(Throwable t) {
        String message;
        Neo4jError error;
        if (ExceptionUtils.hasCause(t, RejectedExecutionException.class)) {
            error = Neo4jError.from(Status.Request.NoThreadsAvailable, Status.Request.NoThreadsAvailable.code().description());
            message = String.format("Unable to schedule bolt session '%s' for execution since there are no available threads to serve it at the moment. You can retry at a later time or consider increasing max thread pool size for bolt connector(s).", this.id());
        } else {
            error = Neo4jError.fatalFrom(t);
            message = String.format("Unexpected error during scheduling of bolt session '%s'.", this.id());
        }
        // Full details (with stack trace) go to the internal log, only the message to the user log.
        this.log.error(message, t);
        this.userLog.error(message);
        this.machine.markFailed(error);
        this.processNextBatch(1, true);
    }

    /** Forwards an interrupt request to the state machine. */
    @Override
    public void interrupt() {
        this.machine.interrupt();
    }

    /**
     * Marks the connection for closing. Only the first call has any effect: it terminates the
     * state machine and enqueues a no-op job, so that a thread blocked polling the queue in
     * {@link #processNextBatch(int, boolean)} receives an element and re-checks the flag.
     */
    @Override
    public void stop() {
        if (this.shouldClose.compareAndSet(false, true)) {
            this.machine.terminate();
            this.enqueueInternal(ignore -> {});
        }
    }

    /** Returns {@code true} once the connection has been marked for closing. */
    private boolean willClose() {
        return this.shouldClose.get();
    }

    /**
     * Closes the pack output and the state machine and notifies the lifetime listener. Runs at
     * most once; failures while closing either resource are logged and do not prevent the
     * remaining steps.
     */
    private void close() {
        if (this.closed.compareAndSet(false, true)) {
            try {
                this.output.close();
            }
            catch (Throwable t) {
                this.log.error(String.format("Unable to close pack output of bolt session '%s'.", this.id()), t);
            }
            try {
                this.machine.close();
            }
            catch (Throwable t) {
                this.log.error(String.format("Unable to close state machine of bolt session '%s'.", this.id()), t);
            }
            finally {
                this.notifyDestroyed();
            }
        }
    }

    /** Puts the job on the queue and then reports it to the queue monitor (if any). */
    private void enqueueInternal(Job job) {
        this.queue.offer(job);
        this.notifyEnqueued(job);
    }

    /** Tells the lifetime listener, when one is set, that this connection was created. */
    private void notifyCreated() {
        if (this.listener != null) {
            this.listener.created(this);
        }
    }

    /** Tells the lifetime listener, when one is set, that this connection was closed. */
    private void notifyDestroyed() {
        if (this.listener != null) {
            this.listener.closed(this);
        }
    }

    /** Tells the queue monitor, when one is set, that a job was enqueued. */
    private void notifyEnqueued(Job job) {
        if (this.queueMonitor != null) {
            this.queueMonitor.enqueued(this, job);
        }
    }

    /** Tells the queue monitor, when one is set, about a non-empty list of drained jobs. */
    private void notifyDrained(List<Job> jobs) {
        if (this.queueMonitor != null && !jobs.isEmpty()) {
            this.queueMonitor.drained(this, jobs);
        }
    }
}

