package com.dianping.cat.message.io;

import com.dianping.cat.CatConstants;
import com.dianping.cat.analyzer.LocalAggregator;
import com.dianping.cat.analyzer.ptest.PtestConstants;
import com.dianping.cat.configuration.ClientConfigManager;
import com.dianping.cat.message.Message;
import com.dianping.cat.message.internal.DefaultTransaction;
import com.dianping.cat.message.internal.MessageIdFactory;
import com.dianping.cat.message.queue.DefaultMessageQueue;
import com.dianping.cat.message.queue.PriorityMessageQueue;
import com.dianping.cat.message.spi.MessageCodec;
import com.dianping.cat.message.spi.MessageQueue;
import com.dianping.cat.message.spi.MessageStatistics;
import com.dianping.cat.message.spi.MessageTree;
import com.dianping.cat.message.spi.codec.NativeMessageCodec;
import com.dianping.cat.message.spi.internal.DefaultMessageTree;
import com.dianping.cat.status.AbstractCollector;
import com.dianping.cat.status.StatusExtensionRegister;
import com.dianping.cat.util.MtraceTracer;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelFuture;
import java.net.InetSocketAddress;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import org.codehaus.plexus.logging.LogEnabled;
import org.codehaus.plexus.logging.Logger;
import org.unidal.helper.Threads;
import org.unidal.lookup.annotation.Inject;
import org.unidal.lookup.annotation.Named;

@Named
/* loaded from: input_file:com/dianping/cat/message/io/TcpSocketSender.class */
public class TcpSocketSender implements Threads.Task, MessageSender, LogEnabled {

    @Inject({NativeMessageCodec.ID})
    private MessageCodec m_nativeCodec;

    @Inject
    private MessageStatistics m_statistics;

    @Inject
    private ClientConfigManager m_configManager;

    @Inject
    private MessageIdFactory m_factory;
    private MessageQueue m_messageQueue = new PriorityMessageQueue(SIZE);
    private AtomicMessageManager m_atomicQueueManager = new AtomicMessageManager(SIZE);
    private ChannelManager m_channelManager;
    private Logger m_logger;
    private boolean m_active;
    private static final int SIZE = 5000;
    private static final long HOUR = 3600000;

    /* loaded from: input_file:com/dianping/cat/message/io/TcpSocketSender$AtomicMessageManager.class */
    public class AtomicMessageManager {
        private MessageQueue m_smallMessages;
        private MessageQueue m_perfTestSmallMessages;
        private static final long HOUR = 3600000;
        private static final int MAX_CHILD_NUMBER = 200;
        private static final int MAX_DURATION = 30000;

        public AtomicMessageManager(int i) {
            this.m_smallMessages = new DefaultMessageQueue(i);
            this.m_perfTestSmallMessages = new DefaultMessageQueue(i);
        }

        public int getPtestQueueSize() {
            return this.m_perfTestSmallMessages.size();
        }

        public int getQueueSize() {
            return this.m_smallMessages.size();
        }

        private boolean isSameHour(long j, long j2) {
            return ((int) (j / 3600000)) == ((int) (j2 / 3600000));
        }

        private MessageTree mergeTree(MessageQueue messageQueue) {
            MessageTree peek;
            MessageTree poll;
            DefaultTransaction defaultTransaction = new DefaultTransaction(CatConstants.CAT_SYSTEM, "AtomicAggregator");
            MessageTree poll2 = messageQueue.poll();
            Message message = poll2.getMessage();
            long timestamp = message.getTimestamp();
            defaultTransaction.setStatus(Message.SUCCESS);
            defaultTransaction.setCompleted(true);
            defaultTransaction.setDurationStart(timestamp);
            defaultTransaction.setTimestamp(timestamp);
            defaultTransaction.setDurationInMicros(0L);
            defaultTransaction.addChild(message);
            for (int i = MAX_CHILD_NUMBER; i >= 0 && (peek = messageQueue.peek()) != null && isSameHour(timestamp, peek.getMessage().getTimestamp()) && (poll = messageQueue.poll()) != null; i--) {
                defaultTransaction.addChild(poll.getMessage());
            }
            ((DefaultMessageTree) poll2).setMessage(defaultTransaction);
            return poll2;
        }

        public boolean offerToQueue(MessageTree messageTree, boolean z) {
            return z ? this.m_perfTestSmallMessages.offer(messageTree) : this.m_smallMessages.offer(messageTree);
        }

        public void processAtomicMessage() {
            processNormalAtomicMessage();
            processPerfAtomicMessage();
        }

        private void processNormalAtomicMessage() {
            while (shouldMerge(this.m_smallMessages)) {
                TcpSocketSender.this.offer(mergeTree(this.m_smallMessages), false);
            }
        }

        private void processPerfAtomicMessage() {
            while (shouldMerge(this.m_perfTestSmallMessages)) {
                TcpSocketSender.this.offer(mergeTree(this.m_perfTestSmallMessages), true);
            }
        }

        private boolean shouldMerge(MessageQueue messageQueue) {
            MessageTree peek = messageQueue.peek();
            if (peek != null) {
                return System.currentTimeMillis() - peek.getMessage().getTimestamp() > 30000 || messageQueue.size() >= MAX_CHILD_NUMBER;
            }
            return false;
        }
    }

    public void enableLogging(Logger logger) {
        this.m_logger = logger;
    }

    public String getName() {
        return "TcpSocketSender";
    }

    /* JADX WARN: Multi-variable type inference failed */
    @Override // com.dianping.cat.message.io.MessageSender
    public void initialize(List<InetSocketAddress> list) {
        this.m_channelManager = new ChannelManager(this.m_logger, list, this.m_configManager, this.m_factory);
        Threads.forGroup("cat").start(this);
        Threads.forGroup("cat").start(this.m_channelManager);
        Runtime.getRuntime().addShutdownHook(new Thread() { // from class: com.dianping.cat.message.io.TcpSocketSender.1
            @Override // java.lang.Thread, java.lang.Runnable
            public void run() {
                TcpSocketSender.this.m_logger.info("shut down cat client in runtime shut down hook!");
                TcpSocketSender.this.shutdown();
            }
        });
        StatusExtensionRegister.getInstance().register(new AbstractCollector() { // from class: com.dianping.cat.message.io.TcpSocketSender.2
            @Override // com.dianping.cat.status.StatusExtension
            public String getId() {
                return "cat.status";
            }

            @Override // com.dianping.cat.status.StatusExtension
            public Map<String, String> getProperties() {
                LinkedHashMap linkedHashMap = new LinkedHashMap();
                linkedHashMap.put("cat.status.send.queue.size", String.valueOf(TcpSocketSender.this.m_messageQueue.size()));
                linkedHashMap.put("cat.status.send.atomic.queue.size", String.valueOf(TcpSocketSender.this.m_atomicQueueManager.getQueueSize()));
                linkedHashMap.put("cat.status.send.atomic.ptest.queue.size", String.valueOf(TcpSocketSender.this.m_atomicQueueManager.getPtestQueueSize()));
                for (Map.Entry<String, Long> entry : TcpSocketSender.this.m_statistics.getStatistics().entrySet()) {
                    linkedHashMap.put(entry.getKey(), String.valueOf(entry.getValue()));
                }
                return linkedHashMap;
            }
        });
    }

    private void logMessageDiscard(MessageTree messageTree) {
        this.m_statistics.onOverflowed(messageTree);
    }

    /* JADX INFO: Access modifiers changed from: private */
    public void offer(MessageTree messageTree, boolean z) {
        boolean offer;
        if (this.m_configManager.isAtomicMessage(messageTree)) {
            offer = this.m_atomicQueueManager.offerToQueue(messageTree, z);
        } else {
            if (z) {
                messageTree.setDomain(PtestConstants.PERF_DOMAIN);
            }
            offer = this.m_messageQueue.offer(messageTree);
        }
        if (offer) {
            return;
        }
        processTreeInClient(messageTree, z);
        if (messageTree.canDiscard()) {
            return;
        }
        logMessageDiscard(messageTree);
    }

    private void processMessage() {
        ChannelFuture channel = this.m_channelManager.channel();
        if (channel != null) {
            try {
                MessageTree poll = this.m_messageQueue.poll();
                if (poll != null) {
                    sendInternal(channel, poll);
                    poll.setMessage(null);
                } else {
                    try {
                        Thread.sleep(5L);
                    } catch (Exception e) {
                        this.m_active = false;
                    }
                }
                return;
            } catch (Throwable th) {
                this.m_logger.error("Error when sending message over TCP socket!", th);
                return;
            }
        }
        long currentTimeMillis = System.currentTimeMillis() - 3600000;
        while (true) {
            try {
                MessageTree peek = this.m_messageQueue.peek();
                if (peek == null || peek.getMessage().getTimestamp() >= currentTimeMillis) {
                    break;
                }
                MessageTree poll2 = this.m_messageQueue.poll();
                if (poll2 != null) {
                    this.m_statistics.onOverflowed(poll2);
                }
            } catch (Exception e2) {
                this.m_logger.error(e2.getMessage(), e2);
            }
        }
        try {
            Thread.sleep(5L);
        } catch (Exception e3) {
            this.m_active = false;
        }
    }

    private void processTreeInClient(MessageTree messageTree, boolean z) {
        LocalAggregator.aggregate(messageTree, z);
    }

    public void run() {
        this.m_active = true;
        while (this.m_active) {
            processMessage();
            this.m_atomicQueueManager.processAtomicMessage();
        }
        this.m_atomicQueueManager.processAtomicMessage();
        while (true) {
            MessageTree poll = this.m_messageQueue.poll();
            if (poll == null) {
                return;
            }
            ChannelFuture channel = this.m_channelManager.channel();
            if (channel != null) {
                sendInternal(channel, poll);
            } else {
                offer(poll, false);
            }
        }
    }

    @Override // com.dianping.cat.message.io.MessageSender
    public void send(MessageTree messageTree) {
        if (this.m_configManager.isMessageBlock()) {
            return;
        }
        double samplingRate = this.m_configManager.getSamplingRate();
        boolean isTest = MtraceTracer.isTest();
        if (!messageTree.canDiscard() || samplingRate >= 1.0d || messageTree.isHitSample()) {
            offer(messageTree, isTest);
        } else {
            processTreeInClient(messageTree, isTest);
        }
    }

    private void sendInternal(ChannelFuture channelFuture, MessageTree messageTree) {
        if (messageTree.getMessageId() == null) {
            messageTree.setMessageId(this.m_factory.getNextId());
        }
        ByteBuf encode = this.m_nativeCodec.encode(messageTree);
        int readableBytes = encode.readableBytes();
        channelFuture.channel().writeAndFlush(encode);
        if (this.m_statistics != null) {
            this.m_statistics.onBytes(readableBytes);
        }
    }

    public void sendMessageForTest(MessageTree messageTree) {
        if (messageTree.getMessageId() == null) {
            messageTree.setMessageId(this.m_factory.getNextId());
        }
        ChannelFuture channel = this.m_channelManager.channel();
        if (channel != null) {
            ByteBuf encode = this.m_nativeCodec.encode(messageTree);
            int readableBytes = encode.readableBytes();
            channel.channel().writeAndFlush(encode);
            if (this.m_statistics != null) {
                this.m_statistics.onBytes(readableBytes);
            }
        }
    }

    @Override // com.dianping.cat.message.io.MessageSender
    public void shutdown() {
        this.m_active = false;
        this.m_channelManager.shutdown();
    }
}
