/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2012 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file PoolWorker.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see client.cpp). * * Created on: Feb 28, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include "Connection.hpp" // Must come before boost/serialization headers. #include #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Jobs/FragmentJob.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/SystemCommandJob.hpp" #include "Results/FragmentResult.hpp" #include "PoolWorker.hpp" /** Helper function to enforce binding of PoolWorker to possible derived * FragmentJob classes. */ void dummyInit() { SystemCommandJob("/bin/false", "something", JobId::IllegalJob); MPQCCommandJob("nofile", JobId::IllegalJob); } /** Constructor for class PoolWorker. * * We automatically connect to the given pool and enroll. * * @param _io_service io service for creating connections * @param _host host part of address of pool to connect to * @param _service service part of address of pool to connect to * @param listenhost host part of address of this instance for listening to pool connections * @param listenservice seervice part of address of this instance for listening to pool connections */ PoolWorker::PoolWorker( boost::asio::io_service& _io_service, const std::string& _host, const std::string& _service, const std::string& listenhost, const std::string& listenservice) : io_service(_io_service), PoolListener(_io_service, boost::lexical_cast(listenservice), *this), address(listenhost, listenservice), connection_(_io_service), enrollOp(connection_, address), submitOp(connection_, address), submitresult(boost::bind(&AsyncOperation::operator(), boost::ref(submitOp), _host, _service)), removeOp(connection_, address), removeme(boost::bind(&Operation::operator(), boost::ref(removeOp), _host, _service)) { Info info(__FUNCTION__); // always enroll enrollOp(_host,_service); // initiate listening PoolListener.initiateSocket(); } /// Handle completion of a accept server operation. void PoolWorker::PoolListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { conn->async_read(job, boost::bind(&PoolWorker::PoolListener_t::handle_ReceiveJob, this, boost::asio::placeholders::error, conn)); // and listen for following connections initiateSocket(); } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. ELOG(0, e.message()); } } /// Controller callback function when job has been sent. void PoolWorker::PoolListener_t::handle_ReceiveJob(const boost::system::error_code& e, connection_ptr conn) { Info info(__FUNCTION__); if (!e) { if (job->getId() != JobId::NoJob) { LOG(1, "INFO: Working on job " << job->getId() << "."); callback.WorkOnJob(job); } else { LOG(1, "INFO: Received NoJob."); callback.shutdown(); } } else { // An error occurred. Log it and return. Since we are not starting a new // accept operation the io_service will run out of work to do and the // server will exit. ELOG(0, e.message()); } } /** Works on the given job and send the results. * * @param job job to work on */ void PoolWorker::WorkOnJob(FragmentJob::ptr &job) { // work on job and create result LOG(2, "DEBUG: Beginning to work on " << job->getId() << "."); FragmentResult::ptr result = job->Work(); LOG(2, "DEBUG: Setting result " << result->getId() << "."); submitOp.setResult(result); // submit result LOG(2, "DEBUG: Sending result ..."); submitresult(); } /** Wrapper function to allow use as signal handler. * * We remove us from server's pool and then just call \sa PoolWorker::shutdown(). * * @param sig signal received */ void PoolWorker::shutdown(int sig) { LOG(1, "INFO: Shutting down due to signal "+toString(sig)+"."); // remove us from pool removeme(); shutdown(); } /** Helper function to shutdown the worker properly. * * Note that we will use ShutdownWorkerOperation to unlist from server's pool. */ void PoolWorker::shutdown() { // somehow stop listener PoolListener.closeSocket(); // finally, stop io_service io_service.stop(); }