How to advertise and subscribe a topic
導讀
- 衆所周知,Apollo是基于ROS開發的,是以其底層也是基于消息的機制進行節點通信的。但是它在ROS的基礎上做了一些改動,如下:
- P2P——由于原生ROS的消息機制是通過主節點(Master)分發資料來實作的,這樣一個強中心化的結構始終存在一個Master意外導緻系統奔潰的隐患,為了解決這個問題,Apollo使用了Fast-RTPS,用P2P的方式抛棄了主節點實作通信。
- Protobuf——原生支援Google的Protobuf,完美的解決了原來ROS的MD5驗證導緻消息不能後向相容的問題。
- 共享記憶體——使用共享記憶體傳輸,效率更高。
- 上面簡單的介紹了一些Apollo和ROS的淵源,如果想詳細了解可以看Apollo-ROS官方詳細文檔。鑒于與ROS的關系,那麼就不得不說如何在Apollo裡面利用topic進行通信了。雖然出自ROS,但是Apollo在ROS的基礎上進行了大量的封裝和改動,是以,較原生的ROS,在advertise/subscribe(通知/訂閱)topic有了較大的差別。
- 本文我将按照操作步驟、代碼分析、流程圖的步驟簡述一下Apollo的topic使用方法,如果你隻想知道如何使用,那麼看前面的使用方法即可,如果你想刨根問題,那麼請耐心的看完,結合上下文一起觀看,效果更好。還有,代碼的注釋不能放過,有一些上下文聯系的地方,我可能放在代碼注釋裡面來解釋了。
使用方法
訂閱
- 訂閱已配置好的Topic
- 實作回調函數
void ZuoTestSubnode::ImgTestCallback(const sensor_msgs::Image &msg){ AINFO << "ImgTestCallback"; //-- do sth }
- 在InitInternal()内将回調函數添加到對應Topic的回調函數隊列,如下:
bool ZuoTestSubnode::InitInternal(){ AdapterManager::AddImageShortCallback(&ZuoTestSubnode::ImgTestCallback, this); }
- 實作回調函數
- 訂閱未配置好的Topic
如果使用者需要新增Topic,那就需要重新配置,而配置,本質上就是調用subscribe/advertise函數,然後生成空的回調函數隊列和消息釋出句柄。這些是Apollo對ROS做的封裝,主要目的是為了友善使用,以及管理衆多的topic,這部分主要是在adapter_manager.h、message_manager.h等實作的。具體代碼後面再講,先看步驟:
- 在adapter_gflags.cc和adapter_gflags.h裡面添加新topic。如下:
//-- adapter_gflags.h //-- Zuo added on 2018-04-15 for testPublishSubnode DECLARE_string(zuo_test_topic);
//-- adapter_gflags.cc //-- Zuo added on 2018-04-15 for testPublishSubnode DEFINE_string(zuo_test_topic, "/apollo/zuo/zuo_test", "Zuo added for testPublishMsg");
- 在
裡面增加對adapter_manager.h
的适配:ZuoTest
class AdapterManager { public: ...... //-- Zuo added on 2018-04-15 for testPublishSubnode //-- 這裡的ZuoTest可以了解為對topic取了個别名。 REGISTER_ADAPTER(ZuoTest); };
- 在adapter_manager.cc的**Init()**函數的有限狀态機内添加一個分支,如下:
void AdapterManager::Init(const AdapterManagerConfig &configs) { ...... for (const auto &config : configs.config()) { switch (config.type()) { ...... case AdapterConfig::ZUO_TEST: EnableZuoTest(FLAGS_zuo_test_topic, config); break; default: AERROR << "Unknown adapter config type!"; break; } } }
- 在adapter_config.proto裡面增加
标簽:(不然上述的有限狀态機分支就無法進入)ZUO_TEST
message AdapterConfig { enum MessageType { ...... ZUO_TEST = 45; }
- 在message_adapters.h裡面增加對應消息的别名:
- 剩下的就和上述已配置好的Topic的使用方法相同了,我就不贅述了。
- 在使用者的adapter.conf檔案内添加一個config标簽,這個conf檔案是用來配置對應的topic。而config中的type,是用來在後文的有限狀态機中,進入我們新添的topic分支。這裡的adapter.conf是使用者用來配置對topic的subscribe和advertise的。
config { type: ZUO_TEST mode: PUBLISH_ONLY message_history_limit: 5 }
- 所謂的使用者,可以了解為某個子產品,例如Perception子產品裡面就有用來配置這個的。這必定是每個子產品的第一個操作,先subscribe和advertise之後才能操作topic咯。
Status Perception::Init() { AdapterManager::Init(FLAGS_perception_adapter_config_filename); ...... }
- 在adapter_gflags.cc和adapter_gflags.h裡面添加新topic。如下:
通知
- 到這裡就肯定都已經配置好topic了,是以都一樣了就是調用Publish函數釋出資料到topic。
//-- 建立msg PerceptionObstacles obstacles; //-- publish msg common::adapter::AdapterManager::PublishPerceptionObstacles(obstacles); ```
代碼分析
- 上一篇How_to_add_a_subnode已經講到了如何添加一個subnode,但是還沒有說到如何在這個subnode裡面subscribe/publish(訂閱/釋出)消息,下面将根據上述的操作步驟講解在Apollo裡面為什麼要這麼用topic。
- 在說Apollo前,還是帶一下ROS的通信機制,也友善我們後續的了解,詳情可以了解ROS Wiki
- 原生的ROS其實是如上圖一個消息傳遞流程,簡而言之就是,一個節點發,一個節點收,中間是由Master進行轉發,具體操作如下:
//-- Zuo added on 2018-04-13
void chatterCallback(const std_msgs::String::ConstPtr& msg)
{
//-- do sth
printf("msg's data = %s", msg.data);
}
int main(int argc, char **argv)
{
ros::init(argc, argv, "listener");
ros::NodeHandle n;
//-- 向Topic釋出消息
//-- 這裡是告訴Master,我需要在'TopicfullName'上釋出一個消息。
//-- @param_0 Topic名字
//-- @param_1 消息釋出隊列大小
//-- 傳回的句柄需要儲存,用來調用publish函數
ros::Publisher Zuo_pub = n.advertise<std_msgs::String>("TopicfullName", 1000);
std_msgs::String msg;
msg.data = "Hello World";
//-- 使用者根據傳回的句柄調用publish函數,向對應的topic發送消息
Zuo_pub.publish(msg);
//-- 訂閱一個Topic
//-- 如果`TopicfullName`接收到消息,就會觸發這裡的回調函數chatterCallback()
ros::Subscriber sub = n.subscribe("TopicfullName", 1000, chatterCallback);
ros::spin();//ros::spin()進入自循環,循環擷取事件
return 0;
}
注意:上面的兩個函數advertise/subscribe的方式我們可以多留意一下,之後在Apollo的代碼講解裡面也會看到類似的方式。這個在Apollo裡面都是放在裡面的
AdapterManager::Init
來調用了,注意看
EnableXXX()
的内部實作。
adapter_manager.h
- 在Apollo裡面,為了降低subnode在通信過程中對Master節點的依賴,将上圖的機制設計成兩個subnode直接類似P2P通信,相當于對原來的網絡做了一個去中心化的改變。但是這裡我們不用管,對于使用者來說,我們的使用方式沒變,還是通過調用advertise/subscribe兩個接口來進行節點通信。
- 誠然,在Apollo裡面我們可以直接調用advertise/subscribe,但是可以用,不代表應該用。看代碼我們會發現,在Apollo底層(adapter_manager.h)的确是調用了advertise/subscribe,但是在其之上還封裝了幾層,它将對不同Topic的advertise/subscribe使用封裝成了對應的帶有Topic名字标簽的函數,例如:
上述這種本質上就是ROS裡面的publish()函數,但是Apollo對publish()函數封裝後,使得每個不同子產品的publish都帶有各自不同的标簽,每一個PublishXXX()隻能發送對應類型的Msg,也就是說,原生的ROS下,在大量的發送/訂閱的場景下,需要使用者管理大量的句柄(這裡可以參考上述ROS的簡例)。
- 上面是Publish,下面看看Apollo對subscribe是怎麼封裝的:
- Apollo将ROS的回調函數封裝成一個函數隊列,上面的函數就是往這個函數隊列裡添加回調函數。那麼,這個函數在哪裡定義的呢?如果直接搜尋,是找不到的。這裡Apollo用了一個技巧。AdapterManager::AddImageShortCallback()是在adapter_manager裡面用宏展開和##拼接結合的方法而成。通過調用
來實作(REGISTER_ADAPTER的調用參考REGISTER_ADAPTER(name)
裡面的适配操作)。可以看到adapter_manager.h:#90開始,定義了三個不同參數的操作步驟
函數。如下:static void Add##name##Callback()
static void Add##name##Callback(name##Adapter::Callback callback) { \
CHECK(instance()->name##_) \
<< "Initialize adapter before setting callback"; \
instance()->name##_->AddCallback(callback); \
} \
template <class T> \
static void Add##name##Callback( \
void (T::*fp)(const name##Adapter::DataType &data), T *obj) { \
Add##name##Callback(std::bind(fp, obj, std::placeholders::_1)); \
} \
template <class T> \
static void Add##name##Callback( \
void (T::*fp)(const name##Adapter::DataType &data)) { \
Add##name##Callback(fp); \
}
- 我們先不管函數具體作用,但是從上述代碼我們可以看到,後兩個函數最終也是調用了第一個函數,也就是
static void Add##name##Callback(name##Adapter::Callback callback) { \
- 接着跟進去,我們看到這個函數實際上就是調用了:
instance()->name##_->AddCallback(callback); \
- 再跟進,可以看到在adapter.h裡:
/**
* @brief registers the provided callback function to the adapter,
* so that the callback function will be called once right after the
* message hits the adapter.
* @param callback the callback with signature void(const D &).
*/
void AddCallback(Callback callback) {
receive_callbacks_.push_back(callback);
}
receive_callbacks_的定義如下:
/// User defined function when receiving a message
std::vector<Callback> receive_callbacks_;
- 其實到這裡,Apollo訂閱的封裝實作很明顯了,其在
通過宏展開和拼接生成帶有不同名字标簽的函數群,這些函數群包含一系列topic相關操作的功能,如訂閱,壓入回調函數隊列等等,然後在adapter_manager.h
裡面通過有限狀态機根據讀入的config檔案(參考dag_config_path變量)配置需要啟動的subnode。這裡調用的adapter_manager.cc
實作了對不同topic的訂閱以及生成通知句柄,然後就可以調用Enable##name(topicName,config)
來訂閱指定的topic,以及調用Add##name##Callback
向指定topic發送消息。Publish##name
- 那麼Apollo這樣繞了一圈将advertise/subscribe封裝起來,有什麼用呢?原因有如下兩點:
- 當系統裡面的相似類型多了,變量的管理會變得複雜起來,那麼用這種帶有名字标簽的命名方式,可以讓變量名能夠做到顧名思義。
- 本質上,不同的subscribe和publish函數都是一些相同代碼,Apollo通過
拼接和宏展開結合的方式,提高了這一塊的代碼複用率,也友善整個架構後續的延伸擴充。##
- 還有一個要注意,
是不同的消息格式,在name##Adapter
定義的别名,這是為了配合整套方案而将不同的消息都别名成了message_adapters.h
nameAdapter
的形式。
類似上面的這樣取别名,這裡的
是在perception::PerceptionObstacles
的檔案内定義的了,這個和protobuf有關,暫且不說,這是下一章perception_obstacle.pb.h
的内容。How_to_add_a_new_msg
流程圖
- 下面是整個topic配置的整體流程圖,最終呈現在使用者面前的是兩句
和Publish##Name()
。Add##Name##Callbackc()