source: src/Fragmentation/Automation/FragmentScheduler.hpp@ 122de0

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since 122de0 was bff93d, checked in by Frederik Heber <heber@…>, 13 years ago

RemoveFromPoolOperation is now async.

  • changed PoolWorker such that we close down socket and listener via success callback handler of the operartion.
  • FragmentScheduler now sends true/false as status of removal.
  • Property mode set to 100644
File size: 7.3 KB
Line 
1/*
2 * FragmentScheduler.hpp
3 *
4 * Created on: Oct 19, 2011
5 * Author: heber
6 */
7
8#ifndef FRAGMENTSCHEDULER_HPP_
9#define FRAGMENTSCHEDULER_HPP_
10
11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
16#include <boost/asio.hpp>
17#include <boost/function.hpp>
18#include <boost/shared_ptr.hpp>
19#include <deque>
20#include <vector>
21
22#include "CodePatterns/Observer/Observer.hpp"
23#include "Connection.hpp"
24#include "ControllerChoices.hpp"
25#include "Operations/AsyncOperation.hpp"
26#include "Operations/OperationQueue.hpp"
27#include "Operations/Servers/ShutdownWorkerOperation.hpp"
28#include "ControllerChoices.hpp"
29#include "FragmentQueue.hpp"
30#include "GlobalJobId.hpp"
31#include "Jobs/FragmentJob.hpp"
32#include "Listener.hpp"
33#include "Results/FragmentResult.hpp"
34#include "types.hpp"
35#include "Pool/WorkerPool.hpp"
36#include "WorkerAddress.hpp"
37#include "WorkerChoices.hpp"
38
39/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
40 * a Controller.
41 *
42 */
43class FragmentScheduler : public Observer
44{
45public:
46 /// Constructor opens the acceptor and starts waiting for the first incoming
47 /// Connection.
48 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
49 ~FragmentScheduler();
50
51 void shutdown(int sig);
52private:
53 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
54 void sendAvailableJobToNextIdleWorker();
55 bool shutdown();
56 void shutdownWorker(const WorkerAddress &address);
57 void removeAllWorkers();
58 void cleanupOperationQueue(AsyncOperation *op);
59
60 void update(Observable *publisher);
61 void recieveNotification(Observable *publisher, Notification_ptr notification);
62 void subjectKilled(Observable *publisher);
63
64 class WorkerListener_t : public Listener
65 {
66 public:
67 WorkerListener_t(
68 boost::asio::io_service& io_service,
69 unsigned short port,
70 FragmentQueue &_JobsQueue,
71 WorkerPool &_pool,
72 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
73 Listener(io_service, port),
74 address("127.0.0.1", "0"),
75 JobsQueue(_JobsQueue),
76 pool(_pool),
77 result( new FragmentResult(JobId::NoJob) ),
78 callback_sendJobToWorker(_callback),
79 choice(NoWorkerOperation)
80 {}
81 virtual ~WorkerListener_t() {}
82
83 protected:
84 /// Handle completion of a accept worker operation.
85 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
86
87 /// Handle completion of Worker operation to read choice
88 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
89
90 /// Worker callback function when job has been sent.
91 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn);
92
93 /// Worker callback function when new worker has enrolled.
94 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
95
96 /// Worker callback function when worker has been informed of successful removal.
97 void handle_removed(const boost::system::error_code& e, connection_ptr conn);
98
99 /// Worker callback function when result has been received.
100 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
101
102 /// Worker callback function when invalid result has been received.
103 void handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
104 private:
105 //!> address of new Worker
106 WorkerAddress address;
107
108 //!> reference to Queue
109 FragmentQueue &JobsQueue;
110
111 //!> callback reference to container class
112 WorkerPool &pool;
113
114 //!> result that is received from the client.
115 FragmentResult::ptr result;
116
117 //!> callback function to access send job function
118 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
119
120 //!> choice
121 enum WorkerChoices choice;
122 };
123
124 class ControllerListener_t : public Listener
125 {
126 public:
127 ControllerListener_t(
128 boost::asio::io_service& io_service,
129 unsigned short port,
130 FragmentQueue &_JobsQueue,
131 boost::function<void ()> _removeallWorkers,
132 boost::function<bool ()> _shutdownAllSockets) :
133 Listener(io_service, port),
134 JobsQueue(_JobsQueue),
135 jobInfo((size_t)2, 0),
136 choice(NoControllerOperation),
137 removeallWorkers(_removeallWorkers),
138 shutdownAllSockets(_shutdownAllSockets),
139 globalId(0)
140 {}
141 virtual ~ControllerListener_t() {}
142
143 protected:
144 /// Handle completion of a accept controller operation.
145 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
146
147 /// Handle completion of controller operation to read choice
148 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
149
150 /// Controller callback function when job has been sent.
151 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
152
153 /// Controller callback function when checking on state of results.
154 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
155
156 /// Controller callback function when checking on state of results.
157 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
158
159 /// Controller callback function when free job ids have been sent.
160 void handle_SendIds(const boost::system::error_code& e, connection_ptr conn);
161
162 /// Controller callback function when result has been received.
163 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
164
165 private:
166 //!> reference to external FragmentQueue containing jobs to work on
167 FragmentQueue & JobsQueue;
168
169 //!> bunch of jobs received from controller before placed in JobsQueue
170 std::vector<FragmentJob::ptr> jobs;
171
172 //!> number of jobs that are waiting to be and are calculated, required for returning status
173 std::vector<size_t> jobInfo;
174
175 //!> number of job ids request from controller;
176 size_t NumberIds;
177
178 //!> choice
179 enum ControllerChoices choice;
180
181 //!> bound function to remove all workers
182 boost::function<void ()> removeallWorkers;
183
184 //!> bound function to shutdown all sockets
185 boost::function<bool ()> shutdownAllSockets;
186
187 // TODO: replace this instance by a IdPool.
188 //!> global id to give next available job id
189 GlobalJobId globalId;
190 };
191
192private:
193 //!> static entity to indicate to clients that the queue is empty.
194 static FragmentJob::ptr NoJob;
195
196 //!> reference to the io_service which we use for connections
197 boost::asio::io_service& io_service;
198
199 //!> Queue with data to be sent to each client.
200 FragmentQueue JobsQueue;
201
202 //!> Pool of Workers
203 WorkerPool pool;
204
205 //!> Listener instance that waits for a worker
206 WorkerListener_t WorkerListener;
207
208 //!> Listener instance that waits for a controller
209 ControllerListener_t ControllerListener;
210
211public:
212 /** Getter for Exitflag.
213 *
214 * @return Exitflag of operations
215 */
216 size_t getExitflag() const
217 {
218 if (WorkerListener.getExitflag() != 0)
219 return WorkerListener.getExitflag();
220 if (ControllerListener.getExitflag() != 0)
221 return ControllerListener.getExitflag();
222 return 0;
223 }
224
225private:
226 //!> Connection for sending jobs to workers
227 Connection connection;
228
229 //!> internal queue for all asynchronous operations
230 OperationQueue OpQueue;
231};
232
233#endif /* FRAGMENTSCHEDULER_HPP_ */
Note: See TracBrowser for help on using the repository browser.