天天看點

應用監控CAT之cat-consumer源碼閱讀(二)

  之前講了 cat-client 進行cat埋點上報,那麼上報給誰呢?以及後續故事如何?讓我們來看看 cat-consumer 是如何接收處理的?

  由cat-client發送資料,cat-consumer進行接收請求處理,開始了處理問題之旅!

首先,讓我們來回顧一下 TcpSocketSender 是如何發送資料的:

// TcpSocketSender 往channel中寫入資料,此處有興趣的同學可以延伸下 netty 的源碼!
    private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

        m_codec.encode(tree, buf);

        int size = buf.readableBytes();
        Channel channel = future.channel();

        // 以 ByteBuf 形式發送資料
        channel.writeAndFlush(buf);
        // 更新統計資料
        if (m_statistics != null) { 
            m_statistics.onBytes(size);
        }
    }      

// TcpSocketReceiver, 接收發送過來的資料,預設端口 2280, 注冊服務,線上為分布式部署,即為接口調用式。

public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {
            m_logger.error(e.getMessage(), e);
        }
    }

    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();

        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        // 添加處理handler, 進行請求邏輯處理
        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                
                // 此處僅為一個解碼器,實際功能在該解碼器中完成
                pipeline.addLast("decode", new MessageDecoder());
            }
        });

        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }

// 消息解碼器,并處理具體業務邏輯,先确認資料已上傳完成,再進行邏輯處理
    public class MessageDecoder extends ByteToMessageDecoder {

        @Override
        protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
            if (buffer.readableBytes() < 4) {
                return;
            }
            buffer.markReaderIndex();
            int length = buffer.readInt();
            buffer.resetReaderIndex();
            if (buffer.readableBytes() < length + 4) {
                return;
            }
            try {
                if (length > 0) {
                    ByteBuf readBytes = buffer.readBytes(length + 4);
                    readBytes.markReaderIndex();
                    readBytes.readInt();
            // 消息解碼,擷取頭資訊,消息體
                    DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);

                    readBytes.resetReaderIndex();
                    tree.setBuffer(readBytes);
                    // 交由handler處理實際邏輯
                    m_handler.handle(tree);
                    m_processCount++;

                    long flag = m_processCount % CatConstants.SUCCESS_COUNT;

                    if (flag == 0) {
                        m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
                    }
                } else {
                    // client message is error
                    buffer.readBytes(length);
                }
            } catch (Exception e) {
                m_serverStateManager.addMessageTotalLoss(1);
                m_logger.error(e.getMessage(), e);
            }
        }
    }
          

// 消息解碼,為後續處理提供基礎設施

應用監控CAT之cat-consumer源碼閱讀(二)
應用監控CAT之cat-consumer源碼閱讀(二)
//PlainTextMessageCodec.decode() 消息解碼
    @Override
    public MessageTree decode(ByteBuf buf) {
        MessageTree tree = new DefaultMessageTree();

        // 使用一個預設消息樹,用于接收消息
        decode(buf, tree);
        return tree;
    }

    @Override
    public void decode(ByteBuf buf, MessageTree tree) {
        Context ctx = m_ctx.get().setBuffer(buf);

        // 解析頭資訊
        decodeHeader(ctx, tree);

        // 解析消息體
        if (buf.readableBytes() > 0) {
            decodeMessage(ctx, tree);
        }
    }
    // 解析頭資訊,以tab='\t' 和 lf='\n', 進行分割
    protected void decodeHeader(Context ctx, MessageTree tree) {
        BufferHelper helper = m_bufferHelper;
        String id = helper.read(ctx, TAB);
        String domain = helper.read(ctx, TAB);
        String hostName = helper.read(ctx, TAB);
        String ipAddress = helper.read(ctx, TAB);
        String threadGroupName = helper.read(ctx, TAB);
        String threadId = helper.read(ctx, TAB);
        String threadName = helper.read(ctx, TAB);
        String messageId = helper.read(ctx, TAB);
        String parentMessageId = helper.read(ctx, TAB);
        String rootMessageId = helper.read(ctx, TAB);
        String sessionToken = helper.read(ctx, LF);

        if (VERSION.equals(id)) {
            tree.setDomain(domain);
            tree.setHostName(hostName);
            tree.setIpAddress(ipAddress);
            tree.setThreadGroupName(threadGroupName);
            tree.setThreadId(threadId);
            tree.setThreadName(threadName);
            tree.setMessageId(messageId);
            tree.setParentMessageId(parentMessageId);
            tree.setRootMessageId(rootMessageId);
            tree.setSessionToken(sessionToken);
        } else {
            throw new RuntimeException(String.format("Unrecognized id(%s) for plain text message codec!", id));
        }
    }
    // 解析消息體
    protected void decodeMessage(Context ctx, MessageTree tree) {
        Stack<DefaultTransaction> stack = new Stack<DefaultTransaction>();
        Message parent = decodeLine(ctx, null, stack);

        tree.setMessage(parent);

        // 循環讀取消息體,直到讀取完成
        while (ctx.getBuffer().readableBytes() > 0) {
            Message message = decodeLine(ctx, (DefaultTransaction) parent, stack);

            if (message instanceof DefaultTransaction) {
                parent = message;
            } else {
                break;
            }
        }
    }
    // 解析内容棧出來
    protected Message decodeLine(Context ctx, DefaultTransaction parent, Stack<DefaultTransaction> stack) {
        BufferHelper helper = m_bufferHelper;
        byte identifier = ctx.getBuffer().readByte();
        String timestamp = helper.read(ctx, TAB);
        String type = helper.read(ctx, TAB);
        String name = helper.read(ctx, TAB);

        switch (identifier) {
        // t: transaction 類型消息, T: pop結束, E:Event, M: Metrics, L: Trace, H: heartbeat 消息
        case 't':
            DefaultTransaction transaction = new DefaultTransaction(type, name, null);

            helper.read(ctx, LF); // get rid of line feed
            transaction.setTimestamp(m_dateHelper.parse(timestamp));

            if (parent != null) {
                parent.addChild(transaction);
            }

            stack.push(parent);
            return transaction;
        case 'A':
            DefaultTransaction tran = new DefaultTransaction(type, name, null);
            String status = helper.read(ctx, TAB);
            String duration = helper.read(ctx, TAB);
            String data = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            tran.setTimestamp(m_dateHelper.parse(timestamp));
            tran.setStatus(status);
            tran.addData(data);

            long d = Long.parseLong(duration.substring(0, duration.length() - 2));
            tran.setDurationInMicros(d);

            if (parent != null) {
                parent.addChild(tran);
                return parent;
            } else {
                return tran;
            }
        case 'T':
            String transactionStatus = helper.read(ctx, TAB);
            String transactionDuration = helper.read(ctx, TAB);
            String transactionData = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            parent.setStatus(transactionStatus);
            parent.addData(transactionData);

            long transactionD = Long.parseLong(transactionDuration.substring(0, transactionDuration.length() - 2));

            parent.setDurationInMicros(transactionD);

            return stack.pop();
        case 'E':
            DefaultEvent event = new DefaultEvent(type, name);
            String eventStatus = helper.read(ctx, TAB);
            String eventData = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            event.setTimestamp(m_dateHelper.parse(timestamp));
            event.setStatus(eventStatus);
            event.addData(eventData);

            if (parent != null) {
                parent.addChild(event);
                return parent;
            } else {
                return event;
            }
        case 'M':
            DefaultMetric metric = new DefaultMetric(type, name);
            String metricStatus = helper.read(ctx, TAB);
            String metricData = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            metric.setTimestamp(m_dateHelper.parse(timestamp));
            metric.setStatus(metricStatus);
            metric.addData(metricData);

            if (parent != null) {
                parent.addChild(metric);
                return parent;
            } else {
                return metric;
            }
        case 'L':
            DefaultTrace trace = new DefaultTrace(type, name);
            String traceStatus = helper.read(ctx, TAB);
            String traceData = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            trace.setTimestamp(m_dateHelper.parse(timestamp));
            trace.setStatus(traceStatus);
            trace.addData(traceData);

            if (parent != null) {
                parent.addChild(trace);
                return parent;
            } else {
                return trace;
            }
        case 'H':
            DefaultHeartbeat heartbeat = new DefaultHeartbeat(type, name);
            String heartbeatStatus = helper.read(ctx, TAB);
            String heartbeatData = helper.read(ctx, TAB);

            helper.read(ctx, LF); // get rid of line feed
            heartbeat.setTimestamp(m_dateHelper.parse(timestamp));
            heartbeat.setStatus(heartbeatStatus);
            heartbeat.addData(heartbeatData);

            if (parent != null) {
                parent.addChild(heartbeat);
                return parent;
            } else {
                return heartbeat;
            }
        default:
            m_logger.warn("Unknown identifier(" + (char) identifier + ") of message: "
                  + ctx.getBuffer().toString(Charset.forName("utf-8")));
            throw new RuntimeException("Unknown identifier int name");
        }
    }      

View Code

// handler 處理流程,由DefaultMessageHandler接手,安排後續工作。

// DefaultMessageHandler, 接過處理器的第一棒, 交由另一實際的consumer(RealtimeConsumer) handler處理
    @Override
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            m_consumer = lookup(MessageConsumer.class);
        }

        try {
            m_consumer.consume(tree);
        } catch (Throwable e) {
            m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
        }
    }
    // RealtimeConsumer, 進行消費資料
    @Override
    public void consume(MessageTree tree) {
        String domain = tree.getDomain();
        String ip = tree.getIpAddress();

        // 進行權限檢測,ip,domain
        if (!m_blackListManager.isBlack(domain, ip)) {
            long timestamp = tree.getMessage().getTimestamp();
            Period period = m_periodManager.findPeriod(timestamp);

            // 找到period, 再将消息配置設定過去,否則算作網絡異常
            if (period != null) {
                period.distribute(tree);
            } else {
                m_serverStateManager.addNetworkTimeError(1);
            }
        } else {
            m_black++;

            if (m_black % CatConstants.SUCCESS_COUNT == 0) {
                Cat.logEvent("Discard", domain);
            }
        }
    }      

// Period.distribute, 将消息依次取出,進行分發到隊列

public void distribute(MessageTree tree) {
        // 統計進行數進行加1
        m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
        boolean success = true;
        String domain = tree.getDomain();

        // 将各種類型的監控資料分别取出進行處理
        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            List<PeriodTask> tasks = entry.getValue();
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;

            if (manyTasks) {
                index = Math.abs(domain.hashCode()) % length;
            }
            PeriodTask task = tasks.get(index);
            // 如果有金條消息,将task重新入隊
            boolean enqueue = task.enqueue(tree);

            if (enqueue == false) {
                if (manyTasks) {
                    task = tasks.get((index + 1) % length);
                    enqueue = task.enqueue(tree);

                    if (enqueue == false) {
                        success = false;
                    }
                } else {
                    success = false;
                }
            }
        }

        if (!success) {
            m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
        }
    }
    // PeriodTask.enqueue, 重新入隊消息,讓消費線程自行消費 LinkedBlockingQueue.offer(..)
    public boolean enqueue(MessageTree tree) {
        boolean result = m_queue.offer(tree);

        if (!result) { // trace queue overflow, 記錄入隊失敗日志
            m_queueOverflow++;

            if (m_queueOverflow % (10 * CatConstants.ERROR_COUNT) == 0) {
                m_logger.warn(m_analyzer.getClass().getSimpleName() + " queue overflow number " + m_queueOverflow);
            }
        }
        return result;
    }      

到此,一條消費線路就完成了。

// PeriodTask 線程,作為第二個消費線路

@Override
    public void run() {
        try {
            // 分析各消息資料,做背景消費處理
            m_analyzer.analyze(m_queue);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
    // 調用統一的抽象類的模闆方法,由各類進行具體的 process 處理
    @Override
    public void analyze(MessageQueue queue) {
        while (!isTimeout() && isActive()) {
            MessageTree tree = queue.poll();

            if (tree != null) {
                try {
                    // 調用具體類的process 
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            }
        }

        // 如果出現逾時或者停止動作,則把剩餘隊列處理完成再退出線程
        while (true) {
            MessageTree tree = queue.poll();

            if (tree != null) {
                try {
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;

                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            } else {
                break;
            }
        }
    }
    // 逾時規則,目前時間 > 開始時間+1小時+設定額外逾時時間
    protected boolean isTimeout() {
        long currentTime = System.currentTimeMillis();
        long endTime = m_startTime + m_duration + m_extraTime;

        return currentTime > endTime;
    }
          

// 具體的 Anlalyzer示例: DumpAnlalyzer.process

// 具體的 Anlalyzer示例: DumpAnlalyzer.process
    @Override
    public void process(MessageTree tree) {
        String domain = tree.getDomain();

        if ("PhoenixAgent".equals(domain)) {
            return;
        } else {
            MessageId messageId = MessageId.parse(tree.getMessageId());

            if (messageId.getVersion() == 2) {
                // 計算出目前時間範圍,
                long time = tree.getMessage().getTimestamp();
                long fixedTime = time - time % (TimeHelper.ONE_HOUR);
                long idTime = messageId.getTimestamp();
                long duration = fixedTime - idTime;

                if (duration == 0 || duration == ONE_HOUR || duration == -ONE_HOUR) {
                    m_bucketManager.storeMessage(tree, messageId);
                } else {
                    m_serverStateManager.addPigeonTimeError(1);
                }
            }
        }
    }
// 存儲log消息到本地檔案,并後續上傳到hdfs
    @Override
    public void storeMessage(final MessageTree tree, final MessageId id) {
        boolean errorFlag = true;
        int hash = Math.abs((id.getDomain() + '-' + id.getIpAddress()).hashCode());
        int index = (int) (hash % m_gzipThreads);
        MessageItem item = new MessageItem(tree, id);
        LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1));
        boolean result = queue.offer(item);

        if (result) {
            errorFlag = false;
        } else {
            if (m_last.offer(item)) {
                errorFlag = false;
            }
        }

        if (errorFlag) {
            m_serverStateManager.addMessageDumpLoss(1);
        }
        logStorageState(tree);
    }
    // 每1000個消息添加一個messageDump=1000
    protected void logStorageState(final MessageTree tree) {
        String domain = tree.getDomain();
        int size = ((DefaultMessageTree) tree).getBuffer().readableBytes();

        m_serverStateManager.addMessageSize(domain, size);
        if ((++m_total) % CatConstants.SUCCESS_COUNT == 0) {
            m_serverStateManager.addMessageDump(CatConstants.SUCCESS_COUNT);
        }
    }      

// EventAnalyzer.process 處理event消息

@Override
    public void process(MessageTree tree) {
        String domain = tree.getDomain();

        if (m_serverFilterConfigManager.validateDomain(domain)) {
            EventReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
            Message message = tree.getMessage();
            String ip = tree.getIpAddress();

            if (message instanceof Transaction) {
                processTransaction(report, tree, (Transaction) message, ip);
            } else if (message instanceof Event) {
                processEvent(report, tree, (Event) message, ip);
            }
        }
    }
    // 循環處理多個transation
    private void processTransaction(EventReport report, MessageTree tree, Transaction t, String ip) {
        List<Message> children = t.getChildren();

        for (Message child : children) {
            if (child instanceof Transaction) {
                processTransaction(report, tree, (Transaction) child, ip);
            } else if (child instanceof Event) {
                processEvent(report, tree, (Event) child, ip);
            }
        }
    }
// StateAnalyzer.process 對cat的機器作展示
    @Override
    protected void process(MessageTree tree) {
        String domain = tree.getDomain();

        if (m_serverFilterConfigManager.validateDomain(domain)) {
            StateReport report = m_reportManager.getHourlyReport(getStartTime(), Constants.CAT, true);
            String ip = tree.getIpAddress();
            Machine machine = report.findOrCreateMachine(NetworkInterfaceManager.INSTANCE.getLocalHostAddress());

            machine.findOrCreateProcessDomain(domain).addIp(ip);
        }
    }      

// 所有分析線程,由 Period 進行初始化啟動所有的Analyzer備用

public void start() {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        m_logger.info(String.format("Starting %s tasks in period [%s, %s]", m_tasks.size(),
              df.format(new Date(m_startTime)), df.format(new Date(m_endTime - 1))));

        for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
            List<PeriodTask> taskList = tasks.getValue();

            for (int i = 0; i < taskList.size(); i++) {
                PeriodTask task = taskList.get(i);

                task.setIndex(i);

                Threads.forGroup("Cat-RealtimeConsumer").start(task);
            }
        }
    }      

// 為保證高可用,使用 ChannelManager, 專門檢查channel通道是否仍然存活,如果出問題,則發起重連。

@Override
    public void run() {
        while (m_active) {
            // make save message id index asyc
            m_idfactory.saveMark();
            checkServerChanged();

            ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
            List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

            doubleCheckActiveServer(activeFuture);
            reconnectDefaultServer(activeFuture, serverAddresses);

            try {
                Thread.sleep(10 * 1000L); // check every 10 seconds
            } catch (InterruptedException e) {
                // ignore
            }
        }
    }
          

// 最後一個關鍵點也是很重要的一個點,PeriodManager, 用于滾動式處理每小時的監控資料,儲存資料到磁盤

// PeriodManager, 用于滾動式處理每小時的監控資料

public class PeriodManager implements Task {
    private PeriodStrategy m_strategy;

    private List<Period> m_periods = new ArrayList<Period>();

    private boolean m_active;

    @Inject
    private MessageAnalyzerManager m_analyzerManager;

    @Inject
    private ServerStatisticManager m_serverStateManager;

    @Inject
    private Logger m_logger;

    public static long EXTRATIME = 3 * 60 * 1000L;

    public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,
          ServerStatisticManager serverStateManager, Logger logger) {
        m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
        m_active = true;
        m_analyzerManager = analyzerManager;
        m_serverStateManager = serverStateManager;
        m_logger = logger;
    }

    private void endPeriod(long startTime) {
        int len = m_periods.size();

        for (int i = 0; i < len; i++) {
            Period period = m_periods.get(i);

            if (period.isIn(startTime)) {
                period.finish();
                m_periods.remove(i);
                break;
            }
        }
    }

    public void init() {
        long startTime = m_strategy.next(System.currentTimeMillis());

        startPeriod(startTime);
    }

    @Override
    public void run() {
        while (m_active) {
            try {
                long now = System.currentTimeMillis();
                long value = m_strategy.next(now);

                if (value > 0) {
                    startPeriod(value);
                } else if (value < 0) {
                    // 上個運作周期,即1小時已完成後,啟用一個結束線程進行規劃原來的資料
                    Threads.forGroup("cat").start(new EndTaskThread(-value));
                }
            } catch (Throwable e) {
                Cat.logError(e);
            }

            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                break;
            }
        }
    }

    private void startPeriod(long startTime) {
        long endTime = startTime + m_strategy.getDuration();
        Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);

        m_periods.add(period);
        period.start();
    }

    private class EndTaskThread implements Task {

        private long m_startTime;

        public EndTaskThread(long startTime) {
            m_startTime = startTime;
        }

        @Override
        public void run() {
            // 調用外部類的結束方法
            endPeriod(m_startTime);
        }

    }
}
    // Period.finish(), 結束
    public void finish() {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Date startDate = new Date(m_startTime);
        Date endDate = new Date(m_endTime - 1);

        m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
              df.format(endDate)));

        try {
            for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
                for (PeriodTask task : tasks.getValue()) {
                    task.finish();
                }
            }
        } catch (Throwable e) {
            Cat.logError(e);
        } finally {
            m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
                  df.format(endDate)));
        }
    }
    // PeriodTask.finish(), 真正處理上一周期資料
    public void finish() {
        try {
            // 調用各自分析器的 doCheckpoint 檢查,後續處理
            m_analyzer.doCheckpoint(true);
            // 銷毀分析器, help gc
            m_analyzer.destroy();
        } catch (Exception e) {
            Cat.logError(e);
        }
    }
    // 舉例 EventAnalyzer.doCheckpoint, 需加鎖處理
    @Override
    public synchronized void doCheckpoint(boolean atEnd) {
        if (atEnd && !isLocalMode()) {
            m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
        } else {
            m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
        }
    }
    // DefaultReportManager.storeHourlyReports, 存儲logview, 存在統計資料到db
    @Override
    public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
        Transaction t = Cat.newTransaction("Checkpoint", m_name);
        Map<String, T> reports = m_reports.get(startTime);
        ReportBucket bucket = null;

        try {
            t.addData("reports", reports == null ? 0 : reports.size());

            if (reports != null) {
                Set<String> errorDomains = new HashSet<String>();

                for (String domain : reports.keySet()) {
                    if (!m_validator.validate(domain)) {
                        errorDomains.add(domain);
                    }
                }
                for (String domain : errorDomains) {
                    reports.remove(domain);
                }
                if (!errorDomains.isEmpty()) {
                    m_logger.info("error domain:" + errorDomains);
                }

                m_reportDelegate.beforeSave(reports);

                if (policy.forFile()) {
                    bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

                    try {
                        storeFile(reports, bucket);
                    } finally {
                        m_bucketManager.closeBucket(bucket);
                    }
                }

                if (policy.forDatabase()) {
                    storeDatabase(startTime, reports);
                }
            }
            t.setStatus(Message.SUCCESS);
        } catch (Throwable e) {
            Cat.logError(e);
            t.setStatus(e);
            m_logger.error(String.format("Error when storing %s reports of %s!", m_name, new Date(startTime)), e);
        } finally {
            cleanup(startTime);
            t.complete();

            if (bucket != null) {
                m_bucketManager.closeBucket(bucket);
            }
        }
    }
    // DefaultReportManager.storeDatabase, 存儲    db
    private void storeDatabase(long startTime, Map<String, T> reports) {
        Date period = new Date(startTime);
        String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();

        for (T report : reports.values()) {
            try {
                String domain = m_reportDelegate.getDomain(report);
                HourlyReport r = m_reportDao.createLocal();

                r.setName(m_name);
                r.setDomain(domain);
                r.setPeriod(period);
                r.setIp(ip);
                r.setType(1);

                m_reportDao.insert(r);

                int id = r.getId();
                byte[] binaryContent = m_reportDelegate.buildBinary(report);
                HourlyReportContent content = m_reportContentDao.createLocal();

                content.setReportId(id);
                content.setContent(binaryContent);
                m_reportContentDao.insert(content);
                m_reportDelegate.createHourlyTask(report);
            } catch (Throwable e) {
                Cat.getProducer().logError(e);
            }
        }
    }
    // DefaultReportManager.storeFile, 存在file, xml
    private void storeFile(Map<String, T> reports, ReportBucket bucket) {
        for (T report : reports.values()) {
            try {
                String domain = m_reportDelegate.getDomain(report);
                String xml = m_reportDelegate.buildXml(report);

                bucket.storeById(domain, xml);
            } catch (Exception e) {
                Cat.logError(e);
            }
        }
    }      

總結起來就幾個東西:

  1. 使用netty開啟高性能的接收服務;

  2. 使用隊列進行儲存消息;

  3. 使用單獨線程檢測channel有效性,保證高可用;

  4. 所有單小時的資料,儲存在記憶體中,速度特别快;

  5. 多線程技術發揮得很好;

  6. 模闆模式的應用,阻塞隊列的應用;

  7. hdfs的應用,優雅停機的應用;

等等,來個圖展示下:

應用監控CAT之cat-consumer源碼閱讀(二)

task 運作過程:

應用監控CAT之cat-consumer源碼閱讀(二)

周期報告,彙總:

應用監控CAT之cat-consumer源碼閱讀(二)

不要害怕今日的苦,你要相信明天,更苦!