FACT++  1.0
fitsCompressor.cc
Go to the documentation of this file.
1 /*
2  * fitsCompressor.cc
3  *
4  * Created on: May 7, 2013
5  * Author: lyard
6  */
7 
8 
9 #include "Configuration.h"
10 #include "../externals/factfits.h"
11 #include "../externals/ofits.h"
12 #include "../externals/checksum.h"
13 
14 #include <map>
15 #include <fstream>
16 #include <sstream>
17 #include <iostream>
18 
19 
20 using namespace std;
21 
23 {
24 public:
26  {
27  public:
31  HeaderEntry(): _key(""),
32  _value(""),
33  _comment(""),
34  _fitsString("")
35  {
36  }
37 
44  template<typename T>
45  HeaderEntry(const string& k,
46  const T& val,
47  const string& comm) : _key(k),
48  _value(""),
49  _comment(comm),
50  _fitsString("")
51  {
52  setValue(val);
53  }
54 
58  string Trim(const string &str, char c=' ')
59  {
60  // Trim Both leading and trailing spaces
61  const size_t pstart = str.find_first_not_of(c); // Find the first character position after excluding leading blank spaces
62  const size_t pend = str.find_last_not_of(c); // Find the first character position from reverse af
63 
64  // if all spaces or empty return an empty string
65  if (string::npos==pstart || string::npos==pend)
66  return string();
67 
68  return str.substr(pstart, pend-pstart+1);
69  }
73  HeaderEntry(const string& line)
74  {
75  _fitsString = line.data();
76 
77  //parse the line
78  _key = Trim(line.substr(0,8));
79  //COMMENT and/or HISTORY values
80  if (line.substr(8,2)!= "= ")
81  {
82  _value = "";
83  _comment = Trim(line.substr(10));
84  return;
85  }
86  string next=line.substr(10);
87  const size_t slash = next.find_first_of('/');
88  _value = Trim(Trim(Trim(next.substr(0, slash)), '\''));
89  _comment = Trim(next.substr(slash+1));
90  }
94  HeaderEntry(const vector<char>& lineVec)
95  {
96  HeaderEntry(string(lineVec.data()));
97  }
101  virtual ~HeaderEntry(){}
102 
103  const string& value() const {return _value;}
104  const string& key() const {return _key;}
105  const string& comment() const {return _comment;}
106  const string& fitsString() const {return _fitsString;}
107 
113  template<typename T>
114  void setValue(const T& val)
115  {
116  ostringstream str;
117  str << val;
118  _value = str.str();
119  buildFitsString();
120  };
124  void setComment(const string& comm)
125  {
126  _comment = comm;
127  buildFitsString();
128  }
129 
130  private:
135  {
136  ostringstream str;
137  unsigned int totSize = 0;
138 
139  // Tuncate the key if required
140  if (_key.length() > 8)
141  {
142  str << _key.substr(0, 8);
143  totSize += 8;
144  }
145  else
146  {
147  str << _key;
148  totSize += _key.length();
149  }
150 
151  // Append space if key is less than 8 chars long
152  for (int i=totSize; i<8;i++)
153  {
154  str << " ";
155  totSize++;
156  }
157 
158  // Add separator
159  str << "= ";
160  totSize += 2;
161 
162  // Format value
163  if (_value.length() < 20)
164  for (;totSize<30-_value.length();totSize++)
165  str << " ";
166 
167  if (_value.length() > 70)
168  {
169  str << _value.substr(0,70);
170  totSize += 70;
171  }
172  else
173  {
174  str << _value;
175  totSize += _value.size();
176  }
177 
178  // If there is space remaining, add comment area
179  if (totSize < 77)
180  {
181  str << " / ";
182  totSize += 3;
183  if (totSize < 80)
184  {
185  unsigned int commentSize = 80 - totSize;
186  if (_comment.length() > commentSize)
187  {
188  str << _comment.substr(0,commentSize);
189  totSize += commentSize;
190  }
191  else
192  {
193  str << _comment;
194  totSize += _comment.length();
195  }
196  }
197  }
198 
199  // If there is yet some free space, fill up the entry with spaces
200  for (int i=totSize; i<80;i++)
201  str << " ";
202 
203  _fitsString = str.str();
204 
205  // Check for correct completion
206  if (_fitsString.length() != 80)
207  cout << "Error |" << _fitsString << "| is not of length 80" << endl;
208  }
209 
210  string _key;
211  string _value;
212  string _comment;
213  string _fitsString;
214  };
215 
219  typedef enum
220  {
222  SMOOTHMAN
223  } FitsCompression;
224 
229  {
230  public:
234  ColumnEntry();
235 
239  virtual ~ColumnEntry(){}
240 
248  ColumnEntry(const string& n,
249  char t,
250  int numOf,
251  BlockHeader& head,
252  vector<uint16_t>& seq) : _name(n),
253  _num(numOf),
254  _typeSize(0),
255  _offset(0),
256  _type(t),
257  _description(""),
258  _header(head),
259  _compSequence(seq)
260  {
261  switch (t)
262  {
263  case 'L':
264  case 'A':
265  case 'B': _typeSize = 1; break;
266  case 'I': _typeSize = 2; break;
267  case 'J':
268  case 'E': _typeSize = 4; break;
269  case 'K':
270  case 'D': _typeSize = 8; break;
271  default:
272  cout << "Error: typename " << t << " missing in the current implementation" << endl;
273  };
274 
275  ostringstream str;
276  str << "data format of field: ";
277 
278  switch (t)
279  {
280  case 'L': str << "1-byte BOOL"; break;
281  case 'A': str << "1-byte CHAR"; break;
282  case 'B': str << "BYTE"; break;
283  case 'I': str << "2-byte INTEGER"; break;
284  case 'J': str << "4-byte INTEGER"; break;
285  case 'K': str << "8-byte INTEGER"; break;
286  case 'E': str << "4-byte FLOAT"; break;
287  case 'D': str << "8-byte FLOAT"; break;
288  }
289 
290  _description = str.str();
291  }
292 
293  const string& name() const { return _name;};
294  int width() const { return _num*_typeSize;};
295  int offset() const { return _offset; };
296  int numElems() const { return _num; };
297  int sizeOfElems() const { return _typeSize;};
298  void setOffset(int off) { _offset = off;};
299  char type() const { return _type;};
300  string getDescription() const { return _description;}
301  BlockHeader& getBlockHeader() { return _header;}
302  const vector<uint16_t>& getCompressionSequence() const { return _compSequence;}
303  const char& getColumnOrdering() const { return _header.ordering;}
304 
305 
306  string getCompressionString() const
307  {
308  return "FACT";
309  /*
310  ostringstream str;
311  for (uint32_t i=0;i<_compSequence.size();i++)
312  switch (_compSequence[i])
313  {
314  case FACT_RAW: if (str.str().size() == 0) str << "RAW"; break;
315  case FACT_SMOOTHING: str << "SMOOTHING "; break;
316  case FACT_HUFFMAN16: str << "HUFFMAN16 "; break;
317  };
318  return str.str();*/
319  }
320 
321  private:
322 
323  string _name;
324  int _num;
325  int _typeSize;
326  int _offset;
327  char _type;
328  string _description;
330  vector<uint16_t> _compSequence;
331  };
332 
333  public:
335  CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
336 
338  virtual ~CompressedFitsFile();
339 
341  vector<HeaderEntry>& getHeaderEntries() { return _header;}
342 
343  protected:
345  bool reallocateBuffers();
346 
347  //FITS related stuff
348  vector<HeaderEntry> _header;
349  vector<ColumnEntry> _columns;
350  uint32_t _numTiles;
351  uint32_t _numRowsPerTile;
352  uint32_t _totalNumRows;
353  uint32_t _rowWidth;
355  char* _buffer;
357  fstream _file;
358 
359  //compression related stuff
360  typedef pair<int64_t, int64_t> CatalogEntry;
361  typedef vector<CatalogEntry> CatalogRow;
362  typedef vector<CatalogRow> CatalogType;
363  CatalogType _catalog;
364  uint64_t _heapPtr;
365  vector<char*> _transposedBuffer;
366  vector<char*> _compressedBuffer;
367 
368  //thread related stuff
369  uint32_t _numThreads;
370  uint32_t _threadIndex;
371  vector<pthread_t> _thread;
372  vector<uint32_t> _threadNumRows;
373  vector<uint32_t> _threadStatus;
374 
375  //thread states. Not all used, but they do not hurt
376  static const uint32_t _THREAD_WAIT_;
377  static const uint32_t _THREAD_COMPRESS_;
378  static const uint32_t _THREAD_DECOMPRESS_;
379  static const uint32_t _THREAD_WRITE_;
380  static const uint32_t _THREAD_READ_;
381  static const uint32_t _THREAD_EXIT_;
382 
384 };
385 
387 {
388  public:
390  CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100);
391 
393  virtual ~CompressedFitsWriter();
394 
396  bool addColumn(const ColumnEntry& column);
397 
399  bool setHeaderKey(const HeaderEntry&);
400 
401  bool changeHeaderKey(const string& origName, const string& newName);
402 
404  bool open(const string& fileName, const string& tableName="Data");
405 
407  bool close();
408 
410  bool writeBinaryRow(const char* bufferToWrite);
411 
412  uint32_t getRowWidth();
413 
415  void setDrsCalib(int16_t* data);
416 
418  bool setNumWorkingThreads(uint32_t num);
419 
420  private:
422  uint64_t compressBuffer(uint32_t threadIndex);
423 
425  bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite);
426 
428  void addHeaderChecksum(Checksum& checksum);
429 
431  void writeHeader(bool closingFile = false);
432 
434  void writeCatalog(bool closingFile=false);
435 
437  vector<HeaderEntry> _defaultHeader;
438 
440  static void* threadFunction(void* context);
441 
443  void writeDrsCalib();
444 
446  void copyTransposeTile(uint32_t index);
447 
449  uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
450  uint32_t compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
451  uint32_t compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
452  uint32_t applySMOOTHING(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems);
453 
454  int32_t _checkOffset;
455  int16_t* _drsCalibData;
456  int32_t _threadLooper;
457  pthread_mutex_t _mutex;
458 
459 
460  static string _emptyBlock;
461  static string _fitsHeader;
462 };
463 
464 const uint32_t CompressedFitsFile::_THREAD_WAIT_ = 0;
465 const uint32_t CompressedFitsFile::_THREAD_COMPRESS_ = 1;
467 const uint32_t CompressedFitsFile::_THREAD_WRITE_ = 3;
468 const uint32_t CompressedFitsFile::_THREAD_READ_ = 4;
469 const uint32_t CompressedFitsFile::_THREAD_EXIT_ = 5;
470 
471 template<>
473 {
474  string val = v;
475  if (val.size() > 2 && val[0] == '\'')
476  {
477  size_t pos = val.find_last_of("'");
478  if (pos != string::npos && pos != 0)
479  val = val.substr(1, pos-1);
480  }
481  ostringstream str;
482 
483  str << "'" << val << "'";
484  for (int i=str.str().length(); i<20;i++)
485  str << " ";
486  _value = str.str();
487  buildFitsString();
488 }
489 
493 string CompressedFitsWriter::_fitsHeader = "SIMPLE = T / file does conform to FITS standard "
494  "BITPIX = 8 / number of bits per data pixel "
495  "NAXIS = 0 / number of data axes "
496  "EXTEND = T / FITS dataset may contain extensions "
497  "CHECKSUM= '4AcB48bA4AbA45bA' / Checksum for the whole HDU "
498  "DATASUM = ' 0' / Checksum for the data block "
499  "COMMENT FITS (Flexible Image Transport System) format is defined in 'Astronomy"
500  "COMMENT and Astrophysics', volume 376, page 359; bibcode: 2001A&A...376..359H "
501  "END "
502  " "
503  " "
504  " "
505  " "
506  " "
507  " "
508  " "
509  " "
510  " "
511  " "
512  " "
513  " "
514  " "
515  " "
516  " "
517  " "
518  " "
519  " "
520  " "
521  " "
522  " "
523  " "
524  " "
525  " "
526  " "
527  " "
528  " ";
529 
535  " "
536  " "
537  " "
538  " "
539  " "
540  " "
541  " "
542  " "
543  " "
544  " "
545  " "
546  " "
547  " "
548  " "
549  " "
550  " "
551  " "
552  " "
553  " "
554  " "
555  " "
556  " "
557  " "
558  " "
559  " "
560  " "
561  " "
562  " "
563  " "
564  " "
565  " "
566  " "
567  " "
568  " "
569  " ";
570 
572 /****************************************************************
573  * SUPER CLASS DEFAULT CONSTRUCTOR
574  ****************************************************************/
576  uint32_t numRowsPerTile) : _header(),
577  _columns(),
578  _numTiles(numTiles),
579  _numRowsPerTile(numRowsPerTile),
580  _totalNumRows(0),
581  _rowWidth(0),
582  _headerFlushed(false),
583  _buffer(NULL),
584  _checksum(0),
585  _heapPtr(0),
586  _transposedBuffer(1),
587  _compressedBuffer(1),
588  _numThreads(1),
589  _threadIndex(0),
590  _thread(1),
591  _threadNumRows(1),
592  _threadStatus(1)
593 {
594  _catalog.resize(_numTiles);
595  _transposedBuffer[0] = NULL;
596  _compressedBuffer[0] = NULL;
598  _threadNumRows[0] = 0;
599 }
600 
601 /****************************************************************
602  * SUPER CLASS DEFAULT DESTRUCTOR
603  ****************************************************************/
605 {
606  if (_buffer != NULL)
607  {
608  _buffer = _buffer-4;
609  delete[] _buffer;
610  _buffer = NULL;
611  for (uint32_t i=0;i<_numThreads;i++)
612  {
614  delete[] _transposedBuffer[i];
615  delete[] _compressedBuffer[i];
616  _transposedBuffer[i] = NULL;
617  _compressedBuffer[i] = NULL;
618  }
619  }
620  if (_file.is_open())
621  _file.close();
622 }
623 
624 /****************************************************************
625  * REALLOCATE BUFFER
626  ****************************************************************/
628 {
629  if (_buffer)
630  {
631  _buffer = _buffer - 4;
632  delete[] _buffer;
633  for (uint32_t i=0;i<_compressedBuffer.size();i++)
634  {
636  delete[] _transposedBuffer[i];
637  delete[] _compressedBuffer[i];
638  }
639  }
640  _buffer = new char[_rowWidth*_numRowsPerTile + 12];
641  if (_buffer == NULL) return false;
642  memset(_buffer, 0, 4);
643  _buffer = _buffer + 4;
644  if (_compressedBuffer.size() != _numThreads)
645  {
648  }
649  for (uint32_t i=0;i<_numThreads;i++)
650  {
652  _compressedBuffer[i] = new char[_rowWidth*_numRowsPerTile + _columns.size() + sizeof(TileHeader) + 12]; //use a bit more memory for compression flags and checksumming
653  if (_transposedBuffer[i] == NULL || _compressedBuffer[i] == NULL)
654  return false;
655  //shift the compressed buffer by 4 bytes, for checksum calculation
656  memset(_compressedBuffer[i], 0, 4);
658  //initialize the tile header
659  TileHeader tileHeader;
660  memcpy(_compressedBuffer[i], &tileHeader, sizeof(TileHeader));
661  }
662  return true;
663 }
664 
665 /****************************************************************
666  * DEFAULT WRITER CONSTRUCTOR
667  ****************************************************************/
669  uint32_t numRowsPerTile) : CompressedFitsFile(numTiles, numRowsPerTile),
670  _checkOffset(0),
671  _drsCalibData(NULL),
672  _threadLooper(0)
673 {
674  _defaultHeader.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
675  _defaultHeader.push_back(HeaderEntry("BITPIX", 8, "8-bit bytes"));
676  _defaultHeader.push_back(HeaderEntry("NAXIS", 2, "2-dimensional binary table"));
677  _defaultHeader.push_back(HeaderEntry("NAXIS1", _rowWidth, "width of table in bytes"));
678  _defaultHeader.push_back(HeaderEntry("NAXIS2", numTiles, "num of rows in table"));
679  _defaultHeader.push_back(HeaderEntry("PCOUNT", 0, "size of special data area"));
680  _defaultHeader.push_back(HeaderEntry("GCOUNT", 1, "one data group (required keyword)"));
681  _defaultHeader.push_back(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
682  _defaultHeader.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
683  _defaultHeader.push_back(HeaderEntry("DATASUM", " 0", "Checksum for the data block"));
684  //compression stuff
685  _defaultHeader.push_back(HeaderEntry("ZTABLE", "T", "Table is compressed"));
686  _defaultHeader.push_back(HeaderEntry("ZNAXIS1", 0, "Width of uncompressed rows"));
687  _defaultHeader.push_back(HeaderEntry("ZNAXIS2", 0, "Number of uncompressed rows"));
688  _defaultHeader.push_back(HeaderEntry("ZPCOUNT", 0, ""));
689  _defaultHeader.push_back(HeaderEntry("ZHEAPPTR", 0, ""));
690  _defaultHeader.push_back(HeaderEntry("ZTILELEN", numRowsPerTile, "Number of rows per tile"));
691  _defaultHeader.push_back(HeaderEntry("THEAP", 0, ""));
692 
693  pthread_mutex_init(&_mutex, NULL);
694 }
695 
696 /****************************************************************
697  * DEFAULT DESTRUCTOR
698  ****************************************************************/
700 {
701  pthread_mutex_destroy(&_mutex);
702 }
703 
704 /****************************************************************
705  * SET THE POINTER TO THE DRS CALIBRATION
706  ****************************************************************/
708 {
710 }
711 
712 /****************************************************************
713  * SET NUM WORKING THREADS
714  ****************************************************************/
716 {
717  if (_file.is_open())
718  return false;
719  if (num < 1 || num > 64)
720  {
721  cout << "ERROR: num threads must be between 1 and 64. Ignoring" << endl;
722  return false;
723  }
724  _numThreads = num;
725  _transposedBuffer[0] = NULL;
726  _compressedBuffer[0] = NULL;
727  _threadStatus.resize(num);
728  _thread.resize(num);
729  _threadNumRows.resize(num);
730  for (uint32_t i=0;i<num;i++)
731  {
732  _threadNumRows[i] = 0;
734  }
735  return reallocateBuffers();
736 }
737 
738 /****************************************************************
739  * WRITE DRS CALIBRATION TO FILE
740  ****************************************************************/
742 {
743  //if file was not loaded, ignore
744  if (_drsCalibData == NULL)
745  return;
746  uint64_t whereDidIStart = _file.tellp();
747  vector<HeaderEntry> header;
748  header.push_back(HeaderEntry("XTENSION", "'BINTABLE' ", "binary table extension"));
749  header.push_back(HeaderEntry("BITPIX" , 8 , "8-bit bytes"));
750  header.push_back(HeaderEntry("NAXIS" , 2 , "2-dimensional binary table"));
751  header.push_back(HeaderEntry("NAXIS1" , 1024*1440*2 , "width of table in bytes"));
752  header.push_back(HeaderEntry("NAXIS2" , 1 , "number of rows in table"));
753  header.push_back(HeaderEntry("PCOUNT" , 0 , "size of special data area"));
754  header.push_back(HeaderEntry("GCOUNT" , 1 , "one data group (required keyword)"));
755  header.push_back(HeaderEntry("TFIELDS" , 1 , "number of fields in each row"));
756  header.push_back(HeaderEntry("CHECKSUM", "'0000000000000000' ", "Checksum for the whole HDU"));
757  header.push_back(HeaderEntry("DATASUM" , " 0" , "Checksum for the data block"));
758  header.push_back(HeaderEntry("EXTNAME" , "'ZDrsCellOffsets' ", "name of this binary table extension"));
759  header.push_back(HeaderEntry("TTYPE1" , "'OffsetCalibration' ", "label for field 1"));
760  header.push_back(HeaderEntry("TFORM1" , "'1474560I' ", "data format of field: 2-byte INTEGER"));
761 
762  for (uint32_t i=0;i<header.size();i++)
763  _file.write(header[i].fitsString().c_str(), 80);
764  //End the header
765  _file.write("END ", 80);
766  long here = _file.tellp();
767  if (here%2880)
768  _file.write(_emptyBlock.c_str(), 2880 - here%2880);
769  //now write the data itself
770  int16_t* swappedBytes = new int16_t[1024];
771  Checksum checksum;
772  for (int32_t i=0;i<1440;i++)
773  {
774  memcpy(swappedBytes, &(_drsCalibData[i*1024]), 2048);
775  for (int32_t j=0;j<2048;j+=2)
776  {
777  int8_t inter;
778  inter = reinterpret_cast<int8_t*>(swappedBytes)[j];
779  reinterpret_cast<int8_t*>(swappedBytes)[j] = reinterpret_cast<int8_t*>(swappedBytes)[j+1];
780  reinterpret_cast<int8_t*>(swappedBytes)[j+1] = inter;
781  }
782  _file.write(reinterpret_cast<char*>(swappedBytes), 2048);
783  checksum.add(reinterpret_cast<char*>(swappedBytes), 2048);
784  }
785  uint64_t whereDidIStop = _file.tellp();
786  delete[] swappedBytes;
787  //No need to pad the data, as (1440*1024*2)%2880==0
788 
789  //calculate the checksum from the header
790  ostringstream str;
791  str << checksum.val();
792  header[9] = HeaderEntry("DATASUM", str.str(), "Checksum for the data block");
793  for (vector<HeaderEntry>::iterator it=header.begin();it!=header.end(); it++)
794  checksum.add(it->fitsString().c_str(), 80);
795  string end("END ");
796  string space(" ");
797  checksum.add(end.c_str(), 80);
798  int headerRowsLeft = 36 - (header.size() + 1)%36;
799  for (int i=0;i<headerRowsLeft;i++)
800  checksum.add(space.c_str(), 80);
801  //udpate the checksum keyword
802  header[8] = HeaderEntry("CHECKSUM", checksum.str(), "Checksum for the whole HDU");
803  //and eventually re-write the header data
804  _file.seekp(whereDidIStart);
805  for (uint32_t i=0;i<header.size();i++)
806  _file.write(header[i].fitsString().c_str(), 80);
807  _file.seekp(whereDidIStop);
808 }
809 
810 /****************************************************************
811  * ADD COLUMN
812  ****************************************************************/
814 {
815  if (_totalNumRows != 0)
816  {
817  cout << "Error: cannot add new columns once first row has been written" << endl;
818  return false;
819  }
820  for (vector<ColumnEntry>::iterator it=_columns.begin(); it != _columns.end(); it++)
821  {
822  if (it->name() == column.name())
823  {
824  cout << "Warning: column already exist (" << column.name() << "). Ignoring" << endl;
825  return false;
826  }
827  }
828  _columns.push_back(column);
829  _columns.back().setOffset(_rowWidth);
830  _rowWidth += column.width();
832 
833  ostringstream str, str2, str3;
834  str << "TTYPE" << _columns.size();
835  str2 << column.name();
836  str3 << "label for field ";
837  if (_columns.size() < 10) str3 << " ";
838  if (_columns.size() < 100) str3 << " ";
839  str3 << _columns.size();
840  setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
841  str.str("");
842  str2.str("");
843  str3.str("");
844  str << "TFORM" << _columns.size();
845  str2 << "1QB";
846  str3 << "data format of field " << _columns.size();
847  setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
848  str.str("");
849  str2.str("");
850  str3.str("");
851  str << "ZFORM" << _columns.size();
852  str2 << column.numElems() << column.type();
853  str3 << "Original format of field " << _columns.size();
854  setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
855  str.str("");
856  str2.str("");
857  str3.str("");
858  str << "ZCTYP" << _columns.size();
859  str2 << column.getCompressionString();
860  str3 << "Comp. Scheme of field " << _columns.size();
861  setHeaderKey(HeaderEntry(str.str(), str2.str(), str3.str()));
862  //resize the catalog vector accordingly
863  for (uint32_t i=0;i<_numTiles;i++)
864  {
865  _catalog[i].resize(_columns.size());
866  for (uint32_t j=0;j<_catalog[i].size();j++)
867  _catalog[i][j] = make_pair(0,0);
868  }
869  return true;
870 }
871 
872 /****************************************************************
873  * SET HEADER KEY
874  ****************************************************************/
876 {
877  HeaderEntry ent = entry;
878  for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
879  {
880  if (it->key() == entry.key())
881  {
882  if (entry.comment() == "")
883  ent.setComment(it->comment());
884  (*it) = ent;
885  _headerFlushed = false;
886  return true;
887  }
888  }
889  for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
890  {
891  if (it->key() == entry.key())
892  {
893  if (entry.comment() == "")
894  ent.setComment(it->comment());
895  (*it) = ent;
896  _headerFlushed = false;
897  return true;
898  }
899  }
900  if (_totalNumRows != 0)
901  {
902  cout << "Error: new header keys (" << entry.key() << ") must be set before the first row is written. Ignoring." << endl;
903  return false;
904  }
905  _header.push_back(entry);
906  _headerFlushed = false;
907  return true;
908 }
909 
910 bool CompressedFitsWriter::changeHeaderKey(const string& origName, const string& newName)
911 {
912  for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
913  {
914  if (it->key() == origName)
915  {
916  (*it) = HeaderEntry(newName, it->value(), it->comment());
917  _headerFlushed = false;
918  return true;
919  }
920  }
921  for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
922  {
923  if (it->key() == origName)
924  {
925  (*it) = HeaderEntry(newName, it->value(), it->comment());
926  _headerFlushed = false;
927  return true;
928  }
929  }
930  return false;
931 }
932 /****************************************************************
933  * OPEN
934  ****************************************************************/
935 bool CompressedFitsWriter::open(const string& fileName, const string& tableName)
936 {
937  _file.open(fileName.c_str(), ios_base::out);
938  if (!_file.is_open())
939  {
940  cout << "Error: Could not open the file (" << fileName << ")." << endl;
941  return false;
942  }
943  _defaultHeader.push_back(HeaderEntry("EXTNAME", tableName, "name of this binary table extension"));
944  _headerFlushed = false;
945  _threadIndex = 0;
946  //create requested number of threads
947  for (uint32_t i=0;i<_numThreads;i++)
948  pthread_create(&(_thread[i]), NULL, threadFunction, this);
949  //wait for the threads to start
950  while (_numThreads != _threadIndex)
951  usleep(1000);
952  //set the writing fence to the last thread
953  _threadIndex = _numThreads-1;
954  return (_file.good());
955 }
956 
957 /****************************************************************
958  * WRITE HEADER
959  ****************************************************************/
960 void CompressedFitsWriter::writeHeader(bool closingFile)
961 {
962  if (_headerFlushed)
963  return;
964  if (!_file.is_open())
965  return;
966 
967  long cPos = _file.tellp();
968 
969  _file.seekp(0);
970 
971  _file.write(_fitsHeader.c_str(), 2880);
972 
973  //Write the DRS calib table here !
974  writeDrsCalib();
975 
976  //we are now at the beginning of the main table. Write its header
977  for (vector<HeaderEntry>::iterator it=_defaultHeader.begin(); it != _defaultHeader.end(); it++)
978  _file.write(it->fitsString().c_str(), 80);
979 
980  for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
981  _file.write(it->fitsString().c_str(), 80);
982 
983  _file.write("END ", 80);
984  long here = _file.tellp();
985  if (here%2880)
986  _file.write(_emptyBlock.c_str(), 2880 - here%2880);
987 
988  _headerFlushed = true;
989 
990  here = _file.tellp();
991 
992  if (here%2880)
993  cout << "Error: seems that header did not finish at the end of a block." << endl;
994 
995  if (here > cPos && cPos != 0)
996  {
997  cout << "Error, entries were added after the first row was written. This is not supposed to happen." << endl;
998  return;
999  }
1000 
1001  here = _file.tellp();
1002  writeCatalog(closingFile);
1003 
1004  here = _file.tellp() - here;
1005  _heapPtr = here;
1006 
1007  if (cPos != 0)
1008  _file.seekp(cPos);
1009 }
1010 
1011 /****************************************************************
1012  * WRITE CATALOG
1013  * WARNING: writeCatalog is only meant to be used by writeHeader.
1014  * external usage will most likely corrupt the file
1015  ****************************************************************/
1017 {
1018  uint32_t sizeWritten = 0;
1019  for (uint32_t i=0;i<_catalog.size();i++)
1020  {
1021  for (uint32_t j=0;j<_catalog[i].size();j++)
1022  {
1023  //swap the bytes
1024  int8_t swappedEntry[16];
1025  swappedEntry[0] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[7];
1026  swappedEntry[1] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[6];
1027  swappedEntry[2] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[5];
1028  swappedEntry[3] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[4];
1029  swappedEntry[4] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[3];
1030  swappedEntry[5] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[2];
1031  swappedEntry[6] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[1];
1032  swappedEntry[7] = reinterpret_cast<int8_t*>(&_catalog[i][j].first)[0];
1033 
1034  swappedEntry[8] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[7];
1035  swappedEntry[9] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[6];
1036  swappedEntry[10] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[5];
1037  swappedEntry[11] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[4];
1038  swappedEntry[12] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[3];
1039  swappedEntry[13] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[2];
1040  swappedEntry[14] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[1];
1041  swappedEntry[15] = reinterpret_cast<int8_t*>(&_catalog[i][j].second)[0];
1042  if (closingFile)
1043  {
1044  _checksum.add(reinterpret_cast<char*>(swappedEntry), 16);
1045  }
1046  _file.write(reinterpret_cast<char*>(&swappedEntry[0]), 2*sizeof(int64_t));
1047  sizeWritten += 2*sizeof(int64_t);
1048  }
1049  }
1050 
1051  //we do not reserve space for now because fverify does not like that.
1052  //TODO bug should be fixed in the new version. Install it on the cluster and restor space reservation
1053  return ;
1054 
1055  //write the padding so that the HEAP section starts at a 2880 bytes boundary
1056  if (sizeWritten % 2880 != 0)
1057  {
1058  vector<char> nullVec(2880 - sizeWritten%2880, 0);
1059  _file.write(nullVec.data(), 2880 - sizeWritten%2880);
1060  }
1061 }
1062 
1063 /****************************************************************
1064  * ADD HEADER CHECKSUM
1065  ****************************************************************/
1067 {
1068  for (vector<HeaderEntry>::iterator it=_defaultHeader.begin();it!=_defaultHeader.end(); it++)
1069  _checksum.add(it->fitsString().c_str(), 80);
1070  for (vector<HeaderEntry>::iterator it=_header.begin(); it != _header.end(); it++)
1071  _checksum.add(it->fitsString().c_str(), 80);
1072  string end("END ");
1073  string space(" ");
1074  checksum.add(end.c_str(), 80);
1075  int headerRowsLeft = 36 - (_defaultHeader.size() + _header.size() + 1)%36;
1076  for (int i=0;i<headerRowsLeft;i++)
1077  checksum.add(space.c_str(), 80);
1078 }
1079 
1080 /****************************************************************
1081  * CLOSE
1082  ****************************************************************/
1084 {
1085  for (uint32_t i=0;i<_numThreads;i++)
1086  while (_threadStatus[i] != _THREAD_WAIT_)
1087  usleep(100000);
1088  for (uint32_t i=0;i<_numThreads;i++)
1090  for (uint32_t i=0;i<_numThreads;i++)
1091  pthread_join(_thread[i], NULL);
1092  //flush the rows that were not written yet
1093  if (_totalNumRows%_numRowsPerTile != 0)
1094  {
1095  copyTransposeTile(0);
1096 
1098  uint32_t numBytes = compressBuffer(0);
1099  writeCompressedDataToDisk(0, numBytes);
1100  }
1101  //compression stuff
1102  setHeaderKey(HeaderEntry("ZNAXIS1", _rowWidth, "Width of uncompressed rows"));
1103  setHeaderKey(HeaderEntry("ZNAXIS2", _totalNumRows, "Number of uncompressed rows"));
1104  //TODO calculate the real offset from the main table to the start of the HEAP data area
1105  setHeaderKey(HeaderEntry("ZHEAPPTR", _heapPtr, ""));
1106  setHeaderKey(HeaderEntry("THEAP", _heapPtr, ""));
1107 
1108  //regular stuff
1109  if (_catalog.size() > 0)
1110  {
1111  setHeaderKey(HeaderEntry("NAXIS1", 2*sizeof(int64_t)*_catalog[0].size(), "width of table in bytes"));
1112  setHeaderKey(HeaderEntry("NAXIS2", _numTiles, ""));
1113  setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1114  int64_t heapSize = 0;
1115  int64_t compressedOffset = 0;
1116  for (uint32_t i=0;i<_catalog.size();i++)
1117  {
1118  compressedOffset += sizeof(TileHeader);
1119  heapSize += sizeof(TileHeader);
1120  for (uint32_t j=0;j<_catalog[i].size();j++)
1121  {
1122  heapSize += _catalog[i][j].first;
1123 // cout << "heapSize: " << heapSize << endl;
1124  //set the catalog offsets to their actual values
1125  _catalog[i][j].second = compressedOffset;
1126  compressedOffset += _catalog[i][j].first;
1127  //special case if entry has zero length
1128  if (_catalog[i][j].first == 0) _catalog[i][j].second = 0;
1129  }
1130  }
1131  setHeaderKey(HeaderEntry("PCOUNT", heapSize, "size of special data area"));
1132  }
1133  else
1134  {
1135  setHeaderKey(HeaderEntry("NAXIS1", _columns.size()*2*sizeof(int64_t), "width of table in bytes"));
1136  setHeaderKey(HeaderEntry("NAXIS2", 0, ""));
1137  setHeaderKey(HeaderEntry("TFIELDS", _columns.size(), "number of fields in each row"));
1138  setHeaderKey(HeaderEntry("PCOUNT", 0, "size of special data area"));
1139  changeHeaderKey("THEAP", "ZHEAP");
1140  }
1141  ostringstream str;
1142 
1143  writeHeader(true);
1144 
1145  str.str("");
1146  str << _checksum.val();
1147 
1148  setHeaderKey(HeaderEntry("DATASUM", str.str(), ""));
1150  setHeaderKey(HeaderEntry("CHECKSUM", _checksum.str(), ""));
1151  //update header value
1152  writeHeader();
1153  //update file length
1154  long here = _file.tellp();
1155  if (here%2880)
1156  {
1157  vector<char> nullVec(2880 - here%2880, 0);
1158  _file.write(nullVec.data(), 2880 - here%2880);
1159  }
1160  _file.close();
1161  return true;
1162 }
1163 
1164 /****************************************************************
1165  * COPY TRANSPOSE TILE
1166  ****************************************************************/
1168 {
1170 
1171  //copy the tile and transpose it
1172  uint32_t offset = 0;
1173  for (uint32_t i=0;i<_columns.size();i++)
1174  {
1175  switch (_columns[i].getColumnOrdering())//getCompression())
1176  {
1177  case FITS::kOrderByRow:
1178  for (uint32_t k=0;k<thisRoundNumRows;k++)
1179  {//regular, "semi-transposed" copy
1180  memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset()], _columns[i].sizeOfElems()*_columns[i].numElems());
1181  offset += _columns[i].sizeOfElems()*_columns[i].numElems();
1182  }
1183  break;
1184 
1185  case FITS::kOrderByCol :
1186  for (int j=0;j<_columns[i].numElems();j++)
1187  for (uint32_t k=0;k<thisRoundNumRows;k++)
1188  {//transposed copy
1189  memcpy(&(_transposedBuffer[index][offset]), &_buffer[k*_rowWidth + _columns[i].offset() + _columns[i].sizeOfElems()*j], _columns[i].sizeOfElems());
1190  offset += _columns[i].sizeOfElems();
1191  }
1192  break;
1193  default:
1194  cout << "Error: unknown column ordering: " << _columns[i].getColumnOrdering() << endl;
1195 
1196  };
1197  }
1198 }
1199 
1200 /****************************************************************
1201  * WRITE BINARY ROW
1202  ****************************************************************/
1203 bool CompressedFitsWriter::writeBinaryRow(const char* bufferToWrite)
1204 {
1205  if (_totalNumRows == 0)
1206  writeHeader();
1207 
1208  memcpy(&_buffer[_rowWidth*(_totalNumRows%_numRowsPerTile)], bufferToWrite, _rowWidth);
1209  _totalNumRows++;
1210  if (_totalNumRows%_numRowsPerTile == 0)
1211  {
1212  //which is the next thread that we should use ?
1214  usleep(100000);
1215 
1217 
1219  usleep(100000);
1220 
1224  }
1225  return _file.good();
1226 }
1227 
1229 {
1230  return _rowWidth;
1231 }
1232 
1233 /****************************************************************
1234  * COMPRESS BUFFER
1235  ****************************************************************/
1236 uint32_t CompressedFitsWriter::compressUNCOMPRESSED(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1237 {
1238  memcpy(dest, src, numRows*sizeOfElems*numRowElems);
1239  return numRows*sizeOfElems*numRowElems;
1240 }
1241 
1242 /****************************************************************
1243  * COMPRESS BUFFER
1244  ****************************************************************/
1245 uint32_t CompressedFitsWriter::compressHUFFMAN(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1246 {
1247  string huffmanOutput;
1248  uint32_t previousHuffmanSize = 0;
1249  if (numRows < 2)
1250  {//if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
1251  return numRows*sizeOfElems*numRowElems + 1000;
1252  }
1253  if (sizeOfElems < 2 )
1254  {
1255  cout << "Fatal ERROR: HUFMANN can only encode short or longer types" << endl;
1256  return 0;
1257  }
1258  uint32_t huffmanOffset = 0;
1259  for (uint32_t j=0;j<numRowElems;j++)
1260  {
1261  Huffman::Encode(huffmanOutput,
1262  reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
1263  numRows*(sizeOfElems/2));
1264  reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
1265  huffmanOffset += sizeof(uint32_t);
1266  previousHuffmanSize = huffmanOutput.size();
1267  }
1268  const size_t totalSize = huffmanOutput.size() + huffmanOffset;
1269 
1270  //only copy if not larger than not-compressed size
1271  if (totalSize < numRows*sizeOfElems*numRowElems)
1272  memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
1273 
1274  return totalSize;
1275 }
1276 
1277 /****************************************************************
1278  * COMPRESS BUFFER
1279  ****************************************************************/
1280 uint32_t CompressedFitsWriter::compressSMOOTHMAN(char* dest, char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1281 {
1282  uint32_t colWidth = numRowElems;
1283  for (int j=colWidth*numRows-1;j>1;j--)
1284  reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1285  //call the huffman transposed
1286  return compressHUFFMAN(dest, src, numRowElems, sizeOfElems, numRows);
1287 }
1288 
1289 uint32_t CompressedFitsWriter::applySMOOTHING(char* , char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
1290 {
1291  uint32_t colWidth = numRowElems;
1292  for (int j=colWidth*numRows-1;j>1;j--)
1293  reinterpret_cast<int16_t*>(src)[j] = reinterpret_cast<int16_t*>(src)[j] - (reinterpret_cast<int16_t*>(src)[j-1]+reinterpret_cast<int16_t*>(src)[j-2])/2;
1294 
1295  return numRows*sizeOfElems*numRowElems;
1296 }
1297 /****************************************************************
1298  * COMPRESS BUFFER
1299  ****************************************************************/
1300 uint64_t CompressedFitsWriter::compressBuffer(uint32_t threadIndex)
1301 {
1302  uint32_t thisRoundNumRows = (_threadNumRows[threadIndex]%_numRowsPerTile) ? _threadNumRows[threadIndex]%_numRowsPerTile : _numRowsPerTile;
1303  uint32_t offset=0;
1304  uint32_t currentCatalogRow = (_threadNumRows[threadIndex]-1)/_numRowsPerTile;
1305  uint64_t compressedOffset = sizeof(TileHeader); //skip the 'TILE' marker and tile size entry
1306 
1307  //now compress each column one by one by calling compression on arrays
1308  for (uint32_t i=0;i<_columns.size();i++)
1309  {
1310  _catalog[currentCatalogRow][i].second = compressedOffset;
1311 
1312  if (_columns[i].numElems() == 0) continue;
1313 
1314  BlockHeader& head = _columns[i].getBlockHeader();
1315  const vector<uint16_t>& sequence = _columns[i].getCompressionSequence();
1316  //set the default byte telling if uncompressed the compressed Flag
1317  uint64_t previousOffset = compressedOffset;
1318  //skip header data
1319  compressedOffset += sizeof(BlockHeader) + sizeof(uint16_t)*sequence.size();
1320 
1321  for (uint32_t j=0;j<sequence.size(); j++)
1322  {
1323  switch (sequence[j])
1324  {
1325  case FITS::kFactRaw:
1326 // if (head.numProcs == 1)
1327  compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1328  break;
1329  case FITS::kFactSmoothing:
1330  applySMOOTHING(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1331  break;
1332  case FITS::kFactHuffman16:
1333  if (head.ordering == FITS::kOrderByCol)
1334  compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1335  else
1336  compressedOffset += compressHUFFMAN(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), _columns[i].numElems(), _columns[i].sizeOfElems(), thisRoundNumRows);
1337  break;
1338  default:
1339  cout << "ERROR: Unkown compression sequence entry: " << sequence[i] << endl;
1340  break;
1341  }
1342  }
1343 
1344  //check if compressed size is larger than uncompressed
1345  if (sequence[0] != FITS::kFactRaw &&
1346  compressedOffset - previousOffset > _columns[i].sizeOfElems()*_columns[i].numElems()*thisRoundNumRows+sizeof(BlockHeader)+sizeof(uint16_t)*sequence.size())
1347  {//if so set flag and redo it uncompressed
1348  cout << "REDOING UNCOMPRESSED" << endl;
1349  compressedOffset = previousOffset + sizeof(BlockHeader) + 1;
1350  compressedOffset += compressUNCOMPRESSED(&(_compressedBuffer[threadIndex][compressedOffset]), &(_transposedBuffer[threadIndex][offset]), thisRoundNumRows, _columns[i].sizeOfElems(), _columns[i].numElems());
1351  BlockHeader he;
1352  he.size = compressedOffset - previousOffset;
1353  he.numProcs = 1;
1354  he.ordering = FITS::kOrderByRow;
1355  memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&he), sizeof(BlockHeader));
1356  _compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)] = FITS::kFactRaw;
1357  offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1358  _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1359  continue;
1360  }
1361  head.size = compressedOffset - previousOffset;
1362  memcpy(&(_compressedBuffer[threadIndex][previousOffset]), (char*)(&head), sizeof(BlockHeader));
1363  memcpy(&(_compressedBuffer[threadIndex][previousOffset+sizeof(BlockHeader)]), sequence.data(), sizeof(uint16_t)*sequence.size());
1364 
1365  offset += thisRoundNumRows*_columns[i].sizeOfElems()*_columns[i].numElems();
1366  _catalog[currentCatalogRow][i].first = compressedOffset - _catalog[currentCatalogRow][i].second;
1367  }
1368 
1369  TileHeader tHead(thisRoundNumRows, compressedOffset);
1370  memcpy(_compressedBuffer[threadIndex], &tHead, sizeof(TileHeader));
1371  return compressedOffset;
1372 }
1373 
1374 /****************************************************************
1375  * WRITE COMPRESS DATA TO DISK
1376  ****************************************************************/
1377 bool CompressedFitsWriter::writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
1378 {
1379  char* checkSumPointer = _compressedBuffer[threadID];
1380  int32_t extraBytes = 0;
1381  uint32_t sizeToChecksum = sizeToWrite;
1382  if (_checkOffset != 0)
1383  {//should we extend the array to the left ?
1384  sizeToChecksum += _checkOffset;
1385  checkSumPointer -= _checkOffset;
1386  memset(checkSumPointer, 0, _checkOffset);
1387  }
1388  if (sizeToChecksum%4 != 0)
1389  {//should we extend the array to the right ?
1390  extraBytes = 4 - (sizeToChecksum%4);
1391  memset(checkSumPointer+sizeToChecksum, 0,extraBytes);
1392  sizeToChecksum += extraBytes;
1393  }
1394  //do the checksum
1395  _checksum.add(checkSumPointer, sizeToChecksum);
1396 // cout << endl << "Checksum: " << _checksum.val() << endl;
1397  _checkOffset = (4 - extraBytes)%4;
1398  //write data to disk
1399  _file.write(_compressedBuffer[threadID], sizeToWrite);
1400  return _file.good();
1401 }
1402 
1403 /****************************************************************
1404  * WRITER THREAD LOOP
1405  ****************************************************************/
1407 {
1408  CompressedFitsWriter* myself =static_cast<CompressedFitsWriter*>(context);
1409 
1410  uint32_t myID = 0;
1411  pthread_mutex_lock(&(myself->_mutex));
1412  myID = myself->_threadIndex++;
1413  pthread_mutex_unlock(&(myself->_mutex));
1414  uint32_t threadToWaitForBeforeWriting = (myID == 0) ? myself->_numThreads-1 : myID-1;
1415 
1416  while (myself->_threadStatus[myID] != _THREAD_EXIT_)
1417  {
1418  while (myself->_threadStatus[myID] == _THREAD_WAIT_)
1419  usleep(100000);
1420  if (myself->_threadStatus[myID] != _THREAD_COMPRESS_)
1421  continue;
1422  uint32_t numBytes = myself->compressBuffer(myID);
1423  myself->_threadStatus[myID] = _THREAD_WRITE_;
1424 
1425  //wait for the previous data to be written
1426  while (myself->_threadIndex != threadToWaitForBeforeWriting)
1427  usleep(1000);
1428  //do the actual writing to disk
1429  pthread_mutex_lock(&(myself->_mutex));
1430  myself->writeCompressedDataToDisk(myID, numBytes);
1431  myself->_threadIndex = myID;
1432  pthread_mutex_unlock(&(myself->_mutex));
1433  myself->_threadStatus[myID] = _THREAD_WAIT_;
1434  }
1435  return NULL;
1436 }
1437 
1438 /****************************************************************
1439  * PRINT USAGE
1440  ****************************************************************/
1442 {
1443  cout << endl;
1444  cout << "The FACT-Fits compressor reads an input Fits file from FACT"
1445  " and compresses it.\n It can use a drs calibration in order to"
1446  " improve the compression ratio. If so, the input calibration"
1447  " is embedded into the compressed file.\n"
1448  " By default, the Data column will be compressed using SMOOTHMAN (Thomas' algorithm)"
1449  " while other columns will be compressed with the AMPLITUDE coding (Veritas)"
1450  "Usage: Compressed_Fits_Test <inputFile>";
1451  cout << endl;
1452 }
1453 
1454 /****************************************************************
1455  * PRINT HELP
1456  ****************************************************************/
1458 {
1459  cout << endl;
1460  cout << "The inputFile is required. It must have fits in its filename and the compressed file will be written in the same folder. "
1461  "The fz extension will be added, replacing the .gz one if required \n"
1462  "If output is specified, then it will replace the automatically generated output filename\n"
1463  "If --drs, followed by a drs calib then it will be applied to the data before compressing\n"
1464  "rowPerTile can be used to tune how many rows are in each tile. Default is 100\n"
1465  "threads gives the number of threads to use. Cannot be less than the default (1)\n"
1466  "compression explicitely gives the compression scheme to use for a given column. The syntax is:\n"
1467  "<ColumnName>=<CompressionScheme> with <CompressionScheme> one of the following:\n"
1468  "UNCOMPRESSED\n"
1469  "AMPLITUDE\n"
1470  "HUFFMAN\n"
1471  "SMOOTHMAN\n"
1472  "INT_WAVELET\n"
1473  "\n"
1474  "--quiet removes any textual output, except error messages\n"
1475  "--verify makes the compressor check the compressed data. It will read it back, and compare the reconstructed CHECKSUM and DATASUM with the original file values."
1476  ;
1477  cout << endl << endl;
1478 }
1479 
1480 /****************************************************************
1481  * SETUP CONFIGURATION
1482  ****************************************************************/
1484 {
1485  po::options_description configs("FitsCompressor options");
1486  configs.add_options()
1487  ("inputFile,i", vars<string>(), "Input file")
1488  ("drs,d", var<string>(), "Input drs calibration file")
1489  ("rowPerTile,r", var<unsigned int>(), "Number of rows per tile. Default is 100")
1490  ("output,o", var<string>(), "Output file. If empty, .fz is appened to the original name")
1491  ("threads,t", var<unsigned int>(), "Number of threads to use for compression")
1492  ("compression,c", vars<string>(), "which compression to use for which column. Syntax <colName>=<compressionScheme>")
1493  ("quiet,q", po_switch(), "Should the program display any text at all ?")
1494  ("verify,v", po_switch(), "Should we verify the data that has been compressed ?")
1495  ;
1496  po::positional_options_description positional;
1497  positional.add("inputFile", -1);
1498  conf.AddOptions(configs);
1499  conf.SetArgumentPositions(positional);
1500 }
1501 
1502 /****************************************************************
1503  * MAIN
1504  ****************************************************************/
1505 int main(int argc, const char** argv)
1506 {
1507  Configuration conf(argv[0]);
1508  conf.SetPrintUsage(printUsage);
1509  setupConfiguration(conf);
1510 
1511  if (!conf.DoParse(argc, argv, printHelp))
1512  return -1;
1513 
1514  //initialize the file names to nothing.
1515  string fileNameIn = "";
1516  string fileNameOut = "";
1517  string drsFileName = "";
1518  uint32_t numRowsPerTile = 100;
1519  bool displayText=true;
1520 
1521  //parse configuration
1522  if (conf.Get<bool>("quiet")) displayText = false;
1523  const vector<string> inputFileNameVec = conf.Vec<string>("inputFile");
1524  if (inputFileNameVec.size() != 1)
1525  {
1526  cout << "Error: ";
1527  if (inputFileNameVec.size() == 0) cout << "no";
1528  else cout << inputFileNameVec.size();
1529  cout << " input file(s) given. Expected one. Aborting. Input:" << endl;;
1530  for (unsigned int i=0;i<inputFileNameVec.size(); i++)
1531  cout << inputFileNameVec[i] << endl;
1532  return -1;
1533  }
1534 
1535  //Assign the input filename
1536  fileNameIn = inputFileNameVec[0];
1537 
1538  //Check if we have a drs calib too
1539  if (conf.Has("drs")) drsFileName = conf.Get<string>("drs");
1540 
1541  //Should we verify the data ?
1542  bool verifyDataPlease = false;
1543  if (conf.Has("verify")) verifyDataPlease = conf.Get<bool>("verify");
1544 
1545 
1546  //should we use a specific output filename ?
1547  if (conf.Has("output"))
1548  fileNameOut = conf.Get<string>("output");
1549  else
1550  {
1551  size_t pos = fileNameIn.find(".fits");
1552  if (pos == string::npos)
1553  {
1554  cout << "ERROR: input file does not seems ot be fits. Aborting." << endl;
1555  return -1;
1556  }
1557  fileNameOut = fileNameIn + ".fz";
1558  }
1559 
1560 
1561  //should we use specific compression on some columns ?
1562  const vector<string> columnsCompression = conf.Vec<string>("compression");
1563 
1564  //split up values between column names and compression scheme
1565  vector<std::pair<string, string>> compressions;
1566  for (unsigned int i=0;i<columnsCompression.size();i++)
1567  {
1568  size_t pos = columnsCompression[i].find_first_of("=");
1569  if (pos == string::npos)
1570  {
1571  cout << "ERROR: Something wrong occured while parsing " << columnsCompression[i] << ". Aborting." << endl;
1572  return -1;
1573  }
1574  string comp = columnsCompression[i].substr(pos+1);
1575  if (comp != "UNCOMPRESSED" && comp != "AMPLITUDE" && comp != "HUFFMAN" &&
1576  comp != "SMOOTHMAN" && comp != "INT_WAVELET")
1577  {
1578  cout << "Unkown compression scheme requested (" << comp << "). Aborting." << endl;
1579  return -1;
1580  }
1581  compressions.push_back(make_pair(columnsCompression[i].substr(0, pos), comp));
1582  }
1583 
1584  //How many rows per tile should we use ?
1585  if (conf.Has("rowPerTile")) numRowsPerTile = conf.Get<unsigned int>("rowPerTile");
1586 
1587  /************************************************************************************
1588  * Done reading configuration. Open relevant files
1589  ************************************************************************************/
1590 
1591  //Open input's fits file
1592  factfits inFile(fileNameIn);
1593 
1594  if (inFile.IsCompressedFITS())
1595  {
1596  cout << "ERROR: input file is already a compressed fits. Cannot be compressed again: Aborting." << endl;
1597  return -1;
1598  }
1599 
1600  //decide how many tiles should be put in the compressed file
1601  uint32_t originalNumRows = inFile.GetNumRows();
1602  uint32_t numTiles = (originalNumRows%numRowsPerTile) ? (originalNumRows/numRowsPerTile)+1 : originalNumRows/numRowsPerTile;
1603  CompressedFitsWriter outFile(numTiles, numRowsPerTile);
1604 
1605  //should we use a specific number of threads for compressing ?
1606  unsigned int numThreads = 1;
1607  if (conf.Has("threads"))
1608  {
1609  numThreads = conf.Get<unsigned int>("threads");
1610  outFile.setNumWorkingThreads(numThreads);
1611  }
1612 
1613  if (!outFile.open(fileNameOut))
1614  {
1615  cout << "Error: could not open " << fileNameOut << " for writing" << endl;
1616  return -1;
1617  }
1618 
1619  //Because the file to open MUST be given by the constructor, I must use a pointer instead
1620  factfits* drsFile = NULL;
1621  //try to open the Drs file. If any.
1622  if (drsFileName != "")
1623  {
1624  try
1625  {
1626  drsFile = new factfits(drsFileName);
1627  }
1628  catch (...)
1629  {
1630  cout << "Error: could not open " << drsFileName << " for calibration" << endl;
1631  return -1;
1632  }
1633  }
1634 
1635  if (displayText)
1636  {
1637  cout << endl;
1638  cout << "**********************" << endl;
1639  cout << "Will compress from : " << fileNameIn << endl;
1640  cout << "to : " << fileNameOut << endl;
1641  if (drsFileName != "")
1642  cout << "while calibrating with: " << drsFileName << endl;
1643  cout << "Compression will use : " << numThreads << " worker threads" << endl;
1644  cout << "Data will be verified : ";
1645  if (verifyDataPlease)
1646  cout << "yes" << endl;
1647  else
1648  cout << "no (WARNING !)" << endl;
1649  cout << "**********************" << endl;
1650  cout << endl;
1651  }
1652 
1653  /************************************************************************************
1654  * Done opening input files. Allocate memory and configure output file
1655  ************************************************************************************/
1656 
1657  //allocate the buffer for temporary storage of each read/written row
1658  uint32_t rowWidth = inFile.GetUInt("NAXIS1");
1659  char* buffer = new char[rowWidth + 12];
1660  memset(buffer, 0, 4);
1661  buffer = buffer+4;
1662 
1663  //get the source columns
1664  const fits::Table::Columns& columns = inFile.GetColumns();
1665  const fits::Table::SortedColumns& sortedColumns = inFile.GetSortedColumns();
1666  if (displayText)
1667  cout << "Input file has " << columns.size() << " columns and " << inFile.GetNumRows() << " rows" << endl;
1668 
1669  //Add columns.
1670  uint32_t totalRowWidth = 0;
1671  for (uint32_t i=0;i<sortedColumns.size(); i++)
1672  {
1673  //get column name
1674  ostringstream str;
1675  str << "TTYPE" << i+1;
1676  string colName = inFile.GetStr(str.str());
1677  if (displayText)
1678  {
1679  cout << "Column " << colName;
1680  for (uint32_t j=colName.size();j<21;j++)
1681  cout << " ";
1682  cout << " -> ";
1683  }
1684 
1685  //get header structures
1686  BlockHeader rawHeader;
1687  BlockHeader smoothmanHeader(0, FITS::kOrderByRow, 2);
1688  vector<uint16_t> rawProcessings(1);
1689  rawProcessings[0] = FITS::kFactRaw;
1690  vector<uint16_t> smoothmanProcessings(2);
1691  smoothmanProcessings[0] = FITS::kFactSmoothing;
1692  smoothmanProcessings[1] = FITS::kFactHuffman16;
1693 // smoothmanProcessings[2] = FACT_RAW;
1694 
1695  totalRowWidth += sortedColumns[i].bytes;
1696 
1697  //first lets see if we do have an explicit request
1698  bool explicitRequest = false;
1699  for (unsigned int j=0;j<compressions.size();j++)
1700  {
1701  if (compressions[j].first == colName)
1702  {
1703  explicitRequest = true;
1704  if (displayText) cout << compressions[j].second << endl;
1705  if (compressions[j].second == "UNCOMPRESSED")
1706  outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1707  if (compressions[j].second == "SMOOTHMAN")
1708  outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1709  break;
1710  }
1711  }
1712 
1713  if (explicitRequest) continue;
1714 
1715  if (colName != "Data")
1716  {
1717  if (displayText) cout << "UNCOMPRESSED" << endl;
1718  outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, rawHeader, rawProcessings));
1719  }
1720  else
1721  {
1722  if (displayText) cout << "SMOOTHMAN" << endl;
1723  outFile.addColumn(CompressedFitsFile::ColumnEntry(colName, sortedColumns[i].type, sortedColumns[i].num, smoothmanHeader, smoothmanProcessings));
1724  }
1725  }
1726 
1727  //translate original header entries to their Z-version
1728  const fits::Table::Keys& header = inFile.GetKeys();
1729  for (fits::Table::Keys::const_iterator i=header.begin(); i!= header.end(); i++)
1730  {
1731  string k = i->first;//header[i].key();
1732  if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" || k == "TFIELDS")
1733  continue;
1734  if (k == "CHECKSUM")
1735  {
1736  outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZCHKSUM", i->second.value, i->second.comment));
1737  continue;
1738  }
1739  if (k == "DATASUM")
1740  {
1741  outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("ZDTASUM", i->second.value, i->second.comment));
1742  continue;
1743  }
1744  k = k.substr(0,5);
1745  if (k == "TTYPE" || k == "TFORM")
1746  {
1747  string tmpKey = i->second.fitsString;
1748  tmpKey[0] = 'Z';
1749  outFile.setHeaderKey(tmpKey);
1750  continue;
1751  }
1752  if (k == "NAXIS")
1753  continue;
1754  outFile.setHeaderKey(i->second.fitsString);
1755 // cout << i->first << endl;
1756  }
1757 
1758  outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", " 0", "Checksum of raw littlen endian data"));
1759 
1760  //deal with the DRS calib
1761  int16_t* drsCalib16 = NULL;
1762 
1763  //load the drs calib. data
1764  int32_t startCellOffset = -1;
1765  if (drsFileName != "")
1766  {
1767  drsCalib16 = new int16_t[1440*1024];
1768  float* drsCalibFloat = NULL;
1769  try
1770  {
1771  drsCalibFloat = reinterpret_cast<float*>(drsFile->SetPtrAddress("BaselineMean"));
1772  }
1773  catch (...)
1774  {
1775  cout << "Could not find column BaselineMean in drs calibration file " << drsFileName << ". Aborting" << endl;
1776  return -1;
1777  }
1778 
1779  //read the calibration and calculate its integer value
1780  drsFile->GetNextRow();
1781  for (uint32_t i=0;i<1440*1024;i++)
1782  drsCalib16[i] = (int16_t)(drsCalibFloat[i]*4096.f/2000.f);
1783 
1784 
1785  //get the start cells offsets
1786  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1787  if (it->first == "StartCellData")
1788  {
1789  startCellOffset = it->second.offset;
1790  if (it->second.type != 'I')
1791  {
1792  cout << "Wrong type for the StartCellData Column: " << it->second.type << " instead of I expected"<< endl;
1793  return -1;
1794  }
1795  }
1796 
1797  if (startCellOffset == -1)
1798  {
1799  cout << "WARNING: Could not find StartCellData in input file " << fileNameIn << ". Doing it uncalibrated"<< endl;
1800  }
1801  else
1802  {
1803  //assign it to the ouput file
1804  outFile.setDrsCalib(drsCalib16);
1805  }
1806  }
1807 
1808  /************************************************************************************
1809  * Done configuring compression. Do the real job now !
1810  ************************************************************************************/
1811 
1812  if (displayText) cout << "Converting file..." << endl;
1813 
1814  int numSlices = -1;
1815  int32_t dataOffset = -1;
1816 
1817  //Get the pointer to the column that must be drs-calibrated
1818  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1819  if (it->first == "Data")
1820  {
1821  numSlices = it->second.num;
1822  dataOffset = it->second.offset;
1823  }
1824  if (numSlices % 1440 != 0)
1825  {
1826  cout << "seems like the number of samples is not a multiple of 1440. Aborting." << endl;
1827  return -1;
1828  }
1829  if (dataOffset == -1)
1830  {
1831  cout << "Could not find the column Data in the input file. Aborting." << endl;
1832  return -1;
1833  }
1834 
1835  numSlices /= 1440;
1836 
1837  //set pointers to the readout data to later be able to gather it to "buffer".
1838  vector<void*> readPointers;
1839  vector<int32_t> readOffsets;
1840  vector<int32_t> readElemSize;
1841  vector<int32_t> readNumElems;
1842  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
1843  {
1844  readPointers.push_back(inFile.SetPtrAddress(it->first));
1845  readOffsets.push_back(it->second.offset);
1846  readElemSize.push_back(it->second.size);
1847  readNumElems.push_back(it->second.num);
1848  }
1849 
1850  Checksum rawsum;
1851  //Convert each row one after the other
1852  ostringstream wrongChannels;
1853  map<int, int> wrongChannelsMap;
1854  map<int, int> wrongChannelsLastValues;
1855  for (uint32_t i=0;i<1440;i++)
1856  {
1857  wrongChannelsMap[i] = 0;
1858  wrongChannelsLastValues[i] = 0;
1859  }
1860  for (uint32_t i=0;i<inFile.GetNumRows();i++)
1861  {
1862  if (displayText) cout << "\r Row " << i+1 << flush;
1863  if (!inFile.GetNextRow())
1864  {
1865  cout << "ERROR: file has less rows than advertized. aborting" << endl;
1866  exit(0);
1867  }
1868  //copy from inFile internal structures to buffer
1869  int32_t count=0;
1870  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
1871  {
1872  memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
1873  count++;
1874  }
1875 
1876  char* checkSumPointer = buffer;
1877  const int chkOffset = (i*totalRowWidth)%4;
1878  checkSumPointer -= chkOffset;
1879  uint32_t sizeToChecksum = totalRowWidth + chkOffset;
1880  if (sizeToChecksum%4 != 0)
1881  {
1882  int32_t extraBytes = 4 - (sizeToChecksum%4);
1883  memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
1884  sizeToChecksum += extraBytes;
1885  }
1886  rawsum.add(checkSumPointer, sizeToChecksum, false);
1887 
1888  if (startCellOffset != -1)
1889  {//drs calibrate this data
1890  for (int j=0;j<1440;j++)
1891  {
1892  const int thisStartCell = reinterpret_cast<int16_t*>(&buffer[startCellOffset])[j];
1893  if (thisStartCell > 1023)
1894  {
1895  wrongChannelsMap[j]++;
1896  wrongChannelsLastValues[j] = thisStartCell;
1897  wrongChannels << j;
1898  }
1899  if (thisStartCell < 0) continue;
1900  for (int k=0;k<numSlices;k++)
1901  reinterpret_cast<int16_t*>(&buffer[dataOffset])[numSlices*j + k] -= drsCalib16[1024*j + (thisStartCell+k)%1024];
1902  }
1903  }
1904  outFile.writeBinaryRow(buffer);
1905  };
1906 
1907  if (wrongChannels.str() != "")
1908  {
1909  cout << "ERROR: Wrong channels: ";
1910  for (uint32_t i=0;i<1440;i++)
1911  {
1912  if (wrongChannelsMap[i] != 0)
1913  cout << i << "(" << wrongChannelsMap[i] << "|" << wrongChannelsLastValues[i] << ") ";
1914  }
1915  cout << endl;
1916  exit(-1);
1917  }
1918  ostringstream strSum;
1919  strSum << rawsum.val();
1920  outFile.setHeaderKey(CompressedFitsFile::HeaderEntry("RAWSUM", strSum.str(), "Checksum of raw littlen endian data"));
1921 
1922  //Get table name for later use in case the compressed file is to be verified
1923  string tableName = inFile.GetStr("EXTNAME");
1924 
1925  if (displayText) cout << endl << "Done. Flushing output file..." << endl;
1926 
1927  inFile.close();
1928  if (!outFile.close())
1929  {
1930  cout << "Something went wrong while writing the catalog: negative index" << endl;
1931  return false;
1932  }
1933 
1934  if (drsCalib16 != NULL)
1935  delete[] drsCalib16;
1936 
1937  if (displayText) cout << "Done." << endl;
1938 
1939  /************************************************************************************
1940  * Actual job done. Should we verify what we did ?
1941  ************************************************************************************/
1942 
1943  if (verifyDataPlease)
1944  {
1945  if (displayText) cout << "Now verify data..." << endl;
1946  }
1947  else
1948  return 0;
1949 
1950  //get a compressed reader
1951 //TEMP try to copy the file too
1952 // string copyName("/gpfs/scratch/fact/etienne/copyFile.fz");
1953 // string copyName(fileNameOut+".copy");
1954  string copyName("");
1955  factfits verifyFile(fileNameOut, copyName, tableName, false);
1956 
1957  //and the header of the compressed file
1958  const fits::Table::Keys& header2 = verifyFile.GetKeys();
1959 
1960  //get a non-compressed writer
1961  ofits reconstructedFile;
1962 
1963  //figure out its name: /dev/null unless otherwise specified
1964  string reconstructedName = fileNameOut+".recons";
1965  reconstructedName = "/dev/null";
1966  reconstructedFile.open(reconstructedName.c_str(), false);
1967 
1968  //reconstruct the original columns from the compressed file.
1969  string origChecksumStr;
1970  string origDatasum;
1971 
1972  //reset tablename value so that it is re-read from compressed table's header
1973  tableName = "";
1974 
1975  /************************************************************************************
1976  * Reconstruction setup done. Rebuild original header
1977  ************************************************************************************/
1978 
1979  //re-tranlate the keys
1980  for (fits::Table::Keys::const_iterator it=header2.begin(); it!= header2.end(); it++)
1981  {
1982  string k = it->first;
1983  if (k == "XTENSION" || k == "BITPIX" || k == "PCOUNT" || k == "GCOUNT" ||
1984  k == "TFIELDS" || k == "ZTABLE" || k == "ZNAXIS1" || k == "ZNAXIS2" ||
1985  k == "ZHEAPPTR" || k == "ZPCOUNT" || k == "ZTILELEN" || k == "THEAP" ||
1986  k == "CHECKSUM" || k == "DATASUM" || k == "FCTCPVER" || k == "ZHEAP")
1987  {
1988  continue;
1989  }
1990 
1991  if (k == "ZCHKSUM")
1992  {
1993  reconstructedFile.SetKeyComment("CHECKSUM", it->second.comment);
1994  origChecksumStr = it->second.value;
1995  continue;
1996  }
1997  if (k == "RAWSUM")
1998  {
1999  continue;
2000  }
2001 
2002  if (k == "ZDTASUM")
2003  {
2004  reconstructedFile.SetKeyComment("DATASUM", it->second.comment);
2005  origDatasum = it->second.value;
2006  continue;
2007  }
2008 
2009  if (k == "EXTNAME")
2010  {
2011  tableName = it->second.value;
2012  }
2013 
2014  k = k.substr(0,5);
2015 
2016  if (k == "TTYPE")
2017  {//we have an original column name here.
2018  //manually deal with these in order to preserve the ordering (easier than re-constructing yet another list on the fly)
2019  continue;
2020  }
2021 
2022  if (k == "TFORM" || k == "NAXIS" || k == "ZCTYP" )
2023  {
2024  continue;
2025  }
2026 
2027  if (k == "ZFORM" || k == "ZTYPE")
2028  {
2029  string tmpKey = it->second.fitsString;
2030  tmpKey[0] = 'T';
2031  reconstructedFile.SetKeyFromFitsString(tmpKey);
2032  continue;
2033  }
2034 
2035  reconstructedFile.SetKeyFromFitsString(it->second.fitsString);
2036  }
2037 
2038  if (tableName == "")
2039  {
2040  cout << "Error: table name from file " << fileNameOut << " could not be found. Aborting" << endl;
2041  return -1;
2042  }
2043 
2044  //Restore the original columns
2045  for (uint32_t numCol=1; numCol<10000; numCol++)
2046  {
2047  ostringstream str;
2048  str << numCol;
2049  if (!verifyFile.HasKey("TTYPE"+str.str())) break;
2050 
2051  string ttype = verifyFile.GetStr("TTYPE"+str.str());
2052  string tform = verifyFile.GetStr("ZFORM"+str.str());
2053  char type = tform[tform.size()-1];
2054  string number = tform.substr(0, tform.size()-1);
2055  int numElems = atoi(number.c_str());
2056 
2057  if (number == "") numElems=1;
2058 
2059  reconstructedFile.AddColumn(numElems, type, ttype, "", "", false);
2060  }
2061 
2062  reconstructedFile.WriteTableHeader(tableName.c_str());
2063 
2064  /************************************************************************************
2065  * Original header restored. Do the data
2066  ************************************************************************************/
2067 
2068  //set pointers to the readout data to later be able to gather it to "buffer".
2069  readPointers.clear();
2070  readOffsets.clear();
2071  readElemSize.clear();
2072  readNumElems.clear();
2073  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end(); it++)
2074  {
2075  readPointers.push_back(verifyFile.SetPtrAddress(it->first));
2076  readOffsets.push_back(it->second.offset);
2077  readElemSize.push_back(it->second.size);
2078  readNumElems.push_back(it->second.num);
2079  }
2080 
2081  //do the actual reconstruction work
2082  uint32_t i=1;
2083  while (i<=verifyFile.GetNumRows() && verifyFile.GetNextRow())
2084  {
2085  int count=0;
2086  for (fits::Table::Columns::const_iterator it=columns.begin(); it!= columns.end();it++)
2087  {
2088  memcpy(&buffer[readOffsets[count]], readPointers[count], readElemSize[count]*readNumElems[count]);
2089  count++;
2090  }
2091  if (displayText) cout << "\r Row " << i << flush;
2092  reconstructedFile.WriteRow(buffer, rowWidth);
2093  i++;
2094  }
2095 
2096  if (displayText) cout << endl;
2097 
2098  //close reconstruction input and output
2099 // Do NOT close the verify file, otherwise data cannot be flushed to copy file
2100 // verifyFile.close();
2101  if (!verifyFile.IsFileOk())
2102  cout << "ERROR: file checksums seems wrong" << endl;
2103 
2104  if (!reconstructedFile.close())
2105  {
2106  cout << "ERROR: disk probably full..." << endl;
2107  return -1;
2108  }
2109 
2110  //get original and reconstructed checksum and datasum
2111  std::pair<string, int> origChecksum = make_pair(origChecksumStr, atoi(origDatasum.c_str()));
2112  std::pair<string, int> newChecksum = reconstructedFile.GetChecksumData();
2113 
2114  //verify that no mistake was made
2115  if (origChecksum.second != newChecksum.second)
2116  {
2117  cout << "ERROR: datasums are NOT identical: " << (uint32_t)(origChecksum.second) << " vs " << (uint32_t)(newChecksum.second) << endl;
2118  return -1;
2119  }
2120  if (origChecksum.first != newChecksum.first)
2121  {
2122  cout << "WARNING: checksums are NOT Identical: " << origChecksum.first << " vs " << newChecksum.first << endl;
2123  }
2124  else
2125  {
2126  if (true) cout << "Ok" << endl;
2127  }
2128 
2129  buffer = buffer-4;
2130  delete[] buffer;
2131  return 0;
2132 }
2133 
static string _fitsHeader
the default header to be written in every fits file
uint32_t compressSMOOTHMAN(char *dest, char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
const string & comment() const
string _comment
the comment associated to the header entry
std::vector< Column > SortedColumns
Definition: fits.h:114
string _value
the value of the header entry
bool HasKey(const std::string &key) const
Definition: fits.h:1002
std::string GetStr(const std::string &key) const
Definition: fits.h:1011
void copyTransposeTile(uint32_t index)
Copy and transpose (or semi-transpose) one tile of data.
void * SetPtrAddress(const std::string &name)
Definition: fits.h:886
static const uint32_t _THREAD_DECOMPRESS_
Thread working, decompressing.
bool IsCompressedFITS() const
Definition: fits.h:1029
std::map< std::string, Column > Columns
Definition: fits.h:113
int32_t _checkOffset
offset to the data pointer to calculate the checksum
void printHelp()
static HeaderEntry _dummyHeaderEntry
Dummy entry for returning if requested on is not found.
HeaderEntry(const string &line)
string _name
name of the column
HeaderEntry(const string &k, const T &val, const string &comm)
CatalogType _catalog
Catalog, i.e. the main table that points to the compressed data.
uint64_t GetUInt(const std::string &key) const
Definition: fits.h:1009
std::pair< std::string, int > GetChecksumData()
Definition: ofits.h:1012
int i
Definition: db_dim_client.c:21
CompressedFitsWriter(uint32_t numTiles=100, uint32_t numRowsPerTile=100)
Default constructor. 100 tiles of 100 rows each are assigned by default.
int64_t second
offset of this column in the tile, from the start of the heap area
Definition: zofits.h:27
char str[80]
Definition: test_client.c:7
HeaderEntry(const vector< char > &lineVec)
void SetPrintUsage(const std::function< void(void)> &func)
T Get(const std::string &var)
bool SetKeyComment(const std::string &key, const std::string &comment)
Definition: ofits.h:430
void close()
Definition: izstream.h:91
static void * threadFunction(void *context)
the main function compressing the data
uint32_t _totalNumRows
Total number of raws.
uint32_t _numThreads
The number of threads that will be used to compress.
uint32_t _numTiles
Number of tiles (i.e. groups of rows)
static const uint32_t _THREAD_WRITE_
Thread writing data to disk.
po::typed_value< bool > * po_switch()
STL namespace.
vector< uint32_t > _threadNumRows
Total number of rows for thread to compress.
bool changeHeaderKey(const string &origName, const string &newName)
vector< uint32_t > _threadStatus
Flag telling whether the buffer to be transposed (and compressed) is full or empty.
uint32_t compressUNCOMPRESSED(char *dest, const char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
Specific compression functions.
size_t GetNumRows() const
Definition: zfits.h:57
virtual bool AddColumn(uint32_t cnt, char typechar, const std::string &name, const std::string &unit, const std::string &comment="", bool addHeaderKeys=true)
Definition: ofits.h:596
string _description
a description for the column. It will be placed in the header
std::vector< T > Vec(const std::string &var)
void addHeaderChecksum(Checksum &checksum)
add the header checksum to the datasum
void writeCatalog(bool closingFile=false)
write the compressed data catalog. If closingFile is set to true, checksum is calculated ...
void SetArgumentPositions(const po::positional_options_description &desc)
int64_t first
Size of this column in the tile.
Definition: zofits.h:26
uint32_t val() const
Definition: checksum.h:20
BlockHeader(uint64_t s=0, char o=kOrderByRow, unsigned char n=1)
Definition: FITS.h:76
int main(int argc, const char **argv)
void setComment(const string &comm)
bool close()
close the opened file
uint64_t _heapPtr
the address in the file of the heap area
bool Encode(std::string &bufout, const uint16_t *bufin, size_t bufinlen)
Definition: huffman.h:385
bool GetNextRow(bool check=true)
Definition: fits.h:851
ColumnEntry(const string &n, char t, int numOf, BlockHeader &head, vector< uint16_t > &seq)
const vector< uint16_t > & getCompressionSequence() const
int16_t * _drsCalibData
array of the Drs baseline mean
static string _emptyBlock
an empty block to be apened at the end of a file so that its length is a multiple of 2880 ...
const Table::Keys & GetKeys() const
Definition: fits.h:1006
bool Has(const std::string &var)
pair< int64_t, int64_t > CatalogEntry
int32_t _threadLooper
Which thread will deal with the upcoming bunch of data ?
virtual void open(const char *filename, bool addEXTNAMEKey=true)
Definition: ofits.h:390
vector< char * > _transposedBuffer
Memory buffer to store rows while they are transposed.
char _type
the type of the column, as specified by the fits documentation
static const uint32_t _THREAD_READ_
Thread reading data from disk.
CompressedFitsFile(uint32_t numTiles=100, uint32_t numRowsPerTile=100)
default constructor. Assigns a default number of rows and tiles
void AddOptions(const po::options_description &opt, bool visible=true)
Definition: Configuration.h:92
static const uint32_t _THREAD_WAIT_
Thread doing nothing.
Definition: ofits.h:29
int type
std::map< std::string, Entry > Keys
Definition: fits.h:112
const char & getColumnOrdering() const
int _typeSize
the number of bytes taken by one element
fstream _file
The actual file streamer for accessing disk data.
uint32_t _rowWidth
Total number of bytes in one row.
virtual bool close()
Definition: ofits.h:984
bool writeBinaryRow(const char *bufferToWrite)
write one row of data, already placed in bufferToWrite. Does the byte-swapping
vector< HeaderEntry > _header
Header keys.
const string & key() const
std::string str(bool complm=true) const
Definition: checksum.h:148
char * _buffer
Memory buffer to store rows while they are not compressed.
int _offset
the offset of the column, in bytes, from the beginning of one row
virtual ~CompressedFitsFile()
default destructor
uint64_t compressBuffer(uint32_t threadIndex)
compresses one buffer of data, the one given by threadIndex
double end
bool _headerFlushed
Flag telling whether the header record is synchronized with the data on disk.
Commandline parsing, resource file parsing and database access.
Definition: Configuration.h:9
static const uint32_t _THREAD_EXIT_
Thread exiting.
virtual bool IsFileOk() const
Definition: zfits.h:48
void writeHeader(bool closingFile=false)
write the header. If closingFile is set to true, checksum is calculated
int buffer[BUFFSIZE]
Definition: db_dim_client.c:14
vector< ColumnEntry > _columns
Columns in the file.
int count
Definition: db_dim_server.c:18
const string & fitsString() const
uint32_t compressHUFFMAN(char *dest, const char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
int size
Definition: db_dim_server.c:17
float data[4 *1440]
pthread_mutex_t _mutex
mutex for compressing threads
virtual ~CompressedFitsWriter()
default destructor
void writeDrsCalib()
Write the drs calibration to disk, if any.
uint32_t applySMOOTHING(char *dest, char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
bool writeCompressedDataToDisk(uint32_t threadID, uint32_t sizeToWrite)
writes an already compressed buffer to disk
const Table::SortedColumns & GetSortedColumns() const
Definition: fits.h:1005
bool reallocateBuffers()
protected function to allocate the intermediate buffers
Checksum _checksum
Checksum for asserting the consistency of the data.
TT t
Definition: test_client.c:26
string _key
the key (name) of the header entry
string _fitsString
the string that will be written to the fits file
const Table::Columns & GetColumns() const
Definition: fits.h:1004
std::string Trim(const std::string &str)
Definition: tools.cc:68
vector< HeaderEntry > & getHeaderEntries()
get the header of the file
uint32_t numRows
Definition: FITS.h:72
void printUsage()
vector< HeaderEntry > _defaultHeader
FIXME this was a bad idea. Move everything to the regular header.
void setupConfiguration(Configuration &conf)
const string & name() const
virtual bool WriteTableHeader(const char *name="DATA")
Definition: ofits.h:844
bool addColumn(const ColumnEntry &column)
add one column to the file
bool add(const char *buf, size_t len, bool big_endian=true)
Definition: checksum.h:49
vector< CatalogRow > CatalogType
vector< CatalogEntry > CatalogRow
bool setNumWorkingThreads(uint32_t num)
set the number of worker threads compressing the data.
bool DoParse(int argc, const char **argv, const std::function< void()> &func=std::function< void()>())
uint32_t _numRowsPerTile
Number of rows per tile.
bool setHeaderKey(const HeaderEntry &)
sets a given header key
const string & value() const
uint32_t _threadIndex
A variable to assign threads indices.
virtual bool WriteRow(const void *ptr, size_t cnt, bool byte_swap=true)
Definition: ofits.h:884
static const uint32_t _THREAD_COMPRESS_
Thread working, compressing.
TileHeader()
Definition: FITS.h:75
bool open(const string &fileName, const string &tableName="Data")
open a new fits file
vector< pthread_t > _thread
The thread handler of the compressor.
int _num
number of elements contained in one row of this column
vector< char * > _compressedBuffer
Memory buffer to store rows while they are compressed.
string Trim(const string &str, char c=' ')
void setDrsCalib(int16_t *data)
assign a given (already loaded) drs calibration