#include "parallel.h" #include #include #include #include #include // From https://github.com/mmp/pbrt-v3/blob/master/src/core/parallel.cpp static std::vector threads; static bool shutdownThreads = false; struct ParallelForLoop; static ParallelForLoop *workList = nullptr; static std::mutex workListMutex; struct ParallelForLoop { ParallelForLoop(std::function func1D, int64_t maxIndex, int chunkSize) : func1D(std::move(func1D)), maxIndex(maxIndex), chunkSize(chunkSize) { } ParallelForLoop(const std::function &f, const Vector2i count) : func2D(f), maxIndex(count[0] * count[1]), chunkSize(1) { nX = count[0]; } std::function func1D; std::function func2D; const int64_t maxIndex; const int chunkSize; int64_t nextIndex = 0; int activeWorkers = 0; ParallelForLoop *next = nullptr; int nX = -1; bool Finished() const { return nextIndex >= maxIndex && activeWorkers == 0; } }; void Barrier::Wait() { std::unique_lock lock(mutex); assert(count > 0); if (--count == 0) { // This is the last thread to reach the barrier; wake up all of the // other ones before exiting. cv.notify_all(); } else { // Otherwise there are still threads that haven't reached it. Give // up the lock and wait to be notified. cv.wait(lock, [this] { return count == 0; }); } } static std::condition_variable workListCondition; static void worker_thread_func(const int tIndex, std::shared_ptr barrier) { ThreadIndex = tIndex; // The main thread sets up a barrier so that it can be sure that all // workers have called ProfilerWorkerThreadInit() before it continues // (and actually starts the profiling system). barrier->Wait(); // Release our reference to the Barrier so that it's freed once all of // the threads have cleared it. barrier.reset(); std::unique_lock lock(workListMutex); while (!shutdownThreads) { if (!workList) { // Sleep until there are more tasks to run workListCondition.wait(lock); } else { // Get work from _workList_ and run loop iterations ParallelForLoop &loop = *workList; // Run a chunk of loop iterations for _loop_ // Find the set of loop iterations to run next int64_t indexStart = loop.nextIndex; int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); // Update _loop_ to reflect iterations this thread will run loop.nextIndex = indexEnd; if (loop.nextIndex == loop.maxIndex) workList = loop.next; loop.activeWorkers++; // Run loop indices in _[indexStart, indexEnd)_ lock.unlock(); for (int64_t index = indexStart; index < indexEnd; ++index) { if (loop.func1D) { loop.func1D(index); } // Handle other types of loops else { assert(loop.func2D != nullptr); loop.func2D(Vector2i{int(index % loop.nX), int(index / loop.nX)}); } } lock.lock(); // Update _loop_ to reflect completion of iterations loop.activeWorkers--; if (loop.Finished()) { workListCondition.notify_all(); } } } } void parallel_for_host(const std::function &func, int64_t count, int chunkSize) { // Run iterations immediately if not using threads or if _count_ is small if (threads.empty() || count < chunkSize) { for (int64_t i = 0; i < count; ++i) { func(i); } return; } // Create and enqueue _ParallelForLoop_ for this loop ParallelForLoop loop(func, count, chunkSize); workListMutex.lock(); loop.next = workList; workList = &loop; workListMutex.unlock(); // Notify worker threads of work to be done std::unique_lock lock(workListMutex); workListCondition.notify_all(); // Help out with parallel loop iterations in the current thread while (!loop.Finished()) { // Run a chunk of loop iterations for _loop_ // Find the set of loop iterations to run next int64_t 
indexStart = loop.nextIndex; int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); // Update _loop_ to reflect iterations this thread will run loop.nextIndex = indexEnd; if (loop.nextIndex == loop.maxIndex) { workList = loop.next; } loop.activeWorkers++; // Run loop indices in _[indexStart, indexEnd)_ lock.unlock(); for (int64_t index = indexStart; index < indexEnd; ++index) { if (loop.func1D) { loop.func1D(index); } // Handle other types of loops else { assert(loop.func2D != nullptr); loop.func2D(Vector2i{int(index % loop.nX), int(index / loop.nX)}); } } lock.lock(); // Update _loop_ to reflect completion of iterations loop.activeWorkers--; } } thread_local int ThreadIndex; void parallel_for_host( std::function func, const Vector2i count) { // Launch worker threads if needed if (threads.empty() || count.x * count.y <= 1) { for (int y = 0; y < count.y; ++y) { for (int x = 0; x < count.x; ++x) { func(Vector2i{x, y}); } } return; } ParallelForLoop loop(std::move(func), count); { std::lock_guard lock(workListMutex); loop.next = workList; workList = &loop; } std::unique_lock lock(workListMutex); workListCondition.notify_all(); // Help out with parallel loop iterations in the current thread while (!loop.Finished()) { // Run a chunk of loop iterations for _loop_ // Find the set of loop iterations to run next int64_t indexStart = loop.nextIndex; int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex); // Update _loop_ to reflect iterations this thread will run loop.nextIndex = indexEnd; if (loop.nextIndex == loop.maxIndex) { workList = loop.next; } loop.activeWorkers++; // Run loop indices in _[indexStart, indexEnd)_ lock.unlock(); for (int64_t index = indexStart; index < indexEnd; ++index) { if (loop.func1D) { loop.func1D(index); } // Handle other types of loops else { assert(loop.func2D != nullptr); loop.func2D(Vector2i{int(index % loop.nX), int(index / loop.nX)}); } } lock.lock(); // Update _loop_ to reflect completion of iterations loop.activeWorkers--; } } int num_system_cores() { // return 1; int ret = std::thread::hardware_concurrency(); if (ret == 0) { return 16; } return ret; } void parallel_init() { assert(threads.size() == 0); int nThreads = num_system_cores(); ThreadIndex = 0; // Create a barrier so that we can be sure all worker threads get past // their call to ProfilerWorkerThreadInit() before we return from this // function. In turn, we can be sure that the profiling system isn't // started until after all worker threads have done that. std::shared_ptr barrier = std::make_shared(nThreads); // Launch one fewer worker thread than the total number we want doing // work, since the main thread helps out, too. for (int i = 0; i < nThreads - 1; ++i) { threads.push_back(std::thread(worker_thread_func, i + 1, barrier)); } barrier->Wait(); } void parallel_cleanup() { if (threads.empty()) { return; } { std::lock_guard lock(workListMutex); shutdownThreads = true; workListCondition.notify_all(); } for (std::thread &thread : threads) { thread.join(); } threads.erase(threads.begin(), threads.end()); shutdownThreads = false; }
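
// ---------------------------------------------------------------------------
// Illustrative usage sketch (not part of the original pbrt-v3 source, kept in
// a comment so this translation unit still compiles). It shows the intended
// call pattern implied by the functions above: parallel_init() once at
// startup, any number of parallel_for_host() calls, then parallel_cleanup()
// at shutdown. The lambda, `data`, and the chunk size of 64 are hypothetical.
//
//   #include "parallel.h"
//   #include <vector>
//
//   int main() {
//       parallel_init();
//       std::vector<float> data(1024, 0.0f);
//       // Each index is processed by whichever worker (or the main thread,
//       // which also helps out) grabs the chunk containing it; chunkSize
//       // controls scheduling granularity.
//       parallel_for_host([&](int64_t i) { data[i] = 2.f * float(i); },
//                         int64_t(data.size()), /*chunkSize=*/64);
//       parallel_cleanup();
//       return 0;
//   }
// ---------------------------------------------------------------------------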