Changeset ef2767 for src


Ignore:
Timestamp:
May 4, 2012, 2:19:06 PM (13 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
083490
Parents:
630a6e
git-author:
Frederik Heber <heber@…> (11/27/11 18:38:26)
git-committer:
Frederik Heber <heber@…> (05/04/12 14:19:06)
Message:

Server and Worker now also correctly exchange result.

  • Server uses FragmentQueue which also stores the obtained result.
  • Worker calls received FragmentJob::Work() function and returns the thereby calculated FragmentResult.
  • Regression test for Server/Worker now checks whether job #1 is listed twice, this basically checks whether the result has been exchanged.

There have been some concepts to understand to get this working and these

shall be briefly recorded here:

  • asynchronous communication can only work on objects that live beyond the scope of where they have been called, e.g. therefore FragmentScheduler contains a FragmentResult and FragmentWorker a FragmentJob instance. They receive this object and need the write access in the scope of the aync. comm. and not of the caller's scope. This is probably because the initial argument is only used to set up a buffer of correct length. However, the received instance is created/deserialized first when the communication is completed. And this may well be after the caller's scope has been left.
  • This is different to read operations as probably there the object to send is immediately serialized and placed into an internal buffer such that later access is only to this buffer and not to the original instance which therefore does not need to exist anymore. That's why the above Schedulder and Worker do not have the "other" instance as class members as well.
  • chaining asynchronous communications, e.g. a write after a read has been performed, can only be done by using the callback functions that the async_write/read gets as parameters. They are called when the one operation has finished and therein the next operation can then be launched. This way a successful chain is executed.
Location:
src/Fragmentation/Automation
Files:
4 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/FragmentScheduler.cpp

    r630a6e ref2767  
    3535#include "CodePatterns/Log.hpp"
    3636#include "FragmentJob.hpp"
     37#include "JobId.hpp"
    3738
    3839#include "FragmentScheduler.hpp"
     
    4344  acceptor_(io_service,
    4445      boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)
    45   )
     46  ),
     47  result(JobId::NoJob)
    4648{
    4749  Info info(__FUNCTION__);
     
    6062{
    6163  Info info(__FUNCTION__);
    62   std::cout << "handle_accept called." << std::endl;
    6364  if (!e)
    6465  {
     
    6768    if (JobsQueue.isJobPresent()) {
    6869      // pop a job and send it to the client.
    69       FragmentJob s(JobsQueue.popJob());
     70      FragmentJob job(JobsQueue.popJob());
    7071      // The connection::async_write() function will automatically
    7172      // serialize the data structure for us.
    72       conn->async_write(s,
     73      LOG(1, "INFO: Sending job #" << job.getId() << ".");
     74      conn->async_write(job,
    7375        boost::bind(&FragmentScheduler::handle_write, this,
    7476        boost::asio::placeholders::error, conn));
     
    8587        boost::bind(&FragmentScheduler::handle_write, this,
    8688        boost::asio::placeholders::error, conn));
     89
     90      // then there must be no read necesary
    8791
    8892      ELOG(2, "There is currently no job present in the queue.");
     
    102106{
    103107    Info info(__FUNCTION__);
    104   // Nothing to do. The socket will be closed automatically when the last
    105   // reference to the connection object goes away.
     108    LOG(1, "INFO: Job sent.");
     109    // obtain result
     110    LOG(1, "INFO: Receiving result for a job ...");
     111    conn->async_read(result,
     112      boost::bind(&FragmentScheduler::handle_read, this,
     113      boost::asio::placeholders::error, conn));
    106114}
    107115
     116/// Handle completion of a read operation.
     117void FragmentScheduler::handle_read(const boost::system::error_code& e, connection_ptr conn)
     118{
     119    Info info(__FUNCTION__);
     120    // nothing to do
     121    LOG(1, "INFO: Received result for job #" << result.getId() << " ...");
     122    // and push into queue
     123    ASSERT(result.getId() != JobId::NoJob,
     124        "FragmentScheduler::handle_write() - result received has NoJob id.");
     125    ASSERT(result.getId() != JobId::IllegalJob,
     126        "FragmentScheduler::handle_write() - result received has IllegalJob id.");
     127    if ((result.getId() != JobId::NoJob) && (result.getId() != JobId::IllegalJob))
     128      JobsQueue.pushResult(result);
     129    // erase result
     130    result = FragmentResult(JobId::NoJob);
     131}
     132
  • src/Fragmentation/Automation/FragmentScheduler.hpp

    r630a6e ref2767  
    2020#include "FragmentJob.hpp"
    2121#include "FragmentQueue.hpp"
     22#include "FragmentResult.hpp"
    2223
    2324/** FragmentScheduler serves FragmentJobs to Workers.
     
    3738  void handle_write(const boost::system::error_code& e, connection_ptr conn);
    3839
     40  /// Handle completion of a read operation.
     41  void handle_read(const boost::system::error_code& e, connection_ptr conn);
     42
    3943private:
    4044  /// The acceptor object used to accept incoming socket connections.
    4145  boost::asio::ip::tcp::acceptor acceptor_;
     46
     47  /// result that is received from the client.
     48  FragmentResult result;
    4249
    4350  /// Queue with data to be sent to each client.
  • src/Fragmentation/Automation/FragmentWorker.cpp

    r630a6e ref2767  
    3434#include "CodePatterns/Log.hpp"
    3535#include "FragmentJob.hpp"
     36#include "FragmentResult.hpp"
    3637#include "FragmentWorker.hpp"
     38
     39FragmentResult FragmentWorker::EmptyResult(JobId::NoJob, std::string("EmptyResult"));
    3740
    3841/// Constructor starts the asynchronous connect operation.
     
    6467    boost::asio::ip::tcp::resolver::iterator endpoint_iterator)
    6568{
    66     Info info(__FUNCTION__);
     69  Info info(__FUNCTION__);
    6770  if (!e)
    6871  {
     
    7073    // of jobs. The connection::async_read() function will automatically
    7174    // decode the data that is read from the underlying socket.
     75    LOG(1, "INFO: Receiving a job ...");
    7276    connection_.async_read(job,
    7377      boost::bind(&FragmentWorker::handle_read, this,
     
    9599  if (!e)
    96100  {
     101    LOG(1, "INFO: Received job #" << job.getId() << ".");
    97102    if (job.getId() != JobId::NoJob) {
    98103      // Print out the data that was received.
    99104      std::cout << "Job output: " << job.outputfile << "\n";
    100105      std::cout << "Job id: " << job.getId() << "\n";
     106
    101107      // do something .. right now: wait
    102       result = job.Work();
     108      LOG(1, "INFO: Calculating job #" << job.getId() << " ...");
     109      FragmentResult result(job.Work());
     110
     111      // write the result to the server
     112      LOG(1, "INFO: Sending result for job #" << job.getId() << " ...");
     113      connection_.async_write(result,
     114        boost::bind(&FragmentWorker::handle_write, this,
     115        boost::asio::placeholders::error));
     116
    103117    } else {
    104118      std::cout << "The server has no job for me." << std::endl;
     119      // send out empty result
     120      LOG(1, "INFO: Sending empty result ...");
     121      connection_.async_write(EmptyResult,
     122        boost::bind(&FragmentWorker::handle_write, this,
     123        boost::asio::placeholders::error));
    105124      Exitflag = NoJobFlag;
    106125    }
     
    117136}
    118137
     138/// Handle completion of a write operation.
     139void FragmentWorker::handle_write(const boost::system::error_code& e)
     140{
     141  Info info(__FUNCTION__);
     142  // Nothing to do.
     143  LOG(1, "INFO: Job #" << job.getId() << " calculated and sent.");
     144  // erase job
     145  job = FragmentJob();
     146}
     147
  • src/Fragmentation/Automation/FragmentWorker.hpp

    r630a6e ref2767  
    1818#include "Connection.hpp"
    1919#include "FragmentJob.hpp"
    20 #include "FragmentResult.hpp"
    2120
    2221/** Receives a job from Server to execute and return FragmentResult.
     
    3736  void handle_read(const boost::system::error_code& e);
    3837
     38  /// Handle completion of a write operation.
     39  void handle_write(const boost::system::error_code& e);
     40
    3941  enum Exitflag_t {
    4042    OkFlag = 0,
     
    5254
    5355private:
     56  // static entity of an empty result
     57  static FragmentResult EmptyResult;
     58
    5459  /// The Connection to the server.
    5560  Connection connection_;
Note: See TracChangeset for help on using the changeset viewer.