天天看點

Apollo如何通知/訂閱主題topicHow to advertise and subscribe a topic

How to advertise and subscribe a topic

導讀

  • 衆所周知,Apollo是基于ROS開發的,是以其底層也是基于消息的機制進行節點通信的。但是它在ROS的基礎上做了一些改動,如下:
    • P2P——由于原生ROS的消息機制是通過主節點(Master)分發資料來實作的,這樣一個強中心化的結構始終存在一個Master意外導緻系統奔潰的隐患,為了解決這個問題,Apollo使用了Fast-RTPS,用P2P的方式抛棄了主節點實作通信。
    • Protobuf——原生支援Google的Protobuf,完美的解決了原來ROS的MD5驗證導緻消息不能後向相容的問題。
    • 共享記憶體——使用共享記憶體傳輸,效率更高。
  • 上面簡單的介紹了一些Apollo和ROS的淵源,如果想詳細了解可以看Apollo-ROS官方詳細文檔。鑒于與ROS的關系,那麼就不得不說如何在Apollo裡面利用topic進行通信了。雖然出自ROS,但是Apollo在ROS的基礎上進行了大量的封裝和改動,是以,較原生的ROS,在advertise/subscribe(通知/訂閱)topic有了較大的差別。
  • 本文我将按照操作步驟、代碼分析、流程圖的步驟簡述一下Apollo的topic使用方法,如果你隻想知道如何使用,那麼看前面的使用方法即可,如果你想刨根問題,那麼請耐心的看完,結合上下文一起觀看,效果更好。還有,代碼的注釋不能放過,有一些上下文聯系的地方,我可能放在代碼注釋裡面來解釋了。

使用方法

訂閱

  1. 訂閱已配置好的Topic
    1. 實作回調函數
      void ZuoTestSubnode::ImgTestCallback(const sensor_msgs::Image &msg){
          AINFO << "ImgTestCallback";
          //-- do sth
      }
                 
    2. 在InitInternal()内将回調函數添加到對應Topic的回調函數隊列,如下:
      bool ZuoTestSubnode::InitInternal(){
          AdapterManager::AddImageShortCallback(&ZuoTestSubnode::ImgTestCallback, this);
      }
                 
  2. 訂閱未配置好的Topic
    如果使用者需要新增Topic,那就需要重新配置,而配置,本質上就是調用subscribe/advertise函數,然後生成空的回調函數隊列和消息釋出句柄。這些是Apollo對ROS做的封裝,主要目的是為了友善使用,以及管理衆多的topic,這部分主要是在adapter_manager.h、message_manager.h等實作的。具體代碼後面再講,先看步驟:
    1. 在adapter_gflags.cc和adapter_gflags.h裡面添加新topic。如下:
      //-- adapter_gflags.h
      
      //-- Zuo added on 2018-04-15 for testPublishSubnode
      DECLARE_string(zuo_test_topic);
                 
      //-- adapter_gflags.cc
      
      //-- Zuo added on 2018-04-15 for testPublishSubnode
      DEFINE_string(zuo_test_topic, "/apollo/zuo/zuo_test", "Zuo added for testPublishMsg");
                 
    2. adapter_manager.h

      裡面增加對

      ZuoTest

      的适配:
      class AdapterManager {
        	public:
        	......
         //-- Zuo added on 2018-04-15 for testPublishSubnode
         //-- 這裡的ZuoTest可以了解為對topic取了個别名。
       	REGISTER_ADAPTER(ZuoTest);
       };
                 
    3. 在adapter_manager.cc的**Init()**函數的有限狀态機内添加一個分支,如下:
      void AdapterManager::Init(const AdapterManagerConfig &configs) {
       	......
           for (const auto &config : configs.config()) {
               switch (config.type()) {
                 ......
                 case AdapterConfig::ZUO_TEST:
               	EnableZuoTest(FLAGS_zuo_test_topic, config);
               	break;
             	  default:
                   AERROR << "Unknown adapter config type!";
                   break;
               }
           }
       }
                 
    4. 在adapter_config.proto裡面增加

      ZUO_TEST

      标簽:(不然上述的有限狀态機分支就無法進入)
      message AdapterConfig {
        enum MessageType {
        	......
          ZUO_TEST = 45;
        }
                 
    5. 在message_adapters.h裡面增加對應消息的别名:
    6. 剩下的就和上述已配置好的Topic的使用方法相同了,我就不贅述了。
    7. 在使用者的adapter.conf檔案内添加一個config标簽,這個conf檔案是用來配置對應的topic。而config中的type,是用來在後文的有限狀态機中,進入我們新添的topic分支。這裡的adapter.conf是使用者用來配置對topic的subscribe和advertise的。
      config {
      	type: ZUO_TEST
      	mode: PUBLISH_ONLY
      	message_history_limit: 5
      }
                 
    8. 所謂的使用者,可以了解為某個子產品,例如Perception子產品裡面就有用來配置這個的。這必定是每個子產品的第一個操作,先subscribe和advertise之後才能操作topic咯。
      Status Perception::Init() {
        AdapterManager::Init(FLAGS_perception_adapter_config_filename);
        ......
        }
                 

通知

  • 到這裡就肯定都已經配置好topic了,是以都一樣了就是調用Publish函數釋出資料到topic。
    //-- 建立msg
      PerceptionObstacles obstacles;
      //-- publish msg
      common::adapter::AdapterManager::PublishPerceptionObstacles(obstacles);
      ```
               

代碼分析

  • 上一篇How_to_add_a_subnode已經講到了如何添加一個subnode,但是還沒有說到如何在這個subnode裡面subscribe/publish(訂閱/釋出)消息,下面将根據上述的操作步驟講解在Apollo裡面為什麼要這麼用topic。
    • 在說Apollo前,還是帶一下ROS的通信機制,也友善我們後續的了解,詳情可以了解ROS Wiki
      Apollo如何通知/訂閱主題topicHow to advertise and subscribe a topic
  • 原生的ROS其實是如上圖一個消息傳遞流程,簡而言之就是,一個節點發,一個節點收,中間是由Master進行轉發,具體操作如下:
//-- Zuo added on 2018-04-13
void chatterCallback(const std_msgs::String::ConstPtr& msg)  
{
    //-- do sth
    printf("msg's data = %s", msg.data);
}
int main(int argc, char **argv)  
{
    ros::init(argc, argv, "listener");
    ros::NodeHandle n;
    
    //-- 向Topic釋出消息
    //-- 這裡是告訴Master,我需要在'TopicfullName'上釋出一個消息。
    //-- @param_0 Topic名字
    //-- @param_1 消息釋出隊列大小
    //-- 傳回的句柄需要儲存,用來調用publish函數
    ros::Publisher Zuo_pub = n.advertise<std_msgs::String>("TopicfullName", 1000);
    std_msgs::String msg;
    msg.data = "Hello World";
    //-- 使用者根據傳回的句柄調用publish函數,向對應的topic發送消息
    Zuo_pub.publish(msg);
    
    //-- 訂閱一個Topic
    //-- 如果`TopicfullName`接收到消息,就會觸發這裡的回調函數chatterCallback()
    ros::Subscriber sub = n.subscribe("TopicfullName", 1000, chatterCallback);
    ros::spin();//ros::spin()進入自循環,循環擷取事件
    return 0;
}
           
注意:上面的兩個函數advertise/subscribe的方式我們可以多留意一下,之後在Apollo的代碼講解裡面也會看到類似的方式。這個在Apollo裡面都是放在

AdapterManager::Init

裡面的

EnableXXX()

來調用了,注意看

adapter_manager.h

的内部實作。
  • 在Apollo裡面,為了降低subnode在通信過程中對Master節點的依賴,将上圖的機制設計成兩個subnode直接類似P2P通信,相當于對原來的網絡做了一個去中心化的改變。但是這裡我們不用管,對于使用者來說,我們的使用方式沒變,還是通過調用advertise/subscribe兩個接口來進行節點通信。
  • 誠然,在Apollo裡面我們可以直接調用advertise/subscribe,但是可以用,不代表應該用。看代碼我們會發現,在Apollo底層(adapter_manager.h)的确是調用了advertise/subscribe,但是在其之上還封裝了幾層,它将對不同Topic的advertise/subscribe使用封裝成了對應的帶有Topic名字标簽的函數,例如:
上述這種本質上就是ROS裡面的publish()函數,但是Apollo對publish()函數封裝後,使得每個不同子產品的publish都帶有各自不同的标簽,每一個PublishXXX()隻能發送對應類型的Msg,也就是說,原生的ROS下,在大量的發送/訂閱的場景下,需要使用者管理大量的句柄(這裡可以參考上述ROS的簡例)。
  • 上面是Publish,下面看看Apollo對subscribe是怎麼封裝的:
  • Apollo将ROS的回調函數封裝成一個函數隊列,上面的函數就是往這個函數隊列裡添加回調函數。那麼,這個函數在哪裡定義的呢?如果直接搜尋,是找不到的。這裡Apollo用了一個技巧。AdapterManager::AddImageShortCallback()是在adapter_manager裡面用宏展開和##拼接結合的方法而成。通過調用

    REGISTER_ADAPTER(name)

    來實作(REGISTER_ADAPTER的調用參考

    操作步驟

    裡面的适配操作)。可以看到adapter_manager.h:#90開始,定義了三個不同參數的

    static void Add##name##Callback()

    函數。如下:
static void Add##name##Callback(name##Adapter::Callback callback) {          \
    CHECK(instance()->name##_)                                                 \
        << "Initialize adapter before setting callback";                       \
    instance()->name##_->AddCallback(callback);                                \
  }                                                                            \
  template <class T>                                                           \
  static void Add##name##Callback(                                             \
      void (T::*fp)(const name##Adapter::DataType &data), T *obj) {            \
    Add##name##Callback(std::bind(fp, obj, std::placeholders::_1));            \
  }                                                                            \
  template <class T>                                                           \
  static void Add##name##Callback(                                             \
      void (T::*fp)(const name##Adapter::DataType &data)) {                    \
    Add##name##Callback(fp);                                                   \
  }  
           
  • 我們先不管函數具體作用,但是從上述代碼我們可以看到,後兩個函數最終也是調用了第一個函數,也就是
static void Add##name##Callback(name##Adapter::Callback callback) {          \
           
  • 接着跟進去,我們看到這個函數實際上就是調用了:
instance()->name##_->AddCallback(callback);                                \
           
  • 再跟進,可以看到在adapter.h裡:
/**
   * @brief registers the provided callback function to the adapter,
   * so that the callback function will be called once right after the
   * message hits the adapter.
   * @param callback the callback with signature void(const D &).
   */  
  void AddCallback(Callback callback) {
    receive_callbacks_.push_back(callback);
  }	
           

​ receive_callbacks_的定義如下:

/// User defined function when receiving a message
  std::vector<Callback> receive_callbacks_;
           
  • 其實到這裡,Apollo訂閱的封裝實作很明顯了,其在

    adapter_manager.h

    通過宏展開和拼接生成帶有不同名字标簽的函數群,這些函數群包含一系列topic相關操作的功能,如訂閱,壓入回調函數隊列等等,然後在

    adapter_manager.cc

    裡面通過有限狀态機根據讀入的config檔案(參考dag_config_path變量)配置需要啟動的subnode。這裡調用的

    Enable##name(topicName,config)

    實作了對不同topic的訂閱以及生成通知句柄,然後就可以調用

    Add##name##Callback

    來訂閱指定的topic,以及調用

    Publish##name

    向指定topic發送消息。
  • 那麼Apollo這樣繞了一圈将advertise/subscribe封裝起來,有什麼用呢?原因有如下兩點:
    1. 當系統裡面的相似類型多了,變量的管理會變得複雜起來,那麼用這種帶有名字标簽的命名方式,可以讓變量名能夠做到顧名思義。
    2. 本質上,不同的subscribe和publish函數都是一些相同代碼,Apollo通過

      ##

      拼接和宏展開結合的方式,提高了這一塊的代碼複用率,也友善整個架構後續的延伸擴充。
  • 還有一個要注意,

    name##Adapter

    是不同的消息格式,在

    message_adapters.h

    定義的别名,這是為了配合整套方案而将不同的消息都别名成了

    nameAdapter

    的形式。

    類似上面的這樣取别名,這裡的

    perception::PerceptionObstacles

    是在

    perception_obstacle.pb.h

    的檔案内定義的了,這個和protobuf有關,暫且不說,這是下一章

    How_to_add_a_new_msg

    的内容。

流程圖

  • 下面是整個topic配置的整體流程圖,最終呈現在使用者面前的是兩句

    Publish##Name()

    Add##Name##Callbackc()

    Apollo如何通知/訂閱主題topicHow to advertise and subscribe a topic

繼續閱讀