source: src/Actions/ActionQueue.cpp@ 9a4949

Last change on this file since 9a4949 was 9a4949, checked in by Frederik Heber <heber@…>, 10 years ago

Added extra flag run_thread_running to ActionQueue.

  • helgrind admonished race conditions when thread is joined. This did not fix them but should be safer as we cannot know whether the other thread has already been started when suddenly shutting down molecuilder due to an exception.
  • Property mode set to 100644
File size: 10.8 KB
Line 
1/*
2 * Project: MoleCuilder
3 * Description: creates and alters molecular systems
4 * Copyright (C) 2013 Frederik Heber. All rights reserved.
5 *
6 *
7 * This file is part of MoleCuilder.
8 *
9 * MoleCuilder is free software: you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation, either version 2 of the License, or
12 * (at your option) any later version.
13 *
14 * MoleCuilder is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
18 *
19 * You should have received a copy of the GNU General Public License
20 * along with MoleCuilder. If not, see <http://www.gnu.org/licenses/>.
21 */
22
23/*
24 * ActionQueue.cpp
25 *
26 * Created on: Aug 16, 2013
27 * Author: heber
28 */
29
30// include config.h
31#ifdef HAVE_CONFIG_H
32#include <config.h>
33#endif
34
35#include "CodePatterns/MemDebug.hpp"
36
37#include "Actions/ActionQueue.hpp"
38
39#include "CodePatterns/Assert.hpp"
40#include "CodePatterns/IteratorAdaptors.hpp"
41#include "CodePatterns/Log.hpp"
42#include "CodePatterns/Singleton_impl.hpp"
43
44#include <boost/date_time/posix_time/posix_time.hpp>
45#include <boost/version.hpp>
46#include <string>
47#include <sstream>
48#include <vector>
49
50#include "Actions/ActionExceptions.hpp"
51#include "Actions/ActionHistory.hpp"
52#include "Actions/ActionRegistry.hpp"
53#include "World.hpp"
54
55using namespace MoleCuilder;
56
57const Action* ActionQueue::_lastchangedaction = NULL;
58
59ActionQueue::ActionQueue() :
60 Observable("ActionQueue"),
61 AR(new ActionRegistry()),
62 history(new ActionHistory),
63 CurrentAction(0),
64#ifndef HAVE_ACTION_THREAD
65 lastActionOk(true)
66#else
67 lastActionOk(true),
68 run_thread_isIdle(true),
69 run_thread_running(false),
70 run_thread(boost::bind(&ActionQueue::run, this))
71#endif
72{
73 // channels of observable
74 Channels *OurChannel = new Channels;
75 NotificationChannels.insert( std::make_pair(static_cast<Observable *>(this), OurChannel) );
76 // add instance for each notification type
77 for (size_t type = 0; type < NotificationType_MAX; ++type)
78 OurChannel->addChannel(type);
79}
80
81ActionQueue::~ActionQueue()
82{
83#ifdef HAVE_ACTION_THREAD
84 stop();
85#endif
86
87 clearQueue();
88
89 delete history;
90 delete AR;
91}
92
93void ActionQueue::queueAction(const std::string &name, enum Action::QueryOptions state)
94{
95 const Action * const registryaction = AR->getActionByName(name);
96 queueAction(registryaction, state);
97}
98
99void ActionQueue::queueAction(const Action * const _action, enum Action::QueryOptions state)
100{
101 Action *newaction = _action->clone(state);
102 newaction->prepare(state);
103#ifdef HAVE_ACTION_THREAD
104 mtx_queue.lock();
105#endif
106 actionqueue.push_back( newaction );
107#ifndef HAVE_ACTION_THREAD
108 try {
109 newaction->call();
110 lastActionOk = true;
111 } catch(ActionFailureException &e) {
112 std::cerr << "Action " << *boost::get_error_info<ActionNameString>(e) << " has failed." << std::endl;
113 World::getInstance().setExitFlag(5);
114 clearQueue();
115 lastActionOk = false;
116 std::cerr << "ActionQueue cleared." << std::endl;
117 } catch (std::exception &e) {
118 pushStatus("FAIL: General exception caught, aborting.");
119 World::getInstance().setExitFlag(134);
120 clearQueue();
121 lastActionOk = false;
122 std::cerr << "ActionQueue cleared." << std::endl;
123 }
124 if (lastActionOk) {
125 OBSERVE;
126 NOTIFY(ActionQueued);
127 _lastchangedaction = newaction;
128 }
129#else
130 const bool new_run_thread_isIdle = (CurrentAction == actionqueue.size());
131 mtx_queue.unlock();
132 {
133 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
134 run_thread_isIdle = new_run_thread_isIdle;
135 }
136#endif
137}
138
139void ActionQueue::insertAction(Action *_action, enum Action::QueryOptions state)
140{
141#ifndef HAVE_ACTION_THREAD
142 queueAction(_action, state);
143#else
144 Action *newaction = _action->clone(state);
145 newaction->prepare(state);
146 mtx_queue.lock();
147 tempqueue.push_back( newaction );
148 {
149 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
150 run_thread_isIdle = !((CurrentAction != actionqueue.size()) || !tempqueue.empty());
151 }
152 mtx_queue.unlock();
153#endif
154}
155
156#ifdef HAVE_ACTION_THREAD
157void ActionQueue::run()
158{
159 {
160 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
161 run_thread_running = true;
162 }
163 bool Interrupted = false;
164 do {
165 // sleep for some time and wait for queue to fill up again
166 try {
167#if BOOST_VERSION < 105000
168 boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
169#else
170 boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
171#endif
172 } catch(boost::thread_interrupted &e) {
173 LOG(2, "INFO: ActionQueue has received stop signal.");
174 Interrupted = true;
175 }
176// LOG(1, "DEBUG: Start of ActionQueue's run() loop.");
177 // call all currently present Actions
178 mtx_queue.lock();
179 insertTempQueue();
180 bool status = (CurrentAction != actionqueue.size());
181 mtx_queue.unlock();
182 while ((status) && (!Interrupted)) {
183 // boost::this_thread::disable_interruption di;
184 // access actionqueue, hence using mutex
185 mtx_queue.lock();
186 LOG(0, "Calling Action " << actionqueue[CurrentAction]->getName() << " ... ");
187 try {
188 actionqueue[CurrentAction]->call();
189 pushStatus("SUCCESS: Action "+actionqueue[CurrentAction]->getName()+" successful.");
190 lastActionOk = true;
191 } catch(ActionFailureException &e) {
192 pushStatus("FAIL: Action "+*boost::get_error_info<ActionNameString>(e)+" has failed.");
193 World::getInstance().setExitFlag(5);
194 clearQueue();
195 lastActionOk = false;
196 std::cerr << "ActionQueue cleared." << std::endl;
197 CurrentAction = (size_t)-1;
198 } catch (std::exception &e) {
199 pushStatus("FAIL: General exception caught, aborting.");
200 World::getInstance().setExitFlag(134);
201 clearQueue();
202 std::cerr << "ActionQueue cleared." << std::endl;
203 CurrentAction = (size_t)-1;
204 }
205 // remember action we juse executed
206 const Action *lastaction = actionqueue[CurrentAction];
207 // step on to next action and check for end
208 CurrentAction++;
209 // insert new actions (before [CurrentAction]) if they have been spawned
210 // we must have an extra vector for this, as we cannot change actionqueue
211 // while an action instance is "in-use"
212 insertTempQueue();
213 status = (CurrentAction != actionqueue.size());
214 // set last action
215 if (lastActionOk) {
216 OBSERVE;
217 NOTIFY(ActionQueued);
218 _lastchangedaction = lastaction;
219 // unlock before we delve into Observer functions ...
220 mtx_queue.unlock();
221 } else {
222 mtx_queue.unlock();
223 }
224 }
225 mtx_queue.lock();
226 const bool new_run_thread_isIdle = !((CurrentAction != actionqueue.size()) || !tempqueue.empty());
227 mtx_queue.unlock();
228 {
229 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
230 run_thread_isIdle = new_run_thread_isIdle;
231 }
232 cond_idle.notify_one();
233// LOG(1, "DEBUG: End of ActionQueue's run() loop.");
234 } while (!Interrupted);
235 {
236 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
237 run_thread_running = false;
238 }
239}
240#endif
241
242void ActionQueue::insertTempQueue()
243{
244 if (!tempqueue.empty()) {
245 ActionQueue_t::iterator InsertionIter = actionqueue.begin();
246 std::advance(InsertionIter, CurrentAction);
247 actionqueue.insert( InsertionIter, tempqueue.begin(), tempqueue.end() );
248 tempqueue.clear();
249 }
250}
251
252#ifdef HAVE_ACTION_THREAD
253void ActionQueue::wait()
254{
255 boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle);
256 if (run_thread_running)
257 while(!run_thread_isIdle)
258 {
259 cond_idle.wait(lock);
260 }
261}
262#endif
263
264#ifdef HAVE_ACTION_THREAD
265void ActionQueue::stop()
266{
267 // notify actionqueue thread that we wish to terminate
268 run_thread.interrupt();
269 // wait till it ends
270 run_thread.join();
271}
272#endif
273
274Action* ActionQueue::getActionByName(const std::string &name)
275{
276 return AR->getActionByName(name);
277}
278
279bool ActionQueue::isActionKnownByName(const std::string &name) const
280{
281 return AR->isActionPresentByName(name);
282}
283
284void ActionQueue::registerAction(Action *_action)
285{
286 AR->registerInstance(_action);
287}
288
289void ActionQueue::outputAsCLI(std::ostream &output) const
290{
291 for (ActionQueue_t::const_iterator iter = actionqueue.begin();
292 iter != actionqueue.end();
293 ++iter) {
294 // skip store-session in printed list
295 if ( ((*iter)->getName() != std::string("store-session"))
296 && ((*iter)->getName() != std::string("load-session"))) {
297 if (iter != actionqueue.begin())
298 output << " ";
299 (*iter)->outputAsCLI(output);
300 }
301 }
302 output << std::endl;
303}
304
305void ActionQueue::outputAsPython(std::ostream &output) const
306{
307 const std::string prefix("pyMoleCuilder");
308 output << "import " << prefix << std::endl;
309 output << "# ========================== Stored Session BEGIN ==========================" << std::endl;
310 for (ActionQueue_t::const_iterator iter = actionqueue.begin();
311 iter != actionqueue.end();
312 ++iter) {
313 // skip store-session in printed list
314 if ( ((*iter)->getName() != std::string("store-session"))
315 && ((*iter)->getName() != std::string("load-session")))
316 (*iter)->outputAsPython(output, prefix);
317 }
318 output << "# =========================== Stored Session END ===========================" << std::endl;
319}
320
321const ActionTrait& ActionQueue::getActionsTrait(const std::string &name) const
322{
323 // this const_cast is just required as long as we have a non-const getActionByName
324 const Action * const action = const_cast<ActionQueue *>(this)->getActionByName(name);
325 return action->Traits;
326}
327
328void ActionQueue::addElement(Action* _Action,ActionState::ptr _state)
329{
330 history->addElement(_Action, _state);
331}
332
333void ActionQueue::clear()
334{
335 history->clear();
336}
337
338void ActionQueue::clearQueue()
339{
340 // free all actions contained in actionqueue
341 for (ActionQueue_t::iterator iter = actionqueue.begin();
342 !actionqueue.empty(); iter = actionqueue.begin()) {
343 delete *iter;
344 actionqueue.erase(iter);
345 }
346 // free all actions contained in tempqueue
347 for (ActionQueue_t::iterator iter = tempqueue.begin();
348 !tempqueue.empty(); iter = tempqueue.begin()) {
349 delete *iter;
350 tempqueue.erase(iter);
351 }
352#ifdef HAVE_ACTION_THREAD
353 {
354 boost::unique_lock<boost::mutex> lock(mtx_idle);
355 run_thread_isIdle = true;
356 }
357#endif
358}
359
360const ActionQueue::ActionTokens_t ActionQueue::getListOfActions() const
361{
362 ActionTokens_t returnlist;
363
364 returnlist.insert(
365 returnlist.end(),
366 MapKeyConstIterator<ActionRegistry::const_iterator>(AR->getBeginIter()),
367 MapKeyConstIterator<ActionRegistry::const_iterator>(AR->getEndIter()));
368
369 return returnlist;
370}
371
372void ActionQueue::undoLast()
373{
374 history->undoLast();
375}
376
377void ActionQueue::redoLast()
378{
379 history->redoLast();
380}
381
382
383CONSTRUCT_SINGLETON(ActionQueue)
Note: See TracBrowser for help on using the repository browser.