// Path: blob/master/tests/core/templates/test_command_queue.h
// 10278 views
/**************************************************************************/1/* test_command_queue.h */2/**************************************************************************/3/* This file is part of: */4/* GODOT ENGINE */5/* https://godotengine.org */6/**************************************************************************/7/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */8/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */9/* */10/* Permission is hereby granted, free of charge, to any person obtaining */11/* a copy of this software and associated documentation files (the */12/* "Software"), to deal in the Software without restriction, including */13/* without limitation the rights to use, copy, modify, merge, publish, */14/* distribute, sublicense, and/or sell copies of the Software, and to */15/* permit persons to whom the Software is furnished to do so, subject to */16/* the following conditions: */17/* */18/* The above copyright notice and this permission notice shall be */19/* included in all copies or substantial portions of the Software. */20/* */21/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */22/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */23/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */24/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */25/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */26/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */27/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. 
*/28/**************************************************************************/2930#pragma once3132#include "core/config/project_settings.h"33#include "core/math/random_number_generator.h"34#include "core/object/worker_thread_pool.h"35#include "core/os/os.h"36#include "core/os/thread.h"37#include "core/templates/command_queue_mt.h"38#include "tests/test_macros.h"3940namespace TestCommandQueue {4142class ThreadWork {43Semaphore thread_sem;44Semaphore main_sem;45Mutex mut;46int threading_errors = 0;47enum State {48MAIN_START,49MAIN_DONE,50THREAD_START,51THREAD_DONE,52} state;5354public:55ThreadWork() {56mut.lock();57state = MAIN_START;58}59~ThreadWork() {60CHECK_MESSAGE(threading_errors == 0, "threads did not lock/unlock correctly");61}62void thread_wait_for_work() {63thread_sem.wait();64mut.lock();65if (state != MAIN_DONE) {66threading_errors++;67}68state = THREAD_START;69}70void thread_done_work() {71if (state != THREAD_START) {72threading_errors++;73}74state = THREAD_DONE;75mut.unlock();76main_sem.post();77}7879void main_wait_for_done() {80main_sem.wait();81mut.lock();82if (state != THREAD_DONE) {83threading_errors++;84}85state = MAIN_START;86}87void main_start_work() {88if (state != MAIN_START) {89threading_errors++;90}91state = MAIN_DONE;92mut.unlock();93thread_sem.post();94}95};9697class SharedThreadState {98public:99ThreadWork reader_threadwork;100ThreadWork writer_threadwork;101102CommandQueueMT command_queue;103104enum TestMsgType {105TEST_MSG_FUNC1_TRANSFORM,106TEST_MSG_FUNC2_TRANSFORM_FLOAT,107TEST_MSG_FUNC3_TRANSFORMx6,108TEST_MSGSYNC_FUNC1_TRANSFORM,109TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT,110TEST_MSGRET_FUNC1_TRANSFORM,111TEST_MSGRET_FUNC2_TRANSFORM_FLOAT,112TEST_MSG_MAX113};114115Vector<TestMsgType> message_types_to_write;116bool during_writing = false;117int message_count_to_read = 0;118bool exit_threads = false;119120Thread reader_thread;121WorkerThreadPool::TaskID reader_task_id = WorkerThreadPool::INVALID_TASK_ID;122Thread writer_thread;123124int 
func1_count = 0;125126void func1(Transform3D t) {127func1_count++;128}129void func2(Transform3D t, float f) {130func1_count++;131}132void func3(Transform3D t1, Transform3D t2, Transform3D t3, Transform3D t4, Transform3D t5, Transform3D t6) {133func1_count++;134}135Transform3D func1r(Transform3D t) {136func1_count++;137return t;138}139Transform3D func2r(Transform3D t, float f) {140func1_count++;141return t;142}143144void add_msg_to_write(TestMsgType type) {145message_types_to_write.push_back(type);146}147148void reader_thread_loop() {149reader_threadwork.thread_wait_for_work();150while (!exit_threads) {151if (reader_task_id == WorkerThreadPool::INVALID_TASK_ID) {152command_queue.flush_all();153} else {154if (message_count_to_read < 0) {155command_queue.flush_all();156}157for (int i = 0; i < message_count_to_read; i++) {158WorkerThreadPool::get_singleton()->yield();159command_queue.wait_and_flush();160}161}162message_count_to_read = 0;163164reader_threadwork.thread_done_work();165reader_threadwork.thread_wait_for_work();166}167command_queue.flush_all();168reader_threadwork.thread_done_work();169}170static void static_reader_thread_loop(void *stsvoid) {171SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);172sts->reader_thread_loop();173}174175void writer_thread_loop() {176during_writing = false;177writer_threadwork.thread_wait_for_work();178while (!exit_threads) {179Transform3D tr;180Transform3D otr;181float f = 1;182during_writing = true;183for (int i = 0; i < message_types_to_write.size(); i++) {184TestMsgType msg_type = message_types_to_write[i];185switch (msg_type) {186case TEST_MSG_FUNC1_TRANSFORM:187command_queue.push(this, &SharedThreadState::func1, tr);188break;189case TEST_MSG_FUNC2_TRANSFORM_FLOAT:190command_queue.push(this, &SharedThreadState::func2, tr, f);191break;192case TEST_MSG_FUNC3_TRANSFORMx6:193command_queue.push(this, &SharedThreadState::func3, tr, tr, tr, tr, tr, tr);194break;195case 
TEST_MSGSYNC_FUNC1_TRANSFORM:196command_queue.push_and_sync(this, &SharedThreadState::func1, tr);197break;198case TEST_MSGSYNC_FUNC2_TRANSFORM_FLOAT:199command_queue.push_and_sync(this, &SharedThreadState::func2, tr, f);200break;201case TEST_MSGRET_FUNC1_TRANSFORM:202command_queue.push_and_ret(this, &SharedThreadState::func1r, &otr, tr);203break;204case TEST_MSGRET_FUNC2_TRANSFORM_FLOAT:205command_queue.push_and_ret(this, &SharedThreadState::func2r, &otr, tr, f);206break;207default:208break;209}210}211message_types_to_write.clear();212during_writing = false;213214writer_threadwork.thread_done_work();215writer_threadwork.thread_wait_for_work();216}217writer_threadwork.thread_done_work();218}219static void static_writer_thread_loop(void *stsvoid) {220SharedThreadState *sts = static_cast<SharedThreadState *>(stsvoid);221sts->writer_thread_loop();222}223224void init_threads(bool p_use_thread_pool_sync = false) {225if (p_use_thread_pool_sync) {226reader_task_id = WorkerThreadPool::get_singleton()->add_native_task(&SharedThreadState::static_reader_thread_loop, this, true);227command_queue.set_pump_task_id(reader_task_id);228} else {229reader_thread.start(&SharedThreadState::static_reader_thread_loop, this);230}231writer_thread.start(&SharedThreadState::static_writer_thread_loop, this);232}233void destroy_threads() {234exit_threads = true;235reader_threadwork.main_start_work();236writer_threadwork.main_start_work();237238if (reader_task_id != WorkerThreadPool::INVALID_TASK_ID) {239WorkerThreadPool::get_singleton()->wait_for_task_completion(reader_task_id);240} else {241reader_thread.wait_to_finish();242}243writer_thread.wait_to_finish();244}245246struct CopyMoveTestType {247inline static int copy_count;248inline static int move_count;249int value = 0;250251CopyMoveTestType(int p_value = 0) :252value(p_value) {}253254CopyMoveTestType(const CopyMoveTestType &p_other) :255value(p_other.value) {256copy_count++;257}258259CopyMoveTestType(CopyMoveTestType &&p_other) 
:260value(p_other.value) {261move_count++;262}263264CopyMoveTestType &operator=(const CopyMoveTestType &p_other) {265value = p_other.value;266copy_count++;267return *this;268}269270CopyMoveTestType &operator=(CopyMoveTestType &&p_other) {271value = p_other.value;272move_count++;273return *this;274}275};276277void copy_move_test_copy(CopyMoveTestType p_test_type) {278}279void copy_move_test_ref(const CopyMoveTestType &p_test_type) {280}281void copy_move_test_move(CopyMoveTestType &&p_test_type) {282}283};284285static void test_command_queue_basic(bool p_use_thread_pool_sync) {286const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";287ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);288SharedThreadState sts;289sts.init_threads(p_use_thread_pool_sync);290291sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);292sts.writer_threadwork.main_start_work();293sts.writer_threadwork.main_wait_for_done();294CHECK_MESSAGE(sts.func1_count == 0,295"Control: no messages read before reader has run.");296297sts.message_count_to_read = 1;298sts.reader_threadwork.main_start_work();299sts.reader_threadwork.main_wait_for_done();300CHECK_MESSAGE(sts.func1_count == 1,301"Reader should have read one message");302303sts.message_count_to_read = -1;304sts.reader_threadwork.main_start_work();305sts.reader_threadwork.main_wait_for_done();306CHECK_MESSAGE(sts.func1_count == 1,307"Reader should have read no additional messages from flush_all");308309sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);310sts.writer_threadwork.main_start_work();311sts.writer_threadwork.main_wait_for_done();312313sts.message_count_to_read = -1;314sts.reader_threadwork.main_start_work();315sts.reader_threadwork.main_wait_for_done();316CHECK_MESSAGE(sts.func1_count == 2,317"Reader should have read one additional message from flush_all");318319sts.destroy_threads();320321CHECK_MESSAGE(sts.func1_count == 2,322"Reader should have 
read no additional messages after join");323ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,324ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));325}326327TEST_CASE("[CommandQueue] Test Queue Basics") {328test_command_queue_basic(false);329}330331TEST_CASE("[CommandQueue] Test Queue Basics with WorkerThreadPool sync.") {332test_command_queue_basic(true);333}334335TEST_CASE("[CommandQueue] Test Queue Wrapping to same spot.") {336const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";337ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);338SharedThreadState sts;339sts.init_threads();340341sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);342sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);343sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);344sts.writer_threadwork.main_start_work();345sts.writer_threadwork.main_wait_for_done();346347sts.message_count_to_read = -1;348sts.reader_threadwork.main_start_work();349sts.reader_threadwork.main_wait_for_done();350CHECK_MESSAGE(sts.func1_count == 3,351"Reader should have read at least three messages");352353sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);354sts.writer_threadwork.main_start_work();355sts.writer_threadwork.main_wait_for_done();356sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);357sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);358sts.writer_threadwork.main_start_work();359OS::get_singleton()->delay_usec(1000);360361sts.message_count_to_read = -1;362sts.reader_threadwork.main_start_work();363OS::get_singleton()->delay_usec(1000);364365sts.writer_threadwork.main_wait_for_done();366sts.reader_threadwork.main_wait_for_done();367CHECK_MESSAGE(sts.func1_count >= 3,368"Reader should have read at least three messages");369370sts.message_count_to_read = 6 - 
sts.func1_count;371sts.reader_threadwork.main_start_work();372373// The following will fail immediately.374// The reason it hangs indefinitely in engine, is all subsequent calls to375// CommandQueue.wait_and_flush_one will also fail.376sts.reader_threadwork.main_wait_for_done();377378// Because looping around uses an extra message, easiest to consume all.379sts.message_count_to_read = -1;380sts.reader_threadwork.main_start_work();381sts.reader_threadwork.main_wait_for_done();382CHECK_MESSAGE(sts.func1_count == 6,383"Reader should have read both message sets");384385sts.destroy_threads();386387CHECK_MESSAGE(sts.func1_count == 6,388"Reader should have read no additional messages after join");389ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,390ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));391}392393TEST_CASE("[CommandQueue] Test Queue Lapping") {394const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";395ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);396SharedThreadState sts;397sts.init_threads();398399sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC1_TRANSFORM);400sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);401sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);402sts.writer_threadwork.main_start_work();403sts.writer_threadwork.main_wait_for_done();404405// We need to read an extra message so that it triggers the dealloc logic once.406// Otherwise, the queue will be considered full.407sts.message_count_to_read = 3;408sts.reader_threadwork.main_start_work();409sts.reader_threadwork.main_wait_for_done();410CHECK_MESSAGE(sts.func1_count == 3,411"Reader should have read first set of messages");412413sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);414sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC3_TRANSFORMx6);415sts.writer_threadwork.main_start_work();416// Don't wait for these, 
because the queue isn't big enough.417sts.writer_threadwork.main_wait_for_done();418419sts.add_msg_to_write(SharedThreadState::TEST_MSG_FUNC2_TRANSFORM_FLOAT);420sts.writer_threadwork.main_start_work();421OS::get_singleton()->delay_usec(1000);422423sts.message_count_to_read = 3;424sts.reader_threadwork.main_start_work();425sts.reader_threadwork.main_wait_for_done();426427sts.writer_threadwork.main_wait_for_done();428429sts.message_count_to_read = -1;430sts.reader_threadwork.main_start_work();431sts.reader_threadwork.main_wait_for_done();432433CHECK_MESSAGE(sts.func1_count == 6,434"Reader should have read rest of the messages after lapping writers.");435436sts.destroy_threads();437438CHECK_MESSAGE(sts.func1_count == 6,439"Reader should have read no additional messages after join");440ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,441ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));442}443444TEST_CASE("[Stress][CommandQueue] Stress test command queue") {445const char *COMMAND_QUEUE_SETTING = "memory/limits/command_queue/multithreading_queue_size_kb";446ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING, 1);447SharedThreadState sts;448sts.init_threads();449450RandomNumberGenerator rng;451452rng.set_seed(1837267);453454int msgs_to_add = 2048;455456for (int i = 0; i < msgs_to_add; i++) {457// randi_range is inclusive, so allow any enum value except MAX.458sts.add_msg_to_write((SharedThreadState::TestMsgType)rng.randi_range(0, SharedThreadState::TEST_MSG_MAX - 1));459}460sts.writer_threadwork.main_start_work();461462int max_loop_iters = msgs_to_add * 2;463int loop_iters = 0;464while (sts.func1_count < msgs_to_add && loop_iters < max_loop_iters) {465int remaining = (msgs_to_add - sts.func1_count);466sts.message_count_to_read = rng.randi_range(1, remaining < 128 ? 
remaining : 128);467if (loop_iters % 3 == 0) {468sts.message_count_to_read = -1;469}470sts.reader_threadwork.main_start_work();471sts.reader_threadwork.main_wait_for_done();472loop_iters++;473}474CHECK_MESSAGE(loop_iters < max_loop_iters,475"Reader needed too many iterations to read messages!");476sts.writer_threadwork.main_wait_for_done();477478sts.destroy_threads();479480CHECK_MESSAGE(sts.func1_count == msgs_to_add,481"Reader should have read no additional messages after join");482ProjectSettings::get_singleton()->set_setting(COMMAND_QUEUE_SETTING,483ProjectSettings::get_singleton()->property_get_revert(COMMAND_QUEUE_SETTING));484}485486TEST_CASE("[CommandQueue] Test Parameter Passing Semantics") {487SharedThreadState sts;488sts.init_threads();489490SUBCASE("Testing with lvalue") {491SharedThreadState::CopyMoveTestType::copy_count = 0;492SharedThreadState::CopyMoveTestType::move_count = 0;493494SharedThreadState::CopyMoveTestType lvalue(42);495496SUBCASE("Pass by copy") {497sts.command_queue.push(&sts, &SharedThreadState::copy_move_test_copy, lvalue);498499sts.message_count_to_read = -1;500sts.reader_threadwork.main_start_work();501sts.reader_threadwork.main_wait_for_done();502503CHECK(SharedThreadState::CopyMoveTestType::copy_count == 1);504CHECK(SharedThreadState::CopyMoveTestType::move_count == 1);505}506507SUBCASE("Pass by reference") {508sts.command_queue.push(&sts, &SharedThreadState::copy_move_test_ref, lvalue);509510sts.message_count_to_read = -1;511sts.reader_threadwork.main_start_work();512sts.reader_threadwork.main_wait_for_done();513514CHECK(SharedThreadState::CopyMoveTestType::copy_count == 1);515CHECK(SharedThreadState::CopyMoveTestType::move_count == 0);516}517}518519SUBCASE("Testing with rvalue") {520SharedThreadState::CopyMoveTestType::copy_count = 0;521SharedThreadState::CopyMoveTestType::move_count = 0;522523SUBCASE("Pass by copy") {524sts.command_queue.push(&sts, 
&SharedThreadState::copy_move_test_copy,525SharedThreadState::CopyMoveTestType(43));526527sts.message_count_to_read = -1;528sts.reader_threadwork.main_start_work();529sts.reader_threadwork.main_wait_for_done();530531CHECK(SharedThreadState::CopyMoveTestType::copy_count == 0);532CHECK(SharedThreadState::CopyMoveTestType::move_count == 2);533}534535SUBCASE("Pass by reference") {536sts.command_queue.push(&sts, &SharedThreadState::copy_move_test_ref,537SharedThreadState::CopyMoveTestType(43));538539sts.message_count_to_read = -1;540sts.reader_threadwork.main_start_work();541sts.reader_threadwork.main_wait_for_done();542543CHECK(SharedThreadState::CopyMoveTestType::copy_count == 0);544CHECK(SharedThreadState::CopyMoveTestType::move_count == 1);545}546547SUBCASE("Pass by rvalue reference") {548sts.command_queue.push(&sts, &SharedThreadState::copy_move_test_move,549SharedThreadState::CopyMoveTestType(43));550551sts.message_count_to_read = -1;552sts.reader_threadwork.main_start_work();553sts.reader_threadwork.main_wait_for_done();554555CHECK(SharedThreadState::CopyMoveTestType::copy_count == 0);556CHECK(SharedThreadState::CopyMoveTestType::move_count == 1);557}558}559560sts.destroy_threads();561}562} // namespace TestCommandQueue563564565