source: src/Fragmentation/Automation/FragmentScheduler.cpp@ ff60cfa

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since ff60cfa was ff60cfa, checked in by Frederik Heber <heber@…>, 13 years ago

Controller has new function addjobs.

  • addjobs parses mpqc input file and creates MPQCCommandJob which is sent to send jobs
  • old sendjobs is now createjobs which creates empty SystemCommandJob.
  • added new class GlobalJobId which holds a global JobId such that FragmentJobs created by Controller each have a unique id. NOTE: This should probably be replaced by IdPool implementation when Fragmentation/Automation is integrated into rest of molecuilder.
  • dummyInit() to have MPQCCommandJob instances known to FragmentScheduler.
  • TESTFIX: Adapted regression tests Fragmentation/Automation adding-jobs, server-worker, and completerun due to command token change.
  • Property mode set to 100644
File size: 11.4 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2011 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file FragmentScheduler.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see server.cpp)
13 *
14 * Created on: Oct 19, 2011
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "Jobs/MPQCCommandJob.hpp"
37#include "Jobs/SystemCommandJob.hpp"
38#include "JobId.hpp"
39
40#include "FragmentScheduler.hpp"
41
42FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
43
44/** Helper function to enforce binding of FragmentWorker to possible derived
45 * FragmentJob classes.
46 */
47void dummyInit() {
48 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
49 MPQCCommandJob("nofile", JobId::IllegalJob);
50}
51
52/** Constructor of class FragmentScheduler.
53 *
54 * We setup both acceptors to accept connections from workers and Controller.
55 *
56 * \param io_service io_service of the asynchronous communications
57 * \param workerport port to listen for worker connections
58 * \param controllerport port to listen for controller connections.
59 */
60FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
61 worker_acceptor_(io_service,
62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
63 ),
64 controller_acceptor_(io_service,
65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
66 ),
67 result( new FragmentResult(JobId::NoJob) ),
68 choice(NoOperation),
69 Exitflag(OkFlag)
70{
71 Info info(__FUNCTION__);
72
73 // only initiate socket if jobs are already present
74 if (JobsQueue.isJobPresent()) {
75 LOG(1, "Listening for workers on port " << workerport << ".");
76 initiateWorkerSocket();
77 }
78
79 initiateControllerSocket();
80 LOG(1, "Listening for controller on port " << controllerport << ".");
81}
82
83/** Internal function to start worker connection.
84 *
85 */
86void FragmentScheduler::initiateWorkerSocket()
87{
88 // Start an accept operation for worker connections.
89 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
90 worker_acceptor_.async_accept(new_conn->socket(),
91 boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
92 boost::asio::placeholders::error, new_conn));
93}
94
95/** Internal function to start controller connection.
96 *
97 */
98void FragmentScheduler::initiateControllerSocket()
99{
100 // Start an accept operation for controller connection.
101 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
102 controller_acceptor_.async_accept(new_conn->socket(),
103 boost::bind(&FragmentScheduler::handle_AcceptController, this,
104 boost::asio::placeholders::error, new_conn));
105}
106
107
108/** Handle a new worker connection.
109 *
110 * We check whether jobs are in the JobsQueue. If present, job is sent.
111 *
112 * \sa handle_SendJobtoWorker()
113 *
114 * \param e error code if something went wrong
115 * \param conn reference with the connection
116 */
117void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
118{
119 Info info(__FUNCTION__);
120 if (!e)
121 {
122 // Successfully accepted a new connection.
123 // Check whether there are jobs in the queue
124 if (JobsQueue.isJobPresent()) {
125 // pop a job and send it to the client.
126 FragmentJob::ptr job(JobsQueue.popJob());
127 // The connection::async_write() function will automatically
128 // serialize the data structure for us.
129 LOG(1, "INFO: Sending job #" << job->getId() << ".");
130 conn->async_write(job,
131 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
132 boost::asio::placeholders::error, conn));
133
134 } else {
135 // send the static NoJob
136 conn->async_write(NoJob,
137 boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
138 boost::asio::placeholders::error, conn));
139
140 // then there must be no read necesary
141
142 ELOG(2, "There is currently no job present in the queue.");
143 }
144 }
145 else
146 {
147 // An error occurred. Log it and return. Since we are not starting a new
148 // accept operation the io_service will run out of work to do and the
149 // server will exit.
150 Exitflag = WorkerErrorFlag;
151 ELOG(0, e.message());
152 }
153
154 // Start an accept operation for a new Connection only when there
155 // are still jobs present
156 if (JobsQueue.isJobPresent())
157 initiateWorkerSocket();
158}
159
160/** Callback function when job has been sent.
161 *
162 * After job has been sent we start async_read() for the result.
163 *
164 * \sa handle_ReceiveResultFromWorker()
165 *
166 * \param e error code if something went wrong
167 * \param conn reference with the connection
168 */
169void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
170{
171 Info info(__FUNCTION__);
172 LOG(1, "INFO: Job sent.");
173 // obtain result
174 LOG(1, "INFO: Receiving result for a job ...");
175 conn->async_read(result,
176 boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
177 boost::asio::placeholders::error, conn));
178}
179
180/** Callback function when result has been received.
181 *
182 * \param e error code if something went wrong
183 * \param conn reference with the connection
184 */
185void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
186{
187 Info info(__FUNCTION__);
188 LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
189 // and push into queue
190 ASSERT(result->getId() != (JobId_t)JobId::NoJob,
191 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
192 ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
193 "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
194 // place id into expected
195 if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
196 JobsQueue.pushResult(result);
197 // erase result
198 result.reset();
199 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
200}
201
202/** Handle a new controller connection.
203 *
204 * \sa handle_ReceiveJobs()
205 * \sa handle_CheckResultState()
206 * \sa handle_SendResults()
207 *
208 * \param e error code if something went wrong
209 * \param conn reference with the connection
210 */
211void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
212{
213 Info info(__FUNCTION__);
214 if (!e)
215 {
216 conn->async_read(choice,
217 boost::bind(&FragmentScheduler::handle_ReadChoice, this,
218 boost::asio::placeholders::error, conn));
219 }
220 else
221 {
222 // An error occurred. Log it and return. Since we are not starting a new
223 // accept operation the io_service will run out of work to do and the
224 // server will exit.
225 Exitflag = ControllerErrorFlag;
226 ELOG(0, e.message());
227 }
228}
229
230/** Controller callback function to read the choice for next operation.
231 *
232 * \param e error code if something went wrong
233 * \param conn reference with the connection
234 */
235void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
236{
237 Info info(__FUNCTION__);
238 if (!e)
239 {
240 bool LaunchNewAcceptor = true;
241 // switch over the desired choice read previously
242 switch(choice) {
243 case NoOperation:
244 {
245 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with NoOperation.");
246 break;
247 }
248 case ReceiveJobs:
249 {
250 // The connection::async_write() function will automatically
251 // serialize the data structure for us.
252 LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
253 conn->async_read(jobs,
254 boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
255 boost::asio::placeholders::error, conn));
256 break;
257 }
258 case CheckState:
259 {
260 // first update number
261 doneJobs = JobsQueue.getDoneJobs();
262 // now we accept connections to check for state of calculations
263 LOG(1, "INFO: Sending state that "+toString(doneJobs)+" jobs are done to controller ...");
264 conn->async_write(doneJobs,
265 boost::bind(&FragmentScheduler::handle_CheckResultState, this,
266 boost::asio::placeholders::error, conn));
267 break;
268 }
269 case SendResults:
270 {
271 const std::vector<FragmentResult::ptr> results = JobsQueue.getAllResults();
272 // ... or we give the results
273 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
274 conn->async_write(results,
275 boost::bind(&FragmentScheduler::handle_SendResults, this,
276 boost::asio::placeholders::error, conn));
277 break;
278 }
279 case Shutdown:
280 {
281 LaunchNewAcceptor = false;
282 break;
283 }
284 default:
285 Exitflag = ControllerErrorFlag;
286 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
287 break;
288 }
289 // restore NoOperation choice such that choice is not read twice
290 choice = NoOperation;
291
292 if (LaunchNewAcceptor) {
293 LOG(1, "Launching new acceptor on socket.");
294 // Start an accept operation for a new Connection.
295 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
296 controller_acceptor_.async_accept(new_conn->socket(),
297 boost::bind(&FragmentScheduler::handle_AcceptController, this,
298 boost::asio::placeholders::error, new_conn));
299 }
300 }
301 else
302 {
303 // An error occurred. Log it and return. Since we are not starting a new
304 // accept operation the io_service will run out of work to do and the
305 // server will exit.
306 Exitflag = ControllerErrorFlag;
307 ELOG(0, e.message());
308 }
309}
310
311/** Controller callback function when job has been sent.
312 *
313 * We check here whether the worker socket is accepting, if there
314 * have been no jobs we re-activate it, as it is shut down after
315 * last job.
316 *
317 * \param e error code if something went wrong
318 * \param conn reference with the connection
319 */
320void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
321{
322 Info info(__FUNCTION__);
323 bool initiateSocket = !JobsQueue.isJobPresent();
324
325 // jobs are received, hence place in JobsQueue
326 if (!jobs.empty()) {
327 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
328 JobsQueue.pushJobs(jobs);
329 // initiate socket if we had no jobs before
330 if (initiateSocket)
331 initiateWorkerSocket();
332 }
333
334 jobs.clear();
335
336}
337
338/** Controller callback function when checking on state of results.
339 *
340 * \param e error code if something went wrong
341 * \param conn reference with the connection
342 */
343void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
344{
345 Info info(__FUNCTION__);
346 // do nothing
347 LOG(1, "INFO: Sent that " << doneJobs << " jobs are done.");
348}
349
350/** Controller callback function when result has been received.
351 *
352 * \param e error code if something went wrong
353 * \param conn reference with the connection
354 */
355void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
356{
357 Info info(__FUNCTION__);
358 // do nothing
359 LOG(1, "INFO: Results have been sent.");
360}
361
Note: See TracBrowser for help on using the repository browser.