提问者:小点点

如何有效地合并k个按键排序的成对键/值向量?


我想合并< code>k个按键排序的成对键/值向量。通常,向量的大小< code>n非常大(例如< code>n

请考虑以下示例 k = 2

// Input
keys_1 = [1, 2, 3, 4], values_1 = [11, 12, 13, 14]
keys_2 = [3, 4, 5, 6], values_2 = [23, 24, 25, 26]

// Output
merged_keys = [1, 2, 3, 3, 4, 4, 5, 6], merged_values = [11, 12, 13, 23, 14, 24, 25, 26]

因为< code > _ _ GNU _ parallel::multi way _ merge 是一个高效的< code>k路合并算法,所以我尝试使用一个最先进的zip迭代器(https://github.com/dpellegr/ZipIterator)来“组合”键值对向量。

#include <iostream>
#include <vector>
#include <parallel/algorithm>

#include "ZipIterator.hpp"

int main(int argc, char* argv[]) {
  std::vector<int> keys_1   = {1, 2, 3, 4};
  std::vector<int> values_1 = {11, 12, 13, 14};
  std::vector<int> keys_2   = {3, 4, 5, 6};
  std::vector<int> values_2 = {23, 24, 25, 26};

  std::vector<int> merged_keys(8);
  std::vector<int> merged_values(8);

  auto kv_it_1 = Zip(keys_1, values_1);
  auto kv_it_2 = Zip(keys_2, values_2);
  auto mkv_it = Zip(merged_keys, merged_values);

  auto it_pairs = {std::make_pair(kv_it_1.begin(), kv_it_1.end()),
                   std::make_pair(kv_it_2.begin(), kv_it_2.end())};

  __gnu_parallel::multiway_merge(it_pairs.begin(), it_pairs.end(), mkv_it.begin(), 8, std::less<>());
  
  for (size_t i = 0; i < 8; ++i) {
    std::cout << merged_keys[i] << ":" << merged_values[i] << (i == 7 ? "\n" : ", ");
  }

  return 0;
}

但是,我得到了各种编译错误(使用< code>-O3构建):

错误:无法绑定类型为“std::__iterator_traits”的非常数左值引用

错误:无法转换'ZipIter

有没有可能修改< code>ZipIterator使其工作?

或者有没有更有效的方法来合并k按键排序的成对键/值向量?

考虑的替代方案

  1. 定义一个KeyValuePairstruct具有int keyint value成员以及运算符

共3个答案

匿名用户

是否可以修改压缩迭代器以使其工作?

是的,但这需要修补 __gnu_parallel::multiway_merge。错误源是以下行:

      /** @brief Dereference operator.
      *  @return Referenced element. */
      typename std::iterator_traits<_RAIter>::value_type&
      operator*() const
      { return *_M_current; }

这是 _GuardedIterator 的成员函数 - multiway_merge实现中使用的辅助结构。它包装了_RAIter类,在您的情况下是ZipIter。根据定义,当迭代器被取消引用 (*_M_current) 时,返回表达式的类型应该是引用类型。但是,此代码希望它value_type

  using reference = ZipRef<std::remove_reference_t<typename std::iterator_traits<IT>::reference>...>;

与(非常讨厌的)向量一起使用的做法相同

因此,<code>ZipIterator</code>没有问题,或者您如何使用算法,这对算法本身来说是一个非常重要的要求。下一个问题是,我们能摆脱它吗?

答案是肯定的。您可以更改_GuardedIterator::operator*()以返回reference而不是value_type

      // Default value for potentially non-default-constructible types.
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0)
            __arbitrary_element = &(*__seqs_begin[__t].first);
        }

这里,元素的地址被认为是某个< code>__arbitrary_element。我们可以存储这个元素的副本,因为我们知道复制< code>ZipRef的成本很低,并且它是默认可构造的:

      // Local copy of the element
      _ValueType __arbitrary_element_val;
      _ValueType* __arbitrary_element = 0;

      for (_SeqNumber __t = 0; __t < __k; ++__t)
        {
          if(!__arbitrary_element
             && _GLIBCXX_PARALLEL_LENGTH(__seqs_begin[__t]) > 0) {
            __arbitrary_element_val = *__seqs_begin[__t].first;
            __arbitrary_element = &__arbitrary_element_val;
          }
        }

相同的错误将出现在文件multiseq_selection. h的几个地方,例如这里和这里。使用类似的技术修复所有这些。

然后,您将看到多个错误,如下图所示:

./parallel/multiway_merge.h:879:29: error: passing ‘const ZipIter<__gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > >, __gnu_cxx::__normal_iterator<int*, std::vector<int, std::allocator<int> > > >’ as ‘this’ argument discards qualifiers [-fpermissive]

它们是关于不正确的常数。这是因为您将< code>it_pairs声明为< code>auto,在这个特定的场景中,它将类型推断为< code>std::inializer_list。这是一种非常奇特的类型。例如,它只提供对其成员的常量访问,即使它本身没有声明为const。这就是这些误差的来源。例如,将< code>auto更改为< code>std::vector后,这些错误就消失了。

此时它应该编译查找。只是不要忘记使用 -fopenmp 进行编译,否则您将获得“未定义对'omp_get_thread_num'的引用”错误。

这是我看到的输出:

$ ./a.out
1:11, 2:12, 3:13, 3:23, 4:14, 4:24, 5:25, 6:26

匿名用户

由于您需要较低的内存开销,一个可能的解决方案是让< code>multiway_merge算法只对唯一的范围标识符和范围索引进行操作,并以lambda函数的形式提供比较和复制运算符。这样,合并算法就完全独立于实际使用的容器类型、键和值类型。

这是一个基于此处描述的基于堆的算法的C 17解决方案:

#include <cassert>
#include <cstdint>
#include <functional>
#include <initializer_list>
#include <iostream>
#include <iterator>
#include <queue>
#include <vector>

using range_type = std::pair<std::uint32_t,std::size_t>;

void multiway_merge(
    std::initializer_list<std::size_t> range_sizes,
    std::function<bool(const range_type&, const range_type&)> compare_func,
    std::function<void(const range_type&)> copy_func)
{
    // lambda compare function for priority queue of ranges
    auto queue_less = [&](const range_type& range1, const range_type& range2) {
        // reverse comparison order of range1 and range2 here,
        // because we require the smallest element to be on top
        return compare_func(range2, range1);
    };
    // create priority queue from all non-empty ranges
    std::priority_queue<
        range_type, std::vector<range_type>, 
        decltype(queue_less)> queue{ queue_less };
    for (std::uint32_t range_id = 0; range_id < range_sizes.size(); ++range_id) {
        if (std::data(range_sizes)[range_id] > 0) {
            queue.emplace(range_id, 0);
        }
    }
    // merge ranges until priority queue is empty
    while (!queue.empty()) {
        range_type top_range = queue.top();
        queue.pop();
        copy_func(top_range);
        if (++top_range.second != std::data(range_sizes)[top_range.first]) {
            // re-insert non-empty range
            queue.push(top_range);
        }
    }
}


int main() {
    std::vector<int> keys_1   = { 1, 2, 3, 4 };
    std::vector<int> values_1 = { 11, 12, 13, 14 };
    std::vector<int> keys_2   = { 3, 4, 5, 6, 7 };
    std::vector<int> values_2 = { 23, 24, 25, 26, 27 };

    std::vector<int> merged_keys;
    std::vector<int> merged_values;

    multiway_merge(
        { keys_1.size(), keys_2.size() },
        [&](const range_type& left, const range_type& right) {
            if (left == right) return false;
            switch (left.first) {
                case 0:
                    assert(right.first == 1);
                    return keys_1[left.second] < keys_2[right.second];
                case 1:
                    assert(right.first == 0);
                    return keys_2[left.second] < keys_1[right.second];
            }
            return false;
        },
        [&](const range_type& range) {
            switch (range.first) {
                case 0:
                    merged_keys.push_back(keys_1[range.second]);
                    merged_values.push_back(values_1[range.second]);
                    break;
                case 1:
                    merged_keys.push_back(keys_2[range.second]);
                    merged_values.push_back(values_2[range.second]);
                    break;
            }
        });
    // copy result to stdout
    std::cout << "keys: ";
    std::copy(
        merged_keys.cbegin(), merged_keys.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\nvalues: ";
    std::copy(
        merged_values.cbegin(), merged_values.cend(), 
        std::ostream_iterator<int>(std::cout, " "));
    std::cout << "\n";
}

该算法的时间复杂度为 O(n log(k)),空间复杂度为 O(k),其中 n 是所有范围的总大小,k 是范围的数量。

所有输入范围的大小都需要作为初始值设定项列表传递。该示例仅传递示例中的两个输入范围。将示例扩展到两个以上的范围很简单。

匿名用户

你将必须实现一个适合你的确切情况下,有这么大的数组,多威胁可能不是那么好,如果你能负担得起分配一个完整或接近完整的数组副本,你可以做的一个优化是使用大页面,并确保你正在访问的内存没有分页(如果你打算满负荷运行,没有交换是不理想的)。

这个简单的低内存示例工作得很好,很难击败顺序 I/O,它的主要瓶颈是使用 realloc,当使用的值从 arrs 替换到 ret 时,每个step_size都会有多个 realloc,但只有一个是昂贵的ret.reserve() 可能会消耗“大量”时间,因为缩短缓冲区始终可用,但扩展缓冲区可能不可用,并且操作系统可能需要进行多次内存移动。

#include <vector>
#include <chrono>
#include <stdio.h>

template<typename Pair, typename bool REVERSED = true>
std::vector<Pair> multi_merge_lm(std::vector<std::vector<Pair>>& arrs, float step){
    size_t final_size = 0, max, i;
    for (i = 0; i < arrs.size(); i++){
        final_size += arrs[i].size();
    }

    float original = (float)final_size;
    size_t step_size = (size_t)((float)(final_size) * step);

    printf("Merge of %zi (%zi bytes) with %zi step size \n", 
        final_size, sizeof(Pair), step_size
    );
    printf("Merge operation size %.*f mb + %.*f mb \n",
        3, ((float)(sizeof(Pair) * (float)final_size) / 1000000),
        3, ((float)(sizeof(Pair) * (float)final_size * step) / 1000000)
    );

    std::vector<Pair> ret;
    while (final_size --> 0){

        for (max = 0, i = 0; i < arrs.size(); i++){
            // select the next biggest item from all the arrays
            if (arrs[i].back().first > arrs[max].back().first){
                max = i;
            }
        }

        // This does not actualy resize the vector 
        // unless the capacity is too small
        ret.push_back(arrs[max].back());
        arrs[max].pop_back();

        // This check could be extracted of the while
        // with a unroll and sort to little
        for (i = 0; i < arrs.size(); i++){
            if (arrs[i].empty()){
                arrs[i] = arrs.back();
                arrs.pop_back();
                break;
            }
        }

        if (ret.size() == ret.capacity()) {
            // Remove the used memory from the arrs and
            // realloc more to the ret
            for (std::vector<Pair>& chunk : arrs){
                chunk.shrink_to_fit();
            }
            ret.reserve(ret.size() + step_size);

            // Dont move this to the while loop, it will slow down
            // the execution, leave it just for debugging
            printf("\rProgress %i%c / Merge size %zi", 
                (int)((1 - ((float)final_size / original) ) * 100), 
                '%', ret.size()
            );
        }
    }

    printf("\r%*c\r", 40, ' ');
    ret.shrink_to_fit();
    arrs.clear();

    if (REVERSED){
        std::reverse(ret.begin(), ret.end());
    }

    return ret;
}

int main(void) {

    typedef std::pair<uint64_t, uint64_t> Pair;

    int inc = 1;
    int increment = 100000;
    int test_size = 40000000;
    float step_size = 0.05f;

    auto arrs = std::vector<std::vector<Pair>>(5);
    for (auto& chunk : arrs){

        // makes the arrays big and asymmetric and adds 
        // some data to check if it works
        chunk.resize(test_size + increment * inc++);
        for (int i = 0; i < chunk.size(); i++){
            chunk[i] = std::make_pair(i, i * -1);
        }

    }
    printf("Generation done \n");

    auto start = std::chrono::steady_clock::now();
    auto merged = multi_merge_lm<Pair>(arrs, step_size);
    auto end = std::chrono::steady_clock::now();

    printf("Time taken: %lfs \n", 
        (std::chrono::duration<double>(end - start)).count()
    );
    for (size_t i = 1; i < merged.size(); i++){
        if (merged[i - 1] > merged[i]){
            printf("Miss placed at index: %zi \n", i - 1);
        }
    }

    merged.clear();
    return 0;
}
Merge of 201500000 (16 bytes) with 10075000 step size
Merge operation size 3224.000 mb + 161.200 mb
Time taken: 166.197639s

通过一个分析器(在我的例子中是ANDuProf)运行这个测试表明,调整大小是非常昂贵的,你把< code>step_size做得越大,它就变得越有效。

这次重新运行是 0.5 倍,它快 ~2 倍,但现在该函数消耗的内存比以前多 10 倍,您应该记住,此值不是通用的,它们可能会根据您运行的硬件而变化,但比例不会改变那么多。

Merge of 201500000 (16 bytes) with 100750000 step size
Merge operation size 3224.000 mb + 1612.000 mb
Time taken: 72.062857s

你不应该忘记的另外两件事是,< code>std::vector是动态的,它的实际大小可能更大,而且< code>O2并不能对堆内存访问进行太多的优化,如果你不能保证它的安全性,那么指令只能等待。