Path: blob/master/modules/multiplayer/scene_replication_interface.cpp
10277 views
/**************************************************************************/1/* scene_replication_interface.cpp */2/**************************************************************************/3/* This file is part of: */4/* GODOT ENGINE */5/* https://godotengine.org */6/**************************************************************************/7/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */8/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */9/* */10/* Permission is hereby granted, free of charge, to any person obtaining */11/* a copy of this software and associated documentation files (the */12/* "Software"), to deal in the Software without restriction, including */13/* without limitation the rights to use, copy, modify, merge, publish, */14/* distribute, sublicense, and/or sell copies of the Software, and to */15/* permit persons to whom the Software is furnished to do so, subject to */16/* the following conditions: */17/* */18/* The above copyright notice and this permission notice shall be */19/* included in all copies or substantial portions of the Software. */20/* */21/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */22/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */23/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */24/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */25/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */26/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */27/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */28/**************************************************************************/2930#include "scene_replication_interface.h"3132#include "scene_multiplayer.h"3334#include "core/debugger/engine_debugger.h"35#include "core/io/marshalls.h"36#include "scene/main/node.h"3738#define MAKE_ROOM(m_amount) \39if (packet_cache.size() < m_amount) \40packet_cache.resize(m_amount);4142#ifdef DEBUG_ENABLED43_FORCE_INLINE_ void SceneReplicationInterface::_profile_node_data(const String &p_what, ObjectID p_id, int p_size) {44if (EngineDebugger::is_profiling("multiplayer:replication")) {45Array values = { p_what, p_id, p_size };46EngineDebugger::profiler_add_frame_data("multiplayer:replication", values);47}48}49#endif5051SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) {52if (!tracked_nodes.has(p_id)) {53tracked_nodes[p_id] = TrackedNode(p_id);54Node *node = get_id_as<Node>(p_id);55node->connect(SceneStringName(tree_exited), callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT);56}57return tracked_nodes[p_id];58}5960void SceneReplicationInterface::_untrack(const ObjectID &p_id) {61if (!tracked_nodes.has(p_id)) {62return;63}64uint32_t net_id = tracked_nodes[p_id].net_id;65uint32_t peer = tracked_nodes[p_id].remote_peer;66tracked_nodes.erase(p_id);67// If it was spawned by a remote, remove it from the received nodes.68if (peer && peers_info.has(peer)) {69peers_info[peer].recv_nodes.erase(net_id);70}71// If we spawned or synced it, we need to remove it from any peer it was sent to.72if (net_id || peer == 0) {73for (KeyValue<int, PeerInfo> &E : peers_info) {74E.value.spawn_nodes.erase(p_id);75}76}77}7879void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) {80for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) {81Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr;82ERR_CONTINUE(!node);83node->queue_free();84}85}8687bool SceneReplicationInterface::_has_authority(const Node *p_node) {88return multiplayer->has_multiplayer_peer() && p_node->get_multiplayer_authority() == multiplayer->get_unique_id();89}9091void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) {92if (p_connected) {93peers_info[p_id] = PeerInfo();94for (const ObjectID &oid : spawned_nodes) {95_update_spawn_visibility(p_id, oid);96}97for (const ObjectID &oid : sync_nodes) {98_update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid));99}100} else {101ERR_FAIL_COND(!peers_info.has(p_id));102_free_remotes(peers_info[p_id]);103peers_info.erase(p_id);104}105}106107void SceneReplicationInterface::on_reset() {108for (const KeyValue<int, PeerInfo> &E : peers_info) {109_free_remotes(E.value);110}111peers_info.clear();112// Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned.113for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) {114TrackedNode &tobj = E.value;115tobj.net_id = 0;116tobj.remote_peer = 0;117}118for (const ObjectID &oid : sync_nodes) {119MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);120ERR_CONTINUE(!sync);121sync->reset();122}123last_net_id = 0;124}125126void SceneReplicationInterface::on_network_process() {127// Prevent endless stalling in case of unforeseen spawn errors.128if (spawn_queue.size()) {129ERR_PRINT("An error happened during last spawn, this usually means the 'ready' signal was not emitted by the spawned node.");130for (const ObjectID &oid : spawn_queue) {131Node *node = get_id_as<Node>(oid);132ERR_CONTINUE(!node);133if (node->is_connected(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready))) {134node->disconnect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready));135}136}137spawn_queue.clear();138}139140// Process syncs.141uint64_t usec = OS::get_singleton()->get_ticks_usec();142for (KeyValue<int, PeerInfo> &E : peers_info) {143const HashSet<ObjectID> to_sync = E.value.sync_nodes;144if (to_sync.is_empty()) {145continue; // Nothing to sync146}147uint16_t sync_net_time = ++E.value.last_sent_sync;148_send_sync(E.key, to_sync, sync_net_time, usec);149_send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);150}151}152153Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) {154Node *node = Object::cast_to<Node>(p_obj);155ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);156MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());157ERR_FAIL_NULL_V(spawner, ERR_INVALID_PARAMETER);158// Track node.159const ObjectID oid = node->get_instance_id();160TrackedNode &tobj = _track(oid);161162// Spawn state needs to be callected after "ready", but the spawn order follows "enter_tree".163ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE);164tobj.spawner = spawner->get_instance_id();165spawn_queue.insert(oid);166node->connect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready).bind(oid), Node::CONNECT_ONE_SHOT);167return OK;168}169170void SceneReplicationInterface::_node_ready(const ObjectID &p_oid) {171ERR_FAIL_COND(!spawn_queue.has(p_oid)); // Bug.172173// If we are a nested spawn, we need to wait until the parent is ready.174if (p_oid != *(spawn_queue.begin())) {175return;176}177178for (const ObjectID &oid : spawn_queue) {179ERR_CONTINUE(!tracked_nodes.has(oid));180181TrackedNode &tobj = tracked_nodes[oid];182MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tobj.spawner);183ERR_CONTINUE(!spawner);184185spawned_nodes.insert(oid);186if (_has_authority(spawner)) {187_update_spawn_visibility(0, oid);188}189}190spawn_queue.clear();191}192193Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) {194Node *node = Object::cast_to<Node>(p_obj);195ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);196MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());197ERR_FAIL_COND_V(!p_obj || !spawner, ERR_INVALID_PARAMETER);198// Forcibly despawn to all peers that knowns me.199int len = 0;200Error err = _make_despawn_packet(node, len);201ERR_FAIL_COND_V(err != OK, ERR_BUG);202const ObjectID oid = p_obj->get_instance_id();203for (const KeyValue<int, PeerInfo> &E : peers_info) {204if (!E.value.spawn_nodes.has(oid)) {205continue;206}207_send_raw(packet_cache.ptr(), len, E.key, true);208}209// Also remove spawner tracking from the replication state.210ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);211TrackedNode &tobj = _track(oid);212ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER);213tobj.spawner = ObjectID();214spawned_nodes.erase(oid);215for (KeyValue<int, PeerInfo> &E : peers_info) {216E.value.spawn_nodes.erase(oid);217}218return OK;219}220221Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) {222Node *node = Object::cast_to<Node>(p_obj);223ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);224MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());225ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);226227// Add to synchronizer list.228TrackedNode &tobj = _track(p_obj->get_instance_id());229const ObjectID sid = sync->get_instance_id();230tobj.synchronizers.insert(sid);231sync_nodes.insert(sid);232233// Update visibility.234sync->connect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id()));235_update_sync_visibility(0, sync);236237if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) {238// Try to apply synchronizer Net ID239ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during \"_enter_tree\" or the \"_spawn_custom\" callback of their multiplayer spawner.", sync->get_path()));240ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA);241uint32_t net_id = pending_sync_net_ids.front()->get();242pending_sync_net_ids.pop_front();243peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id();244sync->set_net_id(net_id);245246// Try to apply spawn state (before ready).247if (pending_buffer_size > 0) {248ERR_FAIL_COND_V(!node || !sync->get_replication_config_ptr(), ERR_UNCONFIGURED);249int consumed = 0;250const List<NodePath> props = sync->get_replication_config_ptr()->get_spawn_properties();251Vector<Variant> vars;252vars.resize(props.size());253Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed);254ERR_FAIL_COND_V(err, err);255if (consumed > 0) {256pending_buffer += consumed;257pending_buffer_size -= consumed;258err = MultiplayerSynchronizer::set_state(props, node, vars);259ERR_FAIL_COND_V(err, err);260}261}262}263return OK;264}265266Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_config) {267Node *node = Object::cast_to<Node>(p_obj);268ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);269MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());270ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);271sync->disconnect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed));272// Untrack synchronizer.273const ObjectID oid = node->get_instance_id();274const ObjectID sid = sync->get_instance_id();275ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);276TrackedNode &tobj = _track(oid);277tobj.synchronizers.erase(sid);278sync_nodes.erase(sid);279for (KeyValue<int, PeerInfo> &E : peers_info) {280E.value.sync_nodes.erase(sid);281E.value.last_watch_usecs.erase(sid);282if (sync->get_net_id()) {283E.value.recv_sync_ids.erase(sync->get_net_id());284}285}286return OK;287}288289void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) {290MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid);291ERR_FAIL_NULL(sync); // Bug.292Node *node = sync->get_root_node();293ERR_FAIL_NULL(node); // Bug.294const ObjectID oid = node->get_instance_id();295if (spawned_nodes.has(oid) && p_peer != multiplayer->get_unique_id()) {296_update_spawn_visibility(p_peer, oid);297}298_update_sync_visibility(p_peer, sync);299}300301bool SceneReplicationInterface::is_rpc_visible(const ObjectID &p_oid, int p_peer) const {302if (!tracked_nodes.has(p_oid)) {303return true; // Untracked nodes are always visible to RPCs.304}305ERR_FAIL_COND_V(p_peer < 0, false);306const TrackedNode &tnode = tracked_nodes[p_oid];307if (tnode.synchronizers.is_empty()) {308return true; // No synchronizers means no visibility restrictions.309}310if (tnode.remote_peer && uint32_t(p_peer) == tnode.remote_peer) {311return true; // RPCs on spawned nodes are always visible to spawner.312} else if (spawned_nodes.has(p_oid)) {313// It's a spawned node we control, this can be fast.314if (p_peer) {315return peers_info.has(p_peer) && peers_info[p_peer].spawn_nodes.has(p_oid);316} else {317for (const KeyValue<int, PeerInfo> &E : peers_info) {318if (!E.value.spawn_nodes.has(p_oid)) {319return false; // Not public.320}321}322return true; // All peers have this node.323}324} else {325// Cycle object synchronizers to check visibility.326for (const ObjectID &sid : tnode.synchronizers) {327MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);328ERR_CONTINUE(!sync);329// RPC visibility is composed using OR when multiple synchronizers are present.330// Note that we don't really care about authority here which may lead to unexpected331// results when using multiple synchronizers to control the same node.332if (sync->is_visible_to(p_peer)) {333return true;334}335}336return false; // Not visible.337}338}339340Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) {341ERR_FAIL_NULL_V(p_sync, ERR_BUG);342if (!_has_authority(p_sync) || p_peer == multiplayer->get_unique_id()) {343return OK;344}345346const ObjectID &sid = p_sync->get_instance_id();347bool is_visible = p_sync->is_visible_to(p_peer);348if (p_peer == 0) {349for (KeyValue<int, PeerInfo> &E : peers_info) {350// Might be visible to this specific peer.351bool is_visible_to_peer = is_visible || p_sync->is_visible_to(E.key);352if (is_visible_to_peer == E.value.sync_nodes.has(sid)) {353continue;354}355if (is_visible_to_peer) {356E.value.sync_nodes.insert(sid);357} else {358E.value.sync_nodes.erase(sid);359E.value.last_watch_usecs.erase(sid);360}361}362return OK;363} else {364ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);365if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) {366return OK;367}368if (is_visible) {369peers_info[p_peer].sync_nodes.insert(sid);370} else {371peers_info[p_peer].sync_nodes.erase(sid);372peers_info[p_peer].last_watch_usecs.erase(sid);373}374return OK;375}376}377378Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) {379const TrackedNode *tnode = tracked_nodes.getptr(p_oid);380ERR_FAIL_NULL_V(tnode, ERR_BUG);381MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner);382Node *node = get_id_as<Node>(p_oid);383ERR_FAIL_NULL_V(node, ERR_BUG);384ERR_FAIL_NULL_V(spawner, ERR_BUG);385ERR_FAIL_COND_V(!_has_authority(spawner), ERR_BUG);386ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG);387const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers;388bool is_visible = true;389for (const ObjectID &sid : synchronizers) {390MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);391ERR_CONTINUE(!sync);392if (!_has_authority(sync)) {393continue;394}395// Spawn visibility is composed using OR when multiple synchronizers are present.396if (sync->is_visible_to(p_peer)) {397is_visible = true;398break;399}400is_visible = false;401}402// Spawn (and despawn) when needed.403HashSet<int> to_spawn;404HashSet<int> to_despawn;405if (p_peer) {406ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);407if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) {408return OK;409}410if (is_visible) {411to_spawn.insert(p_peer);412} else {413to_despawn.insert(p_peer);414}415} else {416// Check visibility for each peers.417for (const KeyValue<int, PeerInfo> &E : peers_info) {418if (is_visible) {419// This is fast, since the object is visible to everyone, we don't need to check each peer.420if (E.value.spawn_nodes.has(p_oid)) {421// Already spawned.422continue;423}424to_spawn.insert(E.key);425} else {426// Need to check visibility for each peer.427_update_spawn_visibility(E.key, p_oid);428}429}430}431if (to_spawn.size()) {432int len = 0;433_make_spawn_packet(node, spawner, len);434for (int pid : to_spawn) {435ERR_CONTINUE(!peers_info.has(pid));436int path_id;437multiplayer_cache->send_object_cache(spawner, pid, path_id);438_send_raw(packet_cache.ptr(), len, pid, true);439peers_info[pid].spawn_nodes.insert(p_oid);440}441}442if (to_despawn.size()) {443int len = 0;444_make_despawn_packet(node, len);445for (int pid : to_despawn) {446ERR_CONTINUE(!peers_info.has(pid));447peers_info[pid].spawn_nodes.erase(p_oid);448_send_raw(packet_cache.ptr(), len, pid, true);449}450}451return OK;452}453454Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, int p_peer, bool p_reliable) {455ERR_FAIL_COND_V(!p_buffer || p_size < 1, ERR_INVALID_PARAMETER);456457Ref<MultiplayerPeer> peer = multiplayer->get_multiplayer_peer();458ERR_FAIL_COND_V(peer.is_null(), ERR_UNCONFIGURED);459peer->set_transfer_channel(0);460peer->set_transfer_mode(p_reliable ? MultiplayerPeer::TRANSFER_MODE_RELIABLE : MultiplayerPeer::TRANSFER_MODE_UNRELIABLE);461return multiplayer->send_command(p_peer, p_buffer, p_size);462}463464Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) {465ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG);466467const ObjectID oid = p_node->get_instance_id();468TrackedNode *tnode = tracked_nodes.getptr(oid);469ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);470471if (tnode->net_id == 0) {472// Ensure the node has an ID.473tnode->net_id = ++last_net_id;474}475uint32_t nid = tnode->net_id;476ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED);477478// Prepare custom arg and scene_id479uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid);480bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID;481Variant spawn_arg = p_spawner->get_spawn_argument(oid);482int spawn_arg_size = 0;483if (is_custom) {484Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false);485ERR_FAIL_COND_V(err, err);486}487488// Prepare spawn state.489List<NodePath> state_props;490List<uint32_t> sync_ids;491const HashSet<ObjectID> synchronizers = tnode->synchronizers;492for (const ObjectID &sid : synchronizers) {493MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);494if (!_has_authority(sync)) {495continue;496}497ERR_CONTINUE(!sync);498ERR_FAIL_NULL_V(sync->get_replication_config_ptr(), ERR_BUG);499for (const NodePath &prop : sync->get_replication_config_ptr()->get_spawn_properties()) {500state_props.push_back(prop);501}502// Ensure the synchronizer has an ID.503if (sync->get_net_id() == 0) {504sync->set_net_id(++last_net_id);505}506sync_ids.push_back(sync->get_net_id());507}508int state_size = 0;509Vector<Variant> state_vars;510Vector<const Variant *> state_varp;511if (state_props.size()) {512Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp);513ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state.");514err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size);515ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state.");516}517518// Encode scene ID, path ID, net ID, node name.519int path_id = multiplayer_cache->make_object_cache(p_spawner);520CharString cname = p_node->get_name().operator String().utf8();521int nlen = encode_cstring(cname.get_data(), nullptr);522MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size);523uint8_t *ptr = packet_cache.ptrw();524ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN;525ptr[1] = scene_id;526int ofs = 2;527ofs += encode_uint32(path_id, &ptr[ofs]);528ofs += encode_uint32(nid, &ptr[ofs]);529ofs += encode_uint32(sync_ids.size(), &ptr[ofs]);530ofs += encode_uint32(nlen, &ptr[ofs]);531for (uint32_t snid : sync_ids) {532ofs += encode_uint32(snid, &ptr[ofs]);533}534ofs += encode_cstring(cname.get_data(), &ptr[ofs]);535// Write args536if (is_custom) {537ofs += encode_uint32(spawn_arg_size, &ptr[ofs]);538Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, &ptr[ofs], spawn_arg_size, false);539ERR_FAIL_COND_V(err, err);540ofs += spawn_arg_size;541}542// Write state.543if (state_size) {544Error err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), &ptr[ofs], state_size);545ERR_FAIL_COND_V(err, err);546ofs += state_size;547}548r_len = ofs;549return OK;550}551552Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) {553const ObjectID oid = p_node->get_instance_id();554const TrackedNode *tnode = tracked_nodes.getptr(oid);555ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);556MAKE_ROOM(5);557uint8_t *ptr = packet_cache.ptrw();558ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN;559int ofs = 1;560uint32_t nid = tnode->net_id;561ofs += encode_uint32(nid, &ptr[ofs]);562r_len = ofs;563return OK;564}565566Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {567ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received");568int ofs = 1; // The spawn/despawn command.569uint8_t scene_id = p_buffer[ofs];570ofs += 1;571uint32_t node_target = decode_uint32(&p_buffer[ofs]);572ofs += 4;573MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(multiplayer_cache->get_cached_object(p_from, node_target));574ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);575ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);576577uint32_t net_id = decode_uint32(&p_buffer[ofs]);578ofs += 4;579uint32_t sync_len = decode_uint32(&p_buffer[ofs]);580ofs += 4;581uint32_t name_len = decode_uint32(&p_buffer[ofs]);582ofs += 4;583ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4)));584List<uint32_t> sync_ids;585for (uint32_t i = 0; i < sync_len; i++) {586sync_ids.push_back(decode_uint32(&p_buffer[ofs]));587ofs += 4;588}589ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size.");590591// We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names.592const String name = String::utf8((const char *)&p_buffer[ofs], name_len);593ERR_FAIL_COND_V_MSG(name.validate_node_name() != name, ERR_INVALID_DATA, vformat("Invalid node name received: '%s'. Make sure to add nodes via 'add_child(node, true)' remotely.", name));594ofs += name_len;595596// Check that we can spawn.597Node *parent = spawner->get_node_or_null(spawner->get_spawn_path());598ERR_FAIL_NULL_V(parent, ERR_UNCONFIGURED);599ERR_FAIL_COND_V(parent->has_node(name), ERR_INVALID_DATA);600601Node *node = nullptr;602if (scene_id == MultiplayerSpawner::INVALID_ID) {603// Custom spawn.604ERR_FAIL_COND_V(p_buffer_len - ofs < 4, ERR_INVALID_DATA);605uint32_t arg_size = decode_uint32(&p_buffer[ofs]);606ofs += 4;607ERR_FAIL_COND_V(arg_size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);608Variant v;609Error err = MultiplayerAPI::decode_and_decompress_variant(v, &p_buffer[ofs], arg_size, nullptr, false);610ERR_FAIL_COND_V(err != OK, err);611ofs += arg_size;612node = spawner->instantiate_custom(v);613} else {614// Scene based spawn.615node = spawner->instantiate_scene(scene_id);616}617ERR_FAIL_NULL_V(node, ERR_UNAUTHORIZED);618node->set_name(name);619620// Add and track remote621ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE);622ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE);623ObjectID oid = node->get_instance_id();624TrackedNode &tobj = _track(oid);625tobj.spawner = spawner->get_instance_id();626tobj.net_id = net_id;627tobj.remote_peer = p_from;628peers_info[p_from].recv_nodes[net_id] = oid;629630// The initial state will be applied during the sync config (i.e. before _ready).631pending_spawn = node->get_instance_id();632pending_spawn_remote = p_from;633pending_buffer_size = p_buffer_len - ofs;634pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr;635pending_sync_net_ids = sync_ids;636637parent->add_child(node);638spawner->emit_signal(SNAME("spawned"), node);639640pending_spawn = ObjectID();641pending_spawn_remote = 0;642pending_buffer = nullptr;643pending_buffer_size = 0;644if (pending_sync_net_ids.size()) {645pending_sync_net_ids.clear();646ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed.647}648return OK;649}650651Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {652ERR_FAIL_COND_V_MSG(p_buffer_len < 5, ERR_INVALID_DATA, "Invalid spawn packet received");653int ofs = 1; // The spawn/despawn command.654uint32_t net_id = decode_uint32(&p_buffer[ofs]);655ofs += 4;656657// Untrack remote658ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED);659PeerInfo &pinfo = peers_info[p_from];660ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED);661Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]);662ERR_FAIL_NULL_V(node, ERR_BUG);663pinfo.recv_nodes.erase(net_id);664665const ObjectID oid = node->get_instance_id();666ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG);667MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner);668ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);669ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);670671if (node->get_parent() != nullptr) {672node->get_parent()->remove_child(node);673}674node->queue_free();675spawner->emit_signal(SNAME("despawned"), node);676677return OK;678}679680bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {681r_net_id = p_sync->get_net_id();682if (r_net_id == 0 || (r_net_id & 0x80000000)) {683int path_id = 0;684bool verified = multiplayer_cache->send_object_cache(p_sync, p_peer, path_id);685ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");686if (r_net_id == 0) {687// First time path based ID.688r_net_id = path_id | 0x80000000;689p_sync->set_net_id(r_net_id | 0x80000000);690}691return verified;692}693return true;694}695696MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {697MultiplayerSynchronizer *sync = nullptr;698if (p_net_id & 0x80000000) {699sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer_cache->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));700} else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {701const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];702sync = get_id_as<MultiplayerSynchronizer>(sid);703}704return sync;705}706707void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> &p_last_watch_usecs) {708MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);709uint8_t *ptr = packet_cache.ptrw();710ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);711int ofs = 1;712for (const ObjectID &oid : p_synchronizers) {713MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);714ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));715uint32_t net_id;716if (!_verify_synchronizer(p_peer, sync, net_id)) {717continue;718}719uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;720uint64_t indexes;721List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);722723if (!delta.size()) {724continue; // Nothing to update.725}726727Vector<const Variant *> varp;728varp.resize(delta.size());729const Variant **vptr = varp.ptrw();730int i = 0;731for (const Variant &v : delta) {732vptr[i] = &v;733i++;734}735int size;736Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);737ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");738739ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));740741if (ofs + 4 + 8 + 4 + size > delta_mtu) {742// Send what we got, and reset write.743_send_raw(packet_cache.ptr(), ofs, p_peer, true);744ofs = 1;745}746if (size) {747ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);748ofs += encode_uint64(indexes, &ptr[ofs]);749ofs += encode_uint32(size, &ptr[ofs]);750MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);751ofs += size;752}753#ifdef DEBUG_ENABLED754_profile_node_data("delta_out", oid, size);755#endif756peers_info[p_peer].last_watch_usecs[oid] = p_usec;757}758if (ofs > 1) {759// Got some left over to send.760_send_raw(packet_cache.ptr(), ofs, p_peer, true);761}762}763764Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {765int ofs = 1;766while (ofs + 4 + 8 + 4 < p_buffer_len) {767uint32_t net_id = decode_uint32(&p_buffer[ofs]);768ofs += 4;769uint64_t indexes = decode_uint64(&p_buffer[ofs]);770ofs += 8;771uint32_t size = decode_uint32(&p_buffer[ofs]);772ofs += 4;773ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);774MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);775Node *node = sync ? sync->get_root_node() : nullptr;776if (!sync || sync->get_multiplayer_authority() != p_from || !node) {777ofs += size;778ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");779}780List<NodePath> props = sync->get_delta_properties(indexes);781ERR_FAIL_COND_V(props.is_empty(), ERR_INVALID_DATA);782Vector<Variant> vars;783vars.resize(props.size());784int consumed = 0;785Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);786ERR_FAIL_COND_V(err != OK, err);787ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);788err = MultiplayerSynchronizer::set_state(props, node, vars);789ERR_FAIL_COND_V(err != OK, err);790ofs += size;791sync->emit_signal(SNAME("delta_synchronized"));792#ifdef DEBUG_ENABLED793_profile_node_data("delta_in", sync->get_instance_id(), size);794#endif795}796return OK;797}798799void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {800MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);801uint8_t *ptr = packet_cache.ptrw();802ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;803int ofs = 1;804ofs += encode_uint16(p_sync_net_time, &ptr[1]);805// Can only send updates for already notified nodes.806// This is a lazy implementation, we could optimize much more here with by grouping by replication config.807for (const ObjectID &oid : p_synchronizers) {808MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);809ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));810if (!sync->update_outbound_sync_time(p_usec)) {811continue; // nothing to sync.812}813814Node *node = sync->get_root_node();815ERR_CONTINUE(!node);816uint32_t net_id = sync->get_net_id();817if (!_verify_synchronizer(p_peer, sync, net_id)) {818// The path based sync is not yet confirmed, skipping.819continue;820}821int size;822Vector<Variant> vars;823Vector<const Variant *> varp;824const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();825Error err = MultiplayerSynchronizer::get_state(props, node, vars, varp);826ERR_CONTINUE_MSG(err != OK, "Unable to retrieve sync state.");827err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);828ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");829// TODO Handle single state above MTU.830ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));831if (ofs + 4 + 4 + size > sync_mtu) {832// Send what we got, and reset write.833_send_raw(packet_cache.ptr(), ofs, p_peer, false);834ofs = 3;835}836if (size) {837ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);838ofs += encode_uint32(size, &ptr[ofs]);839MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size);840ofs += size;841}842#ifdef DEBUG_ENABLED843_profile_node_data("sync_out", oid, size);844#endif845}846if (ofs > 3) {847// Got some left over to send.848_send_raw(packet_cache.ptr(), ofs, p_peer, false);849}850}851852Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {853ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");854bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;855if (is_delta) {856return on_delta_receive(p_from, p_buffer, p_buffer_len);857}858uint16_t time = decode_uint16(&p_buffer[1]);859int ofs = 3;860while (ofs + 8 < p_buffer_len) {861uint32_t net_id = decode_uint32(&p_buffer[ofs]);862ofs += 4;863uint32_t size = decode_uint32(&p_buffer[ofs]);864ofs += 4;865ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);866MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);867if (!sync) {868// Not received yet.869ofs += size;870continue;871}872Node *node = sync->get_root_node();873if (sync->get_multiplayer_authority() != p_from || !node) {874// Not valid for me.875ofs += size;876ERR_CONTINUE_MSG(true, "Ignoring sync data from non-authority or for missing node.");877}878if (!sync->update_inbound_sync_time(time)) {879// State is too old.880ofs += size;881continue;882}883const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();884Vector<Variant> vars;885vars.resize(props.size());886int consumed;887Error err = MultiplayerAPI::decode_and_decompress_variants(vars, &p_buffer[ofs], size, consumed);888ERR_FAIL_COND_V(err, err);889err = MultiplayerSynchronizer::set_state(props, node, vars);890ERR_FAIL_COND_V(err, err);891ofs += size;892sync->emit_signal(SNAME("synchronized"));893#ifdef DEBUG_ENABLED894_profile_node_data("sync_in", sync->get_instance_id(), size);895#endif896}897return OK;898}899900void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {901ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");902sync_mtu = p_size;903}904905int SceneReplicationInterface::get_max_sync_packet_size() const {906return sync_mtu;907}908909void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {910ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");911delta_mtu = p_size;912}913914int SceneReplicationInterface::get_max_delta_packet_size() const {915return delta_mtu;916}917918919