Path: blob/master/core/object/worker_thread_pool.cpp
10277 views
/**************************************************************************/1/* worker_thread_pool.cpp */2/**************************************************************************/3/* This file is part of: */4/* GODOT ENGINE */5/* https://godotengine.org */6/**************************************************************************/7/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */8/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */9/* */10/* Permission is hereby granted, free of charge, to any person obtaining */11/* a copy of this software and associated documentation files (the */12/* "Software"), to deal in the Software without restriction, including */13/* without limitation the rights to use, copy, modify, merge, publish, */14/* distribute, sublicense, and/or sell copies of the Software, and to */15/* permit persons to whom the Software is furnished to do so, subject to */16/* the following conditions: */17/* */18/* The above copyright notice and this permission notice shall be */19/* included in all copies or substantial portions of the Software. */20/* */21/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */22/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */23/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */24/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */25/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */26/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */27/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */28/**************************************************************************/2930#include "worker_thread_pool.h"3132#include "core/object/script_language.h"33#include "core/os/os.h"34#include "core/os/safe_binary_mutex.h"35#include "core/os/thread_safe.h"3637WorkerThreadPool::Task *const WorkerThreadPool::ThreadData::YIELDING = (Task *)1;3839HashMap<StringName, WorkerThreadPool *> WorkerThreadPool::named_pools;4041void WorkerThreadPool::Task::free_template_userdata() {42ERR_FAIL_NULL(template_userdata);43ERR_FAIL_NULL(native_func_userdata);44BaseTemplateUserdata *btu = (BaseTemplateUserdata *)native_func_userdata;45memdelete(btu);46}4748WorkerThreadPool *WorkerThreadPool::singleton = nullptr;4950#ifdef THREADS_ENABLED51thread_local WorkerThreadPool::UnlockableLocks WorkerThreadPool::unlockable_locks[MAX_UNLOCKABLE_LOCKS];52#endif5354void WorkerThreadPool::_process_task(Task *p_task) {55#ifdef THREADS_ENABLED56int pool_thread_index = thread_ids[Thread::get_caller_id()];57ThreadData &curr_thread = threads[pool_thread_index];58Task *prev_task = nullptr; // In case this is recursively called.5960bool safe_for_nodes_backup = is_current_thread_safe_for_nodes();61CallQueue *call_queue_backup = MessageQueue::get_singleton() != MessageQueue::get_main_singleton() ? MessageQueue::get_singleton() : nullptr;6263{64// Tasks must start with these at default values. They are free to set-and-forget otherwise.65set_current_thread_safe_for_nodes(false);66MessageQueue::set_thread_singleton_override(nullptr);6768// Since the WorkerThreadPool is started before the script server,69// its pre-created threads can't have ScriptServer::thread_enter() called on them early.70// Therefore, we do it late at the first opportunity, so in case the task71// about to be run uses scripting, guarantees are held.72ScriptServer::thread_enter();7374task_mutex.lock();75p_task->pool_thread_index = pool_thread_index;76prev_task = curr_thread.current_task;77curr_thread.current_task = p_task;78curr_thread.has_pump_task = p_task->is_pump_task;79if (p_task->pending_notify_yield_over) {80curr_thread.yield_is_over = true;81}82task_mutex.unlock();83}84#endif8586#ifdef THREADS_ENABLED87bool low_priority = p_task->low_priority;88#endif8990if (p_task->group) {91// Handling a group92bool do_post = false;9394while (true) {95uint32_t work_index = p_task->group->index.postincrement();9697if (work_index >= p_task->group->max) {98break;99}100if (p_task->native_group_func) {101p_task->native_group_func(p_task->native_func_userdata, work_index);102} else if (p_task->template_userdata) {103p_task->template_userdata->callback_indexed(work_index);104} else {105p_task->callable.call(work_index);106}107108// This is the only way to ensure posting is done when all tasks are really complete.109uint32_t completed_amount = p_task->group->completed_index.increment();110111if (completed_amount == p_task->group->max) {112do_post = true;113}114}115116if (do_post && p_task->template_userdata) {117memdelete(p_task->template_userdata); // This is no longer needed at this point, so get rid of it.118}119120if (do_post) {121p_task->group->done_semaphore.post();122p_task->group->completed.set_to(true);123}124uint32_t max_users = p_task->group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.125uint32_t finished_users = p_task->group->finished.increment();126127if (finished_users == max_users) {128// Get rid of the group, because nobody else is using it.129MutexLock task_lock(task_mutex);130group_allocator.free(p_task->group);131}132133// For groups, tasks get rid of themselves.134135task_mutex.lock();136task_allocator.free(p_task);137} else {138if (p_task->native_func) {139p_task->native_func(p_task->native_func_userdata);140} else if (p_task->template_userdata) {141p_task->template_userdata->callback();142memdelete(p_task->template_userdata);143} else {144p_task->callable.call();145}146147task_mutex.lock();148p_task->completed = true;149p_task->pool_thread_index = -1;150if (p_task->waiting_user) {151p_task->done_semaphore.post(p_task->waiting_user);152}153// Let awaiters know.154for (uint32_t i = 0; i < threads.size(); i++) {155if (threads[i].awaited_task == p_task) {156threads[i].cond_var.notify_one();157threads[i].signaled = true;158}159}160}161162#ifdef THREADS_ENABLED163{164curr_thread.current_task = prev_task;165if (low_priority) {166low_priority_threads_used--;167168if (_try_promote_low_priority_task()) {169if (prev_task) { // Otherwise, this thread will catch it.170_notify_threads(&curr_thread, 1, 0);171}172}173}174175task_mutex.unlock();176}177178set_current_thread_safe_for_nodes(safe_for_nodes_backup);179MessageQueue::set_thread_singleton_override(call_queue_backup);180#endif181}182183void WorkerThreadPool::_thread_function(void *p_user) {184ThreadData *thread_data = (ThreadData *)p_user;185Thread::set_name(vformat("WorkerThread %d", thread_data->index));186187while (true) {188Task *task_to_process = nullptr;189{190// Create the lock outside the inner loop so it isn't needlessly unlocked and relocked191// when no task was found to process, and the loop is re-entered.192MutexLock lock(thread_data->pool->task_mutex);193194while (true) {195bool exit = thread_data->pool->_handle_runlevel(thread_data, lock);196if (unlikely(exit)) {197return;198}199200thread_data->signaled = false;201202if (!thread_data->pool->task_queue.first()) {203// There wasn't a task available yet.204// Let's wait for the next notification, then recheck.205thread_data->cond_var.wait(lock);206continue;207}208209// Got a task to process! Remove it from the queue, then break into the task handling section.210task_to_process = thread_data->pool->task_queue.first()->self();211thread_data->pool->task_queue.remove(thread_data->pool->task_queue.first());212break;213}214}215216DEV_ASSERT(task_to_process);217thread_data->pool->_process_task(task_to_process);218}219}220221void WorkerThreadPool::_post_tasks(Task **p_tasks, uint32_t p_count, bool p_high_priority, MutexLock<BinaryMutex> &p_lock, bool p_pump_task) {222// Fall back to processing on the calling thread if there are no worker threads.223// Separated into its own variable to make it easier to extend this logic224// in custom builds.225226// Avoid calling pump tasks or low priority tasks from the calling thread.227bool process_on_calling_thread = threads.is_empty() && !p_pump_task;228if (process_on_calling_thread) {229p_lock.temp_unlock();230for (uint32_t i = 0; i < p_count; i++) {231_process_task(p_tasks[i]);232}233p_lock.temp_relock();234return;235}236237while (runlevel == RUNLEVEL_EXIT_LANGUAGES) {238control_cond_var.wait(p_lock);239}240241uint32_t to_process = 0;242uint32_t to_promote = 0;243244ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;245246for (uint32_t i = 0; i < p_count; i++) {247p_tasks[i]->low_priority = !p_high_priority;248if (p_high_priority || low_priority_threads_used < max_low_priority_threads) {249task_queue.add_last(&p_tasks[i]->task_elem);250if (!p_high_priority) {251low_priority_threads_used++;252}253to_process++;254} else {255// Too many threads using low priority, must go to queue.256low_priority_task_queue.add_last(&p_tasks[i]->task_elem);257to_promote++;258}259}260261_notify_threads(caller_pool_thread, to_process, to_promote);262}263264void WorkerThreadPool::_notify_threads(const ThreadData *p_current_thread_data, uint32_t p_process_count, uint32_t p_promote_count) {265uint32_t to_process = p_process_count;266uint32_t to_promote = p_promote_count;267268// This is where which threads are awaken is decided according to the workload.269// Threads that will anyway have a chance to check the situation and process/promote tasks270// are excluded from being notified. Others will be tried anyway to try to distribute load.271// The current thread, if is a pool thread, is also excluded depending on the promoting/processing272// needs because it will anyway loop again. However, it will contribute to decreasing the count,273// which helps reducing sync traffic.274275uint32_t thread_count = threads.size();276277// First round:278// 1. For processing: notify threads that are not running tasks, to keep the stacks as shallow as possible.279// 2. For promoting: since it's exclusive with processing, we fin threads able to promote low-prio tasks now.280for (uint32_t i = 0;281i < thread_count && (to_process || to_promote);282i++, notify_index = (notify_index + 1) % thread_count) {283ThreadData &th = threads[notify_index];284285if (th.signaled) {286continue;287}288if (th.current_task) {289// Good thread for promoting low-prio?290if (to_promote && th.awaited_task && th.current_task->low_priority) {291if (likely(&th != p_current_thread_data)) {292th.cond_var.notify_one();293}294th.signaled = true;295to_promote--;296}297} else {298if (to_process) {299if (likely(&th != p_current_thread_data)) {300th.cond_var.notify_one();301}302th.signaled = true;303to_process--;304}305}306}307308// Second round:309// For processing: if the first round wasn't enough, let's try now with threads processing tasks but currently awaiting.310for (uint32_t i = 0;311i < thread_count && to_process;312i++, notify_index = (notify_index + 1) % thread_count) {313ThreadData &th = threads[notify_index];314315if (th.signaled) {316continue;317}318if (th.awaited_task) {319if (likely(&th != p_current_thread_data)) {320th.cond_var.notify_one();321}322th.signaled = true;323to_process--;324}325}326}327328bool WorkerThreadPool::_try_promote_low_priority_task() {329if (low_priority_task_queue.first()) {330Task *low_prio_task = low_priority_task_queue.first()->self();331low_priority_task_queue.remove(low_priority_task_queue.first());332task_queue.add_last(&low_prio_task->task_elem);333low_priority_threads_used++;334return true;335} else {336return false;337}338}339340WorkerThreadPool::TaskID WorkerThreadPool::add_native_task(void (*p_func)(void *), void *p_userdata, bool p_high_priority, const String &p_description) {341return _add_task(Callable(), p_func, p_userdata, nullptr, p_high_priority, p_description);342}343344WorkerThreadPool::TaskID WorkerThreadPool::_add_task(const Callable &p_callable, void (*p_func)(void *), void *p_userdata, BaseTemplateUserdata *p_template_userdata, bool p_high_priority, const String &p_description, bool p_pump_task) {345MutexLock<BinaryMutex> lock(task_mutex);346347// Get a free task348Task *task = task_allocator.alloc();349TaskID id = last_task++;350task->self = id;351task->callable = p_callable;352task->native_func = p_func;353task->native_func_userdata = p_userdata;354task->description = p_description;355task->template_userdata = p_template_userdata;356task->is_pump_task = p_pump_task;357tasks.insert(id, task);358359#ifdef THREADS_ENABLED360if (p_pump_task) {361pump_task_count++;362int thread_count = get_thread_count();363if (pump_task_count >= thread_count) {364print_verbose(vformat("A greater number of dedicated threads were requested (%d) than threads available (%d). Please increase the number of available worker task threads. Recovering this session by spawning more worker task threads.", pump_task_count + 1, thread_count)); // +1 because we want to keep a Thread without any pump tasks free.365366Thread::Settings settings;367#ifdef __APPLE__368// The default stack size for new threads on Apple platforms is 512KiB.369// This is insufficient when using a library like SPIRV-Cross,370// which can generate deep stacks and result in a stack overflow.371#ifdef DEV_ENABLED372// Debug builds need an even larger stack size.373settings.stack_size = 2 * 1024 * 1024; // 2 MiB374#else375settings.stack_size = 1 * 1024 * 1024; // 1 MiB376#endif377#endif378// Re-sizing implies relocation, which is not supported for this array.379CRASH_COND_MSG(thread_count + 1 > (int)threads.get_capacity(), "Reserve trick for worker thread pool failed. Crashing.");380threads.resize_initialized(thread_count + 1);381threads[thread_count].index = thread_count;382threads[thread_count].pool = this;383threads[thread_count].thread.start(&WorkerThreadPool::_thread_function, &threads[thread_count], settings);384thread_ids.insert(threads[thread_count].thread.get_id(), thread_count);385}386}387#endif388389_post_tasks(&task, 1, p_high_priority, lock, p_pump_task);390391return id;392}393394WorkerThreadPool::TaskID WorkerThreadPool::add_task(const Callable &p_action, bool p_high_priority, const String &p_description, bool p_pump_task) {395return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, p_pump_task);396}397398WorkerThreadPool::TaskID WorkerThreadPool::add_task_bind(const Callable &p_action, bool p_high_priority, const String &p_description) {399return _add_task(p_action, nullptr, nullptr, nullptr, p_high_priority, p_description, false);400}401402bool WorkerThreadPool::is_task_completed(TaskID p_task_id) const {403MutexLock task_lock(task_mutex);404const Task *const *taskp = tasks.getptr(p_task_id);405if (!taskp) {406ERR_FAIL_V_MSG(false, "Invalid Task ID"); // Invalid task407}408409return (*taskp)->completed;410}411412Error WorkerThreadPool::wait_for_task_completion(TaskID p_task_id) {413task_mutex.lock();414Task **taskp = tasks.getptr(p_task_id);415if (!taskp) {416task_mutex.unlock();417ERR_FAIL_V_MSG(ERR_INVALID_PARAMETER, "Invalid Task ID"); // Invalid task418}419Task *task = *taskp;420421if (task->completed) {422if (task->waiting_pool == 0 && task->waiting_user == 0) {423tasks.erase(p_task_id);424task_allocator.free(task);425}426task_mutex.unlock();427return OK;428}429430ThreadData *caller_pool_thread = thread_ids.has(Thread::get_caller_id()) ? &threads[thread_ids[Thread::get_caller_id()]] : nullptr;431if (caller_pool_thread && p_task_id <= caller_pool_thread->current_task->self) {432// Deadlock prevention:433// When a pool thread wants to wait for an older task, the following situations can happen:434// 1. Awaited task is deep in the stack of the awaiter.435// 2. A group of awaiter threads end up depending on some tasks buried in the stack436// of their worker threads in such a way that progress can't be made.437// Both would entail a deadlock. Some may be handled here in the WorkerThreadPool438// with some extra logic and bookkeeping. However, there would still be unavoidable439// cases of deadlock because of the way waiting threads process outstanding tasks.440// Taking into account there's no feasible solution for every possible case441// with the current design, we just simply reject attempts to await on older tasks,442// with a specific error code that signals the situation so the caller can handle it.443task_mutex.unlock();444return ERR_BUSY;445}446447if (caller_pool_thread) {448task->waiting_pool++;449} else {450task->waiting_user++;451}452453if (caller_pool_thread) {454task_mutex.unlock();455_wait_collaboratively(caller_pool_thread, task);456task_mutex.lock();457task->waiting_pool--;458if (task->waiting_pool == 0 && task->waiting_user == 0) {459tasks.erase(p_task_id);460task_allocator.free(task);461}462} else {463task_mutex.unlock();464task->done_semaphore.wait();465task_mutex.lock();466task->waiting_user--;467if (task->waiting_pool == 0 && task->waiting_user == 0) {468tasks.erase(p_task_id);469task_allocator.free(task);470}471}472473task_mutex.unlock();474return OK;475}476477void WorkerThreadPool::_lock_unlockable_mutexes() {478#ifdef THREADS_ENABLED479for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {480if (unlockable_locks[i].ulock) {481unlockable_locks[i].ulock->lock();482}483}484#endif485}486487void WorkerThreadPool::_unlock_unlockable_mutexes() {488#ifdef THREADS_ENABLED489for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {490if (unlockable_locks[i].ulock) {491unlockable_locks[i].ulock->unlock();492}493}494#endif495}496497void WorkerThreadPool::_wait_collaboratively(ThreadData *p_caller_pool_thread, Task *p_task) {498// Keep processing tasks until the condition to stop waiting is met.499500while (true) {501Task *task_to_process = nullptr;502bool relock_unlockables = false;503{504MutexLock lock(task_mutex);505506bool was_signaled = p_caller_pool_thread->signaled;507p_caller_pool_thread->signaled = false;508509bool exit = _handle_runlevel(p_caller_pool_thread, lock);510if (unlikely(exit)) {511break;512}513514bool wait_is_over = false;515if (unlikely(p_task == ThreadData::YIELDING)) {516if (p_caller_pool_thread->yield_is_over) {517p_caller_pool_thread->yield_is_over = false;518wait_is_over = true;519}520} else {521if (p_task->completed) {522wait_is_over = true;523}524}525526if (wait_is_over) {527if (was_signaled) {528// This thread was awaken for some additional reason, but it's about to exit.529// Let's find out what may be pending and forward the requests.530uint32_t to_process = task_queue.first() ? 1 : 0;531uint32_t to_promote = p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first() ? 1 : 0;532if (to_process || to_promote) {533// This thread must be left alone since it won't loop again.534p_caller_pool_thread->signaled = true;535_notify_threads(p_caller_pool_thread, to_process, to_promote);536}537}538539break;540}541542if (p_caller_pool_thread->current_task->low_priority && low_priority_task_queue.first()) {543if (_try_promote_low_priority_task()) {544_notify_threads(p_caller_pool_thread, 1, 0);545}546}547548if (p_caller_pool_thread->pool->task_queue.first()) {549task_to_process = task_queue.first()->self();550if ((p_task == ThreadData::YIELDING || p_caller_pool_thread->has_pump_task == true) && task_to_process->is_pump_task) {551task_to_process = nullptr;552_notify_threads(p_caller_pool_thread, 1, 0);553} else {554task_queue.remove(task_queue.first());555}556}557558if (!task_to_process) {559p_caller_pool_thread->awaited_task = p_task;560561if (this == singleton) {562_unlock_unlockable_mutexes();563}564relock_unlockables = true;565566p_caller_pool_thread->cond_var.wait(lock);567568p_caller_pool_thread->awaited_task = nullptr;569}570}571572if (relock_unlockables && this == singleton) {573_lock_unlockable_mutexes();574}575576if (task_to_process) {577_process_task(task_to_process);578}579}580}581582void WorkerThreadPool::_switch_runlevel(Runlevel p_runlevel) {583DEV_ASSERT(p_runlevel > runlevel);584runlevel = p_runlevel;585memset(&runlevel_data, 0, sizeof(runlevel_data));586for (uint32_t i = 0; i < threads.size(); i++) {587threads[i].cond_var.notify_one();588threads[i].signaled = true;589}590control_cond_var.notify_all();591}592593// Returns whether threads have to exit. This may perform the check about handling needed.594bool WorkerThreadPool::_handle_runlevel(ThreadData *p_thread_data, MutexLock<BinaryMutex> &p_lock) {595bool exit = false;596switch (runlevel) {597case RUNLEVEL_NORMAL: {598} break;599case RUNLEVEL_PRE_EXIT_LANGUAGES: {600if (!p_thread_data->pre_exited_languages) {601if (!task_queue.first() && !low_priority_task_queue.first()) {602p_thread_data->pre_exited_languages = true;603runlevel_data.pre_exit_languages.num_idle_threads++;604control_cond_var.notify_all();605}606}607} break;608case RUNLEVEL_EXIT_LANGUAGES: {609if (!p_thread_data->exited_languages) {610p_lock.temp_unlock();611ScriptServer::thread_exit();612p_lock.temp_relock();613p_thread_data->exited_languages = true;614runlevel_data.exit_languages.num_exited_threads++;615control_cond_var.notify_all();616}617} break;618case RUNLEVEL_EXIT: {619exit = true;620} break;621}622return exit;623}624625void WorkerThreadPool::yield() {626int th_index = get_thread_index();627ERR_FAIL_COND_MSG(th_index == -1, "This function can only be called from a worker thread.");628_wait_collaboratively(&threads[th_index], ThreadData::YIELDING);629630task_mutex.lock();631if (runlevel < RUNLEVEL_EXIT_LANGUAGES) {632// If this long-lived task started before the scripting server was initialized,633// now is a good time to have scripting languages ready for the current thread.634// Otherwise, such a piece of setup won't happen unless another task has been635// run during the collaborative wait.636task_mutex.unlock();637ScriptServer::thread_enter();638} else {639task_mutex.unlock();640}641}642643void WorkerThreadPool::notify_yield_over(TaskID p_task_id) {644MutexLock task_lock(task_mutex);645Task **taskp = tasks.getptr(p_task_id);646if (!taskp) {647ERR_FAIL_MSG("Invalid Task ID.");648}649Task *task = *taskp;650if (task->pool_thread_index == -1) { // Completed or not started yet.651if (!task->completed) {652// This avoids a race condition where a task is created and yield-over called before it's processed.653task->pending_notify_yield_over = true;654}655return;656}657658ThreadData &td = threads[task->pool_thread_index];659td.yield_is_over = true;660td.signaled = true;661td.cond_var.notify_one();662}663664WorkerThreadPool::GroupID WorkerThreadPool::_add_group_task(const Callable &p_callable, void (*p_func)(void *, uint32_t), void *p_userdata, BaseTemplateUserdata *p_template_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {665ERR_FAIL_COND_V(p_elements < 0, INVALID_TASK_ID);666if (p_tasks < 0) {667p_tasks = MAX(1u, threads.size());668}669670MutexLock<BinaryMutex> lock(task_mutex);671672Group *group = group_allocator.alloc();673GroupID id = last_task++;674group->max = p_elements;675group->self = id;676677Task **tasks_posted = nullptr;678if (p_elements == 0) {679// Should really not call it with zero Elements, but at least it should work.680group->completed.set_to(true);681group->done_semaphore.post();682group->tasks_used = 0;683p_tasks = 0;684if (p_template_userdata) {685memdelete(p_template_userdata);686}687688} else {689group->tasks_used = p_tasks;690tasks_posted = (Task **)alloca(sizeof(Task *) * p_tasks);691for (int i = 0; i < p_tasks; i++) {692Task *task = task_allocator.alloc();693task->native_group_func = p_func;694task->native_func_userdata = p_userdata;695task->description = p_description;696task->group = group;697task->callable = p_callable;698task->template_userdata = p_template_userdata;699tasks_posted[i] = task;700// No task ID is used.701}702}703704groups[id] = group;705706_post_tasks(tasks_posted, p_tasks, p_high_priority, lock, false);707708return id;709}710711WorkerThreadPool::GroupID WorkerThreadPool::add_native_group_task(void (*p_func)(void *, uint32_t), void *p_userdata, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {712return _add_group_task(Callable(), p_func, p_userdata, nullptr, p_elements, p_tasks, p_high_priority, p_description);713}714715WorkerThreadPool::GroupID WorkerThreadPool::add_group_task(const Callable &p_action, int p_elements, int p_tasks, bool p_high_priority, const String &p_description) {716return _add_group_task(p_action, nullptr, nullptr, nullptr, p_elements, p_tasks, p_high_priority, p_description);717}718719uint32_t WorkerThreadPool::get_group_processed_element_count(GroupID p_group) const {720MutexLock task_lock(task_mutex);721const Group *const *groupp = groups.getptr(p_group);722if (!groupp) {723ERR_FAIL_V_MSG(0, "Invalid Group ID");724}725return (*groupp)->completed_index.get();726}727bool WorkerThreadPool::is_group_task_completed(GroupID p_group) const {728MutexLock task_lock(task_mutex);729const Group *const *groupp = groups.getptr(p_group);730if (!groupp) {731ERR_FAIL_V_MSG(false, "Invalid Group ID");732}733return (*groupp)->completed.is_set();734}735736void WorkerThreadPool::wait_for_group_task_completion(GroupID p_group) {737#ifdef THREADS_ENABLED738task_mutex.lock();739Group **groupp = groups.getptr(p_group);740task_mutex.unlock();741if (!groupp) {742ERR_FAIL_MSG("Invalid Group ID.");743}744745{746Group *group = *groupp;747748if (this == singleton) {749_unlock_unlockable_mutexes();750}751group->done_semaphore.wait();752if (this == singleton) {753_lock_unlockable_mutexes();754}755756uint32_t max_users = group->tasks_used + 1; // Add 1 because the thread waiting for it is also user. Read before to avoid another thread freeing task after increment.757uint32_t finished_users = group->finished.increment(); // fetch happens before inc, so increment later.758759if (finished_users == max_users) {760// All tasks using this group are gone (finished before the group), so clear the group too.761MutexLock task_lock(task_mutex);762group_allocator.free(group);763}764}765766MutexLock task_lock(task_mutex); // This mutex is needed when Physics 2D and/or 3D is selected to run on a separate thread.767groups.erase(p_group);768#endif769}770771int WorkerThreadPool::get_thread_index() const {772Thread::ID tid = Thread::get_caller_id();773return thread_ids.has(tid) ? thread_ids[tid] : -1;774}775776WorkerThreadPool::TaskID WorkerThreadPool::get_caller_task_id() const {777int th_index = get_thread_index();778if (th_index != -1 && threads[th_index].current_task) {779return threads[th_index].current_task->self;780} else {781return INVALID_TASK_ID;782}783}784785WorkerThreadPool::GroupID WorkerThreadPool::get_caller_group_id() const {786int th_index = get_thread_index();787if (th_index != -1 && threads[th_index].current_task && threads[th_index].current_task->group) {788return threads[th_index].current_task->group->self;789} else {790return INVALID_TASK_ID;791}792}793794#ifdef THREADS_ENABLED795uint32_t WorkerThreadPool::_thread_enter_unlock_allowance_zone(THREADING_NAMESPACE::unique_lock<THREADING_NAMESPACE::mutex> &p_ulock) {796for (uint32_t i = 0; i < MAX_UNLOCKABLE_LOCKS; i++) {797DEV_ASSERT((bool)unlockable_locks[i].ulock == (bool)unlockable_locks[i].rc);798if (unlockable_locks[i].ulock == &p_ulock) {799// Already registered in the current thread.800unlockable_locks[i].rc++;801return i;802} else if (!unlockable_locks[i].ulock) {803unlockable_locks[i].ulock = &p_ulock;804unlockable_locks[i].rc = 1;805return i;806}807}808ERR_FAIL_V_MSG(UINT32_MAX, "No more unlockable lock slots available. Engine bug.");809}810811void WorkerThreadPool::thread_exit_unlock_allowance_zone(uint32_t p_zone_id) {812DEV_ASSERT(unlockable_locks[p_zone_id].ulock && unlockable_locks[p_zone_id].rc);813unlockable_locks[p_zone_id].rc--;814if (unlockable_locks[p_zone_id].rc == 0) {815unlockable_locks[p_zone_id].ulock = nullptr;816}817}818#endif819820void WorkerThreadPool::init(int p_thread_count, float p_low_priority_task_ratio) {821ERR_FAIL_COND(threads.size() > 0);822823runlevel = RUNLEVEL_NORMAL;824825if (p_thread_count < 0) {826p_thread_count = OS::get_singleton()->get_default_thread_pool_size();827}828829max_low_priority_threads = CLAMP(p_thread_count * p_low_priority_task_ratio, 1, p_thread_count - 1);830831print_verbose(vformat("WorkerThreadPool: %d threads, %d max low-priority.", p_thread_count, max_low_priority_threads));832833#ifdef THREADS_ENABLED834// Reserve 5 threads in case we need separate threads for 1) 2D physics 2) 3D physics 3) rendering 4) GPU texture compression, 5) all other tasks.835// We cannot safely increase the Vector size at runtime, so reserve enough up front, but only launch those needed.836threads.reserve(5);837#endif838threads.resize(p_thread_count);839840Thread::Settings settings;841#ifdef __APPLE__842// The default stack size for new threads on Apple platforms is 512KiB.843// This is insufficient when using a library like SPIRV-Cross,844// which can generate deep stacks and result in a stack overflow.845#ifdef DEV_ENABLED846// Debug builds need an even larger stack size.847settings.stack_size = 2 * 1024 * 1024; // 2 MiB848#else849settings.stack_size = 1 * 1024 * 1024; // 1 MiB850#endif851#endif852853for (uint32_t i = 0; i < threads.size(); i++) {854threads[i].index = i;855threads[i].pool = this;856threads[i].thread.start(&WorkerThreadPool::_thread_function, &threads[i], settings);857thread_ids.insert(threads[i].thread.get_id(), i);858}859}860861void WorkerThreadPool::exit_languages_threads() {862if (threads.is_empty()) {863return;864}865866MutexLock lock(task_mutex);867868// Wait until all threads are idle.869_switch_runlevel(RUNLEVEL_PRE_EXIT_LANGUAGES);870while (runlevel_data.pre_exit_languages.num_idle_threads != threads.size()) {871control_cond_var.wait(lock);872}873874// Wait until all threads have detached from scripting languages.875_switch_runlevel(RUNLEVEL_EXIT_LANGUAGES);876while (runlevel_data.exit_languages.num_exited_threads != threads.size()) {877control_cond_var.wait(lock);878}879}880881void WorkerThreadPool::finish() {882if (threads.is_empty()) {883return;884}885886{887MutexLock lock(task_mutex);888SelfList<Task> *E = low_priority_task_queue.first();889while (E) {890print_error("Task waiting was never re-claimed: " + E->self()->description);891E = E->next();892}893894_switch_runlevel(RUNLEVEL_EXIT);895}896897for (ThreadData &data : threads) {898data.thread.wait_to_finish();899}900901{902MutexLock lock(task_mutex);903for (KeyValue<TaskID, Task *> &E : tasks) {904task_allocator.free(E.value);905}906}907908threads.clear();909}910911void WorkerThreadPool::_bind_methods() {912ClassDB::bind_method(D_METHOD("add_task", "action", "high_priority", "description"), &WorkerThreadPool::add_task_bind, DEFVAL(false), DEFVAL(String()));913ClassDB::bind_method(D_METHOD("is_task_completed", "task_id"), &WorkerThreadPool::is_task_completed);914ClassDB::bind_method(D_METHOD("wait_for_task_completion", "task_id"), &WorkerThreadPool::wait_for_task_completion);915ClassDB::bind_method(D_METHOD("get_caller_task_id"), &WorkerThreadPool::get_caller_task_id);916917ClassDB::bind_method(D_METHOD("add_group_task", "action", "elements", "tasks_needed", "high_priority", "description"), &WorkerThreadPool::add_group_task, DEFVAL(-1), DEFVAL(false), DEFVAL(String()));918ClassDB::bind_method(D_METHOD("is_group_task_completed", "group_id"), &WorkerThreadPool::is_group_task_completed);919ClassDB::bind_method(D_METHOD("get_group_processed_element_count", "group_id"), &WorkerThreadPool::get_group_processed_element_count);920ClassDB::bind_method(D_METHOD("wait_for_group_task_completion", "group_id"), &WorkerThreadPool::wait_for_group_task_completion);921ClassDB::bind_method(D_METHOD("get_caller_group_id"), &WorkerThreadPool::get_caller_group_id);922}923924WorkerThreadPool *WorkerThreadPool::get_named_pool(const StringName &p_name) {925WorkerThreadPool **pool_ptr = named_pools.getptr(p_name);926if (pool_ptr) {927return *pool_ptr;928} else {929WorkerThreadPool *pool = memnew(WorkerThreadPool(false));930pool->init();931named_pools[p_name] = pool;932return pool;933}934}935936WorkerThreadPool::WorkerThreadPool(bool p_singleton) {937if (p_singleton) {938singleton = this;939}940}941942WorkerThreadPool::~WorkerThreadPool() {943finish();944945if (this == singleton) {946singleton = nullptr;947for (KeyValue<StringName, WorkerThreadPool *> &E : named_pools) {948E.value->finish();949memdelete(E.value);950}951named_pools.clear();952}953}954955956