Ignore:
Timestamp:
Jul 2, 2012, 7:54:11 AM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
1cfd17
Parents:
fb255d
git-author:
Frederik Heber <heber@…> (03/04/12 16:24:25)
git-committer:
Frederik Heber <heber@…> (07/02/12 07:54:11)
Message:

FragmentScheduler is now an Observer of JobsQueue and pool.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    rfb255d r2344a3  
    3434#include "CodePatterns/Info.hpp"
    3535#include "CodePatterns/Log.hpp"
     36#include "CodePatterns/Observer/Notification.hpp"
     37#include "ControllerChoices.hpp"
    3638#include "Controller/Commands/EnrollInPoolOperation.hpp"
    3739#include "Jobs/MPQCCommandJob.hpp"
     
    4143#include "FragmentScheduler.hpp"
    4244
    43 FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
     45FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string(""), std::string(""), JobId::NoJob));
    4446
    4547/** Helper function to enforce binding of FragmentWorker to possible derived
     
    5961 * \param controllerport port to listen for controller connections.
    6062 */
    61 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
    62     WorkerListener(io_service, workerport, JobsQueue, pool,
     63FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) :
     64    Observer("FragmentScheduler"),
     65    io_service(_io_service),
     66    WorkerListener(_io_service, workerport, JobsQueue, pool,
    6367        boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
    64     ControllerListener(io_service, controllerport, JobsQueue,
    65         boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))),
    66     connection(io_service),
     68    ControllerListener(_io_service, controllerport, JobsQueue,
     69        boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))),
     70    connection(_io_service),
    6771    sendJobOp(connection)
    6872{
    6973  Info info(__FUNCTION__);
     74
     75  // sign on to idle workers and present jobs
     76  pool.signOn(this, WorkerPool::WorkerIdle);
     77  JobsQueue.signOn(this, FragmentQueue::JobAdded);
    7078
    7179  // listen for controller
    7280  ControllerListener.initiateSocket();
    7381
    74   // only initiate socket if jobs are already present
    75   if (JobsQueue.isJobPresent()) {
    76     WorkerListener.initiateSocket();
    77   }
     82  // listen for workers
     83  WorkerListener.initiateSocket();
     84}
     85
     86FragmentScheduler::~FragmentScheduler()
     87{
     88  // sign off
     89  pool.signOff(this, WorkerPool::WorkerIdle);
     90  JobsQueue.signOff(this, FragmentQueue::JobAdded);
    7891}
    7992
     
    208221    // restore NoOperation choice such that choice is not read twice
    209222    choice = NoWorkerOperation;
     223
     224    initiateSocket();
    210225  }
    211226  else
     
    217232    ELOG(0, e.message());
    218233  }
    219 
    220   if (JobsQueue.isJobPresent()) {
    221     // Start an accept operation for a new Connection only when there
    222     // are still jobs present
    223     initiateSocket();
    224   }
    225234}
    226235
     
    234243{
    235244  Info info(__FUNCTION__);
    236   if (!e)
    237   {
    238     if (JobsQueue.isJobPresent()) {
    239       FragmentJob::ptr job = JobsQueue.popJob();
    240       callback_sendJobToWorker(pool.getNextIdleWorker(), job);
    241     }
    242   }
    243   else
     245  if (e)
    244246  {
    245247    // An error occurred. Log it and return. Since we are not starting a new
     
    276278  result.reset();
    277279  LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
    278 
    279   // send out new job if present
    280   if (JobsQueue.isJobPresent()) {
    281     FragmentJob::ptr job = JobsQueue.popJob();
    282     callback_sendJobToWorker(pool.getNextIdleWorker(), job);
    283   }
    284280}
    285281
     
    412408      // Start an accept operation for a new Connection.
    413409      initiateSocket();
     410    } else {
     411      // we shutdown? Hence, also shutdown controller
     412      shutdownAllSockets();
    414413    }
    415414  }
     
    436435{
    437436  Info info(__FUNCTION__);
    438   bool need_initiateSocket = !JobsQueue.isJobPresent();
    439 
    440437  // jobs are received, hence place in JobsQueue
    441438  if (!jobs.empty()) {
     
    443440    JobsQueue.pushJobs(jobs);
    444441  }
    445 
    446442  jobs.clear();
    447 
    448   // initiate socket if we had no jobs before
    449   if (need_initiateSocket)
    450     initiateWorkerSocket();
    451443}
    452444
     
    504496}
    505497
    506 ///** Helper function to shutdown a single worker.
    507 // *
    508 // * We send NoJob to indicate shutdown
    509 // *
    510 // * @param address of worker to shutdown
    511 // */
    512 //void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
    513 //{
    514 //  sendJobToWorker(address, NoJob);
    515 //}
    516 //
    517 ///** Sends shutdown to all current workers in the pool.
    518 // *
    519 // */
    520 //void FragmentScheduler::removeAllWorkers()
    521 //{
    522 //  // give all workers shutdown signal
    523 //  while (pool.presentIdleWorkers()) {
    524 //    const WorkerAddress address = pool.getNextIdleWorker();
    525 //    shutdownWorker(address);
    526 //  }
    527 //}
     498/** Helper function to shutdown a single worker.
     499 *
     500 * We send NoJob to indicate shutdown
     501 *
     502 * @param address of worker to shutdown
     503 */
     504void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
     505{
     506  LOG(3, "DEBUG: Shutting down worker " << address << "...");
     507  sendJobToWorker(address, NoJob);
     508}
     509
     510/** Sends shutdown to all current workers in the pool.
     511 *
     512 */
     513void FragmentScheduler::removeAllWorkers()
     514{
     515  LOG(2, "INFO: Shutting down workers ...");
     516
     517  // give all workers shutdown signal
     518  while (pool.presentIdleWorkers()) {
     519    const WorkerAddress address = pool.getNextIdleWorker();
     520    shutdownWorker(address);
     521  }
     522}
     523
     524/** Helper function to shutdown the server properly.
     525 *
     526 * \todo one should idle here until all workers have returned from
     527 * calculating stuff (or workers need to still listen while the are
     528 * calculating which is probably better).
     529 *
     530 */
     531void FragmentScheduler::shutdown()
     532{
     533  LOG(1, "INFO: Shutting all down ...");
     534
     535  /// Remove all workers
     536  removeAllWorkers();
     537
     538  /// close the worker listener's socket
     539  WorkerListener.closeSocket();
     540
     541  /// close the controller listener's socket
     542  ControllerListener.closeSocket();
     543
     544  /// finally, stop the io_service
     545  io_service.stop();
     546}
     547
     548/** Internal helper to send the next available job to the next idle worker.
     549 *
     550 */
     551void FragmentScheduler::sendAvailableJobToNextIdleWorker()
     552{
     553  const WorkerAddress address = pool.getNextIdleWorker();
     554  FragmentJob::ptr job = JobsQueue.popJob();
     555  sendJobToWorker(address, job);
     556}
     557
     558void FragmentScheduler::update(Observable *publisher)
     559{
     560  ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates.");
     561}
     562
     563void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification)
     564{
     565  // we have an idle worker
     566  if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) {
     567    LOG(1, "INFO: We are notified of an idle worker.");
     568    // are jobs available?
     569    if (JobsQueue.isJobPresent()) {
     570      sendAvailableJobToNextIdleWorker();
     571    }
     572  }
     573
     574  // we have new jobs
     575  if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) {
     576    LOG(1, "INFO: We are notified of a new job.");
     577    // check for idle workers
     578    if (pool.presentIdleWorkers()) {
     579      sendAvailableJobToNextIdleWorker();
     580    }
     581  }
     582}
     583
     584void FragmentScheduler::subjectKilled(Observable *publisher)
     585{}
Note: See TracChangeset for help on using the changeset viewer.