1 | /*
|
---|
2 | * Project: MoleCuilder
|
---|
3 | * Description: creates and alters molecular systems
|
---|
4 | * Copyright (C) 2012 University of Bonn. All rights reserved.
|
---|
5 | * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
|
---|
6 | */
|
---|
7 |
|
---|
8 | /*
|
---|
9 | * OperationQueue.cpp
|
---|
10 | *
|
---|
11 | * Created on: Apr 24, 2012
|
---|
12 | * Author: heber
|
---|
13 | */
|
---|
14 |
|
---|
15 |
|
---|
16 | // include config.h
|
---|
17 | #ifdef HAVE_CONFIG_H
|
---|
18 | #include <config.h>
|
---|
19 | #endif
|
---|
20 |
|
---|
21 | // boost asio needs specific operator new
|
---|
22 | #include <boost/asio.hpp>
|
---|
23 |
|
---|
24 | #include "CodePatterns/MemDebug.hpp"
|
---|
25 |
|
---|
26 | #include <boost/bind.hpp>
|
---|
27 | #include <boost/lambda/lambda.hpp>
|
---|
28 | #include <string>
|
---|
29 |
|
---|
30 | #include "CodePatterns/Log.hpp"
|
---|
31 | #include "CodePatterns/Observer/Observer.hpp"
|
---|
32 |
|
---|
33 | #include "Operations/AsyncOperation.hpp"
|
---|
34 | #include "Operations/OperationQueue.hpp"
|
---|
35 | #include "WorkerAddress.hpp"
|
---|
36 |
|
---|
// Upper bound on concurrently running operations: LaunchNextOp() starts another
// operation only while getNumberOfRunningOps() is below this value. Starts out as 1.
37 | size_t OperationQueue::max_connections = 1;
|
---|
38 |
|
---|
39 | OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
|
---|
40 | {
|
---|
41 | OperationQueue_t::iterator iter =
|
---|
42 | std::find_if(queue.begin(), queue.end(),
|
---|
43 | boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op);
|
---|
44 | return iter;
|
---|
45 | }
|
---|
46 |
|
---|
47 | void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address)
|
---|
48 | {
|
---|
49 | if (op != NULL) {
|
---|
50 | if (!IsBlockedFlag) {
|
---|
51 | AsyncOp_ptr ptr(op); // this always prevents memory loss
|
---|
52 | ptr->signOn(this);
|
---|
53 | OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
|
---|
54 | op = NULL;
|
---|
55 | AddressMap.insert( make_pair(*iter, address) );
|
---|
56 | LaunchNextOp();
|
---|
57 | } else {
|
---|
58 | ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+".");
|
---|
59 | delete op;
|
---|
60 | op = NULL;
|
---|
61 | }
|
---|
62 | } else {
|
---|
63 | ELOG(1, "Given operation pointer is NULL.");
|
---|
64 | }
|
---|
65 | }
|
---|
66 |
|
---|
67 | void OperationQueue::LaunchNextOp()
|
---|
68 | {
|
---|
69 | // connection available?
|
---|
70 | if (getNumberOfRunningOps() < max_connections) {
|
---|
71 | // only start operation when address is valid
|
---|
72 | OperationQueue_t::iterator queueiter =
|
---|
73 | std::find_if(queue.begin(), queue.end(),
|
---|
74 | boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
|
---|
75 | if (queueiter != queue.end()) {
|
---|
76 | AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
|
---|
77 | ASSERT( mapiter != AddressMap.end(),
|
---|
78 | "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
|
---|
79 | const WorkerAddress address = mapiter->second;
|
---|
80 | AsyncOp_ptr ptr = mapiter->first;
|
---|
81 | // always erase the op from the list of ones pending for launch
|
---|
82 | AddressMap.erase(mapiter);
|
---|
83 | // only launch when not a debug op
|
---|
84 | if ((!address.host.empty()) && (!address.service.empty())) {
|
---|
85 | LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
|
---|
86 | (*ptr)(address.host, address.service);
|
---|
87 | } else {
|
---|
88 | LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
|
---|
89 | }
|
---|
90 | } else {
|
---|
91 | LOG(2, "DEBUG: All remaining operations are already running.");
|
---|
92 | }
|
---|
93 | } else {
|
---|
94 | LOG(2, "DEBUG: Currently there are no free connections.");
|
---|
95 | }
|
---|
96 | }
|
---|
97 |
|
---|
98 | void OperationQueue::remove(AsyncOperation *op, Observer *observer)
|
---|
99 | {
|
---|
100 | if (op != NULL) {
|
---|
101 | OperationQueue_t::iterator iter = findOperation(op);
|
---|
102 | if (iter != queue.end()) {
|
---|
103 | // sign off and remove op
|
---|
104 | if (observer != NULL)
|
---|
105 | op->signOff(observer);
|
---|
106 | queue.erase(iter);
|
---|
107 | } else {
|
---|
108 | ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue.");
|
---|
109 | }
|
---|
110 | } else {
|
---|
111 | ELOG(1, "Given operation pointer is NULL.");
|
---|
112 | }
|
---|
113 | }
|
---|
114 |
|
---|
115 | void OperationQueue::update(Observable *publisher)
|
---|
116 | {
|
---|
117 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
|
---|
118 | if (op != NULL) {
|
---|
119 | LOG(1, "INFO: We are note notified that " << op->getName() << " is done, removing ...");
|
---|
120 | // remove from queue
|
---|
121 | remove(op, this);
|
---|
122 | LaunchNextOp();
|
---|
123 | }
|
---|
124 | }
|
---|
125 |
|
---|
126 | void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification)
|
---|
127 | {}
|
---|
128 |
|
---|
129 | void OperationQueue::subjectKilled(Observable *publisher)
|
---|
130 | {
|
---|
131 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
|
---|
132 | if (op != NULL) {
|
---|
133 | ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?");
|
---|
134 | // remove from queue
|
---|
135 | remove(op, this);
|
---|
136 | }
|
---|
137 | }
|
---|
138 |
|
---|