Changeset 8036b7


Ignore:
Timestamp:
Jun 27, 2012, 4:07:21 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
95454a
Parents:
95b384
git-author:
Frederik Heber <heber@…> (02/22/12 17:50:06)
git-committer:
Frederik Heber <heber@…> (06/27/12 16:07:21)
Message:

Refactored Listener out of FragmentScheduler.

  • this is preparatory for creating PoolWorker class, i.e. Worker that listen for jobs sent to them by the server.
  • the connection is just reset() on new initiateSocket().
  • closeSocket() correctly shutdown()s and close()s the socket.
  • Note: As ControllerListener receives new jobs and thus knows when the server is required to listen on worker port again, it needs a bound function to WorkerListener_t::initiateSocket(). This will be removed when the new Pool is in place that handles Worker connections.
Location:
src/Fragmentation/Automation
Files:
2 added
3 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    r95b384 r8036b7  
    4040#include "FragmentScheduler.hpp"
    4141
    42 FragmentJob::ptr FragmentScheduler::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
     42FragmentJob::ptr FragmentScheduler::WorkerListener_t::NoJob(new SystemCommandJob(std::string("/bin/true"), std::string("dosomething"), JobId::NoJob));
    4343
    4444/** Helper function to enforce binding of FragmentWorker to possible derived
     
    5959 */
    6060FragmentScheduler::FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport) :
    61   worker_acceptor_(io_service,
    62       boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), workerport)
    63   ),
    64   controller_acceptor_(io_service,
    65       boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), controllerport)
    66   ),
    67   result( new FragmentResult(JobId::NoJob) ),
    68   jobInfo((size_t)2, 0),
    69   choice(NoOperation),
    70   globalId(0),
    71   Exitflag(OkFlag)
     61    WorkerListener(io_service, workerport, JobsQueue),
     62    ControllerListener(io_service, controllerport, JobsQueue,
     63        boost::bind(&Listener::initiateSocket, boost::ref(WorkerListener)))
    7264{
    7365  Info info(__FUNCTION__);
     
    7567  // only initiate socket if jobs are already present
    7668  if (JobsQueue.isJobPresent()) {
    77     LOG(1, "Listening for workers on port " << workerport << ".");
    78     initiateWorkerSocket();
    79   }
    80 
    81   initiateControllerSocket();
    82   LOG(1, "Listening for controller on port " << controllerport << ".");
    83 }
    84 
    85 /** Internal function to start worker connection.
    86  *
    87  */
    88 void FragmentScheduler::initiateWorkerSocket()
    89 {
    90   // Start an accept operation for worker connections.
    91   connection_ptr new_conn(new Connection(worker_acceptor_.get_io_service()));
    92   worker_acceptor_.async_accept(new_conn->socket(),
    93     boost::bind(&FragmentScheduler::handle_AcceptWorker, this,
    94       boost::asio::placeholders::error, new_conn));
    95 }
    96 
    97 /** Internal function to start controller connection.
    98  *
    99  */
    100 void FragmentScheduler::initiateControllerSocket()
    101 {
    102   // Start an accept operation for controller connection.
    103   connection_ptr new_conn(new Connection(controller_acceptor_.get_io_service()));
    104   controller_acceptor_.async_accept(new_conn->socket(),
    105     boost::bind(&FragmentScheduler::handle_AcceptController, this,
    106       boost::asio::placeholders::error, new_conn));
    107 }
    108 
     69    WorkerListener.initiateSocket();
     70  }
     71
     72  ControllerListener.initiateSocket();
     73}
    10974
    11075/** Handle a new worker connection.
     
    11782 * \param conn reference with the connection
    11883 */
    119 void FragmentScheduler::handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn)
     84void FragmentScheduler::WorkerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
    12085{
    12186  Info info(__FUNCTION__);
     
    13196      LOG(1, "INFO: Sending job #" << job->getId() << ".");
    13297      conn->async_write(job,
    133         boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
     98        boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
    13499        boost::asio::placeholders::error, conn));
    135100
     
    137102      // send the static NoJob
    138103      conn->async_write(NoJob,
    139         boost::bind(&FragmentScheduler::handle_SendJobtoWorker, this,
     104        boost::bind(&FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker, this,
    140105        boost::asio::placeholders::error, conn));
    141106
     
    150115    // accept operation the io_service will run out of work to do and the
    151116    // server will exit.
    152     Exitflag = WorkerErrorFlag;
     117    Exitflag = ErrorFlag;
    153118    ELOG(0, e.message());
    154119  }
     
    157122    // Start an accept operation for a new Connection only when there
    158123    // are still jobs present
    159     initiateWorkerSocket();
     124    initiateSocket();
    160125  }
    161126}
     
    170135 * \param conn reference with the connection
    171136 */
    172 void FragmentScheduler::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
     137void FragmentScheduler::WorkerListener_t::handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn)
    173138{
    174139    Info info(__FUNCTION__);
     
    177142    LOG(1, "INFO: Receiving result for a job ...");
    178143    conn->async_read(result,
    179       boost::bind(&FragmentScheduler::handle_ReceiveResultFromWorker, this,
     144      boost::bind(&FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker, this,
    180145      boost::asio::placeholders::error, conn));
    181146}
     
    186151 * \param conn reference with the connection
    187152 */
    188 void FragmentScheduler::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
     153void FragmentScheduler::WorkerListener_t::handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn)
    189154{
    190155  Info info(__FUNCTION__);
     
    212177 * \param conn reference with the connection
    213178 */
    214 void FragmentScheduler::handle_AcceptController(const boost::system::error_code& e, connection_ptr conn)
     179void FragmentScheduler::ControllerListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
    215180{
    216181  Info info(__FUNCTION__);
     
    218183  {
    219184    conn->async_read(choice,
    220       boost::bind(&FragmentScheduler::handle_ReadChoice, this,
     185      boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReadChoice, this,
    221186      boost::asio::placeholders::error, conn));
    222187  }
     
    226191    // accept operation the io_service will run out of work to do and the
    227192    // server will exit.
    228     Exitflag = ControllerErrorFlag;
     193    Exitflag = ErrorFlag;
    229194    ELOG(0, e.message());
    230195  }
     
    236201 * \param conn reference with the connection
    237202 */
    238 void FragmentScheduler::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
     203void FragmentScheduler::ControllerListener_t::handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn)
    239204{
    240205  Info info(__FUNCTION__);
     
    255220      LOG(1, "INFO: Sending next available job id " << nextid << " to controller ...");
    256221      conn->async_write(nextid,
    257         boost::bind(&FragmentScheduler::handle_GetNextJobIdState, this,
     222        boost::bind(&FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState, this,
    258223        boost::asio::placeholders::error, conn));
    259224      break;
     
    265230      LOG(1, "INFO: Receiving bunch of jobs from a controller ...");
    266231      conn->async_read(jobs,
    267         boost::bind(&FragmentScheduler::handle_ReceiveJobs, this,
     232        boost::bind(&FragmentScheduler::ControllerListener_t::handle_ReceiveJobs, this,
    268233        boost::asio::placeholders::error, conn));
    269234      break;
     
    277242      LOG(1, "INFO: Sending state that "+toString(jobInfo[0])+" jobs are present and "+toString(jobInfo[1])+" jobs are done to controller ...");
    278243      conn->async_write(jobInfo,
    279         boost::bind(&FragmentScheduler::handle_CheckResultState, this,
     244        boost::bind(&FragmentScheduler::ControllerListener_t::handle_CheckResultState, this,
    280245        boost::asio::placeholders::error, conn));
    281246      break;
     
    287252      LOG(1, "INFO: Sending "+toString(results.size())+" results to controller ...");
    288253      conn->async_write(results,
    289         boost::bind(&FragmentScheduler::handle_SendResults, this,
     254        boost::bind(&FragmentScheduler::ControllerListener_t::handle_SendResults, this,
    290255        boost::asio::placeholders::error, conn));
    291256      break;
     
    297262    }
    298263    default:
    299       Exitflag = ControllerErrorFlag;
     264      Exitflag = ErrorFlag;
    300265      ELOG(1, "FragmentScheduler::handle_ReadChoice() - called with no valid choice.");
    301266      break;
     
    307272      LOG(1, "Launching new acceptor on socket.");
    308273      // Start an accept operation for a new Connection.
    309       initiateControllerSocket();
     274      initiateSocket();
    310275    }
    311276  }
     
    315280    // accept operation the io_service will run out of work to do and the
    316281    // server will exit.
    317     Exitflag = ControllerErrorFlag;
     282    Exitflag = ErrorFlag;
    318283    ELOG(0, e.message());
    319284  }
     
    329294 * \param conn reference with the connection
    330295 */
    331 void FragmentScheduler::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
    332 {
    333   Info info(__FUNCTION__);
    334   bool initiateSocket = !JobsQueue.isJobPresent();
     296void FragmentScheduler::ControllerListener_t::handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn)
     297{
     298  Info info(__FUNCTION__);
     299  bool need_initiateSocket = !JobsQueue.isJobPresent();
    335300
    336301  // jobs are received, hence place in JobsQueue
     
    338303    LOG(1, "INFO: Pushing " << jobs.size() << " jobs into queue.");
    339304    JobsQueue.pushJobs(jobs);
    340     // initiate socket if we had no jobs before
    341     if (initiateSocket)
    342       initiateWorkerSocket();
    343305  }
    344306
    345307  jobs.clear();
    346308
     309  // initiate socket if we had no jobs before
     310  if (need_initiateSocket)
     311    initiateWorkerSocket();
    347312}
    348313
     
    352317 * \param conn reference with the connection
    353318 */
    354 void FragmentScheduler::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
     319void FragmentScheduler::ControllerListener_t::handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn)
    355320{
    356321  Info info(__FUNCTION__);
     
    364329 * \param conn reference with the connection
    365330 */
    366 void FragmentScheduler::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
     331void FragmentScheduler::ControllerListener_t::handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn)
    367332{
    368333  Info info(__FUNCTION__);
     
    376341 * \param conn reference with the connection
    377342 */
    378 void FragmentScheduler::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
     343void FragmentScheduler::ControllerListener_t::handle_SendResults(const boost::system::error_code& e, connection_ptr conn)
    379344{
    380345  Info info(__FUNCTION__);
  • src/Fragmentation/Automation/FragmentScheduler.hpp

    r95b384 r8036b7  
    1616#include <vector>
    1717#include <boost/asio.hpp>
     18#include <boost/function.hpp>
    1819
    1920#include "Connection.hpp"
     
    2223#include "GlobalJobId.hpp"
    2324#include "Jobs/FragmentJob.hpp"
     25#include "Listener.hpp"
    2426#include "Results/FragmentResult.hpp"
    2527#include "types.hpp"
    2628
    27 /** FragmentScheduler serves FragmentJobs to Workers.
     29/** FragmentScheduler serves FragmentJobs to Workers and accepts commands from
     30 * a Controller.
    2831 *
    2932 */
     
    3538  FragmentScheduler(boost::asio::io_service& io_service, unsigned short workerport, unsigned short controllerport);
    3639
    37   enum Exitflag_t{
    38     OkFlag = 0,
    39     QueueError = 128,
    40     ControllerErrorFlag = 254,
    41     WorkerErrorFlag = 255
     40private:
     41  class WorkerListener_t : public Listener
     42  {
     43  public:
     44    WorkerListener_t(
     45        boost::asio::io_service& io_service,
     46        unsigned short port,
     47        FragmentQueue &_JobsQueue) :
     48      Listener(io_service, port),
     49      JobsQueue(_JobsQueue),
     50      result( new FragmentResult(JobId::NoJob) )
     51    {}
     52    virtual ~WorkerListener_t() {}
     53
     54  protected:
     55    /// Handle completion of a accept worker operation.
     56    void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
     57
     58    /// Worker callback function when job has been sent.
     59    void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn);
     60
     61    /// Worker callback function when result has been received.
     62    void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
     63  private:
     64    //!> reference to external FragmentQueue containing jobs to work on
     65    FragmentQueue & JobsQueue;
     66
     67    /// result that is received from the client.
     68    FragmentResult::ptr result;
     69
     70    // static entity to indicate to clients that the queue is empty.
     71    static FragmentJob::ptr NoJob;
    4272  };
    4373
     74  class ControllerListener_t : public Listener
     75  {
     76  public:
     77    ControllerListener_t(
     78        boost::asio::io_service& io_service,
     79        unsigned short port,
     80        FragmentQueue &_JobsQueue,
     81        boost::function<void ()> _initiateWorkerSocket) :
     82      Listener(io_service, port),
     83      JobsQueue(_JobsQueue),
     84      jobInfo((size_t)2, 0),
     85      choice(NoOperation),
     86      globalId(0),
     87      initiateWorkerSocket(_initiateWorkerSocket)
     88    {}
     89    virtual ~ControllerListener_t() {}
     90
     91  protected:
     92    /// Handle completion of a accept controller operation.
     93    void handle_Accept(const boost::system::error_code& e, connection_ptr conn);
     94
     95    /// Handle completion of controller operation to read choice
     96    void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
     97
     98    /// Controller callback function when job has been sent.
     99    void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
     100
     101    /// Controller callback function when checking on state of results.
     102    void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
     103
     104    /// Controller callback function when checking on state of results.
     105    void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
     106
     107    /// Controller callback function when result has been received.
     108    void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
     109
     110  private:
     111    //!> reference to external FragmentQueue containing jobs to work on
     112    FragmentQueue & JobsQueue;
     113
     114    /// bunch of jobs received from controller before placed in JobsQueue
     115    std::vector<FragmentJob::ptr> jobs;
     116
     117    /// number of jobs that are waiting to be and are calculated, required for returning status
     118    std::vector<size_t> jobInfo;
     119
     120    // choice
     121    enum ControllerChoices choice;
     122
     123    // TODO: replace this instance by a IdPool.
     124    //!> global id to give next available job id
     125    GlobalJobId globalId;
     126
     127    //!> callback function to tell that worker socket should be enabled
     128    boost::function<void ()> initiateWorkerSocket;
     129  };
     130
     131private:
     132  //!> Listener instance that waits for a worker
     133  WorkerListener_t  WorkerListener;
     134
     135  //!> Listener instance that waits for a controller
     136  ControllerListener_t  ControllerListener;
     137
     138  /// Queue with data to be sent to each client.
     139  FragmentQueue JobsQueue;
     140
     141public:
    44142  /** Getter for Exitflag.
    45143   *
     
    48146  size_t getExitflag() const
    49147  {
    50     return Exitflag;
     148    if (WorkerListener.getExitflag() != 0)
     149      return WorkerListener.getExitflag();
     150    if (ControllerListener.getExitflag() != 0)
     151      return ControllerListener.getExitflag();
     152    return 0;
    51153  }
    52154
    53 protected:
    54   /// Handle completion of a accept worker operation.
    55   void handle_AcceptWorker(const boost::system::error_code& e, connection_ptr conn);
    56 
    57   /// Handle completion of a accept controller operation.
    58   void handle_AcceptController(const boost::system::error_code& e, connection_ptr conn);
    59 
    60   /// Handle completion of controller operation to read choice
    61   void handle_ReadChoice(const boost::system::error_code& e, connection_ptr conn);
    62 
    63   /// Worker callback function when job has been sent.
    64   void handle_SendJobtoWorker(const boost::system::error_code& e, connection_ptr conn);
    65 
    66   /// Worker callback function when result has been received.
    67   void handle_ReceiveResultFromWorker(const boost::system::error_code& e, connection_ptr conn);
    68 
    69   /// Controller callback function when job has been sent.
    70   void handle_ReceiveJobs(const boost::system::error_code& e, connection_ptr conn);
    71 
    72 private:
    73 
    74   /// Controller callback function when checking on state of results.
    75   void handle_CheckResultState(const boost::system::error_code& e, connection_ptr conn);
    76 
    77   /// Controller callback function when checking on state of results.
    78   void handle_GetNextJobIdState(const boost::system::error_code& e, connection_ptr conn);
    79 
    80   /// Controller callback function when result has been received.
    81   void handle_SendResults(const boost::system::error_code& e, connection_ptr conn);
    82 
    83   /// internal function to prepare worker connections
    84   void initiateWorkerSocket();
    85 
    86   /// internal function to prepare controller connections
    87   void initiateControllerSocket();
    88 
    89 private:
    90   /// The acceptor object used to accept incoming worker socket connections.
    91   boost::asio::ip::tcp::acceptor worker_acceptor_;
    92 
    93   /// The acceptor object used to accept incoming controller socket connections.
    94   boost::asio::ip::tcp::acceptor controller_acceptor_;
    95 
    96   /// result that is received from the client.
    97   FragmentResult::ptr result;
    98 
    99   /// bunch of jobs received from controller before placed in JobsQueue
    100   std::vector<FragmentJob::ptr> jobs;
    101 
    102   /// number of jobs that are waiting to be and are calculated, required for returning status
    103   std::vector<size_t> jobInfo;
    104 
    105   // choice
    106   enum ControllerChoices choice;
    107 
    108   /// Queue with data to be sent to each client.
    109   FragmentQueue JobsQueue;
    110 
    111   // static entity to indicate to clients that the queue is empty.
    112   static FragmentJob::ptr NoJob;
    113 
    114   // TODO: replace this instance by a IdPool.
    115   //!> global id to give next available job id
    116   GlobalJobId globalId;
    117 
    118   // Exit flag on program exit
    119   enum Exitflag_t Exitflag;
    120155};
    121156
  • src/Fragmentation/Automation/Makefile.am

    r95b384 r8036b7  
    8686
    8787SERVERSOURCE = \
    88   FragmentScheduler.cpp
     88  FragmentScheduler.cpp \
     89  Listener.cpp
    8990
    9091SERVERHEADER = \
    9192  Connection.hpp \
    9293  ControllerChoices.hpp \
    93   FragmentScheduler.hpp
     94  FragmentScheduler.hpp \
     95  Listener.hpp
    9496
    9597WORKERSOURCE = \
Note: See TracChangeset for help on using the changeset viewer.