Path: blob/main/crates/bevy_ecs/src/message/mut_iterators.rs
6849 views
#[cfg(feature = "multi_threaded")]1use crate::batching::BatchingStrategy;2use crate::message::{Message, MessageCursor, MessageId, MessageInstance, Messages};3use core::{iter::Chain, slice::IterMut};45/// An iterator that yields any unread messages from an [`MessageMutator`] or [`MessageCursor`].6///7/// [`MessageMutator`]: super::MessageMutator8#[derive(Debug)]9pub struct MessageMutIterator<'a, E: Message> {10iter: MessageMutIteratorWithId<'a, E>,11}1213impl<'a, E: Message> Iterator for MessageMutIterator<'a, E> {14type Item = &'a mut E;15fn next(&mut self) -> Option<Self::Item> {16self.iter.next().map(|(message, _)| message)17}1819fn size_hint(&self) -> (usize, Option<usize>) {20self.iter.size_hint()21}2223fn count(self) -> usize {24self.iter.count()25}2627fn last(self) -> Option<Self::Item>28where29Self: Sized,30{31self.iter.last().map(|(message, _)| message)32}3334fn nth(&mut self, n: usize) -> Option<Self::Item> {35self.iter.nth(n).map(|(message, _)| message)36}37}3839impl<'a, E: Message> ExactSizeIterator for MessageMutIterator<'a, E> {40fn len(&self) -> usize {41self.iter.len()42}43}4445/// An iterator that yields any unread messages (and their IDs) from an [`MessageMutator`] or [`MessageCursor`].46///47/// [`MessageMutator`]: super::MessageMutator48#[derive(Debug)]49pub struct MessageMutIteratorWithId<'a, E: Message> {50mutator: &'a mut MessageCursor<E>,51chain: Chain<IterMut<'a, MessageInstance<E>>, IterMut<'a, MessageInstance<E>>>,52unread: usize,53}5455impl<'a, E: Message> MessageMutIteratorWithId<'a, E> {56/// Creates a new iterator that yields any `messages` that have not yet been seen by `mutator`.57pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {58let a_index = mutator59.last_message_count60.saturating_sub(messages.messages_a.start_message_count);61let b_index = mutator62.last_message_count63.saturating_sub(messages.messages_b.start_message_count);64let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();65let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();6667let unread_count = a.len() + b.len();6869mutator.last_message_count = messages.message_count - unread_count;70// Iterate the oldest first, then the newer messages71let chain = a.iter_mut().chain(b.iter_mut());7273Self {74mutator,75chain,76unread: unread_count,77}78}7980/// Iterate over only the messages.81pub fn without_id(self) -> MessageMutIterator<'a, E> {82MessageMutIterator { iter: self }83}84}8586impl<'a, E: Message> Iterator for MessageMutIteratorWithId<'a, E> {87type Item = (&'a mut E, MessageId<E>);88fn next(&mut self) -> Option<Self::Item> {89match self90.chain91.next()92.map(|instance| (&mut instance.message, instance.message_id))93{94Some(item) => {95#[cfg(feature = "detailed_trace")]96tracing::trace!("MessageMutator::iter() -> {}", item.1);97self.mutator.last_message_count += 1;98self.unread -= 1;99Some(item)100}101None => None,102}103}104105fn size_hint(&self) -> (usize, Option<usize>) {106self.chain.size_hint()107}108109fn count(self) -> usize {110self.mutator.last_message_count += self.unread;111self.unread112}113114fn last(self) -> Option<Self::Item>115where116Self: Sized,117{118let MessageInstance {119message_id,120message,121} = self.chain.last()?;122self.mutator.last_message_count += self.unread;123Some((message, *message_id))124}125126fn nth(&mut self, n: usize) -> Option<Self::Item> {127if let Some(MessageInstance {128message_id,129message,130}) = self.chain.nth(n)131{132self.mutator.last_message_count += n + 1;133self.unread -= n + 1;134Some((message, *message_id))135} else {136self.mutator.last_message_count += self.unread;137self.unread = 0;138None139}140}141}142143impl<'a, E: Message> ExactSizeIterator for MessageMutIteratorWithId<'a, E> {144fn len(&self) -> usize {145self.unread146}147}148149/// A parallel iterator over `Message`s.150#[derive(Debug)]151#[cfg(feature = "multi_threaded")]152pub struct MessageMutParIter<'a, E: Message> {153mutator: &'a mut MessageCursor<E>,154slices: [&'a mut [MessageInstance<E>]; 2],155batching_strategy: BatchingStrategy,156#[cfg(not(target_arch = "wasm32"))]157unread: usize,158}159160#[cfg(feature = "multi_threaded")]161impl<'a, E: Message> MessageMutParIter<'a, E> {162/// Creates a new parallel iterator over `messages` that have not yet been seen by `mutator`.163pub fn new(mutator: &'a mut MessageCursor<E>, messages: &'a mut Messages<E>) -> Self {164let a_index = mutator165.last_message_count166.saturating_sub(messages.messages_a.start_message_count);167let b_index = mutator168.last_message_count169.saturating_sub(messages.messages_b.start_message_count);170let a = messages.messages_a.get_mut(a_index..).unwrap_or_default();171let b = messages.messages_b.get_mut(b_index..).unwrap_or_default();172173let unread_count = a.len() + b.len();174mutator.last_message_count = messages.message_count - unread_count;175176Self {177mutator,178slices: [a, b],179batching_strategy: BatchingStrategy::default(),180#[cfg(not(target_arch = "wasm32"))]181unread: unread_count,182}183}184185/// Changes the batching strategy used when iterating.186///187/// For more information on how this affects the resultant iteration, see188/// [`BatchingStrategy`].189pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {190self.batching_strategy = strategy;191self192}193194/// Runs the provided closure for each unread message in parallel.195///196/// Unlike normal iteration, the message order is not guaranteed in any form.197///198/// # Panics199/// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being200/// initialized and run from the ECS scheduler, this should never panic.201///202/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool203pub fn for_each<FN: Fn(&'a mut E) + Send + Sync + Clone>(self, func: FN) {204self.for_each_with_id(move |e, _| func(e));205}206207/// Runs the provided closure for each unread message in parallel, like [`for_each`](Self::for_each),208/// but additionally provides the `MessageId` to the closure.209///210/// Note that the order of iteration is not guaranteed, but `MessageId`s are ordered by send order.211///212/// # Panics213/// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being214/// initialized and run from the ECS scheduler, this should never panic.215///216/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool217#[cfg_attr(218target_arch = "wasm32",219expect(unused_mut, reason = "not mutated on this target")220)]221pub fn for_each_with_id<FN: Fn(&'a mut E, MessageId<E>) + Send + Sync + Clone>(222mut self,223func: FN,224) {225#[cfg(target_arch = "wasm32")]226{227self.into_iter().for_each(|(e, i)| func(e, i));228}229230#[cfg(not(target_arch = "wasm32"))]231{232let pool = bevy_tasks::ComputeTaskPool::get();233let thread_count = pool.thread_num();234if thread_count <= 1 {235return self.into_iter().for_each(|(e, i)| func(e, i));236}237238let batch_size = self239.batching_strategy240.calc_batch_size(|| self.len(), thread_count);241let chunks = self.slices.map(|s| s.chunks_mut(batch_size));242243pool.scope(|scope| {244for batch in chunks.into_iter().flatten() {245let func = func.clone();246scope.spawn(async move {247for message_instance in batch {248func(&mut message_instance.message, message_instance.message_id);249}250});251}252});253254// Messages are guaranteed to be read at this point.255self.mutator.last_message_count += self.unread;256self.unread = 0;257}258}259260/// Returns the number of [`Message`]s to be iterated.261pub fn len(&self) -> usize {262self.slices.iter().map(|s| s.len()).sum()263}264265/// Returns [`true`] if there are no messages remaining in this iterator.266pub fn is_empty(&self) -> bool {267self.slices.iter().all(|x| x.is_empty())268}269}270271#[cfg(feature = "multi_threaded")]272impl<'a, E: Message> IntoIterator for MessageMutParIter<'a, E> {273type IntoIter = MessageMutIteratorWithId<'a, E>;274type Item = <Self::IntoIter as Iterator>::Item;275276fn into_iter(self) -> Self::IntoIter {277let MessageMutParIter {278mutator: reader,279slices: [a, b],280..281} = self;282let unread = a.len() + b.len();283let chain = a.iter_mut().chain(b);284MessageMutIteratorWithId {285mutator: reader,286chain,287unread,288}289}290}291292293