source: src/Fragmentation/Automation/FragmentScheduler.cpp@ a8f54b6

Last change on this file since a8f54b6 was 38032a, checked in by Frederik Heber <heber@…>, 13 years ago

Renamed SchedulerStates -> ControllerChoices enum.

  • extracted from types.hpp as well.
/*
 * Project: MoleCuilder
 * Description: creates and alters molecular systems
 * Copyright (C) 2011 University of Bonn. All rights reserved.
 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
 */

/*
 * \file FragmentScheduler.cpp
 *
 * This file strongly follows the Serialization example from the boost::asio
 * library (see server.cpp)
 *
 *  Created on: Oct 19, 2011
 *      Author: heber
 */

// include config.h
#ifdef HAVE_CONFIG_H
#include <config.h>
#endif

// boost asio needs specific operator new
#include <boost/asio.hpp>

#include "CodePatterns/MemDebug.hpp"

#include <boost/bind.hpp>
#include <boost/lexical_cast.hpp>
#include <iostream>
#include <vector>
#include "Connection.hpp" // Must come before boost/serialization headers.
#include <boost/serialization/vector.hpp>
#include "CodePatterns/Info.hpp"
#include "CodePatterns/Log.hpp"
#include "Controller/Commands/EnrollInPoolOperation.hpp"
#include "Jobs/MPQCCommandJob.hpp"
#include "Jobs/SystemCommandJob.hpp"
#include "JobId.hpp"

#include "FragmentScheduler.hpp"

FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));

/** Helper function to enforce binding of FragmentWorker to possible derived
 * FragmentJob classes.
 */
void dummyInit() {
  SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
  MPQCCommandJob("nofile", JobId::IllegalJob);
}
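
// Note: the dummy instantiations above presumably force the code for the
// derived FragmentJob classes (SystemCommandJob, MPQCCommandJob), in
// particular their Boost.Serialization routines, to be linked into this
// binary so that jobs received over a Connection can be deserialized into
// the correct derived type.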

/** Constructor of class FragmentScheduler.
 *
 * We set up both acceptors to accept connections from workers and Controller.
 *
 * \param io_service io_service of the asynchronous communications
 * \param workerport port to listen for worker connections
 * \param controllerport port to listen for controller connections
 */
FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
  WorkerListener(io_service, workerport, JobsQueue, pool,
    boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
  ControllerListener(io_service, controllerport, JobsQueue,
    boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))),
  connection(io_service),
  sendJobOp(connection)
{
  Info info(__FUNCTION__);

  // listen for controller
  ControllerListener.initiateSocket();

  // only initiate the worker socket if jobs are already present
  if (JobsQueue.isJobPresent()) {
    WorkerListener.initiateSocket();
  }
}
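
// Usage sketch (illustrative only; the surrounding setup and the port
// numbers are hypothetical, not taken from this file): the scheduler is
// driven by a caller-owned io_service that dispatches all asynchronous
// handlers below.
//
//   boost::asio::io_service io_service;
//   FragmentScheduler scheduler(io_service, 1025, 1026); // worker, controller ports
//   io_service.run();  // blocks until no asynchronous work remains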

/** Handle a new worker connection.
 *
 * We store the given address in the pool.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    // Successfully accepted a new connection.
    // read address
    conn->async_read(address,
      boost::bind(&FragmentScheduler::WorkerListener_t::handle_checkAddress, this,
        boost::asio::placeholders::error, conn));
  } else {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }
}

/** Callback function when the worker's address has been received.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::WorkerListener_t::handle_checkAddress(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    if (pool.presentInPool(address)) {
      // check whether its priority is busy_priority
      if (pool.isWorkerBusy(address)) {
        conn->async_read(result,
          boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
            boost::asio::placeholders::error, conn));
      } else {
        ELOG(1, "INFO: Idle worker "+toString(address)+" has logged in again.");
      }
    } else {
      // insert as a new worker
      pool.addWorker(address);
      enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
      conn->async_write(flag,
        boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
          boost::asio::placeholders::error, conn));
    }
  }
  else
  {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }

  if (JobsQueue.isJobPresent()) {
    // Start an accept operation for a new Connection only when there
    // are still jobs present
    initiateSocket();
  }
}

/** Callback function when a new worker has enrolled.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    FragmentJob::ptr job;
    if (JobsQueue.isJobPresent()) {
      job = JobsQueue.popJob();
    } else {
      job = NoJob;
    }
    callback_sendJobToWorker(address, job);
  }
  else
  {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }
}

/** Callback function when a result has been received from a worker.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  LOG(1, "INFO: Received result for job #" << result->getId() << " ...");

  // and push into queue
  ASSERT(result->getId() != (JobId_t)JobId::NoJob,
    "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
  ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
    "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
  // only push results with a valid job id
  if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
    JobsQueue.pushResult(result);

  // mark as idle
  pool.unmarkWorkerBusy(address);
  // for now remove worker again from pool such that others may connect
  pool.removeWorker(address);

  // erase result
  result.reset();
  LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
}
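
// Summary of the worker-side flow above: handle_Accept() reads the worker's
// address, handle_checkAddress() either enrolls an unknown worker (which is
// then handed a job via handle_enrolled() and callback_sendJobToWorker) or,
// for a busy worker that reconnects, reads its result in
// handle_ReceiveResultFromWorker(), which pushes the result into the
// JobsQueue and frees the worker again.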


/** Handle a new controller connection.
 *
 * \sa handle_ReceiveJobs()
 * \sa handle_CheckResultState()
 * \sa handle_SendResults()
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    conn->async_read(choice,
      boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
        boost::asio::placeholders::error, conn));
  }
  else
  {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }
}

/** Controller callback function to read the choice for the next operation.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  if (!e)
  {
    bool LaunchNewAcceptor = true;
    LOG(1, "INFO: Received request for operation " << choice << ".");
    // switch over the desired choice read previously
    switch(choice) {
      case NoControllerOperation:
      {
        ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
        break;
      }
      case GetNextJobId:
      {
        const JobId_t nextid = globalId.getNextId();
        LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
        conn->async_write(nextid,
          boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case ReceiveJobs:
      {
        // The connection::async_read() function will automatically
        // deserialize the data structure for us.
        LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
        conn->async_read(jobs,
          boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case CheckState:
      {
        // first update the numbers
        jobInfo[0] = JobsQueue.getPresentJobs();
        jobInfo[1] = JobsQueue.getDoneJobs();
        // now we accept connections to check for state of calculations
        LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
        conn->async_write(jobInfo,
          boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case SendResults:
      {
        const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
        // ... or we give the results
        LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
        conn->async_write(results,
          boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
            boost::asio::placeholders::error, conn));
        break;
      }
      case ShutdownControllerSocket:
      {
        LaunchNewAcceptor = false;
        break;
      }
      default:
        Exitflag = ErrorFlag;
        ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
        break;
    }
    // restore NoControllerOperation choice such that choice is not read twice
    choice = NoControllerOperation;

    if (LaunchNewAcceptor) {
      LOG(1, "Launching new acceptor on socket.");
      // Start an accept operation for a new Connection.
      initiateSocket();
    }
  }
  else
  {
    // An error occurred. Log it and return. Since we are not starting a new
    // accept operation the io_service will run out of work to do and the
    // server will exit.
    Exitflag = ErrorFlag;
    ELOG(0, e.message());
  }
}
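
// Controller-side sketch of the protocol handled above (hypothetical client
// code, shown only for orientation; handler names are placeholders and the
// real controller lives elsewhere in the code base): after connecting to
// controllerport, the controller writes one ControllerChoices value and then
// performs the matching read or write, e.g. for CheckState:
//
//   enum ControllerChoices choice = CheckState;
//   conn->async_write(choice, handle_ChoiceSent);       // announce operation
//   // ... and in handle_ChoiceSent:
//   conn->async_read(jobInfo, handle_ReceivedJobInfo);  // jobInfo = (present, done)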

/** Controller callback function when jobs have been received.
 *
 * We check here whether the worker socket is accepting; if there
 * have been no jobs before, we re-activate it, as it is shut down
 * after the last job.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  bool need_initiateSocket = !JobsQueue.isJobPresent();

  // jobs are received, hence place in JobsQueue
  if (!jobs.empty()) {
    LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
    JobsQueue.pushJobs(jobs);
  }

  jobs.clear();

  // initiate socket if we had no jobs before
  if (need_initiateSocket)
    initiateWorkerSocket();
}

/** Controller callback function when the state of results has been sent.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
}

/** Controller callback function when the next available job id has been sent.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Sent next available job id.");
}

/** Controller callback function when the results have been sent.
 *
 * \param e error code if something went wrong
 * \param conn reference to the connection
 */
void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
{
  Info info(__FUNCTION__);
  // do nothing
  LOG(1, "INFO: Results have been sent.");
}


/** Helper function to send a job to a worker.
 *
 * @param address address of the worker
 * @param job job to send
 */
void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
{
  ASSERT( !pool.isWorkerBusy(address),
    "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
  LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
  sendJobOp.setJob(job);
  sendJobOp(address.host, address.service);
  // set worker as busy (assuming it has been removed from idle queue already)
  WorkerPool::Idle_Queue_t::iterator iter = pool.getIdleWorker(address);
  ASSERT( iter != pool.getIdleEnd(),
    "FragmentScheduler::sendJobToWorker() - cannot find worker "
    +toString(address)+" in idle queue.");
  pool.markWorkerBusy(iter);
}

///** Helper function to shutdown a single worker.
// *
// * We send NoJob to indicate shutdown
// *
// * @param address of worker to shutdown
// */
//void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
//{
//  sendJobToWorker(address, NoJob);
//}
//
///** Sends shutdown to all current workers in the pool.
// *
// */
//void FragmentScheduler::removeAllWorkers()
//{
//  // give all workers shutdown signal
//  while (pool.presentIdleWorkers()) {
//    const WorkerAddress address = pool.getNextIdleWorker();
//    shutdownWorker(address);
//  }
//}