Path: blob/main/crates/bevy_ecs/src/message/iterators.rs
6849 views
#[cfg(feature = "multi_threaded")]1use crate::batching::BatchingStrategy;2use crate::message::{Message, MessageCursor, MessageId, MessageInstance, Messages};3use core::{iter::Chain, slice::Iter};45/// An iterator that yields any unread messages from a [`MessageReader`](super::MessageReader) or [`MessageCursor`].6#[derive(Debug)]7pub struct MessageIterator<'a, M: Message> {8iter: MessageIteratorWithId<'a, M>,9}1011impl<'a, M: Message> Iterator for MessageIterator<'a, M> {12type Item = &'a M;13fn next(&mut self) -> Option<Self::Item> {14self.iter.next().map(|(message, _)| message)15}1617fn size_hint(&self) -> (usize, Option<usize>) {18self.iter.size_hint()19}2021fn count(self) -> usize {22self.iter.count()23}2425fn last(self) -> Option<Self::Item>26where27Self: Sized,28{29self.iter.last().map(|(message, _)| message)30}3132fn nth(&mut self, n: usize) -> Option<Self::Item> {33self.iter.nth(n).map(|(message, _)| message)34}35}3637impl<'a, M: Message> ExactSizeIterator for MessageIterator<'a, M> {38fn len(&self) -> usize {39self.iter.len()40}41}4243/// An iterator that yields any unread messages (and their IDs) from a [`MessageReader`](super::MessageReader) or [`MessageCursor`].44#[derive(Debug)]45pub struct MessageIteratorWithId<'a, M: Message> {46reader: &'a mut MessageCursor<M>,47chain: Chain<Iter<'a, MessageInstance<M>>, Iter<'a, MessageInstance<M>>>,48unread: usize,49}5051impl<'a, M: Message> MessageIteratorWithId<'a, M> {52/// Creates a new iterator that yields any `messages` that have not yet been seen by `reader`.53pub fn new(reader: &'a mut MessageCursor<M>, messages: &'a Messages<M>) -> Self {54let a_index = reader55.last_message_count56.saturating_sub(messages.messages_a.start_message_count);57let b_index = reader58.last_message_count59.saturating_sub(messages.messages_b.start_message_count);60let a = messages.messages_a.get(a_index..).unwrap_or_default();61let b = messages.messages_b.get(b_index..).unwrap_or_default();6263let unread_count = a.len() + b.len();64// Ensure `len` is implemented correctly65debug_assert_eq!(unread_count, reader.len(messages));66reader.last_message_count = messages.message_count - unread_count;67// Iterate the oldest first, then the newer messages68let chain = a.iter().chain(b.iter());6970Self {71reader,72chain,73unread: unread_count,74}75}7677/// Iterate over only the messages.78pub fn without_id(self) -> MessageIterator<'a, M> {79MessageIterator { iter: self }80}81}8283impl<'a, M: Message> Iterator for MessageIteratorWithId<'a, M> {84type Item = (&'a M, MessageId<M>);85fn next(&mut self) -> Option<Self::Item> {86match self87.chain88.next()89.map(|instance| (&instance.message, instance.message_id))90{91Some(item) => {92#[cfg(feature = "detailed_trace")]93tracing::trace!("MessageReader::iter() -> {}", item.1);94self.reader.last_message_count += 1;95self.unread -= 1;96Some(item)97}98None => None,99}100}101102fn size_hint(&self) -> (usize, Option<usize>) {103self.chain.size_hint()104}105106fn count(self) -> usize {107self.reader.last_message_count += self.unread;108self.unread109}110111fn last(self) -> Option<Self::Item>112where113Self: Sized,114{115let MessageInstance {116message_id,117message,118} = self.chain.last()?;119self.reader.last_message_count += self.unread;120Some((message, *message_id))121}122123fn nth(&mut self, n: usize) -> Option<Self::Item> {124if let Some(MessageInstance {125message_id,126message,127}) = self.chain.nth(n)128{129self.reader.last_message_count += n + 1;130self.unread -= n + 1;131Some((message, *message_id))132} else {133self.reader.last_message_count += self.unread;134self.unread = 0;135None136}137}138}139140impl<'a, M: Message> ExactSizeIterator for MessageIteratorWithId<'a, M> {141fn len(&self) -> usize {142self.unread143}144}145146/// A parallel iterator over `Message`s.147#[cfg(feature = "multi_threaded")]148#[derive(Debug)]149pub struct MessageParIter<'a, M: Message> {150reader: &'a mut MessageCursor<M>,151slices: [&'a [MessageInstance<M>]; 2],152batching_strategy: BatchingStrategy,153#[cfg(not(target_arch = "wasm32"))]154unread: usize,155}156157#[cfg(feature = "multi_threaded")]158impl<'a, M: Message> MessageParIter<'a, M> {159/// Creates a new parallel iterator over `messages` that have not yet been seen by `reader`.160pub fn new(reader: &'a mut MessageCursor<M>, messages: &'a Messages<M>) -> Self {161let a_index = reader162.last_message_count163.saturating_sub(messages.messages_a.start_message_count);164let b_index = reader165.last_message_count166.saturating_sub(messages.messages_b.start_message_count);167let a = messages.messages_a.get(a_index..).unwrap_or_default();168let b = messages.messages_b.get(b_index..).unwrap_or_default();169170let unread_count = a.len() + b.len();171// Ensure `len` is implemented correctly172debug_assert_eq!(unread_count, reader.len(messages));173reader.last_message_count = messages.message_count - unread_count;174175Self {176reader,177slices: [a, b],178batching_strategy: BatchingStrategy::default(),179#[cfg(not(target_arch = "wasm32"))]180unread: unread_count,181}182}183184/// Changes the batching strategy used when iterating.185///186/// For more information on how this affects the resultant iteration, see187/// [`BatchingStrategy`].188pub fn batching_strategy(mut self, strategy: BatchingStrategy) -> Self {189self.batching_strategy = strategy;190self191}192193/// Runs the provided closure for each unread message in parallel.194///195/// Unlike normal iteration, the message order is not guaranteed in any form.196///197/// # Panics198/// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being199/// initialized and run from the ECS scheduler, this should never panic.200///201/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool202pub fn for_each<FN: Fn(&'a M) + Send + Sync + Clone>(self, func: FN) {203self.for_each_with_id(move |e, _| func(e));204}205206/// Runs the provided closure for each unread message in parallel, like [`for_each`](Self::for_each),207/// but additionally provides the [`MessageId`] to the closure.208///209/// Note that the order of iteration is not guaranteed, but [`MessageId`]s are ordered by send order.210///211/// # Panics212/// If the [`ComputeTaskPool`] is not initialized. If using this from a message reader that is being213/// initialized and run from the ECS scheduler, this should never panic.214///215/// [`ComputeTaskPool`]: bevy_tasks::ComputeTaskPool216#[cfg_attr(217target_arch = "wasm32",218expect(unused_mut, reason = "not mutated on this target")219)]220pub fn for_each_with_id<FN: Fn(&'a M, MessageId<M>) + Send + Sync + Clone>(mut self, func: FN) {221#[cfg(target_arch = "wasm32")]222{223self.into_iter().for_each(|(e, i)| func(e, i));224}225226#[cfg(not(target_arch = "wasm32"))]227{228let pool = bevy_tasks::ComputeTaskPool::get();229let thread_count = pool.thread_num();230if thread_count <= 1 {231return self.into_iter().for_each(|(e, i)| func(e, i));232}233234let batch_size = self235.batching_strategy236.calc_batch_size(|| self.len(), thread_count);237let chunks = self.slices.map(|s| s.chunks_exact(batch_size));238let remainders = chunks.each_ref().map(core::slice::ChunksExact::remainder);239240pool.scope(|scope| {241for batch in chunks.into_iter().flatten().chain(remainders) {242let func = func.clone();243scope.spawn(async move {244for message_instance in batch {245func(&message_instance.message, message_instance.message_id);246}247});248}249});250251// Messages are guaranteed to be read at this point.252self.reader.last_message_count += self.unread;253self.unread = 0;254}255}256257/// Returns the number of [`Message`]s to be iterated.258pub fn len(&self) -> usize {259self.slices.iter().map(|s| s.len()).sum()260}261262/// Returns [`true`] if there are no messages remaining in this iterator.263pub fn is_empty(&self) -> bool {264self.slices.iter().all(|x| x.is_empty())265}266}267268#[cfg(feature = "multi_threaded")]269impl<'a, M: Message> IntoIterator for MessageParIter<'a, M> {270type IntoIter = MessageIteratorWithId<'a, M>;271type Item = <Self::IntoIter as Iterator>::Item;272273fn into_iter(self) -> Self::IntoIter {274let MessageParIter {275reader,276slices: [a, b],277..278} = self;279let unread = a.len() + b.len();280let chain = a.iter().chain(b);281MessageIteratorWithId {282reader,283chain,284unread,285}286}287}288289290