FACT++  1.0
sqmctrl.cc
Go to the documentation of this file.
1 #include <boost/algorithm/string.hpp>
2 
3 #include "FACT.h"
4 #include "Dim.h"
5 #include "Event.h"
6 #include "StateMachineDim.h"
7 #include "StateMachineAsio.h"
8 #include "Connection.h"
9 #include "LocalControl.h"
10 #include "Configuration.h"
11 #include "Console.h"
12 
13 #include "tools.h"
14 
15 #include "HeadersSQM.h"
16 
17 namespace ba = boost::asio;
18 namespace bs = boost::system;
19 namespace dummy = ba::placeholders;
20 
21 using namespace std;
22 
23 class ConnectionSQM : public Connection
24 {
25 protected:
26  virtual void Update(const SQM::Data &)
27  {
28  }
29 
30 private:
31  bool fIsVerbose;
33  bool fValid;
34  uint16_t fTimeout;
35 
36  boost::asio::streambuf fBuffer;
37 
38  boost::asio::deadline_timer fTrigger;
39 
40  void HandleRead(const boost::system::error_code& err, size_t bytes_received)
41  {
42  // Do not schedule a new read if the connection failed.
43  if (bytes_received==0 || err)
44  {
45  if (err==ba::error::eof)
46  Warn("Connection closed by remote host.");
47 
48  // 107: Transport endpoint is not connected (bs::error_code(107, bs::system_category))
49  // 125: Operation canceled
50  if (err && err!=ba::error::eof && // Connection closed by remote host
51  err!=ba::error::basic_errors::not_connected && // Connection closed by remote host
52  err!=ba::error::basic_errors::operation_aborted) // Connection closed by us
53  {
54  ostringstream str;
55  str << "Reading from " << URL() << ": " << err.message() << " (" << err << ")";// << endl;
56  Error(str);
57  }
58  PostClose(false);//err!=ba::error::basic_errors::operation_aborted);
59  return;
60  }
61 
62  istream is(&fBuffer);
63 
64  string buffer;
65  if (!getline(is, buffer, '\n'))
66  {
67  Fatal("Received message does not contain \\n... closing connection.");
68  PostClose(false);
69  return;
70  }
71 
72  buffer = buffer.substr(0, buffer.size()-1);
73 
74  if (fIsVerbose)
75  {
76  Out() << Time().GetAsStr("%H:%M:%S.%f") << "[" << buffer.size() << "]: " << buffer << "|" << endl;
77  // Out() << Time().GetAsStr("%H:%M:%S.%f") << "[ " << vec.size() << "]: ";
78  // for (auto it=vec.begin(); it!=vec.end(); it++)
79  // Out() << *it << "|";
80  // Out() << endl;
81  }
82 
83  vector<string> vec;
84  boost::split(vec, buffer, boost::is_any_of(","));
85 
86  try
87  {
88  if (vec.size()!=6)
89  throw runtime_error("Unknown number of fields in received data");
90 
91  if (vec[0]!="r")
92  throw runtime_error("Not a proper answer");
93 
95 
96  data.mag = stof(vec[1]);
97  data.freq = stol(vec[2]);
98  data.counts = stol(vec[3]);
99  data.period = stof(vec[4]);
100  data.temp = stof(vec[5]);
101 
102  Update(data);
103 
104  fValid = true;
105  }
106  catch (const exception &e)
107  {
108  if (fFirstMessage)
109  Warn("Parsing first message failed ["+string(e.what())+"]");
110  else
111  {
112  Error("Parsing received message failed ["+string(e.what())+"]");
113  Error("Received: "+buffer);
114  PostClose(false);
115  return;
116  }
117  }
118 
119  // Send next request in fTimeout milliseconds calculated from
120  // the last request onwards.
121  fTrigger.expires_at(fTrigger.expires_at()+boost::posix_time::milliseconds(fTimeout));
122  fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
123  this, dummy::error));
124 
125  fFirstMessage = false;
126  }
127 
128  void HandleReadTimeout(const bs::error_code &error)
129  {
130  // 125: Operation canceled (bs::error_code(125, bs::system_category))
131  if (error && error!=ba::error::basic_errors::operation_aborted)
132  {
133  ostringstream str;
134  str << "ReadTimeout of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
135  Error(str);
136 
137  PostClose(false);
138  return;
139  }
140 
141  if (!is_open())
142  {
143  // For example: Here we could schedule a new accept if we
144  // would not want to allow two connections at the same time.
145  fValid = false;
146  PostClose(true);
147  return;
148  }
149 
150  // This is called if the deadline has been shifted
151  if (error==ba::error::basic_errors::operation_aborted)
152  return;
153 
154  // Check whether the deadline has passed. We compare the deadline
155  // against the current time since a new asynchronous operation
156  // may have moved the deadline before this actor had a chance
157  // to run.
158  if (fInTimeout.expires_at() > ba::deadline_timer::traits_type::now())
159  return;
160 
161  ostringstream str;
162  str << "No valid answer received from " << URL() << " within " << ceil(fTimeout*1.5) << "ms";
163  Error(str);
164 
165  PostClose(false);
166 
167  fInTimeout.expires_from_now(boost::posix_time::milliseconds(1000));
168  fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
169  this, dummy::error));
170  }
171 
172  void HandleRequestTrigger(const bs::error_code &error)
173  {
174 
175  // 125: Operation canceled (bs::error_code(125, bs::system_category))
176  if (error && error!=ba::error::basic_errors::operation_aborted)
177  {
178  ostringstream str;
179  str << "RequestTrigger failed of " << URL() << " failed: " << error.message() << " (" << error << ")";// << endl;
180  Error(str);
181 
182  PostClose(false);
183  return;
184  }
185 
186  if (!is_open())
187  {
188  // For example: Here we could schedule a new accept if we
189  // would not want to allow two connections at the same time.
190  //PostClose(true);
191  return;
192  }
193 
194  // Check whether the deadline has passed. We compare the deadline
195  // against the current time since a new asynchronous operation
196  // may have moved the deadline before this actor had a chance
197  // to run.
198  if (fTrigger.expires_at() > ba::deadline_timer::traits_type::now())
199  return;
200 
201  StartReadReport();
202  }
203 
205  {
206  PostMessage(string("rx\n"), 3);
207 
208  // Do not schedule two reads
209  if (!fFirstMessage)
210  {
211  async_read_until(*this, fBuffer, '\n',
212  boost::bind(&ConnectionSQM::HandleRead, this,
213  dummy::error, dummy::bytes_transferred));
214  }
215 
216  fInTimeout.expires_from_now(boost::posix_time::milliseconds(fTimeout*1.5));
217  fInTimeout.async_wait(boost::bind(&ConnectionSQM::HandleReadTimeout,
218  this, dummy::error));
219  }
220 
221 private:
222  // This is called when a connection was established
224  {
225  fValid = false;
226  fFirstMessage = true;
227 
228  // Empty a possible buffer first before we start reading
229  // otherwise reading and writing might not be consecutive
230  async_read_until(*this, fBuffer, '\n',
231  boost::bind(&ConnectionSQM::HandleRead, this,
232  dummy::error, dummy::bytes_transferred));
233 
234  // If there was no immediate answer, send a request
235  fTrigger.expires_at(Time()+boost::posix_time::milliseconds(1000));
236  fTrigger.async_wait(boost::bind(&ConnectionSQM::HandleRequestTrigger,
237  this, dummy::error));
238  }
239 
240 public:
241  static const uint16_t kMaxAddr;
242 
243 public:
244  ConnectionSQM(ba::io_service& ioservice, MessageImp &imp) : Connection(ioservice, imp()),
245  fIsVerbose(true), fTimeout(0), fTrigger(ioservice)
246  {
247  SetLogStream(&imp);
248  }
249 
250  void SetVerbose(bool b)
251  {
252  fIsVerbose = b;
254  }
255 
256  void SetTimeout(uint16_t t)
257  {
258  fTimeout = t;
259  }
260 
261  int GetState() const
262  {
263  if (!is_open())
265 
266  return fValid ? SQM::State::kValid : SQM::State::kConnected;
267  }
268 };
269 
270 // ------------------------------------------------------------------------
271 
272 #include "DimDescriptionService.h"
273 
274 class ConnectionDimWeather : public ConnectionSQM
275 {
276 private:
277  DimDescribedService fDim;
278 
279 public:
280  ConnectionDimWeather(ba::io_service& ioservice, MessageImp &imp) :
281  ConnectionSQM(ioservice, imp),
282  fDim("SQM_CONTROL/DATA", "F:1;I:1;I:1;F:1;F:1",
283  "Data received from sky quality meter"
284  "|Mag[mag/arcsec^2]:Magnitude (0 means upper brightness limit)"
285  "|Freq[Hz]:Frequency of sensor"
286  "|Counts:Period of sensor (counts occur at 14.7456MHz/32)"
287  "|Period[s]:Period of sensor"
288  "|Temp[deg C]:Sensor temperature in deg C")
289  {
290  }
291 
292  void Update(const SQM::Data &data)
293  {
294  fDim.Update(data);
295  }
296 };
297 
298 // ------------------------------------------------------------------------
299 
300 template <class T, class S>
302 {
303 private:
304  S fSQM;
305 
306  bool CheckEventSize(size_t has, const char *name, size_t size)
307  {
308  if (has==size)
309  return true;
310 
311  ostringstream msg;
312  msg << name << " - Received event has " << has << " bytes, but expected " << size << ".";
313  T::Fatal(msg);
314  return false;
315  }
316 
318  {
319  // Close all connections
320  fSQM.PostClose(false);
321 
322  return T::GetCurrentState();
323  }
324 
325  int Reconnect(const EventImp &evt)
326  {
327  // Close all connections to supress the warning in SetEndpoint
328  fSQM.PostClose(false);
329 
330  // Now wait until all connection have been closed and
331  // all pending handlers have been processed
332  ba::io_service::poll();
333 
334  if (evt.GetBool())
335  fSQM.SetEndpoint(evt.GetString());
336 
337  // Now we can reopen the connection
338  fSQM.PostClose(true);
339 
340  return T::GetCurrentState();
341  }
342 
343  int SetVerbosity(const EventImp &evt)
344  {
345  if (!CheckEventSize(evt.GetSize(), "SetVerbosity", 1))
346  return T::kSM_FatalError;
347 
348  fSQM.SetVerbose(evt.GetBool());
349 
350  return T::GetCurrentState();
351  }
352 
353  int Send(const string &cmd)
354  {
355  const string tx = cmd+"\r\n";
356  fSQM.PostMessage(tx, tx.size());
357  return T::GetCurrentState();
358  }
359 
360  int SendCommand(const EventImp &evt)
361  {
362  return Send(evt.GetString());
363  }
364 
365  int Execute()
366  {
367  return fSQM.GetState();
368  }
369 
370 
371 public:
372  StateMachineSQMControl(ostream &out=cout) :
373  StateMachineAsio<T>(out, "SQM_CONTROL"), fSQM(*this, *this)
374  {
375  // State names
376  T::AddStateName(SQM::State::kDisconnected, "Disconnected",
377  "No connection to Sky Quality Meter");
378 
379  T::AddStateName(SQM::State::kConnected, "Connected",
380  "Connection established, but no valid message received");
381 
382  T::AddStateName(SQM::State::kValid, "Valid",
383  "Valid message received");
384 
385  // Commands
386  //T::AddEvent("SEND_COMMAND", "C")
387  // (bind(&StateMachineSQMControl::SendCommand, this, placeholders::_1))
388  // ("Send command to SQM");
389 
390  // Verbosity commands
391  T::AddEvent("SET_VERBOSE", "B")
392  (bind(&StateMachineSQMControl::SetVerbosity, this, placeholders::_1))
393  ("set verbosity state"
394  "|verbosity[bool]:disable or enable verbosity for received data (yes/no), except dynamic data");
395 
396  //T::AddEvent("ENABLE")
397  // (bind(&StateMachineSQMControl::Send, this, "veto_60"))
398  // ("Enable trigger signal once a second vetoed at every exact minute");
399 
400  //T::AddEvent("DISABLE")
401  // (bind(&StateMachineSQMControl::Send, this, "veto_on"))
402  // ("Diable trigger output");
403 
404  // Conenction commands
405  T::AddEvent("DISCONNECT")
407  ("disconnect from ethernet");
408 
409  T::AddEvent("RECONNECT", "O")
410  (bind(&StateMachineSQMControl::Reconnect, this, placeholders::_1))
411  ("(Re)connect ethernet connection to SQM, a new address can be given"
412  "|[host][string]:new ethernet address in the form <host:port>");
413 
414  }
415 
417  {
418  fSQM.SetVerbose(!conf.Get<bool>("quiet"));
419  fSQM.SetTimeout(conf.Get<uint16_t>("request-interval"));
420  fSQM.SetDebugTx(conf.Get<bool>("debug-tx"));
421  fSQM.SetEndpoint(conf.Get<string>("addr"));
422  fSQM.StartConnect();
423 
424  return -1;
425  }
426 };
427 
428 // ------------------------------------------------------------------------
429 
430 #include "Main.h"
431 
432 
433 template<class T, class S, class R>
435 {
436  return Main::execute<T, StateMachineSQMControl<S, R>>(conf);
437 }
438 
440 {
441  po::options_description control("SQM control");
442  control.add_options()
443  ("no-dim,d", po_switch(), "Disable dim services")
444  ("addr,a", var<string>("10.0.100.208:10001"), "Network address of the lid controling Arduino including port")
445  ("quiet,q", po_bool(true), "Disable printing contents of all received messages (except dynamic data) in clear text.")
446  ("debug-tx", po_bool(), "Enable debugging of ethernet transmission.")
447  ("request-interval", var<uint16_t>(5000), "How often to request a report [milliseconds].")
448  ;
449 
450  conf.AddOptions(control);
451 }
452 
453 /*
454  Extract usage clause(s) [if any] for SYNOPSIS.
455  Translators: "Usage" and "or" here are patterns (regular expressions) which
456  are used to match the usage synopsis in program output. An example from cp
457  (GNU coreutils) which contains both strings:
458  Usage: cp [OPTION]... [-T] SOURCE DEST
459  or: cp [OPTION]... SOURCE... DIRECTORY
460  or: cp [OPTION]... -t DIRECTORY SOURCE...
461  */
463 {
464  cout <<
465  "The sqmctrl is an interface to the Sky Quality Meter.\n"
466  "\n"
467  "The default is that the program is started without user intercation. "
468  "All actions are supposed to arrive as DimCommands. Using the -c "
469  "option, a local shell can be initialized. With h or help a short "
470  "help message about the usuage can be brought to the screen.\n"
471  "\n"
472  "Usage: sqmctrl [-c type] [OPTIONS]\n"
473  " or: sqmctrl [OPTIONS]\n";
474  cout << endl;
475 }
476 
477 void PrintHelp()
478 {
479 // Main::PrintHelp<StateMachineFTM<StateMachine, ConnectionFTM>>();
480 
481  /* Additional help text which is printed after the configuration
482  options goes here */
483 
484  /*
485  cout << "bla bla bla" << endl << endl;
486  cout << endl;
487  cout << "Environment:" << endl;
488  cout << "environment" << endl;
489  cout << endl;
490  cout << "Examples:" << endl;
491  cout << "test exam" << endl;
492  cout << endl;
493  cout << "Files:" << endl;
494  cout << "files" << endl;
495  cout << endl;
496  */
497 }
498 
499 int main(int argc, const char* argv[])
500 {
501  Configuration conf(argv[0]);
504  SetupConfiguration(conf);
505 
506  if (!conf.DoParse(argc, argv, PrintHelp))
507  return 127;
508 
509  // No console access at all
510  if (!conf.Has("console"))
511  {
512  if (conf.Get<bool>("no-dim"))
513  return RunShell<LocalStream, StateMachine, ConnectionSQM>(conf);
514  else
515  return RunShell<LocalStream, StateMachineDim, ConnectionDimWeather>(conf);
516  }
517  // Cosole access w/ and w/o Dim
518  if (conf.Get<bool>("no-dim"))
519  {
520  if (conf.Get<int>("console")==0)
521  return RunShell<LocalShell, StateMachine, ConnectionSQM>(conf);
522  else
523  return RunShell<LocalConsole, StateMachine, ConnectionSQM>(conf);
524  }
525  else
526  {
527  if (conf.Get<int>("console")==0)
528  return RunShell<LocalShell, StateMachineDim, ConnectionDimWeather>(conf);
529  else
530  return RunShell<LocalConsole, StateMachineDim, ConnectionDimWeather>(conf);
531  }
532 
533  return 0;
534 }
void SetTimeout(uint16_t t)
Definition: sqmctrl.cc:256
uint32_t counts
Definition: HeadersSQM.h:21
A general base-class describing events issues in a state machine.
Definition: EventImp.h:11
virtual void Update(const SQM::Data &)
Definition: sqmctrl.cc:26
void ConnectionEstablished()
Definition: sqmctrl.cc:223
bool fIsVerbose
Definition: sqmctrl.cc:31
void SetupConfiguration(Configuration &conf)
Definition: Main.h:25
The base implementation of a distributed messaging system.
Definition: MessageImp.h:10
Adds some functionality to boost::posix_time::ptime for our needs.
Definition: Time.h:30
char str[80]
Definition: test_client.c:7
void SetPrintUsage(const std::function< void(void)> &func)
T Get(const std::string &var)
void PrintUsage()
Definition: sqmctrl.cc:462
bool CheckEventSize(size_t has, const char *name, size_t size)
Definition: sqmctrl.cc:306
po::typed_value< bool > * po_switch()
STL namespace.
int SendCommand(const EventImp &evt)
Definition: sqmctrl.cc:360
int RunShell(Configuration &conf)
Definition: sqmctrl.cc:434
ConnectionSQM(ba::io_service &ioservice, MessageImp &imp)
Definition: sqmctrl.cc:244
std::string GetString() const
Definition: EventImp.cc:194
static const uint16_t kMaxAddr
Definition: sqmctrl.cc:241
bool fValid
Definition: sqmctrl.cc:33
StateMachineSQMControl(ostream &out=cout)
Definition: sqmctrl.cc:372
void StartReadReport()
Definition: sqmctrl.cc:204
void HandleReadTimeout(const bs::error_code &error)
Definition: sqmctrl.cc:128
bool Has(const std::string &var)
void AddOptions(const po::options_description &opt, bool visible=true)
Definition: Configuration.h:92
float period
Definition: HeadersSQM.h:22
void SetupConfiguration(Configuration &conf)
Definition: sqmctrl.cc:439
boost::asio::deadline_timer fTrigger
Definition: sqmctrl.cc:38
float mag
Definition: HeadersSQM.h:19
boost::asio::streambuf fBuffer
Definition: sqmctrl.cc:36
void SetVerbose(bool b=true)
Definition: Connection.h:148
Commandline parsing, resource file parsing and database access.
Definition: Configuration.h:9
void HandleRequestTrigger(const bs::error_code &error)
Definition: sqmctrl.cc:172
int buffer[BUFFSIZE]
Definition: db_dim_client.c:14
uint16_t fTimeout
Definition: sqmctrl.cc:34
int Reconnect(const EventImp &evt)
Definition: sqmctrl.cc:325
int size
Definition: db_dim_server.c:17
float data[4 *1440]
void SetVerbose(bool b)
Definition: sqmctrl.cc:250
bool GetBool() const
Definition: EventImp.h:90
TT t
Definition: test_client.c:26
Error()
Definition: HeadersFTM.h:197
int SetVerbosity(const EventImp &evt)
Definition: sqmctrl.cc:343
int Send(const string &cmd)
Definition: sqmctrl.cc:353
po::typed_value< bool > * po_bool(bool def=false)
int main(int argc, const char *argv[])
Definition: sqmctrl.cc:499
std::string GetAsStr(const char *fmt="%Y-%m-%d %H:%M:%S") const
Definition: Time.cc:240
int EvalOptions(Configuration &conf)
Definition: sqmctrl.cc:416
bool DoParse(int argc, const char **argv, const std::function< void()> &func=std::function< void()>())
int GetState() const
Definition: sqmctrl.cc:261
float temp
Definition: HeadersSQM.h:23
ConnectionDimWeather(ba::io_service &ioservice, MessageImp &imp)
Definition: sqmctrl.cc:280
uint32_t freq
Definition: HeadersSQM.h:20
void Update(const SQM::Data &data)
Definition: sqmctrl.cc:292
void PrintHelp()
Definition: sqmctrl.cc:477
virtual size_t GetSize() const
Definition: EventImp.h:55
bool fFirstMessage
Definition: sqmctrl.cc:32
void HandleRead(const boost::system::error_code &err, size_t bytes_received)
Definition: sqmctrl.cc:40