FACT++  1.0
ConnectionUSB.cc
Go to the documentation of this file.
1 // **************************************************************************
7 // **************************************************************************
8 #include "ConnectionUSB.h"
9 
10 #include <boost/bind.hpp>
11 
12 using namespace std;
13 
14 namespace ba = boost::asio;
15 namespace bs = boost::system;
16 namespace dummy = ba::placeholders;
17 
18 using ba::serial_port_base;
19 
20 //#define DEBUG_TX
21 //#define DEBUG
22 
23 #ifdef DEBUG
24 #include <fstream>
25 #include <iomanip>
26 #include "Time.h"
27 #endif
28 
29 // -------- Abbreviations for starting async tasks ---------
30 
31 int ConnectionUSB::Write(const Time &t, const string &txt, int qos)
32 {
33  if (fLog)
34  return fLog->Write(t, txt, qos);
35 
36  return MessageImp::Write(t, txt, qos);
37 }
38 
39 void ConnectionUSB::AsyncRead(const ba::mutable_buffers_1 buffers, int type, int counter)
40 {
41  ba::async_read(*this, buffers,
42  boost::bind(&ConnectionUSB::HandleReceivedData, this,
43  dummy::error, dummy::bytes_transferred, type, counter));
44 }
45 
46 void ConnectionUSB::AsyncWrite(const ba::const_buffers_1 &buffers)
47 {
48  ba::async_write(*this, buffers,
49  boost::bind(&ConnectionUSB::HandleSentData, this,
50  dummy::error, dummy::bytes_transferred));
51 }
52 
53 void ConnectionUSB::AsyncWait(ba::deadline_timer &timer, int millisec,
54  void (ConnectionUSB::*handler)(const bs::error_code&))
55 {
56  // - The boost::asio::basic_deadline_timer::expires_from_now()
57  // function cancels any pending asynchronous waits, and returns
58  // the number of asynchronous waits that were cancelled. If it
59  // returns 0 then you were too late and the wait handler has
60  // already been executed, or will soon be executed. If it
61  // returns 1 then the wait handler was successfully cancelled.
62  // - If a wait handler is cancelled, the bs::error_code passed to
63  // it contains the value bs::error::operation_aborted.
64  timer.expires_from_now(boost::posix_time::milliseconds(millisec));
65 
66  timer.async_wait(boost::bind(handler, this, dummy::error));
67 }
68 
69 // ------------------------ close --------------------------
70 // close from another thread
71 void ConnectionUSB::CloseImp(int64_t delay)
72 {
73  if (IsConnected())
74  Info("Closing connection to "+URL()+".");
75 
76  // Close possible open connections
77  bs::error_code ec;
78  cancel(ec);
79  if (ec && ec!=ba::error::basic_errors::bad_descriptor)
80  {
81  ostringstream msg;
82  msg << "Cancel async requests on " << URL() << ": " << ec.message() << " (" << ec << ")";
83  Error(msg);
84  }
85 
86  if (IsConnected())
87  {
88  close(ec);
89  if (ec)
90  {
91  ostringstream msg;
92  msg << "Closing " << URL() << ": " << ec.message() << " (" << ec << ")";
93  Error(msg);
94  }
95  else
96  Info("Closed connection to "+URL()+" succesfully.");
97  }
98 
99  // Stop deadline counters
100  fInTimeout.cancel();
101  fOutTimeout.cancel();
102  fConnectTimeout.cancel();
103 
104  // Reset the connection status
105  fQueueSize = 0;
106  fConnectionStatus = kDisconnected;
107 
108 #ifdef DEBUG
109  ofstream fout1("transmitted.txt", ios::app);
110  ofstream fout2("received.txt", ios::app);
111  ofstream fout3("send.txt", ios::app);
112  fout1 << Time() << ": ---" << endl;
113  fout2 << Time() << ": ---" << endl;
114  fout3 << Time() << ": ---" << endl;
115 #endif
116 
117  if (delay<0 || IsConnecting())
118  return;
119 
120  // We need some timeout before reconnecting!
121  // And we have to check if we are alreayd trying to connect
122  // We should wait until all operations in progress were canceled
123  fConnectTimeout.expires_from_now(boost::posix_time::seconds(delay));
124  fConnectTimeout.async_wait(boost::bind(&ConnectionUSB::HandleReconnectTimeout, this, dummy::error));
125 }
126 
127 void ConnectionUSB::PostClose(int64_t delay)
128 {
129  get_io_service().post(boost::bind(&ConnectionUSB::CloseImp, this, delay));
130 }
131 
132 void ConnectionUSB::HandleReconnectTimeout(const bs::error_code &error)
133 {
134  if (error==ba::error::basic_errors::operation_aborted)
135  return;
136 
137  // 125: Operation canceled (bs::error_code(125, bs::system_category))
138  if (error)
139  {
140  ostringstream str;
141  str << "Reconnect timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
142  Error(str);
143 
144  CloseImp(-1);
145  return;
146  }
147 
148 
149  if (is_open())
150  {
151  Error("HandleReconnectTimeout - "+URL()+" is already open.");
152  return;
153  }
154 
155  // Check whether the deadline has passed. We compare the deadline
156  // against the current time since a new asynchronous operation
157  // may have moved the deadline before this actor had a chance
158  // to run.
159  if (fConnectTimeout.expires_at() > ba::deadline_timer::traits_type::now())
160  return;
161 
162  // Start trying to reconnect
163  Connect();
164 }
165 
166 
167 // ------------------------ write --------------------------
168 void ConnectionUSB::HandleWriteTimeout(const bs::error_code &error)
169 {
170  if (error==ba::error::basic_errors::operation_aborted)
171  return;
172 
173  // 125: Operation canceled (bs::error_code(125, bs::system_category))
174  if (error)
175  {
176  ostringstream str;
177  str << "Write timeout of " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
178  Error(str);
179 
180  CloseImp(-1);
181  return;
182  }
183 
184  if (!is_open())
185  {
186  // For example: Here we could schedule a new accept if we
187  // would not want to allow two connections at the same time.
188  return;
189  }
190 
191  // Check whether the deadline has passed. We compare the deadline
192  // against the current time since a new asynchronous operation
193  // may have moved the deadline before this actor had a chance
194  // to run.
195  if (fOutTimeout.expires_at() > ba::deadline_timer::traits_type::now())
196  return;
197 
198  Error("fOutTimeout has expired, writing data to "+URL());
199 
200  CloseImp(-1);
201 }
202 
203 void ConnectionUSB::HandleSentData(const bs::error_code& error, size_t n)
204 {
205  if (error==ba::error::basic_errors::operation_aborted)
206  return;
207 
208  if (error && error != ba::error::not_connected)
209  {
210  ostringstream str;
211  str << "Writing to " << URL() << ": " << error.message() << " (" << error << ")";// << endl;
212  Error(str);
213 
214  CloseImp(-1);
215  return;
216  }
217 
218  if (error == ba::error::not_connected)
219  {
220  ostringstream msg;
221  msg << n << " bytes could not be sent to " << URL() << " due to missing connection.";
222  Warn(msg);
223  return;
224  }
225 
226  if (--fQueueSize==0)
227  fOutTimeout.cancel();
228 
229 #ifdef DEBUG_TX
230  ostringstream msg;
231  msg << n << " bytes successfully sent to " << URL();
232  Message(msg);
233 #endif
234 
235 #ifdef DEBUG
236  ofstream fout("transmitted.txt", ios::app);
237  fout << Time() << ": ";
238  for (unsigned int i=0; i<fOutQueue.front().size(); i++)
239  fout << hex << setfill('0') << setw(2) << (uint32_t)fOutQueue.front()[i];
240  fout << endl;
241 #endif
242 
243  HandleTransmittedData(n);
244 }
245 
246 void ConnectionUSB::PostMessage(const void *ptr, size_t sz)
247 {
248  // This function can be called from a different thread...
249  if (!is_open())
250  return;
251 
252  // ... this is why we have to increase fQueueSize first
253  fQueueSize++;
254 
255  // ... and shift the deadline timer
256  // This is not ideal, because if we are continously
257  // filling the buffer, it will never timeout
258  AsyncWait(fOutTimeout, 5000, &ConnectionUSB::HandleWriteTimeout);
259 
260  // Now we can schedule the buffer to be sent
261  AsyncWrite(ba::const_buffers_1(ptr, sz));
262 }
263 
264 void ConnectionUSB::PostMessage(const string &cmd, size_t max)
265 {
266  if (max==size_t(-1))
267  max = cmd.length()+1;
268 
269  PostMessage(cmd.c_str(), min(cmd.length()+1, max));
270 }
271 
273 {
274  fConnectionStatus = kConnecting;
275 
276  Info("Connecting to "+URL()+".");
277 
278  bs::error_code ec;
279  open(URL(), ec);
280 
281  if (ec)
282  {
283  ostringstream msg;
284  msg << "Error opening " << URL() << "... " << ec.message() << " (" << ec << ")";
285  Error(msg);
286  fConnectionStatus = kDisconnected;
287  return;
288  }
289 
290  Info("Connection established.");
291 
292  try
293  {
294  Info("Setting Baud Rate");
295  set_option(fBaudRate);
296  Info("Setting Character Size");
297  set_option(fCharacterSize);
298  Info("Setting Parity");
299  set_option(fParity);
300  Info("Setting Sop Bits");
301  set_option(fStopBits);
302  Info("Setting Flow control");
303  set_option(fFlowControl);
304  }
305  catch (const bs::system_error &erc)
306  {
307  Error(string("Setting connection options: ")+erc.what());
308  // CLOSE
309  return;
310  }
311 
312  fQueueSize = 0;
313  fConnectionStatus = kConnected;
314 
315  ConnectionEstablished();
316 }
317 
318 void ConnectionUSB::SetEndpoint(const string &addr)
319 {
320  if (fConnectionStatus>=1)
321  Warn("Connection or connection attempt in progress. New endpoint only valid for next connection.");
322 
323  fAddress = "/dev/"+addr;
324 }
325 
326 
327 ConnectionUSB::ConnectionUSB(ba::io_service& ioservice, ostream &out) :
328 MessageImp(out), ba::serial_port(ioservice), fLog(0),
329 fBaudRate(115200),
330 fCharacterSize(8), fParity(parity::none), fStopBits(stop_bits::one),
331 fFlowControl(flow_control::none),
332 fInTimeout(ioservice), fOutTimeout(ioservice), fConnectTimeout(ioservice),
333 fQueueSize(0), fConnectionStatus(kDisconnected)
334 {
335 }
void SetEndpoint(const std::string &addr)
void PostMessage(const void *msg, size_t s=0)
void CloseImp(int64_t delay=0)
int i
Definition: db_dim_client.c:21
The base implementation of a distributed messaging system.
Definition: MessageImp.h:10
void AsyncWrite(const boost::asio::const_buffers_1 &buffers)
void AsyncWait(boost::asio::deadline_timer &timer, int millisec, void(ConnectionUSB::*handler)(const boost::system::error_code &))
Adds some functionality to boost::posix_time::ptime for our needs.
Definition: Time.h:30
char str[80]
Definition: test_client.c:7
void AsyncRead(const boost::asio::mutable_buffers_1 buffers, int type=0, int counter=0)
virtual void HandleReceivedData(const boost::system::error_code &, size_t, int=0, int=0)
STL namespace.
virtual int Write(const Time &time, const std::string &txt, int qos=kMessage)
Definition: MessageImp.cc:133
int type
uint16_t qos
Definition: HeadersGPS.h:29
void HandleSentData(const boost::system::error_code &error, size_t)
ConnectionUSB(boost::asio::io_service &io_service, std::ostream &out)
void HandleWriteTimeout(const boost::system::error_code &error)
int counter
Definition: db_dim_client.c:19
void HandleReconnectTimeout(const boost::system::error_code &error)
void PostClose(int64_t delay=0)
TT t
Definition: test_client.c:26
static void handler(int conn_id, char *packet, int size, int status)
Definition: webServer.c:635
Error()
Definition: HeadersFTM.h:197
int Write(const Time &t, const std::string &txt, int qos=kInfo)