// Source: GitHub repository bevyengine/bevy
// Path: blob/main/crates/bevy_ecs/src/message/mut_iterators.rs
1
#[cfg(feature = "multi_threaded")]
2
use crate::batching::BatchingStrategy;
3
use crate::message::{Message, MessageCursor, MessageId, MessageInstance, Messages};
4
use core::{iter::Chain, slice::IterMut};
5
6
/// An iterator that yields any unread messages from an [`MessageMutator`] or [`MessageCursor`].
7
///
8
/// [`MessageMutator`]: super::MessageMutator
9
#[derive(Debug)]
10
pub struct MessageMutIterator<'a, E: Message> {
11
iter: MessageMutIteratorWithId<'a, E>,
12
}
13
14
impl<'a, E: Message> Iterator for MessageMutIterator<'a, E> {
    type Item = &'a mut E;

    /// Advances the wrapped iterator and returns only the message, dropping its id.
    fn next(&mut self) -> Option<Self::Item> {
        let (message, _id) = self.iter.next()?;
        Some(message)
    }

    /// Forwards the wrapped iterator's size hint unchanged.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.iter.size_hint()
    }

    /// Consumes the iterator, delegating the count to the wrapped iterator.
    fn count(self) -> usize {
        self.iter.count()
    }

    /// Consumes the iterator and returns the final message, if any.
    fn last(self) -> Option<Self::Item>
    where
        Self: Sized,
    {
        let (message, _id) = self.iter.last()?;
        Some(message)
    }

    /// Skips `n` messages and returns the one after them, if any.
    fn nth(&mut self, n: usize) -> Option<Self::Item> {
        let (message, _id) = self.iter.nth(n)?;
        Some(message)
    }
}
39
40
impl<'a, E: Message> ExactSizeIterator for MessageMutIterator<'a, E> {
    /// Number of messages still to be yielded, as reported by the wrapped iterator.
    fn len(&self) -> usize {
        ExactSizeIterator::len(&self.iter)
    }
}
45
46
/// An iterator that yields any unread messages (and their IDs) from an [`MessageMutator`] or [`MessageCursor`].
47
///
48
/// [`MessageMutator`]: super::MessageMutator
49
#[derive(Debug)]
50
pub struct MessageMutIteratorWithId<'a, E: Message> {
51
mutator: &'a mut MessageCursor<E>,
52
chain: Chain<IterMut<'a, MessageInstance<E>>, IterMut<'a, MessageInstance<E>>>,
53
unread: usize,
54
}
55
56
impl<'a, E: Message> MessageMutIteratorWithId<'a, E> {
57
/// Creates a new iterator that yields any `messages` that have not yet been seen by `mutator`.
58
pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
59
let a_index = mutator
60
.last_message_count
61
.saturating_sub(messages.messages_a.start_message_count);
62
let b_index = mutator
63
.last_message_count
64
.saturating_sub(messages.messages_b.start_message_count);
65
let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();
66
let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();
67
68
let unread_count = a.len() + b.len();
69
70
mutator.last_message_count = messages.message_count - unread_count;
71
// Iterate the oldest first, then the newer messages
72
let chain = a.iter_mut().chain(b.iter_mut());
73
74
Self {
75
mutator,
76
chain,
77
unread: unread_count,
78
}
79
}
80
81
/// Iterate over only the messages.
82
pub fn without_id(self) -> MessageMutIterator<'a, E> {
83
MessageMutIterator { iter: self }
84
}
85
}
86
87
impl<'a, E: Message> Iterator for MessageMutIteratorWithId<'a, E> {
88
type Item = (&'a mut E, MessageId<E>);
89
fn next(&mut self) -> Option<Self::Item> {
90
match self
91
.chain
92
.next()
93
.map(|instance| (&mut instance.message, instance.message_id))
94
{
95
Some(item) => {
96
#[cfg(feature = "detailed_trace")]
97
tracing::trace!("MessageMutator::iter() -> {}", item.1);
98
self.mutator.last_message_count += 1;
99
self.unread -= 1;
100
Some(item)
101
}
102
None => None,
103
}
104
}
105
106
fn size_hint(&self) -> (usize, Option<usize>) {
107
self.chain.size_hint()
108
}
109
110
fn count(self) -> usize {
111
self.mutator.last_message_count += self.unread;
112
self.unread
113
}
114
115
fn last(self) -> Option<Self::Item>
116
where
117
Self: Sized,
118
{
119
let MessageInstance {
120
message_id,
121
message,
122
} = self.chain.last()?;
123
self.mutator.last_message_count += self.unread;
124
Some((message, *message_id))
125
}
126
127
fn nth(&mut self, n: usize) -> Option<Self::Item> {
128
if let Some(MessageInstance {
129
message_id,
130
message,
131
}) = self.chain.nth(n)
132
{
133
self.mutator.last_message_count += n + 1;
134
self.unread -= n + 1;
135
Some((message, *message_id))
136
} else {
137
self.mutator.last_message_count += self.unread;
138
self.unread = 0;
139
None
140
}
141
}
142
}
143
144
impl<'a, E: Message> ExactSizeIterator for MessageMutIteratorWithId<'a, E> {
    /// Number of messages not yet yielded, taken from the tracked `unread` counter.
    fn len(&self) -> usize {
        self.unread
    }
}
149
150
/// A parallel iterator over unread `Message`s that hands out mutable references.
#[derive(Debug)]
#[cfg(feature = "multi_threaded")]
pub struct MessageMutParIter<'a, E: Message> {
    /// Cursor whose `last_message_count` is advanced once the messages have been processed.
    mutator: &'a mut MessageCursor<E>,
    /// The unread parts of the two message buffers, in `[a, b]` order.
    slices: [&'a mut [MessageInstance<E>]; 2],
    /// Strategy used to pick the batch size for parallel iteration.
    batching_strategy: BatchingStrategy,
    /// Number of unread messages captured at construction; absent on `wasm32` targets.
    #[cfg(not(target_arch = "wasm32"))]
    unread: usize,
}
160
161
#[cfg(feature = "multi_threaded")]
impl<'a, E: Message> MessageMutParIter<'a, E> {
    /// Creates a new parallel iterator over `messages` that have not yet been seen by `mutator`.
    ///
    /// The cursor is positioned at the start of the unread range here and only advanced
    /// once the messages have actually been processed.
    pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {
        // Offset of the first unseen message inside each buffer. `saturating_sub` clamps
        // to 0 when the cursor is behind the buffer's starting count.
        let a_index = mutator
            .last_message_count
            .saturating_sub(messages.messages_a.start_message_count);
        let b_index = mutator
            .last_message_count
            .saturating_sub(messages.messages_b.start_message_count);
        // Fall back to an empty slice when `get_mut` finds nothing at that offset.
        let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();
        let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();

        let unread_count = a.len() + b.len();
        mutator.last_message_count = messages.message_count - unread_count;

        Self {
            mutator,
            slices: [a, b],
            batching_strategy: BatchingStrategy::default(),
            #[cfg(not(target_arch = "wasm32"))]
            unread: unread_count,
        }
    }

    /// Changes the batching strategy used when iterating.
    ///
    /// For more information on how this affects the resultant iteration, see
    /// [`BatchingStrategy`].
    pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {
        self.batching_strategy = strategy;
        self
    }

    /// Runs the provided closure for each unread message in parallel.
    ///
    /// Unlike normal iteration, the message order is not guaranteed in any form.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a message mutator that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {
        self.for_each_with_id(move |e, _| func(e));
    }

    /// Runs the provided closure for each unread message in parallel, like [`for_each`](Self::for_each),
    /// but additionally provides the `MessageId` to the closure.
    ///
    /// Note that the order of iteration is not guaranteed, but `MessageId`s are ordered by send order.
    ///
    /// On `wasm32` targets, or when the task pool reports at most one thread, the messages
    /// are processed sequentially on the calling thread instead.
    ///
    /// # Panics
    /// If the [`ComputeTaskPool`] is not initialized. If using this from a message mutator that is being
    /// initialized and run from the ECS scheduler, this should never panic.
    ///
    /// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool
    #[cfg_attr(
        target_arch = "wasm32",
        expect(unused_mut, reason = "not mutated on this target")
    )]
    pub fn for_each_with_id<FN: Fn(&'a mut E, MessageId<E>) + Send + Sync + Clone>(
        mut self,
        func: FN,
    ) {
        #[cfg(target_arch = "wasm32")]
        {
            self.into_iter().for_each(|(e, i)| func(e, i));
        }

        #[cfg(not(target_arch = "wasm32"))]
        {
            let pool = bevy_tasks::ComputeTaskPool::get();
            let thread_count = pool.thread_num();
            if thread_count <= 1 {
                // The sequential iterator advances the cursor as it goes.
                return self.into_iter().for_each(|(e, i)| func(e, i));
            }

            // `chunks_mut` panics on a chunk size of zero, so clamp defensively in case
            // the configured strategy yields 0.
            let batch_size = self
                .batching_strategy
                .calc_batch_size(|| self.len(), thread_count)
                .max(1);
            let chunks = self.slices.map(|s| s.chunks_mut(batch_size));

            pool.scope(|scope| {
                for batch in chunks.into_iter().flatten() {
                    let func = func.clone();
                    scope.spawn(async move {
                        for message_instance in batch {
                            func(&mut message_instance.message, message_instance.message_id);
                        }
                    });
                }
            });

            // Messages are guaranteed to be read at this point.
            self.mutator.last_message_count += self.unread;
            self.unread = 0;
        }
    }

    /// Returns the number of [`Message`]s to be iterated.
    pub fn len(&self) -> usize {
        self.slices.iter().map(|s| s.len()).sum()
    }

    /// Returns [`true`] if there are no messages remaining in this iterator.
    pub fn is_empty(&self) -> bool {
        self.slices.iter().all(|x| x.is_empty())
    }
}
271
272
#[cfg(feature = "multi_threaded")]
impl<'a, E: Message> IntoIterator for MessageMutParIter<'a, E> {
    type IntoIter = MessageMutIteratorWithId<'a, E>;
    type Item = <Self::IntoIter as Iterator>::Item;

    /// Converts the parallel iterator into the sequential id-yielding iterator over the
    /// same unread messages, reusing the same cursor.
    fn into_iter(self) -> Self::IntoIter {
        let Self {
            mutator,
            slices: [first, second],
            ..
        } = self;

        let unread = first.len() + second.len();
        MessageMutIteratorWithId {
            mutator,
            chain: first.iter_mut().chain(second.iter_mut()),
            unread,
        }
    }
}
292
293