3 #include <boost/asio.hpp> 4 #include <boost/bind.hpp> 5 #include <boost/lexical_cast.hpp> 6 #include <boost/asio/deadline_timer.hpp> 7 #include <boost/enable_shared_from_this.hpp> 9 using boost::lexical_cast;
22 namespace ba = boost::asio;
23 namespace bs = boost::system;
24 namespace dummy = ba::placeholders;
26 using boost::lexical_cast;
35 vector<tcp_connection*>
vec;
38 Trigger() : fCmd(
"FAD/TRIGGER",
"I:1", this)
49 vec.erase(find(vec.begin(), vec.end(), ptr));
52 void commandHandler();
57 class tcp_connection :
public ba::ip::tcp::socket,
public boost::enable_shared_from_this<tcp_connection>
68 ba::async_read(*
this, buffers,
70 dummy::error, dummy::bytes_transferred));
73 void AsyncWrite(ba::ip::tcp::socket *socket,
const ba::const_buffers_1 &buffers)
75 ba::async_write(*socket, buffers,
77 dummy::error, dummy::bytes_transferred));
79 void AsyncWait(ba::deadline_timer &timer,
int seconds,
82 timer.expires_from_now(boost::posix_time::milliseconds(seconds));
83 timer.async_wait(boost::bind(handler, shared_from_this(), dummy::error));
87 tcp_connection(ba::io_service& ioservice,
int boardid) : ba::ip::tcp::socket(ioservice),
88 fBoardId(boardid), fRamRoi(
kNumChannels), fTriggerSendData(ioservice),
89 fTriggerEnabled(false)
101 void HandleSentData(
const boost::system::error_code& error,
size_t bytes_transferred)
103 cout <<
"Data sent[" << fBoardId <<
"]: (transmitted=" << bytes_transferred <<
") rc=" << error.message() <<
" (" << error <<
")" << endl;
104 fOutQueue.pop_front();
109 fOutQueue.pop_front();
134 if (fOutQueue.size()>3)
138 fHeader.fEventCounter++;
139 fHeader.fTriggerCounter = triggerid;
140 fHeader.fTimeStamp = uint32_t((
Time(
Time::utc).UnixTime()-fStartTime)*10000);
141 fHeader.fFreqRefClock = 997+rand()/(RAND_MAX/7);
160 fHeader.fTempDrs[
i] = (42.+fBoardId/40.+
float(rand())/RAND_MAX*5)*16;
163 size_t sz =
sizeof(fHeader) +
kNumChannels*
sizeof(FAD::ChannelHeader) + 2;
168 vector<uint16_t> evtbuf;
173 fChHeader[
i].fStartCell = int64_t(1023)*rand()/RAND_MAX;
177 for (
int ii=0; ii<fChHeader[
i].fRegionOfInterest; ii++)
180 const int abs = (ii+fChHeader[
i].fStartCell)%fChHeader[
i].fRegionOfInterest;
182 data[rel] += 6.*rand()/RAND_MAX + 5*exp(-rel/10);
183 data[rel] += 15*sin(2*3.1415*abs/512);
188 int p = 5.*rand()/RAND_MAX+ 20;
189 double rndm = 500.*rand()/RAND_MAX+500;
190 for (
int ii=0; ii<fChHeader[
i].fRegionOfInterest; ii++)
191 data[ii] += rndm*exp(-0.5*(ii-p)*(ii-p)/25);
194 const vector<uint16_t> buf = fChHeader[
i].HtoN();
196 evtbuf.insert(evtbuf.end(), buf.begin(), buf.end());
197 evtbuf.insert(evtbuf.end(), data.begin(), data.end());
200 fHeader.fPackageLength += fChHeader[
i].fRegionOfInterest;
205 const vector<uint16_t> h = fHeader.HtoN();
207 evtbuf.insert(evtbuf.begin(), h.begin(), h.end());
209 fOutQueue.push_back(evtbuf);
212 AsyncWrite(
this,
ba::buffer(ba::const_buffer(fOutQueue.back().data(), fOutQueue.back().size()*2)));
215 if (fSockets.size()==0)
219 fSocket %= fSockets.size();
221 AsyncWrite(fSockets[fSocket].
get(),
ba::buffer(ba::const_buffer(fOutQueue.back().data(), fOutQueue.back().size()*2)));
234 if (ec==ba::error::basic_errors::operation_aborted)
241 if (fTriggerSendData.expires_at() > ba::deadline_timer::traits_type::now())
254 if (bytes_received==0)
262 if (fCommand.size()==0)
264 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
265 fBufCommand.begin(), ntohs);
267 switch (fBufCommand[0])
271 fHeader.Enable(FAD::EventHeader::kDenable, fBufCommand[0]==
kCmdDrsEnable);
272 cout <<
"-> DrsEnable " << fBoardId <<
" " << (fBufCommand[0]==
kCmdDrsEnable) << endl;
277 fHeader.Enable(FAD::EventHeader::kDwrite, fBufCommand[0]==
kCmdDwrite);
278 cout <<
"-> Dwrite " << fBoardId <<
" " << (fBufCommand[0]==
kCmdDwrite) << endl;
283 cout <<
"-> Trigger line " << fBoardId <<
" " << (fBufCommand[0]==
kCmdTriggerLine) << endl;
285 fHeader.Enable(FAD::EventHeader::kTriggerLine, fTriggerEnabled);
290 cout <<
"-> Sclk " << fBoardId << endl;
291 fHeader.Enable(FAD::EventHeader::kSpiSclk, fBufCommand[0]==
kCmdSclk);
296 cout <<
"-> Srclk " << fBoardId << endl;
302 cout <<
"-> Run " << fBoardId << endl;
307 cout <<
"-> BusyOff " << fBoardId <<
" " << (fBufCommand[0]==
kCmdBusyOff) << endl;
308 fHeader.Enable(FAD::EventHeader::kBusyOff, fBufCommand[0]==
kCmdBusyOff);
313 cout <<
"-> BusyOn " << fBoardId <<
" " << (fBufCommand[0]==
kCmdBusyOn) << endl;
314 fHeader.Enable(FAD::EventHeader::kBusyOn, fBufCommand[0]==
kCmdBusyOn);
319 cout <<
"-> Socket " << fBoardId <<
" " << (fBufCommand[0]==
kCmdSocket) << endl;
321 fHeader.Enable(FAD::EventHeader::kSock17, !fCommandSocket);
329 fTriggerSendData.cancel();
330 fHeader.Enable(FAD::EventHeader::kContTrigger, fBufCommand[0]==
kCmdContTrigger);
331 cout <<
"-> ContTrig " << fBoardId <<
" " << (fBufCommand[0]==
kCmdContTrigger) << endl;
335 cout <<
"-> ResetId " << fBoardId << endl;
336 fHeader.fEventCounter = 0;
340 cout <<
"-> Trigger " << fBoardId << endl;
345 cout <<
"-> Execute " << fBoardId << endl;
346 memcpy(fHeader.fDac, fRam.fDac,
sizeof(fHeader.fDac));
349 fHeader.fRunNumber = fRam.fRunNumber;
353 fCommand = fBufCommand;
357 fCommand = fBufCommand;
382 cout <<
"Received b=" << bytes_received <<
": " << error.message() <<
" (" << error <<
")" << endl;
383 cout <<
"Hex:" << Converter::GetHex<uint16_t>(&fBufCommand[0], bytes_received) << endl;
387 fBufCommand.resize(1);
392 transform(fBufCommand.begin(), fBufCommand.begin()+bytes_received/2,
393 fBufCommand.begin(), ntohs);
398 fRam.fRunNumber &= 0xffff;
399 fRam.fRunNumber |= fBufCommand[0]<<16;
400 cout <<
"-> Set RunNumber " << fBoardId <<
" MSW" << endl;
403 fRam.fRunNumber &= 0xffff0000;
404 fRam.fRunNumber |= fBufCommand[0];
405 cout <<
"-> Set RunNumber " << fBoardId <<
" LSW" << endl;
408 cout <<
"-> Set " << fBoardId <<
" Roi[" << fCommand[1] <<
"]=" << fBufCommand[0] << endl;
410 fRamRoi[fCommand[1]] = fBufCommand[0];
414 cout <<
"-> Set " << fBoardId <<
" Dac[" << fCommand[1] <<
"]=" << fBufCommand[0] << endl;
415 fRam.fDac[fCommand[1]] = fBufCommand[0];
419 cout <<
"-> Set " << fBoardId <<
" Rate =" << fBufCommand[0] << endl;
420 fHeader.fTriggerGeneratorPrescaler = fBufCommand[0];
426 fBufCommand.resize(1);
433 static shared_ptr
create(ba::io_service& io_service,
int boardid)
442 fTriggerEnabled=
false;
446 fHeader.fVersion = 0x104;
447 fHeader.fBoardId = (fBoardId%10) | ((fBoardId/10)<<8);
448 fHeader.fRunNumber = 0;
449 fHeader.fDNA =
reinterpret_cast<uint64_t
>(
this);
450 fHeader.fTriggerGeneratorPrescaler = 100;
451 fHeader.fStatus = 0xf<<12 |
452 FAD::EventHeader::kDenable |
453 FAD::EventHeader::kDwrite |
454 FAD::EventHeader::kDcmLocked |
455 FAD::EventHeader::kDcmReady |
456 FAD::EventHeader::kSpiSclk;
463 fChHeader[
i].fId = (
i%9) | ((
i/9)<<4);
464 fChHeader[
i].fRegionOfInterest = 0;
468 fBufCommand.resize(1);
478 vector<boost::shared_ptr<ba::ip::tcp::socket>>
fSockets;
486 void handle_accept(boost::shared_ptr<ba::ip::tcp::socket> socket,
int port,
const boost::system::error_code&)
488 cout <<
this <<
" Added one socket[" << fBoardId <<
"] " << socket->remote_endpoint().address().to_v4().to_string();
489 cout <<
":"<< port << endl;
490 fSockets.push_back(socket);
501 for (vector<tcp_connection*>::iterator it=vec.begin();
503 (*it)->PostTrigger(getCommand()->getInt());
522 acc0(ioservice, tcp::endpoint(tcp::v4(), port)),
523 acc1(ioservice, tcp::endpoint(tcp::v4(), port+1)),
524 acc2(ioservice, tcp::endpoint(tcp::v4(), port+2)),
525 acc3(ioservice, tcp::endpoint(tcp::v4(), port+3)),
526 acc4(ioservice, tcp::endpoint(tcp::v4(), port+4)),
527 acc5(ioservice, tcp::endpoint(tcp::v4(), port+5)),
528 acc6(ioservice, tcp::endpoint(tcp::v4(), port+6)),
529 acc7(ioservice, tcp::endpoint(tcp::v4(), port+7)),
543 boost::shared_ptr<ba::ip::tcp::socket> connection =
544 boost::shared_ptr<ba::ip::tcp::socket>(
new ba::ip::tcp::socket(acc.get_io_service()));
546 acc.async_accept(*connection,
549 acc.local_endpoint().port(),
550 ba::placeholders::error));
555 cout <<
"Start accept[" << fBoardId <<
"] " << acc0.local_endpoint().port() <<
"..." << flush;
558 cout << new_connection.get() <<
" ";
561 acc0.async_accept(*new_connection,
565 ba::placeholders::error));
567 start_accept(new_connection, acc1);
568 start_accept(new_connection, acc2);
569 start_accept(new_connection, acc3);
570 start_accept(new_connection, acc4);
571 start_accept(new_connection, acc5);
572 start_accept(new_connection, acc6);
573 start_accept(new_connection, acc7);
575 cout <<
"start-done." << endl;
583 cout << new_connection.get() <<
" Handle accept[" << fBoardId <<
"]["<<new_connection->fBoardId<<
"]..." << flush;
586 new_connection->start();
593 cout <<
"handle-done." << endl;
601 const string n = conf.
GetName()+
".log";
603 po::options_description config(
"Program options");
605 (
"dns", var<string>(
"localhost"),
"Dim nameserver host name (Overwites DIM_DNS_NODE environment variable)")
606 (
"port,p", var<uint16_t>(4000),
"")
607 (
"num,n", var<uint16_t>(40),
"")
610 po::positional_options_description p;
614 conf.
AddEnv(
"dns",
"DIM_DNS_NODE");
620 int main(
int argc,
const char **argv)
626 po::variables_map vm;
629 vm = conf.
Parse(argc, argv);
631 #if BOOST_VERSION > 104000 632 catch (po::multiple_occurrences &e)
634 cerr <<
"Program options invalid due to: " << e.what() <<
" of '" << e.get_option_name() <<
"'." << endl;
640 cerr <<
"Program options invalid due to: " << e.what() << endl;
653 ba::io_service io_service;
655 const uint16_t n = conf.
Get<uint16_t>(
"num");
656 uint16_t port = conf.
Get<uint16_t>(
"port");
658 vector<shared_ptr<tcp_server>> servers;
660 for (
int i=0;
i<n;
i++)
662 shared_ptr<tcp_server> server(
new tcp_server(io_service, port,
i));
663 servers.push_back(server);
vector< uint16_t > fCommand
void TriggerSendData(const boost::system::error_code &ec)
void AsyncWait(ba::deadline_timer &timer, int seconds, void(tcp_connection::*handler)(const bs::error_code &))
ba::deadline_timer fTriggerSendData
Adds some functionality to boost::posix_time::ptime for our needs.
T Get(const std::string &var)
void AsyncRead(ba::mutable_buffers_1 buffers)
void SendData(uint32_t triggerid)
void HandleReceivedData(const boost::system::error_code &error, size_t bytes_received)
void handle_accept(boost::shared_ptr< ba::ip::tcp::socket > socket, int port, const boost::system::error_code &)
tcp_server(ba::io_service &ioservice, int port, int board)
void AddEnv(const std::string &conf, const std::string &env)
void SetArgumentPositions(const po::positional_options_description &desc)
void Add(tcp_connection *ptr)
void Setup(const std::string &dns="", const std::string &host="")
void handle_accept(tcp_connection::shared_ptr new_connection, const boost::system::error_code &error)
boost::shared_ptr< tcp_connection > shared_ptr
void AddOptions(const po::options_description &opt, bool visible=true)
void PostTrigger(uint32_t triggerid)
Commandline parsing, resource file parsing and database access.
void AsyncWrite(ba::ip::tcp::socket *socket, const ba::const_buffers_1 &buffers)
tcp_connection(ba::io_service &ioservice, int boardid)
vector< boost::shared_ptr< ba::ip::tcp::socket > > fSockets
deque< vector< uint16_t > > fOutQueue
static void handler(int conn_id, char *packet, int size, int status)
void Remove(tcp_connection *ptr)
void start_accept(tcp_connection::shared_ptr dest, tcp::acceptor &acc)
int main(int argc, const char **argv)
const po::variables_map & Parse(int argc, const char **argv, const std::function< void()> &func=std::function< void()>())
vector< uint16_t > fBufCommand
vector< tcp_connection * > vec
void SetupConfiguration(::Configuration &conf)
vector< uint16_t > fRamRoi
void HandleSentData(const boost::system::error_code &, size_t)
static shared_ptr create(ba::io_service &io_service, int boardid)
const std::string & GetName() const