/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file controller.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see client.cpp) * * Created on: Nov 27, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include #include #include #include #include #include "Fragmentation/Automation/atexit.hpp" #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Fragmentation/EnergyMatrix.hpp" #include "Fragmentation/ForceMatrix.hpp" #include "Fragmentation/KeySetsContainer.hpp" #include "FragmentController.hpp" #include "Helpers/defs.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/MPQCCommandJob_MPQCData.hpp" #include "Fragmentation/Automation/Jobs/SystemCommandJob.hpp" #include "Fragmentation/Automation/Results/FragmentResult.hpp" #include "ControllerOptions_MPQCCommandJob.hpp" /** Print the status of scheduled and done jobs. * * @param status pair of number of schedule and done jobs */ void printJobStatus(const std::pair &JobStatus) { LOG(1, "INFO: #" << JobStatus.first << " are waiting in the queue and #" << JobStatus.second << " jobs are calculated so far."); } /** Creates a SystemCommandJob out of give \a command with \a argument. * * @param jobs created job is added to this vector * @param command command to execute for SystemCommandJob * @param argument argument for command to execute * @param nextid id for this job */ void createjobs( std::vector &jobs, const std::string &command, const std::string &argument, const JobId_t nextid) { FragmentJob::ptr testJob( new SystemCommandJob(command, argument, nextid) ); jobs.push_back(testJob); LOG(1, "INFO: Added one SystemCommandJob."); } /** Creates a MPQCCommandJob with argument \a filename. * * @param jobs created job is added to this vector * @param command mpqc command to execute * @param filename filename being argument to job * @param nextid id for this job */ void parsejob( std::vector &jobs, const std::string &command, const std::string &filename, const JobId_t nextid) { std::ifstream file; file.open(filename.c_str()); ASSERT( file.good(), "parsejob() - file "+filename+" does not exist."); std::string output((std::istreambuf_iterator(file)), std::istreambuf_iterator()); FragmentJob::ptr testJob( new MPQCCommandJob(output, nextid, command) ); jobs.push_back(testJob); file.close(); LOG(1, "INFO: Added MPQCCommandJob from file "+filename+"."); } /** Print received results. * * @param results received results to print */ void printReceivedResults(const std::vector &results) { for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result)); } /** Print MPQCData from received results. * * @param results received results to extract MPQCData from * @param KeySetFilename filename with keysets to associate forces correctly * @param NoAtoms total number of atoms */ bool printReceivedMPQCResults( const std::vector &results, const std::string &KeySetFilename, size_t NoAtoms) { EnergyMatrix Energy; EnergyMatrix EnergyFragments; ForceMatrix Force; ForceMatrix ForceFragments; KeySetsContainer KeySet; // align fragments std::map< JobId_t, size_t > MatrixNrLookup; size_t FragmentCounter = 0; { // bring ids in order ... typedef std::map< JobId_t, FragmentResult::ptr> IdResultMap_t; IdResultMap_t IdResultMap; for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) { #ifndef NDEBUG std::pair< IdResultMap_t::iterator, bool> inserter = #endif IdResultMap.insert( make_pair((*iter)->getId(), *iter) ); ASSERT( inserter.second, "printReceivedMPQCResults() - two results have same id " +toString((*iter)->getId())+"."); } // ... and fill lookup for(IdResultMap_t::const_iterator iter = IdResultMap.begin(); iter != IdResultMap.end(); ++iter) MatrixNrLookup.insert( make_pair(iter->first, FragmentCounter++) ); } LOG(1, "INFO: There are " << FragmentCounter << " fragments."); // extract results std::vector fragmentData(results.size()); MPQCData combinedData; LOG(2, "DEBUG: Parsing now through " << results.size() << " results."); for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) { LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result)); MPQCData extractedData; std::stringstream inputstream((*iter)->result); LOG(2, "DEBUG: First 50 characters FragmentResult's string: "+(*iter)->result.substr(0, 50)); boost::archive::text_iarchive ia(inputstream); ia >> extractedData; LOG(1, "INFO: extracted data is " << extractedData << "."); // place results into EnergyMatrix ... { MatrixContainer::MatrixArray matrix; matrix.resize(1); matrix[0].resize(1, extractedData.energy); if (!Energy.AddMatrix( std::string("MPQCJob ")+toString((*iter)->getId()), matrix, MatrixNrLookup[(*iter)->getId()])) { ELOG(1, "Adding energy matrix failed."); return false; } } // ... and ForceMatrix (with two empty columns in front) { MatrixContainer::MatrixArray matrix; const size_t rows = extractedData.forces.size(); matrix.resize(rows); for (size_t i=0;igetId()), matrix, MatrixNrLookup[(*iter)->getId()])) { ELOG(1, "Adding force matrix failed."); return false; } } } // add one more matrix (not required for energy) MatrixContainer::MatrixArray matrix; matrix.resize(1); matrix[0].resize(1, 0.); if (!Energy.AddMatrix(std::string("MPQCJob total"), matrix, FragmentCounter)) return false; // but for energy because we need to know total number of atoms matrix.resize(NoAtoms); for (size_t i = 0; i< NoAtoms; ++i) matrix[i].resize(2+NDIM, 0.); if (!Force.AddMatrix(std::string("MPQCJob total"), matrix, FragmentCounter)) return false; // combine all found data if (!Energy.InitialiseIndices()) return false; if (!Force.ParseIndices(KeySetFilename.c_str())) return false; if (!KeySet.ParseKeySets(KeySetFilename.c_str(), Force.RowCounter, Force.MatrixCounter)) return false; if (!KeySet.ParseManyBodyTerms()) return false; if (!EnergyFragments.AllocateMatrix(Energy.Header, Energy.MatrixCounter, Energy.RowCounter, Energy.ColumnCounter)) return false; if (!ForceFragments.AllocateMatrix(Force.Header, Force.MatrixCounter, Force.RowCounter, Force.ColumnCounter)) return false; if(!Energy.SetLastMatrix(0., 0)) return false; if(!Force.SetLastMatrix(0., 2)) return false; for (int BondOrder=0;BondOrder jobs; createjobs(jobs, ControllerInfo.executable, ControllerInfo.jobcommand, next_id); controller.addJobs(jobs); controller.sendJobs(ControllerInfo.server, ControllerInfo.serverport); } /** Creates a MPQCCommandJob out of give \a command with \a argument. * * @param controller reference to controller to add jobs * @param ControllerInfo information on the job */ void AddJobs(FragmentController &controller, const ControllerOptions_MPQCCommandJob &ControllerInfo) { std::vector jobs; for (std::vector< std::string >::const_iterator iter = ControllerInfo.jobfiles.begin(); iter != ControllerInfo.jobfiles.end(); ++iter) { const JobId_t next_id = controller.getAvailableId(); const std::string &filename = *iter; LOG(1, "INFO: Creating MPQCCommandJob with filename '" +filename+"', and id "+toString(next_id)+"."); parsejob(jobs, ControllerInfo.executable, filename, next_id); } controller.addJobs(jobs); controller.sendJobs(ControllerInfo.server, ControllerInfo.serverport); } int main(int argc, char* argv[]) { // from this moment on, we need to be sure to deeinitialize in the correct order // this is handled by the cleanup function atexit(cleanUp); size_t Exitflag = 0; ControllerOptions_MPQCCommandJob ControllerInfo; boost::asio::io_service io_service; FragmentController controller(io_service); boost::program_options::variables_map vm; typedef boost::function ControllerCommand; typedef std::map > CommandsMap_t; CommandsMap_t CommandsMap; // prepare the command queues for each ControllerCommand // note: we need "< ControllerCommand >" because parseExecutable(),... return int // in contrast to other functions that return void std::deque addjobsQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::requestIds, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport), boost::bind(&std::vector::size, boost::cref(ControllerInfo.jobfiles)))) (boost::bind(&AddJobs, boost::ref(controller), boost::cref(ControllerInfo))) ; std::deque createjobsQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::requestIds, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport), 1)) (boost::bind(&creatingJob, boost::ref(controller), boost::cref(ControllerInfo))) ; std::deque checkresultsQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::checkResults, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport))) (boost::bind(&printJobStatus, boost::bind(&FragmentController::getJobStatus, boost::ref(controller)))) ; std::deque receiveresultsQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::receiveResults, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport))) (boost::bind(&printReceivedResults, boost::bind(&FragmentController::getReceivedResults, boost::ref(controller)))) ; std::deque receivempqcresultsQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::receiveResults, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport))) (boost::bind(&printReceivedMPQCResults, boost::bind(&FragmentController::getReceivedResults, boost::ref(controller)), boost::cref(ControllerInfo.fragmentpath), boost::bind(&getNoAtomsFromAdjacencyFile, boost::cref(ControllerInfo.fragmentpath)))) ; std::deque removeallQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::removeall, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport))) ; std::deque shutdownQueue = boost::assign::list_of< ControllerCommand > (boost::bind(&FragmentController::shutdown, boost::ref(controller), boost::cref(ControllerInfo.server), boost::cref(ControllerInfo.serverport))) ; CommandsMap.insert( std::make_pair("addjobs", addjobsQueue) ); CommandsMap.insert( std::make_pair("createjobs", createjobsQueue) ); CommandsMap.insert( std::make_pair("checkresults", checkresultsQueue) ); CommandsMap.insert( std::make_pair("receiveresults", receiveresultsQueue) ); CommandsMap.insert( std::make_pair("receivempqc", receivempqcresultsQueue) ); CommandsMap.insert( std::make_pair("removeall", removeallQueue) ); CommandsMap.insert( std::make_pair("shutdown", shutdownQueue) ); std::vector Commands; for (CommandsMap_t::const_iterator iter = CommandsMap.begin(); iter != CommandsMap.end(); ++iter) Commands.push_back(iter->first); // Declare the supported options. boost::program_options::options_description desc("Allowed options"); desc.add_options() ("help,h", "produce help message") ("verbosity,v", boost::program_options::value(), "set verbosity level") ("server", boost::program_options::value< std::string >(), "connect to server at this address (host:port)") ("command", boost::program_options::value< std::string >(), (std::string("command to send to server: ")+toString(Commands)).c_str()) ("executable", boost::program_options::value< std::string >(), "executable for commands 'addjobs' and 'createjobs'") ("fragment-path", boost::program_options::value< std::string >(), "path to fragment files for 'receivempqc'") ("jobcommand", boost::program_options::value< std::string >(), "command argument for executable for 'createjobs'") ("jobfiles", boost::program_options::value< std::vector< std::string > >()->multitoken(), "list of files as single argument to executable for 'addjobs'") ; // parse command line boost::program_options::store(boost::program_options::parse_command_line(argc, argv, desc), vm); boost::program_options::notify(vm); // set controller information int status = 0; status = ControllerInfo.parseHelp(vm, desc); if (status) return status; status = ControllerInfo.parseVerbosity(vm); if (status) return status; status = ControllerInfo.parseServerPort(vm); if (status) return status; status = ControllerInfo.parseCommand(vm, Commands); if (status) return status; // all later parse functions depend on parsed command status = ControllerInfo.parseExecutable(vm); if (status) return status; status = ControllerInfo.parseJobCommand(vm); if (status) return status; status = ControllerInfo.parseFragmentpath(vm); if (status) return status; status = ControllerInfo.parseJobfiles(vm); if (status) return status; // parse given ControllerCommand if( CommandsMap.count(ControllerInfo.command) == 0) { ELOG(1, "Unrecognized command '"+toString(ControllerInfo.command)+"'."); return 255; } std::deque &commands = CommandsMap[ControllerInfo.command]; try { // execute each command in the queue synchronously size_t phase = 1; while (!commands.empty()) { ControllerCommand command = commands.front(); commands.pop_front(); command(); { io_service.reset(); Info info((std::string("io_service: ")+toString(phase)).c_str()); io_service.run(); } } Exitflag = controller.getExitflag(); } catch (std::exception& e) { std::cerr << e.what() << std::endl; } return Exitflag; }