source: ThirdParty/JobMarket/src/JobMarket/Operations/OperationQueue.cpp@ bca6b6

AutomationFragmentation_failures Candidate_v1.6.1 ChemicalSpaceEvaluator Enhanced_StructuralOptimization_continued Exclude_Hydrogens_annealWithBondGraph ForceAnnealing_with_BondGraph ForceAnnealing_with_BondGraph_contraction-expansion Gui_displays_atomic_force_velocity JobMarket_RobustOnKillsSegFaults JobMarket_StableWorkerPool PythonUI_with_named_parameters StoppableMakroAction TremoloParser_IncreasedPrecision
Last change on this file since bca6b6 was bca6b6, checked in by Frederik Heber <frederik.heber@…>, 7 years ago

OperationQueue now logs that the queue is empty (instead of claiming that all remaining operations are already running).

  • Property mode set to 100644
File size: 4.7 KB
Line 
1/*
2 * Project: JobMarket
3 * Description: asynchronous Server/Controller/Client-approach to parallel computing, based on boost::asio
4 * Copyright (C) 2012 Frederik Heber. All rights reserved.
5 *
6 */
7
8/*
9 * OperationQueue.cpp
10 *
11 * Created on: Apr 24, 2012
12 * Author: heber
13 */
14
15
16// include config.h
17#ifdef HAVE_CONFIG_H
18#include <config.h>
19#endif
20
21// boost asio needs specific operator new
22#include <boost/asio.hpp>
23
24//#include "CodePatterns/MemDebug.hpp"
25
26#include <boost/bind.hpp>
27#include <boost/lambda/lambda.hpp>
28#include <string>
29
30#include "CodePatterns/Log.hpp"
31#include "CodePatterns/Observer/Observer.hpp"
32
33#include "JobMarket/Operations/AsyncOperation.hpp"
34#include "JobMarket/Operations/OperationQueue.hpp"
35
36static void NoOp(const WorkerAddress) {}
37
38// static instances
39const boost::function<void (const WorkerAddress)> OperationQueue::NoOpCallback = boost::bind(&NoOp, _1);
40
41size_t OperationQueue::max_connections = 1;
42
43OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
44{
45 OperationQueue_t::iterator iter =
46 std::find_if(queue.begin(), queue.end(),
47 boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op);
48 return iter;
49}
50
51void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address)
52{
53 if (op != NULL) {
54 if (!IsBlockedFlag) {
55 AsyncOp_ptr ptr(op); // this always prevents memory loss
56 ptr->signOn(this);
57 OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
58 op = NULL;
59 AddressMap.insert( make_pair(*iter, address) );
60 LaunchNextOp();
61 } else {
62 ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+".");
63 delete op;
64 op = NULL;
65 }
66 } else {
67 ELOG(1, "Given operation pointer is NULL.");
68 }
69}
70
71void OperationQueue::LaunchNextOp()
72{
73 // connection available?
74 if (getNumberOfRunningOps() < max_connections) {
75 if (!queue.empty()) {
76 // only start operation when address is valid
77 OperationQueue_t::iterator queueiter =
78 std::find_if(queue.begin(), queue.end(),
79 boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
80 if (queueiter != queue.end()) {
81 const AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
82 ASSERT( mapiter != AddressMap.end(),
83 "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
84 currentOpsAddress = mapiter->second;
85 const AsyncOp_ptr ptr = mapiter->first;
86 // always erase the op from the list of ones pending for launch
87 AddressMap.erase(mapiter);
88 // only launch when not a debug op
89 if ((!currentOpsAddress.host.empty()) && (!currentOpsAddress.service.empty())) {
90 LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
91 (*ptr)(currentOpsAddress.host, currentOpsAddress.service);
92 } else {
93 LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
94 }
95 } else {
96 LOG(2, "DEBUG: All remaining operations are already running.");
97 }
98 } else {
99 LOG(2, "DEBUG: There are no more operations in the queue.");
100 }
101 } else {
102 LOG(2, "DEBUG: Currently there are no free connections.");
103 }
104}
105
106void OperationQueue::remove(AsyncOperation *op, Observer *observer)
107{
108 if (op != NULL) {
109 OperationQueue_t::iterator iter = findOperation(op);
110 if (iter != queue.end()) {
111 // sign off and remove op
112 if (observer != NULL)
113 op->signOff(observer);
114 queue.erase(iter);
115 } else {
116 ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue.");
117 }
118 } else {
119 ELOG(1, "Given operation pointer is NULL.");
120 }
121}
122
123void OperationQueue::update(Observable *publisher)
124{
125 AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
126 if (op != NULL) {
127 // check for error code of operation
128 if (op->getStatus() != Operation::success) {
129 // remove the worker from the queue
130 failure_callback(currentOpsAddress);
131 LOG(1, "INFO: We are notified that " << op->getName() << " has failed, removing ...");
132 } else {
133 LOG(1, "INFO: We are notified that " << op->getName() << " is done, removing ...");
134 }
135 // remove from queue
136 remove(op, this);
137 LaunchNextOp();
138 }
139}
140
141void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification)
142{}
143
144void OperationQueue::subjectKilled(Observable *publisher)
145{
146 AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
147 if (op != NULL) {
148 ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?");
149 // remove from queue
150 remove(op, this);
151 }
152}
153
Note: See TracBrowser for help on using the repository browser.