- Timestamp:
- Jun 27, 2012, 4:07:21 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- 38032a
- Parents:
- 5d8c0f
- git-author:
- Frederik Heber <heber@…> (02/29/12 08:32:10)
- git-committer:
- Frederik Heber <heber@…> (06/27/12 16:07:21)
- Location:
- src/Fragmentation/Automation
- Files:
-
- 11 added
- 4 deleted
- 6 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/Controller/Commands/ShutdownOperation.cpp
r5d8c0f r41c1b7 2 2 * Project: MoleCuilder 3 3 * Description: creates and alters molecular systems 4 * Copyright (C) 201 1-2012 University of Bonn. All rights reserved.4 * Copyright (C) 2012 University of Bonn. All rights reserved. 5 5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. 6 6 */ … … 9 9 * ShutdownOperation.cpp 10 10 * 11 * Created on: Dec 11, 201111 * Created on: Feb 28, 2012 12 12 * Author: heber 13 13 */ -
src/Fragmentation/Automation/FragmentScheduler.cpp
r5d8c0f r41c1b7 34 34 #include "CodePatterns/Info.hpp" 35 35 #include "CodePatterns/Log.hpp" 36 #include "Controller/Commands/EnrollInPoolOperation.hpp" 36 37 #include "Jobs/MPQCCommandJob.hpp" 37 38 #include "Jobs/SystemCommandJob.hpp" … … 59 60 */ 60 61 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : 61 WorkerListener(io_service, workerport, JobsQueue), 62 WorkerListener(io_service, workerport, JobsQueue, pool, 63 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)), 62 64 ControllerListener(io_service, controllerport, JobsQueue, 63 boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))) 64 { 65 Info info(__FUNCTION__); 65 boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))), 66 connection(io_service), 67 sendJobOp(connection) 68 { 69 Info info(__FUNCTION__); 70 71 // listen for controller 72 ControllerListener.initiateSocket(); 66 73 67 74 // only initiate socket if jobs are already present … … 69 76 WorkerListener.initiateSocket(); 70 77 } 71 72 ControllerListener.initiateSocket();73 78 } 74 79 75 80 /** Handle a new worker connection. 76 81 * 77 * We check whether jobs are in the JobsQueue. If present, job is sent. 78 * 79 * \sa handle_SendJobtoWorker() 82 * We store the given address in the pool. 80 83 * 81 84 * \param e error code if something went wrong … … 88 91 { 89 92 // Successfully accepted a new connection. 90 // Check whether there are jobs in the queue 91 if (JobsQueue.isJobPresent()) { 92 // pop a job and send it to the client. 93 FragmentJob::ptr job(JobsQueue.popJob()); 94 // The connection::async_write() function will automatically 95 // serialize the data structure for us. 96 LOG(1, "INFO: Sending job #" << job->getId() << "."); 97 conn->async_write(job, 98 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this, 99 boost::asio::placeholders::error, conn)); 100 101 } else { 102 // send the static NoJob 103 conn->async_write(NoJob, 104 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this, 105 boost::asio::placeholders::error, conn)); 106 107 // then there must be no read necesary 108 109 ELOG(2, "There is currently no job present in the queue."); 110 } 111 } 112 else 113 { 93 // read address 94 conn->async_read(address, 95 boost::bind(&FragmentScheduler::WorkerListener_t::handle_checkAddress, this, 96 boost::asio::placeholders::error, conn)); 97 } else { 114 98 // An error occurred. Log it and return. Since we are not starting a new 115 99 // accept operation the io_service will run out of work to do and the … … 118 102 ELOG(0, e.message()); 119 103 } 104 } 105 106 /** Callback function when worker address has been received. 107 * 108 * \param e error code if something went wrong 109 * \param conn reference with the connection 110 */ 111 void FragmentScheduler::WorkerListener_t::handle_checkAddress(const boost::system::error_code& e, connection_ptr conn) 112 { 113 Info info(__FUNCTION__); 114 if (!e) 115 { 116 if (pool.presentInPool(address)) { 117 // check whether its priority is busy_priority 118 if (pool.isWorkerBusy(address)) { 119 conn->async_read(result, 120 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 121 boost::asio::placeholders::error, conn)); 122 } else { 123 ELOG(1, "INFO: Idle worker "+toString(address)+" has logged in again."); 124 } 125 } else { 126 // insert as its new worker 127 pool.addWorker(address); 128 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success; 129 conn->async_write(flag, 130 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, 131 boost::asio::placeholders::error, conn)); 132 } 133 } 134 else 135 { 136 // An error occurred. Log it and return. Since we are not starting a new 137 // accept operation the io_service will run out of work to do and the 138 // server will exit. 139 Exitflag = ErrorFlag; 140 ELOG(0, e.message()); 141 } 120 142 121 143 if (JobsQueue.isJobPresent()) { … … 126 148 } 127 149 128 /** Callback function when job has been sent. 129 * 130 * After job has been sent we start async_read() for the result. 131 * 132 * \sa handle_ReceiveResultFromWorker() 133 * 134 * \param e error code if something went wrong 135 * \param conn reference with the connection 136 */ 137 void FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn) 138 { 139 Info info(__FUNCTION__); 140 LOG(1, "INFO: Job sent."); 141 // obtain result 142 LOG(1, "INFO: Receiving result for a job ..."); 143 conn->async_read(result, 144 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 145 boost::asio::placeholders::error, conn)); 150 /** Callback function when new worker has enrolled. 151 * 152 * \param e error code if something went wrong 153 * \param conn reference with the connection 154 */ 155 void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn) 156 { 157 Info info(__FUNCTION__); 158 if (!e) 159 { 160 FragmentJob::ptr job; 161 if (JobsQueue.isJobPresent()) { 162 job = JobsQueue.popJob(); 163 } else { 164 job = NoJob; 165 } 166 callback_sendJobToWorker(address, job); 167 } 168 else 169 { 170 // An error occurred. Log it and return. Since we are not starting a new 171 // accept operation the io_service will run out of work to do and the 172 // server will exit. 173 Exitflag = ErrorFlag; 174 ELOG(0, e.message()); 175 } 146 176 } 147 177 … … 155 185 Info info(__FUNCTION__); 156 186 LOG(1, "INFO: Received result for job #" << result->getId() << " ..."); 187 157 188 // and push into queue 158 189 ASSERT(result->getId() != (JobId_t)JobId::NoJob, 159 " FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");190 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id."); 160 191 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob, 161 " FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");192 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); 162 193 // place id into expected 163 194 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob)) 164 195 JobsQueue.pushResult(result); 196 197 // mark as idle 198 pool.unmarkWorkerBusy(address); 199 // for now remove worker again from pool such that other may connect 200 pool.removeWorker(address); 201 165 202 // erase result 166 203 result.reset(); 167 204 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); 168 205 } 206 169 207 170 208 /** Handle a new controller connection. … … 348 386 } 349 387 388 389 /** Helper function to send a job to worker. 390 * 391 * @param address address of worker 392 * @param job job to send 393 */ 394 void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job) 395 { 396 ASSERT( !pool.isWorkerBusy(address), 397 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy."); 398 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << "."); 399 sendJobOp.setJob(job); 400 sendJobOp(address.host, address.service); 401 // set worker as busy (assuming it has been removed from idle queue already) 402 WorkerPool::Idle_Queue_t::iterator iter = pool.getIdleWorker(address); 403 ASSERT( iter != pool.getIdleEnd(), 404 "FragmentScheduler::sendJobToWorker() - cannot find worker " 405 +toString(address)+" in idle queue."); 406 pool.markWorkerBusy(iter); 407 } 408 409 ///** Helper function to shutdown a single worker. 410 // * 411 // * We send NoJob to indicate shutdown 412 // * 413 // * @param address of worker to shutdown 414 // */ 415 //void FragmentScheduler::shutdownWorker(const WorkerAddress &address) 416 //{ 417 // sendJobToWorker(address, NoJob); 418 //} 419 // 420 ///** Sends shutdown to all current workers in the pool. 421 // * 422 // */ 423 //void FragmentScheduler::removeAllWorkers() 424 //{ 425 // // give all workers shutdown signal 426 // while (pool.presentIdleWorkers()) { 427 // const WorkerAddress address = pool.getNextIdleWorker(); 428 // shutdownWorker(address); 429 // } 430 //} -
src/Fragmentation/Automation/FragmentScheduler.hpp
r5d8c0f r41c1b7 20 20 #include "Connection.hpp" 21 21 #include "ControllerChoices.hpp" 22 #include "Controller/Commands/SendJobToWorkerOperation.hpp" 22 23 #include "FragmentQueue.hpp" 23 24 #include "GlobalJobId.hpp" … … 26 27 #include "Results/FragmentResult.hpp" 27 28 #include "types.hpp" 29 #include "Pool/WorkerPool.hpp" 30 #include "WorkerAddress.hpp" 28 31 29 32 /** FragmentScheduler serves FragmentJobs to Workers and accepts commands from … … 39 42 40 43 private: 44 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job); 45 // void shutdownWorker(const WorkerAddress &address); 46 // void removeAllWorkers(); 47 41 48 class WorkerListener_t : public Listener 42 49 { … … 45 52 boost::asio::io_service& io_service, 46 53 unsigned short port, 47 FragmentQueue &_JobsQueue) : 54 FragmentQueue &_JobsQueue, 55 WorkerPool &_pool, 56 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) : 48 57 Listener(io_service, port), 58 address("127.0.0.1", "0"), 49 59 JobsQueue(_JobsQueue), 50 result( new FragmentResult(JobId::NoJob) ) 60 pool(_pool), 61 result( new FragmentResult(JobId::NoJob) ), 62 callback_sendJobToWorker(_callback) 51 63 {} 52 64 virtual ~WorkerListener_t() {} … … 57 69 58 70 /// Worker callback function when job has been sent. 59 void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn); 71 void handle_checkAddress(const boost::system::error_code& e, connection_ptr conn); 72 73 /// Worker callback function when new worker has enrolled. 74 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn); 60 75 61 76 /// Worker callback function when result has been received. 62 77 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn); 63 78 private: 64 //!> reference to external FragmentQueue containing jobs to work on 65 FragmentQueue & JobsQueue; 79 //!> address of new Worker 80 WorkerAddress address; 81 82 //!> reference to Queue 83 FragmentQueue &JobsQueue; 84 85 //!> callback reference to container class 86 WorkerPool &pool; 66 87 67 88 /// result that is received from the client. 68 89 FragmentResult::ptr result; 90 91 //!> callback function to access send job function 92 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker; 69 93 70 94 // static entity to indicate to clients that the queue is empty. … … 130 154 131 155 private: 156 /// Queue with data to be sent to each client. 157 FragmentQueue JobsQueue; 158 159 //!> Pool of Workers 160 WorkerPool pool; 161 132 162 //!> Listener instance that waits for a worker 133 163 WorkerListener_t WorkerListener; … … 135 165 //!> Listener instance that waits for a controller 136 166 ControllerListener_t ControllerListener; 137 138 /// Queue with data to be sent to each client.139 FragmentQueue JobsQueue;140 167 141 168 public: … … 153 180 } 154 181 182 private: 183 //!> Connection for sending jobs to workers 184 Connection connection; 185 186 //!> internal operation to send jobs to workers 187 mutable SendJobToWorkerOperation sendJobOp; 155 188 }; 156 189 -
src/Fragmentation/Automation/Makefile.am
r5d8c0f r41c1b7 44 44 FRAGMENTATIONCOMMANDSSOURCE = \ 45 45 Controller/Commands/CheckResultsOperation.cpp \ 46 Controller/Commands/EnrollInPoolOperation.cpp \ 46 47 Controller/Commands/GetNextJobIdOperation.cpp \ 48 Controller/Commands/ObtainJobOperation.cpp \ 47 49 Controller/Commands/Operation.cpp \ 48 50 Controller/Commands/ReceiveJobsOperation.cpp \ 51 Controller/Commands/SendJobToWorkerOperation.cpp \ 49 52 Controller/Commands/SendResultsOperation.cpp \ 50 53 Controller/Commands/ShutdownOperation.cpp \ 51 Controller/Commands/ WorkOnJobOperation.cpp54 Controller/Commands/SubmitResultOperation.cpp 52 55 53 56 FRAGMENTATIONCOMMANDSHEADER = \ 54 57 ControllerChoices.hpp \ 55 58 Controller/Commands/CheckResultsOperation.hpp \ 59 Controller/Commands/EnrollInPoolOperation.hpp \ 56 60 Controller/Commands/GetNextJobIdOperation.hpp \ 61 Controller/Commands/ObtainJobOperation.hpp \ 57 62 Controller/Commands/Operation.hpp \ 58 63 Controller/Commands/ReceiveJobsOperation.hpp \ 64 Controller/Commands/SendJobToWorkerOperation.hpp \ 59 65 Controller/Commands/SendResultsOperation.hpp \ 60 66 Controller/Commands/ShutdownOperation.hpp \ 61 Controller/Commands/ WorkOnJobOperation.hpp67 Controller/Commands/SubmitResultOperation.hpp 62 68 63 69 noinst_LTLIBRARIES += libMolecuilderFragmentationCommands.la … … 69 75 FRAGMENTATIONAUTOMATIONHELPERSOURCE = \ 70 76 atexit.cpp \ 71 GlobalJobId.cpp 77 GlobalJobId.cpp \ 78 Listener.cpp \ 79 WorkerAddress.cpp 72 80 73 81 FRAGMENTATIONAUTOMATIONHELPERHEADER = \ 74 82 atexit.hpp \ 75 GlobalJobId.hpp 83 GlobalJobId.hpp \ 84 Listener.hpp \ 85 WorkerAddress.hpp 76 86 77 87 noinst_LTLIBRARIES += libMolecuilderFragmentationAutomationHelper.la … … 86 96 AM_CPPFLAGS = ${BOOST_CPPFLAGS} ${CodePatterns_CFLAGS} 87 97 88 bin_PROGRAMS += Controller Server Worker98 bin_PROGRAMS += Controller PoolWorker Server 89 99 90 100 CONTROLLERSOURCE = \ … … 97 107 Controller/FragmentController.hpp 98 108 109 POOLWORKERSOURCE = \ 110 Pool/PoolWorker.cpp 111 112 POOLWORKERHEADER = \ 113 Connection.hpp \ 114 Pool/PoolWorker.hpp 115 99 116 SERVERSOURCE = \ 100 117 FragmentScheduler.cpp \ 101 Listener.cpp118 Pool/WorkerPool.cpp 102 119 103 120 SERVERHEADER = \ … … 105 122 ControllerChoices.hpp \ 106 123 FragmentScheduler.hpp \ 107 Listener.hpp 108 109 WORKERSOURCE = \ 110 FragmentWorker.cpp \ 111 Worker.cpp 112 113 WORKERHEADER = \ 114 Connection.hpp \ 115 FragmentWorker.hpp 124 Pool/WorkerPool.hpp 116 125 117 126 Controller_SOURCES = $(CONTROLLERSOURCE) $(CONTROLLERHEADER) controller.cpp … … 119 128 Controller_CXXFLAGS = $(AM_CPPFLAGS) 120 129 Controller_LDADD = \ 130 libMolecuilderFragmentationAutomationHelper.la \ 131 libMolecuilderFragmentationCommands.la \ 132 libMolecuilderFragmentJobs.la \ 133 $(BOOST_ASIO_LIBS) \ 134 $(BOOST_SERIALIZATION_LIBS) \ 135 $(BOOST_THREAD_LIBS) \ 136 $(BOOST_SYSTEM_LIBS) \ 137 ${CodePatterns_LIBS} 138 139 PoolWorker_SOURCES = $(POOLWORKERSOURCE) $(POOLWORKERHEADER) poolworker.cpp 140 PoolWorker_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS) 141 PoolWorker_CXXFLAGS = $(AM_CPPFLAGS) $(BOOST_ASIO_DEBUG) 142 PoolWorker_LDADD = \ 121 143 libMolecuilderFragmentationAutomationHelper.la \ 122 144 libMolecuilderFragmentationCommands.la \ … … 141 163 ${CodePatterns_LIBS} 142 164 143 Worker_SOURCES = $(WORKERSOURCE) $(WORKERHEADER)144 Worker_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS)145 Worker_CXXFLAGS = $(AM_CPPFLAGS)146 Worker_LDADD = \147 libMolecuilderFragmentationAutomationHelper.la \148 libMolecuilderFragmentationCommands.la \149 libMolecuilderFragmentJobs.la \150 $(BOOST_ASIO_LIBS) \151 $(BOOST_SERIALIZATION_LIBS) \152 $(BOOST_THREAD_LIBS) \153 $(BOOST_SYSTEM_LIBS) \154 ${CodePatterns_LIBS}155 -
src/Fragmentation/Automation/Pool/WorkerPool.cpp
r5d8c0f r41c1b7 76 76 // return address 77 77 return returnaddress; 78 } 79 80 WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address) 81 { 82 Idle_Queue_t::iterator idleiter = idle_queue.begin(); 83 while (idleiter != idle_queue.end()) { 84 if (idleiter->second == address) { 85 break; 86 } 87 ++idleiter; 88 } 89 return idleiter; 78 90 } 79 91 … … 130 142 Pool_t::iterator iter = pool.find( address ); 131 143 if (iter != pool.end()) { 132 Idle_Queue_t::iterator idleiter = idle_queue.begin(); 133 while (idleiter != idle_queue.end()) { 134 if (idleiter->second == address) { 135 idle_queue.erase(idleiter); 136 break; 137 } 138 } 144 Idle_Queue_t::iterator idleiter = getIdleWorker(address); 145 if (idleiter != idle_queue.end()) 146 idle_queue.erase(idleiter); 139 147 Busy_Queue_t::iterator busyiter = busy_queue.find(address); 140 148 if (busyiter != busy_queue.end()) … … 147 155 +" is in pool and both idle and busy!"); 148 156 pool.erase(iter); 157 LOG(1, "INFO: Removed worker " << address << " from pool."); 149 158 return true; 150 159 } else { … … 174 183 void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter) 175 184 { 176 if (isWorkerBusy(iter->second)) 185 const WorkerAddress returnaddress = iter->second; 186 if (isWorkerBusy(returnaddress)) 177 187 return; 178 const WorkerAddress returnaddress = iter->second;179 188 const priority_t priority = iter->first; 180 189 … … 206 215 busy_queue.erase(address); 207 216 idle_queue.insert( make_pair( priority, address) ); 208 } 209 } 217 218 LOG(1, "INFO: Worker " << address << " is now marked idle."); 219 } 220 } -
src/Fragmentation/Automation/Pool/WorkerPool.hpp
r5d8c0f r41c1b7 46 46 void unmarkWorkerBusy(const WorkerAddress &address); 47 47 48 private: 48 // this is currently for the passing time until Worker pool is fully operable 49 49 50 //!> typedef of the priority in the idle queue of a worker 50 51 typedef size_t priority_t; … … 53 54 typedef std::multimap<priority_t, WorkerAddress> Idle_Queue_t; 54 55 56 Idle_Queue_t::iterator getIdleWorker(const WorkerAddress &address); 57 Idle_Queue_t::iterator getIdleEnd() 58 { 59 return idle_queue.end(); 60 } 61 void markWorkerBusy(Idle_Queue_t::iterator &iter); 62 63 private: 55 64 //!> typedef for the worker queue being a map with priority and address of worker 56 65 typedef std::map<WorkerAddress, priority_t> Busy_Queue_t; … … 60 69 61 70 private: 62 void markWorkerBusy(Idle_Queue_t::iterator &iter);63 71 void removeAllWorkers(); 64 72
Note:
See TracChangeset
for help on using the changeset viewer.