/* * Project: MoleCuilder * Description: creates and alters molecular systems * Copyright (C) 2010 University of Bonn. All rights reserved. * Please see the LICENSE file or "Copyright notice" in builder.cpp for details. */ /* * Relay.cpp * * Created on: Dec 1, 2011 * Author: heber */ // include config.h #ifdef HAVE_CONFIG_H #include #endif //#include "CodePatterns/MemDebug.hpp" #include "CodePatterns/Observer/Relay.hpp" #include "CodePatterns/Assert.hpp" #include "CodePatterns/Observer/Channels.hpp" #include "CodePatterns/Observer/Notification.hpp" #include #include /** Constructor for class Relay. */ Relay::Relay(std::string name) : Observable(name), Updater(NULL) { #ifdef LOG_OBSERVER observerLog().addName(this,name); observerLog().addMessage() << "++ Creating Relay " << observerLog().getName(this); #endif } /** Destructor for class Relay. * When an observable is deleted, we let all our observers know. \sa Relay::subjectKilled(). */ Relay::~Relay() { #ifdef LOG_OBSERVER observerLog().addMessage() << "-- Destroying Relay " << observerLog().getName(this); #endif // killing subjects is done by Observables' dstor } /** Sign on an Observer to this Observable. * Puts \a *target into Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. * \param *target Observer * \param priority number in [-20,20] */ void Relay::signOn(Observer *target, GlobalObservableInfo::PriorityLevel priority) const { #ifdef LOG_OBSERVER observerLog().addMessage() << "@@ Signing on " << observerLog().getName(target) << " to " << observerLog().getName(const_cast(static_cast(this))); #endif bool res = false; boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::callees_t &callees = (GlobalObservableInfo::getInstance().getcallTable())[const_cast(static_cast(this))]; GlobalObservableInfo::callees_t::iterator iter; for(iter=callees.begin();iter!=callees.end();++iter){ res |= ((*iter).second == target); } if(!res) callees.insert(std::pair(priority.level,target)); } /** Sign off an Observer from this Observable. * Removes \a *target from Observable::(GlobalObservableInfo::getInstance().getcallTable()) list. * \param *target Observer */ void Relay::signOff(Observer *target) const { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::calltable_t &callTable = GlobalObservableInfo::getInstance().getcallTable(); ASSERT(callTable.count(const_cast(static_cast(this))), "Relay::signOff() - called for an Observable without Observers."); #ifdef LOG_OBSERVER observerLog().addMessage() << "** Signing off " << observerLog().getName(target) << " from " << observerLog().getName(const_cast(static_cast(this))); #endif GlobalObservableInfo::callees_t &callees = callTable[const_cast(static_cast(this))]; GlobalObservableInfo::callees_t::iterator iter; GlobalObservableInfo::callees_t::iterator deliter; for(iter=callees.begin();iter!=callees.end();) { if((*iter).second == target) { callees.erase(iter++); } else { ++iter; } } if(callees.empty()){ callTable.erase(const_cast(static_cast(this))); } } void Relay::signOn(Observer *target, size_t channelno, GlobalObservableInfo::PriorityLevel priority) const { Notification_ptr notification = getChannel(channelno); notification->addObserver(target, priority.level); } void Relay::signOff(Observer *target, size_t channelno) const { Notification_ptr notification = getChannel(channelno); notification->removeObserver(target); } /** Notify all Observers of changes. * Puts \a *this into Relay::(GlobalObservableInfo::getInstance().getbusyObservables()), calls Observer::update() for all in callee_t * and removes from busy list. */ void Relay::notifyAll() { ASSERT(Updater != NULL, "Relay::notifyAll() called while Updater is NULL."); // we are busy notifying others right now // add ourselves to the list of busy subjects to enable circle detection { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getbusyObservables()).insert(this); } // see if anyone has signed up for observation // and call all observers try { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::calltable_t& callTable = GlobalObservableInfo::getInstance().getcallTable(); const bool callTable_contains = callTable.count(this); if(callTable_contains) { // elements are stored sorted by keys in the multimap // so iterating over it gives us a the callees sorted by // the priorities // copy such that signOff() within receiving update() does not affect iterating // this is because within the same thread and with the updateKilled() signOff() may be // called and when executed it modifies targets boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::callees_t callees = callTable[this]; GlobalObservableInfo::callees_t::iterator iter; for(iter=callees.begin();iter!=callees.end();++iter){ #ifdef LOG_OBSERVER observerLog().addMessage() << "-> " << observerLog().getName(this) << " is relaying update from " << observerLog().getName(Updater) << " to " << observerLog().getName((*iter).second) << " (priority=" << (*iter).first << ")"; #endif (*iter).second->update(Updater); } } } ASSERT_NOCATCH("Exception thrown from Observer Update"); // send out all (GlobalObservableInfo::getInstance().getnotifications()) that need to be done { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); GlobalObservableInfo::notificationSet currentNotifications = (GlobalObservableInfo::getInstance().getnotifications())[Updater]; for(GlobalObservableInfo::notificationSet::iterator it = currentNotifications.begin(); it != currentNotifications.end();++it){ (*it)->notifyAll(Updater); } } { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); (GlobalObservableInfo::getInstance().getnotifications()).erase(Updater); // done with notification, we can leave the set of busy subjects (GlobalObservableInfo::getInstance().getbusyObservables()).erase(this); } } /** Handles passing on updates from sub-Relays. * Mimicks basically the Observer::update() function. * * \param *publisher The \a *this we observe. */ void Relay::update(Observable *publisher) { // circle detection bool circle_present = false; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); std::set& busyObservables = GlobalObservableInfo::getInstance().getbusyObservables(); circle_present = busyObservables.find(this)!=busyObservables.end(); } if(circle_present) { // somehow a circle was introduced... we were busy notifying our // observers, but still we are called by one of our sub-Relays // we cannot be sure observation will still work at this point ASSERT(0,"Circle detected in observation-graph.\n" "Observation-graph always needs to be a DAG to work correctly!\n" "Please check your observation code and fix this!\n"); return; } else { // see if we are in the process of changing ourselves // if we are changing ourselves at the same time our sub-observables change // we do not need to publish all the changes at each time we are called boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); std::map& depth = GlobalObservableInfo::getInstance().getdepth(); const bool depth_contains = depth.find(this)==depth.end(); if(depth_contains) { #ifdef LOG_OBSERVER observerLog().addMessage() << "-* Update from " << observerLog().getName(publisher) << " relayed by " << observerLog().getName(this); #endif Updater = publisher; notifyAll(); Updater = NULL; } else{ #ifdef LOG_OBSERVER observerLog().addMessage() << "-| Update from " << observerLog().getName(publisher) << " not relayed by " << observerLog().getName(this); #endif } } } /** Method for receiving specialized (GlobalObservableInfo::getInstance().getnotifications()). * * \param *publisher The \a *this we observe. * \param notification type of notification */ void Relay::recieveNotification(Observable *publisher, Notification_ptr notification) { Updater = publisher; bool contains_channels = false; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); contains_channels = NotificationChannels.find(this) != NotificationChannels.end(); } if (contains_channels) { const size_t channelno = notification->getChannelNo(); Notification *mynotification = NULL; { boost::recursive_mutex::scoped_lock lock(GlobalObservableInfo::getInstance().getObservablesMapMutex()); ChannelMap::const_iterator iter = NotificationChannels.find(this); const Channels *myChannels = iter->second; mynotification = myChannels->getChannel(channelno); } ASSERT(mynotification != NULL, "Relay::recieveNotification() - this relay does not have a notification no "+toString(channelno)+"."); mynotification->notifyAll(Updater); Updater = NULL; } else { // note that this relay does not seem to have any channels } } /** Handles sub-observables that just got killed * when an sub-observerable dies we usually don't need to do anything * \param *publisher Sub-Relay. */ void Relay::subjectKilled(Observable *publisher) { }