Definition at line 1085 of file EventBuilder.cc.
References READ_STRUCT::activeSockets, actrun, Memory::allocated, READ_STRUCT::B, READ_STRUCT::bufLen, READ_STRUCT::bufPos, READ_STRUCT::bufTyp, READ_STRUCT::connected, copyData(), debugHead(), end, factPrintf(), factReportIncomplete(), factStat(), READ_STRUCT::fd_epoll, g_evtTimeout, g_maxMem, g_reset, READ_STRUCT::get(), gi_NumConnect, gj, READ_STRUCT::H, i, Memory::inuse, READ_STRUCT::kData, MessageImp::kError, kFileOpen, READ_STRUCT::kHeader, MessageImp::kInfo, kRequestConnectionChange, kRequestNone, kRequestTimeout, READ_STRUCT::len(), Memory::max_inuse, MAX_TOT_MEM, mBufEvt(), NBOARDS, Queue< T, List >::post(), READ_STRUCT::read(), READ_STRUCT::relBytes, reportIncomplete(), runFinished(), Queue< T, List >::size(), READ_STRUCT::skip, READ_STRUCT::socket, READ_STRUCT::sockId, Queue< T, List >::start(), READ_STRUCT::swapData(), READ_STRUCT::swapHeader(), time, READ_STRUCT::totBytes, Queue< T, List >::wait(), and READ_STRUCT::wait().
Referenced by StartEvtBuild().
1093 actrun = make_shared<RUN_CTRL2>();
1096 time_t gi_SecTime =
time(NULL)-1;
1106 for (
int i=0;
i<40;
i++)
1108 if (rd[
i].socket>=0 && rd[
i].connected && rd[
i].bufLen>0)
1111 fds[nn].events = POLLIN;
1117 const int rc_epoll = poll(fds, nn, 100);
1127 if (rd[
i].socket>=0 && rd[
i].connected && rd[
i].bufLen>0)
1129 FD_SET(rd[
i].socket, &readfs);
1130 if (rd[
i].socket>nfsd)
1136 tv.tv_usec = 100000;
1137 const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1153 #ifdef PRIORITY_QUEUE 1154 priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1157 if (rd[
i].connected)
1160 if (!prio.empty())
do 1165 for (
int jj=0; jj<nn; jj++)
1168 for (
int jj=0; jj<rc_epoll; jj++)
1170 #
if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1171 for (
int jj=0; jj<NBOARDS; jj++)
1174 #ifdef PRIORITY_QUEUE 1178 if (!FD_ISSET(rs->
socket, &readfs))
1183 if ((fds[jj].revents&POLLIN)==0)
1197 #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE) 1198 const int i = (jj%4)*10 + (jj/4);
1202 #ifdef COMPLETE_EVENTS 1203 if (rs->
bufTyp==READ_STRUCT::kWait)
1209 const bool rc_read = rs->
read();
1228 for (k=0; k<
sizeof(PEVNT_HEADER)-1; k++)
1230 if (rs->
B[k]==0xfb && rs->
B[k+1] == 0x01)
1236 if (k==
sizeof(PEVNT_HEADER)-1)
1238 rs->
B[0] = rs->
B[
sizeof(PEVNT_HEADER)-1];
1240 rs->
bufLen =
sizeof(PEVNT_HEADER)-1;
1246 memmove(rs->
B, rs->
B+k,
sizeof(PEVNT_HEADER)-k);
1264 rs->
bufLen = rs->
len() -
sizeof(PEVNT_HEADER);
1271 const uint16_t &
end = *
reinterpret_cast<uint16_t*
>(rs->
bufPos-2);
1279 rs->
bufLen =
sizeof(PEVNT_HEADER);
1289 if (evt && !evt->initMemory())
1291 const time_t tm =
time(NULL);
1292 if (evt->runCtrl->reportMem==tm)
1296 evt->runCtrl->reportMem = tm;
1302 rs->
bufLen =
sizeof(PEVNT_HEADER);
1310 if (evt->board[rs->
sockId] != -1)
1324 #ifdef COMPLETE_EVENTS 1326 rs->
bufTyp = READ_STRUCT::kWait;
1330 evt->header = evt->FADhead+rs->
sockId;
1333 #ifdef COMPLETE_EPOLL 1349 const bool found = it->get()==evt.get();
1356 for (
int ib=0; ib<40; ib++)
1357 rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1369 #ifdef COMPLETE_EPOLL 1370 for (
int j=0; j<40; j++)
1373 ev.events = EPOLLIN;
1374 ev.data.ptr = &rd[j];
1383 #ifdef COMPLETE_EVENTS 1384 for (
int j=0; j<40; j++)
1389 rs->
bufLen =
sizeof(PEVNT_HEADER);
1395 #ifdef PRIORITY_QUEUE 1401 const time_t actTime =
time(NULL);
1402 if (actTime == gi_SecTime)
1404 #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL) 1406 usleep(actTime-
actrun->lastTime>300 ? 10000 : 1);
1410 gi_SecTime = actTime;
1421 const shared_ptr<EVT_CTRL2> &evt = *it;
1451 for (
int ib=0; ib<40; ib++)
1452 rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1470 bool changed =
false;
1472 static vector<uint64_t> store(NBOARDS);
1474 for (
int ib=0; ib<
NBOARDS; ib++)
1481 if (rd[ib].check(
g_port[ib].sockDef,
g_port[ib].sockAddr))
1501 if (actTime-
actrun->lastTime>300)
1522 const bool abort = gi_reset%100==2;
1539 return gi_reset>=100;
void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
void factPrintf(int severity, const char *fmt,...)
list< shared_ptr< EVT_CTRL2 > > evtCtrl
uint64_t reportIncomplete(const shared_ptr< EVT_CTRL2 > &evt, const char *txt)
Queue< shared_ptr< EVT_CTRL2 > > processingQueue1(bind(&proc1, placeholders::_1))
Queue< shared_ptr< EVT_CTRL2 > > secondaryQueue(bind(&writeEvt, placeholders::_1))
void factStat(const GUI_STAT &gj)
uint gi_NumConnect[NBOARDS]
static READ_STRUCT * get(int i)
FACT_SOCK g_port[NBOARDS]
shared_ptr< RUN_CTRL2 > actrun
void debugHead(void *buf)
Queue< shared_ptr< EVT_CTRL2 > > primaryQueue(bind(&procEvt, placeholders::_1))
An info telling something which can be interesting to know.
static uint activeSockets
Warning because the service this data corrsponds to might have been last updated longer ago than Local time
Error, something unexpected happened, but can still be handled by the program.
shared_ptr< EVT_CTRL2 > mBufEvt(const READ_STRUCT &rd, shared_ptr< RUN_CTRL2 > &actrun)
void factReportIncomplete(uint64_t rep)