-
Notifications
You must be signed in to change notification settings - Fork 5
Expand file tree
/
Copy pathcontroller.py
More file actions
102 lines (90 loc) · 4.89 KB
/
controller.py
File metadata and controls
102 lines (90 loc) · 4.89 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
#! /usr/bin/env python
import random
import rospy
# Import constant name defined to structure the architecture.
from arch_skeleton import architecture_name_mapper as anm
# Import the ActionServer implementation used.
from actionlib import SimpleActionServer
# Import custom message, actions and services.
from arch_skeleton.msg import ControlFeedback, ControlResult
from arch_skeleton.srv import SetPose
import arch_skeleton # This is required to pass the `PlanAction` type for instantiating the `SimpleActionServer`.
# A tag for identifying logs producer.
LOG_TAG = anm.NODE_CONTROLLER
# An action server to simulate motion controlling.
# Given a plan as a set of via points, it simulate the movements
# to reach each point with a random delay. This server updates
# the current robot position stored in the `robot-state` node.
class ControllingAction(object):
def __init__(self):
# Get random-based parameters used by this server
self._random_motion_time = rospy.get_param(anm.PARAM_CONTROLLER_TIME, [0.1, 2.0])
# Instantiate and start the action server based on the `SimpleActionServer` class.
self._as = SimpleActionServer(anm.ACTION_CONTROLLER,
arch_skeleton.msg.ControlAction,
execute_cb=self.execute_callback,
auto_start=False)
self._as.start()
# Log information.
log_msg = (f'`{anm.ACTION_CONTROLLER}` Action Server initialised. It will navigate trough the plan with a delay '
f'between each via point spanning in [{self._random_motion_time[0]}, {self._random_motion_time[1]}).')
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# The callback invoked when a client set a goal to the `controller` server.
# This function requires a list of via points (i.e., the plan), and it simulate
# a movement through each point with a delay spanning in
# ['self._random_motion_time[0]`, `self._random_motion_time[1]`).
# As soon as each via point is reached, the related robot position is updated
# in the `robot-state` node.
def execute_callback(self, goal):
# Check if the provided plan is processable. If not, this service will be aborted.
if goal is None or goal.via_points is None or len(goal.via_points) == 0:
rospy.logerr(anm.tag_log('No via points provided! This service will be aborted!', LOG_TAG))
self._as.set_aborted()
return
# Construct the feedback and loop for each via point.
feedback = ControlFeedback()
rospy.loginfo(anm.tag_log('Server is controlling...', LOG_TAG))
for point in goal.via_points:
# Check that the client did not cancel this service.
if self._as.is_preempt_requested():
rospy.loginfo(anm.tag_log('Service has been cancelled by the client!', LOG_TAG))
# Actually cancel this service.
self._as.set_preempted()
return
# Wait before to reach the following via point. This is just for testing purposes.
delay = random.uniform(self._random_motion_time[0], self._random_motion_time[1])
rospy.sleep(delay)
# Publish a feedback to the client to simulate that a via point has been reached.
feedback.reached_point = point
self._as.publish_feedback(feedback)
# Set the new current position into the `robot-state` node.
_set_pose_client(point)
# Log current robot position.
log_msg = f'Reaching point ({point.x}, {point.y}).'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Publish the results to the client.
result = ControlResult()
result.reached_point = feedback.reached_point
rospy.loginfo(anm.tag_log('Motion control successes.', LOG_TAG))
self._as.set_succeeded(result)
return # Succeeded.
# Update the current robot `pose` stored in the `robot-state` node.
# This method is performed for each point provided in the action's server feedback.
def _set_pose_client(pose):
# Eventually, wait for the server to be initialised.
rospy.wait_for_service(anm.SERVER_SET_POSE)
try:
# Log service call.
log_msg = f'Set current robot position to the `{anm.SERVER_SET_POSE}` node.'
rospy.loginfo(anm.tag_log(log_msg, LOG_TAG))
# Call the service and set the current robot position.
service = rospy.ServiceProxy(anm.SERVER_SET_POSE, SetPose)
service(pose) # The `response` is not used.
except rospy.ServiceException as e:
log_msg = f'Server cannot set current robot position: {e}'
rospy.logerr(anm.tag_log(log_msg, LOG_TAG))
if __name__ == '__main__':
# Initialise the node, its action server, and wait.
rospy.init_node(anm.NODE_CONTROLLER, log_level=rospy.INFO)
server = ControllingAction()
rospy.spin()