WebSafe 3.7git.libreoffice.org
|
|
🏠
Implement parallel version of super-scalar-sample-sort...

and use it for the pivot table construction routine processBuckets().

The implementation uses ideas from the non-parallel sample sort discussed in the below paper,
but parallelizes the "binning"/"classification" operations and the sorting of the bins
themselves.

Sanders, Peter, and Sebastian Winkel. "Super scalar sample sort."
European Symposium on Algorithms. Springer, Berlin, Heidelberg, 2004.

which can be accessed at :
http://citeseerx.ist.psu.edu/viewdoc/download?doi=10.1.1.72.366&rep=rep1&type=pdf

Change-Id: I3723b87e2feb8d7d9ee03f71f6025e26add914ce
Reviewed-on: https://gerrit.libreoffice.org/79486
Tested-by: Jenkins
Reviewed-by: Luboš Luňák <l.lunak@collabora.com>
diff --git a/comphelper/CppunitTest_comphelper_parallelsort_test.mk b/comphelper/CppunitTest_comphelper_parallelsort_test.mk
new file mode 100644
index 0000000..e1d2eab
--- /dev/null
+++ b/comphelper/CppunitTest_comphelper_parallelsort_test.mk
@@ -0,0 +1,30 @@
# -*- Mode: makefile-gmake; tab-width: 4; indent-tabs-mode: t -*-
#
# This file is part of the LibreOffice project.
#
# This Source Code Form is subject to the terms of the Mozilla Public
# License, v. 2.0. If a copy of the MPL was not distributed with this
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
#

$(eval $(call gb_CppunitTest_CppunitTest,comphelper_parallelsort_test))

$(eval $(call gb_CppunitTest_add_exception_objects,comphelper_parallelsort_test, \
    comphelper/qa/unit/parallelsorttest \
))

$(eval $(call gb_CppunitTest_use_externals,comphelper_parallelsort_test,\
	boost_headers \
))

$(eval $(call gb_CppunitTest_use_sdk_api,comphelper_parallelsort_test))

$(eval $(call gb_CppunitTest_use_libraries,comphelper_parallelsort_test, \
    comphelper \
    cppuhelper \
    cppu \
    sal \
    tl \
))

# vim: set noet sw=4 ts=4:
diff --git a/comphelper/Module_comphelper.mk b/comphelper/Module_comphelper.mk
index 30ac708..7541a59 100644
--- a/comphelper/Module_comphelper.mk
+++ b/comphelper/Module_comphelper.mk
@@ -30,6 +30,7 @@
))

$(eval $(call gb_Module_add_check_targets,comphelper,\
    CppunitTest_comphelper_parallelsort_test \
    CppunitTest_comphelper_threadpool_test \
    CppunitTest_comphelper_syntaxhighlight_test \
    CppunitTest_comphelper_variadictemplates_test \
diff --git a/comphelper/qa/unit/parallelsorttest.cxx b/comphelper/qa/unit/parallelsorttest.cxx
new file mode 100644
index 0000000..90dcb3c
--- /dev/null
+++ b/comphelper/qa/unit/parallelsorttest.cxx
@@ -0,0 +1,101 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#include <comphelper/parallelsort.hxx>
#include <comphelper/threadpool.hxx>
#include <rtl/string.hxx>
#include <cppunit/TestAssert.h>
#include <cppunit/TestFixture.h>
#include <cppunit/extensions/HelperMacros.h>
#include <cppunit/plugin/TestPlugIn.h>

#include <cstdlib>
#include <vector>
#include <algorithm>
#include <random>

class ParallelSortTest : public CppUnit::TestFixture
{
public:
    void testSortTiny();
    void testSortMedium();
    void testSortBig();

    virtual void setUp() override;
    virtual void tearDown() override;

    CPPUNIT_TEST_SUITE(ParallelSortTest);
    CPPUNIT_TEST(testSortTiny);
    CPPUNIT_TEST(testSortMedium);
    CPPUNIT_TEST(testSortBig);
    CPPUNIT_TEST_SUITE_END();

private:
    void sortTest(size_t nLen);
    void fillRandomUptoN(std::vector<size_t>& rVector, size_t N);

    comphelper::ThreadPool* pThreadPool;
    size_t mnThreads;
};

void ParallelSortTest::setUp()
{
    pThreadPool = &comphelper::ThreadPool::getSharedOptimalPool();
    mnThreads = pThreadPool->getWorkerCount();
}

void ParallelSortTest::tearDown()
{
    if (pThreadPool)
        pThreadPool->joinAll();
}

void ParallelSortTest::fillRandomUptoN(std::vector<size_t>& rVector, size_t N)
{
    rVector.resize(N);
    for (size_t nIdx = 0; nIdx < N; ++nIdx)
        rVector[nIdx] = nIdx;
    std::shuffle(rVector.begin(), rVector.end(), std::default_random_engine(42));
}

void ParallelSortTest::sortTest(size_t nLen)
{
    std::vector<size_t> aVector(nLen);
    fillRandomUptoN(aVector, nLen);
    comphelper::parallelSort(aVector.begin(), aVector.end());
    for (size_t nIdx = 0; nIdx < nLen; ++nIdx)
    {
        OString aMsg = "Wrong aVector[" + OString::number(nIdx) + "]";
        CPPUNIT_ASSERT_EQUAL_MESSAGE(aMsg.getStr(), nIdx, aVector[nIdx]);
    }
}

void ParallelSortTest::testSortTiny()
{
    sortTest(5);
    sortTest(15);
    sortTest(16);
    sortTest(17);
}

void ParallelSortTest::testSortMedium()
{
    sortTest(1025);
    sortTest(1029);
    sortTest(1024 * 2 + 1);
    sortTest(1024 * 2 + 9);
}

void ParallelSortTest::testSortBig() { sortTest(1024 * 16 + 3); }

CPPUNIT_TEST_SUITE_REGISTRATION(ParallelSortTest);

CPPUNIT_PLUGIN_IMPLEMENT();

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/include/comphelper/parallelsort.hxx b/include/comphelper/parallelsort.hxx
new file mode 100644
index 0000000..fc77bde
--- /dev/null
+++ b/include/comphelper/parallelsort.hxx
@@ -0,0 +1,373 @@
/* -*- Mode: C++; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */
/*
 * This file is part of the LibreOffice project.
 *
 * This Source Code Form is subject to the terms of the Mozilla Public
 * License, v. 2.0. If a copy of the MPL was not distributed with this
 * file, You can obtain one at http://mozilla.org/MPL/2.0/.
 */

#ifndef INCLUDED_COMPHELPER_PARALLELSORT_HXX
#define INCLUDED_COMPHELPER_PARALLELSORT_HXX

#include <comphelper/comphelperdllapi.h>
#include <comphelper/threadpool.hxx>
#include <tools/cpuid.hxx>

#include <memory>
#include <iterator>
#include <thread>
#include <algorithm>
#include <cmath>
#include <random>
#include <functional>
#include <iostream>
#include <chrono>

namespace comphelper
{
static const size_t nThreadCountGlobal = std::thread::hardware_concurrency();
const static bool bHyperThreadingActive = cpuid::hasHyperThreading();
static comphelper::ThreadPool& rTPool(comphelper::ThreadPool::getSharedOptimalPool());

static thread_local std::mt19937 aGenerator{ std::random_device{}() };

#define PARALLELSORT_ENABLEPZ 0

namespace
{
class ProfileZone
{
public:
#if PARALLELSORT_ENABLEPZ
    ProfileZone(const char* pTag)
        : maTag(pTag)
        , maStart(std::chrono::steady_clock::now())
        , mbFinished(false)
    {
    }

    ~ProfileZone()
    {
        if (!mbFinished)
            showTimeElapsed();
    }

    void stop()
    {
        showTimeElapsed();
        mbFinished = true;
    }
#else
    ProfileZone(const char* /*pTag*/)
        : mbDummy(true)
    {
    }

    void stop()
    {
        // Avoid loplugin:staticmethods, loplugin:staticaccess errors
        (void)mbDummy;
    }
#endif

private:
#if PARALLELSORT_ENABLEPZ

    void showTimeElapsed()
    {
        auto end = std::chrono::steady_clock::now();
        size_t elapsed
            = std::chrono::duration_cast<std::chrono::milliseconds>(end - maStart).count();
        std::cout << maTag << " : " << elapsed << " ms" << std::endl << std::flush;
    }

    std::string maTag;
    std::chrono::steady_clock::time_point maStart;
    bool mbFinished;
#else
    bool mbDummy;

#endif
};

class ParallelRunner
{
    class Executor : public comphelper::ThreadTask
    {
    public:
        Executor(const std::shared_ptr<comphelper::ThreadTaskTag>& rTag,
                 std::function<void()> aFunc)
            : comphelper::ThreadTask(rTag)
            , maFunc(std::move(aFunc))
        {
        }

        virtual void doWork() override { maFunc(); }

    private:
        const std::function<void()> maFunc;
    };

public:
    ParallelRunner() { maTag = comphelper::ThreadPool::createThreadTaskTag(); }

    void enqueue(std::function<void()> aFunc)
    {
        rTPool.pushTask(std::make_unique<Executor>(maTag, aFunc));
    }

    void wait() { rTPool.waitUntilDone(maTag, false); }

private:
    std::shared_ptr<comphelper::ThreadTaskTag> maTag;
};

constexpr size_t nMaxTreeArraySize = 64;

size_t lcl_round_down_pow2(size_t nNum)
{
    size_t nPow2;
    for (nPow2 = 1; nPow2 <= nNum; nPow2 <<= 1)
        ;
    return std::min((nPow2 >> 1), nMaxTreeArraySize);
}

template <class RandItr> struct Sampler
{
    using ValueType = typename std::iterator_traits<RandItr>::value_type;

    static void sample(RandItr aBegin, RandItr aEnd, ValueType* pSamples, size_t nSamples,
                       size_t /*nParallelism*/)
    {
        ProfileZone aZone("\tsample()");
        assert(aBegin <= aEnd);
        size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        assert(std::mt19937::max() >= nLen);

        for (size_t nIdx = 0; nIdx < nSamples; ++nIdx)
        {
            size_t nSel = aGenerator() % nLen--;
            std::swap(*(aBegin + nSel), *(aBegin + nLen));
            pSamples[nIdx] = *(aBegin + nLen);
        }
    }
};

template <class RandItr, class Compare> class Binner
{
    using ValueType = typename std::iterator_traits<RandItr>::value_type;

    const size_t mnTreeArraySize;
    const size_t mnDividers;
    constexpr static size_t mnMaxStaticSize = 1024 * 50;
    uint8_t maLabels[mnMaxStaticSize];
    ValueType maDividers[nMaxTreeArraySize];
    std::unique_ptr<uint8_t[]> pLabels;
    size_t maSepBinEnds[nMaxTreeArraySize * nMaxTreeArraySize];
    bool mbThreaded;

public:
    size_t maBinEnds[nMaxTreeArraySize];

    Binner(const ValueType* pSamples, size_t nSamples, size_t nBins, bool bThreaded)
        : mnTreeArraySize(lcl_round_down_pow2(nBins))
        , mnDividers(mnTreeArraySize - 1)
        , mbThreaded(bThreaded)
    {
        assert((nSamples % mnTreeArraySize) == 0);
        assert(mnTreeArraySize <= nMaxTreeArraySize);
        std::fill(maBinEnds, maBinEnds + mnTreeArraySize, 0);
        std::fill(maSepBinEnds, maSepBinEnds + mnTreeArraySize * mnTreeArraySize, 0);
        fillTreeArray(1, pSamples, pSamples + nSamples);
    }

    void fillTreeArray(size_t nPos, const ValueType* pLow, const ValueType* pHigh)
    {
        assert(pLow <= pHigh);
        const ValueType* pMid = pLow + (pHigh - pLow) / 2;
        maDividers[nPos] = *pMid;

        if (2 * nPos < mnDividers) // So that 2*nPos < mnTreeArraySize
        {
            fillTreeArray(2 * nPos, pLow, pMid);
            fillTreeArray(2 * nPos + 1, pMid + 1, pHigh);
        }
    }

    constexpr inline size_t findBin(const ValueType& rVal, Compare& aComp)
    {
        size_t nIdx = 1;
        while (nIdx <= mnDividers)
            nIdx = ((nIdx << 1) + aComp(maDividers[nIdx], rVal));
        return (nIdx - mnTreeArraySize);
    }

    void label(const RandItr aBegin, const RandItr aEnd, Compare& aComp)
    {
        ProfileZone aZoneSetup("\tlabel():setup");
        size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        if (nLen > mnMaxStaticSize)
            pLabels = std::make_unique<uint8_t[]>(nLen);
        uint8_t* pLabelsRaw = (nLen > mnMaxStaticSize) ? pLabels.get() : maLabels;
        aZoneSetup.stop();
        ProfileZone aZoneFindBins("\tFindBins()");
        if (mbThreaded)
        {
            ParallelRunner aPRunner;
            const size_t nBins = mnTreeArraySize;
            for (size_t nTIdx = 0; nTIdx < nBins; ++nTIdx)
            {
                aPRunner.enqueue([this, nTIdx, nBins, nLen, aBegin, pLabelsRaw, &aComp] {
                    ProfileZone aZoneIn("\t\tFindBinsThreaded()");
                    size_t nBinEndsStartIdx = nTIdx * mnTreeArraySize;
                    size_t* pBinEnds = maSepBinEnds + nBinEndsStartIdx;
                    size_t aBinEndsF[nMaxTreeArraySize] = { 0 };
                    for (size_t nIdx = nTIdx; nIdx < nLen; nIdx += nBins)
                    {
                        size_t nBinIdx = findBin(*(aBegin + nIdx), aComp);
                        pLabelsRaw[nIdx] = static_cast<uint8_t>(nBinIdx);
                        ++aBinEndsF[nBinIdx];
                    }

                    for (size_t nIdx = 0; nIdx < mnTreeArraySize; ++nIdx)
                        pBinEnds[nIdx] = aBinEndsF[nIdx];
                });
            }

            aPRunner.wait();

            // Populate maBinEnds from maSepBinEnds
            for (size_t nTIdx = 0; nTIdx < mnTreeArraySize; ++nTIdx)
            {
                for (size_t nSepIdx = 0; nSepIdx < mnTreeArraySize; ++nSepIdx)
                    maBinEnds[nTIdx] += maSepBinEnds[nSepIdx * mnTreeArraySize + nTIdx];
            }
        }
        else
        {
            uint8_t* pLabel = pLabelsRaw;
            for (RandItr aItr = aBegin; aItr != aEnd; ++aItr)
            {
                size_t nBinIdx = findBin(*aItr, aComp);
                *pLabel++ = nBinIdx;
                ++maBinEnds[nBinIdx];
            }
        }

        aZoneFindBins.stop();

        size_t nSum = 0;
        // Store each bin's starting position in maBinEnds array for now.
        for (size_t nIdx = 0; nIdx < mnTreeArraySize; ++nIdx)
        {
            size_t nSize = maBinEnds[nIdx];
            maBinEnds[nIdx] = nSum;
            nSum += nSize;
        }

        // Now maBinEnds has end positions of each bin.
    }

    void bin(const RandItr aBegin, const RandItr aEnd, ValueType* pOut)
    {
        ProfileZone aZone("\tbin()");
        const size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
        uint8_t* pLabelsRaw = (nLen > mnMaxStaticSize) ? pLabels.get() : maLabels;
        size_t nIdx;
        for (nIdx = 0; nIdx < nLen; ++nIdx)
        {
            pOut[maBinEnds[pLabelsRaw[nIdx]]++] = *(aBegin + nIdx);
        }
    }
};

template <class RandItr, class Compare = std::less<>>
void s3sort(const RandItr aBegin, const RandItr aEnd, Compare aComp = Compare(),
            bool bThreaded = true)
{
    static size_t nThreadCount = nThreadCountGlobal;

    constexpr size_t nBaseCaseSize = 1024;
    const std::size_t nLen = static_cast<std::size_t>(aEnd - aBegin);
    if (nLen < nBaseCaseSize)
    {
        std::sort(aBegin, aEnd, aComp);
        return;
    }

    using ValueType = typename std::iterator_traits<RandItr>::value_type;
    auto pOut = std::make_unique<ValueType[]>(nLen);

    const size_t nBins = lcl_round_down_pow2(nThreadCount);
    const size_t nOverSamplingFactor = std::max(1.0, std::sqrt(static_cast<double>(nLen) / 64));
    const size_t nSamples = nOverSamplingFactor * nBins;
    auto aSamples = std::make_unique<ValueType[]>(nSamples);
    ProfileZone aZoneSampleAnsSort("SampleAndSort");
    // Select samples and sort them
    Sampler<RandItr>::sample(aBegin, aEnd, aSamples.get(), nSamples, nBins);
    std::sort(aSamples.get(), aSamples.get() + nSamples, aComp);
    aZoneSampleAnsSort.stop();

    if (!aComp(aSamples[0], aSamples[nSamples - 1]))
    {
        // All samples are equal, fallback to standard sort.
        std::sort(aBegin, aEnd, aComp);
        return;
    }

    ProfileZone aZoneBinner("Binner");
    // Create and populate bins using pOut from input iterators.
    Binner<RandItr, Compare> aBinner(aSamples.get(), nSamples, nBins, bThreaded);
    aBinner.label(aBegin, aEnd, aComp);
    aBinner.bin(aBegin, aEnd, pOut.get());
    aZoneBinner.stop();

    ProfileZone aZoneSortBins("SortBins");
    ValueType* pOutRaw = pOut.get();
    if (bThreaded)
    {
        ParallelRunner aPRunner;
        // Sort the bins separately.
        for (size_t nBinIdx = 0, nBinStart = 0; nBinIdx < nBins; ++nBinIdx)
        {
            size_t nBinEnd = aBinner.maBinEnds[nBinIdx];
            aPRunner.enqueue([pOutRaw, nBinStart, nBinEnd, &aComp] {
                std::sort(pOutRaw + nBinStart, pOutRaw + nBinEnd, aComp);
            });

            nBinStart = nBinEnd;
        }

        aPRunner.wait();
    }
    else
    {
        for (size_t nBinIdx = 0, nBinStart = 0; nBinIdx < nBins; ++nBinIdx)
        {
            auto nBinEnd = aBinner.maBinEnds[nBinIdx];
            std::sort(pOutRaw + nBinStart, pOutRaw + nBinEnd, aComp);
            nBinStart = nBinEnd;
        }
    }

    aZoneSortBins.stop();

    // Move the sorted array to the array specified by input iterators.
    std::move(pOutRaw, pOutRaw + nLen, aBegin);
}

} // anonymous namespace

template <class RandItr, class Compare = std::less<>>
void parallelSort(const RandItr aBegin, const RandItr aEnd, Compare aComp = Compare())
{
    assert(aBegin <= aEnd);
    s3sort(aBegin, aEnd, aComp);
}

} // namespace comphelper

#endif // INCLUDED_COMPHELPER_PARALLELSORT_HXX

/* vim:set shiftwidth=4 softtabstop=4 expandtab: */
diff --git a/sc/source/core/data/dpcache.cxx b/sc/source/core/data/dpcache.cxx
index cf7eaff..8a83461 100644
--- a/sc/source/core/data/dpcache.cxx
+++ b/sc/source/core/data/dpcache.cxx
@@ -32,6 +32,7 @@
#include <columniterator.hxx>
#include <cellvalue.hxx>

#include <comphelper/parallelsort.hxx>
#include <rtl/math.hxx>
#include <unotools/charclass.hxx>
#include <unotools/textsearch.hxx>
@@ -171,6 +172,7 @@
    ScDPItemData maValue;
    SCROW mnOrderIndex;
    SCROW mnDataIndex;
    Bucket() {}
    Bucket(const ScDPItemData& rValue, SCROW nData) :
        maValue(rValue), mnOrderIndex(0), mnDataIndex(nData) {}
};
@@ -250,7 +252,7 @@
        return;

    // Sort by the value.
    std::sort(aBuckets.begin(), aBuckets.end(), LessByValue());
    comphelper::parallelSort(aBuckets.begin(), aBuckets.end(), LessByValue());

    {
        // Set order index such that unique values have identical index value.
@@ -269,14 +271,14 @@
    }

    // Re-sort the bucket this time by the data index.
    std::sort(aBuckets.begin(), aBuckets.end(), LessByDataIndex());
    comphelper::parallelSort(aBuckets.begin(), aBuckets.end(), LessByDataIndex());

    // Copy the order index series into the field object.
    rField.maData.reserve(aBuckets.size());
    std::for_each(aBuckets.begin(), aBuckets.end(), PushBackOrderIndex(rField.maData));

    // Sort by the value again.
    std::sort(aBuckets.begin(), aBuckets.end(), LessByOrderIndex());
    comphelper::parallelSort(aBuckets.begin(), aBuckets.end(), LessByOrderIndex());

    // Unique by value.
    std::vector<Bucket>::iterator itUniqueEnd =