Book a Demo!
CoCalc Logo Icon
StoreFeaturesDocsShareSupportNewsAboutPoliciesSign UpSign In
godotengine
GitHub Repository: godotengine/godot
Path: blob/master/modules/multiplayer/scene_replication_interface.cpp
10277 views
1
/**************************************************************************/
2
/* scene_replication_interface.cpp */
3
/**************************************************************************/
4
/* This file is part of: */
5
/* GODOT ENGINE */
6
/* https://godotengine.org */
7
/**************************************************************************/
8
/* Copyright (c) 2014-present Godot Engine contributors (see AUTHORS.md). */
9
/* Copyright (c) 2007-2014 Juan Linietsky, Ariel Manzur. */
10
/* */
11
/* Permission is hereby granted, free of charge, to any person obtaining */
12
/* a copy of this software and associated documentation files (the */
13
/* "Software"), to deal in the Software without restriction, including */
14
/* without limitation the rights to use, copy, modify, merge, publish, */
15
/* distribute, sublicense, and/or sell copies of the Software, and to */
16
/* permit persons to whom the Software is furnished to do so, subject to */
17
/* the following conditions: */
18
/* */
19
/* The above copyright notice and this permission notice shall be */
20
/* included in all copies or substantial portions of the Software. */
21
/* */
22
/* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, */
23
/* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF */
24
/* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. */
25
/* IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY */
26
/* CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, */
27
/* TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE */
28
/* SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. */
29
/**************************************************************************/
30
31
#include "scene_replication_interface.h"
32
33
#include "scene_multiplayer.h"
34
35
#include "core/debugger/engine_debugger.h"
36
#include "core/io/marshalls.h"
37
#include "scene/main/node.h"
38
39
#define MAKE_ROOM(m_amount) \
40
if (packet_cache.size() < m_amount) \
41
packet_cache.resize(m_amount);
42
43
#ifdef DEBUG_ENABLED
44
_FORCE_INLINE_ void SceneReplicationInterface::_profile_node_data(const String &p_what, ObjectID p_id, int p_size) {
45
if (EngineDebugger::is_profiling("multiplayer:replication")) {
46
Array values = { p_what, p_id, p_size };
47
EngineDebugger::profiler_add_frame_data("multiplayer:replication", values);
48
}
49
}
50
#endif
51
52
SceneReplicationInterface::TrackedNode &SceneReplicationInterface::_track(const ObjectID &p_id) {
53
if (!tracked_nodes.has(p_id)) {
54
tracked_nodes[p_id] = TrackedNode(p_id);
55
Node *node = get_id_as<Node>(p_id);
56
node->connect(SceneStringName(tree_exited), callable_mp(this, &SceneReplicationInterface::_untrack).bind(p_id), Node::CONNECT_ONE_SHOT);
57
}
58
return tracked_nodes[p_id];
59
}
60
61
void SceneReplicationInterface::_untrack(const ObjectID &p_id) {
62
if (!tracked_nodes.has(p_id)) {
63
return;
64
}
65
uint32_t net_id = tracked_nodes[p_id].net_id;
66
uint32_t peer = tracked_nodes[p_id].remote_peer;
67
tracked_nodes.erase(p_id);
68
// If it was spawned by a remote, remove it from the received nodes.
69
if (peer && peers_info.has(peer)) {
70
peers_info[peer].recv_nodes.erase(net_id);
71
}
72
// If we spawned or synced it, we need to remove it from any peer it was sent to.
73
if (net_id || peer == 0) {
74
for (KeyValue<int, PeerInfo> &E : peers_info) {
75
E.value.spawn_nodes.erase(p_id);
76
}
77
}
78
}
79
80
void SceneReplicationInterface::_free_remotes(const PeerInfo &p_info) {
81
for (const KeyValue<uint32_t, ObjectID> &E : p_info.recv_nodes) {
82
Node *node = tracked_nodes.has(E.value) ? get_id_as<Node>(E.value) : nullptr;
83
ERR_CONTINUE(!node);
84
node->queue_free();
85
}
86
}
87
88
bool SceneReplicationInterface::_has_authority(const Node *p_node) {
89
return multiplayer->has_multiplayer_peer() && p_node->get_multiplayer_authority() == multiplayer->get_unique_id();
90
}
91
92
void SceneReplicationInterface::on_peer_change(int p_id, bool p_connected) {
93
if (p_connected) {
94
peers_info[p_id] = PeerInfo();
95
for (const ObjectID &oid : spawned_nodes) {
96
_update_spawn_visibility(p_id, oid);
97
}
98
for (const ObjectID &oid : sync_nodes) {
99
_update_sync_visibility(p_id, get_id_as<MultiplayerSynchronizer>(oid));
100
}
101
} else {
102
ERR_FAIL_COND(!peers_info.has(p_id));
103
_free_remotes(peers_info[p_id]);
104
peers_info.erase(p_id);
105
}
106
}
107
108
void SceneReplicationInterface::on_reset() {
109
for (const KeyValue<int, PeerInfo> &E : peers_info) {
110
_free_remotes(E.value);
111
}
112
peers_info.clear();
113
// Tracked nodes are cleared on deletion, here we only reset the ids so they can be later re-assigned.
114
for (KeyValue<ObjectID, TrackedNode> &E : tracked_nodes) {
115
TrackedNode &tobj = E.value;
116
tobj.net_id = 0;
117
tobj.remote_peer = 0;
118
}
119
for (const ObjectID &oid : sync_nodes) {
120
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
121
ERR_CONTINUE(!sync);
122
sync->reset();
123
}
124
last_net_id = 0;
125
}
126
127
void SceneReplicationInterface::on_network_process() {
128
// Prevent endless stalling in case of unforeseen spawn errors.
129
if (spawn_queue.size()) {
130
ERR_PRINT("An error happened during last spawn, this usually means the 'ready' signal was not emitted by the spawned node.");
131
for (const ObjectID &oid : spawn_queue) {
132
Node *node = get_id_as<Node>(oid);
133
ERR_CONTINUE(!node);
134
if (node->is_connected(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready))) {
135
node->disconnect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready));
136
}
137
}
138
spawn_queue.clear();
139
}
140
141
// Process syncs.
142
uint64_t usec = OS::get_singleton()->get_ticks_usec();
143
for (KeyValue<int, PeerInfo> &E : peers_info) {
144
const HashSet<ObjectID> to_sync = E.value.sync_nodes;
145
if (to_sync.is_empty()) {
146
continue; // Nothing to sync
147
}
148
uint16_t sync_net_time = ++E.value.last_sent_sync;
149
_send_sync(E.key, to_sync, sync_net_time, usec);
150
_send_delta(E.key, to_sync, usec, E.value.last_watch_usecs);
151
}
152
}
153
154
Error SceneReplicationInterface::on_spawn(Object *p_obj, Variant p_config) {
155
Node *node = Object::cast_to<Node>(p_obj);
156
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
157
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
158
ERR_FAIL_NULL_V(spawner, ERR_INVALID_PARAMETER);
159
// Track node.
160
const ObjectID oid = node->get_instance_id();
161
TrackedNode &tobj = _track(oid);
162
163
// Spawn state needs to be callected after "ready", but the spawn order follows "enter_tree".
164
ERR_FAIL_COND_V(tobj.spawner != ObjectID(), ERR_ALREADY_IN_USE);
165
tobj.spawner = spawner->get_instance_id();
166
spawn_queue.insert(oid);
167
node->connect(SceneStringName(ready), callable_mp(this, &SceneReplicationInterface::_node_ready).bind(oid), Node::CONNECT_ONE_SHOT);
168
return OK;
169
}
170
171
void SceneReplicationInterface::_node_ready(const ObjectID &p_oid) {
172
ERR_FAIL_COND(!spawn_queue.has(p_oid)); // Bug.
173
174
// If we are a nested spawn, we need to wait until the parent is ready.
175
if (p_oid != *(spawn_queue.begin())) {
176
return;
177
}
178
179
for (const ObjectID &oid : spawn_queue) {
180
ERR_CONTINUE(!tracked_nodes.has(oid));
181
182
TrackedNode &tobj = tracked_nodes[oid];
183
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tobj.spawner);
184
ERR_CONTINUE(!spawner);
185
186
spawned_nodes.insert(oid);
187
if (_has_authority(spawner)) {
188
_update_spawn_visibility(0, oid);
189
}
190
}
191
spawn_queue.clear();
192
}
193
194
Error SceneReplicationInterface::on_despawn(Object *p_obj, Variant p_config) {
195
Node *node = Object::cast_to<Node>(p_obj);
196
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
197
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(p_config.get_validated_object());
198
ERR_FAIL_COND_V(!p_obj || !spawner, ERR_INVALID_PARAMETER);
199
// Forcibly despawn to all peers that knowns me.
200
int len = 0;
201
Error err = _make_despawn_packet(node, len);
202
ERR_FAIL_COND_V(err != OK, ERR_BUG);
203
const ObjectID oid = p_obj->get_instance_id();
204
for (const KeyValue<int, PeerInfo> &E : peers_info) {
205
if (!E.value.spawn_nodes.has(oid)) {
206
continue;
207
}
208
_send_raw(packet_cache.ptr(), len, E.key, true);
209
}
210
// Also remove spawner tracking from the replication state.
211
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
212
TrackedNode &tobj = _track(oid);
213
ERR_FAIL_COND_V(tobj.spawner != spawner->get_instance_id(), ERR_INVALID_PARAMETER);
214
tobj.spawner = ObjectID();
215
spawned_nodes.erase(oid);
216
for (KeyValue<int, PeerInfo> &E : peers_info) {
217
E.value.spawn_nodes.erase(oid);
218
}
219
return OK;
220
}
221
222
Error SceneReplicationInterface::on_replication_start(Object *p_obj, Variant p_config) {
223
Node *node = Object::cast_to<Node>(p_obj);
224
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
225
MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
226
ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);
227
228
// Add to synchronizer list.
229
TrackedNode &tobj = _track(p_obj->get_instance_id());
230
const ObjectID sid = sync->get_instance_id();
231
tobj.synchronizers.insert(sid);
232
sync_nodes.insert(sid);
233
234
// Update visibility.
235
sync->connect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed).bind(sync->get_instance_id()));
236
_update_sync_visibility(0, sync);
237
238
if (pending_spawn == p_obj->get_instance_id() && sync->get_multiplayer_authority() == pending_spawn_remote) {
239
// Try to apply synchronizer Net ID
240
ERR_FAIL_COND_V_MSG(pending_sync_net_ids.is_empty(), ERR_INVALID_DATA, vformat("The MultiplayerSynchronizer at path \"%s\" is unable to process the pending spawn since it has no network ID. This might happen when changing the multiplayer authority during the \"_ready\" callback. Make sure to only change the authority of multiplayer synchronizers during \"_enter_tree\" or the \"_spawn_custom\" callback of their multiplayer spawner.", sync->get_path()));
241
ERR_FAIL_COND_V(!peers_info.has(pending_spawn_remote), ERR_INVALID_DATA);
242
uint32_t net_id = pending_sync_net_ids.front()->get();
243
pending_sync_net_ids.pop_front();
244
peers_info[pending_spawn_remote].recv_sync_ids[net_id] = sync->get_instance_id();
245
sync->set_net_id(net_id);
246
247
// Try to apply spawn state (before ready).
248
if (pending_buffer_size > 0) {
249
ERR_FAIL_COND_V(!node || !sync->get_replication_config_ptr(), ERR_UNCONFIGURED);
250
int consumed = 0;
251
const List<NodePath> props = sync->get_replication_config_ptr()->get_spawn_properties();
252
Vector<Variant> vars;
253
vars.resize(props.size());
254
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, pending_buffer, pending_buffer_size, consumed);
255
ERR_FAIL_COND_V(err, err);
256
if (consumed > 0) {
257
pending_buffer += consumed;
258
pending_buffer_size -= consumed;
259
err = MultiplayerSynchronizer::set_state(props, node, vars);
260
ERR_FAIL_COND_V(err, err);
261
}
262
}
263
}
264
return OK;
265
}
266
267
Error SceneReplicationInterface::on_replication_stop(Object *p_obj, Variant p_config) {
268
Node *node = Object::cast_to<Node>(p_obj);
269
ERR_FAIL_COND_V(!node || p_config.get_type() != Variant::OBJECT, ERR_INVALID_PARAMETER);
270
MultiplayerSynchronizer *sync = Object::cast_to<MultiplayerSynchronizer>(p_config.get_validated_object());
271
ERR_FAIL_NULL_V(sync, ERR_INVALID_PARAMETER);
272
sync->disconnect(SceneStringName(visibility_changed), callable_mp(this, &SceneReplicationInterface::_visibility_changed));
273
// Untrack synchronizer.
274
const ObjectID oid = node->get_instance_id();
275
const ObjectID sid = sync->get_instance_id();
276
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_INVALID_PARAMETER);
277
TrackedNode &tobj = _track(oid);
278
tobj.synchronizers.erase(sid);
279
sync_nodes.erase(sid);
280
for (KeyValue<int, PeerInfo> &E : peers_info) {
281
E.value.sync_nodes.erase(sid);
282
E.value.last_watch_usecs.erase(sid);
283
if (sync->get_net_id()) {
284
E.value.recv_sync_ids.erase(sync->get_net_id());
285
}
286
}
287
return OK;
288
}
289
290
void SceneReplicationInterface::_visibility_changed(int p_peer, ObjectID p_sid) {
291
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(p_sid);
292
ERR_FAIL_NULL(sync); // Bug.
293
Node *node = sync->get_root_node();
294
ERR_FAIL_NULL(node); // Bug.
295
const ObjectID oid = node->get_instance_id();
296
if (spawned_nodes.has(oid) && p_peer != multiplayer->get_unique_id()) {
297
_update_spawn_visibility(p_peer, oid);
298
}
299
_update_sync_visibility(p_peer, sync);
300
}
301
302
bool SceneReplicationInterface::is_rpc_visible(const ObjectID &p_oid, int p_peer) const {
303
if (!tracked_nodes.has(p_oid)) {
304
return true; // Untracked nodes are always visible to RPCs.
305
}
306
ERR_FAIL_COND_V(p_peer < 0, false);
307
const TrackedNode &tnode = tracked_nodes[p_oid];
308
if (tnode.synchronizers.is_empty()) {
309
return true; // No synchronizers means no visibility restrictions.
310
}
311
if (tnode.remote_peer && uint32_t(p_peer) == tnode.remote_peer) {
312
return true; // RPCs on spawned nodes are always visible to spawner.
313
} else if (spawned_nodes.has(p_oid)) {
314
// It's a spawned node we control, this can be fast.
315
if (p_peer) {
316
return peers_info.has(p_peer) && peers_info[p_peer].spawn_nodes.has(p_oid);
317
} else {
318
for (const KeyValue<int, PeerInfo> &E : peers_info) {
319
if (!E.value.spawn_nodes.has(p_oid)) {
320
return false; // Not public.
321
}
322
}
323
return true; // All peers have this node.
324
}
325
} else {
326
// Cycle object synchronizers to check visibility.
327
for (const ObjectID &sid : tnode.synchronizers) {
328
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
329
ERR_CONTINUE(!sync);
330
// RPC visibility is composed using OR when multiple synchronizers are present.
331
// Note that we don't really care about authority here which may lead to unexpected
332
// results when using multiple synchronizers to control the same node.
333
if (sync->is_visible_to(p_peer)) {
334
return true;
335
}
336
}
337
return false; // Not visible.
338
}
339
}
340
341
Error SceneReplicationInterface::_update_sync_visibility(int p_peer, MultiplayerSynchronizer *p_sync) {
342
ERR_FAIL_NULL_V(p_sync, ERR_BUG);
343
if (!_has_authority(p_sync) || p_peer == multiplayer->get_unique_id()) {
344
return OK;
345
}
346
347
const ObjectID &sid = p_sync->get_instance_id();
348
bool is_visible = p_sync->is_visible_to(p_peer);
349
if (p_peer == 0) {
350
for (KeyValue<int, PeerInfo> &E : peers_info) {
351
// Might be visible to this specific peer.
352
bool is_visible_to_peer = is_visible || p_sync->is_visible_to(E.key);
353
if (is_visible_to_peer == E.value.sync_nodes.has(sid)) {
354
continue;
355
}
356
if (is_visible_to_peer) {
357
E.value.sync_nodes.insert(sid);
358
} else {
359
E.value.sync_nodes.erase(sid);
360
E.value.last_watch_usecs.erase(sid);
361
}
362
}
363
return OK;
364
} else {
365
ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
366
if (is_visible == peers_info[p_peer].sync_nodes.has(sid)) {
367
return OK;
368
}
369
if (is_visible) {
370
peers_info[p_peer].sync_nodes.insert(sid);
371
} else {
372
peers_info[p_peer].sync_nodes.erase(sid);
373
peers_info[p_peer].last_watch_usecs.erase(sid);
374
}
375
return OK;
376
}
377
}
378
379
Error SceneReplicationInterface::_update_spawn_visibility(int p_peer, const ObjectID &p_oid) {
380
const TrackedNode *tnode = tracked_nodes.getptr(p_oid);
381
ERR_FAIL_NULL_V(tnode, ERR_BUG);
382
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tnode->spawner);
383
Node *node = get_id_as<Node>(p_oid);
384
ERR_FAIL_NULL_V(node, ERR_BUG);
385
ERR_FAIL_NULL_V(spawner, ERR_BUG);
386
ERR_FAIL_COND_V(!_has_authority(spawner), ERR_BUG);
387
ERR_FAIL_COND_V(!tracked_nodes.has(p_oid), ERR_BUG);
388
const HashSet<ObjectID> synchronizers = tracked_nodes[p_oid].synchronizers;
389
bool is_visible = true;
390
for (const ObjectID &sid : synchronizers) {
391
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
392
ERR_CONTINUE(!sync);
393
if (!_has_authority(sync)) {
394
continue;
395
}
396
// Spawn visibility is composed using OR when multiple synchronizers are present.
397
if (sync->is_visible_to(p_peer)) {
398
is_visible = true;
399
break;
400
}
401
is_visible = false;
402
}
403
// Spawn (and despawn) when needed.
404
HashSet<int> to_spawn;
405
HashSet<int> to_despawn;
406
if (p_peer) {
407
ERR_FAIL_COND_V(!peers_info.has(p_peer), ERR_INVALID_PARAMETER);
408
if (is_visible == peers_info[p_peer].spawn_nodes.has(p_oid)) {
409
return OK;
410
}
411
if (is_visible) {
412
to_spawn.insert(p_peer);
413
} else {
414
to_despawn.insert(p_peer);
415
}
416
} else {
417
// Check visibility for each peers.
418
for (const KeyValue<int, PeerInfo> &E : peers_info) {
419
if (is_visible) {
420
// This is fast, since the object is visible to everyone, we don't need to check each peer.
421
if (E.value.spawn_nodes.has(p_oid)) {
422
// Already spawned.
423
continue;
424
}
425
to_spawn.insert(E.key);
426
} else {
427
// Need to check visibility for each peer.
428
_update_spawn_visibility(E.key, p_oid);
429
}
430
}
431
}
432
if (to_spawn.size()) {
433
int len = 0;
434
_make_spawn_packet(node, spawner, len);
435
for (int pid : to_spawn) {
436
ERR_CONTINUE(!peers_info.has(pid));
437
int path_id;
438
multiplayer_cache->send_object_cache(spawner, pid, path_id);
439
_send_raw(packet_cache.ptr(), len, pid, true);
440
peers_info[pid].spawn_nodes.insert(p_oid);
441
}
442
}
443
if (to_despawn.size()) {
444
int len = 0;
445
_make_despawn_packet(node, len);
446
for (int pid : to_despawn) {
447
ERR_CONTINUE(!peers_info.has(pid));
448
peers_info[pid].spawn_nodes.erase(p_oid);
449
_send_raw(packet_cache.ptr(), len, pid, true);
450
}
451
}
452
return OK;
453
}
454
455
Error SceneReplicationInterface::_send_raw(const uint8_t *p_buffer, int p_size, int p_peer, bool p_reliable) {
456
ERR_FAIL_COND_V(!p_buffer || p_size < 1, ERR_INVALID_PARAMETER);
457
458
Ref<MultiplayerPeer> peer = multiplayer->get_multiplayer_peer();
459
ERR_FAIL_COND_V(peer.is_null(), ERR_UNCONFIGURED);
460
peer->set_transfer_channel(0);
461
peer->set_transfer_mode(p_reliable ? MultiplayerPeer::TRANSFER_MODE_RELIABLE : MultiplayerPeer::TRANSFER_MODE_UNRELIABLE);
462
return multiplayer->send_command(p_peer, p_buffer, p_size);
463
}
464
465
Error SceneReplicationInterface::_make_spawn_packet(Node *p_node, MultiplayerSpawner *p_spawner, int &r_len) {
466
ERR_FAIL_COND_V(!multiplayer || !p_node || !p_spawner, ERR_BUG);
467
468
const ObjectID oid = p_node->get_instance_id();
469
TrackedNode *tnode = tracked_nodes.getptr(oid);
470
ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);
471
472
if (tnode->net_id == 0) {
473
// Ensure the node has an ID.
474
tnode->net_id = ++last_net_id;
475
}
476
uint32_t nid = tnode->net_id;
477
ERR_FAIL_COND_V(!nid, ERR_UNCONFIGURED);
478
479
// Prepare custom arg and scene_id
480
uint8_t scene_id = p_spawner->find_spawnable_scene_index_from_object(oid);
481
bool is_custom = scene_id == MultiplayerSpawner::INVALID_ID;
482
Variant spawn_arg = p_spawner->get_spawn_argument(oid);
483
int spawn_arg_size = 0;
484
if (is_custom) {
485
Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, nullptr, spawn_arg_size, false);
486
ERR_FAIL_COND_V(err, err);
487
}
488
489
// Prepare spawn state.
490
List<NodePath> state_props;
491
List<uint32_t> sync_ids;
492
const HashSet<ObjectID> synchronizers = tnode->synchronizers;
493
for (const ObjectID &sid : synchronizers) {
494
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(sid);
495
if (!_has_authority(sync)) {
496
continue;
497
}
498
ERR_CONTINUE(!sync);
499
ERR_FAIL_NULL_V(sync->get_replication_config_ptr(), ERR_BUG);
500
for (const NodePath &prop : sync->get_replication_config_ptr()->get_spawn_properties()) {
501
state_props.push_back(prop);
502
}
503
// Ensure the synchronizer has an ID.
504
if (sync->get_net_id() == 0) {
505
sync->set_net_id(++last_net_id);
506
}
507
sync_ids.push_back(sync->get_net_id());
508
}
509
int state_size = 0;
510
Vector<Variant> state_vars;
511
Vector<const Variant *> state_varp;
512
if (state_props.size()) {
513
Error err = MultiplayerSynchronizer::get_state(state_props, p_node, state_vars, state_varp);
514
ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to retrieve spawn state.");
515
err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), nullptr, state_size);
516
ERR_FAIL_COND_V_MSG(err != OK, err, "Unable to encode spawn state.");
517
}
518
519
// Encode scene ID, path ID, net ID, node name.
520
int path_id = multiplayer_cache->make_object_cache(p_spawner);
521
CharString cname = p_node->get_name().operator String().utf8();
522
int nlen = encode_cstring(cname.get_data(), nullptr);
523
MAKE_ROOM(1 + 1 + 4 + 4 + 4 + 4 * sync_ids.size() + 4 + nlen + (is_custom ? 4 + spawn_arg_size : 0) + state_size);
524
uint8_t *ptr = packet_cache.ptrw();
525
ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_SPAWN;
526
ptr[1] = scene_id;
527
int ofs = 2;
528
ofs += encode_uint32(path_id, &ptr[ofs]);
529
ofs += encode_uint32(nid, &ptr[ofs]);
530
ofs += encode_uint32(sync_ids.size(), &ptr[ofs]);
531
ofs += encode_uint32(nlen, &ptr[ofs]);
532
for (uint32_t snid : sync_ids) {
533
ofs += encode_uint32(snid, &ptr[ofs]);
534
}
535
ofs += encode_cstring(cname.get_data(), &ptr[ofs]);
536
// Write args
537
if (is_custom) {
538
ofs += encode_uint32(spawn_arg_size, &ptr[ofs]);
539
Error err = MultiplayerAPI::encode_and_compress_variant(spawn_arg, &ptr[ofs], spawn_arg_size, false);
540
ERR_FAIL_COND_V(err, err);
541
ofs += spawn_arg_size;
542
}
543
// Write state.
544
if (state_size) {
545
Error err = MultiplayerAPI::encode_and_compress_variants(state_varp.ptrw(), state_varp.size(), &ptr[ofs], state_size);
546
ERR_FAIL_COND_V(err, err);
547
ofs += state_size;
548
}
549
r_len = ofs;
550
return OK;
551
}
552
553
Error SceneReplicationInterface::_make_despawn_packet(Node *p_node, int &r_len) {
554
const ObjectID oid = p_node->get_instance_id();
555
const TrackedNode *tnode = tracked_nodes.getptr(oid);
556
ERR_FAIL_NULL_V(tnode, ERR_INVALID_PARAMETER);
557
MAKE_ROOM(5);
558
uint8_t *ptr = packet_cache.ptrw();
559
ptr[0] = (uint8_t)SceneMultiplayer::NETWORK_COMMAND_DESPAWN;
560
int ofs = 1;
561
uint32_t nid = tnode->net_id;
562
ofs += encode_uint32(nid, &ptr[ofs]);
563
r_len = ofs;
564
return OK;
565
}
566
567
Error SceneReplicationInterface::on_spawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
568
ERR_FAIL_COND_V_MSG(p_buffer_len < 18, ERR_INVALID_DATA, "Invalid spawn packet received");
569
int ofs = 1; // The spawn/despawn command.
570
uint8_t scene_id = p_buffer[ofs];
571
ofs += 1;
572
uint32_t node_target = decode_uint32(&p_buffer[ofs]);
573
ofs += 4;
574
MultiplayerSpawner *spawner = Object::cast_to<MultiplayerSpawner>(multiplayer_cache->get_cached_object(p_from, node_target));
575
ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);
576
ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
577
578
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
579
ofs += 4;
580
uint32_t sync_len = decode_uint32(&p_buffer[ofs]);
581
ofs += 4;
582
uint32_t name_len = decode_uint32(&p_buffer[ofs]);
583
ofs += 4;
584
ERR_FAIL_COND_V_MSG(name_len + (sync_len * 4) > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA, vformat("Invalid spawn packet size: %d, wants: %d", p_buffer_len, ofs + name_len + (sync_len * 4)));
585
List<uint32_t> sync_ids;
586
for (uint32_t i = 0; i < sync_len; i++) {
587
sync_ids.push_back(decode_uint32(&p_buffer[ofs]));
588
ofs += 4;
589
}
590
ERR_FAIL_COND_V_MSG(name_len < 1, ERR_INVALID_DATA, "Zero spawn name size.");
591
592
// We need to make sure no trickery happens here, but we want to allow autogenerated ("@") node names.
593
const String name = String::utf8((const char *)&p_buffer[ofs], name_len);
594
ERR_FAIL_COND_V_MSG(name.validate_node_name() != name, ERR_INVALID_DATA, vformat("Invalid node name received: '%s'. Make sure to add nodes via 'add_child(node, true)' remotely.", name));
595
ofs += name_len;
596
597
// Check that we can spawn.
598
Node *parent = spawner->get_node_or_null(spawner->get_spawn_path());
599
ERR_FAIL_NULL_V(parent, ERR_UNCONFIGURED);
600
ERR_FAIL_COND_V(parent->has_node(name), ERR_INVALID_DATA);
601
602
Node *node = nullptr;
603
if (scene_id == MultiplayerSpawner::INVALID_ID) {
604
// Custom spawn.
605
ERR_FAIL_COND_V(p_buffer_len - ofs < 4, ERR_INVALID_DATA);
606
uint32_t arg_size = decode_uint32(&p_buffer[ofs]);
607
ofs += 4;
608
ERR_FAIL_COND_V(arg_size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
609
Variant v;
610
Error err = MultiplayerAPI::decode_and_decompress_variant(v, &p_buffer[ofs], arg_size, nullptr, false);
611
ERR_FAIL_COND_V(err != OK, err);
612
ofs += arg_size;
613
node = spawner->instantiate_custom(v);
614
} else {
615
// Scene based spawn.
616
node = spawner->instantiate_scene(scene_id);
617
}
618
ERR_FAIL_NULL_V(node, ERR_UNAUTHORIZED);
619
node->set_name(name);
620
621
// Add and track remote
622
ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAVAILABLE);
623
ERR_FAIL_COND_V(peers_info[p_from].recv_nodes.has(net_id), ERR_ALREADY_IN_USE);
624
ObjectID oid = node->get_instance_id();
625
TrackedNode &tobj = _track(oid);
626
tobj.spawner = spawner->get_instance_id();
627
tobj.net_id = net_id;
628
tobj.remote_peer = p_from;
629
peers_info[p_from].recv_nodes[net_id] = oid;
630
631
// The initial state will be applied during the sync config (i.e. before _ready).
632
pending_spawn = node->get_instance_id();
633
pending_spawn_remote = p_from;
634
pending_buffer_size = p_buffer_len - ofs;
635
pending_buffer = pending_buffer_size > 0 ? &p_buffer[ofs] : nullptr;
636
pending_sync_net_ids = sync_ids;
637
638
parent->add_child(node);
639
spawner->emit_signal(SNAME("spawned"), node);
640
641
pending_spawn = ObjectID();
642
pending_spawn_remote = 0;
643
pending_buffer = nullptr;
644
pending_buffer_size = 0;
645
if (pending_sync_net_ids.size()) {
646
pending_sync_net_ids.clear();
647
ERR_FAIL_V(ERR_INVALID_DATA); // Should have been consumed.
648
}
649
return OK;
650
}
651
652
Error SceneReplicationInterface::on_despawn_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
653
ERR_FAIL_COND_V_MSG(p_buffer_len < 5, ERR_INVALID_DATA, "Invalid spawn packet received");
654
int ofs = 1; // The spawn/despawn command.
655
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
656
ofs += 4;
657
658
// Untrack remote
659
ERR_FAIL_COND_V(!peers_info.has(p_from), ERR_UNAUTHORIZED);
660
PeerInfo &pinfo = peers_info[p_from];
661
ERR_FAIL_COND_V(!pinfo.recv_nodes.has(net_id), ERR_UNAUTHORIZED);
662
Node *node = get_id_as<Node>(pinfo.recv_nodes[net_id]);
663
ERR_FAIL_NULL_V(node, ERR_BUG);
664
pinfo.recv_nodes.erase(net_id);
665
666
const ObjectID oid = node->get_instance_id();
667
ERR_FAIL_COND_V(!tracked_nodes.has(oid), ERR_BUG);
668
MultiplayerSpawner *spawner = get_id_as<MultiplayerSpawner>(tracked_nodes[oid].spawner);
669
ERR_FAIL_NULL_V(spawner, ERR_DOES_NOT_EXIST);
670
ERR_FAIL_COND_V(p_from != spawner->get_multiplayer_authority(), ERR_UNAUTHORIZED);
671
672
if (node->get_parent() != nullptr) {
673
node->get_parent()->remove_child(node);
674
}
675
node->queue_free();
676
spawner->emit_signal(SNAME("despawned"), node);
677
678
return OK;
679
}
680
681
bool SceneReplicationInterface::_verify_synchronizer(int p_peer, MultiplayerSynchronizer *p_sync, uint32_t &r_net_id) {
682
r_net_id = p_sync->get_net_id();
683
if (r_net_id == 0 || (r_net_id & 0x80000000)) {
684
int path_id = 0;
685
bool verified = multiplayer_cache->send_object_cache(p_sync, p_peer, path_id);
686
ERR_FAIL_COND_V_MSG(path_id < 0, false, "This should never happen!");
687
if (r_net_id == 0) {
688
// First time path based ID.
689
r_net_id = path_id | 0x80000000;
690
p_sync->set_net_id(r_net_id | 0x80000000);
691
}
692
return verified;
693
}
694
return true;
695
}
696
697
MultiplayerSynchronizer *SceneReplicationInterface::_find_synchronizer(int p_peer, uint32_t p_net_id) {
698
MultiplayerSynchronizer *sync = nullptr;
699
if (p_net_id & 0x80000000) {
700
sync = Object::cast_to<MultiplayerSynchronizer>(multiplayer_cache->get_cached_object(p_peer, p_net_id & 0x7FFFFFFF));
701
} else if (peers_info[p_peer].recv_sync_ids.has(p_net_id)) {
702
const ObjectID &sid = peers_info[p_peer].recv_sync_ids[p_net_id];
703
sync = get_id_as<MultiplayerSynchronizer>(sid);
704
}
705
return sync;
706
}
707
708
void SceneReplicationInterface::_send_delta(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint64_t p_usec, const HashMap<ObjectID, uint64_t> &p_last_watch_usecs) {
709
MAKE_ROOM(/* header */ 1 + /* element */ 4 + 8 + 4 + delta_mtu);
710
uint8_t *ptr = packet_cache.ptrw();
711
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC | (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT);
712
int ofs = 1;
713
for (const ObjectID &oid : p_synchronizers) {
714
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
715
ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));
716
uint32_t net_id;
717
if (!_verify_synchronizer(p_peer, sync, net_id)) {
718
continue;
719
}
720
uint64_t last_usec = p_last_watch_usecs.has(oid) ? p_last_watch_usecs[oid] : 0;
721
uint64_t indexes;
722
List<Variant> delta = sync->get_delta_state(p_usec, last_usec, indexes);
723
724
if (!delta.size()) {
725
continue; // Nothing to update.
726
}
727
728
Vector<const Variant *> varp;
729
varp.resize(delta.size());
730
const Variant **vptr = varp.ptrw();
731
int i = 0;
732
for (const Variant &v : delta) {
733
vptr[i] = &v;
734
i++;
735
}
736
int size;
737
Error err = MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), nullptr, size);
738
ERR_CONTINUE_MSG(err != OK, "Unable to encode delta state.");
739
740
ERR_CONTINUE_MSG(size > delta_mtu, vformat("Synchronizer delta bigger than MTU will not be sent (%d > %d): %s", size, delta_mtu, sync->get_path()));
741
742
if (ofs + 4 + 8 + 4 + size > delta_mtu) {
743
// Send what we got, and reset write.
744
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
745
ofs = 1;
746
}
747
if (size) {
748
ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
749
ofs += encode_uint64(indexes, &ptr[ofs]);
750
ofs += encode_uint32(size, &ptr[ofs]);
751
MultiplayerAPI::encode_and_compress_variants(vptr, varp.size(), &ptr[ofs], size);
752
ofs += size;
753
}
754
#ifdef DEBUG_ENABLED
755
_profile_node_data("delta_out", oid, size);
756
#endif
757
peers_info[p_peer].last_watch_usecs[oid] = p_usec;
758
}
759
if (ofs > 1) {
760
// Got some left over to send.
761
_send_raw(packet_cache.ptr(), ofs, p_peer, true);
762
}
763
}
764
765
Error SceneReplicationInterface::on_delta_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
766
int ofs = 1;
767
while (ofs + 4 + 8 + 4 < p_buffer_len) {
768
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
769
ofs += 4;
770
uint64_t indexes = decode_uint64(&p_buffer[ofs]);
771
ofs += 8;
772
uint32_t size = decode_uint32(&p_buffer[ofs]);
773
ofs += 4;
774
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
775
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
776
Node *node = sync ? sync->get_root_node() : nullptr;
777
if (!sync || sync->get_multiplayer_authority() != p_from || !node) {
778
ofs += size;
779
ERR_CONTINUE_MSG(true, "Ignoring delta for non-authority or invalid synchronizer.");
780
}
781
List<NodePath> props = sync->get_delta_properties(indexes);
782
ERR_FAIL_COND_V(props.is_empty(), ERR_INVALID_DATA);
783
Vector<Variant> vars;
784
vars.resize(props.size());
785
int consumed = 0;
786
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, p_buffer + ofs, size, consumed);
787
ERR_FAIL_COND_V(err != OK, err);
788
ERR_FAIL_COND_V(uint32_t(consumed) != size, ERR_INVALID_DATA);
789
err = MultiplayerSynchronizer::set_state(props, node, vars);
790
ERR_FAIL_COND_V(err != OK, err);
791
ofs += size;
792
sync->emit_signal(SNAME("delta_synchronized"));
793
#ifdef DEBUG_ENABLED
794
_profile_node_data("delta_in", sync->get_instance_id(), size);
795
#endif
796
}
797
return OK;
798
}
799
800
void SceneReplicationInterface::_send_sync(int p_peer, const HashSet<ObjectID> &p_synchronizers, uint16_t p_sync_net_time, uint64_t p_usec) {
801
MAKE_ROOM(/* header */ 3 + /* element */ 4 + 4 + sync_mtu);
802
uint8_t *ptr = packet_cache.ptrw();
803
ptr[0] = SceneMultiplayer::NETWORK_COMMAND_SYNC;
804
int ofs = 1;
805
ofs += encode_uint16(p_sync_net_time, &ptr[1]);
806
// Can only send updates for already notified nodes.
807
// This is a lazy implementation, we could optimize much more here with by grouping by replication config.
808
for (const ObjectID &oid : p_synchronizers) {
809
MultiplayerSynchronizer *sync = get_id_as<MultiplayerSynchronizer>(oid);
810
ERR_CONTINUE(!sync || !sync->get_replication_config_ptr() || !_has_authority(sync));
811
if (!sync->update_outbound_sync_time(p_usec)) {
812
continue; // nothing to sync.
813
}
814
815
Node *node = sync->get_root_node();
816
ERR_CONTINUE(!node);
817
uint32_t net_id = sync->get_net_id();
818
if (!_verify_synchronizer(p_peer, sync, net_id)) {
819
// The path based sync is not yet confirmed, skipping.
820
continue;
821
}
822
int size;
823
Vector<Variant> vars;
824
Vector<const Variant *> varp;
825
const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();
826
Error err = MultiplayerSynchronizer::get_state(props, node, vars, varp);
827
ERR_CONTINUE_MSG(err != OK, "Unable to retrieve sync state.");
828
err = MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), nullptr, size);
829
ERR_CONTINUE_MSG(err != OK, "Unable to encode sync state.");
830
// TODO Handle single state above MTU.
831
ERR_CONTINUE_MSG(size > sync_mtu, vformat("Node states bigger than MTU will not be sent (%d > %d): %s", size, sync_mtu, node->get_path()));
832
if (ofs + 4 + 4 + size > sync_mtu) {
833
// Send what we got, and reset write.
834
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
835
ofs = 3;
836
}
837
if (size) {
838
ofs += encode_uint32(sync->get_net_id(), &ptr[ofs]);
839
ofs += encode_uint32(size, &ptr[ofs]);
840
MultiplayerAPI::encode_and_compress_variants(varp.ptrw(), varp.size(), &ptr[ofs], size);
841
ofs += size;
842
}
843
#ifdef DEBUG_ENABLED
844
_profile_node_data("sync_out", oid, size);
845
#endif
846
}
847
if (ofs > 3) {
848
// Got some left over to send.
849
_send_raw(packet_cache.ptr(), ofs, p_peer, false);
850
}
851
}
852
853
Error SceneReplicationInterface::on_sync_receive(int p_from, const uint8_t *p_buffer, int p_buffer_len) {
854
ERR_FAIL_COND_V_MSG(p_buffer_len < 11, ERR_INVALID_DATA, "Invalid sync packet received");
855
bool is_delta = (p_buffer[0] & (1 << SceneMultiplayer::CMD_FLAG_0_SHIFT)) != 0;
856
if (is_delta) {
857
return on_delta_receive(p_from, p_buffer, p_buffer_len);
858
}
859
uint16_t time = decode_uint16(&p_buffer[1]);
860
int ofs = 3;
861
while (ofs + 8 < p_buffer_len) {
862
uint32_t net_id = decode_uint32(&p_buffer[ofs]);
863
ofs += 4;
864
uint32_t size = decode_uint32(&p_buffer[ofs]);
865
ofs += 4;
866
ERR_FAIL_COND_V(size > uint32_t(p_buffer_len - ofs), ERR_INVALID_DATA);
867
MultiplayerSynchronizer *sync = _find_synchronizer(p_from, net_id);
868
if (!sync) {
869
// Not received yet.
870
ofs += size;
871
continue;
872
}
873
Node *node = sync->get_root_node();
874
if (sync->get_multiplayer_authority() != p_from || !node) {
875
// Not valid for me.
876
ofs += size;
877
ERR_CONTINUE_MSG(true, "Ignoring sync data from non-authority or for missing node.");
878
}
879
if (!sync->update_inbound_sync_time(time)) {
880
// State is too old.
881
ofs += size;
882
continue;
883
}
884
const List<NodePath> props = sync->get_replication_config_ptr()->get_sync_properties();
885
Vector<Variant> vars;
886
vars.resize(props.size());
887
int consumed;
888
Error err = MultiplayerAPI::decode_and_decompress_variants(vars, &p_buffer[ofs], size, consumed);
889
ERR_FAIL_COND_V(err, err);
890
err = MultiplayerSynchronizer::set_state(props, node, vars);
891
ERR_FAIL_COND_V(err, err);
892
ofs += size;
893
sync->emit_signal(SNAME("synchronized"));
894
#ifdef DEBUG_ENABLED
895
_profile_node_data("sync_in", sync->get_instance_id(), size);
896
#endif
897
}
898
return OK;
899
}
900
901
void SceneReplicationInterface::set_max_sync_packet_size(int p_size) {
902
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
903
sync_mtu = p_size;
904
}
905
906
int SceneReplicationInterface::get_max_sync_packet_size() const {
907
return sync_mtu;
908
}
909
910
void SceneReplicationInterface::set_max_delta_packet_size(int p_size) {
911
ERR_FAIL_COND_MSG(p_size < 128, "Sync maximum packet size must be at least 128 bytes.");
912
delta_mtu = p_size;
913
}
914
915
int SceneReplicationInterface::get_max_delta_packet_size() const {
916
return delta_mtu;
917
}
918
919