/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2011 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * FragmentQueue.cpp * * Created on: Oct 19, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif #include "CodePatterns/MemDebug.hpp" #include "FragmentQueue.hpp" #include "CodePatterns/Assert.hpp" #include "CodePatterns/Log.hpp" FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) ); FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) ); FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) ); size_t FragmentQueue::Max_Attempts = (size_t)1; /** Constructor for class FragmentQueue. * */ FragmentQueue::FragmentQueue() {} /** Destructor for class FragmentQueue. * */ FragmentQueue::~FragmentQueue() { jobs.clear(); results.clear(); } /** Checks whether there are jobs in the queue at all. * \return true - jobs present, false - queue is empty */ bool FragmentQueue::isJobPresent() const { return !jobs.empty(); } /** Pushes a FragmentJob into the internal queue for delivery to server. * * \note we throw assertion when jobid has already been used. * * \param job job to enter into queue */ void FragmentQueue::pushJob(FragmentJob::ptr job) { ASSERT(job->getId() != JobId::IllegalJob, "FragmentQueue::pushJob() - job to push has IllegalJob id."); ASSERT(!results.count(job->getId()), "FragmentQueue::pushJob() - job id "+toString(job->getId())+" has already been used."); results.insert( std::make_pair(job->getId(), NoResult)); jobs.push_back(job); } /** Pushes a bunch of FragmentJob's into the internal queue for delivery to server. * * \note we throw assertion when jobid has already been used. * * \sa pushJob() * * \param _jobs jobs to enter into queue */ void FragmentQueue::pushJobs(std::vector &_jobs) { for (std::vector::iterator iter = _jobs.begin(); iter != _jobs.end(); ++iter) pushJob(*iter); } /** Pops top-most FragmentJob from internal queue. * * From here on, we expect a result in FragmentQueue::results. * * \return job topmost job from queue */ FragmentJob::ptr FragmentQueue::popJob() { ASSERT(jobs.size(), "FragmentQueue::popJob() - there are no jobs on the queue."); FragmentJob::ptr job = jobs.front(); #ifndef NDEBUG std::pair< BackupMap::iterator, bool> inserter = #endif backup.insert( std::make_pair( job->getId(), job )); ASSERT (inserter.second, "FragmentQueue::popJob() - job "+toString(job->getId())+ " is already in the backup."); ResultMap::iterator iter = results.find(job->getId()); ASSERT(iter != results.end(), "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored."); iter->second = NoResultQueued; jobs.pop_front(); return job; } /** Internal function to check whether result is not one of static entities. * * @param result result to check against * @return true - result is a present, valid result, false - result is one of the statics */ bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const { return (*result != *NoResult) && (*result != *NoResultQueued) && (*result != *ResultDelivered); } /** Queries whether a job has already been finished and the result is present. * * \param jobid id of job to query * \return true - result is present, false - result is not present */ bool FragmentQueue::isResultPresent(JobId_t jobid) const { ResultMap::const_iterator iter = results.find(jobid); return ((iter != results.end()) && isPresentResult(iter->second)); } /** Counts the number of jobs for which we have a calculated result present. * * \return number of calculated results */ size_t FragmentQueue::getDoneJobs() const { size_t doneJobs = 0; for (ResultMap::const_iterator iter = results.begin(); iter != results.end(); ++iter) if (isPresentResult(iter->second)) ++doneJobs; return doneJobs; } /** Counts the number of jobs for which still have to be calculated. * * \return number of jobs to be calculated */ size_t FragmentQueue::getPresentJobs() const { const size_t presentJobs = jobs.size(); return presentJobs; } /** Delivers result for a finished job. * * \note we throw assertion if not present * * \param jobid id of job * \return result for job of given \a jobid */ FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid) { ResultMap::iterator iter = results.find(jobid); ASSERT(iter != results.end(), "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us."); ASSERT(*iter->second != *NoResult, "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet."); ASSERT(*iter->second != *NoResultQueued, "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet."); ASSERT(*iter->second != *ResultDelivered, "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered."); /// store result FragmentResult::ptr _result = iter->second; /// mark as delivered in map iter->second = ResultDelivered; /// and return result return _result; } std::vector FragmentQueue::getAllResults() { std::vector returnresults; for (ResultMap::iterator iter = results.begin(); iter != results.end(); ++iter) { if (isPresentResult(iter->second)) { returnresults.push_back(getResult(iter->first)); iter = results.begin(); } } return returnresults; } /** Pushes a result for a finished job. * * \note we throw assertion if job already has result or is not known. * * \param result result of job to store */ void FragmentQueue::pushResult(FragmentResult::ptr &_result) { const JobId_t id = _result->getId(); /// check for presence ResultMap::iterator iter = results.find(id); ASSERT(iter != results.end(), "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us."); ASSERT(*iter->second == *NoResultQueued, "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+"."); // check whether this is a resubmitted job AttemptsMap::iterator attemptiter = attempts.find(id); // check whether succeeded or (finally) failed if ((_result->exitflag == 0) || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts))) { // give notice if it is resubmitted job if (attemptiter != attempts.end()) { if (attemptiter->second >= Max_Attempts) ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time."); else LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt."); } // remove in attempts if (attemptiter != attempts.end()) attempts.erase(attemptiter); // remove in backup map BackupMap::iterator backupiter = backup.find(id); ASSERT( backupiter != backup.end(), "FragmentQueue::pushResult() - cannot find job "+toString(id) +" in backup."); backup.erase(backupiter); /// and overwrite NoResult in found entry iter->second = _result; } else { LOG(1, "Job " << id << " failed, resubmitting."); // increase attempts if (attemptiter != attempts.end()) ++(attemptiter->second); else attempts.insert( std::make_pair(id, (size_t)1) ); // resubmit job resubmitJob(id); } } /** Resubmit a job which a worker failed to calculate. * * @param jobid id of the failed job */ void FragmentQueue::resubmitJob(const JobId_t jobid) { BackupMap::iterator iter = backup.find(jobid); ASSERT( iter != backup.end(), "FragmentQueue::resubmitJob() - job id "+toString(jobid) +" not stored in backup."); if (iter != backup.end()) { // remove result ResultMap::iterator resiter = results.find(jobid); ASSERT( resiter != results.end(), "FragmentQueue::resubmitJob() - resubmitting job "+toString(jobid) +" for which no result is present."); results.erase(resiter); pushJob(iter->second); backup.erase(iter); } }