ros_subscribe.hxx 3.61 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
#ifndef DYNAMIC_GRAPH_ROS_SUBSCRIBE_HXX
# define DYNAMIC_GRAPH_ROS_SUBSCRIBE_HXX
# include <vector>
# include <boost/bind.hpp>
# include <boost/date_time/posix_time/posix_time.hpp>
# include <dynamic-graph/signal-caster.h>
# include <dynamic-graph/linear-algebra.h>
# include <dynamic-graph/signal-cast-helper.h>
# include <std_msgs/Float64.h>
# include "dynamic_graph_bridge_msgs/Matrix.h"
# include "dynamic_graph_bridge_msgs/Vector.h"

namespace dg = dynamicgraph;

namespace dynamicgraph
{
  namespace internal
  {
19 20
    static const int BUFFER_SIZE = 50;

21 22 23 24
    template <typename T>
    struct Add
    {
      void operator () (RosQueuedSubscribe& rosSubscribe,
Joseph Mirabel's avatar
Joseph Mirabel committed
25
			const std::string& type,
26 27 28 29 30
			const std::string& signal,
			const std::string& topic)
      {
        typedef typename SotToRos<T>::sot_t sot_t;
	typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
31
        typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
32 33 34
	typedef typename BindedSignal_t::Signal_t Signal_t;

	// Initialize the bindedSignal object.
35
        BindedSignal_t* bs = new BindedSignal_t(&rosSubscribe);
36 37 38
        SotToRos<T>::setDefault (bs->last);

	// Initialize the signal.
Joseph Mirabel's avatar
Joseph Mirabel committed
39 40
	boost::format signalName ("RosQueuedSubscribe(%1%)::output(%2%)::%3%");
	signalName % rosSubscribe.getName () % type % signal;
41 42 43 44 45 46 47 48 49 50

	bs->signal.reset (new Signal_t (signalName.str ()));
        bs->signal->setFunction (boost::bind(&BindedSignal_t::reader, bs, _1, _2));
	rosSubscribe.signalRegistration (*bs->signal);

	// Initialize the subscriber.
	typedef boost::function<void (const ros_const_ptr_t& data)> callback_t;
	callback_t callback = boost::bind
	  (&BindedSignal_t::template writer<ros_const_ptr_t>, bs, _1);

51 52
  // Keep 50 messages in queue, but only 20 are sent every 100ms
  // -> No message should be lost because of a full buffer
53 54
	bs->subscriber =
	  boost::make_shared<ros::Subscriber>
55
	  (rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback)); 
56 57 58 59 60 61 62

	RosQueuedSubscribe::bindedSignal_t bindedSignal (bs);
	rosSubscribe.bindedSignal ()[signal] = bindedSignal;
      }
    };

    // template <typename T, typename R>
63
    template <typename T, int N>
64
    template <typename R>
65
    void BindedSignal<T, N>::writer (const R& data)
66
    {
67 68
      // synchronize with method clear
      wmutex.lock();
Joseph Mirabel's avatar
Joseph Mirabel committed
69
      converter (buffer[backIdx], data);
70 71 72 73 74 75
      // assert(!full());
      // No need to synchronize with reader here because:
      // - if the buffer was not empty, then it stays not empty,
      // - if it was empty, then the current value will be used at next time. It
      //   means the transmission bandwidth is too low.
      backIdx = (backIdx+1) % N;
76
      if (!init) {
Joseph Mirabel's avatar
Joseph Mirabel committed
77
        last = buffer[backIdx];
78 79
        init = true;
      }
80
      wmutex.unlock();
81 82
    }

83
    template <typename T, int N>
Joseph Mirabel's avatar
Joseph Mirabel committed
84
    T& BindedSignal<T, N>::reader (T& data, int time)
85
    {
86 87 88 89
      // synchronize with method clear:
      // If reading from the list cannot be done, then return last value.
      bool readingIsEnabled = rmutex.try_lock();
      if (!readingIsEnabled || entity->readQueue_ == -1 || time < entity->readQueue_) {
90
        data = last;
91
      } else {
92
        if (empty())
93 94
          data = last;
        else {
95 96
          data = buffer[frontIdx];
          frontIdx = (frontIdx + 1) % N;
97 98
          last = data;
        }
99
      }
100 101
      if (readingIsEnabled)
        rmutex.unlock();
102 103 104 105 106
      return data;
    }
  } // end of namespace internal.

  template <typename T>
Joseph Mirabel's avatar
Joseph Mirabel committed
107
  void RosQueuedSubscribe::add (const std::string& type, const std::string& signal, const std::string& topic)
108
  {
Joseph Mirabel's avatar
Joseph Mirabel committed
109
    internal::Add<T> () (*this, type, signal, topic);
110 111 112 113
  }
} // end of namespace dynamicgraph.

#endif //! DYNAMIC_GRAPH_ROS_SUBSCRIBE_HXX