source: src/Fragmentation/Automation/FragmentScheduler.cpp@ befcf8

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since befcf8 was befcf8, checked in by Frederik Heber <heber@…>, 13 years ago

Rewrote FragmentScheduler::shutdown to make sure idle_queue is truly const.

  • WorkerPool has new function to return idle_queue addresses as Vector.
  • We first collect all addresses, then send a shutdown to each, and repeat this as long as the OperationQueue still has pending operations and the idle_queue still has idlers.
  • Property mode set to 100644
File size: 20.3 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[c4f43e]28#include <algorithm>
[72eaf7f]29#include <boost/bind.hpp>
[9a6b895]30#include <boost/lambda/lambda.hpp>
[72eaf7f]31#include <boost/lexical_cast.hpp>
32#include <iostream>
33#include <vector>
[af3aed]34#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]35#include <boost/serialization/vector.hpp>
[af3aed]36#include "CodePatterns/Info.hpp"
[b0b64c]37#include "CodePatterns/Log.hpp"
[2344a3]38#include "CodePatterns/Observer/Notification.hpp"
39#include "ControllerChoices.hpp"
[9a6b895]40#include "Operations/Servers/SendJobToWorkerOperation.hpp"
[50d095]41#include "Operations/Workers/EnrollInPoolOperation.hpp"
[ff60cfa]42#include "Jobs/MPQCCommandJob.hpp"
[d920b9]43#include "Jobs/SystemCommandJob.hpp"
[ef2767]44#include "JobId.hpp"
[72eaf7f]45
[cd4a6e]46#include "FragmentScheduler.hpp"
[72eaf7f]47
[ff60cfa]48/** Helper function to enforce binding of FragmentWorker to possible derived
49 * FragmentJob classes.
50 */
51void dummyInit() {
52 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
53 MPQCCommandJob("nofile", JobId::IllegalJob);
54}
[c7deca]55
/** Constructor of class FragmentScheduler.
 *
 * We setup both acceptors to accept connections from workers and Controller.
 *
 * \param io_service io_service of the asynchronous communications
 * \param workerport port to listen for worker connections
 * \param controllerport port to listen for controller connections.
 */
FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
  Observer("FragmentScheduler"),
  io_service(_io_service),
  // WorkerListener gets a callback to hand a popped job to a chosen worker
  WorkerListener(_io_service, workerport, JobsQueue, pool,
    boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
  // ControllerListener gets a callback to bring the whole server down
  ControllerListener(_io_service, controllerport, JobsQueue,
    boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
  connection(_io_service)
{
  Info info(__FUNCTION__);

  // sign on to idle workers and present jobs; recieveNotification() reacts to both
  pool.signOn(this, WorkerPool::WorkerIdle);
  JobsQueue.signOn(this, FragmentQueue::JobAdded);

  // listen for controller
  ControllerListener.initiateSocket();

  // listen for workers
  WorkerListener.initiateSocket();
}
85
/** Destructor of class FragmentScheduler.
 *
 * Signs off from the channels we signed on to in the constructor.
 */
FragmentScheduler::~FragmentScheduler()
{
  // sign off
  pool.signOff(this, WorkerPool::WorkerIdle);
  JobsQueue.signOff(this, FragmentQueue::JobAdded);
}
92
[db03d9]93/** Handle a new worker connection.
94 *
[41c1b7]95 * We store the given address in the pool.
[db03d9]96 *
97 * \param e error code if something went wrong
98 * \param conn reference with the connection
99 */
[8036b7]100void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]101{
[cd4a6e]102 Info info(__FUNCTION__);
[ed2c5b]103 if (!e)
[72eaf7f]104 {
[b0b64c]105 // Successfully accepted a new connection.
[41c1b7]106 // read address
107 conn->async_read(address,
[9a3f84]108 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
[41c1b7]109 boost::asio::placeholders::error, conn));
[9a3f84]110 }
111 else
112 {
[41c1b7]113 // An error occurred. Log it and return. Since we are not starting a new
114 // accept operation the io_service will run out of work to do and the
115 // server will exit.
116 Exitflag = ErrorFlag;
117 ELOG(0, e.message());
118 }
119}
[0bdd51b]120
[9a3f84]121/** Handle having received Worker's address
[41c1b7]122 *
123 * \param e error code if something went wrong
124 * \param conn reference with the connection
125 */
[9a3f84]126void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
[41c1b7]127{
128 Info info(__FUNCTION__);
129 if (!e)
130 {
[9a3f84]131 // Successfully accepted a new connection.
132 // read address
133 conn->async_read(choice,
134 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
135 boost::asio::placeholders::error, conn));
136 }
137 else
138 {
139 // An error occurred. Log it and return. Since we are not starting a new
140 // accept operation the io_service will run out of work to do and the
141 // server will exit.
142 Exitflag = ErrorFlag;
143 ELOG(0, e.message());
144 }
145}
146
147/** Controller callback function to read the choice for next operation.
148 *
149 * \param e error code if something went wrong
150 * \param conn reference with the connection
151 */
152void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
153{
154 Info info(__FUNCTION__);
155 if (!e)
156 {
157 LOG(1, "INFO: Received request for operation " << choice << ".");
158 // switch over the desired choice read previously
159 switch(choice) {
160 case NoWorkerOperation:
161 {
162 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
163 break;
164 }
165 case EnrollInPool:
166 {
167 if (pool.presentInPool(address)) {
168 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
169 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail;
170 conn->async_write(flag,
171 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
172 boost::asio::placeholders::error, conn));
173 } else {
174 // insert as its new worker
175 LOG(1, "INFO: Adding " << address << " to pool ...");
176 pool.addWorker(address);
177 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
178 conn->async_write(flag,
179 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
180 boost::asio::placeholders::error, conn));
181 break;
182 }
183 case SendResult:
184 {
185 if (pool.presentInPool(address)) {
186 // check whether its priority is busy_priority
187 if (pool.isWorkerBusy(address)) {
188 conn->async_read(result,
189 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
190 boost::asio::placeholders::error, conn));
191 } else {
192 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
193 conn->async_read(result,
194 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
195 boost::asio::placeholders::error, conn));
196 }
197 } else {
198 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
199 conn->async_read(result,
200 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
201 boost::asio::placeholders::error, conn));
202 }
203 break;
204 }
205 case RemoveFromPool:
206 {
207 if (pool.presentInPool(address)) {
208 // removing present worker
209 pool.removeWorker(address);
210 } else {
211 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
212 }
213 break;
214 }
215 default:
216 Exitflag = ErrorFlag;
217 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
218 break;
[41c1b7]219 }
[b0b64c]220 }
[9a3f84]221 // restore NoOperation choice such that choice is not read twice
222 choice = NoWorkerOperation;
[2344a3]223
224 initiateSocket();
[cd4a6e]225 }
226 else
227 {
228 // An error occurred. Log it and return. Since we are not starting a new
229 // accept operation the io_service will run out of work to do and the
230 // server will exit.
[8036b7]231 Exitflag = ErrorFlag;
[b0b64c]232 ELOG(0, e.message());
[cd4a6e]233 }
[ed2c5b]234}
[72eaf7f]235
[9a3f84]236
[41c1b7]237/** Callback function when new worker has enrolled.
[db03d9]238 *
239 * \param e error code if something went wrong
240 * \param conn reference with the connection
241 */
[41c1b7]242void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]243{
[41c1b7]244 Info info(__FUNCTION__);
[2344a3]245 if (e)
[41c1b7]246 {
247 // An error occurred. Log it and return. Since we are not starting a new
248 // accept operation the io_service will run out of work to do and the
249 // server will exit.
250 Exitflag = ErrorFlag;
251 ELOG(0, e.message());
252 }
[ef2767]253}
254
[db03d9]255/** Callback function when result has been received.
256 *
257 * \param e error code if something went wrong
258 * \param conn reference with the connection
259 */
[8036b7]260void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]261{
[db03d9]262 Info info(__FUNCTION__);
[35f587]263 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
[41c1b7]264
[35f587]265 // and push into queue
266 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[41c1b7]267 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]268 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[41c1b7]269 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]270 // place id into expected
[35f587]271 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]272 JobsQueue.pushResult(result);
[41c1b7]273
274 // mark as idle
275 pool.unmarkWorkerBusy(address);
276
[db03d9]277 // erase result
[35f587]278 result.reset();
[778abb]279 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]280}
281
[9a3f84]282/** Callback function when result has been received.
283 *
284 * \param e error code if something went wrong
285 * \param conn reference with the connection
286 */
287void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
288{
289 Info info(__FUNCTION__);
290 // nothing to do
291 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
292
293 JobsQueue.resubmitJob(result->getId());
294
295 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
296}
297
[41c1b7]298
[db03d9]299/** Handle a new controller connection.
300 *
301 * \sa handle_ReceiveJobs()
302 * \sa handle_CheckResultState()
303 * \sa handle_SendResults()
304 *
305 * \param e error code if something went wrong
306 * \param conn reference with the connection
307 */
[8036b7]308void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[db03d9]309{
310 Info info(__FUNCTION__);
311 if (!e)
312 {
[778abb]313 conn->async_read(choice,
[8036b7]314 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
[778abb]315 boost::asio::placeholders::error, conn));
316 }
317 else
318 {
319 // An error occurred. Log it and return. Since we are not starting a new
320 // accept operation the io_service will run out of work to do and the
321 // server will exit.
[8036b7]322 Exitflag = ErrorFlag;
[778abb]323 ELOG(0, e.message());
324 }
325}
326
/** Controller callback function to read the choice for next operation.
 *
 * Dispatches on the previously read \a choice: hand out fresh job ids, accept
 * a bunch of jobs, report queue state, deliver results, or shut the server
 * down. Unless a shutdown was requested, a new accept is initiated at the end.
 *
 * \param e error code if something went wrong
 * \param conn reference with the connection
 */
void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    bool LaunchNewAcceptor = true;
    LOG(1, "INFO: Received request for operation " << choice << ".");
    // switch over the desired choice read previously
    switch(choice) {
      case NoControllerOperation:
      {
        ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
        break;
      }
      case GetNextJobId:
      {
        // controller asks for fresh job ids: first read how many it wants
        LOG(1, "INFO: Receiving number of desired job ids from controller ...");
        conn->async_read(NumberIds,
            boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case SendJobs:
      {
        // The connection::async_write() function will automatically
        // serialize the data structure for us.
        LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
        conn->async_read(jobs,
            boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case CheckState:
      {
        // first update number
        jobInfo[0] = JobsQueue.getPresentJobs();
        jobInfo[1] = JobsQueue.getDoneJobs();
        // now we accept connections to check for state of calculations
        LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
        conn->async_write(jobInfo,
            boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case ReceiveResults:
      {
        const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
        // ... or we give the results
        LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
        conn->async_write(results,
            boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case ShutdownControllerSocket:
      {
        LOG(1, "INFO: Received shutdown from controller ...");
        // only allow for shutdown when there are no more jobs in the queue
        if (!JobsQueue.isJobPresent()) {
          LaunchNewAcceptor = false;
        } else {
          ELOG(2, "There are still jobs waiting in the queue.");
        }
        break;
      }
      default:
        Exitflag = ErrorFlag;
        ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
        break;
    }
    // restore NoControllerOperation choice such that choice is not read twice
    choice = NoControllerOperation;

    if (LaunchNewAcceptor) {
      LOG(1, "Launching new acceptor on socket.");
      // Start an accept operation for a new Connection.
      initiateSocket();
    } else {
      // we shutdown? Hence, also shutdown controller
      shutdownAllSockets();
    }
  }
  else
  {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }
}
423
424/** Controller callback function when job has been sent.
[778abb]425 *
426 * We check here whether the worker socket is accepting, if there
427 * have been no jobs we re-activate it, as it is shut down after
428 * last job.
[db03d9]429 *
430 * \param e error code if something went wrong
431 * \param conn reference with the connection
432 */
[8036b7]433void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
[db03d9]434{
435 Info info(__FUNCTION__);
436 // jobs are received, hence place in JobsQueue
437 if (!jobs.empty()) {
438 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
439 JobsQueue.pushJobs(jobs);
440 }
441 jobs.clear();
[ed2c5b]442}
[cd4a6e]443
/** Controller callback function when checking on state of results.
 *
 * Nothing to do besides logging: the state was already written out in
 * handle_ReadChoice()'s CheckState branch.
 *
 * \param e error code if something went wrong
 * \param conn reference with the connection
 */
void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
}
[778abb]455
[d1dbfc]456/** Controller callback function when checking on state of results.
457 *
458 * \param e error code if something went wrong
459 * \param conn reference with the connection
460 */
[8036b7]461void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
[c4f43e]462{
463 Info info(__FUNCTION__);
464
465 std::vector<JobId_t> nextids( NumberIds, JobId::IllegalJob);
466 std::generate(nextids.begin(), nextids.end(),
467 boost::bind(&GlobalJobId::getNextId, boost::ref(globalId)));
468 LOG(1, "INFO: Sending next available job ids " << nextids << " to controller ...");
469 conn->async_write(nextids,
470 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendIds, this,
471 boost::asio::placeholders::error, conn));
472}
473
/** Controller callback function when free job ids have been sent.
 *
 * Nothing to do besides logging; the write completed in the caller's chain.
 *
 * \param e error code if something went wrong
 * \param conn reference with the connection
 */
void FragmentScheduler::ControllerListener_t::handle_SendIds(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Ids have been sent.");
}
485
/** Controller callback function when results have been sent.
 *
 * Nothing to do besides logging; the write completed in the caller's chain.
 *
 * \param e error code if something went wrong
 * \param conn reference with the connection
 */
void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Results have been sent.");
}
497
[41c1b7]498
499/** Helper function to send a job to worker.
[9a3f84]500 *
501 * Note that we do not set the worker as busy. We simply send it the job.
[41c1b7]502 *
503 * @param address address of worker
504 * @param job job to send
505 */
506void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
507{
[9a3f84]508 ASSERT( pool.isWorkerBusy(address),
509 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
[41c1b7]510 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
[9a6b895]511
512 // create op, sign on, and hand over to queue
513 AsyncOperation *sendJobOp = new SendJobToWorkerOperation(connection,job);
514 OpQueue.push_back(sendJobOp, address);
[41c1b7]515}
516
[2344a3]517/** Helper function to shutdown a single worker.
518 *
519 * We send NoJob to indicate shutdown
520 *
521 * @param address of worker to shutdown
522 */
523void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
524{
[6ea7f4]525 ASSERT( !pool.isWorkerBusy(address),
526 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
527 LOG(2, "INFO: Shutting down worker " << address << "...");
[ba995d]528 AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
529 OpQueue.push_back(shutdownWorkerOp, address);
[2344a3]530}
531
/** Sends shutdown to all current workers in the pool.
 *
 * First signs off from WorkerIdle notifications so no new jobs get handed
 * out, then waits for busy workers, then repeatedly shuts down all idle
 * workers until none remain.
 */
void FragmentScheduler::removeAllWorkers()
{
  // first, sign off such that no new jobs are given to workers
  pool.signOff(this, WorkerPool::WorkerIdle);

  LOG(2, "DEBUG: Waiting for busy workers to finish ...");
  // NOTE(review): busy-wait spin loop — burns a full core until all busy
  // workers have reported back; consider yielding/sleeping here
  while (pool.hasBusyWorkers())
    ;

  LOG(2, "INFO: Shutting down workers ...");
  // iterate until there are no more idle workers
  do {
    // get list of all idle workers (as host/service string pairs)
    typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
    WorkerList_t WorkerList = pool.getListOfIdleWorkers();

    // give all workers shutdown signal
    for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)
      shutdownWorker(WorkerAddress(iter->first, iter->second));

    // wait for pending shutdown operations (busy-wait, see note above)
    while (!OpQueue.empty())
      ;
  } while (pool.presentIdleWorkers());
  pool.removeAllWorkers();
}
561
/** Helper function to shutdown the server properly.
 *
 * Order matters: workers are removed first, then both listening sockets are
 * closed, and only then is the io_service stopped.
 *
 * \todo one should idle here until all workers have returned from
 * calculating stuff (or workers need to still listen while the are
 * calculating which is probably better).
 *
 */
void FragmentScheduler::shutdown()
{
  LOG(1, "INFO: Shutting all down ...");

  /// Remove all workers
  removeAllWorkers();

  /// close the worker listener's socket
  WorkerListener.closeSocket();

  /// close the controller listener's socket
  ControllerListener.closeSocket();

  /// finally, stop the io_service
  io_service.stop();
}
585
586/** Internal helper to send the next available job to the next idle worker.
587 *
588 */
589void FragmentScheduler::sendAvailableJobToNextIdleWorker()
590{
591 const WorkerAddress address = pool.getNextIdleWorker();
592 FragmentJob::ptr job = JobsQueue.popJob();
593 sendJobToWorker(address, job);
594}
595
/** Observer callback for general updates.
 *
 * We only sign on to specific channels, so this must never be called.
 */
void FragmentScheduler::update(Observable *publisher)
{
  ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
}
600
601void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
602{
603 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
[e032b4]604 // we have an idle worker
[2344a3]605 LOG(1, "INFO: We are notified of an idle worker.");
606 // are jobs available?
607 if (JobsQueue.isJobPresent()) {
608 sendAvailableJobToNextIdleWorker();
609 }
[e032b4]610 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
611 // we have new jobs
[2344a3]612 LOG(1, "INFO: We are notified of a new job.");
613 // check for idle workers
614 if (pool.presentIdleWorkers()) {
615 sendAvailableJobToNextIdleWorker();
616 }
[e032b4]617 } else {
618 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
619 +toString(notification->getChannelNo())+".");
[2344a3]620 }
621}
622
/** Observer callback when an observed subject is destroyed.
 *
 * Deliberately empty: nothing needs to happen here.
 */
void FragmentScheduler::subjectKilled(Observable *publisher)
{}
Note: See TracBrowser for help on using the repository browser.