Changeset 2344a3 for src/Fragmentation/Automation/FragmentScheduler.cpp
- Timestamp:
- Jul 2, 2012, 7:54:11 AM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- 1cfd17
- Parents:
- fb255d
- git-author:
- Frederik Heber <heber@…> (03/04/12 16:24:25)
- git-committer:
- Frederik Heber <heber@…> (07/02/12 07:54:11)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/FragmentScheduler.cpp
rfb255d r2344a3 34 34 #include "CodePatterns/Info.hpp" 35 35 #include "CodePatterns/Log.hpp" 36 #include "CodePatterns/Observer/Notification.hpp" 37 #include "ControllerChoices.hpp" 36 38 #include "Controller/Commands/EnrollInPoolOperation.hpp" 37 39 #include "Jobs/MPQCCommandJob.hpp" … … 41 43 #include "FragmentScheduler.hpp" 42 44 43 FragmentJob::ptr FragmentScheduler:: WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));45 FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string(""), std::string(""), JobId::NoJob)); 44 46 45 47 /** Helper function to enforce binding of FragmentWorker to possible derived … … 59 61 * \param controllerport port to listen for controller connections. 60 62 */ 61 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : 62 WorkerListener(io_service, workerport, JobsQueue, pool, 63 FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) : 64 Observer("FragmentScheduler"), 65 io_service(_io_service), 66 WorkerListener(_io_service, workerport, JobsQueue, pool, 63 67 boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)), 64 ControllerListener( io_service, controllerport, JobsQueue,65 boost::bind(& Listener::initiateSocket, boost::ref(WorkerListener))),66 connection( io_service),68 ControllerListener(_io_service, controllerport, JobsQueue, 69 boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))), 70 connection(_io_service), 67 71 sendJobOp(connection) 68 72 { 69 73 Info info(__FUNCTION__); 74 75 // sign on to idle workers and present jobs 76 pool.signOn(this, WorkerPool::WorkerIdle); 77 JobsQueue.signOn(this, FragmentQueue::JobAdded); 70 78 71 79 // listen for controller 72 80 ControllerListener.initiateSocket(); 73 81 74 // only initiate socket if jobs are already present 75 if (JobsQueue.isJobPresent()) { 76 WorkerListener.initiateSocket(); 77 } 82 // listen for workers 83 WorkerListener.initiateSocket(); 84 } 85 86 FragmentScheduler::~FragmentScheduler() 87 { 88 // sign off 89 pool.signOff(this, WorkerPool::WorkerIdle); 90 JobsQueue.signOff(this, FragmentQueue::JobAdded); 78 91 } 79 92 … … 208 221 // restore NoOperation choice such that choice is not read twice 209 222 choice = NoWorkerOperation; 223 224 initiateSocket(); 210 225 } 211 226 else … … 217 232 ELOG(0, e.message()); 218 233 } 219 220 if (JobsQueue.isJobPresent()) {221 // Start an accept operation for a new Connection only when there222 // are still jobs present223 initiateSocket();224 }225 234 } 226 235 … … 234 243 { 235 244 Info info(__FUNCTION__); 236 if (!e) 237 { 238 if (JobsQueue.isJobPresent()) { 239 FragmentJob::ptr job = JobsQueue.popJob(); 240 callback_sendJobToWorker(pool.getNextIdleWorker(), job); 241 } 242 } 243 else 245 if (e) 244 246 { 245 247 // An error occurred. Log it and return. Since we are not starting a new … … 276 278 result.reset(); 277 279 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); 278 279 // send out new job if present280 if (JobsQueue.isJobPresent()) {281 FragmentJob::ptr job = JobsQueue.popJob();282 callback_sendJobToWorker(pool.getNextIdleWorker(), job);283 }284 280 } 285 281 … … 412 408 // Start an accept operation for a new Connection. 413 409 initiateSocket(); 410 } else { 411 // we shutdown? Hence, also shutdown controller 412 shutdownAllSockets(); 414 413 } 415 414 } … … 436 435 { 437 436 Info info(__FUNCTION__); 438 bool need_initiateSocket = !JobsQueue.isJobPresent();439 440 437 // jobs are received, hence place in JobsQueue 441 438 if (!jobs.empty()) { … … 443 440 JobsQueue.pushJobs(jobs); 444 441 } 445 446 442 jobs.clear(); 447 448 // initiate socket if we had no jobs before449 if (need_initiateSocket)450 initiateWorkerSocket();451 443 } 452 444 … … 504 496 } 505 497 506 ///** Helper function to shutdown a single worker. 507 // * 508 // * We send NoJob to indicate shutdown 509 // * 510 // * @param address of worker to shutdown 511 // */ 512 //void FragmentScheduler::shutdownWorker(const WorkerAddress &address) 513 //{ 514 // sendJobToWorker(address, NoJob); 515 //} 516 // 517 ///** Sends shutdown to all current workers in the pool. 518 // * 519 // */ 520 //void FragmentScheduler::removeAllWorkers() 521 //{ 522 // // give all workers shutdown signal 523 // while (pool.presentIdleWorkers()) { 524 // const WorkerAddress address = pool.getNextIdleWorker(); 525 // shutdownWorker(address); 526 // } 527 //} 498 /** Helper function to shutdown a single worker. 499 * 500 * We send NoJob to indicate shutdown 501 * 502 * @param address of worker to shutdown 503 */ 504 void FragmentScheduler::shutdownWorker(const WorkerAddress &address) 505 { 506 LOG(3, "DEBUG: Shutting down worker " << address << "..."); 507 sendJobToWorker(address, NoJob); 508 } 509 510 /** Sends shutdown to all current workers in the pool. 511 * 512 */ 513 void FragmentScheduler::removeAllWorkers() 514 { 515 LOG(2, "INFO: Shutting down workers ..."); 516 517 // give all workers shutdown signal 518 while (pool.presentIdleWorkers()) { 519 const WorkerAddress address = pool.getNextIdleWorker(); 520 shutdownWorker(address); 521 } 522 } 523 524 /** Helper function to shutdown the server properly. 525 * 526 * \todo one should idle here until all workers have returned from 527 * calculating stuff (or workers need to still listen while the are 528 * calculating which is probably better). 529 * 530 */ 531 void FragmentScheduler::shutdown() 532 { 533 LOG(1, "INFO: Shutting all down ..."); 534 535 /// Remove all workers 536 removeAllWorkers(); 537 538 /// close the worker listener's socket 539 WorkerListener.closeSocket(); 540 541 /// close the controller listener's socket 542 ControllerListener.closeSocket(); 543 544 /// finally, stop the io_service 545 io_service.stop(); 546 } 547 548 /** Internal helper to send the next available job to the next idle worker. 549 * 550 */ 551 void FragmentScheduler::sendAvailableJobToNextIdleWorker() 552 { 553 const WorkerAddress address = pool.getNextIdleWorker(); 554 FragmentJob::ptr job = JobsQueue.popJob(); 555 sendJobToWorker(address, job); 556 } 557 558 void FragmentScheduler::update(Observable *publisher) 559 { 560 ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates."); 561 } 562 563 void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification) 564 { 565 // we have an idle worker 566 if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) { 567 LOG(1, "INFO: We are notified of an idle worker."); 568 // are jobs available? 569 if (JobsQueue.isJobPresent()) { 570 sendAvailableJobToNextIdleWorker(); 571 } 572 } 573 574 // we have new jobs 575 if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) { 576 LOG(1, "INFO: We are notified of a new job."); 577 // check for idle workers 578 if (pool.presentIdleWorkers()) { 579 sendAvailableJobToNextIdleWorker(); 580 } 581 } 582 } 583 584 void FragmentScheduler::subjectKilled(Observable *publisher) 585 {}
Note:
See TracChangeset
for help on using the changeset viewer.