/* * WorkerPool.hpp * * Created on: 22.02.2012 * Author: heber */ #ifndef WORKERPOOL_HPP_ #define WORKERPOOL_HPP_ // include config.h #ifdef HAVE_CONFIG_H #include #endif #include #include #include #include "CodePatterns/Observer/Observable.hpp" #include "WorkerAddress.hpp" class FragmentQueue; class FragmentScheduler; class WorkerPoolTest; /** Class WorkerPool contains a number of PoolWorkers that connect to it * and wait for jobs to be sent to them. The class manages this pool of * workers. * */ class WorkerPool : public Observable { //!> grant unit test access friend class WorkerPoolTest; public: WorkerPool(); ~WorkerPool(); /** Channels for this observable. * * \note Be especially cautious of cyclic updates here as the updates are * used by \ref FragmentScheduler to send new jobs to idle workers! * If e.g. WorkerRemoved is added, then the marking of the one idle * worker as now busy inside the callback will cause a cycle in the * update, as the notification for WorkerIdle/Added is not yet * removed because we are still inside the callback! */ enum NotificationType { WorkerIdle, WorkerAdded, NotificationType_MAX // denotes the maximum of available notification types }; bool addWorker(const WorkerAddress &address); bool presentInPool(const WorkerAddress &address) const; bool presentIdleWorkers() const { return !idle_queue.empty(); } WorkerAddress getNextIdleWorker(); bool isWorkerBusy(const WorkerAddress &address) const; bool removeWorker(const WorkerAddress& address); void unmarkWorkerBusy(const WorkerAddress &address); // this is currently for the passing time until Worker pool is fully operable //!> typedef of the priority in the idle queue of a worker typedef size_t priority_t; //!> typedef for the worker queue being a map with priority and address of worker typedef std::multimap Idle_Queue_t; // constant iterators on idle queue contents Idle_Queue_t::const_iterator begin_idle() const { return idle_queue.begin(); } Idle_Queue_t::const_iterator end_idle() const { return idle_queue.end(); } Idle_Queue_t::iterator getIdleWorker(const WorkerAddress &address); void markWorkerBusy(Idle_Queue_t::iterator &iter); bool hasBusyWorkers() const { return (busy_queue.size() != 0); } private: //!> typedef for the worker queue being a map with priority and address of worker typedef std::map Busy_Queue_t; //!> typedef for the pool of workers being a set to keep only unique addresses typedef std::set Pool_t; private: //!> FragmentScheduler needs access to removeAllWorkers() friend class FragmentScheduler; void removeAllWorkers(); private: //!> static for defining default priority for new workers static priority_t default_priority; //!> empty address in case queue is idle static WorkerAddress emptyAddress; //!> pool of all worker addresses to connect to and work on jobs Pool_t pool; //!> pool of worker addresses to connect to and work on jobs Idle_Queue_t idle_queue; //!> pool of worker addresses to connect to and work on jobs mutable Busy_Queue_t busy_queue; }; #endif /* WORKERPOOL_HPP_ */