source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 469f53

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 469f53 was bff93d, checked in by Frederik Heber <heber@…>, 13 years ago

RemoveFromPoolOperation is now async.

  • changed PoolWorker such that we close down socket and listener via success callback handler of the operartion.
  • FragmentScheduler now sends true/false as status of removal.
  • Property mode set to 100644
File size: 21.9 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <algorithm>
29#include <boost/bind.hpp>
30#include <boost/lambda/lambda.hpp>
31#include <boost/lexical_cast.hpp>
32#include <iostream>
33#include <vector>
34#include "Connection.hpp" // Must come before boost/serialization headers.
35#include <boost/serialization/vector.hpp>
36#include "CodePatterns/Info.hpp"
37#include "CodePatterns/Log.hpp"
38#include "CodePatterns/Observer/Notification.hpp"
39#include "ControllerChoices.hpp"
40#include "Operations/Servers/SendJobToWorkerOperation.hpp"
41#include "Operations/Workers/EnrollInPoolOperation.hpp"
42#include "Jobs/MPQCCommandJob.hpp"
43#include "Jobs/SystemCommandJob.hpp"
44#include "JobId.hpp"
45
46#include "FragmentScheduler.hpp"
47
48/** Helper function to enforce binding of FragmentWorker to possible derived
49 * FragmentJob classes.
50 */
51void dummyInit() {
52 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
53 MPQCCommandJob("nofile", JobId::IllegalJob);
54}
55
56/** Constructor of class FragmentScheduler.
57 *
58 * We setup both acceptors to accept connections from workers and Controller.
59 *
60 * \param io_service io_service of the asynchronous communications
61 * \param workerport port to listen for worker connections
62 * \param controllerport port to listen for controller connections.
63 */
64FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
65 Observer("FragmentScheduler"),
66 io_service(_io_service),
67 WorkerListener(_io_service, workerport, JobsQueue, pool,
68 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
69 ControllerListener(_io_service, controllerport, JobsQueue,
70 boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)),
71 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
72 connection(_io_service)
73{
74 Info info(__FUNCTION__);
75
76 // sign on to idle workers and present jobs
77 pool.signOn(this, WorkerPool::WorkerIdle);
78 JobsQueue.signOn(this, FragmentQueue::JobAdded);
79
80 // listen for controller
81 ControllerListener.initiateSocket();
82
83 // listen for workers
84 WorkerListener.initiateSocket();
85}
86
87FragmentScheduler::~FragmentScheduler()
88{
89 // sign off
90 pool.signOff(this, WorkerPool::WorkerIdle);
91 JobsQueue.signOff(this, FragmentQueue::JobAdded);
92}
93
94/** Handle a new worker connection.
95 *
96 * We store the given address in the pool.
97 *
98 * \param e error code if something went wrong
99 * \param conn reference with the connection
100 */
101void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
102{
103 Info info(__FUNCTION__);
104 if (!e)
105 {
106 // Successfully accepted a new connection.
107 // read address
108 conn->async_read(address,
109 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
110 boost::asio::placeholders::error, conn));
111 }
112 else
113 {
114 // An error occurred. Log it and return. Since we are not starting a new
115 // accept operation the io_service will run out of work to do and the
116 // server will exit.
117 Exitflag = ErrorFlag;
118 ELOG(0, e.message());
119 }
120}
121
122/** Handle having received Worker's address
123 *
124 * \param e error code if something went wrong
125 * \param conn reference with the connection
126 */
127void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
128{
129 Info info(__FUNCTION__);
130 if (!e)
131 {
132 // Successfully accepted a new connection.
133 // read address
134 conn->async_read(choice,
135 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
136 boost::asio::placeholders::error, conn));
137 }
138 else
139 {
140 // An error occurred. Log it and return. Since we are not starting a new
141 // accept operation the io_service will run out of work to do and the
142 // server will exit.
143 Exitflag = ErrorFlag;
144 ELOG(0, e.message());
145 }
146}
147
148/** Controller callback function to read the choice for next operation.
149 *
150 * \param e error code if something went wrong
151 * \param conn reference with the connection
152 */
153void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
154{
155 Info info(__FUNCTION__);
156 if (!e)
157 {
158 LOG(1, "INFO: Received request for operation " << choice << ".");
159 // switch over the desired choice read previously
160 switch(choice) {
161 case NoWorkerOperation:
162 {
163 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
164 break;
165 }
166 case EnrollInPool:
167 {
168 if (pool.presentInPool(address)) {
169 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
170 conn->async_write(false,
171 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
172 boost::asio::placeholders::error, conn));
173 } else {
174 // insert as its new worker
175 LOG(1, "INFO: Adding " << address << " to pool ...");
176 pool.addWorker(address);
177 conn->async_write(true,
178 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
179 boost::asio::placeholders::error, conn));
180 break;
181 }
182 case SendResult:
183 {
184 if (pool.presentInPool(address)) {
185 // check whether its priority is busy_priority
186 if (pool.isWorkerBusy(address)) {
187 conn->async_read(result,
188 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
189 boost::asio::placeholders::error, conn));
190 } else {
191 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
192 conn->async_read(result,
193 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
194 boost::asio::placeholders::error, conn));
195 }
196 } else {
197 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
198 conn->async_read(result,
199 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
200 boost::asio::placeholders::error, conn));
201 }
202 break;
203 }
204 case RemoveFromPool:
205 {
206 if (pool.presentInPool(address)) {
207 conn->async_write(true,
208 boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this,
209 boost::asio::placeholders::error, conn));
210 } else {
211 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
212 conn->async_write(false,
213 boost::bind(&FragmentScheduler::WorkerListener_t::handle_removed, this,
214 boost::asio::placeholders::error, conn));
215 }
216 break;
217 }
218 default:
219 Exitflag = ErrorFlag;
220 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
221 break;
222 }
223 }
224 // restore NoOperation choice such that choice is not read twice
225 choice = NoWorkerOperation;
226
227 initiateSocket();
228 }
229 else
230 {
231 // An error occurred. Log it and return. Since we are not starting a new
232 // accept operation the io_service will run out of work to do and the
233 // server will exit.
234 Exitflag = ErrorFlag;
235 ELOG(0, e.message());
236 }
237}
238
239
240/** Callback function when new worker has enrolled.
241 *
242 * \param e error code if something went wrong
243 * \param conn reference with the connection
244 */
245void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
246{
247 Info info(__FUNCTION__);
248 if (!e) {
249 LOG(2, "DEBUG: Successfully enrolled.");
250 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
251 << pool.getNoIdleWorkers() << " of which are idle.");
252 } else {
253 // An error occurred. Log it and return. Since we are not starting a new
254 // accept operation the io_service will run out of work to do and the
255 // server will exit.
256 Exitflag = ErrorFlag;
257 ELOG(0, e.message());
258 }
259}
260
261/** Callback function when new worker has enrolled.
262 *
263 * \param e error code if something went wrong
264 * \param conn reference with the connection
265 */
266void FragmentScheduler::WorkerListener_t::handle_removed(const boost::system::error_code& e, connection_ptr conn)
267{
268 Info info(__FUNCTION__);
269 if (!e) {
270 // removing present worker
271 pool.removeWorker(address);
272 } else {
273 // An error occurred. Log it and return. Since we are not starting a new
274 // accept operation the io_service will run out of work to do and the
275 // server will exit.
276 Exitflag = ErrorFlag;
277 ELOG(0, e.message());
278 }
279}
280
281/** Callback function when result has been received.
282 *
283 * \param e error code if something went wrong
284 * \param conn reference with the connection
285 */
286void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
287{
288 Info info(__FUNCTION__);
289 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
290
291 // and push into queue
292 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
293 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
294 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
295 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
296 // place id into expected
297 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
298 JobsQueue.pushResult(result);
299
300 // mark as idle
301 pool.unmarkWorkerBusy(address);
302 LOG(1, "INFO: There are " << pool.getNoTotalWorkers() << " workers in the queue, "
303 << pool.getNoIdleWorkers() << " of which are idle.");
304
305 // erase result
306 result.reset();
307 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
308}
309
310/** Callback function when result has been received.
311 *
312 * \param e error code if something went wrong
313 * \param conn reference with the connection
314 */
315void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
316{
317 Info info(__FUNCTION__);
318 // nothing to do
319 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
320
321 JobsQueue.resubmitJob(result->getId());
322
323 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
324}
325
326
327/** Handle a new controller connection.
328 *
329 * \sa handle_ReceiveJobs()
330 * \sa handle_CheckResultState()
331 * \sa handle_SendResults()
332 *
333 * \param e error code if something went wrong
334 * \param conn reference with the connection
335 */
336void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
337{
338 Info info(__FUNCTION__);
339 if (!e)
340 {
341 conn->async_read(choice,
342 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
343 boost::asio::placeholders::error, conn));
344 }
345 else
346 {
347 // An error occurred. Log it and return. Since we are not starting a new
348 // accept operation the io_service will run out of work to do and the
349 // server will exit.
350 Exitflag = ErrorFlag;
351 ELOG(0, e.message());
352 }
353}
354
355/** Controller callback function to read the choice for next operation.
356 *
357 * \param e error code if something went wrong
358 * \param conn reference with the connection
359 */
360void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
361{
362 Info info(__FUNCTION__);
363 if (!e)
364 {
365 bool LaunchNewAcceptor = true;
366 LOG(1, "INFO: Received request for operation " << choice << ".");
367 // switch over the desired choice read previously
368 switch(choice) {
369 case NoControllerOperation:
370 {
371 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
372 break;
373 }
374 case GetNextJobId:
375 {
376 LOG(1, "INFO: Receiving number of desired job ids from controller ...");
377 conn->async_read(NumberIds,
378 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
379 boost::asio::placeholders::error, conn));
380 break;
381 }
382 case SendJobs:
383 {
384 // The connection::async_write() function will automatically
385 // serialize the data structure for us.
386 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
387 conn->async_read(jobs,
388 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
389 boost::asio::placeholders::error, conn));
390 break;
391 }
392 case CheckState:
393 {
394 // first update number
395 jobInfo[0] = JobsQueue.getPresentJobs();
396 jobInfo[1] = JobsQueue.getDoneJobs();
397 // now we accept connections to check for state of calculations
398 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
399 conn->async_write(jobInfo,
400 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
401 boost::asio::placeholders::error, conn));
402 break;
403 }
404 case RemoveAll:
405 {
406 removeallWorkers();
407 break;
408 }
409 case ReceiveResults:
410 {
411 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
412 // ... or we give the results
413 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
414 conn->async_write(results,
415 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
416 boost::asio::placeholders::error, conn));
417 break;
418 }
419 case ShutdownControllerSocket:
420 {
421 LOG(1, "INFO: Received shutdown from controller ...");
422 // only allow for shutdown when there are no more jobs in the queue
423 if (!JobsQueue.isJobPresent()) {
424 // we shutdown? Hence, also shutdown controller
425 LaunchNewAcceptor = !shutdownAllSockets();
426 } else {
427 ELOG(2, "There are still jobs waiting in the queue.");
428 }
429 break;
430 }
431 default:
432 Exitflag = ErrorFlag;
433 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
434 break;
435 }
436 // restore NoControllerOperation choice such that choice is not read twice
437 choice = NoControllerOperation;
438
439 if (LaunchNewAcceptor) {
440 LOG(1, "Launching new acceptor on socket.");
441 // Start an accept operation for a new Connection.
442 initiateSocket();
443 }
444 }
445 else
446 {
447 // An error occurred. Log it and return. Since we are not starting a new
448 // accept operation the io_service will run out of work to do and the
449 // server will exit.
450 Exitflag = ErrorFlag;
451 ELOG(0, e.message());
452 }
453}
454
455/** Controller callback function when job has been sent.
456 *
457 * We check here whether the worker socket is accepting, if there
458 * have been no jobs we re-activate it, as it is shut down after
459 * last job.
460 *
461 * \param e error code if something went wrong
462 * \param conn reference with the connection
463 */
464void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
465{
466 Info info(__FUNCTION__);
467 // jobs are received, hence place in JobsQueue
468 if (!jobs.empty()) {
469 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
470 JobsQueue.pushJobs(jobs);
471 }
472 jobs.clear();
473}
474
475/** Controller callback function when checking on state of results.
476 *
477 * \param e error code if something went wrong
478 * \param conn reference with the connection
479 */
480void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
481{
482 Info info(__FUNCTION__);
483 // do nothing
484 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
485}
486
487/** Controller callback function when checking on state of results.
488 *
489 * \param e error code if something went wrong
490 * \param conn reference with the connection
491 */
492void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
493{
494 Info info(__FUNCTION__);
495
496 std::vector<JobId_t> nextids( NumberIds, JobId::IllegalJob);
497 std::generate(nextids.begin(), nextids.end(),
498 boost::bind(&GlobalJobId::getNextId, boost::ref(globalId)));
499 LOG(1, "INFO: Sending next available job ids " << nextids << " to controller ...");
500 conn->async_write(nextids,
501 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendIds, this,
502 boost::asio::placeholders::error, conn));
503}
504
505/** Controller callback function when free job ids have been sent.
506 *
507 * \param e error code if something went wrong
508 * \param conn reference with the connection
509 */
510void FragmentScheduler::ControllerListener_t::handle_SendIds(const boost::system::error_code& e, connection_ptr conn)
511{
512 Info info(__FUNCTION__);
513 // do nothing
514 LOG(1, "INFO: Ids have been sent.");
515}
516
517/** Controller callback function when result has been received.
518 *
519 * \param e error code if something went wrong
520 * \param conn reference with the connection
521 */
522void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
523{
524 Info info(__FUNCTION__);
525 // do nothing
526 LOG(1, "INFO: Results have been sent.");
527}
528
529
530/** Helper function to send a job to worker.
531 *
532 * Note that we do not set the worker as busy. We simply send it the job.
533 *
534 * @param address address of worker
535 * @param job job to send
536 */
537void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
538{
539 ASSERT( pool.isWorkerBusy(address),
540 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
541 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
542
543 // create op, sign on, and hand over to queue
544 AsyncOperation *sendJobOp = new SendJobToWorkerOperation(connection,job);
545 OpQueue.push_back(sendJobOp, address);
546}
547
548/** Helper function to shutdown a single worker.
549 *
550 * We send NoJob to indicate shutdown
551 *
552 * @param address of worker to shutdown
553 */
554void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
555{
556 ASSERT( !pool.isWorkerBusy(address),
557 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
558 LOG(2, "INFO: Shutting down worker " << address << "...");
559 AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection);
560 OpQueue.push_back(shutdownWorkerOp, address);
561}
562
563/** Sends shutdown to all current workers in the pool.
564 *
565 */
566void FragmentScheduler::removeAllWorkers()
567{
568 // first, sign off such that no new jobs are given to workers
569 pool.signOff(this, WorkerPool::WorkerIdle);
570
571 LOG(2, "DEBUG: Waiting for busy workers to finish ...");
572 while (pool.hasBusyWorkers())
573 ;
574
575 LOG(2, "INFO: Shutting down workers ...");
576 // iterate until there are no more idle workers
577 // get list of all idle workers
578 typedef std::vector<std::pair<std::string, std::string> > WorkerList_t;
579 WorkerList_t WorkerList = pool.getListOfIdleWorkers();
580
581 // give all workers shutdown signal
582 for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter)
583 shutdownWorker(WorkerAddress(iter->first, iter->second));
584}
585
586/** Function to shutdown server properly, e.g. for use as signal handler.
587 *
588 * @param sig signal number
589 */
590void FragmentScheduler::shutdown(int sig)
591{
592 LOG(0, "STATUS: Shutting down due to signal " << sig << ".");
593
594 if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
595 shutdown();
596 } else {
597 removeAllWorkers();
598 }
599}
600
601/** Helper function to shutdown the server properly.
602 *
603 * \todo one should idle here until all workers have returned from
604 * calculating stuff (or workers need to still listen while they are
605 * calculating which is probably better).
606 *
607 * \note We only shutdown when there are no workers left
608 *
609 * @return true - doing shutdown, false - precondition not met, not shutting down
610 */
611bool FragmentScheduler::shutdown()
612{
613 if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) {
614 LOG(1, "INFO: Shutting all down ...");
615
616 /// close the worker listener's socket
617 WorkerListener.closeSocket();
618
619 /// close the controller listener's socket
620 ControllerListener.closeSocket();
621
622 /// finally, stop the io_service
623 io_service.stop();
624 return true;
625 } else {
626 ELOG(2, "There are still idle or busy workers present.");
627 return false;
628 }
629}
630
631/** Internal helper to send the next available job to the next idle worker.
632 *
633 */
634void FragmentScheduler::sendAvailableJobToNextIdleWorker()
635{
636 const WorkerAddress address = pool.getNextIdleWorker();
637 FragmentJob::ptr job = JobsQueue.popJob();
638 sendJobToWorker(address, job);
639}
640
641void FragmentScheduler::update(Observable *publisher)
642{
643 ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
644}
645
646void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
647{
648 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
649 // we have an idle worker
650 LOG(1, "INFO: We are notified of an idle worker.");
651 // are jobs available?
652 if (JobsQueue.isJobPresent()) {
653 sendAvailableJobToNextIdleWorker();
654 }
655 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
656 // we have new jobs
657 LOG(1, "INFO: We are notified of a new job.");
658 // check for idle workers
659 if (pool.presentIdleWorkers()) {
660 sendAvailableJobToNextIdleWorker();
661 }
662 } else {
663 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
664 +toString(notification->getChannelNo())+".");
665 }
666}
667
668void FragmentScheduler::subjectKilled(Observable *publisher)
669{}
Note: See TracBrowser for help on using the repository browser.