
Apache Storm Real-Time Stream Processing: Source Code Analysis of the Communication Mechanism

In this article we take a close look at the communication mechanism of Apache Storm 2.0.0-SNAPSHOT. We start with the overall design and then walk through the source code of the messaging paths inside this real-time stream processing system.

1. Overview

Communication between workers usually has to cross the network between nodes. Storm uses ZeroMQ or Netty (the default since 0.9) as the inter-process messaging framework.
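
The transport layer is selected through the topology/cluster configuration. Below is a minimal, illustrative sketch, assuming the standard storm-core Config constants and the Netty Context class that ships with Storm; it simply makes the default transport choice explicit:

import org.apache.storm.Config;

public class TransportConfigSketch {
    public static void main(String[] args) {
        Config conf = new Config();
        // Netty has been the default transport since 0.9; this just makes the choice explicit.
        conf.put(Config.STORM_MESSAGING_TRANSPORT, "org.apache.storm.messaging.netty.Context");
        // Per-connection Netty buffer size in bytes (illustrative value, not a recommendation).
        conf.put(Config.STORM_MESSAGING_NETTY_BUFFER_SIZE, 5 * 1024 * 1024);
    }
}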

Communication inside a worker process: the different threads (executors) within the same worker communicate through LMAX Disruptor queues.

Storm does not handle communication between different topologies; you have to implement it yourself, for example through Kafka.

The rough flow of how messages are received and processed in the inter-worker messaging mechanism is shown in the figure below:

[Figure: message receiving and processing flow between and inside worker processes]

A real-time stream processing system uses worker processes as its smallest unit of resource allocation. Each worker process contains several executors; an executor is the component that actually runs tasks and holds a processing thread plus a send thread. Every executor has its own receive queue and send queue.
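
This worker/executor/task hierarchy maps directly onto the topology-building API. The following is a minimal sketch, assuming hypothetical MySpout and MyBolt user components; the TopologyBuilder and Config calls are the standard storm-core API:

import org.apache.storm.Config;
import org.apache.storm.topology.TopologyBuilder;

public class ParallelismSketch {
    public static void main(String[] args) {
        TopologyBuilder builder = new TopologyBuilder();
        // 2 executors (threads) for the spout; by default one task per executor.
        builder.setSpout("spout", new MySpout(), 2);   // MySpout is a hypothetical user spout
        // 4 executors for the bolt, running 8 tasks in total (2 tasks per executor).
        builder.setBolt("bolt", new MyBolt(), 4)       // MyBolt is a hypothetical user bolt
               .setNumTasks(8)
               .shuffleGrouping("spout");

        Config conf = new Config();
        // The executors above are spread across 2 worker processes.
        conf.setNumWorkers(2);
    }
}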

1. Each worker process has a single receive thread that listens on the worker's receive port. The receive thread routes every message arriving from the network, by task id, to the receive queue of the corresponding executor (one or more). An executor's receive queue holds messages sent by other workers or by other executors inside the same worker.

2. The executor's processing thread takes data off the receive queue and calls the execute method, which emits tuples onto the executor's send queue (see the bolt sketch after this list).

3. The executor's send thread takes messages from the send queue and, depending on the destination, pushes them either to the worker's transfer queue or to another executor's receive queue.

4. Finally, the worker's send thread reads messages from the transfer queue and sends the tuples out over the network.
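
To tie these steps to user code: the execute method in step 2 is the bolt method sketched below, and each collector.emit() inside it places a tuple on the executor's send queue from step 3. This is an ordinary user bolt written for illustration, not code from the Storm internals:

import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;

public class UppercaseBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // Step 2: the executor's processing thread hands us a tuple taken from the receive queue.
        String word = input.getStringByField("word");
        // Step 3: emit() publishes the new tuple to the executor's send queue,
        // anchored to the input so the acker can track it.
        collector.emit(input, new Values(word.toUpperCase()));
        collector.ack(input);
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("upper-word"));
    }
}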

1.1 Details

To manage inbound and outbound messages, each worker process runs a dedicated receive thread that listens on the TCP port configured through supervisor.slots.ports.

Each executor has its own sendQueue and receiveQueue.

        Worker接收線程将收到的消息通過task編号傳遞給對應的executor(一個或多個)的receiveQueue;

Each executor has a separate thread that runs the spout/bolt business logic. Intermediate results are buffered in the sendQueue; once the number of tuples in the sendQueue reaches a threshold, the executor's send thread drains them in a batch and publishes them to the worker's TransferQueue.

Each worker process controls one or more executor threads; the number is configured in user code and corresponds directly to the parallelism set when building the topology.

A worker process runs a dedicated receive thread that moves messages arriving from outside into the receiveQueue of the target executor thread.

The size of the TransferQueue is set by the topology.transfer.buffer.size parameter; each element of the TransferQueue actually represents a batch of tuples.

The size of an executor's sendQueue is user-configurable (topology.executor.send.buffer.size).

The size of an executor's receiveQueue is user-configurable as well (topology.executor.receive.buffer.size), as in the sketch below.
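
A hedged sketch of setting these queue sizes when building a topology; the constants are the usual storm-core Config keys for this version line, and the values are illustrative powers of two (the Disruptor-backed queues expect power-of-two sizes):

import org.apache.storm.Config;

public class QueueSizingSketch {
    public static void main(String[] args) {
        Config conf = new Config();
        // Worker transfer queue: each slot holds a batch of tuples bound for remote workers.
        conf.put(Config.TOPOLOGY_TRANSFER_BUFFER_SIZE, 1024);
        // Per-executor send queue (the batchTransferQueue discussed below).
        conf.put(Config.TOPOLOGY_EXECUTOR_SEND_BUFFER_SIZE, 16384);
        // Per-executor receive queue.
        conf.put(Config.TOPOLOGY_EXECUTOR_RECEIVE_BUFFER_SIZE, 16384);
    }
}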

1.2 Analysis of Inter-Worker Communication

[Figure: inter-worker communication path]

1. The worker receive thread accepts data from the network, uses the taskId carried in the tuple to find the matching executor, looks up that executor's incoming queue, and publishes the data to its receiveQueue.

2. The business-logic thread consumes data from the receiveQueue and calls the bolt's execute(...) method, passing the tuple to the user-defined code.

3. When the business logic finishes, the intermediate results are written to the sendQueue; once the tuples in the sendQueue reach a threshold, the executor's send thread drains them in a batch and publishes them to the worker's TransferQueue.

4. The worker's send thread consumes the TransferQueue, works out each tuple's destination, and transmits the data over the network to the target worker identified by node+port.

5. The receiving worker then carries out step 1 above.

2. Source Code Analysis of Storm's Communication Mechanism

2.1 Spout/Bolt Emits Tuple Data

SpoutOutputCollector's emit method delegates to sendSpoutMsg. Looking at sendSpoutMsg closely: it first calls taskData.getOutgoingTasks(stream, values) with the stream and values given to emit, which returns the target task ids (determined by the grouping between the upstream spout and the downstream bolts). It then loops over those task ids, wrapping the values into a TupleImpl for each target. Finally the collector calls the executor's ExecutorTransfer.transfer() method, which attaches the destination taskId, wraps the tuple into an AddressedTuple, and sends it toward the target task.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.executor.spout;

public class SpoutOutputCollectorImpl implements ISpoutOutputCollector {

    private final SpoutExecutor executor;
    private final Task taskData;
    private final int taskId;
    private final MutableLong emittedCount;
    private final boolean hasAckers;
    private final Random random;
    private final Boolean isEventLoggers;
    private final Boolean isDebug;
    private final RotatingMap<Long, TupleInfo> pending;

    @SuppressWarnings("unused")
    public SpoutOutputCollectorImpl(ISpout spout, SpoutExecutor executor, Task taskData, int taskId,
                                    MutableLong emittedCount, boolean hasAckers, Random random,
                                    Boolean isEventLoggers, Boolean isDebug, RotatingMap<Long, TupleInfo> pending) {
        this.executor = executor;
        this.taskData = taskData;
        this.taskId = taskId;
        this.emittedCount = emittedCount;
        this.hasAckers = hasAckers;
        this.random = random;
        this.isEventLoggers = isEventLoggers;
        this.isDebug = isDebug;
        this.pending = pending;
    }

    @Override
    public List<Integer> emit(String streamId, List<Object> tuple, Object messageId) {
        return sendSpoutMsg(streamId, tuple, messageId, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, List<Object> tuple, Object messageId) {
        sendSpoutMsg(streamId, tuple, messageId, taskId);
    }

    @Override
    public long getPendingCount() {
        return pending.size();
    }

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

    /**
     * 1. The spout first calls sendSpoutMsg() to send a tuple to the downstream bolts
     * @param stream
     * @param values
     * @param messageId
     * @param outTaskId
     * @return
     */
    private List<Integer> sendSpoutMsg(String stream, List<Object> values, Object messageId, Integer outTaskId) {
        emittedCount.increment();

        List<Integer> outTasks;
        if (outTaskId != null) {
            outTasks = taskData.getOutgoingTasks(outTaskId, stream, values);
        } else {
            outTasks = taskData.getOutgoingTasks(stream, values);
        }

        List<Long> ackSeq = new ArrayList<>();
        boolean needAck = (messageId != null) && hasAckers;

        long rootId = MessageId.generateId(random);
        // 2. Based on the grouping between the upstream spout and the downstream bolts, send the tuple to the corresponding downstream tasks, wrapping the values into a TupleImpl
        for (Integer t : outTasks) {
            MessageId msgId;
            if (needAck) {
                long as = MessageId.generateId(random);
                msgId = MessageId.makeRootId(rootId, as);
                ackSeq.add(as);
            } else {
                msgId = MessageId.makeUnanchored();
            }

            TupleImpl tuple = new TupleImpl(executor.getWorkerTopologyContext(), values, this.taskId, stream, msgId);
            // 3. The collector calls the executor's ExecutorTransfer.transfer() method, which attaches the target taskId and wraps the tuple into an AddressedTuple
            executor.getExecutorTransfer().transfer(t, tuple);
        }
        if (isEventLoggers) {
            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), messageId, random);
        }

        boolean sample = false;
        try {
            sample = executor.getSampler().call();
        } catch (Exception ignored) {
        }
        if (needAck) {
            TupleInfo info = new TupleInfo();
            info.setTaskId(this.taskId);
            info.setStream(stream);
            info.setMessageId(messageId);
            if (isDebug) {
                info.setValues(values);
            }
            if (sample) {
                info.setTimestamp(System.currentTimeMillis());
            }

            pending.put(rootId, info);
            List<Object> ackInitTuple = new Values(rootId, Utils.bitXorVals(ackSeq), this.taskId);
            executor.sendUnanchored(taskData, Acker.ACKER_INIT_STREAM_ID, ackInitTuple, executor.getExecutorTransfer());
        } else if (messageId != null) {
            TupleInfo info = new TupleInfo();
            info.setStream(stream);
            info.setValues(values);
            info.setMessageId(messageId);
            info.setTimestamp(0);
            Long timeDelta = sample ? 0L : null;
            info.setId("0:");
            executor.ackSpoutMsg(executor, taskData, timeDelta, info);
        }

        return outTasks;
    }
}
           

BoltOutputCollector's emit method delegates to boltEmit. boltEmit likewise calls taskData.getOutgoingTasks(stream, values) with the stream and values given to emit to obtain the target task ids, loops over them, wraps the values into a TupleImpl, and then calls executor.getExecutorTransfer().transfer(t, tuple) to send the data to the target task.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.executor.bolt;

public class BoltOutputCollectorImpl implements IOutputCollector {

    private static final Logger LOG = LoggerFactory.getLogger(BoltOutputCollectorImpl.class);

    private final BoltExecutor executor;
    private final Task taskData;
    private final int taskId;
    private final Random random;
    private final boolean isEventLoggers;
    private final boolean isDebug;

    public BoltOutputCollectorImpl(BoltExecutor executor, Task taskData, int taskId, Random random,
                                   boolean isEventLoggers, boolean isDebug) {
        this.executor = executor;
        this.taskData = taskData;
        this.taskId = taskId;
        this.random = random;
        this.isEventLoggers = isEventLoggers;
        this.isDebug = isDebug;
    }

    public List<Integer> emit(String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        return boltEmit(streamId, anchors, tuple, null);
    }

    @Override
    public void emitDirect(int taskId, String streamId, Collection<Tuple> anchors, List<Object> tuple) {
        boltEmit(streamId, anchors, tuple, taskId);
    }

    /**
     * 1. The bolt first calls boltEmit() to send a tuple to the downstream bolts
     * @param streamId
     * @param anchors
     * @param values
     * @param targetTaskId
     * @return
     */
    private List<Integer> boltEmit(String streamId, Collection<Tuple> anchors, List<Object> values, Integer targetTaskId) {
        List<Integer> outTasks;
        if (targetTaskId != null) {
            outTasks = taskData.getOutgoingTasks(targetTaskId, streamId, values);
        } else {
            outTasks = taskData.getOutgoingTasks(streamId, values);
        }

        for (Integer t : outTasks) {
            Map<Long, Long> anchorsToIds = new HashMap<>();
            if (anchors != null) {
                for (Tuple a : anchors) {
                    Set<Long> rootIds = a.getMessageId().getAnchorsToIds().keySet();
                    if (rootIds.size() > 0) {
                        long edgeId = MessageId.generateId(random);
                        ((TupleImpl) a).updateAckVal(edgeId);
                        for (Long root_id : rootIds) {
                            putXor(anchorsToIds, root_id, edgeId);
                        }
                    }
                }
            }
            MessageId msgId = MessageId.makeId(anchorsToIds);
            TupleImpl tupleExt = new TupleImpl(executor.getWorkerTopologyContext(), values, taskId, streamId, msgId);
            executor.getExecutorTransfer().transfer(t, tupleExt);
        }
        if (isEventLoggers) {
            executor.sendToEventLogger(executor, taskData, values, executor.getComponentId(), null, random);
        }
        return outTasks;
    }

    @Override
    public void ack(Tuple input) {
        long ackValue = ((TupleImpl) input).getAckVal();
        Map<Long, Long> anchorsToIds = input.getMessageId().getAnchorsToIds();
        for (Map.Entry<Long, Long> entry : anchorsToIds.entrySet()) {
            executor.sendUnanchored(taskData, Acker.ACKER_ACK_STREAM_ID,
                    new Values(entry.getKey(), Utils.bitXor(entry.getValue(), ackValue)),
                    executor.getExecutorTransfer());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT ack TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }
        BoltAckInfo boltAckInfo = new BoltAckInfo(input, taskId, delta);
        boltAckInfo.applyOn(taskData.getUserContext());
        if (delta >= 0) {
            ((BoltExecutorStats) executor.getStats()).boltAckedTuple(
                    input.getSourceComponent(), input.getSourceStreamId(), delta);
        }
    }

    @Override
    public void fail(Tuple input) {
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            executor.sendUnanchored(taskData, Acker.ACKER_FAIL_STREAM_ID,
                    new Values(root), executor.getExecutorTransfer());
        }
        long delta = tupleTimeDelta((TupleImpl) input);
        if (isDebug) {
            LOG.info("BOLT fail TASK: {} TIME: {} TUPLE: {}", taskId, delta, input);
        }
        BoltFailInfo boltFailInfo = new BoltFailInfo(input, taskId, delta);
        boltFailInfo.applyOn(taskData.getUserContext());
        if (delta >= 0) {
            ((BoltExecutorStats) executor.getStats()).boltFailedTuple(
                    input.getSourceComponent(), input.getSourceStreamId(), delta);
        }
    }

    @Override
    public void resetTimeout(Tuple input) {
        Set<Long> roots = input.getMessageId().getAnchors();
        for (Long root : roots) {
            executor.sendUnanchored(taskData, Acker.ACKER_RESET_TIMEOUT_STREAM_ID,
                    new Values(root), executor.getExecutorTransfer());
        }
    }

    @Override
    public void reportError(Throwable error) {
        executor.getErrorReportingMetrics().incrReportedErrorCount();
        executor.getReportError().report(error);
    }

    private long tupleTimeDelta(TupleImpl tuple) {
        Long ms = tuple.getProcessSampleStartTime();
        if (ms != null) {
            return Time.deltaMs(ms);
        }
        return -1;
    }

    private void putXor(Map<Long, Long> pending, Long key, Long id) {
        Long curr = pending.get(key);
        if (curr == null) {
            curr = 0l;
        }
        pending.put(key, Utils.bitXor(curr, id));
    }
}
           

2.2 ExecutorTransfer.transfer() Sends the Tuple

ExecutorTransfer is a member of the Executor class and is responsible for sending the data produced by the executor's tasks. Here ExecutorTransfer attaches the destination task id to the tuple, wrapping it into an AddressedTuple, and publishes the AddressedTuple to the batchTransferQueue. This batchTransferQueue is exactly the executor's send queue (sendQueue).

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.executor;

public class ExecutorTransfer implements EventHandler, Callable {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);

    private final WorkerState workerData;
    private final DisruptorQueue batchTransferQueue;
    private final Map<String, Object> topoConf;
    private final KryoTupleSerializer serializer;
    private final MutableObject cachedEmit;
    private final boolean isDebug;

    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.batchTransferQueue = batchTransferQueue;
        this.topoConf = topoConf;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.cachedEmit = new MutableObject(new ArrayList<>());
        this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    }

    // 4. ExecutorTransfer attaches the destination task id to the tuple, wrapping it into an AddressedTuple, and publishes the result to the batchTransferQueue.
    // The batchTransferQueue is the executor's send queue.
    public void transfer(int task, Tuple tuple) {
        AddressedTuple val = new AddressedTuple(task, tuple);
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", val);
        }
        batchTransferQueue.publish(val);
    }


    @VisibleForTesting
    public DisruptorQueue getBatchTransferQueue() {
        return this.batchTransferQueue;
    }

    /**
     * 6. ExecutorTransfer's call() method is invoked; the batchTransferQueue is consumed in batches
     * @return
     * @throws Exception
     */
    @Override
    public Object call() throws Exception {
        batchTransferQueue.consumeBatchWhenAvailable(this);
        return 0L;
    }

    public String getName() {
        return batchTransferQueue.getName();
    }

    /**
     * 7. Event handler: keeps consuming the AddressedTuple objects from the batchTransferQueue in batches
     * @param event
     * @param sequence
     * @param endOfBatch
     * @throws Exception
     */
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
        cachedEvents.add(event);
        if (endOfBatch) {
            // 8. Call WorkerState.transfer(), which serializes the AddressedTuples
            workerData.transfer(serializer, cachedEvents);
            cachedEmit.setObject(new ArrayList<>());
        }
    }
}
           

2.3 Once the Executor's Send Queue Reaches a Threshold, WorkerState.transfer() Is Called

An Executor is the thread in Storm that actually runs tasks; the worker process starts all of its executors and calls each executor's execute() method. Inside execute(), an asynchronous loop keeps invoking executorTransfer through its Callable interface; once the sendQueue buffer reaches a threshold, ExecutorTransfer's call() method is invoked.

As described in the previous section, ExecutorTransfer is a member of the Executor class used to send the data produced by the executor's tasks. When the executor's sendQueue buffer reaches a threshold, ExecutorTransfer's call() method runs: the batchTransferQueue (i.e. the executor's sendQueue) is consumed in batches, the AddressedTuple objects are collected, and finally WorkerState.transfer() is called to serialize them and publish the data to the worker's transfer queue.

public abstract class Executor implements Callable, EventHandler<Object> {

    private static final Logger LOG = LoggerFactory.getLogger(Executor.class);
........
    /**
     * separated from mkExecutor in order to replace executor transfer in executor data for testing
     */
    public ExecutorShutdown execute() throws Exception {
        LOG.info("Loading executor tasks " + componentId + ":" + executorId);

        registerBackpressure();
        // 5. After the executor thread runs execute(), an async loop keeps invoking executorTransfer's Callable interface.
        // Once the sendQueue buffer reaches a threshold, ExecutorTransfer's call() method is invoked
        Utils.SmartThread systemThreads =
                Utils.asyncLoop(executorTransfer, executorTransfer.getName(), reportErrorDie);

        String handlerName = componentId + "-executor" + executorId;
        Utils.SmartThread handlers =
                Utils.asyncLoop(this, false, reportErrorDie, Thread.NORM_PRIORITY, true, true, handlerName);
        setupTicks(StatsUtil.SPOUT.equals(type));

        LOG.info("Finished loading executor " + componentId + ":" + executorId);
        return new ExecutorShutdown(this, Lists.newArrayList(systemThreads, handlers), idToTask, receiveQueue, sendQueue);
    .............}}
           

The ExecutorTransfer source (comments 6-8 in it mark the steps discussed in this section):

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.executor;

public class ExecutorTransfer implements EventHandler, Callable {
    private static final Logger LOG = LoggerFactory.getLogger(ExecutorTransfer.class);

    private final WorkerState workerData;
    private final DisruptorQueue batchTransferQueue;
    private final Map<String, Object> topoConf;
    private final KryoTupleSerializer serializer;
    private final MutableObject cachedEmit;
    private final boolean isDebug;

    public ExecutorTransfer(WorkerState workerData, DisruptorQueue batchTransferQueue, Map<String, Object> topoConf) {
        this.workerData = workerData;
        this.batchTransferQueue = batchTransferQueue;
        this.topoConf = topoConf;
        this.serializer = new KryoTupleSerializer(topoConf, workerData.getWorkerTopologyContext());
        this.cachedEmit = new MutableObject(new ArrayList<>());
        this.isDebug = ObjectReader.getBoolean(topoConf.get(Config.TOPOLOGY_DEBUG), false);
    }

    // 4. ExecutorTransfer attaches the destination task id to the tuple, wrapping it into an AddressedTuple, and publishes the result to the batchTransferQueue.
    // The batchTransferQueue is the executor's send queue.
    public void transfer(int task, Tuple tuple) {
        AddressedTuple val = new AddressedTuple(task, tuple);
        if (isDebug) {
            LOG.info("TRANSFERRING tuple {}", val);
        }
        batchTransferQueue.publish(val);
    }


    @VisibleForTesting
    public DisruptorQueue getBatchTransferQueue() {
        return this.batchTransferQueue;
    }

    /**
     * 6. ExecutorTransfer's call() method is invoked; the batchTransferQueue is consumed in batches
     * @return
     * @throws Exception
     */
    @Override
    public Object call() throws Exception {
        batchTransferQueue.consumeBatchWhenAvailable(this);
        return 0L;
    }

    public String getName() {
        return batchTransferQueue.getName();
    }

    /**
     * 7. Event handler: keeps consuming the AddressedTuple objects from the batchTransferQueue in batches
     * @param event
     * @param sequence
     * @param endOfBatch
     * @throws Exception
     */
    @Override
    public void onEvent(Object event, long sequence, boolean endOfBatch) throws Exception {
        ArrayList cachedEvents = (ArrayList) cachedEmit.getObject();
        cachedEvents.add(event);
        if (endOfBatch) {
            // 8. Call WorkerState.transfer(), which serializes the AddressedTuples
            workerData.transfer(serializer, cachedEvents);
            cachedEmit.setObject(new ArrayList<>());
        }
    }
}
           

2.4 WorkerState's transfer() Method

WorkerState is a member of the Worker class; it holds most of the worker's runtime state along with the operations used for inter-worker communication. Here we look at WorkerState's transfer method. transfer() first defines two local variables: a List&lt;AddressedTuple&gt; local and a Map&lt;Integer, List&lt;TaskMessage&gt;&gt; remoteMap, which collect the AddressedTuples destined for tasks in the local worker and, keyed by task id, those destined for remote worker processes.

The method then walks through the batch, grouping tuples headed for the same task so they can be sent together. Tuples whose destination task lives in the local worker are handed to WorkerState's transferLocal method, which requires no serialization; tuples bound for remote workers are serialized, packed into a Map&lt;Integer, List&lt;TaskMessage&gt;&gt;, and published to the worker's transfer queue.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.worker;



public class WorkerState {

........
  
    // 12. Register callbacks: registerCallbacks() registers the deserializing connection callback
    public void registerCallbacks() {
        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
            getWorkerTopologyContext(),
            this::transferLocal));
    }

    // 14. Call the transferLocal() method registered above to deliver tuples locally inside the worker to the corresponding executor threads
    public void transferLocal(List<AddressedTuple> tupleBatch) {
        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
        for (AddressedTuple tuple : tupleBatch) {
            Integer executor = taskToShortExecutor.get(tuple.dest);
            if (null == executor) {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
                continue;
            }
            List<AddressedTuple> current = grouped.get(executor);
            if (null == current) {
                current = new ArrayList<>();
                grouped.put(executor, current);
            }
            current.add(tuple);
        }

        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
            if (null != queue) {
                queue.publish(entry.getValue());
            } else {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
            }
        }
    }

    // 9. Serialize the AddressedTuples and batch together the ones destined for the same task.
    // Tuples whose destination task is in the local worker are delivered via WorkerState.transferLocal(); local delivery needs no serialization.
    // Messages for remote workers are serialized, packed into a Map<Integer, List<TaskMessage>> and published to the worker's transfer queue
    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
        if (trySerializeLocal) {
            assertCanSerialize(serializer, tupleBatch);
        }
        List<AddressedTuple> local = new ArrayList<>();
        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
        LOG.info("the time of start serializing : {}", System.currentTimeMillis());
        for (AddressedTuple addressedTuple : tupleBatch) {
            int destTask = addressedTuple.getDest();
            if (taskIds.contains(destTask)) {
                // Local task
                local.add(addressedTuple);
            } else {
                // Using java objects directly to avoid performance issues in java code
                if (! remoteMap.containsKey(destTask)) {
                    remoteMap.put(destTask, new ArrayList<>());
                }
                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
            }
        }
        LOG.info("the time of end serializing : {}", System.currentTimeMillis());

        if (!local.isEmpty()) {
            transferLocal(local);
        }
        if (!remoteMap.isEmpty()) {
            transferQueue.publish(remoteMap);
        }
    }

    .........
}
           

2.5 The Worker Process Sends TaskMessages to the Target Tasks

The worker process is the smallest unit of resource allocation. Here we focus on how worker processes communicate with each other. The worker's transfer thread runs an asynchronous loop that keeps consuming batches of messages from the worker's transfer queue and ships them to the corresponding remote workers, calling WorkerState's sendTuplesToRemoteWorker method with the (HashMap&lt;Integer, ArrayList&lt;TaskMessage&gt;&gt;) packets.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.worker;

public class Worker implements Shutdownable, DaemonCommon {
    .........
    public void start() throws Exception {
        LOG.info("Launching worker for {} on {}:{} with id {} and conf {}", topologyId, assignmentId, port, workerId,
            conf);
        // because in local mode, its not a separate
        // process. supervisor will register it in this case
        // if ConfigUtils.isLocalMode(conf) returns false then it is in distributed mode.
        if (!ConfigUtils.isLocalMode(conf)) {
            // Distributed mode
            SysOutOverSLF4J.sendSystemOutAndErrToSLF4J();
            String pid = Utils.processPid();
            FileUtils.touch(new File(ConfigUtils.workerPidPath(conf, workerId, pid)));
            FileUtils.writeStringToFile(new File(ConfigUtils.workerArtifactsPidPath(conf, topologyId, port)), pid,
                Charset.forName("UTF-8"));
        }
        final Map<String, Object> topologyConf =
            ConfigUtils.overrideLoginConfigWithSystemProperty(ConfigUtils.readSupervisorStormConf(conf, topologyId));
        List<ACL> acls = Utils.getWorkerACL(topologyConf);
        IStateStorage stateStorage =
            ClusterUtils.mkStateStorage(conf, topologyConf, acls, new ClusterStateContext(DaemonType.WORKER));
        IStormClusterState stormClusterState =
            ClusterUtils.mkStormClusterState(stateStorage, acls, new ClusterStateContext());
        Credentials initialCredentials = stormClusterState.credentials(topologyId, null);
        Map<String, String> initCreds = new HashMap<>();
        if (initialCredentials != null) {
            initCreds.putAll(initialCredentials.get_creds());
        }
        autoCreds = AuthUtils.GetAutoCredentials(topologyConf);
        subject = AuthUtils.populateSubject(null, autoCreds, initCreds);
        backpressureZnodeTimeoutMs = ObjectReader.getInt(topologyConf.get(Config.BACKPRESSURE_ZNODE_TIMEOUT_SECS)) * 1000;

        Subject.doAs(subject, new PrivilegedExceptionAction<Object>() {
            @Override public Object run() throws Exception {
                workerState =
                    new WorkerState(conf, context, topologyId, assignmentId, port, workerId, topologyConf, stateStorage,
                        stormClusterState);

                // Heartbeat here so that worker process dies if this fails
                // it's important that worker heartbeat to supervisor ASAP so that supervisor knows
                // that worker is running and moves on
                doHeartBeat();

                executorsAtom = new AtomicReference<>(null);

                // launch heartbeat threads immediately so that slow-loading tasks don't cause the worker to timeout
                // to the supervisor
                workerState.heartbeatTimer
                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS), () -> {
                        try {
                            doHeartBeat();
                        } catch (IOException e) {
                            throw new RuntimeException(e);
                        }
                    });

                workerState.executorHeartbeatTimer
                    .scheduleRecurring(0, (Integer) conf.get(Config.WORKER_HEARTBEAT_FREQUENCY_SECS),
                        Worker.this::doExecutorHeartbeats);

                // 11. The worker registers callbacks for receiving messages sent by remote workers
                workerState.registerCallbacks();

                workerState.refreshConnections(null);

                workerState.activateWorkerWhenAllConnectionsReady();

                workerState.refreshStormActive(null);

                workerState.runWorkerStartHooks();

                List<IRunningExecutor> newExecutors = new ArrayList<IRunningExecutor>();
                for (List<Long> e : workerState.getExecutors()) {
                    if (ConfigUtils.isLocalMode(topologyConf)) {
                        newExecutors.add(
                            LocalExecutor.mkExecutor(workerState, e, initCreds)
                                .execute());
                    } else {
                        newExecutors.add(
                            Executor.mkExecutor(workerState, e, initCreds)
                                .execute());
                    }
                }
                executorsAtom.set(newExecutors);

                // 10. The worker's transfer thread loops asynchronously, consuming batches of messages from the transfer queue and sending them to the corresponding remote workers
                EventHandler<Object> tupleHandler = (packets, seqId, batchEnd) -> workerState
                    .sendTuplesToRemoteWorker((HashMap<Integer, ArrayList<TaskMessage>>) packets, seqId, batchEnd);

                // This thread will publish the messages destined for remote tasks to remote connections
                transferThread = Utils.asyncLoop(() -> {
                    workerState.transferQueue.consumeBatchWhenAvailable(tupleHandler);
                    return 0L;
                });

                DisruptorBackpressureCallback disruptorBackpressureHandler =
                    mkDisruptorBackpressureHandler(workerState);
                workerState.transferQueue.registerBackpressureCallback(disruptorBackpressureHandler);
                workerState.transferQueue
                    .setEnableBackpressure((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE));
                workerState.transferQueue
                    .setHighWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_HIGH_WATERMARK)));
                workerState.transferQueue
                    .setLowWaterMark(ObjectReader.getDouble(topologyConf.get(Config.BACKPRESSURE_DISRUPTOR_LOW_WATERMARK)));

                WorkerBackpressureCallback backpressureCallback = mkBackpressureHandler(topologyConf);
                backpressureThread = new WorkerBackpressureThread(workerState.backpressureTrigger, workerState, backpressureCallback);
                if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
                    backpressureThread.start();
                    stormClusterState.topologyBackpressure(topologyId, backpressureZnodeTimeoutMs, workerState::refreshThrottle);
                    
                    int pollingSecs = ObjectReader.getInt(topologyConf.get(Config.TASK_BACKPRESSURE_POLL_SECS));
                    workerState.refreshBackpressureTimer.scheduleRecurring(0, pollingSecs, workerState::refreshThrottle);
                }

                credentialsAtom = new AtomicReference<Credentials>(initialCredentials);

                establishLogSettingCallback();

                workerState.stormClusterState.credentials(topologyId, Worker.this::checkCredentialsChanged);

                workerState.refreshCredentialsTimer.scheduleRecurring(0,
                    (Integer) conf.get(Config.TASK_CREDENTIALS_POLL_SECS), new Runnable() {
                        @Override public void run() {
                            checkCredentialsChanged();
                            if ((Boolean) topologyConf.get(Config.TOPOLOGY_BACKPRESSURE_ENABLE)) {
                               checkThrottleChanged();
                            }
                        }
                    });

                workerState.checkForUpdatedBlobsTimer.scheduleRecurring(0,
                        (Integer) conf.getOrDefault(Config.WORKER_BLOB_UPDATE_POLL_INTERVAL_SECS, 10), new Runnable() {
                            @Override public void run() {
                                try {
                                    LOG.debug("Checking if blobs have updated");
                                    updateBlobUpdates();
                                } catch (IOException e) {
                                    // IOException from reading the version files to be ignored
                                    LOG.error(e.getStackTrace().toString());
                                }
                            }
                        });

                // The jitter allows the clients to get the data at different times, and avoids thundering herd
                if (!(Boolean) topologyConf.get(Config.TOPOLOGY_DISABLE_LOADAWARE_MESSAGING)) {
                    workerState.refreshLoadTimer.scheduleRecurringWithJitter(0, 1, 500, Worker.this::doRefreshLoad);
                }

                workerState.refreshConnectionsTimer.scheduleRecurring(0,
                    (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS), workerState::refreshConnections);

                workerState.resetLogLevelsTimer.scheduleRecurring(0,
                    (Integer) conf.get(Config.WORKER_LOG_LEVEL_RESET_POLL_SECS), logConfigManager::resetLogLevels);

                workerState.refreshActiveTimer.scheduleRecurring(0, (Integer) conf.get(Config.TASK_REFRESH_POLL_SECS),
                    workerState::refreshStormActive);

                LOG.info("Worker has topology config {}", Utils.redactValue(topologyConf, Config.STORM_ZOOKEEPER_TOPOLOGY_AUTH_PAYLOAD));
                LOG.info("Worker {} for storm {} on {}:{}  has finished loading", workerId, topologyId, assignmentId, port);
                return this;
            };
        });
        .......................
    }
           

WorkerState's sendTuplesToRemoteWorker method sends the packets to the corresponding remote workers. The data is accumulated in a TransferDrainer; when the end of a batch is reached, TransferDrainer's send method ships the buffered data off to the remote workers.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.worker;



public class WorkerState {

........
   /**
     * WorkerState's sendTuplesToRemoteWorker sends the packets to the corresponding remote workers, accumulating the data in the TransferDrainer.
     * When the end of a batch is reached, TransferDrainer.send() ships the buffered data to the remote workers.
     * @param packets
     * @param seqId
     * @param batchEnd
     */
     public void sendTuplesToRemoteWorker(HashMap<Integer, ArrayList<TaskMessage>> packets, long seqId, boolean batchEnd) {
        drainer.add(packets);
        if (batchEnd) {
            ReentrantReadWriteLock.ReadLock readLock = endpointSocketLock.readLock();
            try {
                readLock.lock();
                drainer.send(cachedTaskToNodePort.get(), cachedNodeToPortSocket.get());
            } finally {
                readLock.unlock();
            }
            drainer.clear();
        }
    }

    .........
}
           

TransferDrainer's send method transmits the data to the corresponding remote worker processes.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.utils;

public class TransferDrainer {

  private Map<Integer, ArrayList<ArrayList<TaskMessage>>> bundles = new HashMap();
  private static final Logger LOG = LoggerFactory.getLogger(TransferDrainer.class);
  
  public void add(HashMap<Integer, ArrayList<TaskMessage>> taskTupleSetMap) {
    for (Map.Entry<Integer, ArrayList<TaskMessage>> entry : taskTupleSetMap.entrySet()) {
      addListRefToMap(this.bundles, entry.getKey(), entry.getValue());
    }
  }
  
  public void send(Map<Integer, NodeInfo> taskToNode, Map<NodeInfo, IConnection> connections) {
    HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMapByDestination = groupBundleByDestination(taskToNode);

    for (Map.Entry<NodeInfo, ArrayList<ArrayList<TaskMessage>>> entry : bundleMapByDestination.entrySet()) {
      NodeInfo hostPort = entry.getKey();
      IConnection connection = connections.get(hostPort);
      if (null != connection) {
        ArrayList<ArrayList<TaskMessage>> bundle = entry.getValue();
        Iterator<TaskMessage> iter = getBundleIterator(bundle);
        if (null != iter && iter.hasNext()) {
          connection.send(iter);
        }
      } else {
        LOG.warn("Connection is not available for hostPort {}", hostPort);
      }
    }
  }

  private HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> groupBundleByDestination(Map<Integer, NodeInfo> taskToNode) {
    HashMap<NodeInfo, ArrayList<ArrayList<TaskMessage>>> bundleMap = Maps.newHashMap();
    for (Integer task : this.bundles.keySet()) {
      NodeInfo hostPort = taskToNode.get(task);
      if (hostPort != null) {
        for (ArrayList<TaskMessage> chunk : this.bundles.get(task)) {
          addListRefToMap(bundleMap, hostPort, chunk);
        }
      } else {
        LOG.warn("No remote destination available for task {}", task);
      }
    }
    return bundleMap;
  }

  private <T> void addListRefToMap(Map<T, ArrayList<ArrayList<TaskMessage>>> bundleMap,
                                   T key, ArrayList<TaskMessage> tuples) {
    ArrayList<ArrayList<TaskMessage>> bundle = bundleMap.get(key);

    if (null == bundle) {
      bundle = new ArrayList<ArrayList<TaskMessage>>();
      bundleMap.put(key, bundle);
    }

    if (null != tuples && tuples.size() > 0) {
      bundle.add(tuples);
    }
  }

  private Iterator<TaskMessage> getBundleIterator(final ArrayList<ArrayList<TaskMessage>> bundle) {
    
    if (null == bundle) {
      return null;
    }
    
    return new Iterator<TaskMessage> () {
      
      private int offset = 0;
      private int size = 0;
      {
        for (ArrayList<TaskMessage> list : bundle) {
            size += list.size();
        }
      }
      
      private int bundleOffset = 0;
      private Iterator<TaskMessage> iter = bundle.get(bundleOffset).iterator();
      
      @Override
      public boolean hasNext() {
          return offset < size;
      }

      @Override
      public TaskMessage next() {
        TaskMessage msg;
        if (iter.hasNext()) {
          msg = iter.next(); 
        } else {
          bundleOffset++;
          iter = bundle.get(bundleOffset).iterator();
          msg = iter.next();
        }
        if (null != msg) {
          offset++;
        }
        return msg;
      }

      @Override
      public void remove() {
        throw new RuntimeException("not supported");
      }
    };
  }
  
  public void clear() {
    bundles.clear();
  }
}
           

2.6 The Worker Process Receives Data Sent by Other Worker Processes

When the worker process initializes, it registers callbacks for receiving messages sent by remote workers.

// 11. The worker registers callbacks for receiving messages sent by remote workers
                workerState.registerCallbacks();
           

In workerState's registerCallbacks method, the worker registers the receive callback and, specifically, the deserializing callback DeserializingConnectionCallback.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.worker;



public class WorkerState {

........
  
    // 12. Register callbacks: registerCallbacks() registers the deserializing connection callback
    public void registerCallbacks() {
        LOG.info("Registering IConnectionCallbacks for {}:{}", assignmentId, port);
        receiver.registerRecv(new DeserializingConnectionCallback(topologyConf,
            getWorkerTopologyContext(),
            this::transferLocal));
    }

    // 14. Call the transferLocal() method registered above to deliver tuples locally inside the worker to the corresponding executor threads
    public void transferLocal(List<AddressedTuple> tupleBatch) {
        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
        for (AddressedTuple tuple : tupleBatch) {
            Integer executor = taskToShortExecutor.get(tuple.dest);
            if (null == executor) {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
                continue;
            }
            List<AddressedTuple> current = grouped.get(executor);
            if (null == current) {
                current = new ArrayList<>();
                grouped.put(executor, current);
            }
            current.add(tuple);
        }

        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
            if (null != queue) {
                queue.publish(entry.getValue());
            } else {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
            }
        }
    }

    // 9. Serialize the AddressedTuples and batch together the ones destined for the same task.
    // Tuples whose destination task is in the local worker are delivered via WorkerState.transferLocal(); local delivery needs no serialization.
    // Messages for remote workers are serialized, packed into a Map<Integer, List<TaskMessage>> and published to the worker's transfer queue
    public void transfer(KryoTupleSerializer serializer, List<AddressedTuple> tupleBatch) {
        if (trySerializeLocal) {
            assertCanSerialize(serializer, tupleBatch);
        }
        List<AddressedTuple> local = new ArrayList<>();
        Map<Integer, List<TaskMessage>> remoteMap = new HashMap<>();
        LOG.info("the time of start serializing : {}", System.currentTimeMillis());
        for (AddressedTuple addressedTuple : tupleBatch) {
            int destTask = addressedTuple.getDest();
            if (taskIds.contains(destTask)) {
                // Local task
                local.add(addressedTuple);
            } else {
                // Using java objects directly to avoid performance issues in java code
                if (! remoteMap.containsKey(destTask)) {
                    remoteMap.put(destTask, new ArrayList<>());
                }
                remoteMap.get(destTask).add(new TaskMessage(destTask, serializer.serialize(addressedTuple.getTuple())));
            }
        }
        LOG.info("the time of end serializing : {}", System.currentTimeMillis());

        if (!local.isEmpty()) {
            transferLocal(local);
        }
        if (!remoteMap.isEmpty()) {
            transferQueue.publish(remoteMap);
        }
    }

    .........
}
           

DeserializingConnectionCallback is the worker's deserialization callback: it deserializes the incoming List&lt;TaskMessage&gt; batch into an ArrayList&lt;AddressedTuple&gt; ret. The worker receive thread reads the serialized TaskMessage data from the wire and deserializes it, ending up with AddressedTuples that carry their destination header. It then calls WorkerState's transferLocal method to deliver the AddressedTuples locally inside the worker process.

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.storm.messaging;

/**
 * A class that is called when a TaskMessage arrives.
 */
public class DeserializingConnectionCallback implements IConnectionCallback, IMetric {

    private static final Logger LOG = LoggerFactory.getLogger(DeserializingConnectionCallback.class);

    private final WorkerState.ILocalTransferCallback cb;
    private final Map conf;
    private final GeneralTopologyContext context;

    private final ThreadLocal<KryoTupleDeserializer> _des =
        new ThreadLocal<KryoTupleDeserializer>() {
            @Override
            protected KryoTupleDeserializer initialValue() {
                return new KryoTupleDeserializer(conf, context);
            }
        };

    // Track serialized size of messages.
    private final boolean sizeMetricsEnabled;
    private final ConcurrentHashMap<String, AtomicLong> byteCounts = new ConcurrentHashMap<>();


    public DeserializingConnectionCallback(final Map conf, final GeneralTopologyContext context, WorkerState.ILocalTransferCallback callback) {
        this.conf = conf;
        this.context = context;
        cb = callback;
        sizeMetricsEnabled = ObjectReader.getBoolean(conf.get(Config.TOPOLOGY_SERIALIZED_MESSAGE_SIZE_METRICS), false);

    }

    // 13. When a message arrives at the worker, the receive thread reads the serialized TaskMessage data off the receive queue, deserializes it, and ends up with AddressedTuples carrying their destination headers.
    // It then invokes the callback's transfer method.
    @Override
    public void recv(List<TaskMessage> batch) {
        KryoTupleDeserializer des = _des.get();
        ArrayList<AddressedTuple> ret = new ArrayList<>(batch.size());
        LOG.info("the time of start deserializing : {}", System.currentTimeMillis());
        for (TaskMessage message: batch) {
            Tuple tuple = des.deserialize(message.message());
            AddressedTuple addrTuple = new AddressedTuple(message.task(), tuple);
            updateMetrics(tuple.getSourceTask(), message);
            ret.add(addrTuple);
        }
        LOG.info("the time of start deserializing : {}", System.currentTimeMillis());
        cb.transfer(ret);
    }

    /**
     * Returns serialized byte count traffic metrics.
     * @return Map of metric counts, or null if disabled
     */
    @Override
    public Object getValueAndReset() {
        if (!sizeMetricsEnabled) {
            return null;
        }
        HashMap<String, Long> outMap = new HashMap<>();
        for (Map.Entry<String, AtomicLong> ent : byteCounts.entrySet()) {
            AtomicLong count = ent.getValue();
            if (count.get() > 0) {
                outMap.put(ent.getKey(), count.getAndSet(0L));
            }
        }
        return outMap;
    }

    /**
     * Update serialized byte counts for each message.
     * @param sourceTaskId source task
     * @param message serialized message
     */
    protected void updateMetrics(int sourceTaskId, TaskMessage message) {
        if (sizeMetricsEnabled) {
            int dest = message.task();
            int len = message.message().length;
            String key = Integer.toString(sourceTaskId) + "-" + Integer.toString(dest);
            byteCounts.computeIfAbsent(key, k -> new AtomicLong(0L)).addAndGet(len);
        }
    }

}
           

In WorkerState's transferLocal() method, the AddressedTuples are published to the receive queues of the corresponding executors.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.storm.daemon.worker;



public class WorkerState {

........
  
    // 14. Call the transferLocal() method registered above to deliver tuples locally inside the worker to the corresponding executor threads
    public void transferLocal(List<AddressedTuple> tupleBatch) {
        Map<Integer, List<AddressedTuple>> grouped = new HashMap<>();
        for (AddressedTuple tuple : tupleBatch) {
            Integer executor = taskToShortExecutor.get(tuple.dest);
            if (null == executor) {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
                continue;
            }
            List<AddressedTuple> current = grouped.get(executor);
            if (null == current) {
                current = new ArrayList<>();
                grouped.put(executor, current);
            }
            current.add(tuple);
        }

        for (Map.Entry<Integer, List<AddressedTuple>> entry : grouped.entrySet()) {
            DisruptorQueue queue = shortExecutorReceiveQueueMap.get(entry.getKey());
            if (null != queue) {
                queue.publish(entry.getValue());
            } else {
                LOG.warn("Received invalid messages for unknown tasks. Dropping... ");
            }
        }
    }
    .........
}
           

2.7 The Executor Consumes Data from Its Local receiveQueue

The executor keeps consuming data from its local receive queue: it iterates over the received ArrayList&lt;AddressedTuple&gt; addressedTuples and calls tupleActionFn for each tuple, dispatching it to the corresponding BoltExecutor or SpoutExecutor, whose tupleActionFn runs the actual processing logic.

public abstract class Executor implements Callable, EventHandler<Object> {
........
// 15. Executor.onEvent() receives the AddressedTuples delivered by the worker and calls tupleActionFn to dispatch each one to the corresponding BoltExecutor or SpoutExecutor
    @SuppressWarnings("unchecked")
    @Override
    public void onEvent(Object event, long seq, boolean endOfBatch) throws Exception {
        ArrayList<AddressedTuple> addressedTuples = (ArrayList<AddressedTuple>) event;
        for (AddressedTuple addressedTuple : addressedTuples) {
            TupleImpl tuple = (TupleImpl) addressedTuple.getTuple();
            int taskId = addressedTuple.getDest();
            if (isDebug) {
                LOG.info("Processing received message FOR {} TUPLE: {}", taskId, tuple);
            }
            LOG.info("the time of receiving message: {}", System.currentTimeMillis());
            if (taskId != AddressedTuple.BROADCAST_DEST) {
                tupleActionFn(taskId, tuple);
            } else {
                for (Integer t : taskIds) {
                    tupleActionFn(t, tuple);
                }
            }
        }
    }
...........
}
           
