Lens AI Profiler Cpp
frequent_items_sketch_impl.hpp
1 /*
2  * Licensed to the Apache Software Foundation (ASF) under one
3  * or more contributor license agreements. See the NOTICE file
4  * distributed with this work for additional information
5  * regarding copyright ownership. The ASF licenses this file
6  * to you under the Apache License, Version 2.0 (the
7  * "License"); you may not use this file except in compliance
8  * with the License. You may obtain a copy of the License at
9  *
10  * http://www.apache.org/licenses/LICENSE-2.0
11  *
12  * Unless required by applicable law or agreed to in writing,
13  * software distributed under the License is distributed on an
14  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15  * KIND, either express or implied. See the License for the
16  * specific language governing permissions and limitations
17  * under the License.
18  */
19 
20 #ifndef FREQUENT_ITEMS_SKETCH_IMPL_HPP_
21 #define FREQUENT_ITEMS_SKETCH_IMPL_HPP_
22 
23 #include <cstring>
24 #include <limits>
25 #include <sstream>
26 #include <stdexcept>
27 
28 #include "memory_operations.hpp"
29 
30 namespace datasketches {
31 
32 // clang++ seems to require this declaration for CMAKE_BUILD_TYPE="Debug"
33 template<typename T, typename W, typename H, typename E, typename A>
34 const uint8_t frequent_items_sketch<T, W, H, E, A>::LG_MIN_MAP_SIZE;
35 
36 template<typename T, typename W, typename H, typename E, typename A>
37 frequent_items_sketch<T, W, H, E, A>::frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size,
38  const E& equal, const A& allocator):
39 total_weight(0),
40 offset(0),
41 map(
42  std::max(lg_start_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
43  std::max(lg_max_map_size, frequent_items_sketch::LG_MIN_MAP_SIZE),
44  equal,
45  allocator
46 )
47 {
48  if (lg_start_map_size > lg_max_map_size) throw std::invalid_argument("starting size must not be greater than maximum size");
49 }
50 
51 template<typename T, typename W, typename H, typename E, typename A>
52 void frequent_items_sketch<T, W, H, E, A>::update(const T& item, W weight) {
53  check_weight(weight);
54  if (weight == 0) return;
55  total_weight += weight;
56  offset += map.adjust_or_insert(item, weight);
57 }
58 
59 template<typename T, typename W, typename H, typename E, typename A>
61  check_weight(weight);
62  if (weight == 0) return;
63  total_weight += weight;
64  offset += map.adjust_or_insert(std::move(item), weight);
65 }
66 
67 template<typename T, typename W, typename H, typename E, typename A>
69  if (other.is_empty()) return;
70  const W merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
71  for (auto it: other.map) {
72  update(it.first, it.second);
73  }
74  offset += other.offset;
75  total_weight = merged_total_weight;
76 }
77 
78 template<typename T, typename W, typename H, typename E, typename A>
80  if (other.is_empty()) return;
81  const W merged_total_weight = total_weight + other.get_total_weight(); // for correction at the end
82  for (auto it: other.map) {
83  update(std::move(it.first), it.second);
84  }
85  offset += other.offset;
86  total_weight = merged_total_weight;
87 }
88 
89 template<typename T, typename W, typename H, typename E, typename A>
91  return map.get_num_active() == 0;
92 }
93 
94 template<typename T, typename W, typename H, typename E, typename A>
96  return map.get_num_active();
97 }
98 
99 template<typename T, typename W, typename H, typename E, typename A>
101  return total_weight;
102 }
103 
104 template<typename T, typename W, typename H, typename E, typename A>
106  // if item is tracked estimate = weight + offset, otherwise 0
107  const W weight = map.get(item);
108  if (weight > 0) return weight + offset;
109  return 0;
110 }
111 
112 template<typename T, typename W, typename H, typename E, typename A>
114  return map.get(item);
115 }
116 
117 template<typename T, typename W, typename H, typename E, typename A>
119  return map.get(item) + offset;
120 }
121 
122 template<typename T, typename W, typename H, typename E, typename A>
124  return offset;
125 }
126 
127 template<typename T, typename W, typename H, typename E, typename A>
129  return EPSILON_FACTOR / (1 << map.get_lg_max_size());
130 }
131 
132 template<typename T, typename W, typename H, typename E, typename A>
133 double frequent_items_sketch<T, W, H, E, A>::get_epsilon(uint8_t lg_max_map_size) {
134  return EPSILON_FACTOR / (1 << lg_max_map_size);
135 }
136 
137 template<typename T, typename W, typename H, typename E, typename A>
138 double frequent_items_sketch<T, W, H, E, A>::get_apriori_error(uint8_t lg_max_map_size, W estimated_total_weight) {
139  return get_epsilon(lg_max_map_size) * estimated_total_weight;
140 }
141 
142 
143 template<typename T, typename W, typename H, typename E, typename A>
145  return get_frequent_items(err_type, get_maximum_error());
146 }
147 
148 template<typename T, typename W, typename H, typename E, typename A>
150  vector_row items(map.get_allocator());
151  for (auto it: map) {
152  const W lb = it.second;
153  const W ub = it.second + offset;
154  if ((err_type == NO_FALSE_NEGATIVES && ub > threshold) || (err_type == NO_FALSE_POSITIVES && lb > threshold)) {
155  items.push_back(row(&it.first, it.second, offset));
156  }
157  }
158  // sort by estimate in descending order
159  std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
160  return items;
161 }
162 
163 template<typename T, typename W, typename H, typename E, typename A>
164 template<typename SerDe>
165 void frequent_items_sketch<T, W, H, E, A>::serialize(std::ostream& os, const SerDe& sd) const {
166  const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
167  write(os, preamble_longs);
168  const uint8_t serial_version = SERIAL_VERSION;
169  write(os, serial_version);
170  const uint8_t family = FAMILY_ID;
171  write(os, family);
172  const uint8_t lg_max_size = map.get_lg_max_size();
173  write(os, lg_max_size);
174  const uint8_t lg_cur_size = map.get_lg_cur_size();
175  write(os, lg_cur_size);
176  const uint8_t flags_byte(
177  (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
178  | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
179  );
180  write(os, flags_byte);
181  const uint16_t unused16 = 0;
182  write(os, unused16);
183  if (!is_empty()) {
184  const uint32_t num_items = map.get_num_active();
185  write(os, num_items);
186  const uint32_t unused32 = 0;
187  write(os, unused32);
188  write(os, total_weight);
189  write(os, offset);
190 
191  // copy active items and their weights to use batch serialization
192  using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
193  AllocW aw(map.get_allocator());
194  W* weights = aw.allocate(num_items);
195  A alloc(map.get_allocator());
196  T* items = alloc.allocate(num_items);
197  uint32_t i = 0;
198  for (auto it: map) {
199  new (&items[i]) T(it.first);
200  weights[i++] = it.second;
201  }
202  write(os, weights, sizeof(W) * num_items);
203  aw.deallocate(weights, num_items);
204  sd.serialize(os, items, num_items);
205  for (i = 0; i < num_items; i++) items[i].~T();
206  alloc.deallocate(items, num_items);
207  }
208 }
209 
210 template<typename T, typename W, typename H, typename E, typename A>
211 template<typename SerDe>
213  if (is_empty()) return PREAMBLE_LONGS_EMPTY * sizeof(uint64_t);
214  size_t size = PREAMBLE_LONGS_NONEMPTY * sizeof(uint64_t) + map.get_num_active() * sizeof(W);
215  for (auto it: map) size += sd.size_of_item(it.first);
216  return size;
217 }
218 
219 template<typename T, typename W, typename H, typename E, typename A>
220 template<typename SerDe>
221 auto frequent_items_sketch<T, W, H, E, A>::serialize(unsigned header_size_bytes, const SerDe& sd) const -> vector_bytes {
222  const size_t size = header_size_bytes + get_serialized_size_bytes(sd);
223  vector_bytes bytes(size, 0, map.get_allocator());
224  uint8_t* ptr = bytes.data() + header_size_bytes;
225  uint8_t* end_ptr = ptr + size;
226 
227  const uint8_t preamble_longs = is_empty() ? PREAMBLE_LONGS_EMPTY : PREAMBLE_LONGS_NONEMPTY;
228  ptr += copy_to_mem(preamble_longs, ptr);
229  const uint8_t serial_version = SERIAL_VERSION;
230  ptr += copy_to_mem(serial_version, ptr);
231  const uint8_t family = FAMILY_ID;
232  ptr += copy_to_mem(family, ptr);
233  const uint8_t lg_max_size = map.get_lg_max_size();
234  ptr += copy_to_mem(lg_max_size, ptr);
235  const uint8_t lg_cur_size = map.get_lg_cur_size();
236  ptr += copy_to_mem(lg_cur_size, ptr);
237  const uint8_t flags_byte(
238  (is_empty() ? 1 << flags::IS_EMPTY_1 : 0)
239  | (is_empty() ? 1 << flags::IS_EMPTY_2 : 0)
240  );
241  ptr += copy_to_mem(flags_byte, ptr);
242  ptr += sizeof(uint16_t); // unused
243  if (!is_empty()) {
244  const uint32_t num_items = map.get_num_active();
245  ptr += copy_to_mem(num_items, ptr);
246  ptr += sizeof(uint32_t); // unused
247  ptr += copy_to_mem(total_weight, ptr);
248  ptr += copy_to_mem(offset, ptr);
249 
250  // copy active items and their weights to use batch serialization
251  using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
252  AllocW aw(map.get_allocator());
253  W* weights = aw.allocate(num_items);
254  A alloc(map.get_allocator());
255  T* items = alloc.allocate(num_items);
256  uint32_t i = 0;
257  for (auto it: map) {
258  new (&items[i]) T(it.first);
259  weights[i++] = it.second;
260  }
261  ptr += copy_to_mem(weights, ptr, sizeof(W) * num_items);
262  aw.deallocate(weights, num_items);
263  const size_t bytes_remaining = end_ptr - ptr;
264  ptr += sd.serialize(ptr, bytes_remaining, items, num_items);
265  for (i = 0; i < num_items; i++) items[i].~T();
266  alloc.deallocate(items, num_items);
267  }
268  return bytes;
269 }
270 
271 template<typename T, typename W, typename H, typename E, typename A>
272 class frequent_items_sketch<T, W, H, E, A>::items_deleter {
273 public:
274  items_deleter(uint32_t num, bool destroy, const A& allocator):
275  allocator_(allocator), num_(num), destroy_(destroy) {}
276  void set_destroy(bool destroy) { destroy_ = destroy; }
277  void operator() (T* ptr) {
278  if (ptr != nullptr) {
279  if (destroy_) {
280  for (uint32_t i = 0; i < num_; ++i) ptr[i].~T();
281  }
282  allocator_.deallocate(ptr, num_);
283  }
284  }
285 private:
286  A allocator_;
287  uint32_t num_;
288  bool destroy_;
289 };
290 
291 template<typename T, typename W, typename H, typename E, typename A>
292 template<typename SerDe>
294  const SerDe& sd, const E& equal, const A& allocator) {
295  const auto preamble_longs = read<uint8_t>(is);
296  const auto serial_version = read<uint8_t>(is);
297  const auto family_id = read<uint8_t>(is);
298  const auto lg_max_size = read<uint8_t>(is);
299  const auto lg_cur_size = read<uint8_t>(is);
300  const auto flags_byte = read<uint8_t>(is);
301  read<uint16_t>(is); // unused
302 
303  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
304 
305  check_preamble_longs(preamble_longs, is_empty);
306  check_serial_version(serial_version);
307  check_family_id(family_id);
308  check_size(lg_cur_size, lg_max_size);
309 
310  frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
311  if (!is_empty) {
312  const auto num_items = read<uint32_t>(is);
313  read<uint32_t>(is); // unused
314  const auto total_weight = read<W>(is);
315  const auto offset = read<W>(is);
316 
317  // batch deserialization with intermediate array of items and weights
318  using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
319  std::vector<W, AllocW> weights(num_items, 0, allocator);
320  read(is, weights.data(), sizeof(W) * num_items);
321  A alloc(allocator);
322  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
323  sd.deserialize(is, items.get(), num_items);
324  items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
325  for (uint32_t i = 0; i < num_items; i++) {
326  sketch.update(std::move(items.get()[i]), weights[i]);
327  }
328  sketch.total_weight = total_weight;
329  sketch.offset = offset;
330  }
331  if (!is.good())
332  throw std::runtime_error("error reading from std::istream");
333  return sketch;
334 }
335 
336 template<typename T, typename W, typename H, typename E, typename A>
337 template<typename SerDe>
338 frequent_items_sketch<T, W, H, E, A> frequent_items_sketch<T, W, H, E, A>::deserialize(const void* bytes, size_t size,
339  const SerDe& sd, const E& equal, const A& allocator) {
340  ensure_minimum_memory(size, 8);
341  const char* ptr = static_cast<const char*>(bytes);
342  const char* base = static_cast<const char*>(bytes);
343  uint8_t preamble_longs;
344  ptr += copy_from_mem(ptr, preamble_longs);
345  uint8_t serial_version;
346  ptr += copy_from_mem(ptr, serial_version);
347  uint8_t family_id;
348  ptr += copy_from_mem(ptr, family_id);
349  uint8_t lg_max_size;
350  ptr += copy_from_mem(ptr, lg_max_size);
351  uint8_t lg_cur_size;
352  ptr += copy_from_mem(ptr, lg_cur_size);
353  uint8_t flags_byte;
354  ptr += copy_from_mem(ptr, flags_byte);
355  ptr += sizeof(uint16_t); // unused
356 
357  const bool is_empty = (flags_byte & (1 << flags::IS_EMPTY_1)) | (flags_byte & (1 << flags::IS_EMPTY_2));
358 
359  check_preamble_longs(preamble_longs, is_empty);
360  check_serial_version(serial_version);
361  check_family_id(family_id);
362  check_size(lg_cur_size, lg_max_size);
363  ensure_minimum_memory(size, preamble_longs * sizeof(uint64_t));
364 
365  frequent_items_sketch sketch(lg_max_size, lg_cur_size, equal, allocator);
366  if (!is_empty) {
367  uint32_t num_items;
368  ptr += copy_from_mem(ptr, num_items);
369  ptr += sizeof(uint32_t); // unused
370  W total_weight;
371  ptr += copy_from_mem(ptr, total_weight);
372  W offset;
373  ptr += copy_from_mem(ptr, offset);
374 
375  ensure_minimum_memory(size, ptr - base + (sizeof(W) * num_items));
376  // batch deserialization with intermediate array of items and weights
377  using AllocW = typename std::allocator_traits<A>::template rebind_alloc<W>;
378  std::vector<W, AllocW> weights(num_items, 0, allocator);
379  ptr += copy_from_mem(ptr, weights.data(), sizeof(W) * num_items);
380  A alloc(allocator);
381  std::unique_ptr<T, items_deleter> items(alloc.allocate(num_items), items_deleter(num_items, false, alloc));
382  const size_t bytes_remaining = size - (ptr - base);
383  ptr += sd.deserialize(ptr, bytes_remaining, items.get(), num_items);
384  items.get_deleter().set_destroy(true); // serde did not throw, so the items must be constructed
385  for (uint32_t i = 0; i < num_items; i++) {
386  sketch.update(std::move(items.get()[i]), weights[i]);
387  }
388 
389  sketch.total_weight = total_weight;
390  sketch.offset = offset;
391  }
392  return sketch;
393 }
394 
395 template<typename T, typename W, typename H, typename E, typename A>
396 void frequent_items_sketch<T, W, H, E, A>::check_preamble_longs(uint8_t preamble_longs, bool is_empty) {
397  if (is_empty) {
398  if (preamble_longs != PREAMBLE_LONGS_EMPTY) {
399  throw std::invalid_argument("Possible corruption: preamble longs of an empty sketch must be " + std::to_string(PREAMBLE_LONGS_EMPTY) + ": " + std::to_string(preamble_longs));
400  }
401  } else {
402  if (preamble_longs != PREAMBLE_LONGS_NONEMPTY) {
403  throw std::invalid_argument("Possible corruption: preamble longs of an non-empty sketch must be " + std::to_string(PREAMBLE_LONGS_NONEMPTY) + ": " + std::to_string(preamble_longs));
404  }
405  }
406 }
407 
408 template<typename T, typename W, typename H, typename E, typename A>
409 void frequent_items_sketch<T, W, H, E, A>::check_serial_version(uint8_t serial_version) {
410  if (serial_version != SERIAL_VERSION) {
411  throw std::invalid_argument("Possible corruption: serial version must be " + std::to_string(SERIAL_VERSION) + ": " + std::to_string(serial_version));
412  }
413 }
414 
415 template<typename T, typename W, typename H, typename E, typename A>
416 void frequent_items_sketch<T, W, H, E, A>::check_family_id(uint8_t family_id) {
417  if (family_id != FAMILY_ID) {
418  throw std::invalid_argument("Possible corruption: family ID must be " + std::to_string(FAMILY_ID) + ": " + std::to_string(family_id));
419  }
420 }
421 
422 template<typename T, typename W, typename H, typename E, typename A>
423 void frequent_items_sketch<T, W, H, E, A>::check_size(uint8_t lg_cur_size, uint8_t lg_max_size) {
424  if (lg_cur_size > lg_max_size) {
425  throw std::invalid_argument("Possible corruption: expected lg_cur_size <= lg_max_size: " + std::to_string(lg_cur_size) + " <= " + std::to_string(lg_max_size));
426  }
427  if (lg_cur_size < LG_MIN_MAP_SIZE) {
428  throw std::invalid_argument("Possible corruption: lg_cur_size must not be less than " + std::to_string(LG_MIN_MAP_SIZE) + ": " + std::to_string(lg_cur_size));
429  }
430 }
431 
432 template<typename T, typename W, typename H, typename E, typename A>
433 string<A> frequent_items_sketch<T, W, H, E, A>::to_string(bool print_items) const {
434  // Using a temporary stream for implementation here does not comply with AllocatorAwareContainer requirements.
435  // The stream does not support passing an allocator instance, and alternatives are complicated.
436  std::ostringstream os;
437  os << "### Frequent items sketch summary:" << std::endl;
438  os << " lg cur map size : " << (int) map.get_lg_cur_size() << std::endl;
439  os << " lg max map size : " << (int) map.get_lg_max_size() << std::endl;
440  os << " num active items : " << get_num_active_items() << std::endl;
441  os << " total weight : " << get_total_weight() << std::endl;
442  os << " max error : " << get_maximum_error() << std::endl;
443  os << "### End sketch summary" << std::endl;
444  if (print_items) {
445  vector_row items;
446  for (auto it: map) {
447  items.push_back(row(&it.first, it.second, offset));
448  }
449  // sort by estimate in descending order
450  std::sort(items.begin(), items.end(), [](row a, row b){ return a.get_estimate() > b.get_estimate(); });
451  os << "### Items in descending order by estimate" << std::endl;
452  os << " item, estimate, lower bound, upper bound" << std::endl;
453  for (auto it: items) {
454  os << " " << it.get_item() << ", " << it.get_estimate() << ", "
455  << it.get_lower_bound() << ", " << it.get_upper_bound() << std::endl;
456  }
457  os << "### End items" << std::endl;
458  }
459  return string<A>(os.str().c_str(), map.get_allocator());
460 }
461 
462 // version for integral signed type
463 template<typename T, typename W, typename H, typename E, typename A>
464 template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_signed<WW>::value, int>::type>
466  if (weight < 0) {
467  throw std::invalid_argument("weight must be non-negative");
468  }
469 }
470 
471 // version for integral unsigned type - no-op
472 template<typename T, typename W, typename H, typename E, typename A>
473 template<typename WW, typename std::enable_if<std::is_integral<WW>::value && std::is_unsigned<WW>::value, int>::type>
474 void frequent_items_sketch<T, W, H, E, A>::check_weight(WW) {}
475 
476 // version for floating point type
477 template<typename T, typename W, typename H, typename E, typename A>
478 template<typename WW, typename std::enable_if<std::is_floating_point<WW>::value, int>::type>
479 void frequent_items_sketch<T, W, H, E, A>::check_weight(WW weight) {
480  if (weight < 0) {
481  throw std::invalid_argument("weight must be non-negative");
482  }
483  if (std::isnan(weight)) {
484  throw std::invalid_argument("weight must be a valid number");
485  }
486  if (std::isinf(weight)) {
487  throw std::invalid_argument("weight must be finite");
488  }
489 }
490 
491 }
492 
493 #endif
Definition: frequent_items_sketch_impl.hpp:272
Row in the output from get_frequent_items.
Definition: frequent_items_sketch.hpp:330
Definition: frequent_items_sketch.hpp:55
frequent_items_sketch(uint8_t lg_max_map_size, uint8_t lg_start_map_size=LG_MIN_MAP_SIZE, const E &equal=E(), const A &allocator=A())
Definition: frequent_items_sketch_impl.hpp:37
W get_upper_bound(const T &item) const
Definition: frequent_items_sketch_impl.hpp:118
bool is_empty() const
Definition: frequent_items_sketch_impl.hpp:90
double get_epsilon() const
Definition: frequent_items_sketch_impl.hpp:128
vector_row get_frequent_items(frequent_items_error_type err_type) const
Definition: frequent_items_sketch_impl.hpp:144
W get_estimate(const T &item) const
Definition: frequent_items_sketch_impl.hpp:105
W get_maximum_error() const
Definition: frequent_items_sketch_impl.hpp:123
static frequent_items_sketch deserialize(std::istream &is, const SerDe &sd=SerDe(), const E &equal=E(), const A &allocator=A())
void serialize(std::ostream &os, const SerDe &sd=SerDe()) const
Definition: frequent_items_sketch_impl.hpp:165
static double get_apriori_error(uint8_t lg_max_map_size, W estimated_total_weight)
Definition: frequent_items_sketch_impl.hpp:138
uint32_t get_num_active_items() const
Definition: frequent_items_sketch_impl.hpp:95
W get_total_weight() const
Definition: frequent_items_sketch_impl.hpp:100
void update(const T &item, W weight=1)
Definition: frequent_items_sketch_impl.hpp:52
string< A > to_string(bool print_items=false) const
Definition: frequent_items_sketch_impl.hpp:433
W get_lower_bound(const T &item) const
Definition: frequent_items_sketch_impl.hpp:113
size_t get_serialized_size_bytes(const SerDe &sd=SerDe()) const
Definition: frequent_items_sketch_impl.hpp:212
void merge(const frequent_items_sketch &other)
Definition: frequent_items_sketch_impl.hpp:68
DataSketches namespace.
Definition: common_defs.hpp:32
frequent_items_error_type
Frequent items error type.
Definition: frequent_items_sketch.hpp:36
@ NO_FALSE_POSITIVES
include an item in the result list if get_lower_bound(item) > threshold
Definition: frequent_items_sketch.hpp:37
@ NO_FALSE_NEGATIVES
include an item in the result list if get_upper_bound(item) > threshold
Definition: frequent_items_sketch.hpp:38