Commit e6b4e5c9 authored by Olivier Stasse's avatar Olivier Stasse
Browse files

[tests] Add test_sot_talos_balance.test

The two commands rostest put in the README.md are working by launching
gazebo and each time a script validating the behavior.
The final position of the link valid if the behavior is correct or
not.
parent d05d8761
......@@ -72,7 +72,7 @@ endforeach()
# Testing part
if(CATKIN_ENABLE_TESTING)
catkin_add_nosetests(scripts/test_kine.py)
find_package(rostest REQUIRED)
add_rostest(tests/test_kine.test)
add_rostest(tests/test_sot_talos_balance.test)
endif()
......@@ -7,75 +7,140 @@ import rospy
import time
import roslaunch
import rospkg
import unittest
import math
from std_srvs.srv import Empty
# Start roscore
import subprocess
roscore = subprocess.Popen('roscore')
time.sleep(1)
# Get the path to talos_data
arospack = rospkg.RosPack()
talos_data_path = arospack.get_path('talos_data')
from gazebo_msgs.srv import *
# Start talos_gazebo
rospy.init_node('starting_talos_gazebo', anonymous=True)
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
from std_srvs.srv import Empty
cli_args = [talos_data_path+'/launch/talos_gazebo_alone.launch',
PKG_NAME='talos_integration_tests'
class TestSoTTalos(unittest.TestCase):
def validation_through_gazebo(self):
gzGetModelPropReq = rospy.ServiceProxy('/gazebo/get_model_state',
GetModelState)
gzGetModelPropResp = gzGetModelPropReq(model_name='talos')
f=open("/tmp/output.dat","w+")
f.write("x:"+str(gzGetModelPropResp.pose.position.x)+"\n")
f.write("y:"+str(gzGetModelPropResp.pose.position.y)+"\n")
f.write("z:"+str(gzGetModelPropResp.pose.position.z)+"\n")
dx=gzGetModelPropResp.pose.position.x-2.8331
dy=gzGetModelPropResp.pose.position.y-0.0405
dz=gzGetModelPropResp.pose.position.z-1.0019
ldistance = math.sqrt(dx*dx+dy*dy+dz*dz)
f.write("dist:"+str(ldistance))
f.close()
if ldistance<0.009:
self.assertTrue(True,msg="Converged to the desired position")
else:
self.assertFalse(False,
msg="Did not converged to the desired position")
def runTest(self):
# Start roscore
import subprocess
roscore = subprocess.Popen('roscore')
time.sleep(1)
# Get the path to talos_data
arospack = rospkg.RosPack()
talos_data_path = arospack.get_path('talos_data')
# Start talos_gazebo
rospy.init_node('starting_talos_gazebo', anonymous=True)
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
cli_args = [talos_data_path+'/launch/talos_gazebo_alone.launch',
'world:=empty_forced',
'enable_leg_passive:=false'
]
roslaunch_args = cli_args[1:]
roslaunch_file = [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], roslaunch_args)]
launch_gazebo_alone = roslaunch.parent.ROSLaunchParent(uuid, roslaunch_file)
launch_gazebo_alone.start()
rospy.loginfo("talos_gazebo_alone started")
rospy.wait_for_service("/gazebo/pause_physics")
gazebo_pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
gazebo_pause_physics()
time.sleep(5)
# Spawn talos model in gazebo
launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
[talos_data_path+'/launch/talos_gazebo_spawn_hs.launch'])
#launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
# [talos_data_path+'/launch/talos_gazebo_spawn_hs_wide.launch'])
launch_gazebo_spawn_hs.start()
rospy.loginfo("talos_gazebo_spawn_hs started")
rospy.wait_for_service("/gains/arm_left_1_joint/set_parameters")
time.sleep(5)
gazebo_unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
gazebo_unpause_physics()
# Start roscontrol
launch_bringup = roslaunch.parent.ROSLaunchParent(uuid,
[talos_data_path+'/launch/talos_bringup.launch'])
launch_bringup.start()
rospy.loginfo("talos_bringup started")
# Start sot
roscontrol_sot_talos_path=arospack.get_path('roscontrol_sot_talos')
launch_roscontrol_sot_talos =roslaunch.parent.ROSLaunchParent(uuid,
[roscontrol_sot_talos_path+'/launch/sot_talos_controller_gazebo.launch'])
launch_roscontrol_sot_talos.start()
rospy.loginfo("roscontrol_sot_talos started")
time.sleep(5)
pkg_name='talos_integration_tests'
executable='test_sot_talos_balance.py'
node_name='test_sot_talos_balance_py'
test_sot_talos_balance_node = roslaunch.core.Node(pkg_name, executable,name=node_name)
launch_test_sot_talos_balance=roslaunch.scriptapi.ROSLaunch()
launch_test_sot_talos_balance.start()
test_sot_talos_balance_process = launch_test_sot_talos_balance.launch(test_sot_talos_balance_node)
rospy.spin()
roslaunch_args = cli_args[1:]
roslaunch_file = [(roslaunch.rlutil.resolve_launch_arguments\
(cli_args)[0], roslaunch_args)]
launch_gazebo_alone = roslaunch.parent.\
ROSLaunchParent(uuid, roslaunch_file)
launch_gazebo_alone.start()
rospy.loginfo("talos_gazebo_alone started")
rospy.wait_for_service("/gazebo/pause_physics")
gazebo_pause_physics = rospy.ServiceProxy('/gazebo/pause_physics',
Empty)
gazebo_pause_physics()
time.sleep(5)
# Spawn talos model in gazebo
launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,\
[talos_data_path+'/launch/talos_gazebo_spawn_hs.launch'])
#launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
# [ros_data_path+'/launch/talos_gazebo_spawn_hs_wide.launch'])
launch_gazebo_spawn_hs.start()
rospy.loginfo("talos_gazebo_spawn_hs started")
rospy.wait_for_service("/gains/arm_left_1_joint/set_parameters")
time.sleep(5)
gazebo_unpause_physics = rospy.\
ServiceProxy('/gazebo/unpause_physics', Empty)
gazebo_unpause_physics()
# Start roscontrol
launch_bringup = roslaunch.parent.ROSLaunchParent(uuid,\
[talos_data_path+'/launch/talos_bringup.launch'])
launch_bringup.start()
rospy.loginfo("talos_bringup started")
# Start sot
roscontrol_sot_talos_path=arospack.get_path('roscontrol_sot_talos')
launch_roscontrol_sot_talos =roslaunch.parent.ROSLaunchParent(uuid,\
[roscontrol_sot_talos_path+\
'/launch/sot_talos_controller_gazebo.launch'])
launch_roscontrol_sot_talos.start()
rospy.loginfo("roscontrol_sot_talos started")
time.sleep(5)
pkg_name='talos_integration_tests'
executable='test_sot_talos_balance.py'
node_name='test_sot_talos_balance_py'
test_sot_talos_balance_node = roslaunch.core.\
Node(pkg_name, executable,name=node_name)
launch_test_sot_talos_balance=roslaunch.scriptapi.ROSLaunch()
launch_test_sot_talos_balance.start()
test_sot_talos_balance_process = launch_test_sot_talos_balance.\
launch(test_sot_talos_balance_node)
r = rospy.Rate(10)
while not rospy.is_shutdown():
# Test if sot_talos_balance is finished or not
if not test_sot_talos_balance_process.is_alive():
self.validation_through_gazebo()
# If it is finished then find exit status.
if test_sot_talos_balance_process.exit_code != 0:
exit_status = "test_sot_talos_balance failed"
else:
exit_status=None
print("Stopping SoT")
launch_roscontrol_sot_talos.shutdown()
print("Stopping Gazebo")
launch_gazebo_alone.shutdown()
rospy.signal_shutdown(exit_status)
# Terminate the roscore subprocess
print("Stop roscore")
roscore.terminate()
r.sleep()
if __name__ == '__main__':
import rosunit
rosunit.unitrun(PKG_NAME,'test_sot_talos_balance',TestSoTTalos)
......@@ -7,75 +7,135 @@ import rospy
import time
import roslaunch
import rospkg
import unittest
import math
from std_srvs.srv import Empty
from gazebo_msgs.srv import *
# Start roscore
import subprocess
roscore = subprocess.Popen('roscore')
time.sleep(1)
# Get the path to talos_data
arospack = rospkg.RosPack()
talos_data_path = arospack.get_path('talos_data')
# Start talos_gazebo
rospy.init_node('starting_talos_gazebo', anonymous=True)
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
cli_args = [talos_data_path+'/launch/talos_gazebo_alone.launch',
'world:=empty_forced',
'enable_leg_passive:=false'
]
roslaunch_args = cli_args[1:]
roslaunch_file = [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], roslaunch_args)]
launch_gazebo_alone = roslaunch.parent.ROSLaunchParent(uuid, roslaunch_file)
launch_gazebo_alone.start()
rospy.loginfo("talos_gazebo_alone started")
rospy.wait_for_service("/gazebo/pause_physics")
gazebo_pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
gazebo_pause_physics()
time.sleep(5)
# Spawn talos model in gazebo
launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
[talos_data_path+'/launch/talos_gazebo_spawn_hs.launch'])
#launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
# [talos_data_path+'/launch/talos_gazebo_spawn_hs_wide.launch'])
launch_gazebo_spawn_hs.start()
rospy.loginfo("talos_gazebo_spawn_hs started")
rospy.wait_for_service("/gains/arm_left_1_joint/set_parameters")
time.sleep(5)
gazebo_unpause_physics = rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
gazebo_unpause_physics()
# Start roscontrol
launch_bringup = roslaunch.parent.ROSLaunchParent(uuid,
[talos_data_path+'/launch/talos_bringup.launch'])
launch_bringup.start()
rospy.loginfo("talos_bringup started")
# Start sot
roscontrol_sot_talos_path=arospack.get_path('roscontrol_sot_talos')
launch_roscontrol_sot_talos =roslaunch.parent.ROSLaunchParent(uuid,
[roscontrol_sot_talos_path+'/launch/sot_talos_controller_gazebo.launch'])
launch_roscontrol_sot_talos.start()
rospy.loginfo("roscontrol_sot_talos started")
time.sleep(5)
pkg_name='talos_integration_tests'
executable='test_kine.py'
node_name='test_kine_py'
test_kine_node = roslaunch.core.Node(pkg_name, executable,name=node_name)
launch_test_kine=roslaunch.scriptapi.ROSLaunch()
launch_test_kine.start()
test_kine_process = launch_test_kine.launch(test_kine_node)
rospy.spin()
from std_srvs.srv import Empty
PKG_NAME='talos_integration_tests'
class TestSoTTalos(unittest.TestCase):
def validation_through_gazebo(self):
gzGetLinkPropReq = rospy.ServiceProxy('/gazebo/get_link_state',GetLinkState)
gzGetLinkPropResp = gzGetLinkPropReq(link_name='gripper_right_fingertip_1_link')
f=open("/tmp/output.dat","w+")
f.write("x:"+str(gzGetLinkPropResp.link_state.pose.position.x)+"\n")
f.write("y:"+str(gzGetLinkPropResp.link_state.pose.position.y)+"\n")
f.write("z:"+str(gzGetLinkPropResp.link_state.pose.position.z)+"\n")
dx=gzGetLinkPropResp.link_state.pose.position.x-0.5723
dy=gzGetLinkPropResp.link_state.pose.position.y+0.2885
dz=gzGetLinkPropResp.link_state.pose.position.z-0.7745
ldistance = math.sqrt(dx*dx+dy*dy+dz*dz)
f.write("dist:"+str(ldistance))
f.close()
if ldistance < 0.009:
self.assertTrue(True)
else:
self.assertTrue(False)
def runTest(self):
# Start roscore
import subprocess
roscore = subprocess.Popen('roscore')
time.sleep(1)
# Get the path to talos_data
arospack = rospkg.RosPack()
talos_data_path = arospack.get_path('talos_data')
# Start talos_gazebo
rospy.init_node('starting_talos_gazebo', anonymous=True)
uuid = roslaunch.rlutil.get_or_generate_uuid(None, False)
roslaunch.configure_logging(uuid)
cli_args = [talos_data_path+'/launch/talos_gazebo_alone.launch',
'world:=empty_forced',
'enable_leg_passive:=false',
]
roslaunch_args = cli_args[1:]
roslaunch_file = [(roslaunch.rlutil.resolve_launch_arguments(cli_args)[0], roslaunch_args)]
launch_gazebo_alone = roslaunch.parent.ROSLaunchParent(uuid, roslaunch_file)
launch_gazebo_alone.start()
rospy.loginfo("talos_gazebo_alone started")
rospy.wait_for_service("/gazebo/pause_physics")
gazebo_pause_physics = rospy.ServiceProxy('/gazebo/pause_physics', Empty)
gazebo_pause_physics()
time.sleep(5)
# Spawn talos model in gazebo
launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,\
[talos_data_path+'/launch/talos_gazebo_spawn_hs.launch'])
#launch_gazebo_spawn_hs = roslaunch.parent.ROSLaunchParent(uuid,
# [talos_data_path+'/launch/talos_gazebo_spawn_hs_wide.launch'])
launch_gazebo_spawn_hs.start()
rospy.loginfo("talos_gazebo_spawn_hs started")
rospy.wait_for_service("/gains/arm_left_1_joint/set_parameters")
time.sleep(5)
gazebo_unpause_physics = \
rospy.ServiceProxy('/gazebo/unpause_physics', Empty)
gazebo_unpause_physics()
# Start roscontrol
launch_bringup = roslaunch.parent.ROSLaunchParent(uuid,\
[talos_data_path+'/launch/talos_bringup.launch'])
launch_bringup.start()
rospy.loginfo("talos_bringup started")
# Start sot
roscontrol_sot_talos_path=arospack.get_path('roscontrol_sot_talos')
launch_roscontrol_sot_talos =roslaunch.parent.ROSLaunchParent(uuid,\
[roscontrol_sot_talos_path+\
'/launch/sot_talos_controller_gazebo.launch'])
launch_roscontrol_sot_talos.start()
rospy.loginfo("roscontrol_sot_talos started")
time.sleep(5)
pkg_name='talos_integration_tests'
executable='test_kine.py'
node_name='test_kine_py'
test_kine_node = roslaunch.core.Node(pkg_name, executable,name=node_name)
launch_test_kine=roslaunch.scriptapi.ROSLaunch()
launch_test_kine.start()
test_kine_process = launch_test_kine.launch(test_kine_node)
r = rospy.Rate(10)
while not rospy.is_shutdown():
# Test if sot_talos_balance is finished or not
if not test_kine_process.is_alive():
self.validation_through_gazebo()
# If it is finished then find exit status.
if test_kine_process.exit_code != 0:
exit_status = "test_sot_talos_balance failed"
else:
exit_status=None
print("Stopping SoT")
launch_roscontrol_sot_talos.shutdown()
print("Stopping Gazebo")
launch_gazebo_alone.shutdown()
# Shutdown the node
rospy.signal_shutdown(exit_status)
# Terminate the roscore subprocess
print("Stop roscore")
roscore.terminate()
r.sleep()
if __name__ == '__main__':
import rosunit
rosunit.unitrun(PKG_NAME,'test_sot_talos_kine',TestSoTTalos)
......@@ -3,86 +3,65 @@ import sys
import rospy
import rospkg
import time
import unittest
import math
from std_srvs.srv import *
from dynamic_graph_bridge_msgs.srv import *
from gazebo_msgs.srv import *
PKG_NAME='talos_integration_tests'
class TestSoTTalos(unittest.TestCase):
def launchScript(self,code,title,description = ""):
rospy.loginfo(title)
rospy.loginfo(code)
for line in code:
if line != '' and line[0] != '#':
print line
answer = self.runCommandClient(str(line))
rospy.logdebug(answer)
print answer
runCommandClient = rospy.ServiceProxy('run_command', RunCommand)
def launchScript(code,title,description = ""):
rospy.loginfo(title)
rospy.loginfo(code)
for line in code:
if line != '' and line[0] != '#':
print line
answer = runCommandClient(str(line))
rospy.logdebug(answer)
print answer
rospy.loginfo("...done with "+title)
def test_sot_talos(self):
# Waiting for services
try:
rospy.loginfo("Waiting for run_command")
rospy.wait_for_service('/run_command')
rospy.loginfo("...ok")
rospy.loginfo("Waiting for start_dynamic_graph")
rospy.wait_for_service('/start_dynamic_graph')
rospy.loginfo("...ok")
rospy.loginfo("Waiting for /gazebo/get_link_state")
rospy.wait_for_service('/gazebo/get_link_state')
rospy.loginfo("...ok")
self.runCommandClient = rospy.ServiceProxy('run_command', RunCommand)
self.runCommandStartDynamicGraph = rospy.ServiceProxy('start_dynamic_graph', Empty)
# get an instance of RosPack with the default search paths
rospack = rospkg.RosPack()
# get the file path for rospy_tutorials
lpath = rospack.get_path(PKG_NAME)
print(lpath)
initCode = open( lpath + '/../../lib/'+PKG_NAME+'/appli.py', "r").read().split("\n")
rospy.loginfo("Stack of Tasks launched")
self.launchScript(initCode,'initialize SoT')
self.runCommandStartDynamicGraph()
self.runCommandClient("target = (0.5,-0.2,1.0)")
self.runCommandClient("gotoNd(taskRH,target,'111',(4.9,0.9,0.01,0.9))")
self.runCommandClient("sot.push(taskRH.task.name)")
time.sleep(10)
gzGetLinkPropReq = rospy.ServiceProxy('/gazebo/get_link_state',GetLinkState)
gzGetLinkPropResp = gzGetLinkPropReq(link_name='gripper_right_fingertip_1_link')
f=open("/tmp/output.dat","w+")
f.write("x:"+str(gzGetLinkPropResp.link_state.pose.position.x)+"\n")
f.write("y:"+str(gzGetLinkPropResp.link_state.pose.position.y)+"\n")
f.write("z:"+str(gzGetLinkPropResp.link_state.pose.position.z)+"\n")
dx=gzGetLinkPropResp.link_state.pose.position.x-0.5723
dy=gzGetLinkPropResp.link_state.pose.position.y+0.2885
dz=gzGetLinkPropResp.link_state.pose.position.z-0.7745
ldistance = math.sqrt(dx*dx+dy*dy+dz*dz)
f.write("dist:"+str(ldistance))
f.close()
if ldistance<0.09:
self.assertTrue(True)
else:
self.assertTrue(False)
except rospy.ServiceException, e:
rospy.logerr("Service call failed: %s" % e)
def runTest():
# Waiting for services
try:
rospy.loginfo("Waiting for run_command")
rospy.wait_for_service('/run_command')
rospy.loginfo("...ok")
rospy.loginfo("Waiting for start_dynamic_graph")
rospy.wait_for_service('/start_dynamic_graph')
rospy.loginfo("...ok")
rospy.loginfo("Waiting for /gazebo/get_link_state")
rospy.wait_for_service('/gazebo/get_link_state')
rospy.loginfo("...ok")
runCommandStartDynamicGraph = rospy.ServiceProxy('start_dynamic_graph',
Empty)
# get an instance of RosPack with the default search paths
rospack = rospkg.RosPack()
# get the file path for rospy_tutorials
lpath = rospack.get_path(PKG_NAME)
print(lpath)
initCode = open( lpath + '/../../lib/'+PKG_NAME+'/appli.py',\
"r").read().split("\n")
rospy.loginfo("Stack of Tasks launched")
launchScript(initCode,'initialize SoT')
runCommandStartDynamicGraph()
runCommandClient("target = (0.5,-0.2,1.0)")
runCommandClient("gotoNd(taskRH,target,'111',(4.9,0.9,0.01,0.9))")
runCommandClient("sot.push(taskRH.task.name)")
time.sleep(10)
except rospy.ServiceException, e:
rospy.logerr("Service call failed: %s" % e)
if __name__ == '__main__':
import rostest
rostest.rosrun(PKG_NAME,'test_sot_talos_kine',TestSoTTalos)
runTest()
......@@ -17,74 +17,44 @@ PKG_NAME='talos_integration_tests'
from sot_talos_balance.utils.run_test_utils import ask_for_confirmation, \
run_ft_calibration, run_test, runCommandClient
class TestSoTTalosBalance(unittest.TestCase):
# get an instance of RosPack with the default search paths
rospack = rospkg.RosPack()
# get the file path for rospy_tutorials
lpath = rospack.get_path(PKG_NAME)
print(lpath)
appli_file_name =lpath + '/../../lib/'+PKG_NAME+\
'/appli_dcmZmpControl_file.py'
time.sleep(2)
rospy.loginfo("Stack of Tasks launched")
test_folder = 'TestKajita2003StraightWalking64/20cm'
print('Using folder ' + test_folder)
rospy.loginfo("Waiting for run_command")
rospy.wait_for_service('/run_command')
rospy.loginfo("...ok")
runCommandClient('test_folder = "' + test_folder + '"')
run_test(appli_file_name,verbosity=1,interactive=False)
time.sleep(5)
# Connect ZMP reference and reset controllers
print('Connect ZMP reference')
runCommandClient('plug(robot.zmp_estimator.emergencyStop,robot.cm.emergencyStop_zmp)')
runCommandClient('plug(robot.dcm_control.zmpRef,robot.com_admittance_control.zmpDes)')
runCommandClient('robot.com_admittance_control.setState(robot.wp.comDes.value,[0.0,0.0,0.0])')
runCommandClient('robot.com_admittance_control.Kp.value = Kp_adm')
runCommandClient('robot.dcm_control.resetDcmIntegralError()')
runCommandClient('robot.dcm_control.Ki.value = Ki_dcm')
print('Executing the trajectory')
#time.sleep(1)
runCommandClient('robot.triggerTrajGen.sin.value = 1')
time.sleep(25)