|
@@ -1,453 +0,0 @@
|
|
|
-/*! \file
|
|
|
|
|
-* \brief Thread pool core.
|
|
|
|
|
-*
|
|
|
|
|
-* This file contains the threadpool's core class: pool<Task, SchedulingPolicy>.
|
|
|
|
|
-*
|
|
|
|
|
-* Thread pools are a mechanism for asynchronous and parallel processing
|
|
|
|
|
-* within the same process. The pool class provides a convenient way
|
|
|
|
|
-* for dispatching asynchronous tasks as functions objects. The scheduling
|
|
|
|
|
-* of these tasks can be easily controlled by using customized schedulers.
|
|
|
|
|
-*
|
|
|
|
|
-* Copyright (c) 2005-2007 Philipp Henkel
|
|
|
|
|
-*
|
|
|
|
|
-* Use, modification, and distribution are subject to the
|
|
|
|
|
-* Boost Software License, Version 1.0. (See accompanying file
|
|
|
|
|
-* LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
|
|
|
|
|
-*
|
|
|
|
|
-* http://threadpool.sourceforge.net
|
|
|
|
|
-*
|
|
|
|
|
-*/
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-#ifndef THREADPOOL_POOL_CORE_HPP_INCLUDED
|
|
|
|
|
-#define THREADPOOL_POOL_CORE_HPP_INCLUDED
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-#include "locking_ptr.hpp"
|
|
|
|
|
-#include "worker_thread.hpp"
|
|
|
|
|
-
|
|
|
|
|
-#include "../task_adaptors.hpp"
|
|
|
|
|
-
|
|
|
|
|
-#include <boost/thread.hpp>
|
|
|
|
|
-#include <boost/thread/exceptions.hpp>
|
|
|
|
|
-#include <boost/thread/mutex.hpp>
|
|
|
|
|
-#include <boost/thread/condition.hpp>
|
|
|
|
|
-#include <boost/smart_ptr.hpp>
|
|
|
|
|
-#include <boost/bind.hpp>
|
|
|
|
|
-#include <boost/static_assert.hpp>
|
|
|
|
|
-#include <boost/type_traits.hpp>
|
|
|
|
|
-
|
|
|
|
|
-#include <vector>
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-/// The namespace threadpool contains a thread pool and related utility classes.
|
|
|
|
|
-namespace boost { namespace threadpool { namespace detail
|
|
|
|
|
-{
|
|
|
|
|
-
|
|
|
|
|
- /*! \brief Thread pool.
|
|
|
|
|
- *
|
|
|
|
|
- * Thread pools are a mechanism for asynchronous and parallel processing
|
|
|
|
|
- * within the same process. The pool class provides a convenient way
|
|
|
|
|
- * for dispatching asynchronous tasks as functions objects. The scheduling
|
|
|
|
|
- * of these tasks can be easily controlled by using customized schedulers.
|
|
|
|
|
- * A task must not throw an exception.
|
|
|
|
|
- *
|
|
|
|
|
- * A pool_impl is DefaultConstructible and NonCopyable.
|
|
|
|
|
- *
|
|
|
|
|
- * \param Task A function object which implements the operator 'void operator() (void) const'. The operator () is called by the pool to execute the task. Exceptions are ignored.
|
|
|
|
|
- * \param Scheduler A task container which determines how tasks are scheduled. It is guaranteed that this container is accessed only by one thread at a time. The scheduler shall not throw exceptions.
|
|
|
|
|
- *
|
|
|
|
|
- * \remarks The pool class is thread-safe.
|
|
|
|
|
- *
|
|
|
|
|
- * \see Tasks: task_func, prio_task_func
|
|
|
|
|
- * \see Scheduling policies: fifo_scheduler, lifo_scheduler, prio_scheduler
|
|
|
|
|
- */
|
|
|
|
|
- template <
|
|
|
|
|
- typename Task,
|
|
|
|
|
-
|
|
|
|
|
- template <typename> class SchedulingPolicy,
|
|
|
|
|
- template <typename> class SizePolicy,
|
|
|
|
|
- template <typename> class SizePolicyController,
|
|
|
|
|
- template <typename> class ShutdownPolicy
|
|
|
|
|
- >
|
|
|
|
|
- class pool_core
|
|
|
|
|
- : public enable_shared_from_this< pool_core<Task, SchedulingPolicy, SizePolicy, SizePolicyController, ShutdownPolicy > >
|
|
|
|
|
- , private noncopyable
|
|
|
|
|
- {
|
|
|
|
|
-
|
|
|
|
|
- public: // Type definitions
|
|
|
|
|
- typedef Task task_type; //!< Indicates the task's type.
|
|
|
|
|
- typedef SchedulingPolicy<task_type> scheduler_type; //!< Indicates the scheduler's type.
|
|
|
|
|
- typedef pool_core<Task,
|
|
|
|
|
- SchedulingPolicy,
|
|
|
|
|
- SizePolicy,
|
|
|
|
|
- SizePolicyController,
|
|
|
|
|
- ShutdownPolicy > pool_type; //!< Indicates the thread pool's type.
|
|
|
|
|
- typedef SizePolicy<pool_type> size_policy_type; //!< Indicates the sizer's type.
|
|
|
|
|
- //typedef typename size_policy_type::size_controller size_controller_type;
|
|
|
|
|
-
|
|
|
|
|
- typedef SizePolicyController<pool_type> size_controller_type;
|
|
|
|
|
-
|
|
|
|
|
-// typedef SizePolicy<pool_type>::size_controller size_controller_type;
|
|
|
|
|
- typedef ShutdownPolicy<pool_type> shutdown_policy_type;//!< Indicates the shutdown policy's type.
|
|
|
|
|
-
|
|
|
|
|
- typedef worker_thread<pool_type> worker_type;
|
|
|
|
|
-
|
|
|
|
|
- // The task is required to be a nullary function.
|
|
|
|
|
- BOOST_STATIC_ASSERT(function_traits<task_type()>::arity == 0);
|
|
|
|
|
-
|
|
|
|
|
- // The task function's result type is required to be void.
|
|
|
|
|
- BOOST_STATIC_ASSERT(is_void<typename result_of<task_type()>::type >::value);
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- private: // Friends
|
|
|
|
|
- friend class worker_thread<pool_type>;
|
|
|
|
|
-
|
|
|
|
|
-#if defined(__SUNPRO_CC) && (__SUNPRO_CC <= 0x580) // Tested with CC: Sun C++ 5.8 Patch 121018-08 2006/12/06
|
|
|
|
|
- friend class SizePolicy;
|
|
|
|
|
- friend class ShutdownPolicy;
|
|
|
|
|
-#else
|
|
|
|
|
- friend class SizePolicy<pool_type>;
|
|
|
|
|
- friend class ShutdownPolicy<pool_type>;
|
|
|
|
|
-#endif
|
|
|
|
|
-
|
|
|
|
|
- private: // The following members may be accessed by _multiple_ threads at the same time:
|
|
|
|
|
- volatile size_t m_worker_count;
|
|
|
|
|
- volatile size_t m_target_worker_count;
|
|
|
|
|
- volatile size_t m_active_worker_count;
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- private: // The following members are accessed only by _one_ thread at the same time:
|
|
|
|
|
- scheduler_type m_scheduler;
|
|
|
|
|
- scoped_ptr<size_policy_type> m_size_policy; // is never null
|
|
|
|
|
-
|
|
|
|
|
- bool m_terminate_all_workers; // Indicates if termination of all workers was triggered.
|
|
|
|
|
- std::vector<shared_ptr<worker_type> > m_terminated_workers; // List of workers which are terminated but not fully destructed.
|
|
|
|
|
-
|
|
|
|
|
- private: // The following members are implemented thread-safe:
|
|
|
|
|
- mutable recursive_mutex m_monitor;
|
|
|
|
|
- mutable condition m_worker_idle_or_terminated_event; // A worker is idle or was terminated.
|
|
|
|
|
- mutable condition m_task_or_terminate_workers_event; // Task is available OR total worker count should be reduced.
|
|
|
|
|
-
|
|
|
|
|
- public:
|
|
|
|
|
- /// Constructor.
|
|
|
|
|
- pool_core()
|
|
|
|
|
- : m_worker_count(0)
|
|
|
|
|
- , m_target_worker_count(0)
|
|
|
|
|
- , m_active_worker_count(0)
|
|
|
|
|
- , m_terminate_all_workers(false)
|
|
|
|
|
- {
|
|
|
|
|
- pool_type volatile & self_ref = *this;
|
|
|
|
|
- m_size_policy.reset(new size_policy_type(self_ref));
|
|
|
|
|
-
|
|
|
|
|
- m_scheduler.clear();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /// Destructor.
|
|
|
|
|
- ~pool_core()
|
|
|
|
|
- {
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*! Gets the size controller which manages the number of threads in the pool.
|
|
|
|
|
- * \return The size controller.
|
|
|
|
|
- * \see SizePolicy
|
|
|
|
|
- */
|
|
|
|
|
- size_controller_type size_controller()
|
|
|
|
|
- {
|
|
|
|
|
- return size_controller_type(*m_size_policy, this->shared_from_this());
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*! Gets the number of threads in the pool.
|
|
|
|
|
- * \return The number of threads.
|
|
|
|
|
- */
|
|
|
|
|
- size_t size() const volatile
|
|
|
|
|
- {
|
|
|
|
|
- return m_worker_count;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-// TODO is only called once
|
|
|
|
|
- void shutdown()
|
|
|
|
|
- {
|
|
|
|
|
- ShutdownPolicy<pool_type>::shutdown(*this);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*! Schedules a task for asynchronous execution. The task will be executed once only.
|
|
|
|
|
- * \param task The task function object. It should not throw execeptions.
|
|
|
|
|
- * \return true, if the task could be scheduled and false otherwise.
|
|
|
|
|
- */
|
|
|
|
|
- bool schedule(task_type const & task) volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- if(lockedThis->m_scheduler.push(task))
|
|
|
|
|
- {
|
|
|
|
|
- lockedThis->m_task_or_terminate_workers_event.notify_one();
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! Returns the number of tasks which are currently executed.
|
|
|
|
|
- * \return The number of active tasks.
|
|
|
|
|
- */
|
|
|
|
|
- size_t active() const volatile
|
|
|
|
|
- {
|
|
|
|
|
- return m_active_worker_count;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! Returns the number of tasks which are ready for execution.
|
|
|
|
|
- * \return The number of pending tasks.
|
|
|
|
|
- */
|
|
|
|
|
- size_t pending() const volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
- return lockedThis->m_scheduler.size();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! Removes all pending tasks from the pool's scheduler.
|
|
|
|
|
- */
|
|
|
|
|
- void clear() volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
- lockedThis->m_scheduler.clear();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! Indicates that there are no tasks pending.
|
|
|
|
|
- * \return true if there are no tasks ready for execution.
|
|
|
|
|
- * \remarks This function is more efficient that the check 'pending() == 0'.
|
|
|
|
|
- */
|
|
|
|
|
- bool empty() const volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<const pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
- return lockedThis->m_scheduler.empty();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! The current thread of execution is blocked until the sum of all active
|
|
|
|
|
- * and pending tasks is equal or less than a given threshold.
|
|
|
|
|
- * \param task_threshold The maximum number of tasks in pool and scheduler.
|
|
|
|
|
- */
|
|
|
|
|
- void wait(size_t const task_threshold = 0) const volatile
|
|
|
|
|
- {
|
|
|
|
|
- const pool_type* self = const_cast<const pool_type*>(this);
|
|
|
|
|
- recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- if(0 == task_threshold)
|
|
|
|
|
- {
|
|
|
|
|
- while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
|
|
|
|
|
- {
|
|
|
|
|
- self->m_worker_idle_or_terminated_event.wait(lock);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
|
|
|
|
|
- {
|
|
|
|
|
- self->m_worker_idle_or_terminated_event.wait(lock);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- /*! The current thread of execution is blocked until the timestamp is met
|
|
|
|
|
- * or the sum of all active and pending tasks is equal or less
|
|
|
|
|
- * than a given threshold.
|
|
|
|
|
- * \param timestamp The time when function returns at the latest.
|
|
|
|
|
- * \param task_threshold The maximum number of tasks in pool and scheduler.
|
|
|
|
|
- * \return true if the task sum is equal or less than the threshold, false otherwise.
|
|
|
|
|
- */
|
|
|
|
|
- bool wait(xtime const & timestamp, size_t const task_threshold = 0) const volatile
|
|
|
|
|
- {
|
|
|
|
|
- const pool_type* self = const_cast<const pool_type*>(this);
|
|
|
|
|
- recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- if(0 == task_threshold)
|
|
|
|
|
- {
|
|
|
|
|
- while(0 != self->m_active_worker_count || !self->m_scheduler.empty())
|
|
|
|
|
- {
|
|
|
|
|
- if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- while(task_threshold < self->m_active_worker_count + self->m_scheduler.size())
|
|
|
|
|
- {
|
|
|
|
|
- if(!self->m_worker_idle_or_terminated_event.timed_wait(lock, timestamp)) return false;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- private:
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- void terminate_all_workers(bool const wait) volatile
|
|
|
|
|
- {
|
|
|
|
|
- pool_type* self = const_cast<pool_type*>(this);
|
|
|
|
|
- recursive_mutex::scoped_lock lock(self->m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- self->m_terminate_all_workers = true;
|
|
|
|
|
-
|
|
|
|
|
- m_target_worker_count = 0;
|
|
|
|
|
- self->m_task_or_terminate_workers_event.notify_all();
|
|
|
|
|
-
|
|
|
|
|
- if(wait)
|
|
|
|
|
- {
|
|
|
|
|
- while(m_active_worker_count > 0)
|
|
|
|
|
- {
|
|
|
|
|
- self->m_worker_idle_or_terminated_event.wait(lock);
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- for(typename std::vector<shared_ptr<worker_type> >::iterator it = self->m_terminated_workers.begin();
|
|
|
|
|
- it != self->m_terminated_workers.end();
|
|
|
|
|
- ++it)
|
|
|
|
|
- {
|
|
|
|
|
- (*it)->join();
|
|
|
|
|
- }
|
|
|
|
|
- self->m_terminated_workers.clear();
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- /*! Changes the number of worker threads in the pool. The resizing
|
|
|
|
|
- * is handled by the SizePolicy.
|
|
|
|
|
- * \param threads The new number of worker threads.
|
|
|
|
|
- * \return true, if pool will be resized and false if not.
|
|
|
|
|
- */
|
|
|
|
|
- bool resize(size_t const worker_count) volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- if(!m_terminate_all_workers)
|
|
|
|
|
- {
|
|
|
|
|
- m_target_worker_count = worker_count;
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- if(m_worker_count <= m_target_worker_count)
|
|
|
|
|
- { // increase worker count
|
|
|
|
|
- while(m_worker_count < m_target_worker_count)
|
|
|
|
|
- {
|
|
|
|
|
- try
|
|
|
|
|
- {
|
|
|
|
|
- worker_thread<pool_type>::create_and_attach(lockedThis->shared_from_this());
|
|
|
|
|
- m_worker_count++;
|
|
|
|
|
- m_active_worker_count++;
|
|
|
|
|
- }
|
|
|
|
|
- catch(thread_resource_error)
|
|
|
|
|
- {
|
|
|
|
|
- return false;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- { // decrease worker count
|
|
|
|
|
- lockedThis->m_task_or_terminate_workers_event.notify_all(); // TODO: Optimize number of notified workers
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- // worker died with unhandled exception
|
|
|
|
|
- void worker_died_unexpectedly(shared_ptr<worker_type> worker) volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- m_worker_count--;
|
|
|
|
|
- m_active_worker_count--;
|
|
|
|
|
- lockedThis->m_worker_idle_or_terminated_event.notify_all();
|
|
|
|
|
-
|
|
|
|
|
- if(m_terminate_all_workers)
|
|
|
|
|
- {
|
|
|
|
|
- lockedThis->m_terminated_workers.push_back(worker);
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- lockedThis->m_size_policy->worker_died_unexpectedly(m_worker_count);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- void worker_destructed(shared_ptr<worker_type> worker) volatile
|
|
|
|
|
- {
|
|
|
|
|
- locking_ptr<pool_type, recursive_mutex> lockedThis(*this, m_monitor);
|
|
|
|
|
- m_worker_count--;
|
|
|
|
|
- m_active_worker_count--;
|
|
|
|
|
- lockedThis->m_worker_idle_or_terminated_event.notify_all();
|
|
|
|
|
-
|
|
|
|
|
- if(m_terminate_all_workers)
|
|
|
|
|
- {
|
|
|
|
|
- lockedThis->m_terminated_workers.push_back(worker);
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- bool execute_task() volatile
|
|
|
|
|
- {
|
|
|
|
|
- function0<void> task;
|
|
|
|
|
-
|
|
|
|
|
- { // fetch task
|
|
|
|
|
- pool_type* lockedThis = const_cast<pool_type*>(this);
|
|
|
|
|
- recursive_mutex::scoped_lock lock(lockedThis->m_monitor);
|
|
|
|
|
-
|
|
|
|
|
- // decrease number of threads if necessary
|
|
|
|
|
- if(m_worker_count > m_target_worker_count)
|
|
|
|
|
- {
|
|
|
|
|
- return false; // terminate worker
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
- // wait for tasks
|
|
|
|
|
- while(lockedThis->m_scheduler.empty())
|
|
|
|
|
- {
|
|
|
|
|
- // decrease number of workers if necessary
|
|
|
|
|
- if(m_worker_count > m_target_worker_count)
|
|
|
|
|
- {
|
|
|
|
|
- return false; // terminate worker
|
|
|
|
|
- }
|
|
|
|
|
- else
|
|
|
|
|
- {
|
|
|
|
|
- m_active_worker_count--;
|
|
|
|
|
- lockedThis->m_worker_idle_or_terminated_event.notify_all();
|
|
|
|
|
- lockedThis->m_task_or_terminate_workers_event.wait(lock);
|
|
|
|
|
- m_active_worker_count++;
|
|
|
|
|
- }
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- task = lockedThis->m_scheduler.top();
|
|
|
|
|
- lockedThis->m_scheduler.pop();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- // call task function
|
|
|
|
|
- if(task)
|
|
|
|
|
- {
|
|
|
|
|
- task();
|
|
|
|
|
- }
|
|
|
|
|
-
|
|
|
|
|
- //guard->disable();
|
|
|
|
|
- return true;
|
|
|
|
|
- }
|
|
|
|
|
- };
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
-} } } // namespace boost::threadpool::detail
|
|
|
|
|
-
|
|
|
|
|
-#endif // THREADPOOL_POOL_CORE_HPP_INCLUDED
|
|
|