source: src/Fragmentation/Automation/FragmentScheduler.cpp@ 95454a

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation 
JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 95454a was 8036b7, checked in by Frederik Heber <heber@…>, 13 years ago

Refactored Listener out of FragmentScheduler.

  • this is preparatory for creating the PoolWorker class, i.e. Workers that listen for jobs sent to them by the server.
  • the connection is just reset() on new initiateSocket().
  • closeSocket() correctly shutdown()s and close()s the socket.
  • Note: As ControllerListener receives new jobs and thus knows when the server is required to listen on worker port again, it needs a bound function to WorkerListener_t::initiateSocket(). This will be removed when the new Pool is in place that handles Worker connections.
  • Property mode set to 100644
File size: 11.3 KB
RevLine 
[72eaf7f]1/*
[cd4a6e]2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
[72eaf7f]13 *
[cd4a6e]14 * Created on: Oct 19, 2011
[72eaf7f]15 * Author: heber
16 */
17
[f93842]18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
[c6bcd0]23// boost asio needs specific operator new
[72eaf7f]24#include <boost/asio.hpp>
[c6bcd0]25
26#include "CodePatterns/MemDebug.hpp"
27
[72eaf7f]28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
[af3aed]32#include "Connection.hpp" // Must come before boost/serialization headers.
[72eaf7f]33#include <boost/serialization/vector.hpp>
[af3aed]34#include "CodePatterns/Info.hpp"
[b0b64c]35#include "CodePatterns/Log.hpp"
[ff60cfa]36#include "Jobs/MPQCCommandJob.hpp"
[d920b9]37#include "Jobs/SystemCommandJob.hpp"
[ef2767]38#include "JobId.hpp"
[72eaf7f]39
[cd4a6e]40#include "FragmentScheduler.hpp"
[72eaf7f]41
[8036b7]42FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
[ff60cfa]43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
[c7deca]51
[db03d9]52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
[8036b7]61 WorkerListener(io_service, workerport, JobsQueue),
62 ControllerListener(io_service, controllerport, JobsQueue,
63 boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener)))
[ed2c5b]64{
[b0b64c]65 Info info(__FUNCTION__);
[72eaf7f]66
[778abb]67 // only initiate socket if jobs are already present
68 if (JobsQueue.isJobPresent()) {
[8036b7]69 WorkerListener.initiateSocket();
[778abb]70 }
[402bde]71
[8036b7]72 ControllerListener.initiateSocket();
[402bde]73}
74
[db03d9]75/** Handle a new worker connection.
76 *
77 * We check whether jobs are in the JobsQueue. If present, job is sent.
78 *
79 * \sa handle_SendJobtoWorker()
80 *
81 * \param e error code if something went wrong
82 * \param conn reference with the connection
83 */
[8036b7]84void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]85{
[cd4a6e]86 Info info(__FUNCTION__);
[ed2c5b]87 if (!e)
[72eaf7f]88 {
[b0b64c]89 // Successfully accepted a new connection.
90 // Check whether there are jobs in the queue
91 if (JobsQueue.isJobPresent()) {
92 // pop a job and send it to the client.
[78ad7d]93 FragmentJob::ptr job(JobsQueue.popJob());
[b0b64c]94 // The connection::async_write() function will automatically
95 // serialize the data structure for us.
[78ad7d]96 LOG(1, "INFO: Sending job #" << job->getId() << ".");
[ef2767]97 conn->async_write(job,
[8036b7]98 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
[b0b64c]99 boost::asio::placeholders::error, conn));
[0bdd51b]100
[b0b64c]101 } else {
[c7deca]102 // send the static NoJob
103 conn->async_write(NoJob,
[8036b7]104 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
[c7deca]105 boost::asio::placeholders::error, conn));
106
[ef2767]107 // then there must be no read necesary
108
[b0b64c]109 ELOG(2, "There is currently no job present in the queue.");
110 }
[cd4a6e]111 }
112 else
113 {
114 // An error occurred. Log it and return. Since we are not starting a new
115 // accept operation the io_service will run out of work to do and the
116 // server will exit.
[8036b7]117 Exitflag = ErrorFlag;
[b0b64c]118 ELOG(0, e.message());
[cd4a6e]119 }
[778abb]120
[95b384]121 if (JobsQueue.isJobPresent()) {
122 // Start an accept operation for a new Connection only when there
123 // are still jobs present
[8036b7]124 initiateSocket();
[95b384]125 }
[ed2c5b]126}
[72eaf7f]127
[db03d9]128/** Callback function when job has been sent.
129 *
130 * After job has been sent we start async_read() for the result.
131 *
132 * \sa handle_ReceiveResultFromWorker()
133 *
134 * \param e error code if something went wrong
135 * \param conn reference with the connection
136 */
[8036b7]137void FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
[ed2c5b]138{
[af3aed]139 Info info(__FUNCTION__);
[ef2767]140 LOG(1, "INFO: Job sent.");
141 // obtain result
142 LOG(1, "INFO: Receiving result for a job ...");
143 conn->async_read(result,
[8036b7]144 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
[ef2767]145 boost::asio::placeholders::error, conn));
146}
147
[db03d9]148/** Callback function when result has been received.
149 *
150 * \param e error code if something went wrong
151 * \param conn reference with the connection
152 */
[8036b7]153void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
[ef2767]154{
[db03d9]155 Info info(__FUNCTION__);
[35f587]156 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
157 // and push into queue
158 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
[db03d9]159 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
[35f587]160 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
[db03d9]161 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
[778abb]162 // place id into expected
[35f587]163 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
[db03d9]164 JobsQueue.pushResult(result);
165 // erase result
[35f587]166 result.reset();
[778abb]167 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
[db03d9]168}
169
170/** Handle a new controller connection.
171 *
172 * \sa handle_ReceiveJobs()
173 * \sa handle_CheckResultState()
174 * \sa handle_SendResults()
175 *
176 * \param e error code if something went wrong
177 * \param conn reference with the connection
178 */
[8036b7]179void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
[db03d9]180{
181 Info info(__FUNCTION__);
182 if (!e)
183 {
[778abb]184 conn->async_read(choice,
[8036b7]185 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
[778abb]186 boost::asio::placeholders::error, conn));
187 }
188 else
189 {
190 // An error occurred. Log it and return. Since we are not starting a new
191 // accept operation the io_service will run out of work to do and the
192 // server will exit.
[8036b7]193 Exitflag = ErrorFlag;
[778abb]194 ELOG(0, e.message());
195 }
196}
197
198/** Controller callback function to read the choice for next operation.
199 *
200 * \param e error code if something went wrong
201 * \param conn reference with the connection
202 */
[8036b7]203void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
[778abb]204{
205 Info info(__FUNCTION__);
206 if (!e)
207 {
[0196c6]208 bool LaunchNewAcceptor = true;
[d1dbfc]209 LOG(1, "INFO: Received request for operation " << choice << ".");
[778abb]210 // switch over the desired choice read previously
211 switch(choice) {
212 case NoOperation:
213 {
214 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
215 break;
216 }
[d1dbfc]217 case GetNextJobId:
218 {
219 const JobId_t nextid = globalId.getNextId();
220 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
221 conn->async_write(nextid,
[8036b7]222 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
[d1dbfc]223 boost::asio::placeholders::error, conn));
224 break;
225 }
[778abb]226 case ReceiveJobs:
[d1dbfc]227 {
228 // The connection::async_write() function will automatically
229 // serialize the data structure for us.
230 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
231 conn->async_read(jobs,
[8036b7]232 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
[d1dbfc]233 boost::asio::placeholders::error, conn));
234 break;
235 }
[778abb]236 case CheckState:
237 {
[3c4a5e]238 // first update number
[6f2bc7]239 jobInfo[0] = JobsQueue.getPresentJobs();
240 jobInfo[1] = JobsQueue.getDoneJobs();
[3c4a5e]241 // now we accept connections to check for state of calculations
[6f2bc7]242 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
243 conn->async_write(jobInfo,
[8036b7]244 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
[3c4a5e]245 boost::asio::placeholders::error, conn));
[778abb]246 break;
247 }
248 case SendResults:
249 {
[35f587]250 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
[778abb]251 // ... or we give the results
252 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
253 conn->async_write(results,
[8036b7]254 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
[778abb]255 boost::asio::placeholders::error, conn));
[0196c6]256 break;
257 }
258 case Shutdown:
259 {
260 LaunchNewAcceptor = false;
[778abb]261 break;
[db03d9]262 }
[778abb]263 default:
[8036b7]264 Exitflag = ErrorFlag;
[778abb]265 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
266 break;
267 }
268 // restore NoOperation choice such that choice is not read twice
269 choice = NoOperation;
270
[0196c6]271 if (LaunchNewAcceptor) {
272 LOG(1, "Launching new acceptor on socket.");
273 // Start an accept operation for a new Connection.
[8036b7]274 initiateSocket();
[0196c6]275 }
[db03d9]276 }
277 else
278 {
279 // An error occurred. Log it and return. Since we are not starting a new
280 // accept operation the io_service will run out of work to do and the
281 // server will exit.
[8036b7]282 Exitflag = ErrorFlag;
[db03d9]283 ELOG(0, e.message());
284 }
285}
286
287/** Controller callback function when job has been sent.
[778abb]288 *
289 * We check here whether the worker socket is accepting, if there
290 * have been no jobs we re-activate it, as it is shut down after
291 * last job.
[db03d9]292 *
293 * \param e error code if something went wrong
294 * \param conn reference with the connection
295 */
[8036b7]296void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
[db03d9]297{
298 Info info(__FUNCTION__);
[8036b7]299 bool need_initiateSocket = !JobsQueue.isJobPresent();
[778abb]300
[db03d9]301 // jobs are received, hence place in JobsQueue
302 if (!jobs.empty()) {
303 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
304 JobsQueue.pushJobs(jobs);
305 }
306
307 jobs.clear();
[778abb]308
[8036b7]309 // initiate socket if we had no jobs before
310 if (need_initiateSocket)
311 initiateWorkerSocket();
[ed2c5b]312}
[cd4a6e]313
[3c4a5e]314/** Controller callback function when checking on state of results.
315 *
316 * \param e error code if something went wrong
317 * \param conn reference with the connection
318 */
[8036b7]319void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
[3c4a5e]320{
321 Info info(__FUNCTION__);
322 // do nothing
[6f2bc7]323 LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done).");
[3c4a5e]324}
[778abb]325
[d1dbfc]326/** Controller callback function when checking on state of results.
327 *
328 * \param e error code if something went wrong
329 * \param conn reference with the connection
330 */
[8036b7]331void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
[d1dbfc]332{
333 Info info(__FUNCTION__);
334 // do nothing
335 LOG(1, "INFO: Sent next available job id.");
336}
337
[778abb]338/** Controller callback function when result has been received.
339 *
340 * \param e error code if something went wrong
341 * \param conn reference with the connection
342 */
[8036b7]343void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
[778abb]344{
345 Info info(__FUNCTION__);
346 // do nothing
347 LOG(1, "INFO: Results have been sent.");
348}
349
Note: See TracBrowser for help on using the repository browser.