69 #include <boost/filesystem.hpp> 89 #include <libnova/solar.h> 90 #include <libnova/rise_set.h> 102 char fileName[FILENAME_MAX];
160 fConv = shared_ptr<Converter>();
163 fitsBufferAllocated =
false;
185 kSM_NightlyOpen = 20,
188 kSM_BadFolder = 0x101,
189 kSM_RunWriteError = 0x103,
190 kSM_DailyWriteError = 0x103,
241 vector<Description> GetDescription(
const string& server,
const string& service);
247 int infoCallback(
const EventImp& evt,
unsigned int infoIndex);
258 int ConfigureFilePath(
const Event& evt);
260 int PrintState(
const Event& evt);
262 void CheckForRunNumber(
const EventImp& evt,
unsigned int index);
268 int StopRunLogging();
272 int NightlyToWaitRun();
274 int BackToNightlyOpen();
288 void ServicesMonitoring();
303 void updateSubscriptionList();
305 int setSubscriptionListUpdateTimeLapse(
const Event& evt);
319 int SetDebugOnOff(
const Event& evt);
320 int SetStatsPeriod(
const Event& evt);
321 int SetOpenedFilesOnOff(
const Event& evt);
322 int SetNumSubsAndFitsOnOff(
const Event& evt);
323 int SetRunTimeoutDelay(
const Event& evt);
333 void CreateFitsGrouping(map<
string, vector<string> >& filesToGroup);
335 bool OpenStreamImp(ofstream &stream,
const string &filename,
bool mightbeopen);
336 bool OpenStream(shared_ptr<ofstream> stream,
const string &filename);
340 void AddNewRunNumber(int64_t newRun,
Time time);
343 void RemoveOldestRunNumber();
345 off_t GetFileSize(
const string&);
351 string CompileFileNameWithPath(
const string &path,
const string &service,
const string & extension);
353 string CompileFileName(
const string& service,
const string& extension,
const Time&
time=
Time())
const;
355 bool ShouldSubscribe(
const string& server,
const string& service);
359 bool OpenTextFile(ofstream& stream,
const string& name);
361 bool CheckForOfstreamError(ofstream& out,
bool isDailyStream);
363 void GoToRunWriteErrorState();
364 void GoToNightlyWriteErrorState();
366 bool DoesPathExist(
string path);
368 void TrimOldRunNumbers();
370 bool CreateDirectory(
const string &path);
375 void AddServer(
const string& server);
377 void AddService(
const Service& svc);
380 void RemoveService(
const string,
const string,
bool);
383 void RemoveAllServices(
const string&);
406 int Write(
const Time &
time,
const std::string &txt,
int qos=kMessage);
421 Debug(
"Adding description for service: " + it->front().name);
423 fServiceDescriptionList[it->front().name].assign(it->begin(), it->end());
427 return GetCurrentState();
435 if (fCurrentSubscriptionUpdateRate <= 0)
return;
438 if (timeNow - fLastSubscriptionUpdate < boost::posix_time::seconds(fCurrentSubscriptionUpdateRate))
443 fLastSubscriptionUpdate = timeNow;
446 ostringstream output;
447 for (
auto serverIt=fServiceSubscriptions.begin();serverIt!=fServiceSubscriptions.end(); serverIt++)
449 if (serverIt->first ==
"DATA_LOGGER")
451 for (
auto serviceIt=serverIt->second.begin(); serviceIt!=serverIt->second.end(); serviceIt++)
453 output << serverIt->first <<
"/" << serviceIt->first <<
",";
454 if (serviceIt->second.lastReceivedEvent !=
Time::None)
455 output << (timeNow - serviceIt->second.lastReceivedEvent).total_seconds();
463 fCurrentSubscription->
setData(output.str().c_str(), output.str().size()+1);
465 fCurrentSubscription->
Update();
469 fCurrentSubscriptionUpdateRate = evt.
GetInt();
471 return GetCurrentState();
475 const lock_guard<mutex> guard(fMutex);
476 const auto it = fServiceDescriptionList.find(server+
"/"+service);
477 return it==fServiceDescriptionList.end()?vector<Description>():it->second;
491 ss <<
"datalogger: " << txt;
492 if (fNightlyLogFile.is_open())
494 fNightlyLogImp.
Write(time, ss.str(),
qos);
496 else if (shouldBackLog)
501 backLogBuffer.push_back(str.str());
515 boost::filesystem::create_directories(path);
518 catch (
const runtime_error &e)
537 Info(
"Got request to add server " + server );
538 if (server !=
"DIS_DNS")
540 for (
auto it=fServerDescriptionsList.begin(); it != fServerDescriptionsList.end(); it++)
541 if ((*it)->server == server)
546 str <<
"Already got description for server " << server <<
". Ignoring." << endl;
554 fServerDescriptionsList.push_back(d);
568 const string& serverr = svc.
server;
570 string server = serverr;
571 if (server.size() > 0 && server[0] ==
'+')
573 server = server.substr(1);
574 Warn(
"Got a service beginning with +. This is not supposed to happen");
578 const string& service = svc.
service;
579 const bool isCmd = svc.
iscmd;
585 Info(
"Got request to add service: "+server+
"/"+service);
588 if (!ShouldSubscribe(server, service))
591 map<string, SubscriptionType> &list = fServiceSubscriptions[server];
593 if (list.find(service) != list.end())
595 if (list[service].format != svc.
format)
597 if (list[service].nightlyFile.IsOpen())
599 string fileName = list[service].nightlyFile.GetName();
602 Error(
"Something went wrong while dealing with new format of "+server+
"/"+service+
" file tagged as open but filename is empty. Aborting");
605 list[service].nightlyFile.Close();
606 list[service].increment++;
607 Warn(
"Format of "+server+
"/"+service+
" has changed. Closing "+fileName);
626 list[service].fitsBufferAllocated =
false;
628 list[service].fConv = shared_ptr<Converter>(
new Converter(Out(), svc.
format));
629 list[service].format = svc.
format;
632 Debug(
"Service " + server +
"/" + service +
" is already in the dataLogger's list... ignoring update.");
638 Debug(
"Subscribing to service "+server+
"/"+service);
639 Subscribe(server +
"/" + service)
641 list[service].server = server;
642 list[service].service = service;
643 list[service].format = svc.
format;
644 list[service].index = servicesCounter;
647 if ((server ==
"FAD_CONTROL") && (service ==
"START_RUN"))
648 fRunNumberService = servicesCounter;
650 Info(
"Added subscription to " + server +
"/" + service);
662 Info(
"Got request to remove service: "+server+
"/"+service);
671 if (fServiceSubscriptions.find(server) == fServiceSubscriptions.end())
673 Error(
"Request to remove service "+service+
" from server "+server+
", but service not found.");
677 if (fServiceSubscriptions[server].erase(service) != 1)
680 if (!ShouldSubscribe(server, service))
683 Error(
"Subscription "+server+
"/"+service+
" could not be removed as it is not present");
688 if ((server ==
"FAD_CONTROL") && (service ==
"START_RUN"))
689 fRunNumberService = 0;
691 Info(
"Removed subscription to " + server +
"/" + service);
700 Info(
"Got request for removing all services from: "+server);
701 if (fServiceSubscriptions.find(server)==fServiceSubscriptions.end())
703 Warn(
"Request to remove all services, but corresponding server " + server +
" not found.");
708 fNumSubAndFitsData.
numSubscriptions -= fServiceSubscriptions[server].size();
710 fServiceSubscriptions[server].clear();
711 fServiceSubscriptions.erase(server);
713 if (server ==
"FAD_CONTROL")
714 fRunNumberService = 0;
717 Debug(
"Removed all subscriptions to " + server +
"/");
730 Error(
"An error occured while writing to a text file. Closing it");
734 GoToNightlyWriteErrorState();
736 GoToRunWriteErrorState();
743 if (stream.is_open())
746 Error(filename+
" was already open when trying to open it.");
751 stream.open(filename.c_str(), ios_base::out | ios_base::app);
755 str <<
"ofstream::open() failed for '" << filename <<
"': " << strerror(errno) <<
" [errno=" << errno <<
"]";
760 if (!stream.is_open())
762 Error(
"File "+filename+
" not open as it ought to be.");
766 Info(
"Opened: "+filename);
773 return OpenStreamImp(*stream, filename,
false);
784 return OpenStreamImp(stream, name,
true);
810 if ((fBlackList.find(server +
"/") != fBlackList.end()) ||
811 (fBlackList.find(server +
"/" + service) != fBlackList.end()) ||
812 (fBlackList.find(
"/" + service) != fBlackList.end()))
814 if (fWhiteList.size()>0 &&
815 (fWhiteList.find(server +
"/" + service) != fWhiteList.end()))
818 Debug(
"White list saved service " + server +
"/" + service +
" from blacklisting");
822 Debug(
"Blacklist banned service " + server +
"/" + service);
839 const Time ftime(time);
842 if (!service.empty())
843 str <<
'.' << service;
845 if (!extension.empty())
846 str <<
"." << extension;
859 const Time ftime = fCurrentDay-boost::posix_time::hours(12);
862 str << path << ftime.
GetAsStr(
"/%Y/%m/%d");
865 if (!DoesPathExist(str.str()))
866 CreateDirectory(str.str());
868 str <<
'/' << CompileFileName(service, extension, ftime);
896 str <<
"Removing run number " << fRunNumber.front().runNumber;
900 fRunNumber.pop_front();
911 fNightlyLogImp(fNightlyLogFile), fFilesStats(
"DATA_LOGGER", *this)
936 AddStateName(
kSM_Logging,
"Logging",
"The summary files for the night and the files for a single run are open.");
944 (
"Start the nightly logging. Nightly file location must be specified already");
948 (
"Stop all data logging, close all files.");
952 (
"Transition to exit error states. Closes the any open file.");
956 (
"Go to waiting for run number state. In this state with any received run-number a new file is opened.");
960 (
"Go from the wait for run to nightly open state.");
965 (
"Print information about the internal status of the data logger.");
973 "Path and base name used for the nightly files." 974 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)" 975 "|Name[string]:path and base file name");
978 "Path and base name used for the run files." 979 "|Type[int]:type of open files (1=log, 2=rep, 4=fits)" 980 "|Name[string]:path and base file name");
985 "Num. open files + num. subscribed services" 986 "|NSubAndOpenFiles[int]:Num. of subs and open files");
990 fOpenedFilesIsOn =
true;
991 fNumSubAndFitsIsOn =
true;
993 string emptyString=
"";
996 "List of all the services subscribed by datalogger, except the ones provided by itself." 997 "|Liste[string]:list of logged services and the delay in seconds since last update");
998 fCurrentSubscriptionUpdateRate = 60;
999 fLastSubscriptionUpdate = timeNow;
1004 (
"Switch debug mode on or off. Debug mode prints information about every service written to a file." 1005 "|Enable[bool]:Enable of disable debug mode (yes/no).");
1009 (
"Interval in which the data-logger statistics service (STATS) is updated." 1010 "|Interval[ms]:Value in milliseconds (<=0: no update).");
1014 (
"Switch service which distributes information about the open files on or off." 1015 "|Enable[bool]:Enable of disable filename services (yes/no).");
1019 (
"Switch the service which distributes information about the number of subscriptions and open files on or off." 1020 "|Enable[bool]:Enable of disable NUM_SUBS service (yes/no).");
1024 (
"Set the timeout delay for old run numbers." 1025 "|timeout[min]:Time out in minutes after which files for expired runs are closed.");
1029 (
"Set the min interval between two services-list updates." 1030 "|duration[sec]:The interval between two updates, in seconds.");
1032 fDestructing =
false;
1034 fPreviousOldRunNumberCheck =
Time().
Mjd();
1036 fDailyFileDayChangedAlready =
true;
1037 fRunNumberTimeout = 60000;
1039 fRunNumber.back().runNumber = -1;
1040 fRunNumber.back().time =
Time();
1052 Debug(
"DataLogger Init Done.");
1063 Debug(
"DataLogger destruction starts");
1066 fDestructing =
true;
1070 fServiceSubscriptions.clear();
1074 while (fRunNumber.size() > 0)
1081 Info(
"Will soon close the daily log file");
1088 if (fNightlyLogFile.is_open())
1090 fNightlyLogFile << endl;
1091 fNightlyLogFile.close();
1093 if (!fNightlyLogFile.is_open())
1094 Info(
"Daily log file was closed indeed");
1096 Warn(
"Seems like there was a problem while closing the daily log file.");
1101 Debug(
"DataLogger desctruction ends");
1112 if (cTime - fPreviousOldRunNumberCheck < boost::posix_time::milliseconds(fRunNumberTimeout))
1115 while (fRunNumber.size() > 1 && (cTime - fRunNumber.back().time) > boost::posix_time::milliseconds(fRunNumberTimeout))
1119 fPreviousOldRunNumberCheck = cTime;
1150 SubscriptionsListType::iterator x;
1151 map<string, SubscriptionType>::iterator y;
1152 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1155 for (y=x->second.begin(); y!=x->second.end();y++)
1156 if (y->second.index == subIndex)
1165 if (!found && fDebugIsOn)
1168 str <<
"Service " << evt.
GetName() <<
" not found in subscriptions" << endl;
1178 str <<
"Got 0 size for " << evt.
GetName() << endl;
1187 str <<
"Got no format for " << evt.
GetName() << endl;
1219 if (newRun > 0xffffffff)
1221 Error(
"New run number too large, out of range. Ignoring.");
1224 for (std::vector<int64_t>::const_iterator it=previousRunNumbers.begin(); it != previousRunNumbers.end(); it++)
1228 Error(
"Newly provided run number has already been used (or is still in use). Going to error state");
1236 str <<
"Adding new run number " << newRun <<
" issued at " <<
time;
1241 fRunNumber.back().runNumber = int32_t(newRun);
1242 fRunNumber.back().time =
time;
1247 str <<
"The new run number is: " << fRunNumber.back().runNumber;
1323 const bool isItaReport = fmt!=
"C";
1325 if (!fNightlyLogFile.is_open())
1328 if (fDebugIsOn &&
string(evt.
GetName())!=
"DATA_LOGGER/MESSAGE")
1344 if (
lastFlush < timeNow-boost::posix_time::minutes(1))
1347 SubscriptionsListType::iterator x;
1348 map<string, SubscriptionType>::iterator y;
1349 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1351 for (y=x->second.begin(); y!=x->second.end();y++)
1352 if (y->second.nightlyFile.IsOpen())
1354 y->second.nightlyFile.Flush();
1358 Debug(
"Just flushed nightly fits files to the disk");
1366 SubscriptionsListType::iterator x;
1367 map<string, SubscriptionType>::iterator y;
1368 for (x=fServiceSubscriptions.begin(); x != fServiceSubscriptions.end(); x++)
1370 for (y=x->second.begin(); y!=x->second.end();y++)
1372 if (y->second.nightlyFile.IsOpen())
1374 y->second.nightlyFile.Close();
1376 y->second.increment = 0;
1381 Debug(
"Day have changed! Closing and reopening nightly files");
1383 fNightlyLogFile << endl;
1384 fNightlyLogFile.close();
1387 Info(
"Closed: "+fFullNightlyLogFileName);
1391 if (!
OpenTextFile(fNightlyLogFile, fFullNightlyLogFileName))
1397 fNightlyLogFile << endl;
1411 if (!sub.
fConv->valid())
1414 str <<
"Couldn't properly parse the format... service " << evt.
GetName() <<
" ignored.";
1420 ostringstream header;
1429 fMjD = cTime.Mjd() ? cTime.Mjd()-40587 : 0;
1479 if (!sub.nightlyFile.IsOpen())
1482 WriteToFITS(sub, evt.
GetData());
1487 vector<string> strings;
1492 catch (
const runtime_error &e)
1495 str <<
"Parsing service " << evt.
GetName();
1496 str <<
" failed: " << e.what() <<
" removing the subscription for now.";
1503 if (strings.size() > 1)
1506 err <<
"There was more than one string message in service " << evt.
GetName() <<
" going to fatal error state";
1510 bool isMessage = (sub.
service ==
"MESSAGE");
1512 string serviceName = isMessage ?
"" :
"_"+sub.
service;
1513 msg << sub.
server << serviceName;
1525 msg <<
"[" << fQuality <<
"]";
1530 if (isMessage && (fQuality ==
kAlarm) && (strings[0] ==
""))
1533 strings[0] =
"Alarm reset";
1535 msg <<
": " << strings[0];
1537 if (fNightlyLogFile.is_open())
1546 fQuality = backup_quality;
1549 if (!sub.nightlyFile.IsOpen())
1552 WriteToFITS(sub, evt.
GetData());
1566 Message(
"------------------------------------------");
1567 Message(
"------- DATA LOGGER CURRENT STATE --------");
1568 Message(
"------------------------------------------");
1571 #if BOOST_VERSION < 104600 1572 Message(
"File path: " + boost::filesystem::system_complete(boost::filesystem::path(fFilePath)).directory_string());
1574 Message(
"File path: " + boost::filesystem::system_complete(boost::filesystem::path(fFilePath)).parent_path().
string());
1580 str <<
"Timeout delay for old run numbers: " << fRunNumberTimeout <<
" ms";
1583 str <<
"Active Run Numbers:";
1584 for (list<RunNumberType>::const_iterator it=fRunNumber.begin(); it!=fRunNumber.end(); it++)
1585 str <<
" " << it->runNumber;
1586 if (fRunNumber.empty())
1591 Message(
"------------ OPEN FILES ----------------");
1592 if (fNightlyLogFile.is_open())
1593 Message(
"Nightly log-file: "+fFullNightlyLogFileName);
1602 str <<
"Number of open FITS files: " << fNumSubAndFitsData.
numOpenFits;
1606 Message(
"FITS output disabled at compilation");
1608 Message(
"----------------- STATS ------------------");
1612 str <<
"Statistics are updated every " << fFilesStats.
GetUpdateInterval() <<
" ms";
1616 Message(
"Statistics updates are currently disabled.");
1618 str <<
"Total Size written: " << statVar.
sizeWritten/1000 <<
" kB";
1621 str <<
"Disk free space: " << statVar.
freeSpace/1000000 <<
" MB";
1624 Message(
"------------ DIM SUBSCRIPTIONS -----------");
1626 str <<
"There are " << fNumSubAndFitsData.
numSubscriptions <<
" active DIM subscriptions.";
1628 for (map<
const string, map<string, SubscriptionType> >::const_iterator it=fServiceSubscriptions.begin(); it!= fServiceSubscriptions.end();it++)
1631 for (map<string, SubscriptionType>::const_iterator it2=it->second.begin(); it2!=it->second.end(); it2++)
1634 Message(
"--------------- BLOCK LIST ---------------");
1635 for (set<string>::const_iterator it=fBlackList.begin(); it != fBlackList.end(); it++)
1637 if (fBlackList.empty())
1640 Message(
"--------------- ALLOW LIST ---------------");
1641 for (set<string>::const_iterator it=fWhiteList.begin(); it != fWhiteList.end(); it++)
1643 if (fWhiteList.empty())
1646 Message(
"-------------- GROUPING LIST -------------");
1647 Message(
"The following servers and/or services will");
1648 Message(
"be grouped into a single fits file:");
1649 for (set<string>::const_iterator it=fGrouping.begin(); it != fGrouping.end(); it++)
1651 if (fGrouping.empty())
1654 Message(
"------------------------------------------");
1655 Message(
"-------- END OF DATA LOGGER STATE --------");
1656 Message(
"------------------------------------------");
1675 if (fDebugIsOn == backupDebug)
1676 Message(
"Debug mode was already in the requested state.");
1711 fOpenedFilesIsOn = evt.
GetBool();
1713 if (fOpenedFilesIsOn == backupOpened)
1714 Message(
"Opened files service mode was already in the requested state.");
1735 fNumSubAndFitsIsOn = evt.
GetBool();
1737 if (fNumSubAndFitsIsOn == backupSubs)
1738 Message(
"Number of subscriptions service mode was already in the requested state");
1758 Error(
"Timeout delays for old run numbers must be greater than 0... ignored.");
1762 if (fRunNumberTimeout == evt.
GetUInt())
1763 Message(
"New timeout for old run numbers is same value as previous one.");
1765 fRunNumberTimeout = evt.
GetUInt();
1768 str <<
"Timeout delay for old run numbers is now " << fRunNumberTimeout <<
" ms";
1783 if (!fOpenedFilesIsOn)
1789 str <<
"Updating " << service->
getName() <<
" file '" << name <<
"' (type=" << type <<
")";
1793 str <<
"Num subscriptions: " << fNumSubAndFitsData.
numSubscriptions <<
" Num open FITS files: " << fNumSubAndFitsData.
numOpenFits;
1797 if (name.size()+1 > FILENAME_MAX)
1799 Error(
"Provided file name '" + name +
"' is longer than allowed file name length.");
1805 memcpy(fToDim.
fileName, name.c_str(), name.size()+1);
1807 service->
setData(reinterpret_cast<void*>(&fToDim), name.size()+1+
sizeof(uint32_t));
1822 Debug(
"Starting...");
1825 bool nightlyLogOpen = fNightlyLogFile.is_open();
1826 if (!
OpenTextFile(fNightlyLogFile, fFullNightlyLogFileName))
1828 if (!nightlyLogOpen)
1829 fNightlyLogFile << endl;
1839 fFilesStats.
FileOpened(fFullNightlyLogFileName);
1845 fOpenedNightlyFits.clear();
1860 for (
unsigned int i=0;
i<serviceName.size();
i++)
1862 if (serviceName[
i] ==
'/')
1864 serviceName[
i] =
'_';
1869 if (!sub.nightlyFile.IsOpen())
1871 string incrementedServiceName = serviceName;
1876 incrementedServiceName += str.str();
1880 const string fileNameOnly = partialName.substr(partialName.find_last_of(
'/')+1, partialName.size());
1882 AllocateFITSBuffers(sub);
1885 fOpenedNightlyFits[fileNameOnly].push_back(serviceName);
1887 if (!sub.nightlyFile.Open(partialName, serviceName, &fNumSubAndFitsData.
numOpenFits,
this, 0))
1894 str <<
"Opened: " << partialName <<
" (Nfits=" << fNumSubAndFitsData.
numOpenFits <<
")";
1900 if (fNumSubAndFitsIsOn)
1901 fNumSubAndFits->
Update();
1913 Description dateDesc(
string(
"Time"),
string(
"Modified Julian Date"),
string(
"MJD"));
1914 sub.nightlyFile.AddStandardColumn(dateDesc,
"1D", &fMjD,
sizeof(
double));
1916 Description QoSDesc(
"QoS",
"Quality of service",
"");
1917 sub.nightlyFile.AddStandardColumn(QoSDesc,
"1J", &fQuality,
sizeof(
int));
1920 if (!sub.
fConv->valid())
1922 Error(
"Compilation of format string failed.");
1928 const vector<string> dataFormatsLocal = sub.
fConv->GetFitsFormat();
1931 str <<
"Initializing data columns for service " << sub.
server <<
"/" << sub.
service;
1945 if (sub.nightlyFile.IsOpen())
1947 if (!sub.nightlyFile.Write(*sub.
fConv.get(),
data))
1955 #endif //if has_fits 1990 str <<
"Creating fits group for nightly files";
1994 CCfits::FITS* groupFile;
1995 unsigned int numFilesToGroup = 0;
1996 unsigned int maxCharLength = 0;
1997 for (map<
string, vector<string> >::const_iterator it=filesToGroup.begin(); it != filesToGroup.end(); it++)
2000 numFilesToGroup += it->second.size();
2002 if (it->first.size() > maxCharLength)
2003 maxCharLength = it->first.size();
2004 for (vector<string>::const_iterator jt=it->second.begin(); jt != it->second.end(); jt++)
2005 if (jt->size() > maxCharLength)
2006 maxCharLength = jt->size();
2012 str <<
"There are " << numFilesToGroup <<
" tables to group";
2015 if (numFilesToGroup <= 1)
2017 filesToGroup.clear();
2022 Info(
"Creating FITS group in: "+groupName);
2024 CCfits::Table* groupTable;
2028 groupFile =
new CCfits::FITS(groupName, CCfits::RWmode::Write);
2030 ostringstream pathTypeName;
2031 pathTypeName << maxCharLength <<
"A";
2032 vector<string> names;
2033 vector<string> dataTypes;
2034 names.emplace_back(
"MEMBER_XTENSION");
2035 dataTypes.emplace_back(
"8A");
2036 names.emplace_back(
"MEMBER_URI_TYPE");
2037 dataTypes.emplace_back(
"3A");
2038 names.emplace_back(
"MEMBER_LOCATION");
2039 dataTypes.push_back(pathTypeName.str());
2040 names.emplace_back(
"MEMBER_NAME");
2041 dataTypes.push_back(pathTypeName.str());
2042 names.emplace_back(
"MEMBER_VERSION");
2043 dataTypes.emplace_back(
"1J");
2044 names.emplace_back(
"MEMBER_POSITION");
2045 dataTypes.emplace_back(
"1J");
2047 groupTable = groupFile->addTable(
"GROUPING", numFilesToGroup, names, dataTypes);
2050 catch (CCfits::FitsException e)
2053 str <<
"Creating FITS table GROUPING in " << groupName <<
": " << e.message();
2059 groupTable->addKey(
"GRPNAME",
"FACT_RAW_DATA",
"Data from the FACT telescope");
2061 catch (CCfits::FitsException e)
2063 Error(
"CCfits::Table::addKey failed for 'GRPNAME' in '"+groupName+
"-GROUPING': "+e.message());
2068 groupTable->makeThisCurrent();
2070 const unsigned int n = 8 + 3 + 2*maxCharLength + 1 + 8;
2072 vector<char> realBuffer(n);
2074 char *startOfExtension = realBuffer.data();
2075 char *startOfURI = realBuffer.data()+8;
2076 char *startOfLocation = realBuffer.data()+8+3;
2077 char *startOfName = realBuffer.data()+8+3+maxCharLength;
2079 strcpy(startOfExtension,
"BINTABLE");
2080 strcpy(startOfURI,
"URL");
2082 realBuffer[8+3+2*maxCharLength+3] = 1;
2083 realBuffer[8+3+2*maxCharLength+7] = 1;
2086 for (map<
string, vector<string> >::const_iterator it=filesToGroup.begin(); it!=filesToGroup.end(); it++)
2087 for (vector<string>::const_iterator jt=it->second.begin(); jt != it->second.end(); jt++, i++)
2089 memset(startOfLocation, 0, 2*maxCharLength+1+8);
2091 strcpy(startOfLocation, it->first.c_str());
2092 strcpy(startOfName, jt->c_str());
2097 str <<
"Grouping " << it->first <<
" " << *jt;
2102 fits_write_tblbytes(groupFile->fitsPointer(),
i, 1, 8+3+2*maxCharLength +8,
2103 reinterpret_cast<unsigned char*
>(realBuffer.data()), &status);
2107 fits_get_errstatus(status, text);
2109 str <<
"Writing FITS row " << i <<
" in " << groupName <<
": " << text <<
" (file_write_tblbytes, rc=" << status <<
")";
2117 filesToGroup.clear();
2133 Debug(
"Stopping Run Logging...");
2136 if (fNumSubAndFitsIsOn)
2137 fNumSubAndFits->
Update();
2139 while (fRunNumber.size() > 0)
2155 Debug(
"Going to the Ready state...");
2169 for (SubscriptionsListType::iterator
i = fServiceSubscriptions.begin();
i != fServiceSubscriptions.end();
i++)
2170 for (map<string, SubscriptionType>::iterator j =
i->second.begin(); j !=
i->second.end(); j++)
2172 if (j->second.nightlyFile.IsOpen())
2173 j->second.nightlyFile.Close();
2181 if (fNumSubAndFitsIsOn)
2182 fNumSubAndFits->
Update();
2208 Debug(
"Going to Wait Run Number state...");
2225 Debug(
"Going to NightlyOpen state...");
2236 fDebugIsOn = conf.
Get<
bool>(
"debug");
2244 fBlackList.insert(
"DATA_LOGGER/MESSAGE");
2245 fBlackList.insert(
"DATA_LOGGER/SUBSCRIPTIONS");
2246 fBlackList.insert(
"/SERVICE_LIST");
2247 fBlackList.insert(
"DIS_DNS/");
2250 const vector<string> vec1 = conf.
Vec<
string>(
"block");
2251 const vector<string> vec2 = conf.
Vec<
string>(
"allow");
2252 const vector<string> vec3 = conf.
Vec<
string>(
"group");
2254 fBlackList.insert(vec1.begin(), vec1.end());
2255 fWhiteList.insert(vec2.begin(), vec2.end());
2256 fGrouping.insert( vec3.begin(), vec3.end());
2259 if (conf.
Has(
"run-timeout"))
2261 const uint32_t timeout = conf.
Get<uint32_t>(
"run-timeout");
2264 Error(
"Time out delay for old run numbers must not be 0.");
2267 fRunNumberTimeout = timeout;
2271 if (conf.
Has(
"destination-folder"))
2273 const string folder = conf.
Get<
string>(
"destination-folder");
2279 if (!
OpenTextFile(fNightlyLogFile, fFullNightlyLogFileName))
2282 fNightlyLogFile << endl;
2285 fNightlyLogFile << *it;
2292 if (conf.
Has(
"stats-interval"))
2296 fOpenedFilesIsOn = !conf.
Get<
bool>(
"no-filename-service");
2299 fNumSubAndFitsIsOn = !conf.
Get<
bool>(
"no-numsubs-service");
2301 if (conf.
Has(
"start-daily-files"))
2302 if (conf.
Get<
bool>(
"start-daily-files"))
2306 if (conf.
Has(
"service-list-interval"))
2307 fCurrentSubscriptionUpdateRate = conf.
Get<int32_t>(
"service-list-interval");
2318 return Main::execute<T, DataLogger>(conf);
2333 "The data logger connects to all available Dim services and " 2334 "writes them to ascii and fits files.\n" 2336 "The default is that the program is started without user interaction. " 2337 "All actions are supposed to arrive as DimCommands. Using the -c " 2338 "option, a local shell can be initialized. With h or help a short " 2339 "help message about the usage can be brought to the screen.\n" 2341 "Usage: datalogger [-c type] [OPTIONS]\n" 2342 " or: datalogger [OPTIONS]\n";
2353 "If the allow list has any element, only the servers and/or services " 2354 "specified in the list will be used for subscription. The black list " 2355 "will disable service subscription and has higher priority than the " 2356 "allow list. If the allow list is not present by default all services " 2357 "will be subscribed." 2359 "For example, block=DIS_DNS/ will skip all the services offered by " 2360 "the DIS_DNS server, while block=/SERVICE_LIST will skip all the " 2361 "SERVICE_LIST services offered by any server and DIS_DNS/SERVICE_LIST " 2362 "will skip DIS_DNS/SERVICE_LIST.\n" 2365 Main::PrintHelp<DataLogger>();
2371 po::options_description configs(
"DataLogger options");
2372 configs.add_options()
2373 (
"block,b", vars<string>(),
"Black-list to block services")
2374 (
"allow,a", vars<string>(),
"White-list to only allowe certain services")
2375 (
"debug,d",
po_bool(),
"Debug mode. Print clear text of received service reports.")
2376 (
"group,g", vars<string>(),
"Grouping of services into a single run-Fits")
2377 (
"run-timeout", var<uint32_t>(),
"Time out delay for old run numbers in milliseconds.")
2378 (
"destination-folder", var<string>(),
"Base path for the nightly and run files")
2379 (
"stats-interval", var<int16_t>(),
"Interval in milliseconds for write statistics update")
2380 (
"no-filename-service",
po_bool(),
"Disable update of filename service")
2381 (
"no-numsubs-service",
po_bool(),
"Disable update of number-of-subscriptions service")
2382 (
"start-daily-files",
po_bool(),
"Starts the logger in DailyFileOpen instead of Ready")
2383 (
"service-list-interval", var<int32_t>(),
"Interval between two updates of the service SUBSCRIPTIONS")
2389 int main(
int argc,
const char* argv[])
2401 if (!conf.
Has(
"console"))
2402 return RunShell<LocalStream>(conf);
2405 if (conf.
Get<
int>(
"console")==0)
2406 return RunShell<LocalShell>(conf);
2408 return RunShell<LocalConsole>(conf);
int PrintState(const Event &evt)
print the current state of the dataLogger
vector< DimDescriptions * > fServerDescriptionsList
DimWriteStatistics fFilesStats
uint32_t numSubscriptions
int main(int argc, const char *argv[])
the folder specified for Nightly logging does not exist or has bad permissions
EventImp & AddEvent(const std::string &name, const std::string &states, const std::string &fmt)
SubscriptionType()
Dim info constructor.
void GoToNightlyWriteErrorState()
Go to Nightly Write Error State.
int fCurrentSubscriptionUpdateRate
Minimum number of seconds between two updates of the subscription list (a zero or negative value disables the updates).
void RemoveOldestRunNumber()
Removes the oldest run number and closes the relevant files.
map< const string, map< string, SubscriptionType > > SubscriptionsListType
for obtaining the name of the existing services
void CheckForRunNumber(const EventImp &evt, unsigned int index)
checks whether or not the current info being treated is a run number
void updateSubscriptionList()
update the service
Mainloop running, state machine in operation.
int GetCurrentState() const
return the current state of the machine
int setSubscriptionListUpdateTimeLapse(const Event &evt)
Set the duration between two updates. A zero or negative value disables the service updates.
int SetOpenedFilesOnOff(const Event &evt)
A general base-class describing events issues in a state machine.
void GoToRunWriteErrorState()
Goes to Write error states.
void RemoveAllServices(const string &)
Remove all the services associated with a given server.
void NotifyOpenedFile(const string &name, int type, DimDescribedService *service)
void SetupConfiguration(Configuration &conf)
int Debug(const std::string &str)
bool fDestructing
boolean to prevent DIM updates while destructing the dataLogger
both files opened and writing
void setQuality(int quality)
The base implementation of a distributed messaging system.
const std::string & GetName() const
ofstream fNightlyLogFile
ofstream for the NightlyLogfile
bool ShouldSubscribe(const string &server, const string &service)
Check whether service is in black and/or white list.
Adds some functionality to boost::posix_time::ptime for our needs.
void SetPrintUsage(const std::function< void(void)> &func)
T Get(const std::string &var)
uint32_t NightAsInt() const
int EvalOptions(Configuration &conf)
bool CreateDirectory(const string &path)
Create a given directory.
Time GetNextSunRise(double horizon) const
bool fDebugIsOn
configuration flags
Time fPreviousOldRunNumberCheck
variable to track when the old run numbers were last checked for time-out
int StopRunLogging()
from waiting to logging transition
bool CheckForOfstreamError(ofstream &out, bool isDailyStream)
Checks if the input ofstream is in an error state and, if so, closes it.
DimDnsServiceList fDimList
Time lastReceivedEvent
time of the latest received event
Denotes that an error occurred while writing a daily file (text or fits).
int RunShell(Configuration &conf)
string fFilePath
path of the output files (printed as "File path" by PrintState)
Logs all messages and infos between the services.
bool DoesPathExist(string path)
Checks if a given path exists.
distributes the number of opened subscriptions and fits files
char fileName[FILENAME_MAX]
set< string > fBlackList
black/white listing
off_t GetFileSize(const string &)
retrieves the size of a file
std::vector< T > Vec(const std::string &var)
void SetupConfiguration(Configuration &conf)
~RunNumberType()
default destructor
void AddService(const Service &svc)
Add a new service subscription.
bool FileOpened(const std::string &fileName)
DimDescribedService * fOpenedNightlyFiles
Service for opened files.
static bool DoesPathExist(std::string path, MessageImp &log)
bool OpenStreamImp(ofstream &stream, const string &filename, bool mightbeopen)
void SetUpdateInterval(const int16_t millisec)
std::ostream & Out() const
distributes which files were opened.
int SetRunTimeoutDelay(const Event &evt)
An info telling something which can be interesting to know.
void CreateFitsGrouping(map< string, vector< string > > &filesToGroup)
creates a group fits file based on a list of files to be grouped
bool fitsBufferAllocated
whether or not the fits buffer was allocated already
vector< Description > GetDescription(const string &server, const string &service)
A struct which stores a name, a unit and a comment.
Just a message, usually obsolete.
uint16_t GetUpdateInterval() const
unsigned int fRunNumberService
pointer to the dim's subscription that should distribute the run numbers.
int SetDebugOnOff(const Event &evt)
int GoToReady()
stop and reset transition
virtual int Write(const Time &time, const std::string &txt, int qos=kMessage)
std::vector< int64_t > previousRunNumbers
bool Has(const std::string &var)
uint32_t fRunNumberTimeout
old run numbers time-out delay (in seconds)
void AddOptions(const po::options_description &opt, bool visible=true)
DimDescribedService * fCurrentSubscription
Service for broadcasting subscription status.
int Write(const Time &time, const std::string &txt, int qos=kMessage)
Redirect our own logging to fLog.
void setData(const void *ptr, size_t sz)
int SetNumSubsAndFitsOnOff(const Event &evt)
virtual Time GetTime() const
virtual std::string GetFormat() const
int Error(const std::string &str)
vector< string > backLogBuffer
virtual int GetQoS() const
void SetCallbackServiceAdd(const callback_svc &cb)
int HandleDescriptions(DimDescriptions *desc)
the two methods below were copied from StateMachineDimControl.cc
Warning, because the service this data corresponds to might have been last updated longer ago than the local time
bool OpenTextFile(ofstream &stream, const string &name)
Subscribe to a given server and service.
int Warn(const std::string &str)
void Report(const EventImp &evt, SubscriptionType &sub)
Reporting method for the services info received.
const Stats & GetTotalSizeWritten() const
static const Time None
A none-time, this can be used as a simple representation of an invalid time.
DimDescribedService * fNumSubAndFits
string fFullNightlyReportFileName
full name of the nightly report file
map< string, vector< Description > > fServiceDescriptionList
map and mutex for storing services description
void AddNewRunNumber(int64_t newRun, Time time)
Open the relevant text files related to a particular run.
Commandline parsing, resource file parsing and database access.
provides a statistics service telling the free space on disk and the total size written so far ...
int64_t GetFileSizeOnDisk(const std::string &file)
Returns the size on disk of a given file.
int BackToNightlyOpen()
from wait for run number to nightly open
void TrimOldRunNumbers()
Check if old run numbers can be trimmed, and if so, do it.
std::vector< std::vector< Description > > descriptions
int fQuality
Current Service Quality.
Denotes that an error occurred while writing a run file (text or fits).
int infoCallback(const EventImp &evt, unsigned int infoIndex)
Inherited from DimInfo. Handles all the Infos to which we subscribed, and log them.
SubscriptionsListType fServiceSubscriptions
All the services to which we have subscribed, sorted by server name.
DimDescribedService * fOpenedRunFiles
string format
the original format string. So that we can check if format is changing over time
int32_t runNumber
the current run number used by this subscription
bool SetCurrentFolder(const std::string &folder)
Configures the current folder where files are written to.
~SubscriptionType()
default destructor
Class for a state machine implementation within a DIM network.
NumSubAndFitsType fNumSubAndFitsData
string CompileFileName(const string &service, const string &extension, const Time &time=Time()) const
Form the file names only.
Concrete implementation of an EventImp storing name, format, data and time.
shared_ptr< Converter > fConv
the converter for outputting the data according to the format
virtual std::string GetName() const
Error, something unexpected happened, but needs user intervention (i.e. it needs a signal to the user...
Error states should be between 0x100 and 0xffff.
void RemoveService(const string, const string, bool)
Remove a given service subscription.
void SetCallbackDescriptions(const callback_desc &cb)
list< RunNumberType > fRunNumber
run numbers
Nightly file opened and writing.
bool OpenStream(shared_ptr< ofstream > stream, const string &filename)
double fMjD
Modified Julian Date.
int Info(const std::string &str)
map< string, vector< string > > fOpenedNightlyFits
vectors to keep track of opened Fits files, for grouping purposes.
int Message(const std::string &str)
string fFullNightlyLogFileName
full name of the nightly log file
Dim subscription type. Stores all the relevant info to handle a Dim subscription. ...
void AddServer(const string &server)
Add a new server subscription.
po::typed_value< bool > * po_bool(bool def=false)
MessageImp fNightlyLogImp
Log stream to fNightlyLogFile.
int fPreviousRunNumber
previous run number. to check if changed while logging
int NightlyToWaitRun()
from NightlyOpen to waiting transition
std::string GetAsStr(const char *fmt="%Y-%m-%d %H:%M:%S") const
int32_t runNumber
the actual run number
virtual void Subscribe(StateMachineImp &imp)
virtual const void * GetData() const
void SetCallbackServerAdd(const callback_srv &cb)
bool DoParse(int argc, const char **argv, const std::function< void()> &func=std::function< void()>())
A compiler for the DIM data format string.
bool AddStateName(const int state, const std::string &name, const std::string &doc="")
int Start()
start transition
unsigned int servicesCounter
int Write(const Time &time, const std::string &txt, int qos=kMessage)
DataLogger(ostream &out)
Set up the allowed states, configs and transitions for the data logger.
int SetStatsPeriod(const Event &evt)
set< string > fGrouping
list of services to be grouped
bool fDailyFileDayChangedAlready
boolean to know whether we should close and reopen daily files or not
RunNumberType()
default constructor
Run number record. Used to keep track of which run numbers are still active.
Time time
the time at which the run number was received
unsigned int increment
counter to know if format has changed during operations
void Subscribe(StateMachineImp &imp)
string service
the service
waiting for the run number to open the run file
string CompileFileNameWithPath(const string &path, const string &service, const string &extension)
Get the digits of year, month and day for filenames and paths.
virtual size_t GetSize() const
Time fLastSubscriptionUpdate
The last time in seconds of the day when the service was updated.
std::string SetCurrentState(int state, const char *txt="", const std::string &cmd="")