15 namespace ba = boost::asio;
16 namespace bs = boost::system;
17 namespace dummy = ba::placeholders;
26 return fLog->Write(t, txt, qos);
33 ba::async_read(*
this, buffers,
35 dummy::error, dummy::bytes_transferred, type));
40 ba::async_write(*
this, buffers,
42 dummy::error, dummy::bytes_transferred));
65 tcp::endpoint endpoint = *iterator;
68 async_connect(endpoint,
70 this, iterator, ba::placeholders::error));
79 async_connect(fEndpoint,
81 this, fEndpoint, ba::placeholders::error));
91 if (IsConnected() && fVerbose)
94 str <<
"Connection closed to " << URL() <<
".";
99 fConnectionTimer.cancel();
110 fOutTimeout.cancel();
112 if (!restart || IsConnecting())
133 if (error==ba::error::basic_errors::operation_aborted)
140 str <<
"Write timeout of " << URL() <<
": " << error.message() <<
" (" << error <<
")";
158 if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
161 Error(
"fOutTimeout has expired, writing data to "+URL());
168 if (error==ba::error::basic_errors::operation_aborted)
171 if (error && error != ba::error::not_connected)
174 str <<
"Writing to " << URL() <<
": " << error.message() <<
" (" << error <<
")";
181 if (error == ba::error::not_connected)
184 msg << n <<
" bytes could not be sent to " << URL() <<
" due to missing connection.";
191 fOutTimeout.cancel();
196 msg << n <<
" bytes successfully sent to " << URL();
216 AsyncWrite(ba::const_buffers_1(ptr, sz));
225 max = cmd.length()+1;
227 PostMessage(cmd.c_str(), min(cmd.length()+1, max));
232 if (error==ba::error::basic_errors::operation_aborted)
238 str <<
"Connetion timer of " << URL() <<
": " << error.message() <<
" (" << error <<
")";
253 if (fConnectionTimer.expires_at() < ba::deadline_timer::traits_type::now())
259 const string host = endpoint.port()==0 ?
"" :
260 endpoint.address().to_string()+
':'+to_string((
long long unsigned int)endpoint.port());
265 set_option(socket_base::keep_alive(
true));
267 const int optval = 30;
269 setsockopt(native(), SOL_TCP, TCP_KEEPIDLE, &optval,
sizeof(optval));
271 setsockopt(native(), SOL_TCP, TCP_KEEPINTVL, &optval,
sizeof(optval));
274 Info(
"Connection established to "+host+
"...");
279 ConnectionEstablished();
292 msg <<
"Connecting to " << host <<
": " << error.message() <<
" (" << error <<
")";
294 if (fErrConnect!=msg.str())
296 if (error!=ba::error::basic_errors::connection_refused)
298 fErrConnect = msg.str();
302 if (error==ba::error::basic_errors::operation_aborted)
328 if (ConnectImp(*iterator, error))
332 if (++iterator != tcp::resolver::iterator())
334 AsyncConnect(iterator);
345 if (ConnectImp(endpoint, error))
356 if (fEndpoint!=tcp::endpoint())
359 msg <<
"Trying to connect to " << fEndpoint <<
"...";
360 if (fMsgConnect!=msg.str())
362 fMsgConnect = msg.str();
370 const bool valid = !fAddress.empty() || !fPort.empty();
372 boost::system::error_code ec;
376 msg <<
"No target address... connection attempt postponed.";
379 tcp::resolver resolver(get_io_service());
381 tcp::resolver::query query(fAddress, fPort);
382 tcp::resolver::iterator iterator = resolver.resolve(query, ec);
384 msg <<
"Trying to connect to " << URL() <<
"...";
388 AsyncConnect(iterator);
390 msg <<
" " << ec.message() <<
" (" << ec <<
")";
394 if (fMsgConnect!=msg.str())
396 fMsgConnect = msg.str();
409 if (fConnectionStatus>=1)
410 Warn(
"Connection or connection attempt in progress. New endpoint only valid for next connection.");
413 fPort = to_string((
long long)port);
418 if (fConnectionStatus>=1 && URL()!=
":")
419 Warn(
"Connection or connection attempt in progress. New endpoint only valid for next connection.");
427 const size_t p0 = addr.find_first_of(
':');
428 const size_t p1 = addr.find_last_of(
':');
430 if (p0==string::npos || p0!=p1)
432 Error(
"Connection::SetEndpoint - Wrong format of argument '"+addr+
"' ('host:port' expected)");
436 SetEndpoint(addr.substr(0, p0), addr.substr(p0+1));
441 const ba::ip::address addr = ep.address();
443 const ba::ip::address use =
444 addr.is_v6() && addr.to_v6().is_loopback() ?
445 ba::ip::address(ba::ip::address_v4::loopback()) :
448 SetEndpoint(use.to_string(), ep.port());
450 fEndpoint = tcp::endpoint(use, ep.port());
456 fLog(0), fVerbose(true), fDebugTx(false),
457 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectionTimer(ioservice),
void PostMessage(const void *msg, size_t s=0)
bool ConnectImp(const boost::asio::ip::tcp::endpoint &endpoint, const boost::system::error_code &error)
void PostClose(bool restart=true)
virtual void HandleReceivedData(const boost::system::error_code &, size_t, int=0)
void HandleConnectionTimer(const boost::system::error_code &error)
The base implementation of a distributed messaging system.
Adds some functionality to boost::posix_time::ptime for our needs.
void CloseImp(bool restart=true)
int Write(const Time &t, const std::string &txt, int qos=kInfo)
void HandleWriteTimeout(const boost::system::error_code &error)
void HandleSentData(const boost::system::error_code &error, size_t)
void SetEndpoint(const std::string &addr, int port)
virtual void StartConnect()
void AsyncWrite(const boost::asio::const_buffers_1 &buffers)
virtual int Write(const Time &time, const std::string &txt, int qos=kMessage)
void ConnectAddr(const boost::asio::ip::tcp::endpoint &endpoint, const boost::system::error_code &error)
void AsyncRead(const boost::asio::mutable_buffers_1 buffers, int type=0)
void ConnectIter(boost::asio::ip::tcp::resolver::iterator endpoint_iterator, const boost::system::error_code &error)
Connection(boost::asio::io_service &io_service, std::ostream &out)