Newer
Older
from sot_talos_balance.control_manager import ControlManager
from sot_talos_balance.example import Example
from sot_talos_balance.parameter_server import ParameterServer
from dynamic_graph.tracer_real_time import TracerRealTime
from time import sleep
from sot_talos_balance.base_estimator import BaseEstimator
from sot_talos_balance.madgwickahrs import MadgwickAHRS
from sot_talos_balance.imu_offset_compensation import ImuOffsetCompensation

François Bailly
committed
from sot_talos_balance.dcm_estimator import DcmEstimator
from dynamic_graph.sot.core.operator import Mix_of_vector
from dynamic_graph.sot.core.operator import Selec_of_vector
from sot_talos_balance.nd_trajectory_generator import NdTrajectoryGenerator
from sot_talos_balance.joint_position_controller import JointPositionController
from sot_talos_balance.admittance_controller import AdmittanceController
from sot_talos_balance.dummy_dcm_estimator import DummyDcmEstimator
from sot_talos_balance.com_admittance_controller import ComAdmittanceController
from sot_talos_balance.dcm_controller import DcmController
from sot_talos_balance.dcm_com_controller import DcmComController
from sot_talos_balance.simple_zmp_estimator import SimpleZmpEstimator
# python
from sot_talos_balance.utils.filter_utils import create_chebi1_checby2_series_filter
from sot_talos_balance.utils.sot_utils import Bunch
from dynamic_graph import plug
def create_extend_mix(n_in,n_out):
assert n_out>n_in
mix_of_vector = Mix_of_vector( "mix " + str(n_in) + "-" + str(n_out) )
mix_of_vector.setSignalNumber(3)
n_diff = n_out-n_in
mix_of_vector.addSelec(1,0,n_diff)
mix_of_vector.addSelec(2,n_diff,n_in)
mix_of_vector.default.value=[0.0]*n_out
mix_of_vector.signal("sin1").value = [0.0]*n_diff
mix_of_vector.signal("sin2").value = [2.0]*n_in
return mix_of_vector
def create_joint_trajectory_generator(dt):
jtg = NdTrajectoryGenerator("jtg");
jtg.trigger.value = 1.0;
jtg.init(dt, N_JOINTS);
return jtg;
def create_config_trajectory_generator(dt):
N_CONFIG = N_JOINTS + 6
jtg = NdTrajectoryGenerator("jtg");
jtg.initial_value.value = tuple(N_CONFIG*[0.0]);
jtg.trigger.value = 1.0;
jtg.init(dt, N_CONFIG);
return jtg;
def create_com_trajectory_generator(dt,robot):
comTrajGen = NdTrajectoryGenerator("comTrajGen");
comTrajGen.initial_value.value = robot.dynamic.com.value
comTrajGen.trigger.value = 1.0;
comTrajGen.init(dt, 3);
return comTrajGen;
def create_joint_controller(Kp):
controller = JointPositionController("posctrl")
controller.Kp.value = Kp
return controller
def create_joint_admittance_controller(joint,Kp,dt,robot):
controller = AdmittanceController("jadmctrl")
robot.stateselec = Selec_of_vector("state_selec")
robot.stateselec.selec(joint+6,joint+7)
plug(robot.device.state,robot.stateselec.sin)
plug(robot.stateselec.sout,controller.state)
robot.tauselec = Selec_of_vector("tau_selec")
robot.tauselec.selec(joint,joint+1)
plug(robot.device.ptorque,robot.tauselec.sin)
plug(robot.tauselec.sout,controller.tau)
controller.tauDes.value = [0.0]
controller.init(dt, 1)
controller.setPosition([ robot.device.state.value[joint+6] ])
def create_imu_offset_compensation(robot, dt):
imu_offset_compensation = ImuOffsetCompensation('imu_offset_comp');
imu_offset_compensation.init(dt);
plug(robot.device.accelerometer, imu_offset_compensation.accelerometer_in);
plug(robot.device.gyrometer, imu_offset_compensation.gyrometer_in);
return imu_offset_compensation;
def create_device_filters(robot, dt):
filters = Bunch();
filters.joints_kin = create_chebi1_checby2_series_filter("joints_kin", dt, N_JOINTS);
filters.ft_RF_filter = create_chebi1_checby2_series_filter("ft_RF_filter", dt, 6);
filters.ft_LF_filter = create_chebi1_checby2_series_filter("ft_LF_filter", dt, 6);
filters.ft_RH_filter = create_chebi1_checby2_series_filter("ft_RH_filter", dt, 6);
filters.ft_LH_filter = create_chebi1_checby2_series_filter("ft_LH_filter", dt, 6);
filters.torque_filter = create_chebi1_checby2_series_filter("ptorque_filter", dt, N_JOINTS);
filters.acc_filter = create_chebi1_checby2_series_filter("acc_filter", dt, 3);
filters.gyro_filter = create_chebi1_checby2_series_filter("gyro_filter", dt, 3);
filters.estimator_kin = create_chebi1_checby2_series_filter("estimator_kin", dt, N_JOINTS);
plug(robot.device.joint_angles, filters.estimator_kin.x); # device.state, device.joint_angles or device.motor_angles ?
plug(robot.device.forceRLEG, filters.ft_RF_filter.x);
plug(robot.device.forceLLEG, filters.ft_LF_filter.x);
plug(robot.device.forceRARM, filters.ft_RH_filter.x);
plug(robot.device.forceLARM, filters.ft_LH_filter.x);
plug(robot.device.ptorque, filters.torque_filter.x);
# switch following lines if willing to use imu offset compensation
#~ plug(robot.imu_offset_compensation.accelerometer_out, filters.acc_filter.x);
plug(robot.device.accelerometer, filters.acc_filter.x);
#~ plug(robot.imu_offset_compensation.gyrometer_out, filters.gyro_filter.x);
plug(robot.device.gyrometer, filters.gyro_filter.x);
return filters
def create_be_filters(robot, dt):
be_filters = Bunch();
be_filters.test = create_chebi1_checby2_series_filter("test_filter", dt, N_JOINTS);
plug(robot.base_estimator.q, be_filters.test.x);
return be_filters
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
def create_ctrl_manager(conf, motor_params, dt, robot_name='robot'):
ctrl_manager = ControlManager("ctrl_man");
ctrl_manager.tau_predicted.value = N_JOINTS*(0.0,);
ctrl_manager.i_measured.value = N_JOINTS*(0.0,);
ctrl_manager.tau_max.value = N_JOINTS*(conf.TAU_MAX,);
ctrl_manager.i_max.value = N_JOINTS*(conf.CURRENT_MAX,);
ctrl_manager.u_max.value = N_JOINTS*(conf.CTRL_MAX,);
# Init should be called before addCtrlMode
# because the size of state vector must be known.
ctrl_manager.init(dt, conf.urdfFileName, robot_name)
# Set the map from joint name to joint ID
for key in conf.mapJointNameToID:
ctrl_manager.setNameToId(key,conf.mapJointNameToID[key])
# Set the map joint limits for each id
for key in conf.mapJointLimits:
ctrl_manager.setJointLimitsFromId(key,conf.mapJointLimits[key][0], \
conf.mapJointLimits[key][1])
# Set the force limits for each id
for key in conf.mapForceIdToForceLimits:
ctrl_manager.setForceLimitsFromId(key,tuple(conf.mapForceIdToForceLimits[key][0]), \
tuple(conf.mapForceIdToForceLimits[key][1]))
# Set the force sensor id for each sensor name
for key in conf.mapNameToForceId:
ctrl_manager.setForceNameToForceId(key,conf.mapNameToForceId[key])
# Set the map from the urdf joint list to the sot joint list
ctrl_manager.setJointsUrdfToSot(conf.urdftosot)
# Set the foot frame name
for key in conf.footFrameNames:
ctrl_manager.setFootFrameName(key,conf.footFrameNames[key])
# Set IMU hosting joint name
ctrl_manager.setImuJointName(conf.ImuJointName)
ctrl_manager.setRightFootForceSensorXYZ(conf.rightFootSensorXYZ);
ctrl_manager.setRightFootSoleXYZ(conf.rightFootSoleXYZ);
return ctrl_manager;
def create_base_estimator(robot, dt, conf, robot_name="robot"):
base_estimator = BaseEstimator('base_estimator');
base_estimator.init(dt, robot_name);

François Bailly
committed
plug(robot.device.joint_angles, base_estimator.joint_positions); # device.state, device.joint_angles or device.motor_angles ?
plug(robot.device_filters.ft_LF_filter.x_filtered, base_estimator.forceLLEG)
plug(robot.device_filters.ft_RF_filter.x_filtered, base_estimator.forceRLEG)
plug(robot.device_filters.ft_LF_filter.dx, base_estimator.dforceLLEG)
plug(robot.device_filters.ft_RF_filter.dx, base_estimator.dforceRLEG)
plug(robot.device_filters.estimator_kin.dx, base_estimator.joint_velocities);
plug(robot.imu_filters.imu_quat, base_estimator.imu_quaternion);
plug(robot.device_filters.gyro_filter.x_filtered, base_estimator.gyroscope);
plug(robot.device_filters.acc_filter.x_filtered, base_estimator.accelerometer);
base_estimator.K_fb_feet_poses.value = conf.K_fb_feet_poses;
base_estimator.w_lf_in.value = conf.w_lf_in;
base_estimator.w_rf_in.value = conf.w_rf_in;
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
base_estimator.set_imu_weight(conf.w_imu);
base_estimator.set_stiffness_right_foot(conf.K);
base_estimator.set_stiffness_left_foot(conf.K);
base_estimator.set_zmp_std_dev_right_foot(conf.std_dev_zmp)
base_estimator.set_zmp_std_dev_left_foot(conf.std_dev_zmp)
base_estimator.set_normal_force_std_dev_right_foot(conf.std_dev_fz)
base_estimator.set_normal_force_std_dev_left_foot(conf.std_dev_fz)
base_estimator.set_zmp_margin_right_foot(conf.zmp_margin)
base_estimator.set_zmp_margin_left_foot(conf.zmp_margin)
base_estimator.set_normal_force_margin_right_foot(conf.normal_force_margin)
base_estimator.set_normal_force_margin_left_foot(conf.normal_force_margin)
base_estimator.set_right_foot_sizes(conf.RIGHT_FOOT_SIZES)
base_estimator.set_left_foot_sizes(conf.LEFT_FOOT_SIZES)
return base_estimator;
def create_imu_filters(robot, dt):
imu_filter = MadgwickAHRS('imu_filter');
imu_filter.init(dt);
plug(robot.device_filters.acc_filter.x_filtered, imu_filter.accelerometer); # no IMU compensation
plug(robot.device_filters.gyro_filter.x_filtered, imu_filter.gyroscope); # no IMU compensation
return imu_filter;
def addTrace(tracer, entity, signalName):
"""
Add a signal to a tracer
"""
signal = '{0}.{1}'.format(entity.name, signalName);
filename = '{0}-{1}'.format(entity.name, signalName);
tracer.add(signal, filename);
def addSignalsToTracer(tracer, device, outputs):
for sign in outputs :
addTrace(tracer,device,sign);
return
def create_tracer(robot,entity,tracer_name, outputs=None):
tracer = TracerRealTime(tracer_name)
tracer.setBufferSize(80*(2**20))
tracer.open('/tmp','dg_','.dat')
robot.device.after.addSignal('{0}.triger'.format(tracer.name))
if outputs is not None:
addSignalsToTracer(tracer, entity, outputs)
return tracer
def reset_tracer(device,tracer):
tracer.stop();
sleep(0.2);
tracer.close();
sleep(0.2);
tracer.clear();
sleep(0.2);
tracer = create_tracer(device, tracer.name);
return tracer;
def dump_tracer(tracer):
tracer.stop();
sleep(0.2);
tracer.dump()
sleep(0.2);
tracer.close();
def create_rospublish(robot, name):
rospub = RosPublish(name)
robot.device.after.addSignal(rospub.name+'.trigger')
return rospub
def create_topic(rospub, entity, signalName, robot=None, data_type='vector'):
if not entity.hasSignal(signalName): # check needed to prevent creation of broken topic
raise AttributeError( 'Entity %s does not have signal %s' % (entity.name, signalName) )
rospub_signalName = '{0}_{1}'.format(entity.name, signalName)
topicname = '/sot/{0}/{1}'.format(entity.name, signalName)
rospub.add(data_type,rospub_signalName,topicname)
plug(entity.signal(signalName), rospub.signal(rospub_signalName))
if robot is not None:
robot.device.after.addSignal( '{0}.{1}'.format(entity.name, signalName) )
def create_dummy_dcm_estimator(robot):
from math import sqrt
estimator = DummyDcmEstimator("dummy")
robot.dynamic.com.recompute(0)
h = robot.dynamic.com.value[2]
g = 9.81
omega = sqrt(g/h)
estimator.mass.value = mass
estimator.omega.value = omega
plug(robot.dynamic.com,estimator.com)
plug(robot.dynamic.momenta,estimator.momenta)
estimator.init()
return estimator
def create_com_admittance_controller(Kp,dt,robot):
controller = ComAdmittanceController("comAdmCtrl")
controller.Kp.value = Kp
plug(robot.dynamic.zmp,controller.zmp)
robot.dynamic.zmp.recompute(0)
controller.zmpDes.value = robot.dynamic.zmp.value
controller.ddcomDes.value = [0.0,0.0,0.0]
controller.init(dt)
robot.dynamic.com.recompute(0)
controller.setState(robot.dynamic.com.value,[0.0,0.0,0.0])
return controller
def create_dcm_controller(Kp,Ki,dt,robot,dcmSignal):
from math import sqrt
controller = DcmController("dcmCtrl")
robot.dynamic.com.recompute(0)
h = robot.dynamic.com.value[2]
g = 9.81
omega = sqrt(g/h)
controller.Kp.value = Kp
controller.Ki.value = Ki
controller.decayFactor.value = 0.2
controller.mass.value = mass
controller.omega.value = omega
plug(robot.dynamic.com,controller.com)
plug(dcmSignal,controller.dcm)
robot.dynamic.zmp.recompute(0)
controller.zmpDes.value = robot.dynamic.zmp.value
controller.dcmDes.value = robot.dynamic.zmp.value
controller.init(dt)
return controller
def create_dcm_com_controller(Kp,Ki,dt,robot,dcmSignal):
from math import sqrt
controller = DcmComController("dcmComCtrl")
robot.dynamic.com.recompute(0)
h = robot.dynamic.com.value[2]
g = 9.81
omega = sqrt(g/h)
controller.Kp.value = Kp
controller.Ki.value = Ki
controller.decayFactor.value = 0.2
controller.mass.value = mass
controller.omega.value = omega
controller.ddcomDes.value = [0.0,0.0,0.0]
plug(dcmSignal,controller.dcm)
robot.dynamic.com.recompute(0)
controller.comDes.value = robot.dynamic.com.value
controller.dcmDes.value = (robot.dynamic.com.value[0], robot.dynamic.com.value[1], 0.0)
controller.init(dt)
return controller
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
def create_parameter_server(conf, dt, robot_name='robot'):
param_server = ParameterServer("param_server");
# Init should be called before addCtrlMode
# because the size of state vector must be known.
param_server.init(dt, conf.urdfFileName, robot_name)
# Set the map from joint name to joint ID
for key in conf.mapJointNameToID:
param_server.setNameToId(key,conf.mapJointNameToID[key])
# Set the map joint limits for each id
for key in conf.mapJointLimits:
param_server.setJointLimitsFromId(key,conf.mapJointLimits[key][0], \
conf.mapJointLimits[key][1])
# Set the force limits for each id
for key in conf.mapForceIdToForceLimits:
param_server.setForceLimitsFromId(key,tuple(conf.mapForceIdToForceLimits[key][0]), \
tuple(conf.mapForceIdToForceLimits[key][1]))
# Set the force sensor id for each sensor name
for key in conf.mapNameToForceId:
param_server.setForceNameToForceId(key,conf.mapNameToForceId[key])
# Set the map from the urdf joint list to the sot joint list
param_server.setJointsUrdfToSot(conf.urdftosot)
# Set the foot frame name
for key in conf.footFrameNames:
param_server.setFootFrameName(key,conf.footFrameNames[key])
# Set IMU hosting joint name
param_server.setImuJointName(conf.ImuJointName)
param_server.setRightFootForceSensorXYZ(conf.rightFootSensorXYZ);
param_server.setRightFootSoleXYZ(conf.rightFootSoleXYZ);
return param_server;
def create_example(robot_name='robot', firstAdd = 0., secondAdd = 0.):
example = Example('example')
example.firstAddend.value = firstAdd
example.secondAddend.value = secondAdd
example.init(robot_name)
return example

François Bailly
committed
def create_dcm_estimator(robot, dt, robot_name='robot'):
dcm_estimator = DcmEstimator('dcm_estimator')
dcm_estimator.init(dt, robot_name)
plug(robot.base_estimator.q, dcm_estimator.q)
plug(robot.base_estimator.v, dcm_estimator.v)
return dcm_estimator
def create_zmp_estimator(robot):
estimator = SimpleZmpEstimator("zmpEst")
plug(robot.dynamic.LF,estimator.poseLeft)
plug(robot.dynamic.RF,estimator.poseRight)
plug(robot.device.forceLLEG,estimator.wrenchLeft)
plug(robot.device.forceRLEG,estimator.wrenchRight)