dlvhex  2.5.0
include/dlvhex2/ConcurrentMessageQueueOwning.h
Go to the documentation of this file.
00001 /* DMCS -- Distributed Nonmonotonic Multi-Context Systems.
00002  * Copyright (C) 2006-2015 Thomas Krennwallner
00003  *
00004  * This file is part of DMCS.
00005  *
00006  *  DMCS is free software: you can redistribute it and/or modify
00007  *  it under the terms of the GNU General Public License as published by
00008  *  the Free Software Foundation, either version 3 of the License, or
00009  *  (at your option) any later version.
00010  *
00011  *  DMCS is distributed in the hope that it will be useful,
00012  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
00013  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
00014  *  GNU General Public License for more details.
00015  *
00016  *  You should have received a copy of the GNU General Public License
00017  *  along with DMCS.  If not, see <http://www.gnu.org/licenses/>.
00018  */
00019 
00029 #ifndef _CONCURRENT_MESSAGE_QUEUE_OWNING_H
00030 #define _CONCURRENT_MESSAGE_QUEUE_OWNING_H
00031 
00032 #include <boost/shared_ptr.hpp>
00033 #include <boost/thread/mutex.hpp>
00034 #include <boost/thread/condition_variable.hpp>
00035 #include <boost/thread/thread_time.hpp>
00036 #include <boost/date_time/time_duration.hpp>
00037 
00038 #include <queue>
00039 
00040 namespace dlvhex
00041 {
00042 
00047     template<class MessageBase>
00048         class ConcurrentMessageQueueOwning
00049     {
00050         private:
00051             typedef boost::shared_ptr<MessageBase> MessagePtr;
00052 
00054             std::queue<MessagePtr> q;
00056             const std::size_t n;
00058             std::size_t enq;
00060             std::size_t deq;
00061 
00063             mutable boost::mutex mtx;
00065             boost::condition_variable cnd;
00066 
00068             inline void
00069             notifyConsumer() {
00070                 if (deq > 0) {   // is some consumer waiting?
00071                                  // notify one consuming thread
00072                     cnd.notify_one();
00073                 }
00074             }
00075 
00077             inline void
00078             notifyProducer() {
00079                 if (enq > 0) {   // is some producer waiting?
00080                                  // notify one producing thread
00081                     cnd.notify_one();
00082                 }
00083             }
00084 
00086             inline void
00087             waitOnCapacity(boost::mutex::scoped_lock& lock) {
00088                                  // maximum capacity reached
00089                 while (n == q.size()) {
00090                     ++enq;
00091                     cnd.wait(lock);
00092                     --enq;
00093                 }
00094 
00095                 notifyConsumer();
00096             }
00097 
00099             inline void
00100             waitOnEmpty(boost::mutex::scoped_lock& lock) {
00101                                  // minimum capacity reached
00102                 while (q.empty()) {
00103                     ++deq;
00104                     cnd.wait(lock);
00105                     --deq;
00106                 }
00107 
00108                 notifyProducer();
00109             }
00110 
00115             inline bool
00116             waitOnTimedCapacity(boost::mutex::scoped_lock& lock, const boost::posix_time::time_duration& t) {
00117                 bool no_timeout = true;
00118 
00119                                  // maximum capacity reached
00120                 while (n == q.size() && no_timeout) {
00121                     ++enq;
00122                     no_timeout = cnd.timed_wait(lock, t);
00123                     --enq;
00124                 }
00125 
00126                 notifyConsumer();
00127 
00128                 return no_timeout;
00129             }
00130 
00135             inline bool
00136             waitOnTimedEmpty(boost::mutex::scoped_lock& lock, const boost::posix_time::time_duration& t) {
00137                 bool no_timeout = true;
00138 
00139                                  // minimum capacity reached
00140                 while (q.empty() && no_timeout) {
00141                     ++deq;
00142                     no_timeout = cnd.timed_wait(lock, t);
00143                     --deq;
00144                 }
00145 
00146                 notifyProducer();
00147 
00148                 return no_timeout;
00149             }
00150 
00153             inline void
00154             pushMessage (MessagePtr m) {
00155                 q.push(m);
00156             }
00157 
00160             inline void
00161             popMessage (MessagePtr& m) {
00162                 m = q.front();
00163                 q.pop();
00164             }
00165 
00166         public:
00167 
00169             ConcurrentMessageQueueOwning()
00170                 : n(1), enq(0), deq(0)
00171                 { }
00172 
00178             ConcurrentMessageQueueOwning(std::size_t capacity)
00179                 : n(capacity > 0 ? capacity : 1), enq(0), deq(0)
00180                 { }
00181 
00184             ConcurrentMessageQueueOwning(const ConcurrentMessageQueueOwning<MessageBase>& q)
00185                 : n(q.n), enq(0), deq(0)
00186                 { }
00187 
00189             virtual
00190             ~ConcurrentMessageQueueOwning() {
00191                 flush();
00192             }
00193 
00195             void flush() { {
00196                     boost::mutex::scoped_lock lock(mtx);
00197                     // just pop all elements from the queue (the smart pointers automatically
00198                     // destruct the elements and free the memory)
00199                     while (!q.empty())
00200                         q.pop();
00201                 }
00202                 notifyProducer();
00203             }
00204 
00207             bool
00208                 empty () const
00209             {
00210                 boost::mutex::scoped_lock lock(mtx);
00211                 return q.empty();
00212             }
00213 
00216             bool
00217                 size () const
00218             {
00219                 return n;
00220             }
00221 
00225             void
00226             send (MessagePtr m, unsigned int prio) {
00227                 boost::mutex::scoped_lock lock(mtx);
00228                 waitOnCapacity(lock);
00229                 pushMessage(m);
00230             }
00231 
00236             bool
00237             try_send (MessagePtr m, unsigned int prio) {
00238                 boost::mutex::scoped_lock lock(mtx);
00239 
00240                 if (q.size() < n) {
00241                     pushMessage(m);
00242                     notifyConsumer();
00243                     return true;
00244                 }
00245 
00246                 return false;
00247             }
00248 
00254             bool
00255             timed_send (MessagePtr m, unsigned int prio, const boost::posix_time::time_duration& t) {
00256                 boost::mutex::scoped_lock lock(mtx);
00257 
00258                 if (waitOnTimedCapacity(lock, t)) {
00259                     pushMessage(m);
00260                     return true;
00261                 }
00262 
00263                 return false;
00264             }
00265 
00269             void
00270             receive (MessagePtr& m, unsigned int& prio) {
00271                 boost::mutex::scoped_lock lock(mtx);
00272                 waitOnEmpty(lock);
00273                 popMessage(m);
00274             }
00275 
00280             bool
00281             try_receive (MessagePtr& m, unsigned int prio) {
00282                 boost::mutex::scoped_lock lock(mtx);
00283 
00284                 if (!q.empty()) {
00285                     popMessage(m);
00286                     notifyProducer();
00287                     return true;
00288                 }
00289 
00290                 return false;
00291             }
00292 
00298             bool
00299             timed_receive (MessagePtr& m, unsigned int& prio, const boost::posix_time::time_duration& t) {
00300                 boost::mutex::scoped_lock lock(mtx);
00301 
00302                 if (waitOnTimedEmpty(lock, t)) {
00303                     popMessage(m);
00304                     return true;
00305                 }
00306 
00307                 return false;
00308             }
00309 
00310     };
00311 
00312 }                                // namespace dlvhex
00313 #endif                           // _CONCURRENT_MESSAGE_QUEUE_OWNING_H
00314 
00315 
00316 // vim:expandtab:ts=4:sw=4:
00317 // mode: C++
00318 // End: