source: src/Fragmentation/Automation/FragmentScheduler.cpp@ a40c85

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since a40c85 was a40c85, checked in by Frederik Heber <heber@…>, 12 years ago

FragmentScheduler is relieved of observing Operations; this is now done by OperationQueue.

  • i.e. OperationQueue is now an Observer and uses push_back() and remove() to sign on and off from the given operations.
  • added unit test function to check this behavior.
  • Property mode set to 100644
File size: 19.9 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <algorithm>
29#include <boost/bind.hpp>
30#include <boost/lambda/lambda.hpp>
31#include <boost/lexical_cast.hpp>
32#include <iostream>
33#include <vector>
34#include "Connection.hpp" // Must come before boost/serialization headers.
35#include <boost/serialization/vector.hpp>
36#include "CodePatterns/Info.hpp"
37#include "CodePatterns/Log.hpp"
38#include "CodePatterns/Observer/Notification.hpp"
39#include "ControllerChoices.hpp"
40#include "Operations/Servers/SendJobToWorkerOperation.hpp"
41#include "Operations/Workers/EnrollInPoolOperation.hpp"
42#include "Jobs/MPQCCommandJob.hpp"
43#include "Jobs/SystemCommandJob.hpp"
44#include "JobId.hpp"
45
46#include "FragmentScheduler.hpp"
47
48/** Helper function to enforce binding of FragmentWorker to possible derived
49 * FragmentJob classes.
50 */
51void dummyInit() {
52 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
53 MPQCCommandJob("nofile", JobId::IllegalJob);
54}
55
56/** Constructor of class FragmentScheduler.
57 *
58 * We setup both acceptors to accept connections from workers and Controller.
59 *
60 * \param io_service io_service of the asynchronous communications
61 * \param workerport port to listen for worker connections
62 * \param controllerport port to listen for controller connections.
63 */
64FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
65 Observer("FragmentScheduler"),
66 io_service(_io_service),
67 WorkerListener(_io_service, workerport, JobsQueue, pool,
68 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
69 ControllerListener(_io_service, controllerport, JobsQueue,
70 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
71 connection(_io_service),
72 shutdownWorkerOp(connection)
73{
74 Info info(__FUNCTION__);
75
76 // sign on to idle workers and present jobs
77 pool.signOn(this, WorkerPool::WorkerIdle);
78 JobsQueue.signOn(this, FragmentQueue::JobAdded);
79
80 // listen for controller
81 ControllerListener.initiateSocket();
82
83 // listen for workers
84 WorkerListener.initiateSocket();
85}
86
87FragmentScheduler::~FragmentScheduler()
88{
89 // sign off
90 pool.signOff(this, WorkerPool::WorkerIdle);
91 JobsQueue.signOff(this, FragmentQueue::JobAdded);
92}
93
94/** Handle a new worker connection.
95 *
96 * We store the given address in the pool.
97 *
98 * \param e error code if something went wrong
99 * \param conn reference with the connection
100 */
101void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
102{
103 Info info(__FUNCTION__);
104 if (!e)
105 {
106 // Successfully accepted a new connection.
107 // read address
108 conn->async_read(address,
109 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this,
110 boost::asio::placeholders::error, conn));
111 }
112 else
113 {
114 // An error occurred. Log it and return. Since we are not starting a new
115 // accept operation the io_service will run out of work to do and the
116 // server will exit.
117 Exitflag = ErrorFlag;
118 ELOG(0, e.message());
119 }
120}
121
122/** Handle having received Worker's address
123 *
124 * \param e error code if something went wrong
125 * \param conn reference with the connection
126 */
127void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn)
128{
129 Info info(__FUNCTION__);
130 if (!e)
131 {
132 // Successfully accepted a new connection.
133 // read address
134 conn->async_read(choice,
135 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this,
136 boost::asio::placeholders::error, conn));
137 }
138 else
139 {
140 // An error occurred. Log it and return. Since we are not starting a new
141 // accept operation the io_service will run out of work to do and the
142 // server will exit.
143 Exitflag = ErrorFlag;
144 ELOG(0, e.message());
145 }
146}
147
148/** Controller callback function to read the choice for next operation.
149 *
150 * \param e error code if something went wrong
151 * \param conn reference with the connection
152 */
153void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
154{
155 Info info(__FUNCTION__);
156 if (!e)
157 {
158 LOG(1, "INFO: Received request for operation " << choice << ".");
159 // switch over the desired choice read previously
160 switch(choice) {
161 case NoWorkerOperation:
162 {
163 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation.");
164 break;
165 }
166 case EnrollInPool:
167 {
168 if (pool.presentInPool(address)) {
169 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool.");
170 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail;
171 conn->async_write(flag,
172 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
173 boost::asio::placeholders::error, conn));
174 } else {
175 // insert as its new worker
176 LOG(1, "INFO: Adding " << address << " to pool ...");
177 pool.addWorker(address);
178 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
179 conn->async_write(flag,
180 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
181 boost::asio::placeholders::error, conn));
182 break;
183 }
184 case SendResult:
185 {
186 if (pool.presentInPool(address)) {
187 // check whether its priority is busy_priority
188 if (pool.isWorkerBusy(address)) {
189 conn->async_read(result,
190 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
191 boost::asio::placeholders::error, conn));
192 } else {
193 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy.");
194 conn->async_read(result,
195 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
196 boost::asio::placeholders::error, conn));
197 }
198 } else {
199 ELOG(1, "Worker " << address << " trying to send result who is not in pool.");
200 conn->async_read(result,
201 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this,
202 boost::asio::placeholders::error, conn));
203 }
204 break;
205 }
206 case RemoveFromPool:
207 {
208 if (pool.presentInPool(address)) {
209 // removing present worker
210 pool.removeWorker(address);
211 } else {
212 ELOG(1, "Shutting down Worker " << address << " not contained in pool.");
213 }
214 break;
215 }
216 default:
217 Exitflag = ErrorFlag;
218 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice.");
219 break;
220 }
221 }
222 // restore NoOperation choice such that choice is not read twice
223 choice = NoWorkerOperation;
224
225 initiateSocket();
226 }
227 else
228 {
229 // An error occurred. Log it and return. Since we are not starting a new
230 // accept operation the io_service will run out of work to do and the
231 // server will exit.
232 Exitflag = ErrorFlag;
233 ELOG(0, e.message());
234 }
235}
236
237
238/** Callback function when new worker has enrolled.
239 *
240 * \param e error code if something went wrong
241 * \param conn reference with the connection
242 */
243void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
244{
245 Info info(__FUNCTION__);
246 if (e)
247 {
248 // An error occurred. Log it and return. Since we are not starting a new
249 // accept operation the io_service will run out of work to do and the
250 // server will exit.
251 Exitflag = ErrorFlag;
252 ELOG(0, e.message());
253 }
254}
255
256/** Callback function when result has been received.
257 *
258 * \param e error code if something went wrong
259 * \param conn reference with the connection
260 */
261void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
262{
263 Info info(__FUNCTION__);
264 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
265
266 // and push into queue
267 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
268 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
269 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
270 "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
271 // place id into expected
272 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
273 JobsQueue.pushResult(result);
274
275 // mark as idle
276 pool.unmarkWorkerBusy(address);
277
278 // erase result
279 result.reset();
280 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
281}
282
283/** Callback function when result has been received.
284 *
285 * \param e error code if something went wrong
286 * \param conn reference with the connection
287 */
288void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
289{
290 Info info(__FUNCTION__);
291 // nothing to do
292 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue.");
293
294 JobsQueue.resubmitJob(result->getId());
295
296 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
297}
298
299
300/** Handle a new controller connection.
301 *
302 * \sa handle_ReceiveJobs()
303 * \sa handle_CheckResultState()
304 * \sa handle_SendResults()
305 *
306 * \param e error code if something went wrong
307 * \param conn reference with the connection
308 */
309void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
310{
311 Info info(__FUNCTION__);
312 if (!e)
313 {
314 conn->async_read(choice,
315 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
316 boost::asio::placeholders::error, conn));
317 }
318 else
319 {
320 // An error occurred. Log it and return. Since we are not starting a new
321 // accept operation the io_service will run out of work to do and the
322 // server will exit.
323 Exitflag = ErrorFlag;
324 ELOG(0, e.message());
325 }
326}
327
328/** Controller callback function to read the choice for next operation.
329 *
330 * \param e error code if something went wrong
331 * \param conn reference with the connection
332 */
333void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
334{
335 Info info(__FUNCTION__);
336 if (!e)
337 {
338 bool LaunchNewAcceptor = true;
339 LOG(1, "INFO: Received request for operation " << choice << ".");
340 // switch over the desired choice read previously
341 switch(choice) {
342 case NoControllerOperation:
343 {
344 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation.");
345 break;
346 }
347 case GetNextJobId:
348 {
349 LOG(1, "INFO: Receiving number of desired job ids from controller ...");
350 conn->async_read(NumberIds,
351 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
352 boost::asio::placeholders::error, conn));
353 break;
354 }
355 case SendJobs:
356 {
357 // The connection::async_write() function will automatically
358 // serialize the data structure for us.
359 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
360 conn->async_read(jobs,
361 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
362 boost::asio::placeholders::error, conn));
363 break;
364 }
365 case CheckState:
366 {
367 // first update number
368 jobInfo[0] = JobsQueue.getPresentJobs();
369 jobInfo[1] = JobsQueue.getDoneJobs();
370 // now we accept connections to check for state of calculations
371 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
372 conn->async_write(jobInfo,
373 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
374 boost::asio::placeholders::error, conn));
375 break;
376 }
377 case ReceiveResults:
378 {
379 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
380 // ... or we give the results
381 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
382 conn->async_write(results,
383 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
384 boost::asio::placeholders::error, conn));
385 break;
386 }
387 case ShutdownControllerSocket:
388 {
389 LOG(1, "INFO: Received shutdown from controller ...");
390 // only allow for shutdown when there are no more jobs in the queue
391 if (!JobsQueue.isJobPresent()) {
392 LaunchNewAcceptor = false;
393 } else {
394 ELOG(2, "There are still jobs waiting in the queue.");
395 }
396 break;
397 }
398 default:
399 Exitflag = ErrorFlag;
400 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice.");
401 break;
402 }
403 // restore NoControllerOperation choice such that choice is not read twice
404 choice = NoControllerOperation;
405
406 if (LaunchNewAcceptor) {
407 LOG(1, "Launching new acceptor on socket.");
408 // Start an accept operation for a new Connection.
409 initiateSocket();
410 } else {
411 // we shutdown? Hence, also shutdown controller
412 shutdownAllSockets();
413 }
414 }
415 else
416 {
417 // An error occurred. Log it and return. Since we are not starting a new
418 // accept operation the io_service will run out of work to do and the
419 // server will exit.
420 Exitflag = ErrorFlag;
421 ELOG(0, e.message());
422 }
423}
424
425/** Controller callback function when job has been sent.
426 *
427 * We check here whether the worker socket is accepting, if there
428 * have been no jobs we re-activate it, as it is shut down after
429 * last job.
430 *
431 * \param e error code if something went wrong
432 * \param conn reference with the connection
433 */
434void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
435{
436 Info info(__FUNCTION__);
437 // jobs are received, hence place in JobsQueue
438 if (!jobs.empty()) {
439 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
440 JobsQueue.pushJobs(jobs);
441 }
442 jobs.clear();
443}
444
445/** Controller callback function when checking on state of results.
446 *
447 * \param e error code if something went wrong
448 * \param conn reference with the connection
449 */
450void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
451{
452 Info info(__FUNCTION__);
453 // do nothing
454 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
455}
456
457/** Controller callback function when checking on state of results.
458 *
459 * \param e error code if something went wrong
460 * \param conn reference with the connection
461 */
462void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
463{
464 Info info(__FUNCTION__);
465
466 std::vector<JobId_t> nextids( NumberIds, JobId::IllegalJob);
467 std::generate(nextids.begin(), nextids.end(),
468 boost::bind(&GlobalJobId::getNextId, boost::ref(globalId)));
469 LOG(1, "INFO: Sending next available job ids " << nextids << " to controller ...");
470 conn->async_write(nextids,
471 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendIds, this,
472 boost::asio::placeholders::error, conn));
473}
474
475/** Controller callback function when free job ids have been sent.
476 *
477 * \param e error code if something went wrong
478 * \param conn reference with the connection
479 */
480void FragmentScheduler::ControllerListener_t::handle_SendIds(const boost::system::error_code& e, connection_ptr conn)
481{
482 Info info(__FUNCTION__);
483 // do nothing
484 LOG(1, "INFO: Ids have been sent.");
485}
486
487/** Controller callback function when result has been received.
488 *
489 * \param e error code if something went wrong
490 * \param conn reference with the connection
491 */
492void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
493{
494 Info info(__FUNCTION__);
495 // do nothing
496 LOG(1, "INFO: Results have been sent.");
497}
498
499
500/** Helper function to send a job to worker.
501 *
502 * Note that we do not set the worker as busy. We simply send it the job.
503 *
504 * @param address address of worker
505 * @param job job to send
506 */
507void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
508{
509 ASSERT( pool.isWorkerBusy(address),
510 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy.");
511 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
512
513 // create op, sign on, and hand over to queue
514 AsyncOperation *sendJobOp = new SendJobToWorkerOperation(connection,job);
515 OpQueue.push_back(sendJobOp, address);
516}
517
518/** Helper function to shutdown a single worker.
519 *
520 * We send NoJob to indicate shutdown
521 *
522 * @param address of worker to shutdown
523 */
524void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
525{
526 ASSERT( !pool.isWorkerBusy(address),
527 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
528 LOG(2, "INFO: Shutting down worker " << address << "...");
529 shutdownWorkerOp(address.host, address.service);
530}
531
532/** Sends shutdown to all current workers in the pool.
533 *
534 */
535void FragmentScheduler::removeAllWorkers()
536{
537 LOG(2, "INFO: Shutting down workers ...");
538
539 // \todo We have to wait here until all workers are done
540 // first, sign off such that no new jobs are given to workers
541 pool.signOff(this, WorkerPool::WorkerIdle);
542 while (pool.hasBusyWorkers())
543 ;
544
545 // give all workers shutdown signal
546 for (WorkerPool::Idle_Queue_t::const_iterator iter = pool.begin_idle(); iter != pool.end_idle(); ++iter) {
547 const WorkerAddress address = iter->second;
548 shutdownWorker(address);
549 }
550 pool.removeAllWorkers();
551}
552
553/** Helper function to shutdown the server properly.
554 *
555 * \todo one should idle here until all workers have returned from
556 * calculating stuff (or workers need to still listen while the are
557 * calculating which is probably better).
558 *
559 */
560void FragmentScheduler::shutdown()
561{
562 LOG(1, "INFO: Shutting all down ...");
563
564 /// Remove all workers
565 removeAllWorkers();
566
567 /// close the worker listener's socket
568 WorkerListener.closeSocket();
569
570 /// close the controller listener's socket
571 ControllerListener.closeSocket();
572
573 /// finally, stop the io_service
574 io_service.stop();
575}
576
577/** Internal helper to send the next available job to the next idle worker.
578 *
579 */
580void FragmentScheduler::sendAvailableJobToNextIdleWorker()
581{
582 const WorkerAddress address = pool.getNextIdleWorker();
583 FragmentJob::ptr job = JobsQueue.popJob();
584 sendJobToWorker(address, job);
585}
586
587void FragmentScheduler::update(Observable *publisher)
588{
589 ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
590}
591
592void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
593{
594 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
595 // we have an idle worker
596 LOG(1, "INFO: We are notified of an idle worker.");
597 // are jobs available?
598 if (JobsQueue.isJobPresent()) {
599 sendAvailableJobToNextIdleWorker();
600 }
601 } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
602 // we have new jobs
603 LOG(1, "INFO: We are notified of a new job.");
604 // check for idle workers
605 if (pool.presentIdleWorkers()) {
606 sendAvailableJobToNextIdleWorker();
607 }
608 } else {
609 ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel "
610 +toString(notification->getChannelNo())+".");
611 }
612}
613
614void FragmentScheduler::subjectKilled(Observable *publisher)
615{}
Note: See TracBrowser for help on using the repository browser.