Commit 97602d95 authored by Joseph Mirabel's avatar Joseph Mirabel Committed by Joseph Mirabel

Remove mutex in RosQueuedSubscribe

parent 0fd9dbb5
......@@ -70,13 +70,20 @@ namespace dynamicgraph
RosQueuedSubscribe* entity;
};
template <typename T>
template <typename T, int BufferSize>
struct BindedSignal : BindedSignalBase {
typedef dynamicgraph::Signal<T, int> Signal_t;
typedef boost::shared_ptr<Signal_t> SignalPtr_t;
typedef std::queue<T> Queue_t;
BindedSignal(RosQueuedSubscribe* e) : BindedSignalBase (e), init(false) {}
typedef std::vector<T> buffer_t;
typedef buffer_t::size_type size_type;
BindedSignal(RosQueuedSubscribe* e)
: BindedSignalBase (e)
, frontIdx(0)
, backIdx(0)
, buffer (BufferSize)
, init(false)
{}
~BindedSignal()
{
std::cout << signal->getName() << ": Delete" << std::endl;
......@@ -84,22 +91,47 @@ namespace dynamicgraph
clear();
}
// TODO Add synchronization between:
// - writer and clear (both changes variable backIdx)
// - reader and clear (both changes variable frontIdx)
void clear ()
{
qmutex.lock();
if (!queue.empty()) last = queue.back();
queue = Queue_t();
qmutex.unlock();
// qmutex.lock();
if (!empty()) {
if (backIdx == 0)
last = buffer[BufferSize-1];
else
last = buffer[backIdx-1];
}
frontIdx = backIdx = 0;
// qmutex.unlock();
}
bool empty () const
{
return frontIdx == backIdx;
}
bool full () const
{
return ((backIdx + 1) % BufferSize) == frontIdx;
}
std::size_t size () const
size_type size () const
{
return queue.size();
if (frontIdx < backIdx)
return frontIdx + BufferSize - backIdx;
else
return frontIdx - backIdx;
}
SignalPtr_t signal;
Queue_t queue;
boost::mutex qmutex;
/// Index of the next value to be read.
size_type frontIdx;
/// Index of the slot where to write next value (does not contain valid data).
size_type backIdx;
buffer_t buffer;
// boost::mutex qmutex;
T last;
bool init;
......
......@@ -16,6 +16,8 @@ namespace dynamicgraph
{
namespace internal
{
static const int BUFFER_SIZE = 50;
template <typename T>
struct Add
{
......@@ -25,7 +27,7 @@ namespace dynamicgraph
{
typedef typename SotToRos<T>::sot_t sot_t;
typedef typename SotToRos<T>::ros_const_ptr_t ros_const_ptr_t;
typedef BindedSignal<sot_t> BindedSignal_t;
typedef BindedSignal<sot_t, BUFFER_SIZE> BindedSignal_t;
typedef typename BindedSignal_t::Signal_t Signal_t;
// Initialize the bindedSignal object.
......@@ -49,7 +51,7 @@ namespace dynamicgraph
// -> No message should be lost because of a full buffer
bs->subscriber =
boost::make_shared<ros::Subscriber>
(rosSubscribe.nh ().subscribe (topic, 50, callback));
(rosSubscribe.nh ().subscribe (topic, BUFFER_SIZE, callback));
RosQueuedSubscribe::bindedSignal_t bindedSignal (bs);
rosSubscribe.bindedSignal ()[signal] = bindedSignal;
......@@ -57,9 +59,9 @@ namespace dynamicgraph
};
// template <typename T, typename R>
template <typename T>
template <typename T, int N>
template <typename R>
void BindedSignal<T>::writer (const R& data)
void BindedSignal<T, N>::writer (const R& data)
{
T value;
converter (value, data);
......@@ -67,26 +69,32 @@ namespace dynamicgraph
last = value;
init = true;
}
qmutex.lock();
queue.push (value);
qmutex.unlock();
// TODO: synchronize with method clear
// qmutex.lock();
buffer[backIdx].push (value);
// assert(!full());
backIdx = (backIdx+1) % N;
// TODO: synchronize with method clear
// qmutex.unlock();
}
template <typename T>
template <typename T, int N>
T& BindedSignal<T>::reader (T& data, int time)
{
if (entity->readQueue_ == -1 || time < entity->readQueue_) {
data = last;
} else {
qmutex.lock();
if (queue.empty())
// TODO: remove me
// qmutex.lock();
if (empty())
data = last;
else {
data = queue.front();
queue.pop();
data = buffer[frontIdx];
frontIdx = (frontIdx + 1) % N;
last = data;
}
qmutex.unlock();
// TODO: remove me
// qmutex.unlock();
}
return data;
}
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment