/*
 * Decompiled with CFR 0.152.
 */
package com.dianping.cat.message.io;

import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.Transaction;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.io.ChannelManager;
import com.dianping.cat.message.io.DefaultMessageQueue;
import com.dianping.cat.message.io.MessageSender;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;

/*
 * This class specifies class file version 49.0 but uses Java 6 signatures.  Assumed Java 6.
 */
/**
 * Message sender that buffers {@link MessageTree}s in in-memory queues and writes them, encoded
 * by the injected {@link MessageCodec}, to the Netty channel supplied by {@link ChannelManager}.
 *
 * <p>Three background tasks are started by {@link #initialize()}: this sender (drains the main
 * queue), the channel manager, and a {@link MergeAtomicTask} that folds small "atomic" trees
 * into a single merged tree before they reach the main queue.
 *
 * <p>{@link #send(MessageTree)} and {@link #shutdown()} may be called from threads other than
 * the one executing {@link #run()}; the stop flag is therefore volatile.
 */
public class TcpSocketSender implements Threads.Task, MessageSender, LogEnabled {
    public static final String ID = "tcp-socket-sender";

    /** Upper bound used both as merge trigger (queue size) and as child cap per merged tree. */
    private static final int MAX_CHILD_NUMBER = 200;

    /** One hour in milliseconds; queued trees older than this are discarded while not writable. */
    private static final long HOUR = 3600000L;

    /** Queue capacity used when the {@code queue.size} system property is absent or unusable. */
    private static final int DEFAULT_QUEUE_SIZE = 1000;

    /** Age in milliseconds of the oldest atomic tree after which a merge is forced. */
    private static final long MAX_MERGE_WAIT = 30000L;

    @Inject
    private MessageCodec m_codec;

    @Inject
    private MessageStatistics m_statistics;

    @Inject
    private ClientConfigManager m_configManager;

    @Inject
    private MessageIdFactory m_factory;

    /** Trees ready to be encoded and written to the channel. */
    private MessageQueue m_queue;

    /** Atomic trees waiting to be merged by {@link MergeAtomicTask}. */
    private MessageQueue m_atomicTrees;

    private List<InetSocketAddress> m_serverAddresses;

    private ChannelManager m_manager;

    private Logger m_logger;

    /**
     * Loop flag of {@link #run()}. Written by {@link #shutdown()} from another thread, so it must
     * be volatile for the sender loop to observe the stop request.
     */
    private volatile boolean m_active;

    /** Number of trees rejected because a queue was full; used to throttle error logging. */
    private final AtomicInteger m_errors = new AtomicInteger();

    /** Number of times the channel was found open but not writable; used to throttle warnings. */
    private final AtomicInteger m_attempts = new AtomicInteger();

    /**
     * Reads the queue capacity from the {@code queue.size} system property.
     *
     * @return the configured capacity, or 1000 when the property is missing, not a valid integer,
     *         or not positive
     */
    public static int getQueueSize() {
        String size = System.getProperty("queue.size");

        if (size != null) {
            try {
                int parsed = Integer.parseInt(size.trim());

                if (parsed > 0) {
                    return parsed;
                }
            } catch (NumberFormatException ignored) {
                // A malformed property must not prevent the sender from starting; use the default.
            }
        }
        return DEFAULT_QUEUE_SIZE;
    }

    /**
     * Tells whether the channel behind {@code future} can accept a write right now.
     *
     * <p>When the channel is open but inactive or not writable, the attempt is counted, a warning
     * is logged on the first and every 1000th attempt, and the calling thread sleeps 5 ms.
     *
     * @param future the future whose channel is inspected
     * @return {@code true} only if the channel is open, active and writable
     * @throws InterruptedException if the thread is interrupted during the back-off sleep
     */
    private boolean checkWritable(ChannelFuture future) throws InterruptedException {
        Channel channel = future.channel();

        if (!channel.isOpen()) {
            return false;
        }
        if (channel.isActive() && channel.isWritable()) {
            return true;
        }

        int count = m_attempts.incrementAndGet();

        if (count % 1000 == 0 || count == 1) {
            m_logger.warn("Netty write buffer is full! Attempts: " + count);
        }
        TimeUnit.MILLISECONDS.sleep(5L);
        return false;
    }

    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    public String getName() {
        return "TcpSocketSender";
    }

    /**
     * Creates both queues and the channel manager, then starts this sender, the channel manager
     * and a {@link MergeAtomicTask} in the {@code "cat"} thread group.
     */
    @Override
    public void initialize() {
        int len = getQueueSize();

        m_queue = new DefaultMessageQueue(len);
        m_atomicTrees = new DefaultMessageQueue(len);
        m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

        Threads.forGroup("cat").start(this);
        Threads.forGroup("cat").start(m_manager);
        Threads.forGroup("cat").start(new MergeAtomicTask());
    }

    /**
     * Classifies a tree as "atomic", i.e. a candidate for merging.
     *
     * @param tree the tree to classify
     * @return {@code true} if the root message is not a {@link Transaction}, or is a transaction
     *         whose type starts with {@code "Cache."} or equals {@code "SQL"}; {@code false}
     *         otherwise (including a transaction without a type)
     */
    private boolean isAtomicMessage(MessageTree tree) {
        Message message = tree.getMessage();

        if (!(message instanceof Transaction)) {
            return true;
        }

        String type = message.getType();

        return type != null && (type.startsWith("Cache.") || "SQL".equals(type));
    }

    /**
     * Records a tree that could not be queued: notifies the statistics (if present) and logs an
     * error on the first and every 1000th occurrence.
     *
     * @param tree the rejected tree
     */
    private void logQueueFullInfo(MessageTree tree) {
        if (m_statistics != null) {
            m_statistics.onOverflowed(tree);
        }

        int count = m_errors.incrementAndGet();

        if (count % 1000 == 0 || count == 1) {
            m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
        }
    }

    /**
     * Drains trees from {@code trees} and wraps their root messages as children of one synthetic
     * {@code _CatMergeTree} transaction, which replaces the root message of the first tree.
     *
     * <p>The merged transaction starts at the first child's timestamp and its duration spans up
     * to the end of the last merged child (last timestamp plus last duration). The message ids of
     * all merged trees except the first are handed back to the id factory for reuse.
     *
     * @param trees the queue to drain; at most {@code MAX_CHILD_NUMBER + 1} trees are taken in
     *              addition to the first one
     * @return the first polled tree, now carrying the merged transaction, or {@code null} if the
     *         queue yielded no tree
     */
    private MessageTree mergeTree(MessageQueue trees) {
        MessageTree first = trees.poll();

        if (first == null) {
            return null;
        }

        Message firstMessage = first.getMessage();
        DefaultTransaction t = new DefaultTransaction("_CatMergeTree", "_CatMergeTree", null);

        t.setStatus("0");
        t.setCompleted(true);
        t.addChild(firstMessage);
        t.setTimestamp(firstMessage.getTimestamp());

        // Seed with the first child so that a single-child merge gets a sane, non-negative duration.
        long lastTimestamp = firstMessage.getTimestamp();
        long lastDuration = durationOf(firstMessage);

        for (int max = MAX_CHILD_NUMBER; max >= 0; --max) {
            MessageTree tree = trees.poll();

            if (tree == null) {
                break;
            }

            Message message = tree.getMessage();

            lastTimestamp = message.getTimestamp();
            lastDuration = durationOf(message);
            t.addChild(message);
            m_factory.reuse(tree.getMessageId());
        }

        // Set unconditionally: the loop may also end because the child cap was reached.
        t.setDurationInMillis(lastTimestamp - t.getTimestamp() + lastDuration);

        ((DefaultMessageTree) first).setMessage(t);
        return first;
    }

    /** Returns the duration of {@code message} if it is a {@link DefaultTransaction}, else 0. */
    private long durationOf(Message message) {
        if (message instanceof DefaultTransaction) {
            return ((DefaultTransaction) message).getDurationInMillis();
        }
        return 0L;
    }

    /**
     * Sender loop. While active, each iteration either sends one queued tree (channel available
     * and writable) or discards expired trees and sleeps 5 ms.
     *
     * <p>The loop ends when {@link #shutdown()} clears the active flag or when the thread is
     * interrupted; in the latter case the interrupt status is restored for the caller.
     */
    public void run() {
        m_active = true;

        try {
            while (m_active) {
                ChannelFuture channel = m_manager.channel();

                if (channel != null && checkWritable(channel)) {
                    sendNextTree();
                } else {
                    discardExpiredTrees();
                    TimeUnit.MILLISECONDS.sleep(5L);
                }
            }
        } catch (InterruptedException e) {
            m_active = false;
            Thread.currentThread().interrupt();
        }
    }

    /** Polls one tree from the main queue and sends it; any failure is logged, never rethrown. */
    private void sendNextTree() {
        try {
            MessageTree tree = m_queue.poll();

            if (tree != null) {
                sendInternal(tree);
                // Drop the reference to the message once it has been written.
                tree.setMessage(null);
            }
        } catch (Throwable t) {
            m_logger.error("Error when sending message over TCP socket!", t);
        }
    }

    /**
     * Removes trees whose root message is older than one hour from the head of the main queue and
     * reports each of them to the statistics (if present) as overflowed.
     */
    private void discardExpiredTrees() {
        long oldTimestamp = System.currentTimeMillis() - HOUR;

        try {
            MessageTree head = m_queue.peek();

            while (head != null && head.getMessage().getTimestamp() < oldTimestamp) {
                MessageTree discarded = m_queue.poll();

                if (discarded != null && m_statistics != null) {
                    m_statistics.onOverflowed(discarded);
                }
                head = m_queue.peek();
            }
        } catch (Exception e) {
            m_logger.error(e.getMessage(), e);
        }
    }

    /**
     * Queues a tree for sending. Atomic trees (see {@link #isAtomicMessage(MessageTree)}) go to
     * the merge queue, all others to the main queue. A rejected tree is reported via
     * {@link #logQueueFullInfo(MessageTree)}.
     *
     * @param tree the tree to send
     */
    @Override
    public void send(MessageTree tree) {
        MessageQueue target = isAtomicMessage(tree) ? m_atomicTrees : m_queue;
        boolean result = target.offer(tree, m_manager.getSample());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }

    /**
     * Encodes {@code tree} into a pooled buffer and writes it to the current channel, then reports
     * the encoded size to the statistics (if present).
     *
     * <p>The buffer is released here if anything fails before it is passed to
     * {@code writeAndFlush}; from that call on the buffer is no longer touched by this method.
     *
     * @param tree the tree to encode and write
     */
    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10240);
        boolean handedOff = false;

        try {
            m_codec.encode(tree, buf);

            int size = buf.readableBytes();
            Channel channel = future.channel();

            // Mark before the call: once writeAndFlush is entered the buffer must not be released here.
            handedOff = true;
            channel.writeAndFlush(buf);

            if (m_statistics != null) {
                m_statistics.onBytes(size);
            }
        } finally {
            if (!handedOff) {
                buf.release();
            }
        }
    }

    public void setServerAddresses(List<InetSocketAddress> serverAddresses) {
        m_serverAddresses = serverAddresses;
    }

    /**
     * Decides whether the atomic queue should be merged now.
     *
     * @param trees the queue of atomic trees
     * @return {@code true} if the queue is non-empty and either its head is older than 30 seconds
     *         or it holds at least {@code MAX_CHILD_NUMBER} trees
     */
    private boolean shouldMerge(MessageQueue trees) {
        MessageTree tree = trees.peek();

        if (tree == null) {
            return false;
        }

        long firstTime = tree.getMessage().getTimestamp();

        return System.currentTimeMillis() - firstTime > MAX_MERGE_WAIT || trees.size() >= MAX_CHILD_NUMBER;
    }

    /**
     * Requests the sender loop to stop and shuts down the channel manager, if one was created.
     */
    @Override
    public void shutdown() {
        m_active = false;

        if (m_manager != null) {
            m_manager.shutdown();
        }
    }

    /**
     * Background task that repeatedly merges queued atomic trees into one tree and offers the
     * result to the main queue. It sleeps 5 ms whenever no merge is due and exits when interrupted
     * during that sleep.
     */
    public class MergeAtomicTask implements Threads.Task {
        public String getName() {
            return "merge-atomic-task";
        }

        public void run() {
            while (true) {
                if (shouldMerge(m_atomicTrees)) {
                    MessageTree tree = mergeTree(m_atomicTrees);

                    if (tree != null && !m_queue.offer(tree)) {
                        logQueueFullInfo(tree);
                    }
                } else {
                    try {
                        Thread.sleep(5L);
                    } catch (InterruptedException e) {
                        // Preserve the interrupt status for whoever owns this thread.
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        }

        public void shutdown() {
        }
    }
}

