FACT++  1.0
Connection.cc
Go to the documentation of this file.
1 // **************************************************************************
10 // **************************************************************************
11 #include "Connection.h"
12 
13 using namespace std;
14 
15 namespace ba = boost::asio;
16 namespace bs = boost::system;
17 namespace dummy = ba::placeholders;
18 
19 using ba::ip::tcp;
20 
21  // -------- Abbreviations for starting async tasks ---------
22 
23 int Connection::Write(const Time &t, const string &txt, int qos)
24 {
25  if (fLog)
26  return fLog->Write(t, txt, qos);
27 
28  return MessageImp::Write(t, txt, qos);
29 }
30 
31 void Connection::AsyncRead(const ba::mutable_buffers_1 buffers, int type)
32 {
33  ba::async_read(*this, buffers,
34  boost::bind(&Connection::HandleReceivedData, this,
35  dummy::error, dummy::bytes_transferred, type));
36 }
37 
38 void Connection::AsyncWrite(const ba::const_buffers_1 &buffers)
39 {
40  ba::async_write(*this, buffers,
41  boost::bind(&Connection::HandleSentData, this,
42  dummy::error, dummy::bytes_transferred));
43 }
44 
45 /*
46 void Connection::AsyncWait(ba::deadline_timer &timer, int millisec,
47  void (Connection::*handler)(const bs::error_code&))
48 {
49  // - The boost::asio::basic_deadline_timer::expires_from_now()
50  // function cancels any pending asynchronous waits, and returns
51  // the number of asynchronous waits that were cancelled. If it
52  // returns 0 then you were too late and the wait handler has
53  // already been executed, or will soon be executed. If it
54  // returns 1 then the wait handler was successfully cancelled.
55  // - If a wait handler is cancelled, the bs::error_code passed to
56  // it contains the value bs::error::operation_aborted.
57  timer.expires_from_now(boost::posix_time::milliseconds(millisec));
58 
59  timer.async_wait(boost::bind(handler, this, dummy::error));
60 }
61 */
62 
63 void Connection::AsyncConnect(tcp::resolver::iterator iterator)
64 {
65  tcp::endpoint endpoint = *iterator;
66 
67  // AsyncConnect + Deadline
68  async_connect(endpoint,
69  boost::bind(&Connection::ConnectIter,
70  this, iterator, ba::placeholders::error));
71 
72  // We will get a "Connection timeout anyway"
73  //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
74 }
75 
77 {
78  // AsyncConnect + Deadline
79  async_connect(fEndpoint,
80  boost::bind(&Connection::ConnectAddr,
81  this, fEndpoint, ba::placeholders::error));
82 
83  // We will get a "Connection timeout anyway"
84  //AsyncWait(fConnectTimeout, 5, &Connection::HandleConnectTimeout);
85 }
86 
87 // ------------------------ close --------------------------
88 // close from another thread
89 void Connection::CloseImp(bool restart)
90 {
91  if (IsConnected() && fVerbose)
92  {
93  ostringstream str;
94  str << "Connection closed to " << URL() << ".";
95  Info(str);
96  }
97 
98  // Stop any pending connection attempt
99  fConnectionTimer.cancel();
100 
101  // Close possible open connections
102  close();
103 
104  // Reset the connection status
105  fQueueSize = 0;
106  fConnectionStatus = kDisconnected;
107 
108  // Stop deadline counters
109  fInTimeout.cancel();
110  fOutTimeout.cancel();
111 
112  if (!restart || IsConnecting())
113  return;
114 
115  // We need some timeout before reconnecting!
116  // And we have to check if we are alreayd trying to connect
117  // We shoudl wait until all operations in progress were canceled
118 
119  // Start trying to reconnect
120  fMsgConnect = "";
121  fErrConnect = "";
122  StartConnect();
123 }
124 
125 void Connection::PostClose(bool restart)
126 {
127  get_io_service().post(boost::bind(&Connection::CloseImp, this, restart));
128 }
129 
130 // ------------------------ write --------------------------
131 void Connection::HandleWriteTimeout(const bs::error_code &error)
132 {
133  if (error==ba::error::basic_errors::operation_aborted)
134  return;
135 
136  // 125: Operation canceled (bs::error_code(125, bs::system_category))
137  if (error)
138  {
139  ostringstream str;
140  str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
141  Error(str);
142 
143  CloseImp();
144  return;
145  }
146 
147  if (!is_open())
148  {
149  // For example: Here we could schedule a new accept if we
150  // would not want to allow two connections at the same time.
151  return;
152  }
153 
154  // Check whether the deadline has passed. We compare the deadline
155  // against the current time since a new asynchronous operation
156  // may have moved the deadline before this actor had a chance
157  // to run.
158  if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
159  return;
160 
161  Error("fOutTimeout has expired, writing data to "+URL());
162 
163  CloseImp();
164 }
165 
166 void Connection::HandleSentData(const bs::error_code& error, size_t n)
167 {
168  if (error==ba::error::basic_errors::operation_aborted)
169  return;
170 
171  if (error && error != ba::error::not_connected)
172  {
173  ostringstream str;
174  str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
175  Error(str);
176 
177  CloseImp();
178  return;
179  }
180 
181  if (error == ba::error::not_connected)
182  {
183  ostringstream msg;
184  msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
185  Warn(msg);
186 
187  return;
188  }
189 
190  if (--fQueueSize==0)
191  fOutTimeout.cancel();
192 
193  if (fDebugTx)
194  {
195  ostringstream msg;
196  msg << n << " bytes successfully sent to " << URL();
197  Debug(msg);
198  }
199 }
200 
201 void Connection::PostMessage(const void *ptr, size_t sz)
202 {
203  // This function can be called from a different thread...
204  if (!is_open())
205  return;
206 
207  // ... this is why we have to increase fQueueSize first
208  fQueueSize++;
209 
210  // ... and shift the deadline timer
211  // This is not ideal, because if we are continously
212  // filling the buffer, it will never timeout
213  AsyncWait(fOutTimeout, 5000, &Connection::HandleWriteTimeout);
214 
215  // Now we can schedule the buffer to be sent
216  AsyncWrite(ba::const_buffers_1(ptr, sz));
217 
218  // If a socket is closed, all pending asynchronous
219  // operation will be aborted.
220 }
221 
222 void Connection::PostMessage(const string &cmd, size_t max)
223 {
224  if (max==size_t(-1))
225  max = cmd.length()+1;
226 
227  PostMessage(cmd.c_str(), min(cmd.length()+1, max));
228 }
229 
230 void Connection::HandleConnectionTimer(const bs::error_code &error)
231 {
232  if (error==ba::error::basic_errors::operation_aborted)
233  return;
234 
235  if (error)
236  {
237  ostringstream str;
238  str << "Connetion timer of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
239  Error(str);
240  }
241 
242  if (is_open())
243  {
244  // For example: Here we could schedule a new accept if we
245  // would not want to allow two connections at the same time.
246  return;
247  }
248 
249  // Check whether the deadline has passed. We compare the deadline
250  // against the current time since a new asynchronous operation
251  // may have moved the deadline before this actor had a chance
252  // to run.
253  if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
254  StartConnect();
255 }
256 
257 bool Connection::ConnectImp(const tcp::endpoint &endpoint, const bs::error_code& error)
258 {
259  const string host = endpoint.port()==0 ? "" :
260  endpoint.address().to_string()+':'+to_string((long long unsigned int)endpoint.port());
261 
262  // Connection established
263  if (!error)
264  {
265  set_option(socket_base::keep_alive(true));
266 
267  const int optval = 30;
268  // First keep alive after 30s
269  setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval, sizeof(optval));
270  // New keep alive after 30s
271  setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval, sizeof(optval));
272 
273  if (fVerbose)
274  Info("Connection established to "+host+"...");
275 
276  fQueueSize = 0;
277  fConnectionStatus = kConnected;
278 
279  ConnectionEstablished();
280  return true;
281  }
282 
283  // If returning from run will lead to deletion of this
284  // instance, close() is not needed (maybe implicitly called).
285  // If run is called again, close() is needed here. Otherwise:
286  // Software caused connection abort when we try to resolve
287  // the endpoint again.
288  CloseImp(false);
289 
290  ostringstream msg;
291  if (!host.empty())
292  msg << "Connecting to " << host << ": " << error.message() << " (" << error << ")";
293 
294  if (fErrConnect!=msg.str())
295  {
296  if (error!=ba::error::basic_errors::connection_refused)
297  fMsgConnect = "";
298  fErrConnect = msg.str();
299  Warn(fErrConnect);
300  }
301 
302  if (error==ba::error::basic_errors::operation_aborted)
303  return true;
304 
305  fConnectionStatus = kConnecting;
306 
307  return false;
308 /*
309  // Go on with the next
310  if (++iterator != tcp::resolver::iterator())
311  {
312  AsyncConnect(iterator);
313  return;
314  }
315 */
316  // No more entries to try, if we would not put anything else
317  // into the queue anymore it would now return (run() would return)
318 
319  // Since we don't want to block the main loop, we wait using an
320  // asnychronous timer
321 
322  // FIXME: Should we move this before AsyncConnect() ?
323 // AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
324 }
325 
326 void Connection::ConnectIter(tcp::resolver::iterator iterator, const bs::error_code& error)
327 {
328  if (ConnectImp(*iterator, error))
329  return;
330 
331  // Go on with the next
332  if (++iterator != tcp::resolver::iterator())
333  {
334  AsyncConnect(iterator);
335  return;
336  }
337 
338  // No more entries to try, if we would not put anything else
339  // into the queue anymore it would now return (run() would return)
340  AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
341 }
342 
343 void Connection::ConnectAddr(const tcp::endpoint &endpoint, const bs::error_code& error)
344 {
345  if (ConnectImp(endpoint, error))
346  return;
347 
348  AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
349 }
350 
351 // FIXME: Async connect should get address and port as an argument
353 {
354  fConnectionStatus = kConnecting;
355 
356  if (fEndpoint!=tcp::endpoint())
357  {
358  ostringstream msg;
359  msg << "Trying to connect to " << fEndpoint << "...";
360  if (fMsgConnect!=msg.str())
361  {
362  fMsgConnect = msg.str();
363  Info(msg);
364  }
365 
366  AsyncConnect();
367  return;
368  }
369 
370  const bool valid = !fAddress.empty() || !fPort.empty();
371 
372  boost::system::error_code ec;
373 
374  ostringstream msg;
375  if (!valid)
376  msg << "No target address... connection attempt postponed.";
377  else
378  {
379  tcp::resolver resolver(get_io_service());
380 
381  tcp::resolver::query query(fAddress, fPort);
382  tcp::resolver::iterator iterator = resolver.resolve(query, ec);
383 
384  msg << "Trying to connect to " << URL() << "...";
385 
386  // Start connection attempts (will also reset deadline counter)
387  if (!ec)
388  AsyncConnect(iterator);
389  else
390  msg << " " << ec.message() << " (" << ec << ")";
391  }
392 
393  // Only output message if it has changed
394  if (fMsgConnect!=msg.str())
395  {
396  fMsgConnect = msg.str();
397  if (ec)
398  Error(msg);
399  if (!ec && fVerbose)
400  Info(msg);
401  }
402 
403  if (!valid || ec)
404  AsyncWait(fConnectionTimer, 250, &Connection::HandleConnectionTimer);
405 }
406 
407 void Connection::SetEndpoint(const string &addr, int port)
408 {
409  if (fConnectionStatus>=1)
410  Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
411 
412  fAddress = addr;
413  fPort = to_string((long long)port);
414 }
415 
416 void Connection::SetEndpoint(const string &addr, const string &port)
417 {
418  if (fConnectionStatus>=1 && URL()!=":")
419  Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
420 
421  fAddress = addr;
422  fPort = port;
423 }
424 
425 void Connection::SetEndpoint(const string &addr)
426 {
427  const size_t p0 = addr.find_first_of(':');
428  const size_t p1 = addr.find_last_of(':');
429 
430  if (p0==string::npos || p0!=p1)
431  {
432  Error("Connection::SetEndpoint - Wrong format of argument '"+addr+"' ('host:port' expected)");
433  return;
434  }
435 
436  SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
437 }
438 
439 void Connection::SetEndpoint(const tcp::endpoint &ep)
440 {
441  const ba::ip::address addr = ep.address();
442 
443  const ba::ip::address use =
444  addr.is_v6() && addr.to_v6().is_loopback() ?
445  ba::ip::address(ba::ip::address_v4::loopback()) :
446  addr;
447 
448  SetEndpoint(use.to_string(), ep.port());
449 
450  fEndpoint = tcp::endpoint(use, ep.port());
451 }
452 
453 
454 Connection::Connection(ba::io_service& ioservice, ostream &out) :
455 MessageImp(out), tcp::socket(ioservice),
456 fLog(0), fVerbose(true), fDebugTx(false),
457 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
458 fQueueSize(0), fConnectionStatus(kDisconnected)
459 {
460 }
void PostMessage(const void *msg, size_t s=0)
bool ConnectImp(const boost::asio::ip::tcp::endpoint &endpoint, const boost::system::error_code &error)
Definition: Connection.cc:257
void PostClose(bool restart=true)
Definition: Connection.cc:125
virtual void HandleReceivedData(const boost::system::error_code &, size_t, int=0)
Definition: Connection.h:137
void HandleConnectionTimer(const boost::system::error_code &error)
Definition: Connection.cc:230
The base implementation of a distributed messaging system.
Definition: MessageImp.h:10
Adds some functionality to boost::posix_time::ptime for our needs.
Definition: Time.h:30
char str[80]
Definition: test_client.c:7
void CloseImp(bool restart=true)
Definition: Connection.cc:89
void AsyncConnect()
Definition: Connection.cc:76
STL namespace.
int Write(const Time &t, const std::string &txt, int qos=kInfo)
Definition: Connection.cc:23
void HandleWriteTimeout(const boost::system::error_code &error)
Definition: Connection.cc:131
void HandleSentData(const boost::system::error_code &error, size_t)
Definition: Connection.cc:166
void SetEndpoint(const std::string &addr, int port)
virtual void StartConnect()
Definition: Connection.cc:352
void AsyncWrite(const boost::asio::const_buffers_1 &buffers)
Definition: Connection.cc:38
virtual int Write(const Time &time, const std::string &txt, int qos=kMessage)
Definition: MessageImp.cc:133
int type
uint16_t qos
Definition: HeadersGPS.h:29
void ConnectAddr(const boost::asio::ip::tcp::endpoint &endpoint, const boost::system::error_code &error)
Definition: Connection.cc:343
bool valid() const
Definition: HeadersFTM.h:271
void AsyncRead(const boost::asio::mutable_buffers_1 buffers, int type=0)
Definition: Connection.cc:31
void ConnectIter(boost::asio::ip::tcp::resolver::iterator endpoint_iterator, const boost::system::error_code &error)
Definition: Connection.cc:326
static int Debug
Definition: dns.c:78
TT t
Definition: test_client.c:26
Error()
Definition: HeadersFTM.h:197
Connection(boost::asio::io_service &io_service, std::ostream &out)
Definition: Connection.cc:454