FACT++  1.0
zofits.h
Go to the documentation of this file.
1 #ifndef FACT_zofits
2 #define FACT_zofits
3 
4 /*
5  * zofits.h
6  *
7  * FACT native compressed FITS writer
8  * Author: lyard
9  */
10 #include "ofits.h"
11 #include "huffman.h"
12 #include "Queue.h"
13 #include "MemoryManager.h"
14 
15 #ifdef HAVE_BOOST_THREAD
16 #include <boost/thread.hpp>
17 #else
18 #include <unistd.h>
19 #endif
20 
21 #if defined(__CINT__) || !defined(__EXCEPTIONS)
22 namespace std
23 {
24  typedef void* exception_ptr;
25 };
26 #endif
27 
28 class zofits : public ofits
29 {
30 #ifdef __MARS__ // Needed by CINT to access the structures
31 public:
32 #endif
33  template<class S>
35  struct QueueMin : std::list<S>
36  {
37  typename std::list<S>::iterator begin()
38  {
39  return min_element(std::list<S>::begin(), std::list<S>::end());
40  }
41  };
42 
43 #ifdef __CINT__
44  // CINT doesn't like the packed attribute...
45  // Therefore we give another hint of the size of the structure
46  struct CatalogEntry { char dummy[16]; };
47 #else
48  //catalog types
49  struct CatalogEntry
50  {
51  CatalogEntry(int64_t f=0, int64_t s=0) : first(f), second(s) { }
52  int64_t first;
53  int64_t second;
54  } __attribute__((__packed__));
55 #endif
56 
57  typedef std::vector<CatalogEntry> CatalogRow;
58  typedef std::list<CatalogRow> CatalogType;
59 
61  struct WriteTarget
62  {
63  bool operator < (const WriteTarget& other) const
64  {
65  return tile_num < other.tile_num;
66  }
67 
69  WriteTarget(const WriteTarget &t, uint32_t sz) : tile_num(t.tile_num), size(sz), data(t.data) { }
70 
71  uint32_t tile_num;
72  uint32_t size;
73  std::shared_ptr<char> data;
74  };
75 
76 
79  {
80  CompressionTarget(CatalogRow& r) : catalog_entry(r)
81  {}
82 
83  CatalogRow& catalog_entry;
84  std::shared_ptr<char> src;
85  std::shared_ptr<char> transposed_src;
87  uint32_t num_rows;
88  };
89 
90 public:
92  static uint32_t DefaultNumThreads(const uint32_t &_n=-2) { static uint32_t n=0; if (int32_t(_n)>-2) n=_n; return n; }
93  static uint32_t DefaultMaxMemory(const uint32_t &_n=0) { static uint32_t n=1000000; if (_n>0) n=_n; return n; }
94  static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0) { static uint32_t n=1000; if (_n>0) n=_n; return n; }
95  static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0) { static uint32_t n=100; if (_n>0) n=_n; return n; }
96 
101  zofits(uint32_t numTiles = DefaultMaxNumTiles(),
102  uint32_t rowPerTile = DefaultNumRowsPerTile(),
103  uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
104  fMemPool(0, size_t(maxUsableMem)*1000),
105  fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
106  {
107  InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
108  SetNumThreads(DefaultNumThreads());
109  }
110 
115  zofits(const char* fname,
116  uint32_t numTiles = DefaultMaxNumTiles(),
117  uint32_t rowPerTile = DefaultNumRowsPerTile(),
118  uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
119  fMemPool(0, size_t(maxUsableMem)*1000),
120  fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
121  {
122  InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
123  SetNumThreads(DefaultNumThreads());
124  open(fname);
125  }
126 
127  zofits(const std::string &fname,
128  uint32_t numTiles = DefaultMaxNumTiles(),
129  uint32_t rowPerTile = DefaultNumRowsPerTile(),
130  uint32_t maxUsableMem= DefaultMaxMemory()) : ofits(),
131  fMemPool(0, size_t(maxUsableMem)*1000),
132  fWriteToDiskQueue(std::bind(&zofits::WriteBufferToDisk, this, std::placeholders::_1), false)
133  {
134  InitMemberVariables(numTiles, rowPerTile, size_t(maxUsableMem)*1000);
135  SetNumThreads(DefaultNumThreads());
136  open(fname);
137  }
138 
139  //initialization of member variables
143  void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
144  {
145  fCheckOffset = 0;
146  fNumQueues = 0;
147 
148  fNumTiles = nt==0 ? 1 : nt;
149  fNumRowsPerTile = rpt;
150 
151  fRealRowWidth = 0;
152  fCatalogOffset = 0;
153  fCatalogSize = 0;
154 
155  fMaxUsableMem = maxUsableMem;
156 #ifdef __EXCEPTIONS
157  fThreadsException = std::exception_ptr();
158 #endif
159  fErrno = 0;
160  }
161 
165  virtual bool WriteTableHeader(const char* name="DATA")
166  {
167  reallocateBuffers();
168 
169  SetInt("ZNAXIS1", fRealRowWidth);
170 
172 
173  fCompressionQueues.front().setPromptExecution(fNumQueues==0);
174  fWriteToDiskQueue.setPromptExecution(fNumQueues==0);
175 
176  if (fNumQueues != 0)
177  {
178  //start the compression queues
179  for (auto it=fCompressionQueues.begin(); it!= fCompressionQueues.end(); it++)
180  it->start();
181 
182  //start the disk writer
183  fWriteToDiskQueue.start();
184  }
185 
186  //mark that no tile has been written so far
187  fLatestWrittenTile = -1;
188 
189  //no wiring error (in the writing of the data) has occured so far
190  fErrno = 0;
191 
192  return good();
193  }
194 
198  void open(const char* filename, bool addEXTNAMEKey=true)
199  {
200  ofits::open(filename, addEXTNAMEKey);
201 
202  //add compression-related header entries
203  SetBool( "ZTABLE", true, "Table is compressed");
204  SetInt( "ZNAXIS1", 0, "Width of uncompressed rows");
205  SetInt( "ZNAXIS2", 0, "Number of uncompressed rows");
206  SetInt( "ZPCOUNT", 0, "");
207  SetInt( "ZHEAPPTR", 0, "");
208  SetInt( "ZTILELEN", fNumRowsPerTile, "Number of rows per tile");
209  SetInt( "THEAP", 0, "");
210  SetStr( "RAWSUM", " 0", "Checksum of raw little endian data");
211  SetFloat("ZRATIO", 0, "Compression ratio");
212  SetInt( "ZSHRINK", 1, "Catalog shrink factor");
213 
214  fCatalogSize = 0;
215  fRealRowWidth = 0;
216  fCatalogOffset = 0;
217  fCatalogSize = 0;
218  fCheckOffset = 0;
219 
220  fRealColumns.clear();
221  fCatalog.clear();
222  fCatalogSum.reset();
223  fRawSum.reset();
224  }
225 
226  void open(const std::string &filename, bool addEXTNAMEKey=true)
227  {
228  open(filename.c_str(), addEXTNAMEKey);
229  }
230 
233  virtual bool WriteDrsOffsetsTable()
234  {
235  return good();
236  }
237 
240  uint32_t GetBytesPerRow() const
241  {
242  return fRealRowWidth;
243  }
244 
248  {
249  const uint32_t one_catalog_row_size = fTable.num_cols*2*sizeof(uint64_t);
250  const uint32_t total_catalog_size = fNumTiles*one_catalog_row_size;
251 
252  // swap the catalog bytes before writing
253  std::vector<char> swapped_catalog(total_catalog_size);
254 
255  uint32_t shift = 0;
256  for (auto it=fCatalog.cbegin(); it!=fCatalog.cend(); it++)
257  {
258  revcpy<sizeof(uint64_t)>(swapped_catalog.data() + shift, (char*)(it->data()), fTable.num_cols*2);
259  shift += one_catalog_row_size;
260  }
261 
262  if (fCatalogSize < fNumTiles)
263  memset(swapped_catalog.data()+shift, 0, total_catalog_size-shift);
264 
265  // first time writing ? remember where we are
266  if (fCatalogOffset == 0)
267  fCatalogOffset = tellp();
268 
269  // remember where we came from
270  const off_t where_are_we = tellp();
271 
272  // write to disk
273  seekp(fCatalogOffset);
274  write(swapped_catalog.data(), total_catalog_size);
275 
276  if (where_are_we != fCatalogOffset)
277  seekp(where_are_we);
278 
279  // udpate checksum
280  fCatalogSum.reset();
281  fCatalogSum.add(swapped_catalog.data(), total_catalog_size);
282 
283  return good();
284  }
285 
287  virtual void DrsOffsetCalibrate(char* )
288  {
289 
290  }
291 
292  CatalogRow& AddOneCatalogRow()
293  {
294  // add one row to the catalog
295  fCatalog.emplace_back(CatalogRow());
296  fCatalog.back().resize(fTable.num_cols);
297  for (auto it=fCatalog.back().begin(); it != fCatalog.back().end(); it++)
298  *it = CatalogEntry(0,0);
299 
300  fCatalogSize++;
301 
302  return fCatalog.back();
303  }
304 
312  bool WriteRow(const void* ptr, size_t cnt, bool = true)
313  {
314  if (cnt != fRealRowWidth)
315  {
316 #ifdef __EXCEPTIONS
317  throw std::runtime_error("Wrong size of row given to WriteRow");
318 #else
319  gLog << ___err___ << "ERROR - Wrong size of row given to WriteRow" << std::endl;
320  return false;
321 #endif
322  }
323 
324 #ifdef __EXCEPTIONS
325  //check if something hapenned while the compression threads were working
326  //if so, re-throw the exception that was generated
327  if (fThreadsException != std::exception_ptr())
328  std::rethrow_exception(fThreadsException);
329 #endif
330 
331  //copy current row to pool or rows waiting for compression
332  char* target_location = fSmartBuffer.get() + fRealRowWidth*(fTable.num_rows%fNumRowsPerTile);
333  memcpy(target_location, ptr, fRealRowWidth);
334 
335  //for now, make an extra copy of the data, for RAWSUM checksuming.
336  //Ideally this should be moved to the threads
337  //However, because the RAWSUM must be calculated before the tile is transposed, I am not sure whether
338  //one extra memcpy per row written is worse than 100 rows checksumed when the tile is full....
339  const uint32_t rawOffset = (fTable.num_rows*fRealRowWidth)%4;
340  char* buffer = fRawSumBuffer.data() + rawOffset;
341  auto ib = fRawSumBuffer.begin();
342  auto ie = fRawSumBuffer.rbegin();
343  *ib++ = 0;
344  *ib++ = 0;
345  *ib++ = 0;
346  *ib = 0;
347 
348  *ie++ = 0;
349  *ie++ = 0;
350  *ie++ = 0;
351  *ie = 0;
352 
353  memcpy(buffer, ptr, fRealRowWidth);
354 
355  fRawSum.add(fRawSumBuffer, false);
356 
357  fTable.num_rows++;
358 
359  if (fTable.num_rows % fNumRowsPerTile != 0)
360  {
361  errno = fErrno;
362  return errno==0;
363  }
364 
365  // use the least occupied queue
366  const auto imin = std::min_element(fCompressionQueues.begin(), fCompressionQueues.end());
367 
368  if (!imin->emplace(InitNextCompression()))
369  {
370 #ifdef __EXCEPTIONS
371  throw std::runtime_error("The compression queues are not started. Did you close the file before writing this row?");
372 #else
373  gLog << ___err___ << "The compression queues are not started. Did you close the file before writing this row?" << std::endl;
374  errno = 0;
375  return false;
376 #endif
377  }
378 
379  errno = fErrno;
380  return errno==0;
381  }
382 
385  {
386  SetInt("NAXIS2", (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile);
387  SetInt("ZNAXIS2", fTable.num_rows);
388  FlushHeader();
389  }
390 
394  {
395  CompressionTarget target(AddOneCatalogRow());
396 
397  //fill up compression target
398  target.src = fSmartBuffer;
399  target.transposed_src = fMemPool.malloc();
400  target.num_rows = fTable.num_rows;
401 
402  //fill up write to disk target
403  WriteTarget &write_target = target.target;
404  write_target.tile_num = (fTable.num_rows-1)/fNumRowsPerTile;
405  write_target.size = 0;
406  write_target.data = fMemPool.malloc();
407 
408  //get a new buffer to host the incoming data
409  fSmartBuffer = fMemPool.malloc();
410 
411  return target;
412  }
413 
415  uint32_t ShrinkCatalog()
416  {
417  //add empty row to get either the target number of rows, or a multiple of the allowed size
418  for (uint32_t i=0;i<fCatalogSize%fNumTiles;i++)
419  AddOneCatalogRow();
420 
421  //did we write more rows than what the catalog could host ?
422  if (fCatalogSize <= fNumTiles) // nothing to do
423  return 1;
424 
425  //always exact as extra rows were added just above
426  const uint32_t shrink_factor = fCatalogSize / fNumTiles;
427 
428  //shrink the catalog !
429  uint32_t entry_id = 1;
430  auto it = fCatalog.begin();
431  it++;
432  for (; it != fCatalog.end(); it++)
433  {
434  if (entry_id >= fNumTiles)
435  break;
436 
437  const uint32_t target_id = entry_id*shrink_factor;
438 
439  auto jt = it;
440  for (uint32_t i=0; i<target_id-entry_id; i++)
441  jt++;
442 
443  *it = *jt;
444 
445  entry_id++;
446  }
447 
448  const uint32_t num_tiles_to_remove = fCatalogSize-fNumTiles;
449 
450  //remove the too many entries
451  for (uint32_t i=0;i<num_tiles_to_remove;i++)
452  {
453  fCatalog.pop_back();
454  fCatalogSize--;
455  }
456 
457  //update header keywords
458  fNumRowsPerTile *= shrink_factor;
459 
460  SetInt("ZTILELEN", fNumRowsPerTile);
461  SetInt("ZSHRINK", shrink_factor);
462 
463  return shrink_factor;
464  }
465 
468  bool close()
469  {
470  // stop compression and write threads
471  for (auto it=fCompressionQueues.begin(); it != fCompressionQueues.end(); it++)
472  it->wait();
473 
474  fWriteToDiskQueue.wait();
475 
476  if (tellp() < 0)
477  return false;
478 
479 #ifdef __EXCEPTIONS
480  //check if something hapenned while the compression threads were working
481  //if so, re-throw the exception that was generated
482  if (fThreadsException != std::exception_ptr())
483  std::rethrow_exception(fThreadsException);
484 #endif
485 
486  //write the last tile of data (if any)
487  if (fErrno==0 && fTable.num_rows%fNumRowsPerTile!=0)
488  {
489  fWriteToDiskQueue.enablePromptExecution();
490  fCompressionQueues.front().enablePromptExecution();
491  fCompressionQueues.front().emplace(InitNextCompression());
492  }
493 
494  AlignTo2880Bytes();
495 
496  int64_t heap_size = 0;
497  int64_t compressed_offset = 0;
498  for (auto it=fCatalog.begin(); it!= fCatalog.end(); it++)
499  {
500  compressed_offset += sizeof(FITS::TileHeader);
501  heap_size += sizeof(FITS::TileHeader);
502  for (uint32_t j=0; j<it->size(); j++)
503  {
504  heap_size += (*it)[j].first;
505  (*it)[j].second = compressed_offset;
506  compressed_offset += (*it)[j].first;
507  if ((*it)[j].first == 0)
508  (*it)[j].second = 0;
509  }
510  }
511 
512  const uint32_t shrink_factor = ShrinkCatalog();
513 
514  //update header keywords
515  SetInt("ZNAXIS1", fRealRowWidth);
516  SetInt("ZNAXIS2", fTable.num_rows);
517 
518  SetInt("ZHEAPPTR", fCatalogSize*fTable.num_cols*sizeof(uint64_t)*2);
519 
520  const uint32_t total_num_tiles_written = (fTable.num_rows + fNumRowsPerTile-1)/fNumRowsPerTile;
521  const uint32_t total_catalog_width = 2*sizeof(int64_t)*fTable.num_cols;
522 
523  SetInt("THEAP", total_num_tiles_written*total_catalog_width);
524  SetInt("NAXIS1", total_catalog_width);
525  SetInt("NAXIS2", total_num_tiles_written);
526  SetStr("RAWSUM", std::to_string((long long int)(fRawSum.val())));
527 
528  const float compression_ratio = (float)(fRealRowWidth*fTable.num_rows)/(float)heap_size;
529  SetFloat("ZRATIO", compression_ratio);
530 
531  //add to the heap size the size of the gap between the catalog and the actual heap
532  heap_size += (fCatalogSize - total_num_tiles_written)*fTable.num_cols*sizeof(uint64_t)*2;
533 
534  SetInt("PCOUNT", heap_size, "size of special data area");
535 
536  //Just for updating the fCatalogSum value
537  WriteCatalog();
538 
539  fDataSum += fCatalogSum;
540 
541  const Checksum checksm = UpdateHeaderChecksum();
542 
543  if (!fFilebuf.close())
544  setstate(ios_base::failbit);
545 
546  fSmartBuffer = std::shared_ptr<char>();
547 
548  //restore the number of rows per tile in case the catalog has been shrinked
549  if (shrink_factor != 1)
550  fNumRowsPerTile /= shrink_factor;
551 
552  if ((checksm+fDataSum).valid())
553  return true;
554 
555  std::ostringstream sout;
556  sout << "Checksum (" << std::hex << checksm.val() << ") invalid.";
557 #ifdef __EXCEPTIONS
558  throw std::runtime_error(sout.str());
559 #else
560  gLog << ___err___ << "ERROR - " << sout.str() << std::endl;
561  return false;
562 #endif
563  }
564 
566  bool AddColumn(uint32_t cnt, char typechar, const std::string& name, const std::string& unit,
567  const std::string& comment="", bool addHeaderKeys=true)
568  {
569  return AddColumn(FITS::kFactRaw, cnt, typechar, name, unit, comment, addHeaderKeys);
570  }
571 
573  bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string& name,
574  const std::string& unit, const std::string& comment="", bool addHeaderKeys=true)
575  {
576  if (!ofits::AddColumn(1, 'Q', name, unit, comment, addHeaderKeys))
577  return false;
578 
579  const size_t size = FITS::SizeFromType(typechar);
580 
581  Table::Column col;
582  col.name = name;
583  col.type = typechar;
584  col.num = cnt;
585  col.size = size;
586  col.offset = fRealRowWidth;
587 
588  fRealRowWidth += size*cnt;
589 
590  fRealColumns.emplace_back(col, comp);
591 
592  SetStr("ZFORM"+std::to_string((long long int)(fRealColumns.size())), std::to_string((long long int)(cnt))+typechar, "format of "+name+" "+FITS::CommentFromType(typechar));
593  SetStr("ZCTYP"+std::to_string((long long int)(fRealColumns.size())), "FACT", "Compression type: FACT");
594 
595  return true;
596  }
597 
599  int32_t GetNumThreads() const { return fNumQueues; }
600  bool SetNumThreads(uint32_t num)
601  {
602  if (tellp()>0)
603  {
604 #ifdef __EXCEPTIONS
605  throw std::runtime_error("Number of threads cannot be changed in the middle of writing a file");
606 #else
607  gLog << ___err___ << "ERROR - Number of threads cannot be changed in the middle of writing a file" << std::endl;
608 #endif
609  return false;
610  }
611 
612  //get number of physically available threads
613 #ifdef HAVE_BOOST_THREAD
614  unsigned int num_available_cores = boost::thread::hardware_concurrency();
615 #else
616  unsigned int num_available_cores = std::thread::hardware_concurrency();
617  if (num_available_cores == 0)
618  num_available_cores = sysconf(_SC_NPROCESSORS_ONLN);
619 #endif
620  // could not detect number of available cores from system properties...
621  if (num_available_cores == 0)
622  num_available_cores = 1;
623 
624  // leave one core for the main thread and one for the writing
625  if (num > num_available_cores)
626  num = num_available_cores>2 ? num_available_cores-2 : 1;
627 
628  fCompressionQueues.resize(num<1?1:num, Queue<CompressionTarget>(std::bind(&zofits::CompressBuffer, this, std::placeholders::_1), false));
629  fNumQueues = num;
630 
631  return true;
632  }
633 
634  uint32_t GetNumTiles() const { return fNumTiles; }
635  void SetNumTiles(uint32_t num) { fNumTiles=num; }
636 
637 protected:
638 
641  {
642  size_t total_block_head_size = 0;
643  for (auto it=fRealColumns.begin(); it!=fRealColumns.end(); it++)
644  total_block_head_size += it->block_head.getSizeOnDisk();
645 
646  const size_t chunk_size = fRealRowWidth*fNumRowsPerTile + total_block_head_size + sizeof(FITS::TileHeader) + 8; //+8 for checksuming;
647  fMemPool.setChunkSize(chunk_size);
648  fSmartBuffer = fMemPool.malloc();
649  fRawSumBuffer.resize(fRealRowWidth + 4-fRealRowWidth%4); //for checksuming
650  }
651 
656  bool writeCompressedDataToDisk(char* src, const uint32_t sizeToWrite)
657  {
658  char* checkSumPointer = src+4;
659  int32_t extraBytes = 0;
660  uint32_t sizeToChecksum = sizeToWrite;
661 
662  //should we extend the array to the left ?
663  if (fCheckOffset != 0)
664  {
665  sizeToChecksum += fCheckOffset;
666  checkSumPointer -= fCheckOffset;
667  memset(checkSumPointer, 0, fCheckOffset);
668  }
669 
670  //should we extend the array to the right ?
671  if (sizeToChecksum%4 != 0)
672  {
673  extraBytes = 4 - (sizeToChecksum%4);
674  memset(checkSumPointer+sizeToChecksum, 0, extraBytes);
675  sizeToChecksum += extraBytes;
676  }
677 
678  //do the checksum
679  fDataSum.add(checkSumPointer, sizeToChecksum);
680 
681  fCheckOffset = (4 - extraBytes)%4;
682 
683  //write data to disk
684  write(src+4, sizeToWrite);
685 
686  return good();
687  }
688 
692  bool CompressBuffer(const CompressionTarget& target)
693  {
694  //Can't get this to work in the thread. Printed the adresses, and they seem to be correct.
695  //Really do not understand what's wrong...
696  //calibrate data if required
697  const uint32_t thisRoundNumRows = (target.num_rows%fNumRowsPerTile) ? target.num_rows%fNumRowsPerTile : fNumRowsPerTile;
698  for (uint32_t i=0;i<thisRoundNumRows;i++)
699  {
700  char* target_location = target.src.get() + fRealRowWidth*i;
701  DrsOffsetCalibrate(target_location);
702  }
703 #ifdef __EXCEPTIONS
704  try
705  {
706 #endif
707  //transpose the original data
708  copyTransposeTile(target.src.get(), target.transposed_src.get(), target.num_rows);
709 
710  //compress the buffer
711  const uint64_t compressed_size = compressBuffer(target.target.data.get(), target.transposed_src.get(), target.num_rows, target.catalog_entry);
712 
713  //post the result to the writing queue
714  //get a copy so that it becomes non-const
715  fWriteToDiskQueue.emplace(target.target, compressed_size);
716 
717 #ifdef __EXCEPTIONS
718  }
719  catch (...)
720  {
721  fThreadsException = std::current_exception();
722  if (fNumQueues == 0)
723  std::rethrow_exception(fThreadsException);
724  }
725 #endif
726 
727  return true;
728  }
729 
732  bool WriteBufferToDisk(const WriteTarget& target)
733  {
734  //is this the tile we're supposed to write ?
735  if (target.tile_num != (uint32_t)(fLatestWrittenTile+1))
736  return false;
737 
738  fLatestWrittenTile++;
739 
740 #ifdef __EXCEPTIONS
741  try
742  {
743 #endif
744  //could not write the data to disk
745  if (!writeCompressedDataToDisk(target.data.get(), target.size))
746  fErrno = errno;
747 #ifdef __EXCEPTIONS
748  }
749  catch (...)
750  {
751  fThreadsException = std::current_exception();
752  if (fNumQueues == 0)
753  std::rethrow_exception(fThreadsException);
754  }
755 #endif
756  return true;
757  }
758 
760  //src cannot be const, as applySMOOTHING is done in place
765  uint64_t compressBuffer(char* dest, char* src, uint32_t num_rows, CatalogRow& catalog_row)
766  {
767  const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
768  uint32_t offset = 0;
769 
770  //skip the checksum reserved area
771  dest += 4;
772 
773  //skip the 'TILE' marker and tile size entry
774  uint64_t compressedOffset = sizeof(FITS::TileHeader);
775 
776  //now compress each column one by one by calling compression on arrays
777  for (uint32_t i=0;i<fRealColumns.size();i++)
778  {
779  catalog_row[i].second = compressedOffset;
780 
781  if (fRealColumns[i].col.num == 0)
782  continue;
783 
784  FITS::Compression& head = fRealColumns[i].block_head;
785 
786  //set the default byte telling if uncompressed the compressed Flag
787  const uint64_t previousOffset = compressedOffset;
788 
789  //skip header data
790  compressedOffset += head.getSizeOnDisk();
791 
792  for (uint32_t j=0;j<head.getNumProcs();j++)//sequence.size(); j++)
793  {
794  switch (head.getProc(j))
795  {
796  case FITS::kFactRaw:
797  compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
798  break;
799 
801  applySMOOTHING(src + offset, thisRoundNumRows*fRealColumns[i].col.num);
802  break;
803 
805  if (head.getOrdering() == FITS::kOrderByCol)
806  compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, thisRoundNumRows, fRealColumns[i].col.size, fRealColumns[i].col.num);
807  else
808  compressedOffset += compressHUFFMAN16(dest + compressedOffset, src + offset, fRealColumns[i].col.num, fRealColumns[i].col.size, thisRoundNumRows);
809  break;
810  }
811  }
812 
813  //check if compressed size is larger than uncompressed
814  //if so set flag and redo it uncompressed
815  if ((head.getProc(0) != FITS::kFactRaw) && (compressedOffset - previousOffset > fRealColumns[i].col.size*fRealColumns[i].col.num*thisRoundNumRows+head.getSizeOnDisk()))// && two)
816  {
817  //de-smooth !
818  if (head.getProc(0) == FITS::kFactSmoothing)
819  UnApplySMOOTHING(src+offset, fRealColumns[i].col.num*thisRoundNumRows);
820 
822 
823  compressedOffset = previousOffset + he.getSizeOnDisk();
824  compressedOffset += compressUNCOMPRESSED(dest + compressedOffset, src + offset, thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num);
825 
826  he.SetBlockSize(compressedOffset - previousOffset);
827  he.Memcpy(dest+previousOffset);
828 
829  offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
830 
831  catalog_row[i].first = compressedOffset - catalog_row[i].second;
832  continue;
833  }
834 
835  head.SetBlockSize(compressedOffset - previousOffset);
836  head.Memcpy(dest + previousOffset);
837 
838  offset += thisRoundNumRows*fRealColumns[i].col.size*fRealColumns[i].col.num;
839  catalog_row[i].first = compressedOffset - catalog_row[i].second;
840  }
841 
842  const FITS::TileHeader tile_head(thisRoundNumRows, compressedOffset);
843  memcpy(dest, &tile_head, sizeof(FITS::TileHeader));
844 
845  return compressedOffset;
846  }
847 
851  void copyTransposeTile(const char* src, char* dest, uint32_t num_rows)
852  {
853  const uint32_t thisRoundNumRows = (num_rows%fNumRowsPerTile) ? num_rows%fNumRowsPerTile : fNumRowsPerTile;
854 
855  //copy the tile and transpose it
856  for (uint32_t i=0;i<fRealColumns.size();i++)
857  {
858  switch (fRealColumns[i].block_head.getOrdering())
859  {
860  case FITS::kOrderByRow:
861  //regular, "semi-transposed" copy
862  for (uint32_t k=0;k<thisRoundNumRows;k++)
863  {
864  memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset, fRealColumns[i].col.size*fRealColumns[i].col.num);
865  dest += fRealColumns[i].col.size*fRealColumns[i].col.num;
866  }
867  break;
868 
869  case FITS::kOrderByCol:
870  //transposed copy
871  for (uint32_t j=0;j<fRealColumns[i].col.num;j++)
872  for (uint32_t k=0;k<thisRoundNumRows;k++)
873  {
874  memcpy(dest, src+k*fRealRowWidth+fRealColumns[i].col.offset+fRealColumns[i].col.size*j, fRealColumns[i].col.size);
875  dest += fRealColumns[i].col.size;
876  }
877  break;
878  };
879  }
880  }
881 
887  uint32_t compressUNCOMPRESSED(char* dest, const char* src, uint32_t size)
888  {
889  memcpy(dest, src, size);
890  return size;
891  }
892 
900  uint32_t compressHUFFMAN16(char* dest, const char* src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
901  {
902  std::string huffmanOutput;
903  uint32_t previousHuffmanSize = 0;
904 
905  //if we have less than 2 elems to compress, Huffman encoder does not work (and has no point). Just return larger size than uncompressed to trigger the raw storage.
906  if (numRows < 2)
907  return numRows*sizeOfElems*numRowElems + 1000;
908 
909  if (sizeOfElems < 2 )
910  {
911 #ifdef __EXCEPTIONS
912  throw std::runtime_error("HUFMANN16 can only encode columns with 16-bit or longer types");
913 #else
914  gLog << ___err___ << "ERROR - HUFMANN16 can only encode columns with 16-bit or longer types" << std::endl;
915  return 0;
916 #endif
917  }
918 
919  uint32_t huffmanOffset = 0;
920  for (uint32_t j=0;j<numRowElems;j++)
921  {
922  Huffman::Encode(huffmanOutput,
923  reinterpret_cast<const uint16_t*>(&src[j*sizeOfElems*numRows]),
924  numRows*(sizeOfElems/2));
925  reinterpret_cast<uint32_t*>(&dest[huffmanOffset])[0] = huffmanOutput.size() - previousHuffmanSize;
926  huffmanOffset += sizeof(uint32_t);
927  previousHuffmanSize = huffmanOutput.size();
928  }
929 
930  const size_t totalSize = huffmanOutput.size() + huffmanOffset;
931 
932  //only copy if not larger than not-compressed size
933  if (totalSize < numRows*sizeOfElems*numRowElems)
934  memcpy(&dest[huffmanOffset], huffmanOutput.data(), huffmanOutput.size());
935 
936  return totalSize;
937  }
938 
943  uint32_t applySMOOTHING(char* data, uint32_t numElems)
944  {
945  int16_t* short_data = reinterpret_cast<int16_t*>(data);
946  for (int j=numElems-1;j>1;j--)
947  short_data[j] = short_data[j] - (short_data[j-1]+short_data[j-2])/2;
948 
949  return numElems*sizeof(int16_t);
950  }
951 
956  uint32_t UnApplySMOOTHING(char* data, uint32_t numElems)
957  {
958  int16_t* short_data = reinterpret_cast<int16_t*>(data);
959  for (uint32_t j=2;j<numElems;j++)
960  short_data[j] = short_data[j] + (short_data[j-1]+short_data[j-2])/2;
961 
962  return numElems*sizeof(uint16_t);
963  }
964 
965 
966 
967  //thread related stuff
969  int32_t fNumQueues;
970  uint64_t fMaxUsableMem;
972 
973  std::vector<Queue<CompressionTarget>> fCompressionQueues;
975 
976  // catalog related stuff
977  CatalogType fCatalog;
978  uint32_t fCatalogSize;
979  uint32_t fNumTiles;
980  uint32_t fNumRowsPerTile;
982 
983  // checksum related stuff
986  int32_t fCheckOffset;
987 
988  // data layout related stuff
990 
991  public: // Public for the root dictionary
993  {
994  CompressedColumn(const Table::Column& c, const FITS::Compression& h) : col(c),
995  block_head(h)
996  {}
997 #ifdef __MARS__ // Needed for the compilation ofthe dictionary
998  CompressedColumn() { }
999 #endif
1000  Table::Column col;
1002  };
1003 
1004  protected:
1005  std::vector<CompressedColumn> fRealColumns;
1006  uint32_t fRealRowWidth;
1007  std::shared_ptr<char> fSmartBuffer;
1008  std::vector<char> fRawSumBuffer;
1009 
1011  int fErrno;
1012 };
1013 
1014 #endif
int32_t fNumQueues
Current number of threads that will be used by this object.
Definition: zofits.h:969
Overriding of the begin() operator to get the smallest item in the list instead of the true begin...
Definition: zofits.h:35
CatalogEntry(int64_t f=0, int64_t s=0)
Definition: zofits.h:51
MemoryManager fMemPool
Actual memory manager, providing memory for the compression buffers.
Definition: zofits.h:968
std::vector< char > fRawSumBuffer
buffer used for checksuming the incoming data, before compression
Definition: zofits.h:1008
bool WriteCatalog()
Definition: zofits.h:247
std::shared_ptr< char > transposed_src
Transposed data.
Definition: zofits.h:85
uint32_t fNumRowsPerTile
Number of rows per tile.
Definition: zofits.h:980
RowOrdering_t getOrdering() const
Definition: FITS.h:170
int i
Definition: db_dim_client.c:21
CatalogRow & AddOneCatalogRow()
Definition: zofits.h:292
int64_t second
offset of this column in the tile, from the start of the heap area
Definition: zofits.h:27
bool WriteRow(const void *ptr, size_t cnt, bool=true)
Definition: zofits.h:312
void InitMemberVariables(const uint32_t nt=0, const uint32_t rpt=0, const uint64_t maxUsableMem=0)
Definition: zofits.h:143
bool WriteBufferToDisk(const WriteTarget &target)
Definition: zofits.h:732
CatalogRow & catalog_entry
Reference to the catalog entry to deal with.
Definition: zofits.h:83
uint32_t compressHUFFMAN16(char *dest, const char *src, uint32_t numRows, uint32_t sizeOfElems, uint32_t numRowElems)
Definition: zofits.h:900
zofits(uint32_t numTiles=DefaultMaxNumTiles(), uint32_t rowPerTile=DefaultNumRowsPerTile(), uint32_t maxUsableMem=DefaultMaxMemory())
Definition: zofits.h:101
void FlushNumRows()
update the real number of rows
Definition: zofits.h:384
Parameters required to write a tile to disk.
Definition: zofits.h:61
STL namespace.
static uint32_t DefaultMaxMemory(const uint32_t &_n=0)
Definition: zofits.h:93
double begin
virtual bool AddColumn(uint32_t cnt, char typechar, const std::string &name, const std::string &unit, const std::string &comment="", bool addHeaderKeys=true)
Definition: ofits.h:596
zofits(const std::string &fname, uint32_t numTiles=DefaultMaxNumTiles(), uint32_t rowPerTile=DefaultNumRowsPerTile(), uint32_t maxUsableMem=DefaultMaxMemory())
Definition: zofits.h:127
Definition: Queue.h:28
WriteTarget target
Compressed data.
Definition: zofits.h:86
int64_t first
Size of this column in the tile.
Definition: zofits.h:26
uint32_t val() const
Definition: checksum.h:20
#define gLog
Definition: fits.h:36
std::shared_ptr< char > data
Memory block to write.
Definition: zofits.h:73
static uint32_t DefaultMaxNumTiles(const uint32_t &_n=0)
Definition: zofits.h:94
void Memcpy(char *dest) const
Definition: FITS.h:176
uint32_t GetNumTiles() const
Definition: zofits.h:634
uint32_t fCatalogSize
Actual catalog size (.size() is slow on large lists)
Definition: zofits.h:978
Checksum fRawSum
Raw sum (specific to FACT)
Definition: zofits.h:985
Checksum fCatalogSum
Checksum of the catalog.
Definition: zofits.h:984
int32_t fCheckOffset
offset to the data pointer to calculate the checksum
Definition: zofits.h:986
#define ___err___
Definition: fits.h:37
bool Encode(std::string &bufout, const uint16_t *bufin, size_t bufinlen)
Definition: huffman.h:385
bool SetNumThreads(uint32_t num)
Definition: zofits.h:600
uint32_t getSizeOnDisk() const
Definition: FITS.h:171
uint32_t UnApplySMOOTHING(char *data, uint32_t numElems)
Definition: zofits.h:956
static uint32_t DefaultNumThreads(const uint32_t &_n=-2)
static setter for the default number of threads to use. -1 means all available physical cores ...
Definition: zofits.h:92
uint64_t fMaxUsableMem
Maximum number of bytes that can be allocated by the memory manager.
Definition: zofits.h:970
virtual void DrsOffsetCalibrate(char *)
Applies the DrsOffsets calibration to the data. Does nothing as zofits knows nothing about drsoffsets...
Definition: zofits.h:287
std::shared_ptr< char > src
Original data.
Definition: zofits.h:84
uint16_t getNumProcs() const
Definition: FITS.h:173
static uint32_t SizeFromType(char type)
Definition: FITS.h:70
std::list< S >::iterator begin()
Definition: zofits.h:37
std::vector< Queue< CompressionTarget > > fCompressionQueues
Processing queues (=threads)
Definition: zofits.h:973
virtual void open(const char *filename, bool addEXTNAMEKey=true)
Definition: ofits.h:390
virtual bool WriteTableHeader(const char *name="DATA")
Definition: zofits.h:165
Definition: ofits.h:29
virtual bool WriteDrsOffsetsTable()
Definition: zofits.h:233
std::list< CatalogRow > CatalogType
Definition: zofits.h:58
uint32_t compressUNCOMPRESSED(char *dest, const char *src, uint32_t size)
Definition: zofits.h:887
uint32_t ShrinkCatalog()
Shrinks a catalog that is too long to fit into the reserved space at the beginning of the file...
Definition: zofits.h:415
void open(const char *filename, bool addEXTNAMEKey=true)
Definition: zofits.h:198
CompressionProcess_t getProc(uint32_t i) const
Definition: FITS.h:172
void * exception_ptr
Definition: zofits.h:24
uint32_t applySMOOTHING(char *data, uint32_t numElems)
Definition: zofits.h:943
WriteTarget(const WriteTarget &t, uint32_t sz)
Definition: zofits.h:69
Queue< WriteTarget, QueueMin< WriteTarget > > fWriteToDiskQueue
Writing queue (=thread)
Definition: zofits.h:974
void copyTransposeTile(const char *src, char *dest, uint32_t num_rows)
Definition: zofits.h:851
typedef __attribute__
int64_t first
Size of this column in the tile.
Definition: zofits.h:52
uint32_t num_rows
Number of rows to compress.
Definition: zofits.h:87
off_t fCatalogOffset
Offset of the catalog from the beginning of the file.
Definition: zofits.h:981
double end
int buffer[BUFFSIZE]
Definition: db_dim_client.c:14
bool operator<(const Service &left, const Service &right)
Definition: Service.h:13
uint64_t compressBuffer(char *dest, char *src, uint32_t num_rows, CatalogRow &catalog_row)
Compress a given buffer based on its source and destination.
Definition: zofits.h:765
int fErrno
propagate errno to main thread
Definition: zofits.h:1011
Parameters required to compress a tile of data.
Definition: zofits.h:78
uint32_t fNumTiles
Number of pre-reserved tiles.
Definition: zofits.h:979
int size
Definition: db_dim_server.c:17
float data[4 *1440]
void SetNumTiles(uint32_t num)
Definition: zofits.h:635
std::vector< CompressedColumn > fRealColumns
Vector hosting the columns of the file.
Definition: zofits.h:1005
static uint32_t DefaultNumRowsPerTile(const uint32_t &_n=0)
Definition: zofits.h:95
bool AddColumn(const FITS::Compression &comp, uint32_t cnt, char typechar, const std::string &name, const std::string &unit, const std::string &comment="", bool addHeaderKeys=true)
Overload of the simplified compressed version.
Definition: zofits.h:573
uint32_t tile_num
Tile index of the data (to make sure that they are written in the correct order)
Definition: zofits.h:71
uint32_t fRealRowWidth
Width in bytes of one uncompressed row.
Definition: zofits.h:1006
std::exception_ptr fThreadsException
exception pointer to store exceptions coming from the threads
Definition: zofits.h:1010
uint32_t size
Size to write.
Definition: zofits.h:72
CatalogType fCatalog
Catalog for this file.
Definition: zofits.h:977
void open(const std::string &filename, bool addEXTNAMEKey=true)
Definition: zofits.h:226
CompressionTarget(CatalogRow &r)
Definition: zofits.h:80
Regular columns augmented with compression informations.
Definition: zofits.h:992
TT t
Definition: test_client.c:26
Table::Column col
the regular column entry
Definition: zofits.h:1000
int32_t GetNumThreads() const
Get and set the actual number of threads for this object.
Definition: zofits.h:599
void reallocateBuffers()
Allocates the required objects.
Definition: zofits.h:640
zofits(const char *fname, uint32_t numTiles=DefaultMaxNumTiles(), uint32_t rowPerTile=DefaultNumRowsPerTile(), uint32_t maxUsableMem=DefaultMaxMemory())
Definition: zofits.h:115
int64_t second
offset of this column in the tile, from the start of the heap area
Definition: zofits.h:53
uint32_t GetBytesPerRow() const
Definition: zofits.h:240
uint32_t numRows
Definition: FITS.h:72
virtual bool WriteTableHeader(const char *name="DATA")
Definition: ofits.h:844
CompressionTarget InitNextCompression()
Definition: zofits.h:393
static std::string CommentFromType(char type)
Definition: FITS.h:50
bool writeCompressedDataToDisk(char *src, const uint32_t sizeToWrite)
Definition: zofits.h:656
CompressedColumn(const Table::Column &c, const FITS::Compression &h)
Definition: zofits.h:994
bool close()
Definition: zofits.h:468
FITS::Compression block_head
the compression data associated with that column
Definition: zofits.h:1001
int32_t fLatestWrittenTile
Index of the last tile written to disk (for correct ordering while using several threads) ...
Definition: zofits.h:971
Definition: zofits.h:28
bool CompressBuffer(const CompressionTarget &target)
Definition: zofits.h:692
CatalogEntry(int64_t f=0, int64_t s=0)
Definition: zofits.h:25
std::shared_ptr< char > fSmartBuffer
Smart pointer to the buffer where the incoming rows are written.
Definition: zofits.h:1007
TileHeader()
Definition: FITS.h:75
void SetBlockSize(uint64_t size)
Definition: FITS.h:175
std::vector< CatalogEntry > CatalogRow
Definition: zofits.h:57
bool AddColumn(uint32_t cnt, char typechar, const std::string &name, const std::string &unit, const std::string &comment="", bool addHeaderKeys=true)
Overload of the ofits method. Just calls the zofits specific one with default, uncompressed options f...
Definition: zofits.h:566