10 #include "../externals/factfits.h" 11 #include "../externals/ofits.h" 12 #include "../externals/checksum.h" 47 const string& comm) : _key(k),
58 string Trim(
const string &
str,
char c=
' ')
61 const size_t pstart = str.find_first_not_of(c);
62 const size_t pend = str.find_last_not_of(c);
65 if (string::npos==pstart || string::npos==pend)
68 return str.substr(pstart, pend-pstart+1);
75 _fitsString = line.data();
78 _key =
Trim(line.substr(0,8));
80 if (line.substr(8,2)!=
"= ")
83 _comment =
Trim(line.substr(10));
86 string next=line.substr(10);
87 const size_t slash = next.find_first_of(
'/');
89 _comment =
Trim(next.substr(slash+1));
103 const string&
value()
const {
return _value;}
104 const string&
key()
const {
return _key;}
105 const string&
comment()
const {
return _comment;}
137 unsigned int totSize = 0;
140 if (_key.length() > 8)
142 str << _key.substr(0, 8);
148 totSize += _key.length();
152 for (
int i=totSize;
i<8;
i++)
163 if (_value.length() < 20)
164 for (;totSize<30-_value.length();totSize++)
167 if (_value.length() > 70)
169 str << _value.substr(0,70);
175 totSize += _value.size();
185 unsigned int commentSize = 80 - totSize;
186 if (_comment.length() > commentSize)
188 str << _comment.substr(0,commentSize);
189 totSize += commentSize;
194 totSize += _comment.length();
200 for (
int i=totSize;
i<80;
i++)
203 _fitsString = str.str();
206 if (_fitsString.length() != 80)
207 cout <<
"Error |" << _fitsString <<
"| is not of length 80" << endl;
252 vector<uint16_t>& seq) : _name(n),
265 case 'B': _typeSize = 1;
break;
266 case 'I': _typeSize = 2;
break;
268 case 'E': _typeSize = 4;
break;
270 case 'D': _typeSize = 8;
break;
272 cout <<
"Error: typename " << t <<
" missing in the current implementation" << endl;
276 str <<
"data format of field: ";
280 case 'L': str <<
"1-byte BOOL";
break;
281 case 'A': str <<
"1-byte CHAR";
break;
282 case 'B': str <<
"BYTE";
break;
283 case 'I': str <<
"2-byte INTEGER";
break;
284 case 'J': str <<
"4-byte INTEGER";
break;
285 case 'K': str <<
"8-byte INTEGER";
break;
286 case 'E': str <<
"4-byte FLOAT";
break;
287 case 'D': str <<
"8-byte FLOAT";
break;
290 _description = str.str();
293 const string&
name()
const {
return _name;};
294 int width()
const {
return _num*_typeSize;};
299 char type()
const {
return _type;};
345 bool reallocateBuffers();
401 bool changeHeaderKey(
const string& origName,
const string& newName);
404 bool open(
const string& fileName,
const string& tableName=
"Data");
410 bool writeBinaryRow(
const char* bufferToWrite);
412 uint32_t getRowWidth();
415 void setDrsCalib(int16_t*
data);
418 bool setNumWorkingThreads(uint32_t num);
422 uint64_t compressBuffer(uint32_t threadIndex);
425 bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
428 void addHeaderChecksum(
Checksum& checksum);
431 void writeHeader(
bool closingFile =
false);
434 void writeCatalog(
bool closingFile=
false);
440 static void* threadFunction(
void* context);
443 void writeDrsCalib();
446 void copyTransposeTile(uint32_t index);
449 uint32_t compressUNCOMPRESSED(
char* dest,
const char* src, uint32_t
numRows, uint32_t sizeOfElems, uint32_t numRowElems);
450 uint32_t compressHUFFMAN(
char* dest,
const char* src, uint32_t
numRows, uint32_t sizeOfElems, uint32_t numRowElems);
451 uint32_t compressSMOOTHMAN(
char* dest,
char* src, uint32_t
numRows, uint32_t sizeOfElems, uint32_t numRowElems);
452 uint32_t applySMOOTHING(
char* dest,
char* src, uint32_t
numRows, uint32_t sizeOfElems, uint32_t numRowElems);
475 if (val.size() > 2 && val[0] ==
'\'')
477 size_t pos = val.find_last_of(
"'");
478 if (pos != string::npos && pos != 0)
479 val = val.substr(1, pos-1);
483 str <<
"'" << val <<
"'";
484 for (
int i=str.str().length();
i<20;
i++)
494 "BITPIX = 8 / number of bits per data pixel " 495 "NAXIS = 0 / number of data axes " 496 "EXTEND = T / FITS dataset may contain extensions " 497 "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU " 498 "DATASUM = ' 0' / Checksum for the data block " 499 "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy" 500 "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H " 576 uint32_t numRowsPerTile) : _header(),
579 _numRowsPerTile(numRowsPerTile),
582 _headerFlushed(false),
586 _transposedBuffer(1),
587 _compressedBuffer(1),
641 if (
_buffer == NULL)
return false;
693 pthread_mutex_init(&
_mutex, NULL);
701 pthread_mutex_destroy(&
_mutex);
719 if (num < 1 || num > 64)
721 cout <<
"ERROR: num threads must be between 1 and 64. Ignoring" << endl;
730 for (uint32_t
i=0;
i<num;
i++)
746 uint64_t whereDidIStart =
_file.tellp();
747 vector<HeaderEntry> header;
748 header.push_back(
HeaderEntry(
"XTENSION",
"'BINTABLE' ",
"binary table extension"));
749 header.push_back(
HeaderEntry(
"BITPIX" , 8 ,
"8-bit bytes"));
750 header.push_back(
HeaderEntry(
"NAXIS" , 2 ,
"2-dimensional binary table"));
751 header.push_back(
HeaderEntry(
"NAXIS1" , 1024*1440*2 ,
"width of table in bytes"));
752 header.push_back(
HeaderEntry(
"NAXIS2" , 1 ,
"number of rows in table"));
753 header.push_back(
HeaderEntry(
"PCOUNT" , 0 ,
"size of special data area"));
754 header.push_back(
HeaderEntry(
"GCOUNT" , 1 ,
"one data group (required keyword)"));
755 header.push_back(
HeaderEntry(
"TFIELDS" , 1 ,
"number of fields in each row"));
756 header.push_back(
HeaderEntry(
"CHECKSUM",
"'0000000000000000' ",
"Checksum for the whole HDU"));
757 header.push_back(
HeaderEntry(
"DATASUM" ,
" 0" ,
"Checksum for the data block"));
758 header.push_back(
HeaderEntry(
"EXTNAME" ,
"'ZDrsCellOffsets' ",
"name of this binary table extension"));
759 header.push_back(
HeaderEntry(
"TTYPE1" ,
"'OffsetCalibration' ",
"label for field 1"));
760 header.push_back(
HeaderEntry(
"TFORM1" ,
"'1474560I' ",
"data format of field: 2-byte INTEGER"));
762 for (uint32_t
i=0;
i<header.size();
i++)
763 _file.write(header[
i].fitsString().c_str(), 80);
765 _file.write(
"END ", 80);
766 long here =
_file.tellp();
770 int16_t* swappedBytes =
new int16_t[1024];
772 for (int32_t
i=0;
i<1440;
i++)
775 for (int32_t j=0;j<2048;j+=2)
778 inter =
reinterpret_cast<int8_t*
>(swappedBytes)[j];
779 reinterpret_cast<int8_t*
>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
780 reinterpret_cast<int8_t*
>(swappedBytes)[j+1] = inter;
782 _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
783 checksum.
add(reinterpret_cast<char*>(swappedBytes), 2048);
785 uint64_t whereDidIStop =
_file.tellp();
786 delete[] swappedBytes;
791 str << checksum.
val();
792 header[9] =
HeaderEntry(
"DATASUM", str.str(),
"Checksum for the data block");
793 for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
794 checksum.
add(it->fitsString().c_str(), 80);
797 checksum.
add(end.c_str(), 80);
798 int headerRowsLeft = 36 - (header.size() + 1)%36;
799 for (
int i=0;
i<headerRowsLeft;
i++)
800 checksum.
add(space.c_str(), 80);
802 header[8] =
HeaderEntry(
"CHECKSUM", checksum.
str(),
"Checksum for the whole HDU");
804 _file.seekp(whereDidIStart);
805 for (uint32_t
i=0;
i<header.size();
i++)
806 _file.write(header[
i].fitsString().c_str(), 80);
807 _file.seekp(whereDidIStop);
817 cout <<
"Error: cannot add new columns once first row has been written" << endl;
820 for (vector<ColumnEntry>::iterator it=
_columns.begin(); it !=
_columns.end(); it++)
822 if (it->name() == column.
name())
824 cout <<
"Warning: column already exist (" << column.
name() <<
"). Ignoring" << endl;
833 ostringstream
str, str2, str3;
835 str2 << column.
name();
836 str3 <<
"label for field ";
837 if (
_columns.size() < 10) str3 <<
" ";
838 if (
_columns.size() < 100) str3 <<
" ";
846 str3 <<
"data format of field " <<
_columns.size();
853 str3 <<
"Original format of field " <<
_columns.size();
860 str3 <<
"Comp. Scheme of field " <<
_columns.size();
866 for (uint32_t j=0;j<
_catalog[
i].size();j++)
878 for (vector<HeaderEntry>::iterator it=
_header.begin(); it !=
_header.end(); it++)
880 if (it->key() == entry.
key())
891 if (it->key() == entry.
key())
902 cout <<
"Error: new header keys (" << entry.
key() <<
") must be set before the first row is written. Ignoring." << endl;
912 for (vector<HeaderEntry>::iterator it=
_header.begin(); it !=
_header.end(); it++)
914 if (it->key() == origName)
916 (*it) =
HeaderEntry(newName, it->value(), it->comment());
923 if (it->key() == origName)
925 (*it) =
HeaderEntry(newName, it->value(), it->comment());
937 _file.open(fileName.c_str(), ios_base::out);
938 if (!
_file.is_open())
940 cout <<
"Error: Could not open the file (" << fileName <<
")." << endl;
954 return (
_file.good());
964 if (!
_file.is_open())
967 long cPos =
_file.tellp();
978 _file.write(it->fitsString().c_str(), 80);
980 for (vector<HeaderEntry>::iterator it=
_header.begin(); it !=
_header.end(); it++)
981 _file.write(it->fitsString().c_str(), 80);
983 _file.write(
"END ", 80);
984 long here =
_file.tellp();
990 here =
_file.tellp();
993 cout <<
"Error: seems that header did not finish at the end of a block." << endl;
995 if (here > cPos && cPos != 0)
997 cout <<
"Error, entries were added after the first row was written. This is not supposed to happen." << endl;
1001 here =
_file.tellp();
1004 here =
_file.tellp() - here;
1018 uint32_t sizeWritten = 0;
1021 for (uint32_t j=0;j<
_catalog[
i].size();j++)
1024 int8_t swappedEntry[16];
1025 swappedEntry[0] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[7];
1026 swappedEntry[1] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[6];
1027 swappedEntry[2] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[5];
1028 swappedEntry[3] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[4];
1029 swappedEntry[4] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[3];
1030 swappedEntry[5] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[2];
1031 swappedEntry[6] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[1];
1032 swappedEntry[7] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].first)[0];
1034 swappedEntry[8] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[7];
1035 swappedEntry[9] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[6];
1036 swappedEntry[10] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[5];
1037 swappedEntry[11] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[4];
1038 swappedEntry[12] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[3];
1039 swappedEntry[13] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[2];
1040 swappedEntry[14] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[1];
1041 swappedEntry[15] =
reinterpret_cast<int8_t*
>(&
_catalog[
i][j].second)[0];
1044 _checksum.
add(reinterpret_cast<char*>(swappedEntry), 16);
1046 _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*
sizeof(int64_t));
1047 sizeWritten += 2*
sizeof(int64_t);
1056 if (sizeWritten % 2880 != 0)
1058 vector<char> nullVec(2880 - sizeWritten%2880, 0);
1059 _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1070 for (vector<HeaderEntry>::iterator it=
_header.begin(); it !=
_header.end(); it++)
1074 checksum.
add(end.c_str(), 80);
1076 for (
int i=0;
i<headerRowsLeft;
i++)
1077 checksum.
add(space.c_str(), 80);
1114 int64_t heapSize = 0;
1115 int64_t compressedOffset = 0;
1116 for (uint32_t i=0;i<
_catalog.size();i++)
1120 for (uint32_t j=0;j<
_catalog[
i].size();j++)
1125 _catalog[
i][j].second = compressedOffset;
1126 compressedOffset +=
_catalog[
i][j].first;
1154 long here =
_file.tellp();
1157 vector<char> nullVec(2880 - here%2880, 0);
1158 _file.write(nullVec.data(), 2880 - here%2880);
1172 uint32_t offset = 0;
1175 switch (
_columns[
i].getColumnOrdering())
1178 for (uint32_t k=0;k<thisRoundNumRows;k++)
1186 for (
int j=0;j<
_columns[
i].numElems();j++)
1187 for (uint32_t k=0;k<thisRoundNumRows;k++)
1194 cout <<
"Error: unknown column ordering: " <<
_columns[
i].getColumnOrdering() << endl;
1225 return _file.good();
1238 memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1239 return numRows*sizeOfElems*numRowElems;
1247 string huffmanOutput;
1248 uint32_t previousHuffmanSize = 0;
1251 return numRows*sizeOfElems*numRowElems + 1000;
1253 if (sizeOfElems < 2 )
1255 cout <<
"Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1258 uint32_t huffmanOffset = 0;
1259 for (uint32_t j=0;j<numRowElems;j++)
1262 reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1263 numRows*(sizeOfElems/2));
1264 reinterpret_cast<uint32_t*
>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1265 huffmanOffset +=
sizeof(uint32_t);
1266 previousHuffmanSize = huffmanOutput.size();
1268 const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1271 if (totalSize < numRows*sizeOfElems*numRowElems)
1272 memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1282 uint32_t colWidth = numRowElems;
1283 for (
int j=colWidth*numRows-1;j>1;j--)
1284 reinterpret_cast<int16_t*>(src)[j] =
reinterpret_cast<int16_t*
>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+
reinterpret_cast<int16_t*
>(src)[j-2])/2;
1291 uint32_t colWidth = numRowElems;
1292 for (
int j=colWidth*numRows-1;j>1;j--)
1293 reinterpret_cast<int16_t*>(src)[j] =
reinterpret_cast<int16_t*
>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+
reinterpret_cast<int16_t*
>(src)[j-2])/2;
1295 return numRows*sizeOfElems*numRowElems;
1305 uint64_t compressedOffset =
sizeof(
TileHeader);
1310 _catalog[currentCatalogRow][
i].second = compressedOffset;
1312 if (
_columns[
i].numElems() == 0)
continue;
1315 const vector<uint16_t>& sequence =
_columns[
i].getCompressionSequence();
1317 uint64_t previousOffset = compressedOffset;
1319 compressedOffset +=
sizeof(
BlockHeader) +
sizeof(uint16_t)*sequence.size();
1321 for (uint32_t j=0;j<sequence.size(); j++)
1323 switch (sequence[j])
1339 cout <<
"ERROR: Unkown compression sequence entry: " << sequence[
i] << endl;
1346 compressedOffset - previousOffset >
_columns[
i].sizeOfElems()*
_columns[
i].numElems()*thisRoundNumRows+
sizeof(
BlockHeader)+
sizeof(uint16_t)*sequence.size())
1348 cout <<
"REDOING UNCOMPRESSED" << endl;
1349 compressedOffset = previousOffset +
sizeof(
BlockHeader) + 1;
1352 he.size = compressedOffset - previousOffset;
1358 _catalog[currentCatalogRow][
i].first = compressedOffset -
_catalog[currentCatalogRow][
i].second;
1361 head.size = compressedOffset - previousOffset;
1366 _catalog[currentCatalogRow][
i].first = compressedOffset -
_catalog[currentCatalogRow][
i].second;
1369 TileHeader tHead(thisRoundNumRows, compressedOffset);
1371 return compressedOffset;
1380 int32_t extraBytes = 0;
1381 uint32_t sizeToChecksum = sizeToWrite;
1388 if (sizeToChecksum%4 != 0)
1390 extraBytes = 4 - (sizeToChecksum%4);
1391 memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1392 sizeToChecksum += extraBytes;
1400 return _file.good();
1411 pthread_mutex_lock(&(myself->
_mutex));
1413 pthread_mutex_unlock(&(myself->
_mutex));
1414 uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->
_numThreads-1 : myID-1;
1426 while (myself->
_threadIndex != threadToWaitForBeforeWriting)
1429 pthread_mutex_lock(&(myself->
_mutex));
1432 pthread_mutex_unlock(&(myself->
_mutex));
1444 cout <<
"The FACT-Fits compressor reads an input Fits file from FACT" 1445 " and compresses it.\n It can use a drs calibration in order to" 1446 " improve the compression ratio. If so, the input calibration" 1447 " is embedded into the compressed file.\n" 1448 " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)" 1449 " while other columns will be compressed with the AMPLITUDE coding (Veritas)" 1450 "Usage: Compressed_Fits_Test <inputFile>";
1460 cout <<
"The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. " 1461 "The fz extension will be added, replacing the .gz one if required \n" 1462 "If output is specified, then it will replace the automatically generated output filename\n" 1463 "If --drs, followed by a drs calib then it will be applied to the data before compressing\n" 1464 "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n" 1465 "threads gives the number of threads to use. Cannot be less than the default (1)\n" 1466 "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n" 1467 "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n" 1474 "--quiet removes any textual output, except error messages\n" 1475 "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values." 1477 cout << endl << endl;
1485 po::options_description configs(
"FitsCompressor options");
1486 configs.add_options()
1487 (
"inputFile,i", vars<string>(),
"Input file")
1488 (
"drs,d", var<string>(),
"Input drs calibration file")
1489 (
"rowPerTile,r", var<unsigned int>(),
"Number of rows per tile. Default is 100")
1490 (
"output,o", var<string>(),
"Output file. If empty, .fz is appened to the original name")
1491 (
"threads,t", var<unsigned int>(),
"Number of threads to use for compression")
1492 (
"compression,c", vars<string>(),
"which compression to use for which column. Syntax <colName>=<compressionScheme>")
1493 (
"quiet,q",
po_switch(),
"Should the program display any text at all ?")
1494 (
"verify,v",
po_switch(),
"Should we verify the data that has been compressed ?")
1496 po::positional_options_description positional;
1497 positional.add(
"inputFile", -1);
1505 int main(
int argc,
const char** argv)
1515 string fileNameIn =
"";
1516 string fileNameOut =
"";
1517 string drsFileName =
"";
1518 uint32_t numRowsPerTile = 100;
1519 bool displayText=
true;
1522 if (conf.
Get<
bool>(
"quiet")) displayText =
false;
1523 const vector<string> inputFileNameVec = conf.
Vec<
string>(
"inputFile");
1524 if (inputFileNameVec.size() != 1)
1527 if (inputFileNameVec.size() == 0) cout <<
"no";
1528 else cout << inputFileNameVec.size();
1529 cout <<
" input file(s) given. Expected one. Aborting. Input:" << endl;;
1530 for (
unsigned int i=0;
i<inputFileNameVec.size();
i++)
1531 cout << inputFileNameVec[
i] << endl;
1536 fileNameIn = inputFileNameVec[0];
1539 if (conf.
Has(
"drs")) drsFileName = conf.
Get<
string>(
"drs");
1542 bool verifyDataPlease =
false;
1543 if (conf.
Has(
"verify")) verifyDataPlease = conf.
Get<
bool>(
"verify");
1547 if (conf.
Has(
"output"))
1548 fileNameOut = conf.
Get<
string>(
"output");
1551 size_t pos = fileNameIn.find(
".fits");
1552 if (pos == string::npos)
1554 cout <<
"ERROR: input file does not seems ot be fits. Aborting." << endl;
1557 fileNameOut = fileNameIn +
".fz";
1562 const vector<string> columnsCompression = conf.
Vec<
string>(
"compression");
1565 vector<std::pair<string, string>> compressions;
1566 for (
unsigned int i=0;
i<columnsCompression.size();
i++)
1568 size_t pos = columnsCompression[
i].find_first_of(
"=");
1569 if (pos == string::npos)
1571 cout <<
"ERROR: Something wrong occured while parsing " << columnsCompression[
i] <<
". Aborting." << endl;
1574 string comp = columnsCompression[
i].substr(pos+1);
1575 if (comp !=
"UNCOMPRESSED" && comp !=
"AMPLITUDE" && comp !=
"HUFFMAN" &&
1576 comp !=
"SMOOTHMAN" && comp !=
"INT_WAVELET")
1578 cout <<
"Unkown compression scheme requested (" << comp <<
"). Aborting." << endl;
1581 compressions.push_back(make_pair(columnsCompression[
i].substr(0, pos), comp));
1585 if (conf.
Has(
"rowPerTile")) numRowsPerTile = conf.
Get<
unsigned int>(
"rowPerTile");
1596 cout <<
"ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1601 uint32_t originalNumRows = inFile.
GetNumRows();
1602 uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1606 unsigned int numThreads = 1;
1607 if (conf.
Has(
"threads"))
1609 numThreads = conf.
Get<
unsigned int>(
"threads");
1613 if (!outFile.
open(fileNameOut))
1615 cout <<
"Error: could not open " << fileNameOut <<
" for writing" << endl;
1622 if (drsFileName !=
"")
1626 drsFile =
new factfits(drsFileName);
1630 cout <<
"Error: could not open " << drsFileName <<
" for calibration" << endl;
1638 cout <<
"**********************" << endl;
1639 cout <<
"Will compress from : " << fileNameIn << endl;
1640 cout <<
"to : " << fileNameOut << endl;
1641 if (drsFileName !=
"")
1642 cout <<
"while calibrating with: " << drsFileName << endl;
1643 cout <<
"Compression will use : " << numThreads <<
" worker threads" << endl;
1644 cout <<
"Data will be verified : ";
1645 if (verifyDataPlease)
1646 cout <<
"yes" << endl;
1648 cout <<
"no (WARNING !)" << endl;
1649 cout <<
"**********************" << endl;
1658 uint32_t rowWidth = inFile.
GetUInt(
"NAXIS1");
1659 char*
buffer =
new char[rowWidth + 12];
1660 memset(buffer, 0, 4);
1667 cout <<
"Input file has " << columns.size() <<
" columns and " << inFile.
GetNumRows() <<
" rows" << endl;
1670 uint32_t totalRowWidth = 0;
1671 for (uint32_t
i=0;
i<sortedColumns.size();
i++)
1675 str <<
"TTYPE" <<
i+1;
1676 string colName = inFile.
GetStr(str.str());
1679 cout <<
"Column " << colName;
1680 for (uint32_t j=colName.size();j<21;j++)
1688 vector<uint16_t> rawProcessings(1);
1690 vector<uint16_t> smoothmanProcessings(2);
1695 totalRowWidth += sortedColumns[
i].bytes;
1698 bool explicitRequest =
false;
1699 for (
unsigned int j=0;j<compressions.size();j++)
1701 if (compressions[j].
first == colName)
1703 explicitRequest =
true;
1704 if (displayText) cout << compressions[j].second << endl;
1705 if (compressions[j].
second ==
"UNCOMPRESSED")
1707 if (compressions[j].
second ==
"SMOOTHMAN")
1713 if (explicitRequest)
continue;
1715 if (colName !=
"Data")
1717 if (displayText) cout <<
"UNCOMPRESSED" << endl;
1722 if (displayText) cout <<
"SMOOTHMAN" << endl;
1729 for (fits::Table::Keys::const_iterator
i=header.begin();
i!= header.end();
i++)
1731 string k =
i->first;
1732 if (k ==
"XTENSION" || k ==
"BITPIX" || k ==
"PCOUNT" || k ==
"GCOUNT" || k ==
"TFIELDS")
1734 if (k ==
"CHECKSUM")
1745 if (k ==
"TTYPE" || k ==
"TFORM")
1747 string tmpKey =
i->second.fitsString;
1761 int16_t* drsCalib16 = NULL;
1764 int32_t startCellOffset = -1;
1765 if (drsFileName !=
"")
1767 drsCalib16 =
new int16_t[1440*1024];
1768 float* drsCalibFloat = NULL;
1771 drsCalibFloat =
reinterpret_cast<float*
>(drsFile->
SetPtrAddress(
"BaselineMean"));
1775 cout <<
"Could not find column BaselineMean in drs calibration file " << drsFileName <<
". Aborting" << endl;
1781 for (uint32_t
i=0;
i<1440*1024;
i++)
1782 drsCalib16[
i] = (int16_t)(drsCalibFloat[
i]*4096.f/2000.f);
1786 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1787 if (it->first ==
"StartCellData")
1789 startCellOffset = it->second.offset;
1790 if (it->second.type !=
'I')
1792 cout <<
"Wrong type for the StartCellData Column: " << it->second.type <<
" instead of I expected"<< endl;
1797 if (startCellOffset == -1)
1799 cout <<
"WARNING: Could not find StartCellData in input file " << fileNameIn <<
". Doing it uncalibrated"<< endl;
1812 if (displayText) cout <<
"Converting file..." << endl;
1815 int32_t dataOffset = -1;
1818 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1819 if (it->first ==
"Data")
1821 numSlices = it->second.num;
1822 dataOffset = it->second.offset;
1824 if (numSlices % 1440 != 0)
1826 cout <<
"seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1829 if (dataOffset == -1)
1831 cout <<
"Could not find the column Data in the input file. Aborting." << endl;
1838 vector<void*> readPointers;
1839 vector<int32_t> readOffsets;
1840 vector<int32_t> readElemSize;
1841 vector<int32_t> readNumElems;
1842 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1845 readOffsets.push_back(it->second.offset);
1846 readElemSize.push_back(it->second.size);
1847 readNumElems.push_back(it->second.num);
1852 ostringstream wrongChannels;
1853 map<int, int> wrongChannelsMap;
1854 map<int, int> wrongChannelsLastValues;
1855 for (uint32_t
i=0;
i<1440;
i++)
1857 wrongChannelsMap[
i] = 0;
1858 wrongChannelsLastValues[
i] = 0;
1862 if (displayText) cout <<
"\r Row " <<
i+1 << flush;
1865 cout <<
"ERROR: file has less rows than advertized. aborting" << endl;
1870 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1872 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1876 char* checkSumPointer =
buffer;
1877 const int chkOffset = (
i*totalRowWidth)%4;
1878 checkSumPointer -= chkOffset;
1879 uint32_t sizeToChecksum = totalRowWidth + chkOffset;
1880 if (sizeToChecksum%4 != 0)
1882 int32_t extraBytes = 4 - (sizeToChecksum%4);
1883 memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
1884 sizeToChecksum += extraBytes;
1886 rawsum.
add(checkSumPointer, sizeToChecksum,
false);
1888 if (startCellOffset != -1)
1890 for (
int j=0;j<1440;j++)
1892 const int thisStartCell =
reinterpret_cast<int16_t*
>(&buffer[startCellOffset])[j];
1893 if (thisStartCell > 1023)
1895 wrongChannelsMap[j]++;
1896 wrongChannelsLastValues[j] = thisStartCell;
1899 if (thisStartCell < 0)
continue;
1900 for (
int k=0;k<numSlices;k++)
1901 reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1907 if (wrongChannels.str() !=
"")
1909 cout <<
"ERROR: Wrong channels: ";
1910 for (uint32_t
i=0;
i<1440;
i++)
1912 if (wrongChannelsMap[
i] != 0)
1913 cout <<
i <<
"(" << wrongChannelsMap[
i] <<
"|" << wrongChannelsLastValues[
i] <<
") ";
1918 ostringstream strSum;
1919 strSum << rawsum.
val();
1923 string tableName = inFile.
GetStr(
"EXTNAME");
1925 if (displayText) cout << endl <<
"Done. Flushing output file..." << endl;
1928 if (!outFile.
close())
1930 cout <<
"Something went wrong while writing the catalog: negative index" << endl;
1934 if (drsCalib16 != NULL)
1935 delete[] drsCalib16;
1937 if (displayText) cout <<
"Done." << endl;
1943 if (verifyDataPlease)
1945 if (displayText) cout <<
"Now verify data..." << endl;
1954 string copyName(
"");
1955 factfits verifyFile(fileNameOut, copyName, tableName,
false);
1961 ofits reconstructedFile;
1964 string reconstructedName = fileNameOut+
".recons";
1965 reconstructedName =
"/dev/null";
1966 reconstructedFile.
open(reconstructedName.c_str(),
false);
1969 string origChecksumStr;
1980 for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1982 string k = it->first;
1983 if (k ==
"XTENSION" || k ==
"BITPIX" || k ==
"PCOUNT" || k ==
"GCOUNT" ||
1984 k ==
"TFIELDS" || k ==
"ZTABLE" || k ==
"ZNAXIS1" || k ==
"ZNAXIS2" ||
1985 k ==
"ZHEAPPTR" || k ==
"ZPCOUNT" || k ==
"ZTILELEN" || k ==
"THEAP" ||
1986 k ==
"CHECKSUM" || k ==
"DATASUM" || k ==
"FCTCPVER" || k ==
"ZHEAP")
1993 reconstructedFile.
SetKeyComment(
"CHECKSUM", it->second.comment);
1994 origChecksumStr = it->second.value;
2004 reconstructedFile.
SetKeyComment(
"DATASUM", it->second.comment);
2005 origDatasum = it->second.value;
2011 tableName = it->second.value;
2022 if (k ==
"TFORM" || k ==
"NAXIS" || k ==
"ZCTYP" )
2027 if (k ==
"ZFORM" || k ==
"ZTYPE")
2029 string tmpKey = it->second.fitsString;
2031 reconstructedFile.SetKeyFromFitsString(tmpKey);
2035 reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
2038 if (tableName ==
"")
2040 cout <<
"Error: table name from file " << fileNameOut <<
" could not be found. Aborting" << endl;
2045 for (uint32_t numCol=1; numCol<10000; numCol++)
2049 if (!verifyFile.
HasKey(
"TTYPE"+str.str()))
break;
2051 string ttype = verifyFile.
GetStr(
"TTYPE"+str.str());
2052 string tform = verifyFile.
GetStr(
"ZFORM"+str.str());
2053 char type = tform[tform.size()-1];
2054 string number = tform.substr(0, tform.size()-1);
2055 int numElems = atoi(number.c_str());
2057 if (number ==
"") numElems=1;
2059 reconstructedFile.
AddColumn(numElems, type, ttype,
"",
"",
false);
2069 readPointers.clear();
2070 readOffsets.clear();
2071 readElemSize.clear();
2072 readNumElems.clear();
2073 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2075 readPointers.push_back(verifyFile.
SetPtrAddress(it->first));
2076 readOffsets.push_back(it->second.offset);
2077 readElemSize.push_back(it->second.size);
2078 readNumElems.push_back(it->second.num);
2086 for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2088 memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2091 if (displayText) cout <<
"\r Row " << i << flush;
2092 reconstructedFile.
WriteRow(buffer, rowWidth);
2096 if (displayText) cout << endl;
2102 cout <<
"ERROR: file checksums seems wrong" << endl;
2104 if (!reconstructedFile.
close())
2106 cout <<
"ERROR: disk probably full..." << endl;
2111 std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2112 std::pair<string, int> newChecksum = reconstructedFile.
GetChecksumData();
2115 if (origChecksum.second != newChecksum.second)
2117 cout <<
"ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) <<
" vs " << (uint32_t)(newChecksum.second) << endl;
2120 if (origChecksum.first != newChecksum.first)
2122 cout <<
"WARNING: checksums are NOT Identical: " << origChecksum.first <<
" vs " << newChecksum.first << endl;
2126 if (
true) cout <<
"Ok" << endl;
static string _fitsHeader
the default header to be written in every fits file
uint32_t compressSMOOTHMAN(char *dest, char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
BlockHeader & getBlockHeader()
std::vector< Column > SortedColumns
bool HasKey(const std::string &key) const
std::string GetStr(const std::string &key) const
void copyTransposeTile(uint32_t index)
Copy and transpose (or semi-transpose) one tile of data.
void * SetPtrAddress(const std::string &name)
static const uint32_t _THREAD_DECOMPRESS_
Thread working, decompressing.
bool IsCompressedFITS() const
std::map< std::string, Column > Columns
int32_t _checkOffset
offset to the data pointer to calculate the checksum
static HeaderEntry _dummyHeaderEntry
Dummy entry for returning if requested on is not found.
string _name
name of the column
CatalogType _catalog
Catalog, i.e. the main table that points to the compressed data.
uint64_t GetUInt(const std::string &key) const
std::pair< std::string, int > GetChecksumData()
CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100)
Default constructor. 100 tiles of 100 rows each are assigned by default.
int64_t second
offset of this column in the tile, from the start of the heap area
string getDescription() const
void SetPrintUsage(const std::function< void(void)> &func)
T Get(const std::string &var)
bool SetKeyComment(const std::string &key, const std::string &comment)
static void * threadFunction(void *context)
the main function compressing the data
uint32_t _totalNumRows
Total number of raws.
uint32_t _numThreads
The number of threads that will be used to compress.
uint32_t _numTiles
Number of tiles (i.e. groups of rows)
static const uint32_t _THREAD_WRITE_
Thread writing data to disk.
po::typed_value< bool > * po_switch()
vector< uint32_t > _threadNumRows
Total number of rows for thread to compress.
bool changeHeaderKey(const string &origName, const string &newName)
vector< uint32_t > _threadStatus
Flag telling whether the buffer to be transposed (and compressed) is full or empty.
uint32_t compressUNCOMPRESSED(char *dest, const char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
Specific compression functions.
size_t GetNumRows() const
virtual bool AddColumn(uint32_t cnt, char typechar, const std::string &name, const std::string &unit, const std::string &comment="", bool addHeaderKeys=true)
string _description
a description for the column. It will be placed in the header
std::vector< T > Vec(const std::string &var)
void addHeaderChecksum(Checksum &checksum)
add the header checksum to the datasum
void writeCatalog(bool closingFile=false)
write the compressed data catalog. If closingFile is set to true, checksum is calculated ...
void SetArgumentPositions(const po::positional_options_description &desc)
int64_t first
Size of this column in the tile.
BlockHeader(uint64_t s=0, char o=kOrderByRow, unsigned char n=1)
int main(int argc, const char **argv)
string getCompressionString() const
bool close()
close the opened file
uint64_t _heapPtr
the address in the file of the heap area
bool Encode(std::string &bufout, const uint16_t *bufin, size_t bufinlen)
bool GetNextRow(bool check=true)
ColumnEntry(const string &n, char t, int numOf, BlockHeader &head, vector< uint16_t > &seq)
const vector< uint16_t > & getCompressionSequence() const
int16_t * _drsCalibData
array of the Drs baseline mean
static string _emptyBlock
an empty block to be apened at the end of a file so that its length is a multiple of 2880 ...
const Table::Keys & GetKeys() const
bool Has(const std::string &var)
pair< int64_t, int64_t > CatalogEntry
int32_t _threadLooper
Which thread will deal with the upcoming bunch of data ?
virtual void open(const char *filename, bool addEXTNAMEKey=true)
vector< char * > _transposedBuffer
Memory buffer to store rows while they are transposed.
char _type
the type of the column, as specified by the fits documentation
static const uint32_t _THREAD_READ_
Thread reading data from disk.
CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100)
default constructor. Assigns a default number of rows and tiles
void AddOptions(const po::options_description &opt, bool visible=true)
static const uint32_t _THREAD_WAIT_
Thread doing nothing.
vector< uint16_t > _compSequence
std::map< std::string, Entry > Keys
const char & getColumnOrdering() const
int _typeSize
the number of bytes taken by one element
fstream _file
The actual file streamer for accessing disk data.
uint32_t _rowWidth
Total number of bytes in one row.
bool writeBinaryRow(const char *bufferToWrite)
write one row of data, already placed in bufferToWrite. Does the byte-swapping
vector< HeaderEntry > _header
Header keys.
std::string str(bool complm=true) const
char * _buffer
Memory buffer to store rows while they are not compressed.
int _offset
the offset of the column, in bytes, from the beginning of one row
virtual ~CompressedFitsFile()
default destructor
uint64_t compressBuffer(uint32_t threadIndex)
compresses one buffer of data, the one given by threadIndex
bool _headerFlushed
Flag telling whether the header record is synchronized with the data on disk.
Commandline parsing, resource file parsing and database access.
static const uint32_t _THREAD_EXIT_
Thread exiting.
virtual bool IsFileOk() const
void writeHeader(bool closingFile=false)
write the header. If closingFile is set to true, checksum is calculated
vector< ColumnEntry > _columns
Columns in the file.
uint32_t compressHUFFMAN(char *dest, const char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
pthread_mutex_t _mutex
mutex for compressing threads
virtual ~CompressedFitsWriter()
default destructor
void writeDrsCalib()
Write the drs calibration to disk, if any.
uint32_t applySMOOTHING(char *dest, char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
writes an already compressed buffer to disk
const Table::SortedColumns & GetSortedColumns() const
bool reallocateBuffers()
protected function to allocate the intermediate buffers
Checksum _checksum
Checksum for asserting the consistency of the data.
const Table::Columns & GetColumns() const
vector< HeaderEntry > & getHeaderEntries()
get the header of the file
vector< HeaderEntry > _defaultHeader
FIXME this was a bad idea. Move everything to the regular header.
void setupConfiguration(Configuration &conf)
const string & name() const
virtual bool WriteTableHeader(const char *name="DATA")
bool addColumn(const ColumnEntry &column)
add one column to the file
bool add(const char *buf, size_t len, bool big_endian=true)
vector< CatalogRow > CatalogType
vector< CatalogEntry > CatalogRow
bool setNumWorkingThreads(uint32_t num)
set the number of worker threads compressing the data.
bool DoParse(int argc, const char **argv, const std::function< void()> &func=std::function< void()>())
uint32_t _numRowsPerTile
Number of rows per tile.
bool setHeaderKey(const HeaderEntry &)
sets a given header key
uint32_t _threadIndex
A variable to assign threads indices.
virtual bool WriteRow(const void *ptr, size_t cnt, bool byte_swap=true)
static const uint32_t _THREAD_COMPRESS_
Thread working, compressing.
bool open(const string &fileName, const string &tableName="Data")
open a new fits file
vector< pthread_t > _thread
The thread handler of the compressor.
int _num
number of elements contained in one row of this column
vector< char * > _compressedBuffer
Memory buffer to store rows while they are compressed.
void setDrsCalib(int16_t *data)
assign a given (already loaded) drs calibration