Commit 59c95055 authored by Nicolas Mansard's avatar Nicolas Mansard Committed by Nicolas Mansard
Browse files

New python scripts.

parent fa2ecd59
......@@ -266,5 +266,16 @@ INSTALL(FILES
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/meta_tasks.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/meta_task_6d.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/meta_tasks_kine.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/meta_task_posture.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/meta_task_visual_point.py
DESTINATION ${PYTHON_SITELIB}/dynamic_graph/sot/core
)
INSTALL(FILES
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/__init__.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/attime.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/history.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/thread_interruptible_loop.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/viewer_loger.py
${CMAKE_CURRENT_SOURCE_DIR}/dynamic_graph/sot/core/utils/viewer_helper.py
DESTINATION ${PYTHON_SITELIB}/dynamic_graph/sot/core/utils
)
from dynamic_graph.sot.core.meta_task_6d import toFlags
from dynamic_graph import plug
from dynamic_graph.sot.core import *
from dynamic_graph.sot.core.meta_tasks import setGain
from dynamic_graph.sot.dyninv import *
from dynamic_graph.sot.core.matrix_util import matrixToTuple, vectorToTuple,rotate
from numpy import matrix, identity, zeros, eye
class MetaTaskPosture(object):
postureRange = { \
"rleg": range(6,12), \
"lleg": range(12,18), \
"chest": range(18,20), \
"head": range(20,22), \
"rarm": range(22,28), \
"rhand": [28], \
"larm": range(29,35), \
"lhand": [35], \
}
nbDof = None
def __init__(self,dyn,name="posture"):
self.dyn=dyn
self.name=name
self.feature = FeatureGeneric('feature'+name)
self.featureDes = FeatureGeneric('featureDes'+name)
self.gain = GainAdaptive('gain'+name)
plug(dyn.position,self.feature.errorIN)
robotDim = len(dyn.position.value)
self.feature.jacobianIN.value = matrixToTuple( identity(robotDim) )
self.feature.setReference(self.featureDes.name)
def plugTask(self):
self.task.add(self.feature.name)
plug(self.task.error,self.gain.error)
plug(self.gain.gain,self.task.controlGain)
@property
def ref(self):
return self.featureDes.errorIN.value
@ref.setter
def ref(self,v):
self.featureDes.errorIN.value = v
def gotoq(self,gain=None,qdes=None,**kwargs):
act=list()
if qdes!=None:
if isinstance(qdes,tuple): qdes=matrix(qdes).T
else:
if MetaTaskPosture.nbDof==None:
MetaTaskPosture.nbDof = len(self.feature.errorIN.value)
qdes = zeros((MetaTaskPosture.nbDof,1))
for limbName,jointValues in kwargs.items():
limbRange = self.postureRange[limbName]
act += limbRange
if jointValues!=[]:
if isinstance(jointValues,matrix):
qdes[limbRange,0] = vectorToTuple(jointValues)
else: qdes[limbRange,0] = jointValues
self.ref = vectorToTuple(qdes)
self.feature.selec.value = toFlags(act)
setGain(self.gain,gain)
class MetaTaskKinePosture(MetaTaskPosture):
def __init__(self,dyn,name="posture"):
MetaTaskPosture.__init__(self,dyn,name)
self.task = Task('task'+name)
self.plugTask()
from dynamic_graph import plug
from dynamic_graph.sot.core import *
from dynamic_graph.sot.core.meta_tasks import setGain
class MetaTaskVisualPoint(object):
name=''
opPoint=''
dyn=0
task=0
feature=0
featureDes=0
proj=0
def opPointExist(self,opPoint):
sigsP = filter(lambda x: x.getName().split(':')[-1] == opPoint,
self.dyn.signals())
sigsJ = filter(lambda x: x.getName().split(':')[-1] == 'J'+opPoint,
self.dyn.signals())
return len(sigsP)==1 & len(sigsJ)==1
def defineDynEntities(self,dyn):
self.dyn=dyn
def createOpPoint(self,opPoint,opPointRef = 'right-wrist'):
self.opPoint=opPoint
if self.opPointExist(opPoint): return
self.dyn.createOpPoint(opPoint,opPointRef)
def createOpPointModif(self):
self.opPointModif = OpPointModifier('opmodif'+self.name)
plug(self.dyn.signal(self.opPoint),self.opPointModif.signal('positionIN'))
plug(self.dyn.signal('J'+self.opPoint),self.opPointModif.signal('jacobianIN'))
self.opPointModif.activ = False
def createFeatures(self):
self.feature = FeatureVisualPoint('feature'+self.name)
self.featureDes = FeatureVisualPoint('feature'+self.name+'_ref')
self.feature.selec.value = '11'
def createTask(self):
self.task = Task('task'+self.name)
def createGain(self):
self.gain = GainAdaptive('gain'+self.name)
self.gain.set(0.1,0.1,125e3)
def createProj(self):
self.proj = VisualPointProjecter('proj'+self.name)
def plugEverything(self):
self.feature.setReference(self.featureDes.name)
plug(self.dyn.signal(self.opPoint),self.proj.signal('transfo'))
plug(self.proj.signal('point2D'),self.feature.signal('xy'))
plug(self.proj.signal('depth'),self.feature.signal('Z'))
plug(self.dyn.signal('J'+self.opPoint),self.feature.signal('Jq'))
self.task.add(self.feature.name)
plug(self.task.error,self.gain.error)
plug(self.gain.gain,self.task.controlGain)
def __init__(self,name,dyn,opPoint,opPointRef='right-wrist'):
self.name=name
self.defineDynEntities(dyn)
self.createOpPoint(opPoint,opPointRef)
self.createOpPointModif()
self.createFeatures()
self.createTask()
self.createGain()
self.createProj()
self.plugEverything()
@property
def ref(self):
return self.featureDes.xy.value
@ref.setter
def ref(self,m):
self.featureDes.xy.value = m
@property
def target(self):
return self.proj.point3D
@target.setter
def target(self,m):
self.proj.point3D.value = m
@property
def opmodif(self):
if not self.opPointModif.activ: return False
else: return self.opPointModif.getTransformation()
@opmodif.setter
def opmodif(self,m):
if isinstance(m,bool) and m==False:
plug(self.dyn.signal(self.opPoint),self.proj.signal('transfo'))
plug(self.dyn.signal('J'+self.opPoint),self.feature.signal('Jq'))
self.opPointModif.activ = False
else:
if not self.opPointModif.activ:
plug(self.opPointModif.signal('position'),self.proj.signal('transfo') )
plug(self.opPointModif.signal('jacobian'),self.feature.signal('Jq'))
self.opPointModif.setTransformation(m)
self.opPointModif.activ = True
def goto3D(self,point3D,gain=None,ref2D=None,selec=None):
self.target = point3D
if ref2D!=None: self.ref=ref2D
if selec!=None: self.feature.selec.value = selec
setGain(self.gain,gain)
# Define a python class to execute sequencial action during a SOT execution.
#
# Examples would be:
# attime(1800,lambda: sigset(sot.damping,0.9))
# attime(1920,lambda: sigset(sot.damping,0.1))
# attime(1920,lambda: pop(tw))
# attime(850,lambda: sigset(taskSupportSmall.controlGain,0.01))
# @attime(400)
# def action400():
# print 'toto'
#
class attimeAlways:
None
ALWAYS = attimeAlways()
class attimeStop:
None
STOP = attimeStop()
class Calendar:
def __init__(self):
self.events=dict()
self.ping = list()
#self.periodic=list()
def __repr__(self):
res=''
for iter,funpairs in sorted( self.events.iteritems() ):
res += str(iter)+": \n"
for funpair in funpairs:
if funpair[1]=='': res+=funpair[0]+'\n'
else: res += str(funpair[1])+'\n'
return res
def stop(self,*args): self.registerEvents(STOP,*args)
def registerEvent( self,iter,pairfundoc ):
#if iter==ALWAYS:
# self.periodic.append(pairfundoc)
# return
if iter==STOP:
self.events[ALWAYS].remove(pairfundoc)
if not iter in self.events.keys(): self.events[iter] = list()
self.events[iter].append(pairfundoc)
def registerEvents( self,iter,*funs ):
'''
3 entry types are possible: 1. only the functor. 2. a pair
(functor,doc). 3. a list of pairs (functor,doc).
'''
if len(funs)==2 and callable(funs[0]) and isinstance( funs[1],str ):
self.registerEvent(iter, ( funs[0],funs[1] ) )
else:
for fun in funs:
if isinstance( fun,tuple ):
self.registerEvent(iter,fun)
else: # assert iscallable(fun)
if 'functor' in fun.__dict__:
self.registerEvent(iter, (fun.functor,fun.functor.__doc__) )
else:
self.registerEvent(iter, (fun,fun.__doc__) )
def addPing(self,f): self.ping.append(f)
def callPing(self):
for f in self.ping: f()
def run(self,iter,*args):
if ALWAYS in self.events:
for fun,doc in self.events[ALWAYS]:
fun(*args)
if iter in self.events:
self.callPing()
for fun,doc in self.events[iter]:
intro = "At time "+str(iter)+": "
if doc!=None: print intro, doc
else:
if fun.__doc__ != None: print intro, fun.__doc__
else: print intro,"Runing ", fun
fun(*args)
def __call__(self,iterarg,*funs):
if len(funs)==0: return self.generatorDecorator(iterarg)
else: self.registerEvents(iterarg,*funs)
def generatorDecorator(self,iterarg):
"""
This next calling pattern is a little bit strange. Use it to decorate
a function definition: @attime(30) def run30(): ...
"""
class calendarDeco:
iterRef=iterarg
calendarRef=self
fun=None
def __init__(selfdeco,functer):
if functer.__doc__==None: functer.__doc__ = "No doc fun"
if len(functer.__doc__)>0:
selfdeco.__doc__ = functer.__doc__
selfdeco.__doc__ += " (will be run at time "+str(selfdeco.iterRef)+")"
selfdeco.fun=functer
selfdeco.calendarRef.registerEvents(selfdeco.iterRef,functer,functer.__doc__)
def __call__(selfdeco,*args):
selfdeco.fun(*args)
return calendarDeco
def fastForward(self,t):
for i in range(t+1): self.run(i)
attime=Calendar()
sigset = ( lambda s,v : s.__class__.value.__set__(s,v) )
refset = ( lambda mt,v : mt.__class__.ref.__set__(mt,v) )
# This class is used to record the data during an execution and restore it at a
# given time isntant.
from numpy import *
from dynamic_graph.sot.core.matrix_util import vectorToTuple
class History:
def __init__(self,dynEnt,freq=100,zmpSig=None):
self.q = list()
self.qdot = list()
self.zmp = list()
self.freq=freq
self.zmpSig = zmpSig
self.dynEnt = dynEnt
self.withZmp = (self.zmpSig!=None) and ("waist" in map(lambda x:x.name,self.dynEnt.signals()) )
def record(self):
i=self.dynEnt.position.time
if i%self.freq == 0:
self.q.append(self.dynEnt.position.value)
self.qdot.append(self.dynEnt.velocity.value)
if self.withZmp:
waMwo = matrix(self.dynEnt.waist.value).I
wo_z=matrix(self.zmpSig.value+(1,)).T
self.zmp.append(list(vectorToTuple(waMwo*wo_z)))
def restore(self,t):
t= int(t/self.freq)
print "robot.set(",self.q[t],")"
print "robot.setVelocity(",self.qdot[t],")"
print "T0 = ",t
print "robot.state.time = T0"
print "[ t.feature.position.recompute(T0) for t in refreshTaskList]"
print "attime.fastForward(T0)"
def dumpToOpenHRP(self,baseName = "dyninv",sample = 1):
filePos = open(baseName+'.pos','w')
fileRPY = open(baseName+'.hip','w')
fileWaist = open(baseName+'.waist','w')
sampleT = 0.005
for nT,q in enumerate(self.q):
fileRPY.write(str(sampleT*nT)+' '+str(q[3])+' '+str(q[4])+' '+str(q[5])+'\n')
fileWaist.write(str(sampleT*nT)+' '+str(q[0])+' '+str(q[1])+' '+str(q[2])+' '+str(q[3])+' '+str(q[4])+' '+str(q[5])+'\n')
filePos.write(str(sampleT*nT)+' ')
for j in range(6,36):
filePos.write(str(q[j])+' ')
filePos.write(10*' 0'+'\n')
if self.withZmp:
fileZMP = open(baseName+'.zmp','w')
for nT,z in enumerate(self.zmp):
fileZMP.write(str(sampleT*nT)+' '+str(z[0])+' '+str(z[1])+' '+str(z[2])+'\n')
filePos0 = open(baseName+'_pos0.py','w')
filePos0.write( "dyninv_posinit = '" )
q0 = self.q[0]
for x in q0[6:36]:
filePos0.write( str(x*180.0/pi)+' ' )
filePos0.write( " 0 0 0 0 0 0 0 0 0 0 '")
# This class embeds a given loop function (generally by using a decorator) into a
# dedicated thread, whose execution can be stoped or run on demand from the
# script thread.
#
# To use the previous class, a 'loop' function has to be define.
# Everything will be embedded by using the decorator below. Just
# use it as:
# >>> @loopInThread
# >>> def Runner():
# >>> to what you want here
# >>> runner = Runner()
# >>> runner.pause()/play()/quit() ...
import threading, time
class ThreadInterruptibleLoop(threading.Thread):
isQuit=False
isPlay=False
sleepTime=1e-3
previousHandler = None
isOnce=0
isRunning=False
iter=0
def __init__(self):
threading.Thread.__init__(self)
self.daemon = True
def quit(self): self.isQuit = True
def setPlay(self,mode):
self.isPlay = mode
def play(self):
if not self.isRunning: self.start()
self.isOnce = False
self.setPlay(True)
def pause(self): self.setPlay(False)
def once(self):
self.isOnce=True
self.setPlay(True)
def run(self):
self.isQuit=False
self.isRunning=True
while not self.isQuit:
if self.isPlay:
self.loop()
self.iter+=1
if self.isOnce: self.pause()
time.sleep(self.sleepTime)
self.isRunning=False
print 'Thread loop will now end.'
def start(self):
self.setPlay(True)
threading.Thread.start(self)
def restart(self):
self.join()
self.play()
self.setSigHandler()
threading.Thread.start(self)
def loop(self):
None
# To use the previous class, a 'loop' function has to be define.
# Everything will be embedded by using the decorator below. Just
# use it as:
# >>> @loopInThread
# >>> def Runner():
# >>> to what you want here
# >>> runner = Runner()
# >>> runner.pause()/play()/quit() ...
def loopInThread( funLoop ):
class ThreadViewer(ThreadInterruptibleLoop):
def __init__(self):
ThreadInterruptibleLoop.__init__(self)
# self.start()
def loop(self):
funLoop()
return ThreadViewer
from dynamic_graph.script_shortcuts import optionalparentheses
# Define the 4 classical shortcuts to control the loop.
def loopShortcuts(runner):
@optionalparentheses
def go(): runner.play()
@optionalparentheses
def stop(): runner.pause()
@optionalparentheses
def next(): runner.loop()
class NextInc:
def __add__(self,x):
for i in range(x): next()
n=NextInc()
return [go,stop,next,n]
class RobotViewerFaked:
def update(*args): None
def updateElementConfig(*args): None
def stateFullSize(robot,additionalData = () ):
return [float(val) for val in robot.state.value+additionalData]
def refreshView( robot ):
if robot.name=='robot': name='hrp'
else: name=robot.name
robot.viewer.updateElementConfig(name,robot.stateFullSize())
for f in robot.displayList:
f()
def incrementView(robot,dt):
'''Increment then refresh.'''
robot.incrementNoView(dt)
robot.refresh()
def setView( robot,*args ):
'''Set robot config then refresh.'''
robot.setNoView(*args)
#print('view')
robot.refresh()
def addRobotViewer(robot,**args):
'''
Arguments are:
* small=False
* server='XML-RPC' == { 'XML-RPC' | 'CORBA' }
* verbose=True
'''
verbose = args.get('verbose',True)
small = args.get('small',False)
small_extra = args.get('small_extra',10)
server = args.get('server','XML-RPC')
try:
import robotviewer
RobotClass = robot.__class__
if small:
if verbose: print 'using a small robot'
RobotClass.stateFullSize = lambda x: stateFullSize(x,small_extra*(0.0,))
else: RobotClass.stateFullSize = stateFullSize
robot.viewer = robotviewer.client(server)
RobotClass.refresh = refreshView
RobotClass.incrementNoView = RobotClass.increment
RobotClass.increment = incrementView
RobotClass.setNoView = RobotClass.set
RobotClass.set = setView
robot.displayList= []
# Check the connection
if args.get('dorefresh',True):
robot.refresh()
except:
if verbose: print "No robot viewer, sorry."
robot.viewer = RobotViewerFaked()
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# ------------------------------------------------------------------------------
# Add a visual output when an event is called.
class VisualPinger:
def __init__(self,viewer):
self.pos = 1
self.viewer=viewer
self.refresh()
def refresh(self):
self.viewer.updateElementConfig('pong', [ 0, 0.9, self.pos*0.1, 0, 0, 0 ])
def __call__(self):
self.pos += 1
self.refresh()
def updateComDisplay(robot,comsig,objname='com'):
if comsig.time > 0:
robot.viewer.updateElementConfig(objname,[comsig.value[0],comsig.value[1],0,0,0,0])
class ViewerLoger:
'''
This class replace the robotviewer client and log the data sent to the
viewer for future replay.
Example of use:
from viewer_loger import ViewerLoger
robot.viewer = ViewerLoger(robot)
'''
def __init__(self,robot):
self.robot=robot
self.viewer=robot.viewer
self.fileMap = {}
import glob,os
for f in glob.glob('/tmp/view*.dat'):
os.remove(f)
def test(a,b,c):
None
def updateElementConfig(self,name,state):
t=self.robot.state.time