@@ -0,0 +1,453 @@
+/*! \file
+* \brief Thread pool core.
+*
+* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
+*
+* Thread pools are a mechanism for asynchronous and parallel processing
+* within the same process. The pool class provides a convenient way
+* to dispatch asynchronous tasks as function objects. The scheduling
+* of these tasks can be easily controlled by using customized schedulers.
+*
+* Copyright (c) 2005-2007 Philipp Henkel
+*
+* Use, modification, and distribution are subject to the
+* Boost Software License, Version 1.0. (See accompanying file
+* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
+*
+* http://threadpool.sourceforge.net
+*
+*/
+
+
+#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
+#define THREADPOOL_POOL_CORE_HPP_INCLUDED
+
+
+
+
+#include "locking_ptr.hpp"
+#include "worker_thread.hpp"
+
+#include "../task_adaptors.hpp"
+
+#include <boost/thread.hpp>
+#include <boost/thread/exceptions.hpp>
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/condition.hpp>
+#include <boost/smart_ptr.hpp>
+#include <boost/bind.hpp>
+#include <boost/static_assert.hpp>
+#include <boost/type_traits.hpp>
+
+#include <vector>
+
+
+/// The namespace threadpool contains a thread pool and related utility classes.
+namespace boost { namespace threadpool { namespace detail
+{
+
+  /*! \brief Thread pool.
+   *
+   * Thread pools are a mechanism for asynchronous and parallel processing
+   * within the same process. The pool class provides a convenient way
+   * to dispatch asynchronous tasks as function objects. The scheduling
+   * of these tasks can be easily controlled by using customized schedulers.
+   * A task must not throw an exception.
+   *
+   * A pool_core is DefaultConstructible and NonCopyable.
+   *
+   * \param Task A function object which implements the operator 'void operator()(void) const'. The operator() is called by the pool to execute the task. Exceptions are ignored.
+   * \param SchedulingPolicy A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
+   *
+   * \remarks The pool class is thread-safe.
+   *
+   * \see Tasks: task_func, prio_task_func
+   * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
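+   *
+   * Usage sketch (illustrative only): 'pool' below is the library's public
+   * typedef built on this core class, and some_nullary_function is a
+   * hypothetical task function.
+   * \code
+   *   boost::threadpool::pool tp(2);        // pool with two worker threads
+   *   tp.schedule(&some_nullary_function);  // dispatch a task
+   *   tp.wait();                            // block until all tasks are done
+   * \endcode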
+ */
+  template <
+    typename Task,
+    template <typename> class SchedulingPolicy,
+    template <typename> class SizePolicy,
+    template <typename> class SizePolicyController,
+    template <typename> class ShutdownPolicy
+  >
+  class pool_core
+  : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
+  , private noncopyable
+  {
+
+  public: // Type definitions
+    typedef Task task_type;                                  //!< Indicates the task's type.
+    typedef SchedulingPolicy<task_type> scheduler_type;      //!< Indicates the scheduler's type.
+    typedef pool_core<Task,
+                      SchedulingPolicy,
+                      SizePolicy,
+                      SizePolicyController,
+                      ShutdownPolicy > pool_type;            //!< Indicates the thread pool's type.
+    typedef SizePolicy<pool_type> size_policy_type;          //!< Indicates the sizer's type.
+    //typedef typename size_policy_type::size_controller size_controller_type;
+
+    typedef SizePolicyController<pool_type> size_controller_type;
+
+//  typedef SizePolicy<pool_type>::size_controller size_controller_type;
+    typedef ShutdownPolicy<pool_type> shutdown_policy_type;  //!< Indicates the shutdown policy's type.
+
+    typedef worker_thread<pool_type> worker_type;
+
+    // The task is required to be a nullary function.
+    BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
+
+    // The task function's result type is required to be void.
+    BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
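+
+    // Both assertions hold for any nullary functor returning void. For
+    // illustration only (hypothetical type, not part of this header):
+    //   struct noop_task {
+    //     typedef void result_type;   // required by boost::result_of here
+    //     void operator()() const {}
+    //   };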
+
+
+  private: // Friends
+    friend class worker_thread<pool_type>;
+
+#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580)  // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
+    friend class SizePolicy;
+    friend class ShutdownPolicy;
+#else
+    friend class SizePolicy<pool_type>;
+    friend class ShutdownPolicy<pool_type>;
+#endif
+
+  private: // The following members may be accessed by _multiple_ threads at the same time:
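+    // These counters may be read without locking through the volatile
+    // interface; all other access happens under m_monitor, typically via
+    // locking_ptr, which casts the volatile qualifier away while locked.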
+    volatile size_t m_worker_count;
+    volatile size_t m_target_worker_count;
+    volatile size_t m_active_worker_count;
+
+
+
+  private: // The following members are accessed only by _one_ thread at the same time:
+    scheduler_type m_scheduler;
+    scoped_ptr<size_policy_type> m_size_policy;            // is never null
+
+    bool m_terminate_all_workers;                          // Indicates if termination of all workers was triggered.
+    std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
+
+  private: // The following members are implemented thread-safe:
+    mutable recursive_mutex m_monitor;
+    mutable condition m_worker_idle_or_terminated_event;   // A worker is idle or was terminated.
+    mutable condition m_task_or_terminate_workers_event;   // Task is available OR total worker count should be reduced.
+
+  public:
+    /// Constructor.
+    pool_core()
+      : m_worker_count(0)
+      , m_target_worker_count(0)
+      , m_active_worker_count(0)
+      , m_terminate_all_workers(false)
+    {
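+      // Hand the size policy a volatile reference so that it can only use
+      // the pool's thread-safe interface.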
+      pool_type volatile & self_ref = *this;
+      m_size_policy.reset(new size_policy_type(self_ref));
+
+      m_scheduler.clear();
+    }
+
+
+    /// Destructor.
+    ~pool_core()
+    {
+    }
+
+    /*! Gets the size controller which manages the number of threads in the pool.
+     * \return The size controller.
+     * \see SizePolicy
+     */
+    size_controller_type size_controller()
+    {
+      return size_controller_type(*m_size_policy, this->shared_from_this());
+    }
+
+    /*! Gets the number of threads in the pool.
+     * \return The number of threads.
+     */
+    size_t size() const volatile
+    {
+      return m_worker_count;
+    }
+
+    // TODO: this is only called once
+    void shutdown()
+    {
+      ShutdownPolicy<pool_type>::shutdown(*this);
+    }
+
+    /*! Schedules a task for asynchronous execution. The task will be executed once only.
+     * \param task The task function object. It should not throw exceptions.
+     * \return true if the task could be scheduled, false otherwise.
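+     *
+     * Caller-side sketch (illustrative; boost::bind is already included by
+     * this header, do_work is a hypothetical nullary free function):
+     * \code
+     *   pool->schedule(boost::bind(&do_work));
+     * \endcode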
+ */
+    bool schedule(task_type const & task) volatile
+    {
+      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+
+      if(lockedThis->m_scheduler.push(task))
+      {
+        lockedThis->m_task_or_terminate_workers_event.notify_one();
+        return true;
+      }
+      else
+      {
+        return false;
+      }
+    }
+
+
+    /*! Returns the number of tasks which are currently executed.
+     * \return The number of active tasks.
+     */
+    size_t active() const volatile
+    {
+      return m_active_worker_count;
+    }
+
+
+    /*! Returns the number of tasks which are ready for execution.
+     * \return The number of pending tasks.
+     */
+    size_t pending() const volatile
+    {
+      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+      return lockedThis->m_scheduler.size();
+    }
+
+
+    /*! Removes all pending tasks from the pool's scheduler.
+     */
+    void clear() volatile
+    {
+      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+      lockedThis->m_scheduler.clear();
+    }
+
+
+    /*! Indicates that there are no tasks pending.
+     * \return true if there are no tasks ready for execution.
+     * \remarks This function is more efficient than the check 'pending() == 0'.
+     */
+    bool empty() const volatile
+    {
+      locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+      return lockedThis->m_scheduler.empty();
+    }
+
+
+    /*! The current thread of execution is blocked until the sum of all active
+     * and pending tasks is less than or equal to a given threshold.
+     * \param task_threshold The maximum number of tasks in pool and scheduler.
+     */
+    void wait(size_t const task_threshold = 0) const volatile
+    {
+      const pool_type* self = const_cast<const pool_type*>(this);
+      recursive_mutex::scoped_lock lock(self->m_monitor);
+
+      if(0 == task_threshold)
+      {
+        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
+        {
+          self->m_worker_idle_or_terminated_event.wait(lock);
+        }
+      }
+      else
+      {
+        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
+        {
+          self->m_worker_idle_or_terminated_event.wait(lock);
+        }
+      }
+    }
+
+    /*! The current thread of execution is blocked until the timestamp is met
+     * or the sum of all active and pending tasks is less than or equal
+     * to a given threshold.
+     * \param timestamp The time when the function returns at the latest.
+     * \param task_threshold The maximum number of tasks in pool and scheduler.
+     * \return true if the task sum is less than or equal to the threshold, false otherwise.
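+     *
+     * Caller-side sketch (illustrative; xtime_get and TIME_UTC belong to
+     * Boost.Thread's xtime API):
+     * \code
+     *   xtime deadline;
+     *   xtime_get(&deadline, boost::TIME_UTC);
+     *   deadline.sec += 5;                 // wait at most five seconds
+     *   bool done = pool->wait(deadline);
+     * \endcode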
+ */
+    bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
+    {
+      const pool_type* self = const_cast<const pool_type*>(this);
+      recursive_mutex::scoped_lock lock(self->m_monitor);
+
+      if(0 == task_threshold)
+      {
+        while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
+        {
+          if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
+        }
+      }
+      else
+      {
+        while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
+        {
+          if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
+        }
+      }
+
+      return true;
+    }
+
+
+  private:
+
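+    /*! Triggers the termination of all worker threads by setting the target
+     * worker count to zero.
+     * \param wait If true, the function blocks until all workers have
+     * terminated and joins the terminated threads.
+     */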
+    void terminate_all_workers(bool const wait) volatile
+    {
+      pool_type* self = const_cast<pool_type*>(this);
+      recursive_mutex::scoped_lock lock(self->m_monitor);
+
+      self->m_terminate_all_workers = true;
+
+      m_target_worker_count = 0;
+      self->m_task_or_terminate_workers_event.notify_all();
+
+      if(wait)
+      {
+        while(m_active_worker_count > 0)
+        {
+          self->m_worker_idle_or_terminated_event.wait(lock);
+        }
+
+        for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
+            it != self->m_terminated_workers.end();
+            ++it)
+        {
+          (*it)->join();
+        }
+        self->m_terminated_workers.clear();
+      }
+    }
+
+
+    /*! Changes the number of worker threads in the pool. The resizing
+     * is handled by the SizePolicy.
+     * \param worker_count The new number of worker threads.
+     * \return true if the pool will be resized, false if not.
+ */
+    bool resize(size_t const worker_count) volatile
+    {
+      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+
+      if(!m_terminate_all_workers)
+      {
+        m_target_worker_count = worker_count;
+      }
+      else
+      {
+        return false;
+      }
+
+      if(m_worker_count <= m_target_worker_count)
+      { // increase worker count
+        while(m_worker_count < m_target_worker_count)
+        {
+          try
+          {
+            worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
+            m_worker_count++;
+            m_active_worker_count++;
+          }
+          catch(thread_resource_error const &)
+          {
+            return false;
+          }
+        }
+      }
+      else
+      { // decrease worker count
+        lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
+      }
+
+      return true;
+    }
+
+
+    // A worker died with an unhandled exception.
+    void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
+    {
+      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+
+      m_worker_count--;
+      m_active_worker_count--;
+      lockedThis->m_worker_idle_or_terminated_event.notify_all();
+
+      if(m_terminate_all_workers)
+      {
+        lockedThis->m_terminated_workers.push_back(worker);
+      }
+      else
+      {
+        lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
+      }
+    }
+
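+    // A worker finished regularly: update the counters and, if a full
+    // termination is in progress, keep the worker so it can be joined.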
+    void worker_destructed(shared_ptr<worker_type> worker) volatile
+    {
+      locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
+      m_worker_count--;
+      m_active_worker_count--;
+      lockedThis->m_worker_idle_or_terminated_event.notify_all();
+
+      if(m_terminate_all_workers)
+      {
+        lockedThis->m_terminated_workers.push_back(worker);
+      }
+    }
+
+
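+    /*! Executes one task taken from the scheduler.
+     * Blocks while no task is available and returns false when the calling
+     * worker thread should terminate, i.e. when the worker count exceeds
+     * the target worker count.
+     */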
+    bool execute_task() volatile
+    {
+      function0<void> task;
+
+      { // fetch task
+        pool_type* lockedThis = const_cast<pool_type*>(this);
+        recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
+
+        // decrease number of threads if necessary
+        if(m_worker_count > m_target_worker_count)
+        {
+          return false; // terminate worker
+        }
+
+        // wait for tasks
+        while(lockedThis->m_scheduler.empty())
+        {
+          // decrease number of workers if necessary
+          if(m_worker_count > m_target_worker_count)
+          {
+            return false; // terminate worker
+          }
+          else
+          {
+            m_active_worker_count--;
+            lockedThis->m_worker_idle_or_terminated_event.notify_all();
+            lockedThis->m_task_or_terminate_workers_event.wait(lock);
+            m_active_worker_count++;
+          }
+        }
+
+        task = lockedThis->m_scheduler.top();
+        lockedThis->m_scheduler.pop();
+      }
+
+      // call the task function
+      if(task)
+      {
+        task();
+      }
+
+      //guard->disable();
+      return true;
+    }
+  };
+
+
+
+} } } // namespace boost::threadpool::detail
+
+#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED