FACT++  1.0
bool mainloop (READ_STRUCT *rd)

Definition at line 1085 of file EventBuilder.cc.

References READ_STRUCT::activeSockets, actrun, Memory::allocated, READ_STRUCT::B, READ_STRUCT::bufLen, READ_STRUCT::bufPos, READ_STRUCT::bufTyp, READ_STRUCT::connected, copyData(), debugHead(), end, factPrintf(), factReportIncomplete(), factStat(), READ_STRUCT::fd_epoll, g_evtTimeout, g_maxMem, g_reset, READ_STRUCT::get(), gi_NumConnect, gj, READ_STRUCT::H, i, Memory::inuse, READ_STRUCT::kData, MessageImp::kError, kFileOpen, READ_STRUCT::kHeader, MessageImp::kInfo, kRequestConnectionChange, kRequestNone, kRequestTimeout, READ_STRUCT::len(), Memory::max_inuse, MAX_TOT_MEM, mBufEvt(), NBOARDS, Queue< T, List >::post(), READ_STRUCT::read(), READ_STRUCT::relBytes, reportIncomplete(), runFinished(), Queue< T, List >::size(), READ_STRUCT::skip, READ_STRUCT::socket, READ_STRUCT::sockId, Queue< T, List >::start(), READ_STRUCT::swapData(), READ_STRUCT::swapHeader(), time, READ_STRUCT::totBytes, Queue< T, List >::wait(), and READ_STRUCT::wait().

Referenced by StartEvtBuild().

1086 {
1087  factPrintf(MessageImp::kInfo, "Starting EventBuilder main loop");
1088 
1089  primaryQueue.start();
1090  secondaryQueue.start();
1091  processingQueue1.start();;
1092 
1093  actrun = make_shared<RUN_CTRL2>();
1094 
1095  //time in seconds
1096  time_t gi_SecTime = time(NULL)-1;
1097 
1098  //loop until global variable g_runStat claims stop
1099  g_reset = 0;
1100  while (g_reset == 0)
1101  {
1102 #ifdef USE_POLL
1103  int pp[40];
1104  int nn = 0;
1105  pollfd fds[40];
1106  for (int i=0; i<40; i++)
1107  {
1108  if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1109  {
1110  fds[nn].fd = rd[i].socket;
1111  fds[nn].events = POLLIN;
1112  pp[nn] = i;
1113  nn++;
1114  }
1115  }
1116 
1117  const int rc_epoll = poll(fds, nn, 100);
1118  if (rc_epoll<0)
1119  break;
1120 #endif
1121 
1122 #ifdef USE_SELECT
1123  fd_set readfs;
1124  FD_ZERO(&readfs);
1125  int nfsd = 0;
1126  for (int i=0; i<NBOARDS; i++)
1127  if (rd[i].socket>=0 && rd[i].connected && rd[i].bufLen>0)
1128  {
1129  FD_SET(rd[i].socket, &readfs);
1130  if (rd[i].socket>nfsd)
1131  nfsd = rd[i].socket;
1132  }
1133 
1134  timeval tv;
1135  tv.tv_sec = 0;
1136  tv.tv_usec = 100000;
1137  const int rc_select = select(nfsd+1, &readfs, NULL, NULL, &tv);
1138  // 0: timeout
1139  // -1: error
1140  if (rc_select<0)
1141  {
1142  factPrintf(MessageImp::kError, "Waiting for data failed: %d (select,rc=%d)", errno);
1143  continue;
1144  }
1145 #endif
1146 
1147 #ifdef USE_EPOLL
1148  const int rc_epoll = READ_STRUCT::wait();
1149  if (rc_epoll<0)
1150  break;
1151 #endif
1152 
1153 #ifdef PRIORITY_QUEUE
1154  priority_queue<READ_STRUCT*, vector<READ_STRUCT*>, READ_STRUCTcomp> prio;
1155 
1156  for (int i=0; i<NBOARDS; i++)
1157  if (rd[i].connected)
1158  prio.push(rd+i);
1159 
1160  if (!prio.empty()) do
1161 #endif
1162 
1163 
1164 #ifdef USE_POLL
1165  for (int jj=0; jj<nn; jj++)
1166 #endif
1167 #ifdef USE_EPOLL
1168  for (int jj=0; jj<rc_epoll; jj++)
1169 #endif
1170 #if !defined(USE_EPOLL) && !defined(USE_POLL) && !defined(PRIORITY_QUEUE)
1171  for (int jj=0; jj<NBOARDS; jj++)
1172 #endif
1173  {
1174 #ifdef PRIORITY_QUEUE
1175  READ_STRUCT *rs = prio.top();
1176 #endif
1177 #ifdef USE_SELECT
1178  if (!FD_ISSET(rs->socket, &readfs))
1179  continue;
1180 #endif
1181 
1182 #ifdef USE_POLL
1183  if ((fds[jj].revents&POLLIN)==0)
1184  continue;
1185 #endif
1186 
1187 #ifdef USE_EPOLL
1188  // FIXME: How to get i?
1189  READ_STRUCT *rs = READ_STRUCT::get(jj);
1190 #endif
1191 
1192 #ifdef USE_POLL
1193  // FIXME: How to get i?
1194  READ_STRUCT *rs = &rd[pp[jj]];
1195 #endif
1196 
1197 #if !defined(USE_POLL) && !defined(USE_EPOLL) && !defined(PRIORITY_QUEUE)
1198  const int i = (jj%4)*10 + (jj/4);
1199  READ_STRUCT *rs = &rd[i];
1200 #endif
1201 
1202 #ifdef COMPLETE_EVENTS
1203  if (rs->bufTyp==READ_STRUCT::kWait)
1204  continue;
1205 #endif
1206 
1207  // ==================================================================
1208 
1209  const bool rc_read = rs->read();
1210 
1211  // Connect might have gotten closed during read
1212  gi_NumConnect[rs->sockId] = rs->connected;
1213  gj.numConn[rs->sockId] = rs->connected;
1214 
1215  // Read either failed or disconnected, or the buffer is not yet full
1216  if (!rc_read)
1217  continue;
1218 
1219  // ==================================================================
1220 
1221  if (rs->bufTyp==READ_STRUCT::kHeader)
1222  {
1223  //check if startflag correct; else shift block ....
1224  // FIXME: This is not enough... this combination of
1225  // bytes can be anywhere... at least the end bytes
1226  // must be checked somewhere, too.
1227  uint k;
1228  for (k=0; k<sizeof(PEVNT_HEADER)-1; k++)
1229  {
1230  if (rs->B[k]==0xfb && rs->B[k+1] == 0x01)
1231  break;
1232  }
1233  rs->skip += k;
1234 
1235  //no start of header found
1236  if (k==sizeof(PEVNT_HEADER)-1)
1237  {
1238  rs->B[0] = rs->B[sizeof(PEVNT_HEADER)-1];
1239  rs->bufPos = rs->B+1;
1240  rs->bufLen = sizeof(PEVNT_HEADER)-1;
1241  continue;
1242  }
1243 
1244  if (k > 0)
1245  {
1246  memmove(rs->B, rs->B+k, sizeof(PEVNT_HEADER)-k);
1247 
1248  rs->bufPos -= k;
1249  rs->bufLen += k;
1250 
1251  continue; // We need to read more (bufLen>0)
1252  }
1253 
1254  if (rs->skip>0)
1255  {
1256  factPrintf(MessageImp::kInfo, "Skipped %d bytes on port %d", rs->skip, rs->sockId);
1257  rs->skip = 0;
1258  }
1259 
1260  // Swap the header entries from network to host order
1261  rs->swapHeader();
1262 
1263  rs->bufTyp = READ_STRUCT::kData;
1264  rs->bufLen = rs->len() - sizeof(PEVNT_HEADER);
1265 
1266  debugHead(rs->B); // i and fadBoard not used
1267 
1268  continue;
1269  }
1270 
1271  const uint16_t &end = *reinterpret_cast<uint16_t*>(rs->bufPos-2);
1272  if (end != 0xfe04)
1273  {
1274  factPrintf(MessageImp::kError, "End-of-event flag wrong on socket %2d for event %d (len=%d), got %04x",
1275  rs->sockId, rs->H.fad_evt_counter, rs->len(), end);
1276 
1277  // ready to read next header
1279  rs->bufLen = sizeof(PEVNT_HEADER);
1280  rs->bufPos = rs->B;
1281  // FIXME: What to do with the validity flag?
1282  continue;
1283  }
1284 
1285  // get index into mBuffer for this event (create if needed)
1286  const shared_ptr<EVT_CTRL2> evt = mBufEvt(*rs, actrun);
1287 
1288  // We have a valid entry, but no memory has yet been allocated
1289  if (evt && !evt->initMemory())
1290  {
1291  const time_t tm = time(NULL);
1292  if (evt->runCtrl->reportMem==tm)
1293  continue;
1294 
1295  factPrintf(MessageImp::kError, "No free memory left for %d (run=%d)", evt->evNum, evt->runNum);
1296  evt->runCtrl->reportMem = tm;
1297  continue;
1298  }
1299 
1300  // ready to read next header
1302  rs->bufLen = sizeof(PEVNT_HEADER);
1303  rs->bufPos = rs->B;
1304 
1305  // Fatal error occured. Event cannot be processed. Skip it. Start reading next header.
1306  if (!evt)
1307  continue;
1308 
1309  // This should never happen
1310  if (evt->board[rs->sockId] != -1)
1311  {
1312  factPrintf(MessageImp::kError, "Got event %5d from board %3d (i=%3d, len=%5d) twice.",
1313  evt->evNum, rs->sockId, jj, rs->len());
1314  // FIXME: What to do with the validity flag?
1315  continue; // Continue reading next header
1316  }
1317 
1318  // Swap the data entries (board headers) from network to host order
1319  rs->swapData();
1320 
1321  // Copy data from rd[i] to mBuffer[evID]
1322  copyData(*rs, evt.get());
1323 
1324 #ifdef COMPLETE_EVENTS
1325  // Do not read anmymore from this board until the whole event has been received
1326  rs->bufTyp = READ_STRUCT::kWait;
1327 #endif
1328  // now we have stored a new board contents into Event structure
1329  evt->board[rs->sockId] = rs->sockId;
1330  evt->header = evt->FADhead+rs->sockId;
1331  evt->nBoard++;
1332 
1333 #ifdef COMPLETE_EPOLL
1334  if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_DEL, rs->socket, NULL)<0)
1335  {
1336  factPrintf(MessageImp::kError, "epoll_ctrl failed: %m (EPOLL_CTL_DEL,rc=%d)", errno);
1337  break;
1338  }
1339 #endif
1340  // event not yet complete
1341  if (evt->nBoard < READ_STRUCT::activeSockets)
1342  continue;
1343 
1344  // All previous events are now flagged as incomplete ("expired")
1345  // and will be removed. (This is a bit tricky, because pop_front()
1346  // would invalidate the current iterator if not done _after_ the increment)
1347  for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1348  {
1349  const bool found = it->get()==evt.get();
1350  if (!found)
1351  reportIncomplete(*it, "expired");
1352  else
1353  primaryQueue.post(evt);
1354 
1355  // package_len is 0 if nothing was received.
1356  for (int ib=0; ib<40; ib++)
1357  rd[ib].relBytes += uint32_t((*it)->FADhead[ib].package_length)*2;
1358 
1359  // The counter must be increased _before_ the pop_front,
1360  // otherwise the counter is invalidated by the pop_front!
1361  it++;
1362  evtCtrl.pop_front();
1363 
1364  // We reached the current event, so we are done
1365  if (found)
1366  break;
1367  }
1368 
1369 #ifdef COMPLETE_EPOLL
1370  for (int j=0; j<40; j++)
1371  {
1372  epoll_event ev;
1373  ev.events = EPOLLIN;
1374  ev.data.ptr = &rd[j]; // user data (union: ev.ptr)
1375  if (epoll_ctl(READ_STRUCT::fd_epoll, EPOLL_CTL_ADD, rd[j].socket, &ev)<0)
1376  {
1377  factPrintf(MessageImp::kError, "epoll_ctl failed: %m (EPOLL_CTL_ADD,rc=%d)", errno);
1378  return;
1379  }
1380  }
1381 #endif
1382 
1383 #ifdef COMPLETE_EVENTS
1384  for (int j=0; j<40; j++)
1385  {
1386  //if (rs->bufTyp==READ_STRUCT::kWait)
1387  {
1389  rs->bufLen = sizeof(PEVNT_HEADER);
1390  rs->bufPos = rs->B;
1391  }
1392  }
1393 #endif
1394  } // end for loop over all sockets
1395 #ifdef PRIORITY_QUEUE
1396  while (0); // convert continue into break ;)
1397 #endif
1398 
1399  // ==================================================================
1400 
1401  const time_t actTime = time(NULL);
1402  if (actTime == gi_SecTime)
1403  {
1404 #if !defined(USE_SELECT) && !defined(USE_EPOLL) && !defined(USE_POLL)
1405  if (evtCtrl.empty())
1406  usleep(actTime-actrun->lastTime>300 ? 10000 : 1);
1407 #endif
1408  continue;
1409  }
1410  gi_SecTime = actTime;
1411 
1412  // ==================================================================
1413  //loop over all active events and flag those older than read-timeout
1414  //delete those that are written to disk ....
1415 
1416  // This could be improved having the pointer which separates the queue with
1417  // the incomplete events from the queue with the complete events
1418  for (auto it=evtCtrl.begin(); it!=evtCtrl.end(); )
1419  {
1420  // A reference is enough because the shared_ptr is hold by the evtCtrl
1421  const shared_ptr<EVT_CTRL2> &evt = *it;
1422 
1423  // The first event is the oldest. If the first event within the
1424  // timeout window was received, we can stop searching further.
1425  if (evt->time.tv_sec+g_evtTimeout>=actTime)
1426  break;
1427 
1428  // The counter must be increased _before_ the pop_front,
1429  // otherwise the counter is invalidated by the pop_front!
1430  it++;
1431 
1432  // This timeout is caused because complete data from one or more
1433  // boards has been received, but the memory could not be allocated.
1434  // There is no reason why we should not go on waiting for
1435  // memory to become free. However, the FADs will disconnect
1436  // after 60s due to their keep-alive timeout, but the event builder
1437  // will still wait for memory to become available.
1438  // Currently, the only possibility to free the memory from the
1439  // evtCtrl to restart the event builder (STOP/START).
1440  if (!evt->valid())
1441  continue;
1442 
1443  // This will result in the emission of a dim service.
1444  // It doesn't matter if that takes comparably long,
1445  // because we have to stop the run anyway.
1446  const uint64_t rep = reportIncomplete(evt, "timeout");
1447  factReportIncomplete(rep);
1448 
1449  // At least the data from one boards is complete...
1450  // package_len is 0 when nothing was received from this board
1451  for (int ib=0; ib<40; ib++)
1452  rd[ib].relBytes += uint32_t(evt->FADhead[ib].package_length)*2;
1453 
1454  evtCtrl.pop_front();
1455  }
1456 
1457  // =================================================================
1458 
1459  gj.bufNew = evtCtrl.size(); //# incomplete events in buffer
1460  gj.bufEvt = primaryQueue.size(); //# complete events in buffer
1461  gj.bufWrite = secondaryQueue.size(); //# complete events in buffer
1462  gj.bufProc = processingQueue1.size(); //# complete events in buffer
1463  gj.bufTot = Memory::max_inuse/MAX_TOT_MEM;
1464  gj.usdMem = Memory::max_inuse;
1465  gj.totMem = Memory::allocated;
1466  gj.maxMem = g_maxMem;
1467 
1468  gj.deltaT = 1000; // temporary, must be improved
1469 
1470  bool changed = false;
1471 
1472  static vector<uint64_t> store(NBOARDS);
1473 
1474  for (int ib=0; ib<NBOARDS; ib++)
1475  {
1476  gj.rateBytes[ib] = store[ib]>rd[ib].totBytes ? rd[ib].totBytes : rd[ib].totBytes-store[ib];
1477  gj.relBytes[ib] = rd[ib].totBytes-rd[ib].relBytes;
1478 
1479  store[ib] = rd[ib].totBytes;
1480 
1481  if (rd[ib].check(g_port[ib].sockDef, g_port[ib].sockAddr))
1482  changed = true;
1483 
1484  gi_NumConnect[ib] = rd[ib].connected;
1485  gj.numConn[ib] = rd[ib].connected;
1486  }
1487 
1488  factStat(gj);
1489 
1490  Memory::max_inuse = 0;
1491 
1492  // =================================================================
1493 
1494  // This is a fake event to trigger possible run-closing conditions once a second
1495  // FIXME: This is not yet ideal because a file would never be closed
1496  // if a new file has been started and no events of the new file
1497  // have been received yet
1498  int request = kRequestNone;
1499 
1500  // If nothing was received for more than 5min, close file
1501  if (actTime-actrun->lastTime>300)
1502  request |= kRequestTimeout;
1503 
1504  // If connection status has changed
1505  if (changed)
1506  request |= kRequestConnectionChange;
1507 
1508  if (request!=kRequestNone)
1509  runFinished();
1510 
1511  if (actrun->fileStat==kFileOpen)
1512  primaryQueue.emplace(new EVT_CTRL2(request, actrun));
1513  }
1514 
1515  // 1: Stop, wait for event to get processed
1516  // 2: Stop, finish immediately
1517  // 101: Restart, wait for events to get processed
1518  // 101: Restart, finish immediately
1519  //
1520  const int gi_reset = g_reset;
1521 
1522  const bool abort = gi_reset%100==2;
1523 
1524  factPrintf(MessageImp::kInfo, "Stop reading ... RESET=%d (%s threads)", gi_reset, abort?"abort":"join");
1525 
1526  primaryQueue.wait(abort);
1527  secondaryQueue.wait(abort);
1528  processingQueue1.wait(abort);
1529 
1530  // Here we also destroy all runCtrl structures and hence close all open files
1531  evtCtrl.clear();
1532  actrun.reset();
1533 
1534  factPrintf(MessageImp::kInfo, "Exit read Process...");
1535  factPrintf(MessageImp::kInfo, "%llu Bytes flagged as in-use.", Memory::inuse);
1536 
1537  factStat(gj);
1538 
1539  return gi_reset>=100;
1540 }
uint32_t len() const
void copyData(const READ_STRUCT &rBuf, EVT_CTRL2 *evt)
void factPrintf(int severity, const char *fmt,...)
list< shared_ptr< EVT_CTRL2 > > evtCtrl
uint64_t reportIncomplete(const shared_ptr< EVT_CTRL2 > &evt, const char *txt)
int i
Definition: db_dim_client.c:21
Queue< shared_ptr< EVT_CTRL2 > > processingQueue1(bind(&proc1, placeholders::_1))
uint8_t * bufPos
void swapData()
uint64_t totBytes
uint32_t bufLen
Queue< shared_ptr< EVT_CTRL2 > > secondaryQueue(bind(&writeEvt, placeholders::_1))
uint64_t relBytes
void factStat(const GUI_STAT &gj)
uint gi_NumConnect[NBOARDS]
Definition: EventBuilder.cc:82
static READ_STRUCT * get(int i)
FACT_SOCK g_port[NBOARDS]
Definition: EventBuilder.cc:80
GUI_STAT gj
Definition: EventBuilder.cc:84
shared_ptr< RUN_CTRL2 > actrun
void debugHead(void *buf)
Queue< shared_ptr< EVT_CTRL2 > > primaryQueue(bind(&procEvt, placeholders::_1))
PEVNT_HEADER H
An info telling something which can be interesting to know.
Definition: MessageImp.h:17
size_t g_maxMem
Definition: EventBuilder.cc:76
static uint activeSockets
uint64_t inuse
Definition: EventBuilder.cc:90
uint64_t max_inuse
Definition: EventBuilder.cc:93
Warning because the service this data corresponds to might have been last updated longer ago than Local time
Definition: smartfact.txt:92
uint8_t B[MAX_LEN]
double end
#define NBOARDS
Definition: BasicGlCamera.h:4
Error, something unexpected happened, but can still be handled by the program.
Definition: MessageImp.h:19
buftyp_t bufTyp
void runFinished()
uint16_t g_evtTimeout
Definition: EventBuilder.cc:78
static int fd_epoll
uint64_t allocated
Definition: EventBuilder.cc:91
#define MAX_TOT_MEM
Definition: EventBuilder.h:96
void swapHeader()
int g_reset
Definition: EventBuilder.cc:74
static int wait()
shared_ptr< EVT_CTRL2 > mBufEvt(const READ_STRUCT &rd, shared_ptr< RUN_CTRL2 > &actrun)
uint32_t skip
void factReportIncomplete(uint64_t rep)

+ Here is the call graph for this function:

+ Here is the caller graph for this function: