Changeset 92b1d3 for src/Fragmentation


Ignore:
Timestamp:
Jul 2, 2012, 8:32:02 AM (12 years ago)
Author:
Frederik Heber <heber@…>
Branches:
Action_Thermostats, Add_AtomRandomPerturbation, Add_FitFragmentPartialChargesAction, Add_RotateAroundBondAction, Add_SelectAtomByNameAction, Added_ParseSaveFragmentResults, AddingActions_SaveParseParticleParameters, Adding_Graph_to_ChangeBondActions, Adding_MD_integration_tests, Adding_ParticleName_to_Atom, Adding_StructOpt_integration_tests, AtomFragments, Automaking_mpqc_open, AutomationFragmentation_failures, Candidate_v1.5.4, Candidate_v1.6.0, Candidate_v1.6.1, ChangeBugEmailaddress, ChangingTestPorts, ChemicalSpaceEvaluator, CombiningParticlePotentialParsing, Combining_Subpackages, Debian_Package_split, Debian_package_split_molecuildergui_only, Disabling_MemDebug, Docu_Python_wait, EmpiricalPotential_contain_HomologyGraph, EmpiricalPotential_contain_HomologyGraph_documentation, Enable_parallel_make_install, Enhance_userguide, Enhanced_StructuralOptimization, Enhanced_StructuralOptimization_continued, Example_ManyWaysToTranslateAtom, Exclude_Hydrogens_annealWithBondGraph, FitPartialCharges_GlobalError, Fix_BoundInBox_CenterInBox_MoleculeActions, Fix_ChargeSampling_PBC, Fix_ChronosMutex, Fix_FitPartialCharges, Fix_FitPotential_needs_atomicnumbers, Fix_ForceAnnealing, Fix_IndependentFragmentGrids, Fix_ParseParticles, Fix_ParseParticles_split_forward_backward_Actions, Fix_PopActions, Fix_QtFragmentList_sorted_selection, Fix_Restrictedkeyset_FragmentMolecule, Fix_StatusMsg, Fix_StepWorldTime_single_argument, Fix_Verbose_Codepatterns, Fix_fitting_potentials, Fixes, ForceAnnealing_goodresults, ForceAnnealing_oldresults, ForceAnnealing_tocheck, ForceAnnealing_with_BondGraph, ForceAnnealing_with_BondGraph_continued, ForceAnnealing_with_BondGraph_continued_betteresults, ForceAnnealing_with_BondGraph_contraction-expansion, FragmentAction_writes_AtomFragments, FragmentMolecule_checks_bonddegrees, GeometryObjects, Gui_Fixes, Gui_displays_atomic_force_velocity, ImplicitCharges, IndependentFragmentGrids, IndependentFragmentGrids_IndividualZeroInstances, IndependentFragmentGrids_IntegrationTest, IndependentFragmentGrids_Sole_NN_Calculation, JobMarket_RobustOnKillsSegFaults, JobMarket_StableWorkerPool, JobMarket_unresolvable_hostname_fix, MoreRobust_FragmentAutomation, ODR_violation_mpqc_open, PartialCharges_OrthogonalSummation, PdbParser_setsAtomName, PythonUI_with_named_parameters, QtGui_reactivate_TimeChanged_changes, Recreated_GuiChecks, Rewrite_FitPartialCharges, RotateToPrincipalAxisSystem_UndoRedo, SaturateAtoms_findBestMatching, SaturateAtoms_singleDegree, StoppableMakroAction, Subpackage_CodePatterns, Subpackage_JobMarket, Subpackage_LinearAlgebra, Subpackage_levmar, Subpackage_mpqc_open, Subpackage_vmg, Switchable_LogView, ThirdParty_MPQC_rebuilt_buildsystem, TrajectoryDependenant_MaxOrder, TremoloParser_IncreasedPrecision, TremoloParser_MultipleTimesteps, TremoloParser_setsAtomName, Ubuntu_1604_changes, stable
Children:
dc759c
Parents:
81c96b6
git-author:
Frederik Heber <heber@…> (04/24/12 16:53:31)
git-committer:
Frederik Heber <heber@…> (07/02/12 08:32:02)
Message:

OperationQueue now has a max_connections and allows only this number of running operations.

  • OperationQueue::launchNextOp() is called by update() and push_back().
  • new AddressMap takes endpoint addresses and is used to know which operations still have to be executed.
Location:
src/Fragmentation/Automation/Operations
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • src/Fragmentation/Automation/Operations/OperationQueue.cpp

    r81c96b6 r92b1d3  
    3535#include "WorkerAddress.hpp"
    3636
     37size_t OperationQueue::max_connections = 1;
     38
    3739OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
    3840{
     
    4850    AsyncOp_ptr ptr(op); // this always prevents memory loss
    4951    ptr->signOn(this);
    50     queue.push_back( ptr );
     52    OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
    5153    op = NULL;
    52     // only start operation when address is valid
    53     if ((!address.host.empty()) && (!address.service.empty()))
    54       (*ptr)(address.host, address.service);
     54    AddressMap.insert( make_pair(*iter, address) );
     55    LaunchNextOp();
    5556  } else {
    5657    ELOG(1, "Given operation pointer is NULL.");
     58  }
     59}
     60
     61void OperationQueue::LaunchNextOp()
     62{
     63  // connection available?
     64  if (getNumberOfRunningOps() < max_connections) {
     65    // only start operation when address is valid
     66    OperationQueue_t::iterator queueiter =
     67        std::find_if(queue.begin(), queue.end(),
     68            boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
     69    if (queueiter != queue.end()) {
     70      AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
     71      ASSERT( mapiter != AddressMap.end(),
     72          "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
     73      const WorkerAddress address = mapiter->second;
     74      AsyncOp_ptr ptr = mapiter->first;
     75      // always erase the op from the list of ones pending for launch
     76      AddressMap.erase(mapiter);
     77      // only launch when not a debug op
     78      if ((!address.host.empty()) && (!address.service.empty())) {
     79        LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
     80       (*ptr)(address.host, address.service);
     81      } else {
     82        LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
     83      }
     84    } else {
     85      LOG(2, "DEBUG: All remaining operations are already running.");
     86    }
     87  } else {
     88    LOG(2, "DEBUG: Currently there are no free connections.");
    5789  }
    5890}
     
    82114    // remove from queue
    83115    remove(op, this);
     116    LaunchNextOp();
    84117  }
    85118}
  • src/Fragmentation/Automation/Operations/OperationQueue.hpp

    r81c96b6 r92b1d3  
    3737   */
    3838  OperationQueue() :
    39     Observer("OperationQueue")
     39    Observer("OperationQueue"),
     40    RunningOps(0)
    4041  {}
    4142  /** Default destructor for class OperationQueue.
     
    7475  void remove(AsyncOperation *op, Observer *observer);
    7576
     77  /** Returns the number of currently running operations.
     78   *
     79   * @return Gives the difference between the entries in the queue and in the AddressMap.
     80   */
     81  size_t getNumberOfRunningOps() const
     82  {
     83    return queue.size() - AddressMap.size();
     84  }
     85
     86  /** Helper to launch the next pending operation.
     87   *
     88   */
     89  void LaunchNextOp();
     90
    7691  //!> internal operation to send jobs to workers
    7792  typedef std::deque<AsyncOp_ptr> OperationQueue_t;
     
    8499  OperationQueue_t::iterator findOperation(AsyncOperation *op);
    85100
     101  //!> internal number stating how many operations are running
     102  size_t RunningOps;
     103
     104  //!> giving the maximum number of connections
     105  static size_t max_connections;
     106
    86107private:
    87108  //!> internal queue with operations
    88109  OperationQueue_t queue;
     110
     111  //!> typedef for the association for each operation to its address to connect to
     112  typedef std::map<AsyncOp_ptr, WorkerAddress> AddressMap_t;
     113  //!> Association for each operation to its address to connect to
     114  AddressMap_t AddressMap;
    89115};
    90116
Note: See TracChangeset for help on using the changeset viewer.