/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2012 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file PoolWorker.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see client.cpp). * * Created on: Feb 28, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Jobs/FragmentJob.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/SystemCommandJob.hpp" #include "Operations/Workers/EnrollInPoolOperation.hpp" #include "Operations/Workers/RemoveFromPoolOperation.hpp" #include "Operations/Workers/SubmitResultOperation.hpp" #include "Results/FragmentResult.hpp" #include "PoolWorker.hpp" /** Helper function to enforce binding of PoolWorker to possible derived * FragmentJob classes. */ void dummyInit() { SystemCommandJob("/bin/false", "something", JobId::IllegalJob); MPQCCommandJob("nofile", JobId::IllegalJob); } /** Constructor for class PoolWorker. * * We automatically connect to the given pool and enroll. * * @param _io_service io service for creating connections * @param _host host part of MyAddress of pool to connect to * @param _service service part of MyAddress of pool to connect to * @param listenhost host part of MyAddress of this instance for listening to pool connections * @param listenservice seervice part of MyAddress of this instance for listening to pool connections */ PoolWorker::PoolWorker( boost::asio::io_service& _io_service, const std::string& _host, const std::string& _service, const std::string& listenhost, const std::string& listenservice) : io_service(_io_service), PoolListener(_io_service, boost::lexical_cast(listenservice), *this), MyAddress(listenhost, listenservice), ServerAddress(_host, _service), connection_(_io_service), failed(boost::bind(&ExitflagContainer::setExitflag, this, ExitflagContainer::ErrorFlag)) { Info info(__FUNCTION__); // always enroll and make listenining initiation depend on its success const boost::function initiateme = boost::bind(&PoolListener_t::initiateSocket, boost::ref(PoolListener)); AsyncOperation *enrollOp = new EnrollInPoolOperation(connection_, MyAddress, initiateme, failed); LOG(2, "DEBUG: Putting enroll in pool operation into queue ..."); OpQueue.push_back(enrollOp, ServerAddress); } /// Handle completion of a accept server operation. void PoolWorker::PoolListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { conn->async_read(job, boost::bind(&PoolWorker::PoolListener_t::handle_ReceiveJob, this, boost::asio::placeholders::error, conn)); // and listen for following connections initiateSocket(); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /// Controller callback function when job has been sent. void PoolWorker::PoolListener_t::handle_ReceiveJob(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { if (job->getId() != JobId::NoJob) { LOG(1, "INFO: Working on job " << job->getId() << "."); callback.WorkOnJob(job); } else { LOG(1, "INFO: Received NoJob."); callback.shutdown(); } } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. Exitflag = ErrorFlag; ELOG(0, e.message()); } } /** Works on the given job and send the results. * * @param job job to work on */ void PoolWorker::WorkOnJob(FragmentJob::ptr &job) { // work on job and create result LOG(2, "DEBUG: Beginning to work on " << job->getId() << "."); FragmentResult::ptr result = job->Work(); LOG(2, "DEBUG: Setting result " << result->getId() << "."); AsyncOperation *submitOp = new SubmitResultOperation(connection_, MyAddress, AsyncOperation::NoOpCallback, failed); static_cast(submitOp)->setResult(result); // submit result LOG(2, "DEBUG: Putting send result operation into queue ..."); OpQueue.push_back(submitOp, ServerAddress); } /** Wrapper function to allow use as signal handler. * * We remove us from server's pool and then just call \sa PoolWorker::shutdown(). * * @param sig signal received */ void PoolWorker::shutdown(int sig) { LOG(1, "INFO: Shutting down due to signal "+toString(sig)+"."); shutdown(); } /** Helper function to shutdown the worker properly. * * Note that we will use RemoveFromPoolOperation to unlist from server's pool. */ void PoolWorker::shutdown() { // remove us from pool boost::function closingdown = boost::bind(&PoolWorker::finish, this); AsyncOperation *removeOp = new RemoveFromPoolOperation(connection_, MyAddress, closingdown, failed); LOG(2, "DEBUG: Putting remove from pool operation into queue ..."); OpQueue.push_back(removeOp, ServerAddress); // block queue such that io_service may stop OpQueue.block(); } /** Helper function to close down listener and stop service. * * This is called after we have been removed from server's pool * We stop the io_service via its callback handler in case of success. */ void PoolWorker::finish() { // somehow stop listener PoolListener.closeSocket(); // finally, stop io_service io_service.stop(); }