Commit b1087d95 authored by Joseph Mirabel's avatar Joseph Mirabel Committed by Joseph Mirabel

Fix multithreading issues in RosQueuedSubscribe

parent dc68ddb9
......@@ -5,6 +5,7 @@
# include <boost/shared_ptr.hpp>
# include <boost/thread/mutex.hpp>
# include <boost/thread/condition_variable.hpp>
# include <dynamic-graph/entity.h>
# include <dynamic-graph/signal-time-dependent.h>
......@@ -105,6 +106,7 @@ namespace dynamicgraph
// synchronize with method reader
rmutex.lock();
frontIdx = backIdx = 0;
fullCondition.notify_all ();
rmutex.unlock();
wmutex.unlock();
}
......@@ -113,7 +115,7 @@ namespace dynamicgraph
{
return frontIdx == backIdx;
}
bool full () const
{
return ((backIdx + 1) % BufferSize) == frontIdx;
......@@ -121,10 +123,10 @@ namespace dynamicgraph
size_type size () const
{
if (frontIdx < backIdx)
return frontIdx + BufferSize - backIdx;
if (frontIdx <= backIdx)
return backIdx - frontIdx;
else
return frontIdx - backIdx;
return backIdx + BufferSize - frontIdx;
}
SignalPtr_t signal;
......@@ -134,6 +136,7 @@ namespace dynamicgraph
size_type backIdx;
buffer_t buffer;
boost::mutex wmutex, rmutex;
boost::condition_variable fullCondition;
T last;
bool init;
......
......@@ -11,12 +11,13 @@
# include "dynamic_graph_bridge_msgs/Vector.h"
namespace dg = dynamicgraph;
typedef boost::mutex::scoped_lock scoped_lock;
namespace dynamicgraph
{
namespace internal
{
static const int BUFFER_SIZE = 50;
static const int BUFFER_SIZE = 1 << 10;
template <typename T>
struct Add
......@@ -65,19 +66,22 @@ namespace dynamicgraph
void BindedSignal<T, N>::writer (const R& data)
{
// synchronize with method clear
wmutex.lock();
boost::mutex::scoped_lock lock(wmutex);
boost::mutex dummy;
boost::unique_lock<boost::mutex> lock_dummy (dummy);
while (full()) {
fullCondition.wait (lock_dummy);
}
converter (buffer[backIdx], data);
// No need to synchronize with reader here because:
// - if the buffer was not empty, then it stays not empty,
// - if it was empty, then the current value will be used at next time. It
// means the transmission bandwidth is too low.
backIdx = (backIdx+1) % N;
assert(!full());
if (!init) {
last = buffer[backIdx];
init = true;
}
wmutex.unlock();
backIdx = (backIdx+1) % N;
}
template <typename T, int N>
......@@ -85,8 +89,8 @@ namespace dynamicgraph
{
// synchronize with method clear:
// If reading from the list cannot be done, then return last value.
bool readingIsEnabled = rmutex.try_lock();
if (!readingIsEnabled || entity->readQueue_ == -1 || time < entity->readQueue_) {
scoped_lock lock(rmutex, boost::try_to_lock);
if (!lock.owns_lock() || entity->readQueue_ == -1 || time < entity->readQueue_) {
data = last;
} else {
if (empty())
......@@ -95,10 +99,9 @@ namespace dynamicgraph
data = buffer[frontIdx];
frontIdx = (frontIdx + 1) % N;
last = data;
fullCondition.notify_all();
}
}
if (readingIsEnabled)
rmutex.unlock();
return data;
}
} // end of namespace internal.
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment