/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file FragmentScheduler.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see server.cpp) * * Created on: Oct 19, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "CodePatterns/Observer/Notification.hpp" #include "ControllerChoices.hpp" #include "Operations/Servers/SendJobToWorkerOperation.hpp" #include "Operations/Workers/EnrollInPoolOperation.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/SystemCommandJob.hpp" #include "JobId.hpp" #include "FragmentScheduler.hpp" /** Helper function to enforce binding of FragmentWorker to possible derived * FragmentJob classes. */ void dummyInit() { SystemCommandJob("/bin/false", "something", JobId::IllegalJob); MPQCCommandJob("nofile", JobId::IllegalJob); } /** Constructor of class FragmentScheduler. * * We setup both acceptors to accept connections from workers and Controller. * * \param io_service io_service of the asynchronous communications * \param workerport port to listen for worker connections * \param controllerport port to listen for controller connections. */ FragmentScheduler::FragmentScheduler(boost::asio::io_service& _io_service, unsigned short workerport, unsigned short controllerport) : Observer("FragmentScheduler"), io_service(_io_service), WorkerListener(_io_service, workerport, JobsQueue, pool, boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)), ControllerListener(_io_service, controllerport, JobsQueue, boost::bind(&FragmentScheduler::removeAllWorkers, boost::ref(*this)), boost::bind(&FragmentScheduler::shutdown, boost::ref(*this))), connection(_io_service) { Info info(__FUNCTION__); // sign on to idle workers and present jobs pool.signOn(this, WorkerPool::WorkerIdle); JobsQueue.signOn(this, FragmentQueue::JobAdded); // listen for controller ControllerListener.initiateSocket(); // listen for workers WorkerListener.initiateSocket(); } FragmentScheduler::~FragmentScheduler() { // sign off pool.signOff(this, WorkerPool::WorkerIdle); JobsQueue.signOff(this, FragmentQueue::JobAdded); } /** Handle a new worker connection. * * We store the given address in the pool. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { // Successfully accepted a new connection. // read address conn->async_read(address, boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadAddress, this, boost::asio::placeholders::error, conn)); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Handle having received Worker's address * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_ReadAddress(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { // Successfully accepted a new connection. // read address conn->async_read(choice, boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReadChoice, this, boost::asio::placeholders::error, conn)); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Controller callback function to read the choice for next operation. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { LOG(1, "INFO: Received request for operation " << choice << "."); // switch over the desired choice read previously switch(choice) { case NoWorkerOperation: { ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with NoOperation."); break; } case EnrollInPool: { if (pool.presentInPool(address)) { ELOG(1, "INFO: worker "+toString(address)+" is already contained in pool."); enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Fail; conn->async_write(flag, boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, boost::asio::placeholders::error, conn)); } else { // insert as its new worker LOG(1, "INFO: Adding " << address << " to pool ..."); pool.addWorker(address); enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success; conn->async_write(flag, boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this, boost::asio::placeholders::error, conn)); break; } case SendResult: { if (pool.presentInPool(address)) { // check whether its priority is busy_priority if (pool.isWorkerBusy(address)) { conn->async_read(result, boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this, boost::asio::placeholders::error, conn)); } else { ELOG(1, "Worker " << address << " trying to send result who is not marked as busy."); conn->async_read(result, boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, boost::asio::placeholders::error, conn)); } } else { ELOG(1, "Worker " << address << " trying to send result who is not in pool."); conn->async_read(result, boost::bind(&FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker, this, boost::asio::placeholders::error, conn)); } break; } case RemoveFromPool: { if (pool.presentInPool(address)) { // removing present worker pool.removeWorker(address); } else { ELOG(1, "Shutting down Worker " << address << " not contained in pool."); } break; } default: Exitflag = ErrorFlag; ELOG(1, "WorkerListener_t::handle_ReadChoice() - called with no valid choice."); break; } } // restore NoOperation choice such that choice is not read twice choice = NoWorkerOperation; initiateSocket(); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Callback function when new worker has enrolled. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (e) { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Callback function when result has been received. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); LOG(1, "INFO: Received result for job #" << result->getId() << " ..."); // and push into queue ASSERT(result->getId() != (JobId_t)JobId::NoJob, "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id."); ASSERT(result->getId() != (JobId_t)JobId::IllegalJob, "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id."); // place id into expected if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob)) JobsQueue.pushResult(result); // mark as idle pool.unmarkWorkerBusy(address); // erase result result.reset(); LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); } /** Callback function when result has been received. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::WorkerListener_t::handle_RejectResultFromWorker(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // nothing to do LOG(1, "INFO: Rejecting result for job #" << result->getId() << ", placing back into queue."); JobsQueue.resubmitJob(result->getId()); LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results."); } /** Handle a new controller connection. * * \sa handle_ReceiveJobs() * \sa handle_CheckResultState() * \sa handle_SendResults() * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { conn->async_read(choice, boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this, boost::asio::placeholders::error, conn)); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Controller callback function to read the choice for next operation. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { bool LaunchNewAcceptor = true; LOG(1, "INFO: Received request for operation " << choice << "."); // switch over the desired choice read previously switch(choice) { case NoControllerOperation: { ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with NoOperation."); break; } case GetNextJobId: { LOG(1, "INFO: Receiving number of desired job ids from controller ..."); conn->async_read(NumberIds, boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this, boost::asio::placeholders::error, conn)); break; } case SendJobs: { // The connection::async_write() function will automatically // serialize the data structure for us. LOG(1, "INFO: Receiving bunch of jobs from a controller ..."); conn->async_read(jobs, boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this, boost::asio::placeholders::error, conn)); break; } case CheckState: { // first update number jobInfo[0] = JobsQueue.getPresentJobs(); jobInfo[1] = JobsQueue.getDoneJobs(); // now we accept connections to check for state of calculations LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ..."); conn->async_write(jobInfo, boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this, boost::asio::placeholders::error, conn)); break; } case RemoveAll: { removeallWorkers(); break; } case ReceiveResults: { const std::vector results = JobsQueue.getAllResults(); // ... or we give the results LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ..."); conn->async_write(results, boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this, boost::asio::placeholders::error, conn)); break; } case ShutdownControllerSocket: { LOG(1, "INFO: Received shutdown from controller ..."); // only allow for shutdown when there are no more jobs in the queue if (!JobsQueue.isJobPresent()) { // we shutdown? Hence, also shutdown controller LaunchNewAcceptor = !shutdownAllSockets(); } else { ELOG(2, "There are still jobs waiting in the queue."); } break; } default: Exitflag = ErrorFlag; ELOG(1, "ControllerListener_t::handle_ReadChoice() - called with no valid choice."); break; } // restore NoControllerOperation choice such that choice is not read twice choice = NoControllerOperation; if (LaunchNewAcceptor) { LOG(1, "Launching new acceptor on socket."); // Start an accept operation for a new Connection. initiateSocket(); } } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Controller callback function when job has been sent. * * We check here whether the worker socket is accepting, if there * have been no jobs we re-activate it, as it is shut down after * last job. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // jobs are received, hence place in JobsQueue if (!jobs.empty()) { LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue."); JobsQueue.pushJobs(jobs); } jobs.clear(); } /** Controller callback function when checking on state of results. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // do nothing LOG(1, "INFO: Sent that " << jobInfo << " jobs are (scheduled, done)."); } /** Controller callback function when checking on state of results. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); std::vector nextids( NumberIds, JobId::IllegalJob); std::generate(nextids.begin(), nextids.end(), boost::bind(&GlobalJobId::getNextId, boost::ref(globalId))); LOG(1, "INFO: Sending next available job ids " << nextids << " to controller ..."); conn->async_write(nextids, boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendIds, this, boost::asio::placeholders::error, conn)); } /** Controller callback function when free job ids have been sent. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_SendIds(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // do nothing LOG(1, "INFO: Ids have been sent."); } /** Controller callback function when result has been received. * * \param e error code if something went wrong * \param conn reference with the connection */ void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); // do nothing LOG(1, "INFO: Results have been sent."); } /** Helper function to send a job to worker. * * Note that we do not set the worker as busy. We simply send it the job. * * @param address address of worker * @param job job to send */ void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job) { ASSERT( pool.isWorkerBusy(address), "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is not marked as busy."); LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << "."); // create op, sign on, and hand over to queue AsyncOperation *sendJobOp = new SendJobToWorkerOperation(connection,job); OpQueue.push_back(sendJobOp, address); } /** Helper function to shutdown a single worker. * * We send NoJob to indicate shutdown * * @param address of worker to shutdown */ void FragmentScheduler::shutdownWorker(const WorkerAddress &address) { ASSERT( !pool.isWorkerBusy(address), "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy."); LOG(2, "INFO: Shutting down worker " << address << "..."); AsyncOperation *shutdownWorkerOp = new ShutdownWorkerOperation(connection); OpQueue.push_back(shutdownWorkerOp, address); } /** Sends shutdown to all current workers in the pool. * */ void FragmentScheduler::removeAllWorkers() { // first, sign off such that no new jobs are given to workers pool.signOff(this, WorkerPool::WorkerIdle); LOG(2, "DEBUG: Waiting for busy workers to finish ..."); while (pool.hasBusyWorkers()) ; LOG(2, "INFO: Shutting down workers ..."); // iterate until there are no more idle workers do { // get list of all idle workers typedef std::vector > WorkerList_t; WorkerList_t WorkerList = pool.getListOfIdleWorkers(); // give all workers shutdown signal for (WorkerList_t::const_iterator iter = WorkerList.begin(); iter != WorkerList.end(); ++iter) shutdownWorker(WorkerAddress(iter->first, iter->second)); // wait for pending shutdown operations while (!OpQueue.empty()) ; } while (pool.presentIdleWorkers()); pool.removeAllWorkers(); } /** Helper function to shutdown the server properly. * * \todo one should idle here until all workers have returned from * calculating stuff (or workers need to still listen while they are * calculating which is probably better). * * \note We only shutdown when there are no workers left * * @return true - doing shutdown, false - precondition not met, not shutting down */ bool FragmentScheduler::shutdown() { if (!pool.presentIdleWorkers() && !pool.hasBusyWorkers()) { LOG(1, "INFO: Shutting all down ..."); /// close the worker listener's socket WorkerListener.closeSocket(); /// close the controller listener's socket ControllerListener.closeSocket(); /// finally, stop the io_service io_service.stop(); return true; } else { ELOG(2, "There are still idle or busy workers present."); return false; } } /** Internal helper to send the next available job to the next idle worker. * */ void FragmentScheduler::sendAvailableJobToNextIdleWorker() { const WorkerAddress address = pool.getNextIdleWorker(); FragmentJob::ptr job = JobsQueue.popJob(); sendJobToWorker(address, job); } void FragmentScheduler::update(Observable *publisher) { ASSERT(0, "FragmentScheduler::update() - we are not signed on for global updates."); } void FragmentScheduler::recieveNotification(Observable *publisher, Notification_ptr notification) { if ((publisher == &pool) && (notification->getChannelNo() == WorkerPool::WorkerIdle)) { // we have an idle worker LOG(1, "INFO: We are notified of an idle worker."); // are jobs available? if (JobsQueue.isJobPresent()) { sendAvailableJobToNextIdleWorker(); } } else if ((publisher == &JobsQueue) && (notification->getChannelNo() == FragmentQueue::JobAdded)) { // we have new jobs LOG(1, "INFO: We are notified of a new job."); // check for idle workers if (pool.presentIdleWorkers()) { sendAvailableJobToNextIdleWorker(); } } else { ASSERT(0, "FragmentScheduler::recieveNotification() - we are not signed on for updates in channel " +toString(notification->getChannelNo())+"."); } } void FragmentScheduler::subjectKilled(Observable *publisher) {}