Changeset 7e6c0d for src/Actions/ActionQueue.cpp
- Timestamp:
- Jan 30, 2015, 10:34:16 AM (10 years ago)
- Children:
- 419fa2
- Parents:
- eea0bb (diff), fff8fc (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the(diff)
links above to see all the changes relative to each parent. - git-author:
- Frederik Heber <heber@…> (01/29/15 07:42:59)
- git-committer:
- Frederik Heber <heber@…> (01/30/15 10:34:16)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
src/Actions/ActionQueue.cpp
reea0bb r7e6c0d 65 65 lastActionOk(true) 66 66 #else 67 lastActionOk(true), 67 68 CurrentAction(0), 68 lastActionOk(true),69 run_thread (boost::bind(&ActionQueue::run, this)),70 run_thread _isIdle(true)69 run_thread_isIdle(true), 70 run_thread_running(false), 71 run_thread(boost::bind(&ActionQueue::run, this)) 71 72 #endif 72 73 { … … 104 105 newaction->prepare(state); 105 106 #ifdef HAVE_ACTION_THREAD 106 mtx_ queue.lock();107 mtx_actionqueue.lock(); 107 108 #endif 108 109 actionqueue.push_back( newaction ); … … 114 115 std::cerr << "Action " << *boost::get_error_info<ActionNameString>(e) << " has failed." << std::endl; 115 116 World::getInstance().setExitFlag(5); 116 clearQueue(actionqueue.size()-1);117 // clearQueue(actionqueue.size()-1); 117 118 lastActionOk = false; 118 std::cerr << "Remaining Actions cleared from queue." << std::endl;119 // std::cerr << "Remaining Actions cleared from queue." << std::endl; 119 120 } catch (std::exception &e) { 120 121 pushStatus("FAIL: General exception caught, aborting."); 121 122 World::getInstance().setExitFlag(134); 122 clearQueue(actionqueue.size()-1);123 // clearQueue(actionqueue.size()-1); 123 124 lastActionOk = false; 124 std::cerr << "Remaining Actions cleared from queue." << std::endl;125 // std::cerr << "Remaining Actions cleared from queue." << std::endl; 125 126 } 126 127 if (lastActionOk) { … … 130 131 } 131 132 #else 132 setRunThreadIdle(CurrentAction == actionqueue.size()); 133 mtx_queue.unlock(); 134 #endif 135 } 133 mtx_actionqueue.unlock(); 134 const bool new_run_thread_isIdle = isActionQueueDone(); 135 setrun_thread_isIdle(new_run_thread_isIdle); 136 #endif 137 } 138 139 #ifdef HAVE_ACTION_THREAD 140 bool ActionQueue::isActionQueueDone() const 141 { 142 boost::lock_guard<boost::mutex> lock(mtx_actionqueue); 143 return (CurrentAction == actionqueue.size()); 144 } 145 146 bool ActionQueue::isTempQueueDone() const 147 { 148 boost::lock_guard<boost::mutex> lock(mtx_tempqueue); 149 return tempqueue.empty(); 150 } 151 #endif 136 152 137 153 void ActionQueue::insertAction(Action *_action, enum Action::QueryOptions state) … … 142 158 Action *newaction = _action->clone(state); 143 159 newaction->prepare(state); 144 mtx_queue.lock(); 145 tempqueue.push_back( newaction ); 146 setRunThreadIdle( !((CurrentAction != actionqueue.size()) || !tempqueue.empty()) ); 147 mtx_queue.unlock(); 160 { 161 boost::lock_guard<boost::mutex> lock(mtx_tempqueue); 162 tempqueue.push_back( newaction ); 163 } 164 { 165 bool new_run_thread_isIdle = getrun_thread_isIdle(); 166 new_run_thread_isIdle &= isTempQueueDone(); 167 setrun_thread_isIdle(new_run_thread_isIdle); 168 } 148 169 #endif 149 170 } … … 152 173 void ActionQueue::run() 153 174 { 175 { 176 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle); 177 run_thread_running = true; 178 } 154 179 bool Interrupted = false; 155 180 do { … … 157 182 try { 158 183 #if BOOST_VERSION < 105000 159 run_thread.sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));184 boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100)); 160 185 #else 161 186 boost::this_thread::sleep_for(boost::chrono::milliseconds(100)); … … 167 192 // LOG(1, "DEBUG: Start of ActionQueue's run() loop."); 168 193 // call all currently present Actions 169 mtx_queue.lock();170 194 insertTempQueue(); 171 bool status = (CurrentAction != actionqueue.size()); 172 mtx_queue.unlock(); 173 while (status) { 195 while ((!Interrupted) && (!isActionQueueDone())) { 174 196 // boost::this_thread::disable_interruption di; 197 // access actionqueue, hence using mutex 198 mtx_actionqueue.lock(); 175 199 LOG(0, "Calling Action " << actionqueue[CurrentAction]->getName() << " ... "); 176 200 try { … … 192 216 std::cerr << "Remaining Actions cleared from queue." << std::endl; 193 217 } 218 // remember action we juse executed 219 const Action *lastaction = actionqueue[CurrentAction]; 220 // step on to next action and check for end 221 CurrentAction++; 222 // insert new actions (before [CurrentAction]) if they have been spawned 223 // we must have an extra vector for this, as we cannot change actionqueue 224 // while an action instance is "in-use" 225 mtx_actionqueue.unlock(); 226 227 insertTempQueue(); 228 229 // set last action 194 230 if (lastActionOk) { 195 231 OBSERVE; 196 232 NOTIFY(ActionQueued); 197 _lastchangedaction = actionqueue[CurrentAction]; 198 mtx_queue.lock(); 199 CurrentAction++; 200 mtx_queue.unlock(); 233 _lastchangedaction = lastaction; 201 234 } 202 // access actionqueue, hence using mutex 203 mtx_queue.lock(); 204 // insert new actions (before [CurrentAction]) if they have been spawned 205 // we must have an extra vector for this, as we cannot change actionqueue 206 // while an action instance is "in-use" 207 insertTempQueue(); 208 status = (CurrentAction != actionqueue.size()); 209 mtx_queue.unlock(); 210 } 211 setRunThreadIdle( !((CurrentAction != actionqueue.size()) || !tempqueue.empty()) ); 235 } 236 { 237 bool new_run_thread_isIdle = isActionQueueDone(); 238 new_run_thread_isIdle &= isTempQueueDone(); 239 setrun_thread_isIdle(new_run_thread_isIdle); 240 } 212 241 cond_idle.notify_one(); 213 242 // LOG(1, "DEBUG: End of ActionQueue's run() loop."); 214 243 } while (!Interrupted); 244 { 245 boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle); 246 run_thread_running = false; 247 } 215 248 } 216 249 217 250 void ActionQueue::insertTempQueue() 218 251 { 252 boost::lock_guard<boost::mutex> lock(mtx_tempqueue); 219 253 if (!tempqueue.empty()) { 254 boost::lock_guard<boost::mutex> lock(mtx_actionqueue); 220 255 ActionQueue_t::iterator InsertionIter = actionqueue.begin(); 221 256 std::advance(InsertionIter, CurrentAction); … … 225 260 } 226 261 262 void ActionQueue::setrun_thread_isIdle( 263 const bool _run_thread_isIdle) 264 { 265 boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle); 266 run_thread_isIdle = _run_thread_isIdle; 267 } 268 269 bool ActionQueue::getrun_thread_isIdle() const 270 { 271 boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle); 272 return run_thread_isIdle; 273 } 274 227 275 void ActionQueue::wait() 228 276 { 229 boost::unique_lock<boost::mutex> lock(mtx_idle);230 while(!run_thread_isIdle)231 {232 cond_idle.wait(lock);233 }234 }235 #endif 236 237 #ifdef HAVE_ACTION_THREAD 277 if (run_thread_running) { 278 boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle); 279 while(!run_thread_isIdle) 280 { 281 cond_idle.wait(lock); 282 } 283 } 284 } 285 238 286 void ActionQueue::stop() 239 287 { … … 311 359 void ActionQueue::clearQueue(const size_t _fromAction) 312 360 { 313 #ifdef HAVE_ACTION_THREAD 314 mtx_queue.lock(); 315 #endif 316 LOG(1, "Removing all Actions from position " << _fromAction << " onward."); 317 // free all actions still to be called contained in actionqueue 318 ActionQueue_t::iterator inititer = actionqueue.begin(); 319 std::advance(inititer, _fromAction); 320 for (ActionQueue_t::iterator iter = inititer; iter != actionqueue.end(); ++iter) 321 delete *iter; 322 actionqueue.erase(inititer, actionqueue.end()); 323 LOG(1, "There are " << actionqueue.size() << " remaining Actions."); 324 #ifdef HAVE_ACTION_THREAD 325 CurrentAction = actionqueue.size(); 326 mtx_queue.unlock(); 327 #endif 361 // free all actions contained in actionqueue 362 { 363 #ifdef HAVE_ACTION_THREAD 364 boost::lock_guard<boost::mutex> lock(mtx_actionqueue); 365 #endif 366 LOG(1, "Removing all Actions from position " << _fromAction << " onward."); 367 // free all actions still to be called contained in actionqueue 368 ActionQueue_t::iterator inititer = actionqueue.begin(); 369 std::advance(inititer, _fromAction); 370 for (ActionQueue_t::iterator iter = inititer; iter != actionqueue.end(); ++iter) 371 delete *iter; 372 actionqueue.erase(inititer, actionqueue.end()); 373 LOG(1, "There are " << actionqueue.size() << " remaining Actions."); 374 #ifdef HAVE_ACTION_THREAD 375 CurrentAction = actionqueue.size(); 376 #endif 377 } 328 378 } 329 379 … … 332 382 { 333 383 // free all actions contained in tempqueue 334 for (ActionQueue_t::iterator iter = tempqueue.begin(); 335 !tempqueue.empty(); iter = tempqueue.begin()) { 336 delete *iter; 337 tempqueue.erase(iter); 338 } 339 } 340 341 void ActionQueue::setRunThreadIdle(const bool _flag) 342 { 343 { 344 boost::unique_lock<boost::mutex> lock(mtx_idle); 345 run_thread_isIdle = _flag; 384 { 385 boost::lock_guard<boost::mutex> lock(mtx_tempqueue); 386 for (ActionQueue_t::iterator iter = tempqueue.begin(); 387 !tempqueue.empty(); iter = tempqueue.begin()) { 388 delete *iter; 389 tempqueue.erase(iter); 390 } 346 391 } 347 392 }
Note:
See TracChangeset
for help on using the changeset viewer.