Changeset 41c1b7 for src


Ignore:
Timestamp:
Jun 27, 2012, 4:07:21 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
38032a
Parents:
5d8c0f
git-author:
Frederik Heber <heber@…> (02/29/12 08:32:10)
git-committer:
Frederik Heber <heber@…> (06/27/12 16:07:21)
Message:

HUGE: Added PoolWorker, removed (Fragment)Worker, and rewrote parts of FragmentScheduler.

ToDo:

  • FragmentQueue needs a callback function when new jobs have been added.
  • WorkerPool needs a callback function when a worker is idle.
  • WorkerListener_t then does not need a callback anymore, just access to the pool who has then the callback.
  • so far PoolWorker quits after first job (and is also removed by Server).
Location:
src/Fragmentation/Automation
Files:
11 added
4 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/Controller/Commands/ShutdownOperation.cpp

    r5d8c0f r41c1b7  
    22 * Project: MoleCuilder
    33 * Description: creates and alters molecular systems
    4  * Copyright (C)  2011-2012 University of Bonn. All rights reserved.
     4 * Copyright (C)  2012 University of Bonn. All rights reserved.
    55 * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
    66 */
     
    99 * ShutdownOperation.cpp
    1010 *
    11  *  Created on: Dec 11, 2011
     11 *  Created on: Feb 28, 2012
    1212 *      Author: heber
    1313 */
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    r5d8c0f r41c1b7  
    3434#include "CodePatterns/Info.hpp"
    3535#include "CodePatterns/Log.hpp"
     36#include "Controller/Commands/EnrollInPoolOperation.hpp"
    3637#include "Jobs/MPQCCommandJob.hpp"
    3738#include "Jobs/SystemCommandJob.hpp"
     
    5960 */
    6061FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
    61     WorkerListener(io_service, workerport, JobsQueue),
     62    WorkerListener(io_service, workerport, JobsQueue, pool,
     63        boost::bind(&FragmentScheduler::sendJobToWorker, boost::ref(*this), _1, _2)),
    6264    ControllerListener(io_service, controllerport, JobsQueue,
    63         boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener)))
    64 {
    65   Info info(__FUNCTION__);
     65        boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener))),
     66    connection(io_service),
     67    sendJobOp(connection)
     68{
     69  Info info(__FUNCTION__);
     70
     71  // listen for controller
     72  ControllerListener.initiateSocket();
    6673
    6774  // only initiate socket if jobs are already present
     
    6976    WorkerListener.initiateSocket();
    7077  }
    71 
    72   ControllerListener.initiateSocket();
    7378}
    7479
    7580/** Handle a new worker connection.
    7681 *
    77  * We check whether jobs are in the JobsQueue. If present, job is sent.
    78  *
    79  * \sa handle_SendJobtoWorker()
     82 * We store the given address in the pool.
    8083 *
    8184 * \param e error code if something went wrong
     
    8891  {
    8992    // Successfully accepted a new connection.
    90     // Check whether there are jobs in the queue
    91     if (JobsQueue.isJobPresent()) {
    92       // pop a job and send it to the client.
    93       FragmentJob::ptr job(JobsQueue.popJob());
    94       // The connection::async_write() function will automatically
    95       // serialize the data structure for us.
    96       LOG(1, "INFO: Sending job #" << job->getId() << ".");
    97       conn->async_write(job,
    98         boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
    99         boost::asio::placeholders::error, conn));
    100 
    101     } else {
    102       // send the static NoJob
    103       conn->async_write(NoJob,
    104         boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
    105         boost::asio::placeholders::error, conn));
    106 
    107       // then there must be no read necesary
    108 
    109       ELOG(2, "There is currently no job present in the queue.");
    110     }
    111   }
    112   else
    113   {
     93    // read address
     94    conn->async_read(address,
     95      boost::bind(&FragmentScheduler::WorkerListener_t::handle_checkAddress, this,
     96      boost::asio::placeholders::error, conn));
     97  }  else {
    11498    // An error occurred. Log it and return. Since we are not starting a new
    11599    // accept operation the io_service will run out of work to do and the
     
    118102    ELOG(0, e.message());
    119103  }
     104}
     105
     106/** Callback function when worker address has been received.
     107 *
     108 * \param e error code if something went wrong
     109 * \param conn reference with the connection
     110 */
     111void FragmentScheduler::WorkerListener_t::handle_checkAddress(const boost::system::error_code& e, connection_ptr conn)
     112{
     113  Info info(__FUNCTION__);
     114  if (!e)
     115  {
     116    if (pool.presentInPool(address)) {
     117      // check whether its priority is busy_priority
     118      if (pool.isWorkerBusy(address)) {
     119        conn->async_read(result,
     120          boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
     121          boost::asio::placeholders::error, conn));
     122      } else {
     123        ELOG(1, "INFO: Idle worker "+toString(address)+" has logged in again.");
     124      }
     125    } else {
     126      // insert as its new worker
     127      pool.addWorker(address);
     128      enum EnrollInPoolOperation::EnrollFlag flag = EnrollInPoolOperation::Success;
     129      conn->async_write(flag,
     130        boost::bind(&FragmentScheduler::WorkerListener_t::handle_enrolled, this,
     131        boost::asio::placeholders::error, conn));
     132    }
     133  }
     134  else
     135  {
     136    // An error occurred. Log it and return. Since we are not starting a new
     137    // accept operation the io_service will run out of work to do and the
     138    // server will exit.
     139    Exitflag = ErrorFlag;
     140    ELOG(0, e.message());
     141  }
    120142
    121143  if (JobsQueue.isJobPresent()) {
     
    126148}
    127149
    128 /** Callback function when job has been sent.
    129  *
    130  * After job has been sent we start async_read() for the result.
    131  *
    132  * \sa handle_ReceiveResultFromWorker()
    133  *
    134  * \param e error code if something went wrong
    135  * \param conn reference with the connection
    136  */
    137 void FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
    138 {
    139     Info info(__FUNCTION__);
    140     LOG(1, "INFO: Job sent.");
    141     // obtain result
    142     LOG(1, "INFO: Receiving result for a job ...");
    143     conn->async_read(result,
    144       boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
    145       boost::asio::placeholders::error, conn));
     150/** Callback function when new worker has enrolled.
     151 *
     152 * \param e error code if something went wrong
     153 * \param conn reference with the connection
     154 */
     155void FragmentScheduler::WorkerListener_t::handle_enrolled(const boost::system::error_code& e, connection_ptr conn)
     156{
     157  Info info(__FUNCTION__);
     158  if (!e)
     159  {
     160    FragmentJob::ptr job;
     161    if (JobsQueue.isJobPresent()) {
     162      job = JobsQueue.popJob();
     163    } else {
     164      job = NoJob;
     165    }
     166    callback_sendJobToWorker(address, job);
     167  }
     168  else
     169  {
     170    // An error occurred. Log it and return. Since we are not starting a new
     171    // accept operation the io_service will run out of work to do and the
     172    // server will exit.
     173    Exitflag = ErrorFlag;
     174    ELOG(0, e.message());
     175  }
    146176}
    147177
     
    155185  Info info(__FUNCTION__);
    156186  LOG(1, "INFO: Received result for job #" << result->getId() << " ...");
     187
    157188  // and push into queue
    158189  ASSERT(result->getId() != (JobId_t)JobId::NoJob,
    159       "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has NoJob id.");
     190      "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has NoJob id.");
    160191  ASSERT(result->getId() != (JobId_t)JobId::IllegalJob,
    161       "FragmentScheduler::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
     192      "WorkerListener_t::handle_ReceiveResultFromWorker() - result received has IllegalJob id.");
    162193  // place id into expected
    163194  if ((result->getId() != (JobId_t)JobId::NoJob) && (result->getId() != (JobId_t)JobId::IllegalJob))
    164195    JobsQueue.pushResult(result);
     196
     197  // mark as idle
     198  pool.unmarkWorkerBusy(address);
     199  // for now remove worker again from pool such that other may connect
     200  pool.removeWorker(address);
     201
    165202  // erase result
    166203  result.reset();
    167204  LOG(1, "INFO: JobsQueue has " << JobsQueue.getDoneJobs() << " results.");
    168205}
     206
    169207
    170208/** Handle a new controller connection.
     
    348386}
    349387
     388
     389/** Helper function to send a job to worker.
     390 *
     391 * @param address address of worker
     392 * @param job job to send
     393 */
     394void FragmentScheduler::sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job)
     395{
     396  ASSERT( !pool.isWorkerBusy(address),
     397      "FragmentScheduler::sendJobToWorker() - Worker "+toString(address)+" is already busy.");
     398  LOG(1, "INFO: Sending job " << job->getId() << " to worker " << address << ".");
     399  sendJobOp.setJob(job);
     400  sendJobOp(address.host, address.service);
     401  // set worker as busy (assuming it has been removed from idle queue already)
     402  WorkerPool::Idle_Queue_t::iterator iter = pool.getIdleWorker(address);
     403  ASSERT( iter != pool.getIdleEnd(),
     404      "FragmentScheduler::sendJobToWorker() - cannot find worker "
     405      +toString(address)+" in idle queue.");
     406  pool.markWorkerBusy(iter);
     407}
     408
     409///** Helper function to shutdown a single worker.
     410// *
     411// * We send NoJob to indicate shutdown
     412// *
     413// * @param address of worker to shutdown
     414// */
     415//void FragmentScheduler::shutdownWorker(const WorkerAddress &address)
     416//{
     417//  sendJobToWorker(address, NoJob);
     418//}
     419//
     420///** Sends shutdown to all current workers in the pool.
     421// *
     422// */
     423//void FragmentScheduler::removeAllWorkers()
     424//{
     425//  // give all workers shutdown signal
     426//  while (pool.presentIdleWorkers()) {
     427//    const WorkerAddress address = pool.getNextIdleWorker();
     428//    shutdownWorker(address);
     429//  }
     430//}
  • src/Fragmentation/Automation/FragmentScheduler.hpp

    r5d8c0f r41c1b7  
    2020#include "Connection.hpp"
    2121#include "ControllerChoices.hpp"
     22#include "Controller/Commands/SendJobToWorkerOperation.hpp"
    2223#include "FragmentQueue.hpp"
    2324#include "GlobalJobId.hpp"
     
    2627#include "Results/FragmentResult.hpp"
    2728#include "types.hpp"
     29#include "Pool/WorkerPool.hpp"
     30#include "WorkerAddress.hpp"
    2831
    2932/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
     
    3942
    4043private:
     44  void sendJobToWorker(const WorkerAddress &address, FragmentJob::ptr &job);
     45//  void shutdownWorker(const WorkerAddress &address);
     46//  void removeAllWorkers();
     47
    4148  class WorkerListener_t : public Listener
    4249  {
     
    4552        boost::asio::io_service& io_service,
    4653        unsigned short port,
    47         FragmentQueue &_JobsQueue) :
     54        FragmentQueue &_JobsQueue,
     55        WorkerPool &_pool,
     56        boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> _callback) :
    4857      Listener(io_service, port),
     58      address("127.0.0.1", "0"),
    4959      JobsQueue(_JobsQueue),
    50       result( new FragmentResult(JobId::NoJob) )
     60      pool(_pool),
     61      result( new FragmentResult(JobId::NoJob) ),
     62      callback_sendJobToWorker(_callback)
    5163    {}
    5264    virtual ~WorkerListener_t() {}
     
    5769
    5870    /// Worker callback function when job has been sent.
    59     void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn);
     71    void handle_checkAddress(const boost::system::error_code& e, connection_ptr conn);
     72
     73    /// Worker callback function when new worker has enrolled.
     74    void handle_enrolled(const boost::system::error_code& e, connection_ptr conn);
    6075
    6176    /// Worker callback function when result has been received.
    6277    void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
    6378  private:
    64     //!> reference to external FragmentQueue containing jobs to work on
    65     FragmentQueue & JobsQueue;
     79    //!> address of new Worker
     80    WorkerAddress address;
     81
     82    //!> reference to Queue
     83    FragmentQueue &JobsQueue;
     84
     85    //!> callback reference to container class
     86    WorkerPool &pool;
    6687
    6788    /// result that is received from the client.
    6889    FragmentResult::ptr result;
     90
     91    //!> callback function to access send job function
     92    boost::function<void (const WorkerAddress&, FragmentJob::ptr&)> callback_sendJobToWorker;
    6993
    7094    // static entity to indicate to clients that the queue is empty.
     
    130154
    131155private:
     156  /// Queue with data to be sent to each client.
     157  FragmentQueue JobsQueue;
     158
     159  //!> Pool of Workers
     160  WorkerPool pool;
     161
    132162  //!> Listener instance that waits for a worker
    133163  WorkerListener_t  WorkerListener;
     
    135165  //!> Listener instance that waits for a controller
    136166  ControllerListener_t  ControllerListener;
    137 
    138   /// Queue with data to be sent to each client.
    139   FragmentQueue JobsQueue;
    140167
    141168public:
     
    153180  }
    154181
     182private:
     183  //!> Connection for sending jobs to workers
     184  Connection connection;
     185
     186  //!> internal operation to send jobs to workers
     187  mutable SendJobToWorkerOperation sendJobOp;
    155188};
    156189
  • src/Fragmentation/Automation/Makefile.am

    r5d8c0f r41c1b7  
    4444FRAGMENTATIONCOMMANDSSOURCE = \
    4545  Controller/Commands/CheckResultsOperation.cpp \
     46  Controller/Commands/EnrollInPoolOperation.cpp \
    4647  Controller/Commands/GetNextJobIdOperation.cpp \
     48  Controller/Commands/ObtainJobOperation.cpp \
    4749  Controller/Commands/Operation.cpp \
    4850  Controller/Commands/ReceiveJobsOperation.cpp \
     51  Controller/Commands/SendJobToWorkerOperation.cpp \
    4952  Controller/Commands/SendResultsOperation.cpp \
    5053  Controller/Commands/ShutdownOperation.cpp \
    51   Controller/Commands/WorkOnJobOperation.cpp
     54  Controller/Commands/SubmitResultOperation.cpp
    5255
    5356FRAGMENTATIONCOMMANDSHEADER = \
    5457  ControllerChoices.hpp \
    5558  Controller/Commands/CheckResultsOperation.hpp \
     59  Controller/Commands/EnrollInPoolOperation.hpp \
    5660  Controller/Commands/GetNextJobIdOperation.hpp \
     61  Controller/Commands/ObtainJobOperation.hpp \
    5762  Controller/Commands/Operation.hpp \
    5863  Controller/Commands/ReceiveJobsOperation.hpp \
     64  Controller/Commands/SendJobToWorkerOperation.hpp \
    5965  Controller/Commands/SendResultsOperation.hpp \
    6066  Controller/Commands/ShutdownOperation.hpp \
    61   Controller/Commands/WorkOnJobOperation.hpp
     67  Controller/Commands/SubmitResultOperation.hpp
    6268
    6369noinst_LTLIBRARIES += libMolecuilderFragmentationCommands.la
     
    6975FRAGMENTATIONAUTOMATIONHELPERSOURCE = \
    7076  atexit.cpp \
    71   GlobalJobId.cpp
     77  GlobalJobId.cpp \
     78  Listener.cpp \
     79  WorkerAddress.cpp
    7280
    7381FRAGMENTATIONAUTOMATIONHELPERHEADER = \
    7482  atexit.hpp \
    75   GlobalJobId.hpp
     83  GlobalJobId.hpp \
     84  Listener.hpp \
     85  WorkerAddress.hpp
    7686
    7787noinst_LTLIBRARIES += libMolecuilderFragmentationAutomationHelper.la
     
    8696AM_CPPFLAGS = ${BOOST_CPPFLAGS} ${CodePatterns_CFLAGS}
    8797
    88 bin_PROGRAMS += Controller Server Worker
     98bin_PROGRAMS += Controller PoolWorker Server
    8999
    90100CONTROLLERSOURCE = \
     
    97107  Controller/FragmentController.hpp
    98108
     109POOLWORKERSOURCE = \
     110  Pool/PoolWorker.cpp
     111
     112POOLWORKERHEADER = \
     113  Connection.hpp \
     114  Pool/PoolWorker.hpp
     115
    99116SERVERSOURCE = \
    100117  FragmentScheduler.cpp \
    101   Listener.cpp
     118  Pool/WorkerPool.cpp
    102119
    103120SERVERHEADER = \
     
    105122  ControllerChoices.hpp \
    106123  FragmentScheduler.hpp \
    107   Listener.hpp
    108 
    109 WORKERSOURCE = \
    110   FragmentWorker.cpp \
    111   Worker.cpp
    112 
    113 WORKERHEADER = \
    114   Connection.hpp \
    115   FragmentWorker.hpp
     124  Pool/WorkerPool.hpp
    116125
    117126Controller_SOURCES = $(CONTROLLERSOURCE) $(CONTROLLERHEADER) controller.cpp
     
    119128Controller_CXXFLAGS = $(AM_CPPFLAGS)
    120129Controller_LDADD = \
     130  libMolecuilderFragmentationAutomationHelper.la \
     131  libMolecuilderFragmentationCommands.la \
     132  libMolecuilderFragmentJobs.la \
     133  $(BOOST_ASIO_LIBS) \
     134  $(BOOST_SERIALIZATION_LIBS) \
     135  $(BOOST_THREAD_LIBS) \
     136  $(BOOST_SYSTEM_LIBS) \
     137  ${CodePatterns_LIBS}
     138
     139PoolWorker_SOURCES = $(POOLWORKERSOURCE) $(POOLWORKERHEADER) poolworker.cpp
     140PoolWorker_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS)
     141PoolWorker_CXXFLAGS = $(AM_CPPFLAGS) $(BOOST_ASIO_DEBUG)
     142PoolWorker_LDADD = \
    121143  libMolecuilderFragmentationAutomationHelper.la \
    122144  libMolecuilderFragmentationCommands.la \
     
    141163  ${CodePatterns_LIBS}
    142164
    143 Worker_SOURCES = $(WORKERSOURCE) $(WORKERHEADER)
    144 Worker_LDFLAGS = $(AM_LDFLAGS) $(BOOST_ASIO_LDFLAGS) $(BOOST_SYSTEM_LDFLAGS) $(BOOST_THREAD_LDFLAGS) $(BOOST_SERIALIZATION_LDFLAGS)
    145 Worker_CXXFLAGS = $(AM_CPPFLAGS)
    146 Worker_LDADD = \
    147   libMolecuilderFragmentationAutomationHelper.la \
    148   libMolecuilderFragmentationCommands.la \
    149   libMolecuilderFragmentJobs.la \
    150   $(BOOST_ASIO_LIBS) \
    151   $(BOOST_SERIALIZATION_LIBS) \
    152   $(BOOST_THREAD_LIBS) \
    153   $(BOOST_SYSTEM_LIBS) \
    154   ${CodePatterns_LIBS}
    155 
  • src/Fragmentation/Automation/Pool/WorkerPool.cpp

    r5d8c0f r41c1b7  
    7676  // return address
    7777  return returnaddress;
     78}
     79
     80WorkerPool::Idle_Queue_t::iterator WorkerPool::getIdleWorker(const WorkerAddress &address)
     81{
     82  Idle_Queue_t::iterator idleiter = idle_queue.begin();
     83  while (idleiter != idle_queue.end()) {
     84    if (idleiter->second == address) {
     85      break;
     86    }
     87    ++idleiter;
     88  }
     89  return idleiter;
    7890}
    7991
     
    130142  Pool_t::iterator iter = pool.find( address );
    131143  if (iter != pool.end()) {
    132     Idle_Queue_t::iterator idleiter = idle_queue.begin();
    133     while (idleiter != idle_queue.end()) {
    134       if (idleiter->second == address) {
    135         idle_queue.erase(idleiter);
    136         break;
    137       }
    138     }
     144    Idle_Queue_t::iterator idleiter = getIdleWorker(address);
     145    if (idleiter != idle_queue.end())
     146      idle_queue.erase(idleiter);
    139147    Busy_Queue_t::iterator busyiter = busy_queue.find(address);
    140148    if (busyiter != busy_queue.end())
     
    147155        +" is in pool and both idle and busy!");
    148156    pool.erase(iter);
     157    LOG(1, "INFO: Removed worker " << address << " from pool.");
    149158    return true;
    150159  } else {
     
    174183void WorkerPool::markWorkerBusy(Idle_Queue_t::iterator &iter)
    175184{
    176   if (isWorkerBusy(iter->second))
     185  const WorkerAddress returnaddress = iter->second;
     186  if (isWorkerBusy(returnaddress))
    177187    return;
    178   const WorkerAddress returnaddress = iter->second;
    179188  const priority_t priority = iter->first;
    180189
     
    206215    busy_queue.erase(address);
    207216    idle_queue.insert( make_pair( priority, address) );
    208   }
    209 }
     217
     218    LOG(1, "INFO: Worker " << address << " is now marked idle.");
     219  }
     220}
  • src/Fragmentation/Automation/Pool/WorkerPool.hpp

    r5d8c0f r41c1b7  
    4646  void unmarkWorkerBusy(const WorkerAddress &address);
    4747
    48 private:
     48  // this is currently for the passing time until Worker pool is fully operable
     49
    4950  //!> typedef of the priority in the idle queue of a worker
    5051  typedef size_t priority_t;
     
    5354  typedef std::multimap<priority_t, WorkerAddress> Idle_Queue_t;
    5455
     56  Idle_Queue_t::iterator getIdleWorker(const WorkerAddress &address);
     57  Idle_Queue_t::iterator getIdleEnd()
     58  {
     59    return idle_queue.end();
     60  }
     61  void markWorkerBusy(Idle_Queue_t::iterator &iter);
     62
     63private:
    5564  //!> typedef for the worker queue being a map with priority and address of worker
    5665  typedef std::map<WorkerAddress, priority_t> Busy_Queue_t;
     
    6069
    6170private:
    62   void markWorkerBusy(Idle_Queue_t::iterator &iter);
    6371  void removeAllWorkers();
    6472
Note: See TracChangeset for help on using the changeset viewer.