FACT++  1.0
Queue.h
Go to the documentation of this file.
1 #ifndef FACT_Queue
2 #define FACT_Queue
3 
4 #include <list>
5 
6 #ifndef __CINT__
7 #include <thread>
8 #include <condition_variable>
9 #else
10 namespace std
11 {
12  class mutex;
13  class thread;
14  class condition_variable;
15  template<class T> class function<T>;
16 }
17 #endif
18 
19 
20 // The second template argument must support:
21 // iterator it = begin(); // get the next element to be processed
22 // erase(it); // erase the processed element from the queue
23 // push_back(); // add a new element to the queue
24 // emplace_back(); // emplace a new element to the queue
25 // splice(); // used to efficiently implement post with mutex
26 
// Work queue which hands the posted elements, one at a time and in the
// order of the underlying list, to a callback. By default the callback
// is executed in a dedicated worker thread owned by the queue.
//
// The callback returns true if the element was processed (it is then
// removed from the queue) and false if it could not be processed (it
// then stays at the front and is retried once the number of queued
// elements changes or notify() is called).
//
// All state changing member functions serialize on an internal mutex.
// The mutex is held while the callback is executed in prompt mode, but
// it is released while the callback is executed by the worker thread.
template<class T, class List=std::list<T>>
class Queue
{
    size_t fSize;                   // Number of elements in fList (only necessary before C++11)

    List fList;                     // The elements waiting to be processed

    std::mutex              fMutex; // Mutex needed for the conditional
    std::condition_variable fCond;  // Conditional

    enum state_t
    {
        kIdle,    // No worker thread is processing, posting is refused
        kRun,     // The worker thread is processing the queue
        kStop,    // The worker terminates as soon as the queue is empty
        kAbort,   // The worker terminates at once, remaining elements are discarded
        kTrigger, // Set by notify(): the worker shall retry the first element
        kPrompt   // No worker thread, post()/emplace() call the callback directly
    };

    state_t fState;                 // Stop signal for the thread

    typedef std::function<bool(const T &)> callback;
    callback fCallback;             // Callback function called by the thread

    std::thread fThread;            // Handle to the thread

    // Main loop of the worker thread. Runs until the state is switched
    // to kAbort, or to kStop and the list got empty. On exit, the list
    // is cleared and the state is set back to kIdle.
    void Thread()
    {
        std::unique_lock<std::mutex> lock(fMutex);

        // No filling allowed by default (the queue is
        // always processed until it is empty)
        size_t allowed = 0;

        while (true)
        {
            // Sleep as long as the number of elements is the one at which
            // the last processing attempt failed (or zero) and nobody
            // changed the state
            while (fSize==allowed && fState==kRun)
                fCond.wait(lock);

            // Check if the State flag has been changed
            if (fState==kAbort)
                break;

            if (fState==kStop && fList.empty())
                break;

            // If thread got just woken up, move back the state to kRun
            if (fState==kTrigger)
                fState = kRun;

            // Could have been a fState==kTrigger case
            if (fList.empty())
                continue;

            // During the unlocked state, fSize might change.
            // The current size of the queue needs to be saved.
            allowed = fSize;

            // get the first entry from the (sorted) list
            const auto it = fList.begin();

            // Theoretically, we can lose a signal here, but this is
            // not a problem, because then we detect a non-empty queue
            lock.unlock();

            // If the first event in the queue could not be processed,
            // no further processing makes sense until a new event has
            // been posted (or the number of events in the queue has
            // changed) [allowed>0], in the case processing was
            // successful [allowed==0], the next event will be processed
            // immediately.

            if (fCallback && fCallback(*it))
                allowed = 0;

            lock.lock();

            // Whenever an event was successfully processed, allowed
            // is equal to zero and thus the event will be popped
            if (allowed>0)
                continue;

            fList.erase(it);
            fSize--;
        }

        fList.clear();
        fSize = 0;

        fState = kIdle;
    }

public:
    // Create a queue which passes its elements to f. If startup is
    // true, the worker thread is started immediately.
    Queue(const callback &f, bool startup=true) : fSize(0), fState(kIdle), fCallback(f)
    {
        if (startup)
            start();
    }

    // Copying only copies the callback. The new queue is empty and idle.
    Queue(const Queue<T,List>& q) : fSize(0), fState(kIdle), fCallback(q.fCallback)
    {
    }

    // Assignment resets this queue (empty, idle) and takes over the
    // callback of q. A worker thread which is still running is aborted
    // and joined first; resetting the state behind its back would leave
    // it looping in an unexpected state with fSize out of sync with fList.
    Queue<T,List>& operator=(const Queue<T,List>& q)
    {
        wait(true);

        const std::lock_guard<std::mutex> lock(fMutex);

        fList.clear();

        fSize     = 0;
        fState    = kIdle;
        fCallback = q.fCallback;
        return *this;
    }

#ifdef __MARS__ // Needed for the compilation of the dictionary
    Queue() : fSize(0), fState(kIdle)
    {
    }
#endif

    // Aborts processing (unless a stop was already requested) and joins
    // the worker thread.
    ~Queue()
    {
        wait(true);
    }

    // Start the worker thread. Returns false if the queue is not idle.
    bool start()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle)
            return false;

        // A worker which terminated after stop()/abort() without anybody
        // calling wait() is still joinable. Assigning to a joinable
        // std::thread calls std::terminate, so it is reaped first. The
        // state is kIdle, so that worker does not need the mutex anymore.
        if (fThread.joinable())
            fThread.join();

        // The new worker blocks on fMutex until this function returns,
        // so it is safe to change the state after the (possibly
        // throwing) creation of the thread.
        fThread = std::thread(std::bind(&Queue::Thread, this));
        fState  = kRun;
        return true;
    }

    // Request the worker to terminate as soon as the queue is empty.
    // Does not block. Returns false if the queue is idle.
    // NOTE(review): in prompt mode this also switches the state to kStop
    // although no worker exists -- confirm whether this is intended.
    bool stop()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kStop;
        fCond.notify_one();

        return true;
    }

    // Request the worker to terminate immediately, the remaining
    // elements are discarded. Does not block. Returns false if the
    // queue is idle.
    bool abort()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fState = kAbort;
        fCond.notify_one();

        return true;
    }

    // Wait until the worker thread has terminated. If no termination
    // was requested yet, a stop (abrt==false) or an abort (abrt==true)
    // is requested first. Returns false if the queue was idle or in
    // prompt mode, true otherwise.
    bool wait(bool abrt=false)
    {
        {
            const std::lock_guard<std::mutex> lock(fMutex);
            if (fState==kIdle || fState==kPrompt)
            {
                // The worker might have terminated by itself after
                // stop()/abort(). It must still be joined, otherwise
                // destroying or re-assigning fThread calls std::terminate.
                if (fThread.joinable())
                    fThread.join();

                return false;
            }

            // kTrigger is a running worker as well: without requesting
            // the termination here, the join below would never return.
            if (fState==kRun || fState==kTrigger)
            {
                fState = abrt ? kAbort : kStop;
                fCond.notify_one();
            }
        }

        // Must be done unlocked, the worker needs the mutex to terminate
        fThread.join();
        return true;
    }

    // Switch from idle to prompt mode, in which post()/emplace() call
    // the callback directly. Fails if the queue is not idle or not empty.
    bool enablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kIdle || fSize>0)
            return false;

        fState = kPrompt;
        return true;
    }

    // Switch from prompt mode back to idle. Fails if not in prompt mode.
    bool disablePromptExecution()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kPrompt)
            return false;

        fState = kIdle;
        return true;
    }

    // Enable (state==true) or disable (state==false) prompt mode.
    bool setPromptExecution(bool state)
    {
        return state ? enablePromptExecution() : disablePromptExecution();
    }


    // Append a copy of val to the queue and wake up the worker.
    // In prompt mode the callback is called directly instead (with the
    // mutex held) and its result is returned; a queue without callback
    // returns false, consistent with the worker never processing
    // anything in that case. Returns false if the queue is idle.
    bool post(const T &val)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(val);

        if (fState==kIdle)
            return false;

        fList.push_back(val);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Ask a running worker to retry the first element although the
    // number of queued elements did not change. Returns false if the
    // state is not kRun.
    bool notify()
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState!=kRun)
            return false;

        fState = kTrigger;
        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    // Same as post(), but the element is constructed in place from the
    // given arguments. The arguments are perfectly forwarded, so that
    // rvalues are moved rather than copied (and move-only types work).
    template<typename... Args>
    bool emplace(Args&&... args)
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kPrompt)
            return fCallback && fCallback(T(std::forward<Args>(args)...));

        if (fState==kIdle)
            return false;

        fList.emplace_back(std::forward<Args>(args)...);
        fSize++;

        fCond.notify_one();

        return true;
    }

    // Move val into the queue
    bool post(T &&val) { return emplace(std::move(val)); }
#endif

    // Transfer the element i from the list x to the end of the queue
    // (splice, no copy) and wake up the worker. Returns false if the
    // queue is idle.
    // NOTE(review): there is no prompt mode branch here, in prompt mode
    // the element is queued although no worker is running -- confirm.
#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List&& x, typename List::iterator i)
#else
    bool move(List& x, typename List::iterator i)
#endif
    {
        const std::lock_guard<std::mutex> lock(fMutex);
        if (fState==kIdle)
            return false;

        fList.splice(fList.end(), x, i);
        fSize++;

        fCond.notify_one();

        return true;
    }

#ifdef __GXX_EXPERIMENTAL_CXX0X__
    bool move(List& x, typename List::iterator i) { return move(std::move(x), i); }
#endif

    // Number of queued elements. fSize is read without taking the mutex.
    size_t size() const
    {
        return fSize;
    }

    // True if no element is queued. fSize is read without taking the mutex.
    bool empty() const
    {
        return fSize==0;
    }

    // Order queues by their number of queued elements (read without
    // taking either mutex).
    bool operator<(const Queue& other) const
    {
        return fSize < other.fSize;
    }

};
319 
320 #endif
int start(int initState)
Definition: feeserver.c:1740
Physics trigger decision (PhysicTrigger)
Definition: HeadersFTM.h:206
List fList
Definition: Queue.h:32
Queue(const Queue< T, List > &q)
Definition: Queue.h:128
bool abort()
Definition: Queue.h:174
void operator=(const std::vector< uint16_t > &vec)
Definition: HeadersFTM.h:207
state_t fState
Definition: Queue.h:48
int i
Definition: db_dim_client.c:21
STL namespace.
std::function< bool(const T &)> callback
Definition: Queue.h:50
bool setPromptExecution(bool state)
Definition: Queue.h:224
bool start()
Definition: Queue.h:151
uint16_t fState
State of the FTM central state machine.
Definition: HeadersFTM.h:189
Definition: Queue.h:28
callback fCallback
Definition: Queue.h:51
bool disablePromptExecution()
Definition: Queue.h:214
bool notify()
Definition: Queue.h:247
std::condition_variable fCond
Definition: Queue.h:35
bool wait(bool abrt=false)
Definition: Queue.h:186
std::mutex fMutex
Definition: Queue.h:34
bool empty() const
Definition: Queue.h:308
bool enablePromptExecution()
Definition: Queue.h:204
std::thread fThread
Definition: Queue.h:53
bool stop()
Definition: Queue.h:162
bool post(const T &val)
Definition: Queue.h:230
void Thread()
Definition: Queue.h:55
size_t size() const
Definition: Queue.h:303
bool move(List &x, typename List::iterator i)
Definition: Queue.h:284
size_t fSize
Definition: Queue.h:30
Queue(const callback &f, bool startup=true)
Definition: Queue.h:122
bool operator<(const Queue &other) const
Definition: Queue.h:313
~Queue()
Definition: Queue.h:146