/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * \file controller.cpp * * This file strongly follows the Serialization example from the boost::asio * library (see client.cpp) * * Created on: Nov 27, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include #include #include #include #include #include "atexit.hpp" #include "CodePatterns/Info.hpp" #include "CodePatterns/Log.hpp" #include "Controller/FragmentController.hpp" #include "Controller/Commands/CheckResultsOperation.hpp" #include "Controller/Commands/GetNextJobIdOperation.hpp" #include "Controller/Commands/ReceiveJobsOperation.hpp" #include "Controller/Commands/SendResultsOperation.hpp" #include "Controller/Commands/ShutdownOperation.hpp" #include "Jobs/MPQCCommandJob.hpp" #include "Jobs/MPQCCommandJob_MPQCData.hpp" #include "Jobs/SystemCommandJob.hpp" #include "Results/FragmentResult.hpp" enum CommandIndices { UnknownCommandIndex = 0, AddJobsIndex = 1, CreateJobsIndex = 2, CheckResultsIndex = 3, ReceiveResultsIndex = 4, ReceiveMPQCIndex = 5, ShutdownIndex = 6, }; /** Requests an available id from server * * @param controller FragmentController with CommandRegistry * @param host address of server * @param service port/service of server */ void requestid( FragmentController &controller, const std::string &host, const std::string &service) { GetNextJobIdOperation *getnextid = static_cast( controller.Commands.getByName("getnextjobid")); (*getnextid)(host,service); } /** Returns another available id from a finished GetNextJobIdOperation. * * @param controller FragmentController with CommandRegistry * @return next available id */ JobId_t getavailableid(FragmentController &controller) { GetNextJobIdOperation *getnextid = static_cast( controller.Commands.getByName("getnextjobid")); const JobId_t nextid = getnextid->getNextId(); LOG(1, "INFO: Next available id is " << nextid << "."); return nextid; } void createjobs( std::vector &jobs, const std::string &command, const std::string &argument, const JobId_t nextid) { FragmentJob::ptr testJob( new SystemCommandJob(command, argument, nextid) ); jobs.push_back(testJob); LOG(1, "INFO: Added one SystemCommandJob."); } /** Creates a MPQCCommandJob with argument \a filename. * * @param jobs created job is added to this vector * @param filename filename being argument to job * @param nextid id for this job */ void parsejob( std::vector &jobs, const std::string &filename, const JobId_t nextid) { std::ifstream file; file.open(filename.c_str()); ASSERT( file.good(), "parsejob() - file "+filename+" does not exist."); std::string output((std::istreambuf_iterator(file)), std::istreambuf_iterator()); FragmentJob::ptr testJob( new MPQCCommandJob(output, nextid) ); jobs.push_back(testJob); file.close(); LOG(1, "INFO: Added MPQCCommandJob from file "+filename+"."); } /** Adds a vector of jobs to the send operation. * * @param controller FragmentController with CommandRegistry * @param jobs jobs to add */ void addjobs( FragmentController &controller, std::vector &jobs) { ReceiveJobsOperation *recjobs = static_cast( controller.Commands.getByName("receivejobs")); recjobs->addJobs(jobs); } /** Sends contained jobs in operation to server * * @param controller FragmentController with CommandRegistry * @param host address of server * @param service port/service of server */ void sendjobs( FragmentController &controller, const std::string &host, const std::string &service) { ReceiveJobsOperation *recjobs = static_cast( controller.Commands.getByName("receivejobs")); (*recjobs)(host, service); } /** Obtains scheduled and done jobs from server * * @param controller FragmentController with CommandRegistry * @param host address of server * @param service port/service of server */ void checkresults( FragmentController &controller, const std::string &host, const std::string &service) { CheckResultsOperation *checkres = static_cast( controller.Commands.getByName("checkresults")); (*checkres)(host, service); } /** Prints scheduled and done jobs. * * @param controller FragmentController with CommandRegistry */ void printdonejobs(FragmentController &controller) { CheckResultsOperation *checkres = static_cast( controller.Commands.getByName("checkresults")); const size_t doneJobs = checkres->getDoneJobs(); const size_t presentJobs = checkres->getPresentJobs(); LOG(1, "INFO: #" << presentJobs << " are waiting in the queue and #" << doneJobs << " jobs are calculated so far."); } /** Obtains results from done jobs from server. * * @param controller FragmentController with CommandRegistry * @param host address of server * @param service port/service of server */ void receiveresults( FragmentController &controller, const std::string &host, const std::string &service) { SendResultsOperation *sendres = static_cast( controller.Commands.getByName("sendresults")); (*sendres)(host, service); } /** Print received results. * * @param controller FragmentController with CommandRegistry */ void printreceivedresults(FragmentController &controller) { SendResultsOperation *sendres = static_cast( controller.Commands.getByName("sendresults")); std::vector results = sendres->getResults(); for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result)); } /** Print received results. * * @param controller FragmentController with CommandRegistry */ void printreceivedmpqcresults(FragmentController &controller, const std::string &KeySetFilename) { SendResultsOperation *sendres = static_cast( controller.Commands.getByName("sendresults")); std::vector results = sendres->getResults(); // parse in KeySetfile // const size_t MAXBUFFER = 256; std::ifstream inputfile; inputfile.open(KeySetFilename.c_str()); // while (inputfile.getline(buffer, MAXBUFFER)) { // // } inputfile.close(); // combine all found data std::vector fragmentData(results.size()); MPQCData combinedData; LOG(2, "DEBUG: Parsing now through " << results.size() << " results."); for (std::vector::const_iterator iter = results.begin(); iter != results.end(); ++iter) { LOG(1, "RESULT: job #"+toString((*iter)->getId())+": "+toString((*iter)->result)); MPQCData extractedData; std::stringstream inputstream((*iter)->result); boost::archive::text_iarchive ia(inputstream); ia >> extractedData; LOG(1, "INFO: extracted data is " << extractedData << "."); } } /** Sends shutdown signal to server * * @param controller FragmentController with CommandRegistry * @param host address of server * @param service port/service of server */ void shutdown( FragmentController &controller, const std::string &host, const std::string &service) { ShutdownOperation *shutdown = static_cast( controller.Commands.getByName("shutdown")); (*shutdown)(host, service); } /** Returns a unique index for every command to allow switching over it. * * \param &commandmap map with command strings * \param &cmd command string * \return index from CommandIndices: UnkownCommandIndex - unknown command, else - command index */ CommandIndices getCommandIndex(std::map &commandmap, const std::string &cmd) { std::map::const_iterator iter = commandmap.find(cmd); if (iter != commandmap.end()) return iter->second; else return UnknownCommandIndex; } int main(int argc, char* argv[]) { // from this moment on, we need to be sure to deeinitialize in the correct order // this is handled by the cleanup function atexit(cleanUp); setVerbosity(3); size_t Exitflag = 0; typedef std::map CommandsMap_t; CommandsMap_t CommandsMap; CommandsMap.insert( std::make_pair("addjobs", AddJobsIndex) ); CommandsMap.insert( std::make_pair("createjobs", CreateJobsIndex) ); CommandsMap.insert( std::make_pair("checkresults", CheckResultsIndex) ); CommandsMap.insert( std::make_pair("receiveresults", ReceiveResultsIndex) ); CommandsMap.insert( std::make_pair("receivempqc", ReceiveMPQCIndex) ); CommandsMap.insert( std::make_pair("shutdown", ShutdownIndex) ); try { // Check command line arguments. if (argc < 4) { std::cerr << "Usage: " << argv[0] << " [options to command]" << std::endl; std::cerr << "List of available commands:" << std::endl; for(CommandsMap_t::const_iterator iter = CommandsMap.begin(); iter != CommandsMap.end(); ++iter) { std::cerr << "\t" << iter->first << std::endl; } return 1; } boost::asio::io_service io_service; FragmentController controller(io_service); // Initial phase: information gathering from server switch(getCommandIndex(CommandsMap, argv[3])) { case AddJobsIndex: { if (argc < 5) { ELOG(1, "Please add a filename for the MPQCCommandJob."); } else { // get an id for every filename for (int argcount = 4; argcount < argc; ++argcount) { requestid(controller, argv[1], argv[2]); } } break; } case CreateJobsIndex: { std::vector jobs; if (argc < 6) { ELOG(1, "'createjobs' requires two options: [command] [argument]."); } else { requestid(controller, argv[1], argv[2]); } break; } case CheckResultsIndex: break; case ReceiveResultsIndex: break; case ReceiveMPQCIndex: break; case ShutdownIndex: break; case UnknownCommandIndex: default: ELOG(1, "Unrecognized command '"+toString(argv[3])+"'."); break; } { io_service.reset(); Info info("io_service: Phase One"); io_service.run(); } // Second phase: Building jobs and sending information to server switch(getCommandIndex(CommandsMap, argv[3])) { case AddJobsIndex: { std::vector jobs; if (argc < 5) { ELOG(1, "Please add a filename for the MPQCCommandJob."); } else { for (int argcount = 4; argcount < argc; ++argcount) { const JobId_t next_id = getavailableid(controller); const std::string filename(argv[argcount]); LOG(1, "INFO: Creating MPQCCommandJob with filename'" +filename+"', and id "+toString(next_id)+"."); parsejob(jobs, filename, next_id); } addjobs(controller, jobs); sendjobs(controller, argv[1], argv[2]); } break; } case CreateJobsIndex: { std::vector jobs; if (argc < 6) { ELOG(1, "'createjobs' requires two options: [command] [argument]."); } else { const JobId_t next_id = getavailableid(controller); const std::string command(argv[4]); const std::string argument(argv[5]); LOG(1, "INFO: Creating SystemCommandJob with command '" +command+"', argument '"+argument+"', and id "+toString(next_id)+"."); createjobs(jobs, command, argument, next_id); } addjobs(controller, jobs); sendjobs(controller, argv[1], argv[2]); break; } case CheckResultsIndex: { checkresults(controller, argv[1], argv[2]); break; } case ReceiveResultsIndex: { receiveresults(controller, argv[1], argv[2]); break; } case ReceiveMPQCIndex: { receiveresults(controller, argv[1], argv[2]); break; } case ShutdownIndex: { shutdown(controller, argv[1], argv[2]); break; } case UnknownCommandIndex: default: ELOG(0, "Unrecognized command '"+toString(argv[3])+"'."); break; } { io_service.reset(); Info info("io_service: Phase Two"); io_service.run(); } // Final phase: Print result of command switch(getCommandIndex(CommandsMap, argv[3])) { case AddJobsIndex: case CreateJobsIndex: break; case CheckResultsIndex: { printdonejobs(controller); break; } case ReceiveResultsIndex: { printreceivedresults(controller); break; } case ReceiveMPQCIndex: { if (argc == 4) { ELOG(1, "'receivempqc' requires one option: [KeySetFilename]."); } else { const std::string KeySetFilename = argv[4]; LOG(1, "INFO: Parsing id associations from " << KeySetFilename << "."); printreceivedmpqcresults(controller, KeySetFilename); } break; } case ShutdownIndex: break; case UnknownCommandIndex: default: ELOG(0, "Unrecognized command '"+toString(argv[3])+"'."); break; } Exitflag = controller.getExitflag(); } catch (std::exception& e) { std::cerr << e.what() << std::endl; } return Exitflag; }