/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2012 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * OperationQueue.cpp * * Created on: Apr 24, 2012 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif // boost asio needs specific operator new #include #include "CodePatterns/MemDebug.hpp" #include #include #include #include "CodePatterns/Log.hpp" #include "CodePatterns/Observer/Observer.hpp" #include "Operations/AsyncOperation.hpp" #include "Operations/OperationQueue.hpp" #include "WorkerAddress.hpp" size_t OperationQueue::max_connections = 1; OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op) { OperationQueue_t::iterator iter = std::find_if(queue.begin(), queue.end(), boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op); return iter; } void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address) { if (op != NULL) { if (!IsBlockedFlag) { AsyncOp_ptr ptr(op); // this always prevents memory loss ptr->signOn(this); OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr ); op = NULL; AddressMap.insert( make_pair(*iter, address) ); LaunchNextOp(); } else { ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+"."); delete op; op = NULL; } } else { ELOG(1, "Given operation pointer is NULL."); } } void OperationQueue::LaunchNextOp() { // connection available? if (getNumberOfRunningOps() < max_connections) { // only start operation when address is valid OperationQueue_t::iterator queueiter = std::find_if(queue.begin(), queue.end(), boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) ); if (queueiter != queue.end()) { AddressMap_t::iterator mapiter = AddressMap.find(*queueiter); ASSERT( mapiter != AddressMap.end(), "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap."); const WorkerAddress address = mapiter->second; AsyncOp_ptr ptr = mapiter->first; // always erase the op from the list of ones pending for launch AddressMap.erase(mapiter); // only launch when not a debug op if ((!address.host.empty()) && (!address.service.empty())) { LOG(2, "DEBUG: Launching next operation " << ptr->getName() << "."); (*ptr)(address.host, address.service); } else { LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address."); } } else { LOG(2, "DEBUG: All remaining operations are already running."); } } else { LOG(2, "DEBUG: Currently there are no free connections."); } } void OperationQueue::remove(AsyncOperation *op, Observer *observer) { if (op != NULL) { OperationQueue_t::iterator iter = findOperation(op); if (iter != queue.end()) { // sign off and remove op if (observer != NULL) op->signOff(observer); queue.erase(iter); } else { ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue."); } } else { ELOG(1, "Given operation pointer is NULL."); } } void OperationQueue::update(Observable *publisher) { AsyncOperation *op = static_cast(publisher); if (op != NULL) { LOG(1, "INFO: We are note notified that " << op->getName() << " is done, removing ..."); // remove from queue remove(op, this); LaunchNextOp(); } } void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification) {} void OperationQueue::subjectKilled(Observable *publisher) { AsyncOperation *op = static_cast(publisher); if (op != NULL) { ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?"); // remove from queue remove(op, this); } }