Commit af39e893 authored by Joseph Mirabel's avatar Joseph Mirabel Committed by Joseph Mirabel

Merge branch 'mutex_ros_subscribe'

parents 33224191 6cf2e937
......@@ -70,13 +70,20 @@ namespace dynamicgraph
RosQueuedSubscribe* entity;
};
template <typename T>
template <typename T, int BufferSize>
struct BindedSignal : BindedSignalBase {
typedef dynamicgraph::Signal<T, int> Signal_t;
typedef boost::shared_ptr<Signal_t> SignalPtr_t;
typedef std::queue<T> Queue_t;
BindedSignal(RosQueuedSubscribe* e) : BindedSignalBase (e), init(false) {}
typedef std::vector<T> buffer_t;
typedef typename buffer_t::size_type size_type;
BindedSignal(RosQueuedSubscribe* e)
: BindedSignalBase (e)
, frontIdx(0)
, backIdx(0)
, buffer (BufferSize)
, init(false)
{}
~BindedSignal()
{
std::cout << signal->getName() << ": Delete" << std::endl;
......@@ -84,22 +91,49 @@ namespace dynamicgraph
clear();
}
/// See comments in reader and writer for details about synchronisation.
void clear ()
{
qmutex.lock();
if (!queue.empty()) last = queue.back();
queue = Queue_t();
qmutex.unlock();
// synchronize with method writer
wmutex.lock();
if (!empty()) {
if (backIdx == 0)
last = buffer[BufferSize-1];
else
last = buffer[backIdx-1];
}
// synchronize with method reader
rmutex.lock();
frontIdx = backIdx = 0;
rmutex.unlock();
wmutex.unlock();
}
bool empty () const
{
return frontIdx == backIdx;
}
bool full () const
{
return ((backIdx + 1) % BufferSize) == frontIdx;
}
std::size_t size () const
size_type size () const
{
return queue.size();
if (frontIdx < backIdx)
return frontIdx + BufferSize - backIdx;
else
return frontIdx - backIdx;
}
SignalPtr_t signal;
Queue_t queue;
boost::mutex qmutex;
/// Index of the next value to be read.
size_type frontIdx;
/// Index of the slot where to write next value (does not contain valid data).
size_type backIdx;
buffer_t buffer;
boost::mutex wmutex, rmutex;
T last;
bool init;
......
......@@ -16,6 +16,8 @@ namespace dynamicgraph
{
namespace internal
{
static const int BUFFER_SIZE = 50;
template <typename T>
struct Add
{
......@@ -26,7 +28,7 @@ namespace dynamicgraph
{
typedef typename SotToRos<T>::sot_t sot_t;
typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
typedef BindedSignal<sot_t> BindedSignal_t;
typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
typedef typename BindedSignal_t::Signal_t Signal_t;
// Initialize the bindedSignal object.
......@@ -50,7 +52,7 @@ namespace dynamicgraph
// -> No message should be lost because of a full buffer
bs->subscriber =
boost::make_shared<ros::Subscriber>
(rosSubscribe.nh ().subscribe (topic, 50, callback));
(rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback));
RosQueuedSubscribe::bindedSignal_t bindedSignal (bs);
rosSubscribe.bindedSignal ()[signal] = bindedSignal;
......@@ -58,37 +60,45 @@ namespace dynamicgraph
};
// template <typename T, typename R>
template <typename T>
template <typename T, int N>
template <typename R>
void BindedSignal<T>::writer (const R& data)
void BindedSignal<T, N>::writer (const R& data)
{
T value;
converter (value, data);
// synchronize with method clear
wmutex.lock();
converter (buffer[backIdx], data);
// assert(!full());
// No need to synchronize with reader here because:
// - if the buffer was not empty, then it stays not empty,
// - if it was empty, then the current value will be used at next time. It
// means the transmission bandwidth is too low.
backIdx = (backIdx+1) % N;
if (!init) {
last = value;
last = buffer[backIdx];
init = true;
}
qmutex.lock();
queue.push (value);
qmutex.unlock();
wmutex.unlock();
}
template <typename T>
T& BindedSignal<T>::reader (T& data, int time)
template <typename T, int N>
T& BindedSignal<T, N>::reader (T& data, int time)
{
if (entity->readQueue_ == -1 || time < entity->readQueue_) {
// synchronize with method clear:
// If reading from the list cannot be done, then return last value.
bool readingIsEnabled = rmutex.try_lock();
if (!readingIsEnabled || entity->readQueue_ == -1 || time < entity->readQueue_) {
data = last;
} else {
qmutex.lock();
if (queue.empty())
if (empty())
data = last;
else {
data = queue.front();
queue.pop();
data = buffer[frontIdx];
frontIdx = (frontIdx + 1) % N;
last = data;
}
qmutex.unlock();
}
if (readingIsEnabled)
rmutex.unlock();
return data;
}
} // end of namespace internal.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment