roscpp/rospy節點編寫
-
- subscriber/advertiser編寫
-
- roscpp執行個體
- rospy執行個體
- server/client編寫
-
- roscpp執行個體
- rospy執行個體
- actionlib編寫
-
- roscpp執行個體
- rospy執行個體
ROS節點中主要以subscriber/advertiser、service/client、action_server/action_client形式存在,通過上述形式完成節點間的通信與組網。
subscriber/advertiser編寫
subscriber與advertiser是消息通信機制中消息的訂閱者與釋出者,支援采用c++、python進行編寫,以下将分别編寫C++、python執行個體。
roscpp執行個體
-
advertiser
在tutorial/src/tutorial_advertiser_node.cpp中添加如下代碼:
#include <ros/ros.h>
#include <tutorial/RobotState.h>
int main(int argc,char **argv)
{
//ros::init():初始化節點
ros::init(argc,argv,"tutorial_advertiser_node");
//ros::NodeHandle:建立句柄
ros::NodeHandle nh;
//建立publisher
ros::Publisher pub = nh.advertise<tutorial::RobotState>("robot_state",1);
//設定頻率,20Hz
ros::Rate r(20);
while(ros::ok())
{
//初始化消息
tutorial::RobotState msg;
//設定消息中字段的值
msg.mode_state = tutorial::RobotState::MANUAL_MODE;
msg.hardware_state = tutorial::RobotState::NORMAL;
//釋出消息
pub.publish(msg);
ROS_INFO("I published a msg");
ros::spinOnce();
//節點進入休眠
r.sleep();
}
return 0;
}
-
subscriber
在tutorial/src/tutorial_subscriber_node.cpp中添加如下代碼:
#include <ros/ros.h>
#include <tutorial/RobotState.h>
/*
* @brief: 消息回調函數,處理收到的消息
* @param msg 消息類型為tutorial::RobotState::ConstPtr的消息
* @return void 無
*/
void callback(const tutorial::RobotState::ConstPtr &msg)
{
ROS_INFO("I received a message:mode_state is %d,hardware_state is %d.",msg->mode_state,msg->hardware_state);
}
int main(int argc,char **argv)
{
//初始化節點
ros::init(argc,argv,"tutorial_subscriber_node");
//建立句柄
ros::NodeHandle nh;
//建立subscriber
ros::Subscriber sub = nh.subscribe("robot_state",1,callback);
ros::spin();
return 0;
}
-
編譯運作節點
在tutorial包的CMakeLists.txt添加如下代碼:
add_executable(tutorial_sub_node src/tutorial_subscriber_node.cpp)
add_executable(tutorial_pub_node src/tutorial_advertiser_node.cpp)
add_dependencies(tutorial_sub_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(tutorial_pub_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(tutorial_sub_node ${catkin_LIBRARIES})
target_link_libraries(tutorial_pub_node ${catkin_LIBRARIES})
儲存後編譯即可。
$ catkin_make
分别打開三個終端,運作如下指令:
運作roscore
roscore
運作advertiser
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_pub_node
運作subscriber
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_sub_node
rospy執行個體
在tutorial包中建立檔案夾命名為script,并打開script檔案夾添加tutorial_sub.py和tutorial_pub.py,添加如下代碼:
- advertiser:tutorial_pub.py
#!/usr/bin/env python
import rospy
from tutorial.msg import RobotState
def talker():
rospy.init_node("tutorial_pub_node",anonymous=True)
pub = rospy.Publisher("robot_state",RobotState,queue_size=1);
rate = rospy.Rate(20);
while not rospy.is_shutdown():
msg = RobotState()
msg.mode_state = 0
msg.hardware_state = 0
pub.publish(msg)
rospy.loginfo("I publish a msg")
rate.sleep()
if __name__ == '__main__':
try:
talker()
except rospy.ROSInterruptException:
pass
- subscriber:tutorial_sub.py
#!/usr/bin/env python
import rospy
from tutorial.msg import RobotState
def callback(msg):
rospy.loginfo("I received a msg: mode_state: %d, hardware_state: %d.",msg.mode_state,msg.hardware_state)
def listener():
rospy.init_node("tutorial_sub_node",anonymous=True)
sub = rospy.Subscriber("robot_state",RobotState,callback)
rospy.spin()
if __name__ == '__main__':
listener()
-
運作節點
首先修改python檔案的屬性,添加可執行權限
$ cd ~/ros_ws/
$ roscd tutorial/script/
$ chmod +x tutorial_sub.py tutorial_pub.py
打開三個終端,分别運作roscore、subscriber、advertiser
運作roscore:
$ roscore
運作subscriber
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_sub.py
運作advertiser
$ cd ~/ros_ws/
$ source devel/setup.bash
$ rosrun tutorial tutorial_pub.py
server/client編寫
server/client以應答模式存在,通常用于傳遞不需要高頻率傳遞的消息或執行某種動作。支援roscpp、rospy編寫,以下是C++、python編寫的執行個體。
roscpp執行個體
- server:tutorial_server_node.cpp
//server的回調函數
bool callback(tutorial::RobotStateService::Request &req,tutorial::RobotStateService::Response &res)
{
if(req.state == 255)
{
res.mode_state = mode_state;
res.hardware_state = hardware_state;
}else
{
mode_state = req.state;
res.mode_state = mode_state;
res.hardware_state = hardware_state;
}
ROS_INFO("I had got a request.");
return true;
}
int main(int argc,char **argv)
{
ros::init(argc,argv,"tutorial_server_node");
ros::NodeHandle nh;
//定義server
ros::ServiceServer server = nh.advertiseService("robot_state_srv",callback);
ROS_INFO("server is Ready");
ros::spin();
return 0;
}
- client : tutorial_client_node.cpp
#include <ros/ros.h>
#include <tutorial/RobotStateService.h>
int main(int argc,char **argv)
{
ros::init(argc,argv,"tutorial_client_node");
ros::NodeHandle nh;
//定義client
ros::ServiceClient client = nh.serviceClient<tutorial::RobotStateService>("robot_state_srv");
//定義srv變量
tutorial::RobotStateService req;
req.request.state = 255;
//請求服務
if(client.call(req))
{
ROS_INFO("call server sucessed. mode_state: %d, hardware_state: %d",req.response.mode_state,req.response.hardware_state);
}else
{
ROS_ERROR("Failed to call service.");
return 1;
}
return 0;
}
-
編譯運作
在CMakeLists.txt檔案中添加如下代碼:
add_executable(tutorial_server_node src/tutorial_server_node.cpp)
add_executable(tutorial_client_node src/tutorial_client_node.cpp)
add_dependencies(tutorial_server_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(tutorial_client_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(tutorial_server_node ${catkin_LIBRARIES})
target_link_libraries(tutorial_client_node ${catkin_LIBRARIES})
運作編譯指令編譯功能包:
$ cd ~/ros_ws/
$ catkin_make
編譯完成後打開三個終端分别運作roscore、server、client(運作指令可參考subscriber與advertiser部分),運作結果如圖所示:
rospy執行個體
- server : tutorial_server.py
#! /usr/bin/env python
import sys
import rospy
#載入服務類型
from tutorial.srv import RobotStateService,RobotStateServiceRequest,RobotStateServiceResponse
mode_state = 0
hardware_state = 0
#服務回調函數
def callback(req):
global mode_state
res = RobotStateServiceResponse()
if req.state == 255:
res.mode_state = mode_state
res.hardware_state = hardware_state
else:
mode_state = req.state
res.mode_state = mode_state
res.hardware_state = hardware_state
rospy.loginfo("I had got a request")
return res
def server():
#初始化節點
rospy.init_node("tutorial_server",anonymous=True)
#定義server,指定回調函數
server = rospy.Service("robot_state_srv",RobotStateService,callback)
rospy.loginfo("Server is ready.")
rospy.spin()
if __name__ == '__main__':
server()
- client : tutorial_client.py
#!/usr/bin/env python
import rospy
#載入服務類型
from tutorial.srv import RobotStateService,RobotStateServiceRequest,RobotStateServiceResponse
def client():
#初始化節點
rospy.init_node("tutorial_client",anonymous=True)
#等待服務
rospy.wait_for_service("robot_state_srv")
try:
#定義用戶端
client = rospy.ServiceProxy("robot_state_srv",RobotStateService)
#請求用戶端
resp = client(255)
rospy.loginfo("service call sucessed.")
except rospy.ServiceException as e:
rospy.loginfo("service call failed: %s",e)
if __name__ == '__main__':
client()
-
運作節點
運作方法參照advertiser/subscriber的rospy執行個體運作即可,運作結果如圖所示:
actionlib編寫
actionlib是ROS中一個很重要的功能包集合,盡管在ROS中已經提供了srevice機制來滿足請求—響應式的使用場景,但是假如某個請求執行時間很長,在此期間使用者想檢視執行的進度或者取消這個請求的話,service機制就不能滿足了,但是actionlib可滿足使用者這種需求。如,控制機器人運動到地圖中某一目标位置時,這個過程可能複雜而漫長,執行過程中還可能強制中斷或回報資訊,這時就需要用到actionlib。
actionlib使用client-server工作模式,ActionClient 和ActionServer通過"ROS Action Protocol"進行通信,"ROS Action Protocol"以ROS消息方式進行傳輸。此外ActionClient 和ActionServer給使用者提供了一些簡單的接口,使用者使用這些接口可以完成goal請求(client-side)和goal執行(server-side)。
ActionClient 和ActionServer之間使用action protocol通信,action protocol就是預定義的一組ROS message,這些message被放到ROS topic上在 ActionClient 和ActionServer之間進行傳實作二者的溝通。
roscpp執行個體
*actionlib server
#include <ros/ros.h>
#include <tutorial/AutoDockingAction.h>
#include <actionlib/server/simple_action_server.h>
typedef actionlib::SimpleActionServer<tutorial::AutoDockingAction> Server;
void executeCB(const tutorial::AutoDockingGoal::ConstPtr &goal,Server *as)
{
int i = 10;
tutorial::AutoDockingResult result;
tutorial::AutoDockingFeedback feedback;
ros::Rate r(0.5);
//執行動作,可以在此處編寫需要的動作
while(ros::ok())
{
if(i <= 0)
{
feedback.command.linear.x = i;
feedback.command.angular.z = 10 - i;
as->publishFeedback(feedback);
break;
}
i --;
ROS_INFO("%d action server is executing.",i);
r.sleep();
}
result.is_docked = true;
as->setSucceeded(result);
ROS_INFO("action server execute complete");
}
int main(int argc ,char** argv)
{
ros::init(argc,argv,"action_server_node");
ros::NodeHandle nh;
Server server(nh,"auto_dock",boost::bind(&executeCB,_1,&server),false);
server.start();
ros::spin();
return 0;
}
- actionlib client
#include <ros/ros.h>
#include <actionlib/client/simple_action_client.h>
#include <tutorial/AutoDockingAction.h>
typedef actionlib::SimpleActionClient<tutorial::AutoDockingAction> Client;
int main(int argc,char** argv)
{
ros::init(argc,argv,"action_client_node");
ros::NodeHandle nh;
//建立用戶端
Client client("auto_dock",true);
//連接配接用戶端
client.waitForServer();
tutorial::AutoDockingGoal goal;
goal.dock_pose.header.stamp = ros::Time::now();
goal.dock_pose.header.frame_id = "map";
goal.dock_pose.pose.position.x = 1.62830536433;
goal.dock_pose.pose.position.y = 10.4706644315;
goal.dock_pose.pose.position.z = 0.0;
goal.dock_pose.pose.orientation.x = 0.0;
goal.dock_pose.pose.orientation.y = 0.0;
goal.dock_pose.pose.orientation.z = 0.808817724421;
goal.dock_pose.pose.orientation.w = 0.588059426132;
goal.use_move_base = true;
//發送goal
client.sendGoal(goal);
while (ros::ok())
{
//循環查詢狀态
bool flag = client.waitForResult(ros::Duration(2.0));
if (client.getState() == actionlib::SimpleClientGoalState::SUCCEEDED)
{
ROS_INFO("succeeded!");
break;
}
else if(client.getState() == actionlib::SimpleClientGoalState::ACTIVE)
{
ROS_INFO("state = %s.", client.getState().toString().c_str());
continue;
}else
{
ROS_INFO("state = %s.", client.getState().toString().c_str());
return 0;
}
ROS_INFO("flag,%d",flag);
}
return 0;
}
-
編譯運作
在CMakeList.txt中添加如下代碼:
add_executable(action_server_node src/action_server_node.cpp)
add_executable(action_client_node src/action_client_node.cpp)
add_dependencies(action_server_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
add_dependencies(action_client_node ${${PROJECT_NAME}_EXPORTED_TARGETS} ${catkin_EXPORTED_TARGETS})
target_link_libraries(action_server_node ${catkin_LIBRARIES})
target_link_libraries(action_client_node ${catkin_LIBRARIES})
運作catkin_make編譯節點後,運作節點(參照advertiser/subscriber節點運作方式),運作效果如圖所示:
rospy執行個體
- actionlib server
#!/usr/bin/env python
import rospy
import actionlib
import tutorial.msg
class ActionServer(object):
_feedback = tutorial.msg.AutoDockingFeedback()
_result = tutorial.msg.AutoDockingResult()
def __init__(self,name):
self._action_name = name
#定義server
self._as = actionlib.SimpleActionServer(self._action_name,tutorial.msg.AutoDockingAction,execute_cb=self.execute_cb,auto_start = False)
self._as.start()
#定義回調函數
def execute_cb(self,goal):
rate = rospy.Rate(1)
success = True
i = 10
#主循環體,執行動作
while not rospy.is_shutdown():
#收到cancel指令或其他指令
if self._as.is_preempt_requested():
rospy.loginfo("%s: Preempted.",self._action_name)
self._as.set_preempted()
success = False
break
self._feedback.command.linear.x = i
self._feedback.command.angular.z = 10 - i
self._as.publish_feedback(self._feedback)
rospy.loginfo("%d action server is executing.",i)
i = i - 1
if i < 0:
break
rospy.loginfo("action server exit.")
#執行完成
if success:
self._result.is_docked = success
rospy.loginfo("%s: Succeeded.",self._action_name)
self._as.set_succeeded(self._result)
if __name__ == '__main__':
rospy.init_node('action_server')
server = ActionServer("auto_dock")
rospy.spin()
- actionlib client
#!/usr/bin/env python
import rospy
import actionlib
import tutorial.msg
def action_client():
#定義client
client = actionlib.SimpleActionClient("auto_dock",tutorial.msg.AutoDockingAction)
#等待連接配接
client.wait_for_server()
#設定并發送goal
goal = tutorial.msg.AutoDockingGoal()
goal.dock_pose.header.stamp = rospy.Time.now()
goal.dock_pose.header.frame_id = "map"
goal.dock_pose.pose.position.x = 1.62830536433
goal.dock_pose.pose.position.y = 10.4706644315
goal.dock_pose.pose.position.z = 0.0
goal.dock_pose.pose.orientation.x = 0.0
goal.dock_pose.pose.orientation.y = 0.0
goal.dock_pose.pose.orientation.z = 0.808817724421
goal.dock_pose.pose.orientation.w = 0.588059426132
goal.use_move_base = True
client.send_goal(goal)
#等待結果
client.wait_for_result()
return client.get_result()
if __name__ == '__main__':
try:
rospy.init_node("action_client")
result = action_client()
rospy.loginfo('rusult:'+str(result.is_docked))
except rospy.ROSInterruptException:
pass
-
運作節點
運作節點,參照advertiser/subscriber的rospy節點,運作結果如圖所示: