// From https://github.com/mmp/pbrt-v3/blob/master/src/core/parallel.cpp

#include <algorithm>           // std::min
#include <cassert>             // assert
#include <condition_variable>  // std::condition_variable
#include <cstdint>             // int64_t
#include <functional>          // std::function
#include <memory>              // std::shared_ptr
#include <mutex>               // std::mutex, std::unique_lock, std::lock_guard
#include <thread>              // std::thread
#include <vector>              // std::vector

// Vector2i (a 2-component integer vector with .x/.y members and operator[])
// is assumed to come from the project's own headers; it is not defined here.
// Per-thread index: 0 for the main thread, 1..N-1 for the workers. Defined
// here so it is visible to worker_thread_func below.
thread_local int ThreadIndex;

// Pending loops form a singly linked list headed by workList; all access to
// the list is guarded by workListMutex.
static std::vector<std::thread> threads;
static bool shutdownThreads = false;
struct ParallelForLoop;
static ParallelForLoop *workList = nullptr;
static std::mutex workListMutex;
struct ParallelForLoop {
    ParallelForLoop(std::function<void(int64_t)> func1D, int64_t maxIndex, int chunkSize)
        : func1D(std::move(func1D)), maxIndex(maxIndex), chunkSize(chunkSize) {
    }
    ParallelForLoop(const std::function<void(Vector2i)> &f, const Vector2i count)
        : func2D(f), maxIndex(count[0] * count[1]), chunkSize(1) {
        nX = count[0];
    }

    std::function<void(int64_t)> func1D;
    std::function<void(Vector2i)> func2D;
    const int64_t maxIndex;           // total number of iterations
    const int chunkSize;              // iterations handed to a thread at a time
    int64_t nextIndex = 0;            // next iteration index to hand out
    int activeWorkers = 0;            // threads currently running iterations
    ParallelForLoop *next = nullptr;  // next loop in the work list
    int nX = -1;                      // row width for 2D loops

    // The loop is done once all indices have been handed out and every
    // thread has finished the iterations it claimed.
    bool Finished() const {
        return nextIndex >= maxIndex && activeWorkers == 0;
    }
};
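// Barrier is declared in the project's header in the original source; the
// declaration below is a minimal sketch consistent with its use in this file
// (it mirrors pbrt-v3's parallel.h, but is an assumption, not a verbatim copy).
class Barrier {
  public:
    Barrier(int count) : count(count) { assert(count > 0); }
    ~Barrier() { assert(count == 0); }
    void Wait();

  private:
    std::mutex mutex;
    std::condition_variable cv;
    int count;
};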
void Barrier::Wait() {
    std::unique_lock<std::mutex> lock(mutex);
    assert(count > 0);
    if (--count == 0) {
        // This is the last thread to reach the barrier; wake up all of the
        // other ones before exiting.
        cv.notify_all();
    } else {
        // Otherwise there are still threads that haven't reached it. Give
        // up the lock and wait to be notified.
        cv.wait(lock, [this] { return count == 0; });
    }
}
static std::condition_variable workListCondition;
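// Body of each worker thread: record this thread's index, wait at the
// startup barrier, then repeatedly claim chunks of iterations from the
// shared work list until shutdownThreads is set.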
static void worker_thread_func(const int tIndex, std::shared_ptr<Barrier> barrier) {
    ThreadIndex = tIndex;
    // Wait at the barrier so that the main thread knows all workers have
    // started and recorded their ThreadIndex before parallel_init() returns.
    barrier->Wait();
    // Release our reference to the Barrier so that it's freed once all of
    // the threads have cleared it.
    barrier.reset();
    std::unique_lock<std::mutex> lock(workListMutex);
    while (!shutdownThreads) {
        if (!workList) {
            // Sleep until there are more tasks to run
            workListCondition.wait(lock);
        } else {
            // Get work from _workList_ and run loop iterations
            ParallelForLoop &loop = *workList;
            // Find the set of loop iterations to run next
            int64_t indexStart = loop.nextIndex;
            int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex);
            // Update _loop_ to reflect iterations this thread will run
            loop.nextIndex = indexEnd;
            if (loop.nextIndex == loop.maxIndex)
                workList = loop.next;
            loop.activeWorkers++;
            // Run loop indices in _[indexStart, indexEnd)_ without the lock,
            // so other threads can claim chunks concurrently
            lock.unlock();
            for (int64_t index = indexStart; index < indexEnd; ++index) {
                if (loop.func1D) {
                    loop.func1D(index);
                }
                // Handle other types of loops
                else {
                    assert(loop.func2D != nullptr);
                    loop.func2D(Vector2i{int(index % loop.nX),
                                         int(index / loop.nX)});
                }
            }
            lock.lock();
            // Update _loop_ to reflect completion of iterations
            loop.activeWorkers--;
            if (loop.Finished()) {
                workListCondition.notify_all();
            }
        }
    }
}
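// Runs func(i) for every i in [0, count), handing chunks of chunkSize
// iterations to the worker threads. The calling thread also executes
// chunks, so the function returns only once every iteration has finished.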
void parallel_for_host(const std::function<void(int64_t)> &func,
                       int64_t count,
                       int chunkSize) {
    // Run iterations immediately if not using threads or if _count_ is small
    if (threads.empty() || count < chunkSize) {
        for (int64_t i = 0; i < count; ++i) {
            func(i);
        }
        return;
    }
    // Create and enqueue _ParallelForLoop_ for this loop
    ParallelForLoop loop(func, count, chunkSize);
    {
        std::lock_guard<std::mutex> lock(workListMutex);
        loop.next = workList;
        workList = &loop;
    }
    // Notify worker threads of work to be done
    std::unique_lock<std::mutex> lock(workListMutex);
    workListCondition.notify_all();
    // Help out with parallel loop iterations in the current thread
    while (!loop.Finished()) {
        // Find the set of loop iterations to run next
        int64_t indexStart = loop.nextIndex;
        int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex);
        // Update _loop_ to reflect iterations this thread will run
        loop.nextIndex = indexEnd;
        if (loop.nextIndex == loop.maxIndex) {
            workList = loop.next;
        }
        loop.activeWorkers++;
        // Run loop indices in _[indexStart, indexEnd)_
        lock.unlock();
        for (int64_t index = indexStart; index < indexEnd; ++index) {
            if (loop.func1D) {
                loop.func1D(index);
            }
            // Handle other types of loops
            else {
                assert(loop.func2D != nullptr);
                loop.func2D(Vector2i{int(index % loop.nX),
                                     int(index / loop.nX)});
            }
        }
        lock.lock();
        // Update _loop_ to reflect completion of iterations
        loop.activeWorkers--;
    }
}
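// 2D variant: runs func(Vector2i{x, y}) for every x in [0, count.x) and
// y in [0, count.y), one iteration per chunk.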
void parallel_for_host(
        std::function<void(Vector2i)> func, const Vector2i count) {
    // Run iterations immediately if not using threads or if the loop is trivial
    if (threads.empty() || count.x * count.y <= 1) {
        for (int y = 0; y < count.y; ++y) {
            for (int x = 0; x < count.x; ++x) {
                func(Vector2i{x, y});
            }
        }
        return;
    }
    // Create and enqueue _ParallelForLoop_ for this loop
    ParallelForLoop loop(std::move(func), count);
    {
        std::lock_guard<std::mutex> lock(workListMutex);
        loop.next = workList;
        workList = &loop;
    }
    // Notify worker threads of work to be done
    std::unique_lock<std::mutex> lock(workListMutex);
    workListCondition.notify_all();
    // Help out with parallel loop iterations in the current thread
    while (!loop.Finished()) {
        // Find the set of loop iterations to run next
        int64_t indexStart = loop.nextIndex;
        int64_t indexEnd = std::min(indexStart + loop.chunkSize, loop.maxIndex);
        // Update _loop_ to reflect iterations this thread will run
        loop.nextIndex = indexEnd;
        if (loop.nextIndex == loop.maxIndex) {
            workList = loop.next;
        }
        loop.activeWorkers++;
        // Run loop indices in _[indexStart, indexEnd)_
        lock.unlock();
        for (int64_t index = indexStart; index < indexEnd; ++index) {
            if (loop.func1D) {
                loop.func1D(index);
            }
            // Handle other types of loops
            else {
                assert(loop.func2D != nullptr);
                loop.func2D(Vector2i{int(index % loop.nX),
                                     int(index / loop.nX)});
            }
        }
        lock.lock();
        // Update _loop_ to reflect completion of iterations
        loop.activeWorkers--;
    }
}
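// Returns the number of hardware threads available, falling back to a fixed
// guess when the runtime cannot determine it.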
int num_system_cores() {
    int ret = std::thread::hardware_concurrency();
    if (ret == 0) {
        // hardware_concurrency() may return 0 if the value is not computable;
        // fall back to an arbitrary but reasonable thread count.
        return 16;
    }
    return ret;
}
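// Launches the worker threads. Must be called once, before any call to
// parallel_for_host; the barrier guarantees every worker has started and
// recorded its ThreadIndex before this function returns.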
void parallel_init() {
    assert(threads.empty());
    int nThreads = num_system_cores();
    ThreadIndex = 0;
    // Create a barrier so that we can be sure all worker threads have
    // started (and set up their per-thread state) before we return from
    // this function.
    std::shared_ptr<Barrier> barrier = std::make_shared<Barrier>(nThreads);
    // Launch one fewer worker thread than the total number we want doing
    // work, since the main thread helps out, too.
    for (int i = 0; i < nThreads - 1; ++i) {
        threads.push_back(std::thread(worker_thread_func, i + 1, barrier));
    }
    barrier->Wait();
}
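// Signals the worker threads to exit, joins them, and resets the module's
// state so that parallel_init() can be called again.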
void parallel_cleanup() {
    if (threads.empty()) {
        return;
    }
    {
        std::lock_guard<std::mutex> lock(workListMutex);
        shutdownThreads = true;
        workListCondition.notify_all();
    }
    for (std::thread &thread : threads) {
        thread.join();
    }
    threads.clear();
    shutdownThreads = false;
}
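
// Minimal usage sketch (an illustration, not part of the original file):
//
//     parallel_init();
//     std::vector<float> data(4096);
//     parallel_for_host([&](int64_t i) {
//         data[i] = 2.0f * float(i);  // each index is visited exactly once
//     }, int64_t(data.size()), /* chunkSize */ 64);
//     parallel_cleanup();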