- Timestamp:
- Jun 29, 2012, 1:23:14 PM (13 years ago)
- Branches:
- Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
- Children:
- aec098
- Parents:
- a8f54b6
- git-author:
- Frederik Heber <heber@…> (05/21/12 07:14:59)
- git-committer:
- Frederik Heber <heber@…> (06/29/12 13:23:14)
- Location:
- src/Fragmentation/Automation
- Files:
-
- 9 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Fragmentation/Automation/Controller/Commands/EnrollInPoolOperation.cpp
ra8f54b6 r9a3f84 30 30 #include "Jobs/FragmentJob.hpp" 31 31 #include "WorkerAddress.hpp" 32 #include "WorkerChoices.hpp" 32 33 33 34 /// Handle completion of a connect operation. … … 41 42 // of jobs. The connection::async_read() function will automatically 42 43 // decode the data that is read from the underlying socket. 43 LOG(1, "INFO: Enrolling in pool with " << address << "...");44 44 connection_.async_write(address, 45 boost::bind(&EnrollInPoolOperation::handle_ FinishOperation, this,45 boost::bind(&EnrollInPoolOperation::handle_SendChoice, this, 46 46 boost::asio::placeholders::error)); 47 47 } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { … … 61 61 62 62 63 /// Handle sent of address 64 void EnrollInPoolOperation::handle_sendAddress(const boost::system::error_code& e) 63 /// Handle sending choice 64 void EnrollInPoolOperation::handle_SendChoice(const boost::system::error_code& e) 65 { 66 Info info(__FUNCTION__); 67 if (!e) 68 { 69 // Successfully established connection. Start operation to read the list 70 // of jobs. The connection::async_read() function will automatically 71 // decode the data that is read from the underlying socket. 72 LOG(1, "INFO: Enrolling in pool with " << address << " ..."); 73 enum WorkerChoices choice = EnrollInPool; 74 connection_.async_write(choice, 75 boost::bind(&EnrollInPoolOperation::handle_ReceiveFlag, this, 76 boost::asio::placeholders::error)); 77 } else { 78 // An error occurred. Log it and return. Since we are not starting a new 79 // operation the io_service will run out of work to do and the client will 80 // exit. 81 ELOG(1, e.message()); 82 } 83 } 84 85 86 /// Handle receiving flag 87 void EnrollInPoolOperation::handle_ReceiveFlag(const boost::system::error_code& e) 65 88 { 66 89 Info info(__FUNCTION__); … … 81 104 } 82 105 83 /// Handle sent of address106 /// Handle received flag 84 107 void EnrollInPoolOperation::handle_FinishOperation(const boost::system::error_code& e) 85 108 { -
src/Fragmentation/Automation/Controller/Commands/EnrollInPoolOperation.hpp
ra8f54b6 r9a3f84 47 47 boost::asio::ip::tcp::resolver::iterator endpoint_iterator); 48 48 49 /// Handle sen t of address50 void handle_ sendAddress(const boost::system::error_code& e);49 /// Handle sending choice 50 void handle_SendChoice(const boost::system::error_code& e); 51 51 52 /// Handle sent of address 52 /// Handle receiving flag 53 void handle_ReceiveFlag(const boost::system::error_code& e); 54 55 /// Handle received flag 53 56 void handle_FinishOperation(const boost::system::error_code& e); 54 57 -
src/Fragmentation/Automation/Controller/Commands/ObtainJobOperation.cpp
ra8f54b6 r9a3f84 29 29 #include "CodePatterns/Log.hpp" 30 30 #include "Jobs/FragmentJob.hpp" 31 #include "WorkerChoices.hpp" 31 32 32 33 /// Handle completion of a connect operation. -
src/Fragmentation/Automation/Controller/Commands/SubmitResultOperation.cpp
ra8f54b6 r9a3f84 29 29 #include "CodePatterns/Log.hpp" 30 30 #include "Results/FragmentResult.hpp" 31 #include "WorkerChoices.hpp" 31 32 32 33 … … 43 44 LOG(1, "INFO: Sending address " << address << " ..."); 44 45 connection_.async_write(address, 45 boost::bind(&SubmitResultOperation::handle_Send Address, this,46 boost::bind(&SubmitResultOperation::handle_SendChoice, this, 46 47 boost::asio::placeholders::error)); 47 48 } else if (endpoint_iterator != boost::asio::ip::tcp::resolver::iterator()) { … … 62 63 63 64 /// Callback function when address has been sent and result is about to 64 void SubmitResultOperation::handle_Send Address(const boost::system::error_code& e)65 void SubmitResultOperation::handle_SendChoice(const boost::system::error_code& e) 65 66 { 66 67 Info info(__FUNCTION__); … … 70 71 // of jobs. The connection::async_write() function will automatically 71 72 // decode the data that is read from the underlying socket. 72 LOG(1, "INFO: Sending result #" << result->getId() << " ...");73 connection_.async_write( result,73 enum WorkerChoices choice = SendResult; 74 connection_.async_write(choice, 74 75 boost::bind(&SubmitResultOperation::handle_SendResult, this, 75 76 boost::asio::placeholders::error)); … … 83 84 } 84 85 85 /// Callback function when result has been sent.86 /// Callback function when address has been sent and result is about to 86 87 void SubmitResultOperation::handle_SendResult(const boost::system::error_code& e) 87 88 { 88 89 Info info(__FUNCTION__); 89 if (!e) { 90 LOG(1, "INFO: result #" << result->getId() << " sent."); 91 AsyncOperation::handle_FinishOperation(e); 90 if (!e) 91 { 92 // Successfully established connection. Start operation to read the list 93 // of jobs. The connection::async_write() function will automatically 94 // decode the data that is read from the underlying socket. 95 LOG(1, "INFO: Sending result #" << result->getId() << " ..."); 96 connection_.async_write(result, 97 boost::bind(&SubmitResultOperation::handle_FinishOperation, this, 98 boost::asio::placeholders::error)); 92 99 } else { 93 100 // An error occurred. Log it and return. Since we are not starting a new … … 98 105 } 99 106 } 107 108 /// Callback function when result has been sent. 109 void SubmitResultOperation::handle_FinishOperation(const boost::system::error_code& e) 110 { 111 Info info(__FUNCTION__); 112 LOG(1, "INFO: result #" << result->getId() << " sent."); 113 AsyncOperation::handle_FinishOperation(e); 114 } -
src/Fragmentation/Automation/Controller/Commands/SubmitResultOperation.hpp
ra8f54b6 r9a3f84 68 68 69 69 /// Callback function when address has been sent and result is about to. 70 void handle_Send Address(const boost::system::error_code& e);70 void handle_SendChoice(const boost::system::error_code& e); 71 71 72 72 /// Callback function when result has been sent. 73 73 void handle_SendResult(const boost::system::error_code& e); 74 75 /// Callback function when result has been sent. 76 void handle_FinishOperation(const boost::system::error_code& e); 74 77 75 78 private: -
src/Fragmentation/Automation/FragmentQueue.cpp
ra8f54b6 r9a3f84 216 216 AttemptsMap::iterator attemptiter = attempts.find(id); 217 217 // check whether succeeded or (finally) failed 218 if ((_result->exitflag == 0) || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))) { 218 if ((_result->exitflag == 0) 219 || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts)) 220 || (Max_Attempts == 1)) { 219 221 // give notice if it is resubmitted job 220 222 if (attemptiter != attempts.end()) { … … 258 260 +" not stored in backup."); 259 261 if (iter != backup.end()) { 260 // remove result262 // remove present result 261 263 ResultMap::iterator resiter = results.find(jobid); 262 264 ASSERT( resiter != results.end(), 263 "FragmentQueue::resubmitJob() - resubmittingjob "+toString(jobid)264 +" for which no result ispresent.");265 "FragmentQueue::resubmitJob() - job "+toString(jobid) 266 +" to resubmit has no result present."); 265 267 results.erase(resiter); 266 268 pushJob(iter->second); -
src/Fragmentation/Automation/FragmentScheduler.cpp
ra8f54b6 r9a3f84 93 93 // read address 94 94 conn->async_read(address, 95 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ checkAddress, this,95 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this, 96 96 boost::asio::placeholders::error, conn)); 97 } else { 97 } 98 else 99 { 98 100 // An error occurred. Log it and return. Since we are not starting a new 99 101 // accept operation the io_service will run out of work to do and the … … 104 106 } 105 107 106 /** Callback function when worker address has been received.107 * 108 * \param e error code if something went wrong 109 * \param conn reference with the connection 110 */ 111 void FragmentScheduler::WorkerListener_t::handle_ checkAddress(const boost::system::error_code& e, connection_ptr conn)108 /** Handle having received Worker's address 109 * 110 * \param e error code if something went wrong 111 * \param conn reference with the connection 112 */ 113 void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn) 112 114 { 113 115 Info info(__FUNCTION__); 114 116 if (!e) 115 117 { 116 if (pool.presentInPool(address)) { 117 // check whether its priority is busy_priority 118 if (pool.isWorkerBusy(address)) { 119 conn->async_read(result, 120 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 121 boost::asio::placeholders::error, conn)); 122 } else { 123 ELOG(1, "INFO: Idle worker "+toString(address)+" has logged in again."); 124 } 125 } else { 126 // insert as its new worker 127 pool.addWorker(address); 128 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success; 129 conn->async_write(flag, 130 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, 131 boost::asio::placeholders::error, conn)); 132 } 118 // Successfully accepted a new connection. 119 // read address 120 conn->async_read(choice, 121 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this, 122 boost::asio::placeholders::error, conn)); 133 123 } 134 124 else … … 140 130 ELOG(0, e.message()); 141 131 } 132 } 133 134 /** Controller callback function to read the choice for next operation. 135 * 136 * \param e error code if something went wrong 137 * \param conn reference with the connection 138 */ 139 void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) 140 { 141 Info info(__FUNCTION__); 142 if (!e) 143 { 144 LOG(1, "INFO: Received request for operation " << choice << "."); 145 // switch over the desired choice read previously 146 switch(choice) { 147 case NoWorkerOperation: 148 { 149 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation."); 150 break; 151 } 152 case EnrollInPool: 153 { 154 if (pool.presentInPool(address)) { 155 ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool."); 156 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail; 157 conn->async_write(flag, 158 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, 159 boost::asio::placeholders::error, conn)); 160 } else { 161 // insert as its new worker 162 LOG(1, "INFO: Adding " << address << " to pool ..."); 163 pool.addWorker(address); 164 enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success; 165 conn->async_write(flag, 166 boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, 167 boost::asio::placeholders::error, conn)); 168 break; 169 } 170 case SendResult: 171 { 172 if (pool.presentInPool(address)) { 173 // check whether its priority is busy_priority 174 if (pool.isWorkerBusy(address)) { 175 conn->async_read(result, 176 boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, 177 boost::asio::placeholders::error, conn)); 178 } else { 179 ELOG(1, "Worker " << address << " trying to send result who is not marked as busy."); 180 conn->async_read(result, 181 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, 182 boost::asio::placeholders::error, conn)); 183 } 184 } else { 185 ELOG(1, "Worker " << address << " trying to send result who is not in pool."); 186 conn->async_read(result, 187 boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, 188 boost::asio::placeholders::error, conn)); 189 } 190 break; 191 } 192 case RemoveFromPool: 193 { 194 if (pool.presentInPool(address)) { 195 // removing present worker 196 pool.removeWorker(address); 197 } else { 198 ELOG(1, "Shutting down Worker " << address << " not contained in pool."); 199 } 200 break; 201 } 202 default: 203 Exitflag = ErrorFlag; 204 ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice."); 205 break; 206 } 207 } 208 // restore NoOperation choice such that choice is not read twice 209 choice = NoWorkerOperation; 210 } 211 else 212 { 213 // An error occurred. Log it and return. Since we are not starting a new 214 // accept operation the io_service will run out of work to do and the 215 // server will exit. 216 Exitflag = ErrorFlag; 217 ELOG(0, e.message()); 218 } 142 219 143 220 if (JobsQueue.isJobPresent()) { … … 147 224 } 148 225 } 226 149 227 150 228 /** Callback function when new worker has enrolled. … … 164 242 job = NoJob; 165 243 } 166 callback_sendJobToWorker( address, job);244 callback_sendJobToWorker(pool.getNextIdleWorker(), job); 167 245 } 168 246 else … … 205 283 } 206 284 285 /** Callback function when result has been received. 286 * 287 * \param e error code if something went wrong 288 * \param conn reference with the connection 289 */ 290 void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn) 291 { 292 Info info(__FUNCTION__); 293 // nothing to do 294 LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue."); 295 296 JobsQueue.resubmitJob(result->getId()); 297 298 LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); 299 } 300 207 301 208 302 /** Handle a new controller connection. … … 250 344 case NoControllerOperation: 251 345 { 252 ELOG(1, " FragmentScheduler::handle_ReadChoice() - called with NoOperation.");346 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation."); 253 347 break; 254 348 } … … 296 390 case ShutdownControllerSocket: 297 391 { 298 LaunchNewAcceptor = false; 392 LOG(1, "INFO: Received shutdown from controller ..."); 393 // only allow for shutdown when there are no more jobs in the queue 394 if (!JobsQueue.isJobPresent()) { 395 LaunchNewAcceptor = false; 396 } else { 397 ELOG(2, "There are still jobs waiting in the queue."); 398 } 299 399 break; 300 400 } 301 401 default: 302 402 Exitflag = ErrorFlag; 303 ELOG(1, " FragmentScheduler::handle_ReadChoice() - called with no valid choice.");403 ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice."); 304 404 break; 305 405 } … … 389 489 /** Helper function to send a job to worker. 390 490 * 491 * Note that we do not set the worker as busy. We simply send it the job. 492 * 391 493 * @param address address of worker 392 494 * @param job job to send … … 394 496 void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job) 395 497 { 396 ASSERT( !pool.isWorkerBusy(address),397 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is alreadybusy.");498 ASSERT( pool.isWorkerBusy(address), 499 "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy."); 398 500 LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << "."); 399 501 sendJobOp.setJob(job); 400 502 sendJobOp(address.host, address.service); 401 // set worker as busy (assuming it has been removed from idle queue already)402 WorkerPool::Idle_Queue_t::iterator iter = pool.getIdleWorker(address);403 ASSERT( iter != pool.getIdleEnd(),404 "FragmentScheduler::sendJobToWorker() - cannot find worker "405 +toString(address)+" in idle queue.");406 pool.markWorkerBusy(iter);407 503 } 408 504 -
src/Fragmentation/Automation/FragmentScheduler.hpp
ra8f54b6 r9a3f84 29 29 #include "Pool/WorkerPool.hpp" 30 30 #include "WorkerAddress.hpp" 31 #include "WorkerChoices.hpp" 31 32 32 33 /** FragmentScheduler serves FragmentJobs to Workers and accepts commands from … … 60 61 pool(_pool), 61 62 result( new FragmentResult(JobId::NoJob) ), 62 callback_sendJobToWorker(_callback) 63 callback_sendJobToWorker(_callback), 64 choice(NoWorkerOperation) 63 65 {} 64 66 virtual ~WorkerListener_t() {} … … 68 70 void handle_Accept(const boost::system::error_code& e, connection_ptr conn); 69 71 72 /// Handle completion of Worker operation to read choice 73 void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn); 74 70 75 /// Worker callback function when job has been sent. 71 void handle_ checkAddress(const boost::system::error_code& e, connection_ptr conn);76 void handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn); 72 77 73 78 /// Worker callback function when new worker has enrolled. … … 76 81 /// Worker callback function when result has been received. 77 82 void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn); 83 84 /// Worker callback function when invalid result has been received. 85 void handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn); 78 86 private: 87 //!> static entity to indicate to clients that the queue is empty. 88 static FragmentJob::ptr NoJob; 89 79 90 //!> address of new Worker 80 91 WorkerAddress address; … … 86 97 WorkerPool &pool; 87 98 88 // /result that is received from the client.99 //!> result that is received from the client. 89 100 FragmentResult::ptr result; 90 101 … … 92 103 boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker; 93 104 94 // static entity to indicate to clients that the queue is empty.95 static FragmentJob::ptr NoJob;105 //!> choice 106 enum WorkerChoices choice; 96 107 }; 97 108 … … 136 147 FragmentQueue & JobsQueue; 137 148 138 // /bunch of jobs received from controller before placed in JobsQueue149 //!> bunch of jobs received from controller before placed in JobsQueue 139 150 std::vector<FragmentJob::ptr> jobs; 140 151 141 // /number of jobs that are waiting to be and are calculated, required for returning status152 //!> number of jobs that are waiting to be and are calculated, required for returning status 142 153 std::vector<size_t> jobInfo; 143 154 144 // choice155 //!> choice 145 156 enum ControllerChoices choice; 146 157 … … 154 165 155 166 private: 156 // /Queue with data to be sent to each client.167 //!> Queue with data to be sent to each client. 157 168 FragmentQueue JobsQueue; 158 169 -
src/Fragmentation/Automation/Pool/WorkerPool.hpp
ra8f54b6 r9a3f84 55 55 56 56 Idle_Queue_t::iterator getIdleWorker(const WorkerAddress &address); 57 Idle_Queue_t::iterator getIdleEnd()58 {59 return idle_queue.end();60 }61 57 void markWorkerBusy(Idle_Queue_t::iterator &iter); 62 58
Note:
See TracChangeset
for help on using the changeset viewer.