| [b5ebb5] | 1 | /* | 
|---|
|  | 2 | * Project: MoleCuilder | 
|---|
|  | 3 | * Description: creates and alters molecular systems | 
|---|
|  | 4 | * Copyright (C)  2011 University of Bonn. All rights reserved. | 
|---|
|  | 5 | * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. | 
|---|
|  | 6 | */ | 
|---|
|  | 7 |  | 
|---|
|  | 8 | /* | 
|---|
|  | 9 | * FragmentQueue.cpp | 
|---|
|  | 10 | * | 
|---|
|  | 11 | *  Created on: Oct 19, 2011 | 
|---|
|  | 12 | *      Author: heber | 
|---|
|  | 13 | */ | 
|---|
|  | 14 |  | 
|---|
|  | 15 | // include config.h | 
|---|
|  | 16 | #ifdef HAVE_CONFIG_H | 
|---|
|  | 17 | #include <config.h> | 
|---|
|  | 18 | #endif | 
|---|
|  | 19 |  | 
|---|
|  | 20 | #include "CodePatterns/MemDebug.hpp" | 
|---|
|  | 21 |  | 
|---|
|  | 22 | #include "FragmentQueue.hpp" | 
|---|
|  | 23 |  | 
|---|
|  | 24 | #include "CodePatterns/Assert.hpp" | 
|---|
| [d6b12c] | 25 | #include "CodePatterns/Observer/Channels.hpp" | 
|---|
| [fe95b7] | 26 | #include "CodePatterns/Log.hpp" | 
|---|
| [b5ebb5] | 27 |  | 
|---|
| [35f587] | 28 | FragmentResult::ptr FragmentQueue::NoResult( new FragmentResult(-1) ); | 
|---|
|  | 29 | FragmentResult::ptr FragmentQueue::NoResultQueued( new FragmentResult(-2) ); | 
|---|
|  | 30 | FragmentResult::ptr FragmentQueue::ResultDelivered( new FragmentResult(-3) ); | 
|---|
| [2344a3] | 31 | size_t FragmentQueue::Max_Attempts = (size_t)3; | 
|---|
| [b5ebb5] | 32 |  | 
|---|
|  | 33 | /** Constructor for class FragmentQueue. | 
|---|
|  | 34 | * | 
|---|
|  | 35 | */ | 
|---|
| [d6b12c] | 36 | FragmentQueue::FragmentQueue() : | 
|---|
|  | 37 | Observable("FragmentQueue") | 
|---|
|  | 38 | { | 
|---|
|  | 39 | // observable stuff | 
|---|
|  | 40 | Channels *OurChannel = new Channels; | 
|---|
|  | 41 | NotificationChannels.insert( std::make_pair(this, OurChannel) ); | 
|---|
|  | 42 | // add instance for each notification type | 
|---|
|  | 43 | for (size_t type = 0; type < NotificationType_MAX; ++type) | 
|---|
|  | 44 | OurChannel->addChannel(type); | 
|---|
|  | 45 | } | 
|---|
| [b5ebb5] | 46 |  | 
|---|
|  | 47 | /** Destructor for class FragmentQueue. | 
|---|
|  | 48 | * | 
|---|
|  | 49 | */ | 
|---|
|  | 50 | FragmentQueue::~FragmentQueue() | 
|---|
| [78ad7d] | 51 | { | 
|---|
|  | 52 | jobs.clear(); | 
|---|
|  | 53 | results.clear(); | 
|---|
|  | 54 | } | 
|---|
| [b5ebb5] | 55 |  | 
|---|
| [12d15a] | 56 | /** Checks whether there are jobs in the queue at all. | 
|---|
|  | 57 | * \return true - jobs present, false - queue is empty | 
|---|
|  | 58 | */ | 
|---|
|  | 59 | bool FragmentQueue::isJobPresent() const | 
|---|
|  | 60 | { | 
|---|
|  | 61 | return !jobs.empty(); | 
|---|
|  | 62 | } | 
|---|
|  | 63 |  | 
|---|
| [b5ebb5] | 64 | /** Pushes a FragmentJob into the internal queue for delivery to server. | 
|---|
|  | 65 | * | 
|---|
|  | 66 | * \note we throw assertion when jobid has already been used. | 
|---|
|  | 67 | * | 
|---|
|  | 68 | * \param job job to enter into queue | 
|---|
|  | 69 | */ | 
|---|
| [78ad7d] | 70 | void FragmentQueue::pushJob(FragmentJob::ptr job) | 
|---|
| [b5ebb5] | 71 | { | 
|---|
| [fb255d] | 72 | if ((job->getId() != JobId::IllegalJob) && (!results.count(job->getId()))) { | 
|---|
|  | 73 | OBSERVE; | 
|---|
|  | 74 | NOTIFY(JobAdded); | 
|---|
|  | 75 | results.insert( std::make_pair(job->getId(), NoResult)); | 
|---|
|  | 76 | jobs.push_back(job); | 
|---|
|  | 77 | } else { | 
|---|
|  | 78 | ELOG(1, "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used."); | 
|---|
|  | 79 | ASSERT(false, | 
|---|
|  | 80 | "job to push has IllegalJob id or id "+toString(job->getId())+" has already been used."); | 
|---|
|  | 81 | } | 
|---|
| [12d15a] | 82 | } | 
|---|
|  | 83 |  | 
|---|
| [9875cc] | 84 | /** Pushes a bunch of FragmentJob's into the internal queue for delivery to server. | 
|---|
|  | 85 | * | 
|---|
|  | 86 | * \note we throw assertion when jobid has already been used. | 
|---|
|  | 87 | * | 
|---|
|  | 88 | * \sa pushJob() | 
|---|
|  | 89 | * | 
|---|
|  | 90 | * \param _jobs jobs to enter into queue | 
|---|
|  | 91 | */ | 
|---|
| [78ad7d] | 92 | void FragmentQueue::pushJobs(std::vector<FragmentJob::ptr> &_jobs) | 
|---|
| [9875cc] | 93 | { | 
|---|
| [78ad7d] | 94 | for (std::vector<FragmentJob::ptr>::iterator iter = _jobs.begin(); | 
|---|
| [9875cc] | 95 | iter != _jobs.end(); ++iter) | 
|---|
|  | 96 | pushJob(*iter); | 
|---|
|  | 97 | } | 
|---|
|  | 98 |  | 
|---|
| [12d15a] | 99 | /** Pops top-most FragmentJob from internal queue. | 
|---|
|  | 100 | * | 
|---|
|  | 101 | * From here on, we expect a result in FragmentQueue::results. | 
|---|
|  | 102 | * | 
|---|
|  | 103 | * \return job topmost job from queue | 
|---|
|  | 104 | */ | 
|---|
| [78ad7d] | 105 | FragmentJob::ptr FragmentQueue::popJob() | 
|---|
| [12d15a] | 106 | { | 
|---|
| [d6b12c] | 107 | OBSERVE; | 
|---|
|  | 108 | NOTIFY(JobRemoved); | 
|---|
| [12d15a] | 109 | ASSERT(jobs.size(), | 
|---|
|  | 110 | "FragmentQueue::popJob() - there are no jobs on the queue."); | 
|---|
| [78ad7d] | 111 | FragmentJob::ptr job = jobs.front(); | 
|---|
| [fe95b7] | 112 | #ifndef NDEBUG | 
|---|
|  | 113 | std::pair< BackupMap::iterator, bool> inserter = | 
|---|
|  | 114 | #endif | 
|---|
|  | 115 | backup.insert( std::make_pair( job->getId(), job )); | 
|---|
|  | 116 | ASSERT (inserter.second, | 
|---|
|  | 117 | "FragmentQueue::popJob() - job "+toString(job->getId())+ | 
|---|
|  | 118 | " is already in the backup."); | 
|---|
| [78ad7d] | 119 | ResultMap::iterator iter = results.find(job->getId()); | 
|---|
| [12d15a] | 120 | ASSERT(iter != results.end(), | 
|---|
| [78ad7d] | 121 | "FragmentQueue::popJob() - for job "+toString(job->getId())+" no result place has been stored."); | 
|---|
| [12d15a] | 122 | iter->second = NoResultQueued; | 
|---|
|  | 123 | jobs.pop_front(); | 
|---|
|  | 124 | return job; | 
|---|
| [b5ebb5] | 125 | } | 
|---|
|  | 126 |  | 
|---|
| [b9c486] | 127 | /** Internal function to check whether result is not one of static entities. | 
|---|
|  | 128 | * | 
|---|
|  | 129 | * @param result result to check against | 
|---|
|  | 130 | * @return true - result is a present, valid result, false - result is one of the statics | 
|---|
|  | 131 | */ | 
|---|
| [35f587] | 132 | bool FragmentQueue::isPresentResult(const FragmentResult::ptr result) const | 
|---|
| [b9c486] | 133 | { | 
|---|
| [35f587] | 134 | return (*result != *NoResult) | 
|---|
|  | 135 | && (*result != *NoResultQueued) | 
|---|
|  | 136 | && (*result != *ResultDelivered); | 
|---|
| [b9c486] | 137 | } | 
|---|
|  | 138 |  | 
|---|
| [b5ebb5] | 139 | /** Queries whether a job has already been finished and the result is present. | 
|---|
|  | 140 | * | 
|---|
|  | 141 | * \param jobid id of job to query | 
|---|
|  | 142 | * \return true - result is present, false - result is not present | 
|---|
|  | 143 | */ | 
|---|
|  | 144 | bool FragmentQueue::isResultPresent(JobId_t jobid) const | 
|---|
|  | 145 | { | 
|---|
|  | 146 | ResultMap::const_iterator iter = results.find(jobid); | 
|---|
|  | 147 | return ((iter != results.end()) | 
|---|
| [b9c486] | 148 | && isPresentResult(iter->second)); | 
|---|
| [b5ebb5] | 149 | } | 
|---|
|  | 150 |  | 
|---|
| [8ee5ac] | 151 | /** Counts the number of jobs for which we have a calculated result present. | 
|---|
|  | 152 | * | 
|---|
|  | 153 | * \return number of calculated results | 
|---|
|  | 154 | */ | 
|---|
|  | 155 | size_t FragmentQueue::getDoneJobs() const | 
|---|
|  | 156 | { | 
|---|
|  | 157 | size_t doneJobs = 0; | 
|---|
|  | 158 | for (ResultMap::const_iterator iter = results.begin(); | 
|---|
|  | 159 | iter != results.end(); ++iter) | 
|---|
| [35f587] | 160 | if (isPresentResult(iter->second)) | 
|---|
| [8ee5ac] | 161 | ++doneJobs; | 
|---|
| [bf56f6] | 162 | return doneJobs; | 
|---|
|  | 163 | } | 
|---|
|  | 164 |  | 
|---|
|  | 165 | /** Counts the number of jobs for which still have to be calculated. | 
|---|
|  | 166 | * | 
|---|
|  | 167 | * \return number of jobs to be calculated | 
|---|
|  | 168 | */ | 
|---|
|  | 169 | size_t FragmentQueue::getPresentJobs() const | 
|---|
|  | 170 | { | 
|---|
|  | 171 | const size_t presentJobs = jobs.size(); | 
|---|
|  | 172 | return presentJobs; | 
|---|
| [8ee5ac] | 173 | } | 
|---|
|  | 174 |  | 
|---|
| [b5ebb5] | 175 | /** Delivers result for a finished job. | 
|---|
|  | 176 | * | 
|---|
|  | 177 | * \note we throw assertion if not present | 
|---|
|  | 178 | * | 
|---|
|  | 179 | * \param jobid id of job | 
|---|
|  | 180 | * \return result for job of given \a jobid | 
|---|
|  | 181 | */ | 
|---|
| [35f587] | 182 | FragmentResult::ptr FragmentQueue::getResult(JobId_t jobid) | 
|---|
| [b5ebb5] | 183 | { | 
|---|
|  | 184 | ResultMap::iterator iter = results.find(jobid); | 
|---|
|  | 185 | ASSERT(iter != results.end(), | 
|---|
|  | 186 | "FragmentQueue::pushResult() - job "+toString(jobid)+" is not known to us."); | 
|---|
| [35f587] | 187 | ASSERT(*iter->second != *NoResult, | 
|---|
| [12d15a] | 188 | "FragmentQueue::pushResult() - job "+toString(jobid)+" has not been request for calculation yet."); | 
|---|
| [35f587] | 189 | ASSERT(*iter->second != *NoResultQueued, | 
|---|
| [12d15a] | 190 | "FragmentQueue::pushResult() - job "+toString(jobid)+"'s calculation is underway but not result has arrived yet."); | 
|---|
| [35f587] | 191 | ASSERT(*iter->second != *ResultDelivered, | 
|---|
| [b5ebb5] | 192 | "FragmentQueue::pushResult() - job "+toString(jobid)+"'s result has already been delivered."); | 
|---|
|  | 193 | /// store result | 
|---|
| [35f587] | 194 | FragmentResult::ptr _result = iter->second; | 
|---|
| [b5ebb5] | 195 | /// mark as delivered in map | 
|---|
|  | 196 | iter->second = ResultDelivered; | 
|---|
|  | 197 | /// and return result | 
|---|
| [b9c486] | 198 | return _result; | 
|---|
|  | 199 | } | 
|---|
|  | 200 |  | 
|---|
| [35f587] | 201 | std::vector<FragmentResult::ptr> FragmentQueue::getAllResults() | 
|---|
| [b9c486] | 202 | { | 
|---|
| [35f587] | 203 | std::vector<FragmentResult::ptr> returnresults; | 
|---|
| [b9c486] | 204 | for (ResultMap::iterator iter = results.begin(); | 
|---|
|  | 205 | iter != results.end(); ++iter) { | 
|---|
|  | 206 | if (isPresentResult(iter->second)) { | 
|---|
|  | 207 | returnresults.push_back(getResult(iter->first)); | 
|---|
|  | 208 | iter = results.begin(); | 
|---|
|  | 209 | } | 
|---|
|  | 210 | } | 
|---|
|  | 211 |  | 
|---|
|  | 212 | return returnresults; | 
|---|
| [b5ebb5] | 213 | } | 
|---|
|  | 214 |  | 
|---|
|  | 215 | /** Pushes a result for a finished job. | 
|---|
|  | 216 | * | 
|---|
|  | 217 | * \note we throw assertion if job already has result or is not known. | 
|---|
|  | 218 | * | 
|---|
|  | 219 | * \param result result of job to store | 
|---|
|  | 220 | */ | 
|---|
| [35f587] | 221 | void FragmentQueue::pushResult(FragmentResult::ptr &_result) | 
|---|
| [b5ebb5] | 222 | { | 
|---|
| [fe95b7] | 223 | const JobId_t id = _result->getId(); | 
|---|
| [b5ebb5] | 224 | /// check for presence | 
|---|
| [fe95b7] | 225 | ResultMap::iterator iter = results.find(id); | 
|---|
| [b5ebb5] | 226 | ASSERT(iter != results.end(), | 
|---|
| [fe95b7] | 227 | "FragmentQueue::pushResult() - job "+toString(id)+" is not known to us."); | 
|---|
| [35f587] | 228 | ASSERT(*iter->second == *NoResultQueued, | 
|---|
| [fe95b7] | 229 | "FragmentQueue::pushResult() - is not waiting for the result of job "+toString(id)+"."); | 
|---|
|  | 230 | // check whether this is a resubmitted job | 
|---|
|  | 231 | AttemptsMap::iterator attemptiter = attempts.find(id); | 
|---|
|  | 232 | // check whether succeeded or (finally) failed | 
|---|
| [9a3f84] | 233 | if ((_result->exitflag == 0) | 
|---|
|  | 234 | || ((attemptiter != attempts.end()) && (attemptiter->second >= Max_Attempts)) | 
|---|
|  | 235 | || (Max_Attempts == 1)) { | 
|---|
| [fe95b7] | 236 | // give notice if it is resubmitted job | 
|---|
|  | 237 | if (attemptiter != attempts.end()) { | 
|---|
|  | 238 | if (attemptiter->second >= Max_Attempts) | 
|---|
|  | 239 | ELOG(1, "Job #" << id << " failed on " << Max_Attempts << "th attempt for the last time."); | 
|---|
|  | 240 | else | 
|---|
|  | 241 | LOG(1, "INFO: Job #" << id << " succeeded on " << attemptiter->second << "th attempt."); | 
|---|
|  | 242 | } | 
|---|
|  | 243 | // remove in attempts | 
|---|
|  | 244 | if (attemptiter != attempts.end()) | 
|---|
|  | 245 | attempts.erase(attemptiter); | 
|---|
|  | 246 | // remove in backup map | 
|---|
|  | 247 | BackupMap::iterator backupiter = backup.find(id); | 
|---|
|  | 248 | ASSERT( backupiter != backup.end(), | 
|---|
|  | 249 | "FragmentQueue::pushResult() - cannot find job "+toString(id) | 
|---|
|  | 250 | +" in backup."); | 
|---|
|  | 251 | backup.erase(backupiter); | 
|---|
|  | 252 | /// and overwrite NoResult in found entry | 
|---|
|  | 253 | iter->second = _result; | 
|---|
|  | 254 | } else { | 
|---|
|  | 255 | LOG(1, "Job " << id << " failed, resubmitting."); | 
|---|
|  | 256 | // increase attempts | 
|---|
|  | 257 | if (attemptiter != attempts.end()) | 
|---|
|  | 258 | ++(attemptiter->second); | 
|---|
|  | 259 | else | 
|---|
|  | 260 | attempts.insert( std::make_pair(id, (size_t)1) ); | 
|---|
|  | 261 | // resubmit job | 
|---|
|  | 262 | resubmitJob(id); | 
|---|
|  | 263 | } | 
|---|
|  | 264 | } | 
|---|
|  | 265 |  | 
|---|
|  | 266 | /** Resubmit a job which a worker failed to calculate. | 
|---|
|  | 267 | * | 
|---|
|  | 268 | * @param jobid id of the failed job | 
|---|
|  | 269 | */ | 
|---|
|  | 270 | void FragmentQueue::resubmitJob(const JobId_t jobid) | 
|---|
|  | 271 | { | 
|---|
|  | 272 | BackupMap::iterator iter = backup.find(jobid); | 
|---|
|  | 273 | ASSERT( iter != backup.end(), | 
|---|
|  | 274 | "FragmentQueue::resubmitJob() - job id "+toString(jobid) | 
|---|
|  | 275 | +" not stored in backup."); | 
|---|
|  | 276 | if (iter != backup.end()) { | 
|---|
| [9a3f84] | 277 | // remove present result | 
|---|
| [fe95b7] | 278 | ResultMap::iterator resiter = results.find(jobid); | 
|---|
|  | 279 | ASSERT( resiter != results.end(), | 
|---|
| [9a3f84] | 280 | "FragmentQueue::resubmitJob() - job "+toString(jobid) | 
|---|
|  | 281 | +" to resubmit has no result present."); | 
|---|
| [fe95b7] | 282 | results.erase(resiter); | 
|---|
|  | 283 | pushJob(iter->second); | 
|---|
|  | 284 | backup.erase(iter); | 
|---|
|  | 285 | } | 
|---|
| [b5ebb5] | 286 | } | 
|---|
| [b9c486] | 287 |  | 
|---|