source: src/Fragmentation/Automation/Pool/PoolWorker.cpp@ 30438f

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 30438f was 30438f, checked in by Frederik Heber <heber@…>, 12 years ago

OperationQueue can be blocked to prevent further operations to be pushed.

  • this is used in poolworker after shutdown. E.g. when working on a job and we receive shutdown, then sendresultop might be pushed after remove op which prevents shutdown as io_service is not done.
  • added unit test function to OperationQueueUnitTest.
  • Property mode set to 100644
File size: 6.0 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2012 University of Bonn. All rights reserved.
5 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 */
7
8/*
9 * \file PoolWorker.cpp
10 *
11 * This file strongly follows the Serialization example from the boost::asio
12 * library (see client.cpp).
13 *
14 * Created on: Feb 28, 2012
15 * Author: heber
16 */
17
18// include config.h
19#ifdef HAVE_CONFIG_H
20#include <config.h>
21#endif
22
23// boost asio needs specific operator new
24#include <boost/asio.hpp>
25
26#include "CodePatterns/MemDebug.hpp"
27
28#include <boost/bind.hpp>
29#include <boost/lexical_cast.hpp>
30#include <iostream>
31#include <vector>
32#include "Connection.hpp" // Must come before boost/serialization headers.
33#include <boost/serialization/vector.hpp>
34#include "CodePatterns/Info.hpp"
35#include "CodePatterns/Log.hpp"
36#include "Jobs/FragmentJob.hpp"
37#include "Jobs/MPQCCommandJob.hpp"
38#include "Jobs/SystemCommandJob.hpp"
39#include "Operations/Workers/EnrollInPoolOperation.hpp"
40#include "Operations/Workers/RemoveFromPoolOperation.hpp"
41#include "Operations/Workers/SubmitResultOperation.hpp"
42#include "Results/FragmentResult.hpp"
43#include "PoolWorker.hpp"
44
45/** Helper function to enforce binding of PoolWorker to possible derived
46 * FragmentJob classes.
47 */
48void dummyInit() {
49 SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
50 MPQCCommandJob("nofile", JobId::IllegalJob);
51}
52
53/** Constructor for class PoolWorker.
54 *
55 * We automatically connect to the given pool and enroll.
56 *
57 * @param _io_service io service for creating connections
58 * @param _host host part of MyAddress of pool to connect to
59 * @param _service service part of MyAddress of pool to connect to
60 * @param listenhost host part of MyAddress of this instance for listening to pool connections
61 * @param listenservice seervice part of MyAddress of this instance for listening to pool connections
62 */
63PoolWorker::PoolWorker(
64 boost::asio::io_service& _io_service,
65 const std::string& _host,
66 const std::string& _service,
67 const std::string& listenhost,
68 const std::string& listenservice) :
69 io_service(_io_service),
70 PoolListener(_io_service, boost::lexical_cast<unsigned short>(listenservice), *this),
71 MyAddress(listenhost, listenservice),
72 ServerAddress(_host, _service),
73 connection_(_io_service),
74 failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag))
75{
76 Info info(__FUNCTION__);
77
78 // always enroll and make listenining initiation depend on its success
79 const boost::function<void ()> initiateme =
80 boost::bind(&PoolListener_t::initiateSocket, boost::ref(PoolListener));
81 AsyncOperation *enrollOp = new EnrollInPoolOperation(connection_, MyAddress, initiateme, failed);
82 LOG(2, "DEBUG: Putting enroll in pool operation into queue ...");
83 OpQueue.push_back(enrollOp, ServerAddress);
84}
85
86/// Handle completion of a accept server operation.
87void PoolWorker::PoolListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
88{
89 Info info(__FUNCTION__);
90 if (!e)
91 {
92 conn->async_read(job,
93 boost::bind(&PoolWorker::PoolListener_t::handle_ReceiveJob, this,
94 boost::asio::placeholders::error, conn));
95 // and listen for following connections
96 initiateSocket();
97 }
98 else
99 {
100 // An error occurred. Log it and return. Since we are not starting a new
101 // accept operation the io_service will run out of work to do and the
102 // server will exit.
103 Exitflag = ErrorFlag;
104 ELOG(0, e.message());
105 }
106}
107
108/// Controller callback function when job has been sent.
109void PoolWorker::PoolListener_t::handle_ReceiveJob(const boost::system::error_code& e, connection_ptr conn)
110{
111 Info info(__FUNCTION__);
112
113 if (!e)
114 {
115 if (job->getId() != JobId::NoJob) {
116 LOG(1, "INFO: Working on job " << job->getId() << ".");
117 callback.WorkOnJob(job);
118 } else {
119 LOG(1, "INFO: Received NoJob.");
120 callback.shutdown();
121 }
122 }
123 else
124 {
125 // An error occurred. Log it and return. Since we are not starting a new
126 // accept operation the io_service will run out of work to do and the
127 // server will exit.
128 Exitflag = ErrorFlag;
129 ELOG(0, e.message());
130 }
131}
132
133/** Works on the given job and send the results.
134 *
135 * @param job job to work on
136 */
137void PoolWorker::WorkOnJob(FragmentJob::ptr &job)
138{
139 // work on job and create result
140 LOG(2, "DEBUG: Beginning to work on " << job->getId() << ".");
141 FragmentResult::ptr result = job->Work();
142 LOG(2, "DEBUG: Setting result " << result->getId() << ".");
143
144 AsyncOperation *submitOp = new SubmitResultOperation(connection_, MyAddress, AsyncOperation::NoOpCallback, failed);
145 static_cast<SubmitResultOperation *>(submitOp)->setResult(result);
146
147 // submit result
148 LOG(2, "DEBUG: Putting send result operation into queue ...");
149 OpQueue.push_back(submitOp, ServerAddress);
150}
151
152/** Wrapper function to allow use as signal handler.
153 *
154 * We remove us from server's pool and then just call \sa PoolWorker::shutdown().
155 *
156 * @param sig signal received
157 */
158void PoolWorker::shutdown(int sig)
159{
160 LOG(1, "INFO: Shutting down due to signal "+toString(sig)+".");
161
162 shutdown();
163}
164
165/** Helper function to shutdown the worker properly.
166 *
167 * Note that we will use RemoveFromPoolOperation to unlist from server's pool.
168 */
169void PoolWorker::shutdown()
170{
171 // remove us from pool
172 boost::function<void ()> closingdown = boost::bind(&PoolWorker::finish, this);
173 AsyncOperation *removeOp = new RemoveFromPoolOperation(connection_, MyAddress, closingdown, failed);
174 LOG(2, "DEBUG: Putting remove from pool operation into queue ...");
175 OpQueue.push_back(removeOp, ServerAddress);
176 // block queue such that io_service may stop
177 OpQueue.block();
178}
179
180/** Helper function to close down listener and stop service.
181 *
182 * This is called after we have been removed from server's pool
183 * We stop the io_service via its callback handler in case of success.
184 */
185void PoolWorker::finish()
186{
187 // somehow stop listener
188 PoolListener.closeSocket();
189
190 // finally, stop io_service
191 io_service.stop();
192}
193
Note: See TracBrowser for help on using the repository browser.