Ignore:
Timestamp:
Jan 30, 2015, 10:34:16 AM (10 years ago)
Author:
Frederik Heber <heber@…>
Children:
419fa2
Parents:
eea0bb (diff), fff8fc (diff)
Note: this is a merge changeset, the changes displayed below correspond to the merge itself.
Use the (diff) links above to see all the changes relative to each parent.
git-author:
Frederik Heber <heber@…> (01/29/15 07:42:59)
git-committer:
Frederik Heber <heber@…> (01/30/15 10:34:16)
Message:

Merge branch 'ThreadFixes' into Candidate_v1.4.10

Conflicts:

src/Actions/ActionQueue.cpp
src/Actions/ActionQueue.hpp

  • many conflicts due to last commit in ThreadFixes. Combined stuff from ThreadFixes and preventing failing actions from clearing queue entirely.
File:
1 edited

Legend:

Unmodified
Added
Removed
  • src/Actions/ActionQueue.cpp

    reea0bb r7e6c0d  
    6565    lastActionOk(true)
    6666#else
     67    lastActionOk(true),
    6768    CurrentAction(0),
    68     lastActionOk(true),
    69     run_thread(boost::bind(&ActionQueue::run, this)),
    70     run_thread_isIdle(true)
     69    run_thread_isIdle(true),
     70    run_thread_running(false),
     71    run_thread(boost::bind(&ActionQueue::run, this))
    7172#endif
    7273{
     
    104105  newaction->prepare(state);
    105106#ifdef HAVE_ACTION_THREAD
    106   mtx_queue.lock();
     107  mtx_actionqueue.lock();
    107108#endif
    108109  actionqueue.push_back( newaction );
     
    114115    std::cerr << "Action " << *boost::get_error_info<ActionNameString>(e) << " has failed." << std::endl;
    115116    World::getInstance().setExitFlag(5);
    116     clearQueue(actionqueue.size()-1);
     117//    clearQueue(actionqueue.size()-1);
    117118    lastActionOk = false;
    118     std::cerr << "Remaining Actions cleared from queue." << std::endl;
     119//    std::cerr << "Remaining Actions cleared from queue." << std::endl;
    119120  } catch (std::exception &e) {
    120121    pushStatus("FAIL: General exception caught, aborting.");
    121122    World::getInstance().setExitFlag(134);
    122     clearQueue(actionqueue.size()-1);
     123//    clearQueue(actionqueue.size()-1);
    123124    lastActionOk = false;
    124     std::cerr << "Remaining Actions cleared from queue." << std::endl;
     125//    std::cerr << "Remaining Actions cleared from queue." << std::endl;
    125126  }
    126127  if (lastActionOk) {
     
    130131  }
    131132#else
    132   setRunThreadIdle(CurrentAction == actionqueue.size());
    133   mtx_queue.unlock();
    134 #endif
    135 }
     133  mtx_actionqueue.unlock();
     134  const bool new_run_thread_isIdle = isActionQueueDone();
     135  setrun_thread_isIdle(new_run_thread_isIdle);
     136#endif
     137}
     138
     139#ifdef HAVE_ACTION_THREAD
     140bool ActionQueue::isActionQueueDone() const
     141{
     142  boost::lock_guard<boost::mutex> lock(mtx_actionqueue);
     143  return (CurrentAction == actionqueue.size());
     144}
     145
     146bool ActionQueue::isTempQueueDone() const
     147{
     148  boost::lock_guard<boost::mutex> lock(mtx_tempqueue);
     149  return tempqueue.empty();
     150}
     151#endif
    136152
    137153void ActionQueue::insertAction(Action *_action, enum Action::QueryOptions state)
     
    142158  Action *newaction = _action->clone(state);
    143159  newaction->prepare(state);
    144   mtx_queue.lock();
    145   tempqueue.push_back( newaction );
    146   setRunThreadIdle( !((CurrentAction != actionqueue.size()) || !tempqueue.empty()) );
    147   mtx_queue.unlock();
     160  {
     161    boost::lock_guard<boost::mutex> lock(mtx_tempqueue);
     162    tempqueue.push_back( newaction );
     163  }
     164  {
     165    bool new_run_thread_isIdle = getrun_thread_isIdle();
     166    new_run_thread_isIdle &= isTempQueueDone();
     167    setrun_thread_isIdle(new_run_thread_isIdle);
     168  }
    148169#endif
    149170}
     
    152173void ActionQueue::run()
    153174{
     175  {
     176    boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
     177    run_thread_running = true;
     178  }
    154179  bool Interrupted = false;
    155180  do {
     
    157182    try {
    158183#if BOOST_VERSION < 105000
    159       run_thread.sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
     184      boost::this_thread::sleep(boost::get_system_time() + boost::posix_time::milliseconds(100));
    160185#else
    161186      boost::this_thread::sleep_for(boost::chrono::milliseconds(100));
     
    167192//    LOG(1, "DEBUG: Start of ActionQueue's run() loop.");
    168193    // call all currently present Actions
    169     mtx_queue.lock();
    170194    insertTempQueue();
    171     bool status = (CurrentAction != actionqueue.size());
    172     mtx_queue.unlock();
    173     while (status) {
     195    while ((!Interrupted) && (!isActionQueueDone())) {
    174196      //      boost::this_thread::disable_interruption di;
     197      // access actionqueue, hence using mutex
     198      mtx_actionqueue.lock();
    175199      LOG(0, "Calling Action " << actionqueue[CurrentAction]->getName() << " ... ");
    176200      try {
     
    192216        std::cerr << "Remaining Actions cleared from queue." << std::endl;
    193217      }
     218      // remember action we juse executed
     219      const Action *lastaction = actionqueue[CurrentAction];
     220      // step on to next action and check for end
     221      CurrentAction++;
     222      // insert new actions (before [CurrentAction]) if they have been spawned
     223      // we must have an extra vector for this, as we cannot change actionqueue
     224      // while an action instance is "in-use"
     225      mtx_actionqueue.unlock();
     226
     227      insertTempQueue();
     228
     229      // set last action
    194230      if (lastActionOk) {
    195231        OBSERVE;
    196232        NOTIFY(ActionQueued);
    197         _lastchangedaction = actionqueue[CurrentAction];
    198         mtx_queue.lock();
    199         CurrentAction++;
    200         mtx_queue.unlock();
     233        _lastchangedaction = lastaction;
    201234      }
    202       // access actionqueue, hence using mutex
    203       mtx_queue.lock();
    204       // insert new actions (before [CurrentAction]) if they have been spawned
    205       // we must have an extra vector for this, as we cannot change actionqueue
    206       // while an action instance is "in-use"
    207       insertTempQueue();
    208       status = (CurrentAction != actionqueue.size());
    209       mtx_queue.unlock();
    210     }
    211     setRunThreadIdle( !((CurrentAction != actionqueue.size()) || !tempqueue.empty()) );
     235    }
     236    {
     237      bool new_run_thread_isIdle = isActionQueueDone();
     238      new_run_thread_isIdle &= isTempQueueDone();
     239      setrun_thread_isIdle(new_run_thread_isIdle);
     240    }
    212241    cond_idle.notify_one();
    213242//    LOG(1, "DEBUG: End of ActionQueue's run() loop.");
    214243  } while (!Interrupted);
     244  {
     245    boost::lock_guard<boost::mutex> lock(mtx_run_thread_isIdle);
     246    run_thread_running = false;
     247  }
    215248}
    216249
    217250void ActionQueue::insertTempQueue()
    218251{
     252  boost::lock_guard<boost::mutex> lock(mtx_tempqueue);
    219253  if (!tempqueue.empty()) {
     254    boost::lock_guard<boost::mutex> lock(mtx_actionqueue);
    220255    ActionQueue_t::iterator InsertionIter = actionqueue.begin();
    221256    std::advance(InsertionIter, CurrentAction);
     
    225260}
    226261
     262void ActionQueue::setrun_thread_isIdle(
     263    const bool _run_thread_isIdle)
     264{
     265  boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle);
     266  run_thread_isIdle = _run_thread_isIdle;
     267}
     268
     269bool ActionQueue::getrun_thread_isIdle() const
     270{
     271  boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle);
     272  return run_thread_isIdle;
     273}
     274
    227275void ActionQueue::wait()
    228276{
    229   boost::unique_lock<boost::mutex> lock(mtx_idle);
    230   while(!run_thread_isIdle)
    231   {
    232       cond_idle.wait(lock);
    233   }
    234 }
    235 #endif
    236 
    237 #ifdef HAVE_ACTION_THREAD
     277  if (run_thread_running) {
     278    boost::unique_lock<boost::mutex> lock(mtx_run_thread_isIdle);
     279    while(!run_thread_isIdle)
     280    {
     281        cond_idle.wait(lock);
     282    }
     283  }
     284}
     285
    238286void ActionQueue::stop()
    239287{
     
    311359void ActionQueue::clearQueue(const size_t _fromAction)
    312360{
    313 #ifdef HAVE_ACTION_THREAD
    314   mtx_queue.lock();
    315 #endif
    316   LOG(1, "Removing all Actions from position " << _fromAction << " onward.");
    317   // free all actions still to be called contained in actionqueue
    318   ActionQueue_t::iterator inititer = actionqueue.begin();
    319   std::advance(inititer, _fromAction);
    320   for (ActionQueue_t::iterator iter = inititer; iter != actionqueue.end(); ++iter)
    321     delete *iter;
    322   actionqueue.erase(inititer, actionqueue.end());
    323   LOG(1, "There are " << actionqueue.size() << " remaining Actions.");
    324 #ifdef HAVE_ACTION_THREAD
    325   CurrentAction = actionqueue.size();
    326   mtx_queue.unlock();
    327 #endif
     361  // free all actions contained in actionqueue
     362  {
     363#ifdef HAVE_ACTION_THREAD
     364    boost::lock_guard<boost::mutex> lock(mtx_actionqueue);
     365#endif
     366    LOG(1, "Removing all Actions from position " << _fromAction << " onward.");
     367    // free all actions still to be called contained in actionqueue
     368    ActionQueue_t::iterator inititer = actionqueue.begin();
     369    std::advance(inititer, _fromAction);
     370    for (ActionQueue_t::iterator iter = inititer; iter != actionqueue.end(); ++iter)
     371      delete *iter;
     372    actionqueue.erase(inititer, actionqueue.end());
     373    LOG(1, "There are " << actionqueue.size() << " remaining Actions.");
     374#ifdef HAVE_ACTION_THREAD
     375    CurrentAction = actionqueue.size();
     376#endif
     377  }
    328378}
    329379
     
    332382{
    333383  // free all actions contained in tempqueue
    334   for (ActionQueue_t::iterator iter = tempqueue.begin();
    335       !tempqueue.empty(); iter = tempqueue.begin()) {
    336     delete *iter;
    337     tempqueue.erase(iter);
    338   }
    339 }
    340 
    341 void ActionQueue::setRunThreadIdle(const bool _flag)
    342 {
    343   {
    344     boost::unique_lock<boost::mutex> lock(mtx_idle);
    345     run_thread_isIdle = _flag;
     384  {
     385    boost::lock_guard<boost::mutex> lock(mtx_tempqueue);
     386    for (ActionQueue_t::iterator iter = tempqueue.begin();
     387        !tempqueue.empty(); iter = tempqueue.begin()) {
     388      delete *iter;
     389      tempqueue.erase(iter);
     390    }
    346391  }
    347392}
Note: See TracChangeset for help on using the changeset viewer.