source: src/Fragmentation/Automation/FragmentScheduler.hpp@ a8f54b6

Action_Thermostats Add_AtomRandomPerturbation Add_FitFragmentPartialChargesAction Add_RotateAroundBondAction Add_SelectAtomByNameAction Added_ParseSaveFragmentResults AddingActions_SaveParseParticleParameters Adding_Graph_to_ChangeBondActions Adding_MD_integration_tests Adding_ParticleName_to_Atom Adding_StructOpt_integration_tests AtomFragments Automaking_mpqc_open AutomationFragmentation_failures Candidate_v1.5.4 Candidate_v1.6.0 Candidate_v1.6.1 ChangeBugEmailaddress ChangingTestPorts ChemicalSpaceEvaluator CombiningParticlePotentialParsing Combining_Subpackages Debian_Package_split Debian_package_split_molecuildergui_only Disabling_MemDebug Docu_Python_wait EmpiricalPotential_contain_HomologyGraph EmpiricalPotential_contain_HomologyGraph_documentation Enable_parallel_make_install Enhance_userguide Enhanced_StructuralOptimization Enhanced_StructuralOptimization_continued Example_ManyWaysToTranslateAtom Exclude_Hydrogens_annealWithBondGraph FitPartialCharges_GlobalError Fix_BoundInBox_CenterInBox_MoleculeActions Fix_ChargeSampling_PBC Fix_ChronosMutex Fix_FitPartialCharges Fix_FitPotential_needs_atomicnumbers Fix_ForceAnnealing Fix_IndependentFragmentGrids Fix_ParseParticles Fix_ParseParticles_split_forward_backward_Actions Fix_PopActions Fix_QtFragmentList_sorted_selection Fix_Restrictedkeyset_FragmentMolecule Fix_StatusMsg Fix_StepWorldTime_single_argument Fix_Verbose_Codepatterns Fix_fitting_potentials Fixes ForceAnnealing_goodresults ForceAnnealing_oldresults ForceAnnealing_tocheck ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_continued ForceAnnealing_with_BondGraph_continued_betteresults ForceAnnealing_with_BondGraph_contraction-expansion FragmentAction_writes_AtomFragments FragmentMolecule_checks_bonddegrees GeometryObjects Gui_Fixes Gui_displays_atomic_force_velocity ImplicitCharges IndependentFragmentGrids IndependentFragmentGrids_IndividualZeroInstances IndependentFragmentGrids_IntegrationTest IndependentFragmentGrids_Sole_NN_Calculation JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool JobMarket_unresolvable_hostname_fix MoreRobust_FragmentAutomation ODR_violation_mpqc_open PartialCharges_OrthogonalSummation PdbParser_setsAtomName PythonUI_with_named_parameters QtGui_reactivate_TimeChanged_changes Recreated_GuiChecks Rewrite_FitPartialCharges RotateToPrincipalAxisSystem_UndoRedo SaturateAtoms_findBestMatching SaturateAtoms_singleDegree StoppableMakroAction Subpackage_CodePatterns Subpackage_JobMarket Subpackage_LinearAlgebra Subpackage_levmar Subpackage_mpqc_open Subpackage_vmg Switchable_LogView ThirdParty_MPQC_rebuilt_buildsystem TrajectoryDependenant_MaxOrder TremoloParser_IncreasedPrecision TremoloParser_MultipleTimesteps TremoloParser_setsAtomName Ubuntu_1604_changes stable
Last change on this file since a8f54b6 was 38032a, checked in by Frederik Heber <heber@…>, 13 years ago

Renamed SchedulerStates -> ControllerChoices enum.

  • extracted from types.hpp as well.
  • Property mode set to 100644
File size: 5.7 KB
Line 
1/*
2 * FragmentScheduler.hpp
3 *
4 * Created on: Oct 19, 2011
5 * Author: heber
6 */
7
8#ifndef FRAGMENTSCHEDULER_HPP_
9#define FRAGMENTSCHEDULER_HPP_
10
11// include config.h
12#ifdef HAVE_CONFIG_H
13#include <config.h>
14#endif
15
16#include <vector>
17#include <boost/asio.hpp>
18#include <boost/function.hpp>
19
20#include "Connection.hpp"
21#include "ControllerChoices.hpp"
22#include "Controller/Commands/SendJobToWorkerOperation.hpp"
23#include "FragmentQueue.hpp"
24#include "GlobalJobId.hpp"
25#include "Jobs/FragmentJob.hpp"
26#include "Listener.hpp"
27#include "Results/FragmentResult.hpp"
28#include "types.hpp"
29#include "Pool/WorkerPool.hpp"
30#include "WorkerAddress.hpp"
31
32/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
33 * a Controller.
34 *
35 */
36class FragmentScheduler
37{
38public:
39 /// Constructor opens the acceptor and starts waiting for the first incoming
40 /// Connection.
41 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
42
43private:
44 void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
45// void shutdownWorker(const WorkerAddress &address);
46// void removeAllWorkers();
47
48 class WorkerListener_t : public Listener
49 {
50 public:
51 WorkerListener_t(
52 boost::asio::io_service& io_service,
53 unsigned short port,
54 FragmentQueue &_JobsQueue,
55 WorkerPool &_pool,
56 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
57 Listener(io_service, port),
58 address("127.0.0.1", "0"),
59 JobsQueue(_JobsQueue),
60 pool(_pool),
61 result( new FragmentResult(JobId::NoJob) ),
62 callback_sendJobToWorker(_callback)
63 {}
64 virtual ~WorkerListener_t() {}
65
66 protected:
67 /// Handle completion of a accept worker operation.
68 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
69
70 /// Worker callback function when job has been sent.
71 void handle_checkAddress(const boost::system::error_code& e, connection_ptr conn);
72
73 /// Worker callback function when new worker has enrolled.
74 void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
75
76 /// Worker callback function when result has been received.
77 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
78 private:
79 //!> address of new Worker
80 WorkerAddress address;
81
82 //!> reference to Queue
83 FragmentQueue &JobsQueue;
84
85 //!> callback reference to container class
86 WorkerPool &pool;
87
88 /// result that is received from the client.
89 FragmentResult::ptr result;
90
91 //!> callback function to access send job function
92 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
93
94 // static entity to indicate to clients that the queue is empty.
95 static FragmentJob::ptr NoJob;
96 };
97
98 class ControllerListener_t : public Listener
99 {
100 public:
101 ControllerListener_t(
102 boost::asio::io_service& io_service,
103 unsigned short port,
104 FragmentQueue &_JobsQueue,
105 boost::function<void ()> _initiateWorkerSocket) :
106 Listener(io_service, port),
107 JobsQueue(_JobsQueue),
108 jobInfo((size_t)2, 0),
109 choice(NoControllerOperation),
110 globalId(0),
111 initiateWorkerSocket(_initiateWorkerSocket)
112 {}
113 virtual ~ControllerListener_t() {}
114
115 protected:
116 /// Handle completion of a accept controller operation.
117 void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
118
119 /// Handle completion of controller operation to read choice
120 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
121
122 /// Controller callback function when job has been sent.
123 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
124
125 /// Controller callback function when checking on state of results.
126 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
127
128 /// Controller callback function when checking on state of results.
129 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
130
131 /// Controller callback function when result has been received.
132 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
133
134 private:
135 //!> reference to external FragmentQueue containing jobs to work on
136 FragmentQueue & JobsQueue;
137
138 /// bunch of jobs received from controller before placed in JobsQueue
139 std::vector<FragmentJob::ptr> jobs;
140
141 /// number of jobs that are waiting to be and are calculated, required for returning status
142 std::vector<size_t> jobInfo;
143
144 // choice
145 enum ControllerChoices choice;
146
147 // TODO: replace this instance by a IdPool.
148 //!> global id to give next available job id
149 GlobalJobId globalId;
150
151 //!> callback function to tell that worker socket should be enabled
152 boost::function<void ()> initiateWorkerSocket;
153 };
154
155private:
156 /// Queue with data to be sent to each client.
157 FragmentQueue JobsQueue;
158
159 //!> Pool of Workers
160 WorkerPool pool;
161
162 //!> Listener instance that waits for a worker
163 WorkerListener_t WorkerListener;
164
165 //!> Listener instance that waits for a controller
166 ControllerListener_t ControllerListener;
167
168public:
169 /** Getter for Exitflag.
170 *
171 * @return Exitflag of operations
172 */
173 size_t getExitflag() const
174 {
175 if (WorkerListener.getExitflag() != 0)
176 return WorkerListener.getExitflag();
177 if (ControllerListener.getExitflag() != 0)
178 return ControllerListener.getExitflag();
179 return 0;
180 }
181
182private:
183 //!> Connection for sending jobs to workers
184 Connection connection;
185
186 //!> internal operation to send jobs to workers
187 mutable SendJobToWorkerOperation sendJobOp;
188};
189
190#endif /* FRAGMENTSCHEDULER_HPP_ */
Note: See TracBrowser for help on using the repository browser.