1 | /*
|
---|
2 | * Project: MoleCuilder
|
---|
3 | * Description: creates and alters molecular systems
|
---|
4 | * Copyright (C) 2012 University of Bonn. All rights reserved.
|
---|
5 | * Please see the LICENSE file or "Copyright notice" in builder.cpp for details.
|
---|
6 | */
|
---|
7 |
|
---|
8 | /*
|
---|
9 | * \file PoolWorker.cpp
|
---|
10 | *
|
---|
11 | * This file strongly follows the Serialization example from the boost::asio
|
---|
12 | * library (see client.cpp).
|
---|
13 | *
|
---|
14 | * Created on: Feb 28, 2012
|
---|
15 | * Author: heber
|
---|
16 | */
|
---|
17 |
|
---|
18 | // include config.h
|
---|
19 | #ifdef HAVE_CONFIG_H
|
---|
20 | #include <config.h>
|
---|
21 | #endif
|
---|
22 |
|
---|
23 | // boost asio needs specific operator new
|
---|
24 | #include <boost/asio.hpp>
|
---|
25 |
|
---|
26 | #include "CodePatterns/MemDebug.hpp"
|
---|
27 |
|
---|
28 | #include <boost/bind.hpp>
|
---|
29 | #include <boost/lexical_cast.hpp>
|
---|
30 | #include <iostream>
|
---|
31 | #include <vector>
|
---|
32 | #include "Connection.hpp" // Must come before boost/serialization headers.
|
---|
33 | #include <boost/serialization/vector.hpp>
|
---|
34 | #include "CodePatterns/Info.hpp"
|
---|
35 | #include "CodePatterns/Log.hpp"
|
---|
36 | #include "Jobs/FragmentJob.hpp"
|
---|
37 | #include "Jobs/MPQCCommandJob.hpp"
|
---|
38 | #include "Jobs/SystemCommandJob.hpp"
|
---|
39 | #include "Results/FragmentResult.hpp"
|
---|
40 | #include "PoolWorker.hpp"
|
---|
41 |
|
---|
42 | /** Helper function to enforce binding of PoolWorker to possible derived
|
---|
43 | * FragmentJob classes.
|
---|
44 | */
|
---|
45 | void dummyInit() {
|
---|
46 | SystemCommandJob("/bin/false", "something", JobId::IllegalJob);
|
---|
47 | MPQCCommandJob("nofile", JobId::IllegalJob);
|
---|
48 | }
|
---|
49 |
|
---|
50 | /** Constructor for class PoolWorker.
|
---|
51 | *
|
---|
52 | * We automatically connect to the given pool and enroll.
|
---|
53 | *
|
---|
54 | * @param _io_service io service for creating connections
|
---|
55 | * @param _host host part of address of pool to connect to
|
---|
56 | * @param _service service part of address of pool to connect to
|
---|
57 | * @param listenhost host part of address of this instance for listening to pool connections
|
---|
58 | * @param listenservice seervice part of address of this instance for listening to pool connections
|
---|
59 | */
|
---|
60 | PoolWorker::PoolWorker(
|
---|
61 | boost::asio::io_service& _io_service,
|
---|
62 | const std::string& _host,
|
---|
63 | const std::string& _service,
|
---|
64 | const std::string& listenhost,
|
---|
65 | const std::string& listenservice) :
|
---|
66 | io_service(_io_service),
|
---|
67 | PoolListener(_io_service, boost::lexical_cast<unsigned short>(listenservice), *this),
|
---|
68 | address(listenhost, listenservice),
|
---|
69 | connection_(_io_service),
|
---|
70 | enrollOp(connection_, address),
|
---|
71 | submitOp(connection_, address),
|
---|
72 | submitresult(boost::bind(&AsyncOperation::operator(),
|
---|
73 | boost::ref(submitOp),
|
---|
74 | _host, _service)),
|
---|
75 | removeOp(connection_, address),
|
---|
76 | removeme(boost::bind(&Operation::operator(),
|
---|
77 | boost::ref(removeOp),
|
---|
78 | _host, _service))
|
---|
79 | {
|
---|
80 | Info info(__FUNCTION__);
|
---|
81 |
|
---|
82 | // always enroll
|
---|
83 | enrollOp(_host,_service);
|
---|
84 |
|
---|
85 | // initiate listening
|
---|
86 | PoolListener.initiateSocket();
|
---|
87 | }
|
---|
88 |
|
---|
89 | /// Handle completion of a accept server operation.
|
---|
90 | void PoolWorker::PoolListener_t::handle_Accept(const boost::system::error_code& e, connection_ptr conn)
|
---|
91 | {
|
---|
92 | Info info(__FUNCTION__);
|
---|
93 | if (!e)
|
---|
94 | {
|
---|
95 | conn->async_read(job,
|
---|
96 | boost::bind(&PoolWorker::PoolListener_t::handle_ReceiveJob, this,
|
---|
97 | boost::asio::placeholders::error, conn));
|
---|
98 | // and listen for following connections
|
---|
99 | //initiateSocket();
|
---|
100 | }
|
---|
101 | else
|
---|
102 | {
|
---|
103 | // An error occurred. Log it and return. Since we are not starting a new
|
---|
104 | // accept operation the io_service will run out of work to do and the
|
---|
105 | // server will exit.
|
---|
106 | ELOG(0, e.message());
|
---|
107 | }
|
---|
108 | }
|
---|
109 |
|
---|
110 | /// Controller callback function when job has been sent.
|
---|
111 | void PoolWorker::PoolListener_t::handle_ReceiveJob(const boost::system::error_code& e, connection_ptr conn)
|
---|
112 | {
|
---|
113 | Info info(__FUNCTION__);
|
---|
114 |
|
---|
115 | if (!e)
|
---|
116 | {
|
---|
117 | LOG(1, "INFO: Received job " << job->getId() << ".");
|
---|
118 | callback.WorkOnJob(job);
|
---|
119 | }
|
---|
120 | else
|
---|
121 | {
|
---|
122 | // An error occurred. Log it and return. Since we are not starting a new
|
---|
123 | // accept operation the io_service will run out of work to do and the
|
---|
124 | // server will exit.
|
---|
125 | ELOG(0, e.message());
|
---|
126 | }
|
---|
127 | }
|
---|
128 |
|
---|
129 | /** Works on the given job and send the results.
|
---|
130 | *
|
---|
131 | * @param job job to work on
|
---|
132 | */
|
---|
133 | void PoolWorker::WorkOnJob(FragmentJob::ptr &job)
|
---|
134 | {
|
---|
135 | // work on job and create result
|
---|
136 | LOG(2, "DEBUG: Beginning to work on " << job->getId() << ".");
|
---|
137 | FragmentResult::ptr result = job->Work();
|
---|
138 | LOG(2, "DEBUG: Setting result " << result->getId() << ".");
|
---|
139 | submitOp.setResult(result);
|
---|
140 |
|
---|
141 | // submit result
|
---|
142 | LOG(2, "DEBUG: Sending result ...");
|
---|
143 | submitresult();
|
---|
144 | }
|
---|
145 |
|
---|
146 | /** Helper function to shutdown the worker properly.
|
---|
147 | *
|
---|
148 | * Note that we will use ShutdownWorkerOperation to unlist from server's pool.
|
---|
149 | *
|
---|
150 | */
|
---|
151 | void PoolWorker::shutdown(int sig)
|
---|
152 | {
|
---|
153 | LOG(1, "INFO: Shutting down due to signal "+toString(sig)+".");
|
---|
154 |
|
---|
155 | // remove us from pool
|
---|
156 | removeme();
|
---|
157 |
|
---|
158 | // somehow stop listener
|
---|
159 | PoolListener.closeSocket();
|
---|
160 |
|
---|
161 | // finally, stop io_service
|
---|
162 | io_service.stop();
|
---|
163 | }
|
---|