- Timestamp:
- Jun 27, 2012, 4:07:21 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- 95454a
- Parents:
- 95b384
- git-author:
- Frederik Heber <heber@…> (02/22/12 17:50:06)
- git-committer:
- Frederik Heber <heber@…> (06/27/12 16:07:21)
- Location:
- src/Fragmentation/Automation
- Files:
-
- 2 added
- 3 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/FragmentScheduler.cpp
r95b384 r8036b7 40 40 #include "FragmentScheduler.hpp" 41 41 42 FragmentJob::ptr FragmentScheduler:: NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));42 FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob)); 43 43 44 44 /** Helper function to enforce binding of FragmentWorker to possible derived … … 59 59 */ 60 60 FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) : 61 worker_acceptor_(io_service, 62 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport) 63 ), 64 controller_acceptor_(io_service, 65 boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport) 66 ), 67 result( new FragmentResult(JobId::NoJob) ), 68 jobInfo((size_t)2, 0), 69 choice(NoOperation), 70 globalId(0), 71 Exitflag(OkFlag) 61 WorkerListener(io_service, workerport, JobsQueue), 62 ControllerListener(io_service, controllerport, JobsQueue, 63 boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))) 72 64 { 73 65 Info info(__FUNCTION__); … … 75 67 // only initiate socket if jobs are already present 76 68 if (JobsQueue.isJobPresent()) { 77 LOG(1, "Listening for workers on port " << workerport << "."); 78 initiateWorkerSocket(); 79 } 80 81 initiateControllerSocket(); 82 LOG(1, "Listening for controller on port " << controllerport << "."); 83 } 84 85 /** Internal function to start worker connection. 86 * 87 */ 88 void FragmentScheduler::initiateWorkerSocket() 89 { 90 // Start an accept operation for worker connections. 91 connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service())); 92 worker_acceptor_.async_accept(new_conn->socket(), 93 boost::bind(&FragmentScheduler::handle_AcceptWorker, this, 94 boost::asio::placeholders::error, new_conn)); 95 } 96 97 /** Internal function to start controller connection. 98 * 99 */ 100 void FragmentScheduler::initiateControllerSocket() 101 { 102 // Start an accept operation for controller connection. 103 connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service())); 104 controller_acceptor_.async_accept(new_conn->socket(), 105 boost::bind(&FragmentScheduler::handle_AcceptController, this, 106 boost::asio::placeholders::error, new_conn)); 107 } 108 69 WorkerListener.initiateSocket(); 70 } 71 72 ControllerListener.initiateSocket(); 73 } 109 74 110 75 /** Handle a new worker connection. … … 117 82 * \param conn reference with the connection 118 83 */ 119 void FragmentScheduler:: handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)84 void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) 120 85 { 121 86 Info info(__FUNCTION__); … … 131 96 LOG(1, "INFO: Sending job #" << job->getId() << "."); 132 97 conn->async_write(job, 133 boost::bind(&FragmentScheduler:: handle_SendJobtoWorker, this,98 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this, 134 99 boost::asio::placeholders::error, conn)); 135 100 … … 137 102 // send the static NoJob 138 103 conn->async_write(NoJob, 139 boost::bind(&FragmentScheduler:: handle_SendJobtoWorker, this,104 boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this, 140 105 boost::asio::placeholders::error, conn)); 141 106 … … 150 115 // accept operation the io_service will run out of work to do and the 151 116 // server will exit. 152 Exitflag = WorkerErrorFlag;117 Exitflag = ErrorFlag; 153 118 ELOG(0, e.message()); 154 119 } … … 157 122 // Start an accept operation for a new Connection only when there 158 123 // are still jobs present 159 initiate WorkerSocket();124 initiateSocket(); 160 125 } 161 126 } … … 170 135 * \param conn reference with the connection 171 136 */ 172 void FragmentScheduler:: handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)137 void FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn) 173 138 { 174 139 Info info(__FUNCTION__); … … 177 142 LOG(1, "INFO: Receiving result for a job ..."); 178 143 conn->async_read(result, 179 boost::bind(&FragmentScheduler:: handle_ReceiveResultFromWorker, this,144 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 180 145 boost::asio::placeholders::error, conn)); 181 146 } … … 186 151 * \param conn reference with the connection 187 152 */ 188 void FragmentScheduler:: handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)153 void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn) 189 154 { 190 155 Info info(__FUNCTION__); … … 212 177 * \param conn reference with the connection 213 178 */ 214 void FragmentScheduler:: handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)179 void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) 215 180 { 216 181 Info info(__FUNCTION__); … … 218 183 { 219 184 conn->async_read(choice, 220 boost::bind(&FragmentScheduler:: handle_ReadChoice, this,185 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this, 221 186 boost::asio::placeholders::error, conn)); 222 187 } … … 226 191 // accept operation the io_service will run out of work to do and the 227 192 // server will exit. 228 Exitflag = ControllerErrorFlag;193 Exitflag = ErrorFlag; 229 194 ELOG(0, e.message()); 230 195 } … … 236 201 * \param conn reference with the connection 237 202 */ 238 void FragmentScheduler:: handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)203 void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) 239 204 { 240 205 Info info(__FUNCTION__); … … 255 220 LOG(1, "INFO: Sending next available job id " << nextid << " to controller ..."); 256 221 conn->async_write(nextid, 257 boost::bind(&FragmentScheduler:: handle_GetNextJobIdState, this,222 boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this, 258 223 boost::asio::placeholders::error, conn)); 259 224 break; … … 265 230 LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); 266 231 conn->async_read(jobs, 267 boost::bind(&FragmentScheduler:: handle_ReceiveJobs, this,232 boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this, 268 233 boost::asio::placeholders::error, conn)); 269 234 break; … … 277 242 LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ..."); 278 243 conn->async_write(jobInfo, 279 boost::bind(&FragmentScheduler:: handle_CheckResultState, this,244 boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this, 280 245 boost::asio::placeholders::error, conn)); 281 246 break; … … 287 252 LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ..."); 288 253 conn->async_write(results, 289 boost::bind(&FragmentScheduler:: handle_SendResults, this,254 boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this, 290 255 boost::asio::placeholders::error, conn)); 291 256 break; … … 297 262 } 298 263 default: 299 Exitflag = ControllerErrorFlag;264 Exitflag = ErrorFlag; 300 265 ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice."); 301 266 break; … … 307 272 LOG(1, "Launching new acceptor on socket."); 308 273 // Start an accept operation for a new Connection. 309 initiate ControllerSocket();274 initiateSocket(); 310 275 } 311 276 } … … 315 280 // accept operation the io_service will run out of work to do and the 316 281 // server will exit. 317 Exitflag = ControllerErrorFlag;282 Exitflag = ErrorFlag; 318 283 ELOG(0, e.message()); 319 284 } … … 329 294 * \param conn reference with the connection 330 295 */ 331 void FragmentScheduler:: handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)332 { 333 Info info(__FUNCTION__); 334 bool initiateSocket = !JobsQueue.isJobPresent();296 void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn) 297 { 298 Info info(__FUNCTION__); 299 bool need_initiateSocket = !JobsQueue.isJobPresent(); 335 300 336 301 // jobs are received, hence place in JobsQueue … … 338 303 LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); 339 304 JobsQueue.pushJobs(jobs); 340 // initiate socket if we had no jobs before341 if (initiateSocket)342 initiateWorkerSocket();343 305 } 344 306 345 307 jobs.clear(); 346 308 309 // initiate socket if we had no jobs before 310 if (need_initiateSocket) 311 initiateWorkerSocket(); 347 312 } 348 313 … … 352 317 * \param conn reference with the connection 353 318 */ 354 void FragmentScheduler:: handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)319 void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn) 355 320 { 356 321 Info info(__FUNCTION__); … … 364 329 * \param conn reference with the connection 365 330 */ 366 void FragmentScheduler:: handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)331 void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn) 367 332 { 368 333 Info info(__FUNCTION__); … … 376 341 * \param conn reference with the connection 377 342 */ 378 void FragmentScheduler:: handle_SendResults(const boost::system::error_code& e, connection_ptr conn)343 void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn) 379 344 { 380 345 Info info(__FUNCTION__); -
src/Fragmentation/Automation/FragmentScheduler.hpp
r95b384 r8036b7 16 16 #include <vector> 17 17 #include <boost/asio.hpp> 18 #include <boost/function.hpp> 18 19 19 20 #include "Connection.hpp" … … 22 23 #include "GlobalJobId.hpp" 23 24 #include "Jobs/FragmentJob.hpp" 25 #include "Listener.hpp" 24 26 #include "Results/FragmentResult.hpp" 25 27 #include "types.hpp" 26 28 27 /** FragmentScheduler serves FragmentJobs to Workers. 29 /** FragmentScheduler serves FragmentJobs to Workers and accepts commands from 30 * a Controller. 28 31 * 29 32 */ … … 35 38 FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport); 36 39 37 enum Exitflag_t{ 38 OkFlag = 0, 39 QueueError = 128, 40 ControllerErrorFlag = 254, 41 WorkerErrorFlag = 255 40 private: 41 class WorkerListener_t : public Listener 42 { 43 public: 44 WorkerListener_t( 45 boost::asio::io_service& io_service, 46 unsigned short port, 47 FragmentQueue &_JobsQueue) : 48 Listener(io_service, port), 49 JobsQueue(_JobsQueue), 50 result( new FragmentResult(JobId::NoJob) ) 51 {} 52 virtual ~WorkerListener_t() {} 53 54 protected: 55 /// Handle completion of a accept worker operation. 56 void handle_Accept(const boost::system::error_code& e, connection_ptr conn); 57 58 /// Worker callback function when job has been sent. 59 void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn); 60 61 /// Worker callback function when result has been received. 62 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn); 63 private: 64 //!> reference to external FragmentQueue containing jobs to work on 65 FragmentQueue & JobsQueue; 66 67 /// result that is received from the client. 68 FragmentResult::ptr result; 69 70 // static entity to indicate to clients that the queue is empty. 71 static FragmentJob::ptr NoJob; 42 72 }; 43 73 74 class ControllerListener_t : public Listener 75 { 76 public: 77 ControllerListener_t( 78 boost::asio::io_service& io_service, 79 unsigned short port, 80 FragmentQueue &_JobsQueue, 81 boost::function<void ()> _initiateWorkerSocket) : 82 Listener(io_service, port), 83 JobsQueue(_JobsQueue), 84 jobInfo((size_t)2, 0), 85 choice(NoOperation), 86 globalId(0), 87 initiateWorkerSocket(_initiateWorkerSocket) 88 {} 89 virtual ~ControllerListener_t() {} 90 91 protected: 92 /// Handle completion of a accept controller operation. 93 void handle_Accept(const boost::system::error_code& e, connection_ptr conn); 94 95 /// Handle completion of controller operation to read choice 96 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn); 97 98 /// Controller callback function when job has been sent. 99 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn); 100 101 /// Controller callback function when checking on state of results. 102 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn); 103 104 /// Controller callback function when checking on state of results. 105 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn); 106 107 /// Controller callback function when result has been received. 108 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn); 109 110 private: 111 //!> reference to external FragmentQueue containing jobs to work on 112 FragmentQueue & JobsQueue; 113 114 /// bunch of jobs received from controller before placed in JobsQueue 115 std::vector<FragmentJob::ptr> jobs; 116 117 /// number of jobs that are waiting to be and are calculated, required for returning status 118 std::vector<size_t> jobInfo; 119 120 // choice 121 enum ControllerChoices choice; 122 123 // TODO: replace this instance by a IdPool. 124 //!> global id to give next available job id 125 GlobalJobId globalId; 126 127 //!> callback function to tell that worker socket should be enabled 128 boost::function<void ()> initiateWorkerSocket; 129 }; 130 131 private: 132 //!> Listener instance that waits for a worker 133 WorkerListener_t WorkerListener; 134 135 //!> Listener instance that waits for a controller 136 ControllerListener_t ControllerListener; 137 138 /// Queue with data to be sent to each client. 139 FragmentQueue JobsQueue; 140 141 public: 44 142 /** Getter for Exitflag. 45 143 * … … 48 146 size_t getExitflag() const 49 147 { 50 return Exitflag; 148 if (WorkerListener.getExitflag() != 0) 149 return WorkerListener.getExitflag(); 150 if (ControllerListener.getExitflag() != 0) 151 return ControllerListener.getExitflag(); 152 return 0; 51 153 } 52 154 53 protected:54 /// Handle completion of a accept worker operation.55 void handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn);56 57 /// Handle completion of a accept controller operation.58 void handle_AcceptController(const boost::system::error_code& e, connection_ptr conn);59 60 /// Handle completion of controller operation to read choice61 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);62 63 /// Worker callback function when job has been sent.64 void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn);65 66 /// Worker callback function when result has been received.67 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);68 69 /// Controller callback function when job has been sent.70 void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);71 72 private:73 74 /// Controller callback function when checking on state of results.75 void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);76 77 /// Controller callback function when checking on state of results.78 void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);79 80 /// Controller callback function when result has been received.81 void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);82 83 /// internal function to prepare worker connections84 void initiateWorkerSocket();85 86 /// internal function to prepare controller connections87 void initiateControllerSocket();88 89 private:90 /// The acceptor object used to accept incoming worker socket connections.91 boost::asio::ip::tcp::acceptor worker_acceptor_;92 93 /// The acceptor object used to accept incoming controller socket connections.94 boost::asio::ip::tcp::acceptor controller_acceptor_;95 96 /// result that is received from the client.97 FragmentResult::ptr result;98 99 /// bunch of jobs received from controller before placed in JobsQueue100 std::vector<FragmentJob::ptr> jobs;101 102 /// number of jobs that are waiting to be and are calculated, required for returning status103 std::vector<size_t> jobInfo;104 105 // choice106 enum ControllerChoices choice;107 108 /// Queue with data to be sent to each client.109 FragmentQueue JobsQueue;110 111 // static entity to indicate to clients that the queue is empty.112 static FragmentJob::ptr NoJob;113 114 // TODO: replace this instance by a IdPool.115 //!> global id to give next available job id116 GlobalJobId globalId;117 118 // Exit flag on program exit119 enum Exitflag_t Exitflag;120 155 }; 121 156 -
src/Fragmentation/Automation/Makefile.am
r95b384 r8036b7 86 86 87 87 SERVERSOURCE = \ 88 FragmentScheduler.cpp 88 FragmentScheduler.cpp \ 89 Listener.cpp 89 90 90 91 SERVERHEADER = \ 91 92 Connection.hpp \ 92 93 ControllerChoices.hpp \ 93 FragmentScheduler.hpp 94 FragmentScheduler.hpp \ 95 Listener.hpp 94 96 95 97 WORKERSOURCE = \
Note:
See TracChangeset
for help on using the changeset viewer.