4 #include <netinet/tcp.h> 11 #include <boost/algorithm/string/join.hpp> 13 #include "../externals/Queue.h" 21 #define MIN_LEN 32 // min #bytes needed to interpret FADheader 22 #define MAX_LEN 81920 // one max evt = 1024*2*36 + 8*36 + 72 + 4 = 74092 (data+boardheader+eventheader+endflag) 119 const std::lock_guard<std::mutex> lock(mtx);
121 void *mem = memory.front();
138 delete [] (
char*)mem;
143 const std::lock_guard<std::mutex> lock(mtx);
144 memory.push_front(mem);
157 vsnprintf(str, 1000, fmt, ap);
172 #ifdef COMPLETE_EVENTS 185 struct sockaddr_in SockAddr;
218 uint32_t
len()
const {
return uint32_t(H.package_length)*2; }
225 READ_STRUCT() : socket(-1), connected(false), totBytes(0), relBytes(0)
236 bool create(sockaddr_in addr);
237 bool check(
int, sockaddr_in addr);
242 #ifdef PRIORITY_QUEUE 243 struct READ_STRUCTcomp
257 const int rc = epoll_wait(fd_epoll, events,
NBOARDS, 100);
278 fd_epoll = epoll_create(
NBOARDS);
290 if (fd_epoll>=0 && ::close(fd_epoll)>0)
302 const int port = ntohs(sockAddr.sin_port) + 1;
304 SockAddr.sin_family = sockAddr.sin_family;
305 SockAddr.sin_addr = sockAddr.sin_addr;
306 SockAddr.sin_port = htons(port);
308 if ((socket = ::socket(PF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0)) <= 0)
316 if (setsockopt(socket, IPPROTO_TCP, TCP_NODELAY, &optval,
sizeof(
int)) < 0)
320 if (setsockopt (socket, SOL_SOCKET, SO_KEEPALIVE, &optval,
sizeof(
int)) < 0)
324 if (setsockopt (socket, SOL_TCP, TCP_KEEPIDLE, &optval,
sizeof(
int)) < 0)
328 if (setsockopt (socket, SOL_TCP, TCP_KEEPINTVL, &optval,
sizeof(
int)) < 0)
332 if (setsockopt (socket, SOL_TCP, TCP_KEEPCNT, &optval,
sizeof(
int)) < 0)
350 if (fd_epoll>=0 && connected && epoll_ctl(fd_epoll, EPOLL_CTL_DEL, socket, NULL)<0)
354 if (::close(socket) > 0)
371 const int old = socket;
374 if (socket>=0 && sockDef==0)
378 if (socket<0 && sockDef!=0)
381 const bool retval = old!=socket;
392 const int rc = connect(socket, (
struct sockaddr *) &SockAddr,
sizeof(SockAddr));
406 bufLen =
sizeof(PEVNT_HEADER);
420 if (epoll_ctl(fd_epoll, EPOLL_CTL_ADD, socket, &ev)<0)
435 const int32_t jrd = recv(socket, bufPos, bufLen, MSG_DONTWAIT);
440 if (errno==EWOULDBLOCK || errno==EAGAIN)
463 gettimeofday(&
time, NULL);
484 S[12] = ntohs(S[12]);
485 S[13] = ntohs(S[13]);
486 S[14] = ntohs(S[14]);
487 S[15] = ntohs(S[15]);
489 I[10] = ntohl(I[10]);
490 I[11] = ntohl(I[11]);
502 for (
int ePatchesCount = 0; ePatchesCount<4*9; ePatchesCount++)
504 S[i+0] = ntohs(S[i+0]);
505 S[i+1] = ntohs(S[i+1]);
506 S[i+2] = ntohs(S[i+2]);
507 S[i+3] = ntohs(S[i+3]);
521 int roiPtr =
sizeof(PEVNT_HEADER)/2 + 2;
523 roi[0] = ntohs(rd.
S[roiPtr]);
525 for (
int jr = 0; jr < 9; jr++)
527 roi[jr] = ntohs(rd.
S[roiPtr]);
536 if (jr!=8 && roi[jr]!=roi[0])
543 for (
int kr = 1; kr < 4; kr++)
545 const int kroi = ntohs(rd.
S[roiPtr]);
561 factPrintf(
MessageImp::kFatal,
"Inconsistent Roi accross channels [DRS=%d Ch=%d], expected %d, got %d", xjr, xkr, roi[xjr], ntohs(rd.
S[roiPtr]));
568 factPrintf(
MessageImp::kError,
"Mismatch of roi (%d) in channel 8. Should be larger or equal than the roi (%d) in channel 0.", roi[8], roi[0]);
588 return shared_ptr<EVT_CTRL2>();
590 for (
auto it=evtCtrl.rbegin(); it!=evtCtrl.rend(); it++)
593 const shared_ptr<EVT_CTRL2> &evt = *it;
599 if (rd.
H.runnumber != evt->runNum)
604 if (rd.
H.fad_evt_counter > evt->evNum)
607 if (rd.
H.fad_evt_counter != evt->evNum)
612 if (evt->nRoi != nRoi[0] || evt->nRoiTM != nRoi[8])
615 evt->nRoi, evt->nRoiTM, nRoi[0], nRoi[8]);
616 return shared_ptr<EVT_CTRL2>();
622 if ((rd.
time.tv_sec==evt->time.tv_sec && rd.
time.tv_usec<evt->time.tv_usec) ||
623 rd.
time.tv_sec<evt->time.tv_sec)
630 if (actrun->runId==rd.
H.runnumber && (actrun->roi0 != nRoi[0] || actrun->roi8 != nRoi[8]))
633 actrun->roi0, actrun->roi8, nRoi[0], nRoi[8], rd.
H.runnumber, rd.
H.fad_evt_counter);
634 return shared_ptr<EVT_CTRL2>();
641 evt->runNum = rd.
H.runnumber;
642 evt->evNum = rd.
H.fad_evt_counter;
644 evt->trgNum = rd.
H.trigger_id;
645 evt->trgTyp = rd.
H.trigger_type;
648 evt->nRoiTM = nRoi[8];
652 const bool newrun = actrun->runId != rd.
H.runnumber;
657 actrun->maxEvt = actrun->lastEvt;
660 rd.
H.runnumber, rd.
H.fad_evt_counter, nRoi[0], nRoi[8], actrun->runId);
663 actrun = make_shared<RUN_CTRL2>();
665 const time_t &tsec = evt->time.tv_sec;
667 actrun->openTime = tsec;
668 actrun->closeTime = tsec + 3600 * 24;
669 actrun->runId = rd.
H.runnumber;
670 actrun->roi0 = nRoi[0];
671 actrun->roi8 = nRoi[8];
683 actrun->lastTime = evt->time.tv_sec;
690 const bool cond1 = actrun->lastEvt < actrun->maxEvt;
691 const bool cond2 = actrun->lastTime < actrun->closeTime;
692 if (!cond1 || !cond2)
705 evtCtrl.emplace_back(evt);
706 return evtCtrl.back();
714 memcpy(evt->
FADhead+i, &rBuf.
H,
sizeof(PEVNT_HEADER));
716 int src =
sizeof(PEVNT_HEADER) / 2;
719 const uint16_t &roi = rBuf.
S[src+2];
722 EVENT *
event = evt->
fEvent;
723 for (
int px = 0; px < 9; px++)
725 for (
int drs = 0; drs < 4; drs++)
727 const int16_t pixC = rBuf.
S[src+1];
728 const int16_t pixR = rBuf.
S[src+2];
731 const int pixS = i*36 + drs*9 + px;
733 event->StartPix[pixS] = pixC;
735 memcpy(event->Adc_Data + pixS*roi, &rBuf.
S[src+4], roi * 2);
743 const int tmS = i*4 + drs;
748 event->StartTM[tmS] = (pixC + pixR - roi) % 1024;
750 memcpy(event->Adc_Data + tmS*roi +
NPIX*roi, &rBuf.
S[src - roi], roi * 2);
754 event->StartTM[tmS] = -1;
765 evt->runNum, evt->evNum, evtCtrl.size(), txt);
772 for (
int ib=0; ib<
NBOARDS; ib++)
777 const int jb = evt->board[ib];
780 str[ik++] =
'0'+(jb%10);
795 report |= ((uint64_t)1)<<ib;
809 bool proc1(
const shared_ptr<EVT_CTRL2> &);
813 bool proc1(
const shared_ptr<EVT_CTRL2> &evt)
849 processingQueue1.
post(evt);
879 vector<string> reason;
881 reason.emplace_back(
"close requested");
883 reason.emplace_back(
"receive timeout");
885 reason.emplace_back(
"connection changed");
887 reason.emplace_back(
"event check failed");
891 reason.push_back(to_string(run.
maxEvt)+
" evts reached");
893 reason.emplace_back(
"runWrite failed");
895 const string str = boost::algorithm::join(reason,
", ");
903 bool procEvt(
const shared_ptr<EVT_CTRL2> &evt)
910 EVENT *
event = evt->fEvent;
919 event->NumBoards = evt->nBoard;
921 event->PCTime = evt->time.tv_sec;
922 event->PCUsec = evt->time.tv_usec;
924 for (
int ib=0; ib<
NBOARDS; ib++)
925 event->BoardTime[ib] = evt->FADhead[ib].time;
933 if (evt->trgTyp && !(evt->trgTyp & FAD::EventHeader::kAll))
970 secondaryQueue.
post(evt);
1089 primaryQueue.
start();
1090 secondaryQueue.
start();
1091 processingQueue1.
start();;
1093 actrun = make_shared<RUN_CTRL2>();
1096 time_t gi_SecTime =
time(NULL)-1;
1106 for (
int i=0;
i<40;
i++)
1108 if (rd[
i].socket>=0 && rd[
i].connected && rd[
i].bufLen>0)
1111 fds[nn].events = POLLIN;
1117 const int rc_epoll = poll(fds, nn, 100);
1127 if (rd[
i].socket>=0 && rd[
i].connected && rd[
i].bufLen>0)
1129 FD_SET(rd[
i].socket, &readfs);
1130 if (rd[
i].socket>nfsd)
1136 tv.tv_usec = 100000;
1137 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1153 #ifdef PRIORITY_QUEUE 1154 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1157 if (rd[
i].connected)
1160 if (!prio.empty())
do 1165 for (
int jj=0; jj<nn; jj++)
1168 for (
int jj=0; jj<rc_epoll; jj++)
1170 #
if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1171 for (
int jj=0; jj<
NBOARDS; jj++)
1174 #ifdef PRIORITY_QUEUE 1178 if (!FD_ISSET(rs->
socket, &readfs))
1183 if ((fds[jj].revents&POLLIN)==0)
1197 #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE) 1198 const int i = (jj%4)*10 + (jj/4);
1202 #ifdef COMPLETE_EVENTS 1203 if (rs->
bufTyp==READ_STRUCT::kWait)
1209 const bool rc_read = rs->
read();
1228 for (k=0; k<
sizeof(PEVNT_HEADER)-1; k++)
1230 if (rs->
B[k]==0xfb && rs->
B[k+1] == 0x01)
1236 if (k==
sizeof(PEVNT_HEADER)-1)
1238 rs->
B[0] = rs->
B[
sizeof(PEVNT_HEADER)-1];
1240 rs->
bufLen =
sizeof(PEVNT_HEADER)-1;
1246 memmove(rs->
B, rs->
B+k,
sizeof(PEVNT_HEADER)-k);
1264 rs->
bufLen = rs->
len() -
sizeof(PEVNT_HEADER);
1271 const uint16_t &
end = *
reinterpret_cast<uint16_t*
>(rs->
bufPos-2);
1279 rs->
bufLen =
sizeof(PEVNT_HEADER);
1289 if (evt && !evt->initMemory())
1291 const time_t tm =
time(NULL);
1292 if (evt->runCtrl->reportMem==tm)
1296 evt->runCtrl->reportMem = tm;
1302 rs->
bufLen =
sizeof(PEVNT_HEADER);
1310 if (evt->board[rs->
sockId] != -1)
1324 #ifdef COMPLETE_EVENTS 1326 rs->
bufTyp = READ_STRUCT::kWait;
1330 evt->header = evt->FADhead+rs->
sockId;
1333 #ifdef COMPLETE_EPOLL 1347 for (
auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1349 const bool found = it->get()==evt.get();
1353 primaryQueue.
post(evt);
1356 for (
int ib=0; ib<40; ib++)
1357 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1362 evtCtrl.pop_front();
1369 #ifdef COMPLETE_EPOLL 1370 for (
int j=0; j<40; j++)
1373 ev.events = EPOLLIN;
1374 ev.data.ptr = &rd[j];
1383 #ifdef COMPLETE_EVENTS 1384 for (
int j=0; j<40; j++)
1389 rs->
bufLen =
sizeof(PEVNT_HEADER);
1395 #ifdef PRIORITY_QUEUE 1401 const time_t actTime =
time(NULL);
1402 if (actTime == gi_SecTime)
1404 #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL) 1405 if (evtCtrl.empty())
1406 usleep(actTime-
actrun->lastTime>300 ? 10000 : 1);
1410 gi_SecTime = actTime;
1418 for (
auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1421 const shared_ptr<EVT_CTRL2> &evt = *it;
1451 for (
int ib=0; ib<40; ib++)
1452 rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1454 evtCtrl.pop_front();
1459 gj.bufNew = evtCtrl.size();
1460 gj.bufEvt = primaryQueue.
size();
1461 gj.bufWrite = secondaryQueue.
size();
1462 gj.bufProc = processingQueue1.
size();
1470 bool changed =
false;
1472 static vector<uint64_t> store(NBOARDS);
1474 for (
int ib=0; ib<
NBOARDS; ib++)
1481 if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1501 if (actTime-
actrun->lastTime>300)
1522 const bool abort = gi_reset%100==2;
1526 primaryQueue.
wait(abort);
1527 secondaryQueue.
wait(abort);
1528 processingQueue1.
wait(abort);
1539 return gi_reset>=100;
1551 memset(&
gj, 0,
sizeof(GUI_STAT));
static epoll_event events[NBOARDS]
void gotNewRun(RUN_CTRL2 &run)
void factOut(int severity, const char *message)
std::forward_list< void * > memory
void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
bool runWrite(const EVT_CTRL2 &evt)
void factPrintf(int severity, const char *fmt,...)
A warning, things that somehow might result in unexpected or unwanted behaviour.
list< shared_ptr< EVT_CTRL2 > > evtCtrl
uint64_t reportIncomplete(const shared_ptr< EVT_CTRL2 > &evt, const char *txt)
Queue< shared_ptr< EVT_CTRL2 > > processingQueue1(bind(&proc1, placeholders::_1))
Queue< shared_ptr< EVT_CTRL2 > > secondaryQueue(bind(&writeEvt, placeholders::_1))
void factStat(const GUI_STAT &gj)
uint gi_NumConnect[NBOARDS]
static READ_STRUCT * get(int i)
FACT_SOCK g_port[NBOARDS]
bool writeEvt(const shared_ptr< EVT_CTRL2 > &evt)
shared_ptr< RUN_CTRL2 > actrun
void debugHead(void *buf)
Queue< shared_ptr< EVT_CTRL2 > > primaryQueue(bind(&procEvt, placeholders::_1))
An info telling something which can be interesting to know.
static uint activeSockets
bool runOpen(const EVT_CTRL2 &evt)
bool wait(bool abrt=false)
bool create(sockaddr_in addr)
bool mainloop(READ_STRUCT *rd)
bool check(int, sockaddr_in addr)
void applyCalib(const EVT_CTRL2 &evt, const size_t &size)
void runClose(const EVT_CTRL2 &run)
Warning because the service this data corresponds to might have been last updated longer ago than Local time
std::array< uint32_t, 8 > triggerCounter
bool eventCheck(const EVT_CTRL2 &evt)
Error, something unexpected happened, but can still be handled by the program.
bool procEvt(const shared_ptr< EVT_CTRL2 > &evt)
bool checkRoiConsistency(const READ_STRUCT &rd, uint16_t roi[])
bool proc1(const shared_ptr< EVT_CTRL2 > &)
An error which cannot be handled at all happened, the only solution is program termination.
shared_ptr< EVT_CTRL2 > mBufEvt(const READ_STRUCT &rd, shared_ptr< RUN_CTRL2 > &actrun)
void factReportIncomplete(uint64_t rep)