/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2010 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * Observable.cpp * * Created on: Dec 1, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif #include "CodePatterns/MemDebug.hpp" #include "CodePatterns/Observer/Observable.hpp" #include "CodePatterns/Assert.hpp" #include "CodePatterns/Observer/Channels.hpp" #include "CodePatterns/Observer/defs.hpp" #include "CodePatterns/Observer/Notification.hpp" #include #include #include //!> This function does nothing with the given Observable void NoOp_informer(const Observable *) {} Observable::graveyard_informer_t Observable::noop_informer(&NoOp_informer); Observable::ChannelMap Observable::NotificationChannels; /** Attaching Sub-observables to Observables. * Increases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. * * The two functions \sa start_observer_internal() and \sa finish_observer_internal() * have to be used together at all time. Never use these functions directly * START_OBSERVER and FINISH_OBSERVER also construct a bogus while(0) loop * thus producing compiler-errors whenever only one is used. * \param *publisher reference of sub-observable */ void Observable::start_observer_internal(Observable *publisher) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); // increase the count for this observable by one // if no entry for this observable is found, an new one is created // by the STL and initialized to 0 (see STL documentation) #ifdef LOG_OBSERVER observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << ">> Locking " << observerLog().getName(publisher); #endif (GlobalObservableInfo::getInstance().getdepth())[publisher]++; } /** Detaching Sub-observables from Observables. * Decreases entry in Observable::(GlobalObservableInfo::getInstance().getdepth()) for this \a *publisher by one. If zero, we * start notifying all our Observers. * * The two functions start_observer_internal() and finish_observer_internal() * have to be used together at all time. Never use these functions directly * START_OBSERVER and FINISH_OBSERVER also construct a bogus while(0) loop * thus producing compiler-errors whenever only one is used. * \param *publisher reference of sub-observable */ void Observable::finish_observer_internal(Observable *publisher) { // decrease the count for this observable // if zero is reached all observed blocks are done and we can // start to notify our observers int depth_publisher = 0; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); --(GlobalObservableInfo::getInstance().getdepth())[publisher]; #ifdef LOG_OBSERVER observerLog().addMessage((GlobalObservableInfo::getInstance().getdepth())[publisher]) << "<< Unlocking " << observerLog().getName(publisher); #endif depth_publisher = (GlobalObservableInfo::getInstance().getdepth())[publisher]; } if(depth_publisher){} else{ publisher->notifyAll(); // this item is done, so we don't have to keep the count with us // save some memory by erasing it { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getdepth()).erase(publisher); } } } void Observable::enque_notification_internal(Observable *publisher, Notification_ptr notification) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getnotifications())[publisher].insert(notification); } /** Constructor for Observable Protector. * Basically, calls start_observer_internal(). Hence use this class instead of * calling the function directly. * * \param *protege Observable to be protected. */ Observable::_Observable_protector::_Observable_protector(Observable *_protege) : protege(_protege) { start_observer_internal(protege); } Observable::_Observable_protector::_Observable_protector(const _Observable_protector &dest) : protege(dest.protege) { start_observer_internal(protege); } /** Destructor for Observable Protector. * Basically, calls finish_observer_internal(). Hence use this class instead of * calling the function directly. * * \param *protege Observable to be protected. */ Observable::_Observable_protector::~_Observable_protector() { finish_observer_internal(protege); } /************* Notification mechanism for observables **************/ /** Notify all Observers of changes. * Puts \a *this into Observable::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t * and removes from busy list. */ void Observable::notifyAll() { #ifdef LOG_OBSERVER observerLog().addMessage() << "--> " << observerLog().getName(this) << " is about to inform all its Observers."; #endif // we are busy notifying others right now // add ourselves to the list of busy subjects to enable circle detection { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this); } // see if anyone has signed up for observation // and call all observers try { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable(); const bool callTable_contains = callTable.find(this) != callTable.end(); if (callTable_contains) { // elements are stored sorted by keys in the multimap // so iterating over it gives us a the callees sorted by // the priorities // copy such that signOff() within receiving update() does not affect iterating // this is because within the same thread and with the updateKilled() signOff() may be // called and when executed it modifies targets GlobalObservableInfo::callees_t callees = callTable[this]; GlobalObservableInfo::callees_t::iterator iter; for(iter=callees.begin();iter!=callees.end();++iter){ #ifdef LOG_OBSERVER observerLog().addMessage() << "-> Sending update from " << observerLog().getName(this) << " to " << observerLog().getName((*iter).second) << " (priority=" << (*iter).first << ")"; #endif (*iter).second->update(this); } } } ASSERT_NOCATCH("Exception thrown from Observer Update"); // send out all notifications that need to be done { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::notificationSet currentNotifications = (GlobalObservableInfo::getInstance().getnotifications())[this]; for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin(); it != currentNotifications.end();++it){ (*it)->notifyAll(this); } } { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getnotifications()).erase(this); // done with notification, we can leave the set of busy subjects (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this); } #ifdef LOG_OBSERVER observerLog().addMessage() << "--> " << observerLog().getName(this) << " is done informing all its Observers."; #endif } /** Handles passing on updates from sub-Observables. * Mimicks basically the Observer::update() function. * * \param *publisher The \a *this we observe. */ void Observable::update(Observable *publisher) { // circle detection bool presentCircle = false; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); presentCircle = (GlobalObservableInfo::getInstance().getbusyObservables()).find(this)!=(GlobalObservableInfo::getInstance().getbusyObservables()).end(); } if(presentCircle) { // somehow a circle was introduced... we were busy notifying our // observers, but still we are called by one of our sub-Observables // we cannot be sure observation will still work at this point ASSERT(0,"Circle detected in observation-graph.\n" "Observation-graph always needs to be a DAG to work correctly!\n" "Please check your observation code and fix this!\n"); return; } else { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); // see if we are in the process of changing ourselves // if we are changing ourselves at the same time our sub-observables change // we do not need to publish all the changes at each time we are called std::map& depth = GlobalObservableInfo::getInstance().getdepth(); const bool depth_contains = depth.find(this)==depth.end(); if(depth_contains) { #ifdef LOG_OBSERVER observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher) << " propagated by " << observerLog().getName(this); #endif notifyAll(); } else{ #ifdef LOG_OBSERVER observerLog().addMessage() << "-| Update from " << observerLog().getName(publisher) << " not propagated by " << observerLog().getName(this); #endif } } } /** Sign on an Observer to this Observable. * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. * \param *target Observer * \param priority number in [-20,20] */ void Observable::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const { #ifdef LOG_OBSERVER observerLog().addMessage() << "@@ Signing on " << observerLog().getName(target) << " to " << observerLog().getName(const_cast(this)); #endif bool res = false; boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast(this)]; GlobalObservableInfo::callees_t::iterator iter; for(iter=callees.begin();iter!=callees.end();++iter){ res |= ((*iter).second == target); } if(!res) callees.insert(std::pair(priority.level,target)); } /** Sign off an Observer from this Observable. * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. * \param *target Observer */ void Observable::signOff(Observer *target) const { { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); ASSERT(callTable.count(const_cast(this)), "SignOff called for an Observable without Observers."); #ifdef LOG_OBSERVER observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast(this)); #endif GlobalObservableInfo::callees_t &callees = callTable[const_cast(this)]; GlobalObservableInfo::callees_t::iterator iter; GlobalObservableInfo::callees_t::iterator deliter; for(iter=callees.begin();iter!=callees.end();) { if((*iter).second == target) { callees.erase(iter++); } else { ++iter; } } if(callees.empty()){ callTable.erase(const_cast(this)); } } (*graveyard_informer)(this); } void Observable::signOn( Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const { Notification_ptr notification = getChannel(channelno); #ifdef LOG_OBSERVER observerLog().addMessage() << "@@ Signing on " << observerLog().getName(target) << " to " << observerLog().getName(const_cast(this)) << "'s channel no." << channelno << "."; #endif notification->addObserver(target, priority.level); } void Observable::signOff(Observer *target, size_t channelno) const { Notification_ptr notification = getChannel(channelno); #ifdef LOG_OBSERVER observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast(this)) << "'s channel no." << channelno << "."; #endif notification->removeObserver(target); (*graveyard_informer)(this); } bool Observable::isBlocked() const { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); return (GlobalObservableInfo::getInstance().getdepth()).count(const_cast(this)) > 0; } Notification_ptr Observable::getChannel(size_t no) const { return getNotificationChannel(this, no); } size_t Observable::getNumberOfObservers() const { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); size_t ObserverCount = 0; { GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); GlobalObservableInfo::calltable_t::const_iterator callees_t_iter = callTable.find(const_cast(this)); // if not present, then we have zero observers if (callees_t_iter != callTable.end()) ObserverCount += callees_t_iter->second.size(); } { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); const Channels *OurChannels = getNotificationChannels(this); if (OurChannels != NULL) for (Channels::NotificationTypetoRefMap::const_iterator channeliter = OurChannels->ChannelMap.begin(); channeliter != OurChannels->ChannelMap.end(); ++channeliter) ObserverCount += (channeliter->second)->getNumberOfObservers(); } return ObserverCount; } /** Handles sub-observables that just got killed * when an sub-observerable dies we usually don't need to do anything * \param *publisher Sub-Observable. */ void Observable::subjectKilled(Observable *publisher) { } /** Constructor for class Observable. */ Observable::Observable( std::string name, const channels_t &_channels) : Observer(Observer::BaseConstructor()), graveyard_informer(&noop_informer) { #ifdef LOG_OBSERVER observerLog().addName(this,name); observerLog().addMessage() << "++ Creating Observable " << observerLog().getName(static_cast(this)); #endif if (!_channels.empty()) { Channels *OurChannel = new Channels; // add instance for each notification type for (channels_t::const_iterator iter = _channels.begin(); iter != _channels.end(); ++iter) OurChannel->addChannel(*iter); insertNotificationChannel( std::make_pair(static_cast(this), OurChannel) ); } } /** Destructor for class Observable. * When an observable is deleted, we let all our observers know. \sa Observable::subjectKilled(). */ Observable::~Observable() { #ifdef LOG_OBSERVER observerLog().addMessage() << "-- Destroying Observable " << observerLog().getName(static_cast(this)); #endif bool CallTable_contains = false; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); CallTable_contains = (GlobalObservableInfo::getInstance().getcallTable()).count(this); } if(CallTable_contains) { // copy the list from the map boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); // copy such that signOff() within receiving subjectKilled() does not affect iterating // this is because within the same thread and with the subjectKilled() signOff() may be // called and when executed it modifies targets GlobalObservableInfo::callees_t callees = (GlobalObservableInfo::getInstance().getcallTable())[this]; // delete all entries for this observable GlobalObservableInfo::callees_t::iterator iter; for(iter=callees.begin();iter!=callees.end();++iter) (*iter).second->subjectKilled(this); // erase the list in the map (GlobalObservableInfo::getInstance().getcallTable()).erase(this); } // also kill instance in static Channels map if present eraseNotificationChannel(this); } Observable::channels_t Observable::getChannelList(const size_t max) { channels_t channels(max); std::generate(channels.begin(), channels.end(), UniqueNumber()); return channels; } void Observable::insertNotificationChannel(std::pair _pair) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); NotificationChannels.insert(_pair); } void Observable::eraseNotificationChannel(Observable * const _target) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); ChannelMap::iterator iter = NotificationChannels.find(static_cast(_target)); if (iter != NotificationChannels.end()) { iter->second->subjectKilled(static_cast(_target)); delete iter->second; NotificationChannels.erase(iter); } } bool Observable::isNotificationChannelPresent(const Observable * const _target) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); ChannelMap::const_iterator iter = NotificationChannels.find(const_cast(_target)); return iter != NotificationChannels.end(); } const Channels* Observable::getNotificationChannels(const Observable * const _target) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); ChannelMap::const_iterator iter = NotificationChannels.find(const_cast(_target)); if (iter != NotificationChannels.end()) return iter->second; else return NULL; } Notification_ptr Observable::getNotificationChannel(const Observable * const _target, const size_t _no) { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); ChannelMap::const_iterator iter = NotificationChannels.find(const_cast(_target)); ASSERT(iter != NotificationChannels.end(), "Observable::getNotificationChannel() - could not find channel for target " +toString(_target)+"."); return iter->second->getChannel(_no); }