1 | /*
2 | * Project: MoleCuilder
3 | * Description: creates and alters molecular systems
4 | * Copyright (C) 2012 University of Bonn. All rights reserved.
5 | * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
6 | */
7 |
8 | /*
9 | * OperationQueue.cpp
10 | *
11 | * Created on: Apr 24, 2012
12 | * Author: heber
13 | */
14 |
15 |
16 | // include config.h
17 | #ifdef HAVE_CONFIG_H
18 | #include <config.h>
19 | #endif
20 |
21 | // boost asio needs specific operator new
22 | #include <boost/asio.hpp>
23 |
24 | #include "CodePatterns/MemDebug.hpp"
25 |
26 | #include <boost/bind.hpp>
27 | #include <boost/lambda/lambda.hpp>
28 | #include <string>
29 |
30 | #include "CodePatterns/Log.hpp"
31 | #include "CodePatterns/Observer/Observer.hpp"
32 |
33 | #include "Operations/AsyncOperation.hpp"
34 | #include "Operations/OperationQueue.hpp"
35 | #include "WorkerAddress.hpp"
36 |
37 | size_t OperationQueue::max_connections = 1;
38 |
39 | OperationQueue::OperationQueue_t::iterator OperationQueue::findOperation(AsyncOperation *op)
40 | {
41 | OperationQueue_t::iterator iter =
42 | std::find_if(queue.begin(), queue.end(),
43 | boost::bind(&AsyncOp_ptr::get, boost::lambda::_1) == op);
44 | return iter;
45 | }
46 |
47 | void OperationQueue::push_back(AsyncOperation *&op, const WorkerAddress &address)
48 | {
49 | if (op != NULL) {
50 | if (!IsBlockedFlag) {
51 | AsyncOp_ptr ptr(op); // this always prevents memory loss
52 | ptr->signOn(this);
53 | OperationQueue_t::iterator iter = queue.insert(queue.end(), ptr );
54 | op = NULL;
55 | AddressMap.insert( make_pair(*iter, address) );
56 | LaunchNextOp();
57 | } else {
58 | ELOG(1, "Queue is currently blocked, dropping operation "+op->getName()+".");
59 | delete op;
60 | op = NULL;
61 | }
62 | } else {
63 | ELOG(1, "Given operation pointer is NULL.");
64 | }
65 | }
66 |
67 | void OperationQueue::LaunchNextOp()
68 | {
69 | // connection available?
70 | if (getNumberOfRunningOps() < max_connections) {
71 | // only start operation when address is valid
72 | OperationQueue_t::iterator queueiter =
73 | std::find_if(queue.begin(), queue.end(),
74 | boost::bind(&AddressMap_t::count, boost::ref(AddressMap), boost::lambda::_1) );
75 | if (queueiter != queue.end()) {
76 | AddressMap_t::iterator mapiter = AddressMap.find(*queueiter);
77 | ASSERT( mapiter != AddressMap.end(),
78 | "OperationQueue::LaunchNextOp() - cannot find connection "+toString((*queueiter)->getName())+" in AddressMap.");
79 | const WorkerAddress address = mapiter->second;
80 | AsyncOp_ptr ptr = mapiter->first;
81 | // always erase the op from the list of ones pending for launch
82 | AddressMap.erase(mapiter);
83 | // only launch when not a debug op
84 | if ((!address.host.empty()) && (!address.service.empty())) {
85 | LOG(2, "DEBUG: Launching next operation " << ptr->getName() << ".");
86 | (*ptr)(address.host, address.service);
87 | } else {
88 | LOG(3, "DEBUG: Skipping debug operation " << ptr->getName() << " with empty address.");
89 | }
90 | } else {
91 | LOG(2, "DEBUG: All remaining operations are already running.");
92 | }
93 | } else {
94 | LOG(2, "DEBUG: Currently there are no free connections.");
95 | }
96 | }
97 |
98 | void OperationQueue::remove(AsyncOperation *op, Observer *observer)
99 | {
100 | if (op != NULL) {
101 | OperationQueue_t::iterator iter = findOperation(op);
102 | if (iter != queue.end()) {
103 | // sign off and remove op
104 | if (observer != NULL)
105 | op->signOff(observer);
106 | queue.erase(iter);
107 | } else {
108 | ELOG(1, "Could not find Operation " << op->getName() << " in operation's queue.");
109 | }
110 | } else {
111 | ELOG(1, "Given operation pointer is NULL.");
112 | }
113 | }
114 |
115 | void OperationQueue::update(Observable *publisher)
116 | {
117 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
118 | if (op != NULL) {
119 | LOG(1, "INFO: We are note notified that " << op->getName() << " is done, removing ...");
120 | // remove from queue
121 | remove(op, this);
122 | LaunchNextOp();
123 | }
124 | }
125 |
126 | void OperationQueue::recieveNotification(Observable *publisher, Notification_ptr notification)
127 | {}
128 |
129 | void OperationQueue::subjectKilled(Observable *publisher)
130 | {
131 | AsyncOperation *op = static_cast<AsyncOperation *>(publisher);
132 | if (op != NULL) {
133 | ELOG(2, "DEBUG: AsyncOperation at " << publisher << " got killed before being done?");
134 | // remove from queue
135 | remove(op, this);
136 | }
137 | }
138 |