Path: blob/main/crates/bevy_ecs/src/message/messages.rs
6849 views
use crate::{1change_detection::MaybeLocation,2message::{Message, MessageCursor, MessageId, MessageInstance},3resource::Resource,4};5use alloc::vec::Vec;6use core::{7marker::PhantomData,8ops::{Deref, DerefMut},9};10#[cfg(feature = "bevy_reflect")]11use {12crate::reflect::ReflectResource,13bevy_reflect::{std_traits::ReflectDefault, Reflect},14};1516/// A message collection that represents the messages that occurred within the last two17/// [`Messages::update`] calls.18/// Messages can be written to using a [`MessageWriter`]19/// and are typically cheaply read using a [`MessageReader`].20///21/// Each message can be consumed by multiple systems, in parallel,22/// with consumption tracked by the [`MessageReader`] on a per-system basis.23///24/// If no [ordering](https://github.com/bevyengine/bevy/blob/main/examples/ecs/ecs_guide.rs)25/// is applied between writing and reading systems, there is a risk of a race condition.26/// This means that whether the messages arrive before or after the next [`Messages::update`] is unpredictable.27///28/// This collection is meant to be paired with a system that calls29/// [`Messages::update`] exactly once per update/frame.30///31/// [`message_update_system`] is a system that does this, typically initialized automatically using32/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).33/// [`MessageReader`]s are expected to read messages from this collection at least once per loop/frame.34/// Messages will persist across a single frame boundary and so ordering of message producers and35/// consumers is not critical (although poorly-planned ordering may cause accumulating lag).36/// If messages are not handled by the end of the frame after they are updated, they will be37/// dropped silently.38///39/// # Example40///41/// ```42/// use bevy_ecs::message::{Message, Messages};43///44/// #[derive(Message)]45/// struct MyMessage {46/// value: usize47/// }48///49/// // setup50/// let mut messages = Messages::<MyMessage>::default();51/// let mut cursor = messages.get_cursor();52///53/// // run this once per update/frame54/// messages.update();55///56/// // somewhere else: write a message57/// messages.write(MyMessage { value: 1 });58///59/// // somewhere else: read the messages60/// for message in cursor.read(&messages) {61/// assert_eq!(message.value, 1)62/// }63///64/// // messages are only processed once per reader65/// assert_eq!(cursor.read(&messages).count(), 0);66/// ```67///68/// # Details69///70/// [`Messages`] is implemented using a variation of a double buffer strategy.71/// Each call to [`update`](Messages::update) swaps buffers and clears out the oldest one.72/// - [`MessageReader`]s will read messages from both buffers.73/// - [`MessageReader`]s that read at least once per update will never drop messages.74/// - [`MessageReader`]s that read once within two updates might still receive some messages75/// - [`MessageReader`]s that read after two updates are guaranteed to drop all messages that occurred76/// before those updates.77///78/// The buffers in [`Messages`] will grow indefinitely if [`update`](Messages::update) is never called.79///80/// An alternative call pattern would be to call [`update`](Messages::update)81/// manually across frames to control when messages are cleared.82/// This complicates consumption and risks ever-expanding memory usage if not cleaned up,83/// but can be done by adding your message as a resource instead of using84/// [`add_message`](https://docs.rs/bevy/*/bevy/app/struct.App.html#method.add_message).85///86/// [Example usage.](https://github.com/bevyengine/bevy/blob/latest/examples/ecs/message.rs)87/// [Example usage standalone.](https://github.com/bevyengine/bevy/blob/latest/crates/bevy_ecs/examples/messages.rs)88///89/// [`MessageReader`]: super::MessageReader90/// [`MessageWriter`]: super::MessageWriter91/// [`message_update_system`]: super::message_update_system92#[derive(Debug, Resource)]93#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Resource, Default))]94pub struct Messages<E: Message> {95/// Holds the oldest still active messages.96/// Note that `a.start_message_count + a.len()` should always be equal to `messages_b.start_message_count`.97pub(crate) messages_a: MessageSequence<E>,98/// Holds the newer messages.99pub(crate) messages_b: MessageSequence<E>,100pub(crate) message_count: usize,101}102103// Derived Default impl would incorrectly require E: Default104impl<E: Message> Default for Messages<E> {105fn default() -> Self {106Self {107messages_a: Default::default(),108messages_b: Default::default(),109message_count: Default::default(),110}111}112}113114impl<M: Message> Messages<M> {115/// Returns the index of the oldest message stored in the message buffer.116pub fn oldest_message_count(&self) -> usize {117self.messages_a.start_message_count118}119120/// Writes an `message` to the current message buffer.121/// [`MessageReader`](super::MessageReader)s can then read the message.122/// This method returns the [ID](`MessageId`) of the written `message`.123#[track_caller]124pub fn write(&mut self, message: M) -> MessageId<M> {125self.write_with_caller(message, MaybeLocation::caller())126}127128pub(crate) fn write_with_caller(&mut self, message: M, caller: MaybeLocation) -> MessageId<M> {129let message_id = MessageId {130id: self.message_count,131caller,132_marker: PhantomData,133};134#[cfg(feature = "detailed_trace")]135tracing::trace!("Messages::write() -> id: {}", message_id);136137let message_instance = MessageInstance {138message_id,139message,140};141142self.messages_b.push(message_instance);143self.message_count += 1;144145message_id146}147148/// Writes a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.149/// This is more efficient than writing each message individually.150/// This method returns the [IDs](`MessageId`) of the written `messages`.151#[track_caller]152pub fn write_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {153let last_count = self.message_count;154155self.extend(messages);156157WriteBatchIds {158last_count,159message_count: self.message_count,160_marker: PhantomData,161}162}163164/// Writes the default value of the message. Useful when the message is an empty struct.165/// This method returns the [ID](`MessageId`) of the written `message`.166#[track_caller]167pub fn write_default(&mut self) -> MessageId<M>168where169M: Default,170{171self.write(Default::default())172}173174/// "Sends" an `message` by writing it to the current message buffer.175/// [`MessageReader`](super::MessageReader)s can then read the message.176/// This method returns the [ID](`MessageId`) of the sent `message`.177#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write` instead.")]178#[track_caller]179pub fn send(&mut self, message: M) -> MessageId<M> {180self.write(message)181}182183/// Sends a list of `messages` all at once, which can later be read by [`MessageReader`](super::MessageReader)s.184/// This is more efficient than sending each message individually.185/// This method returns the [IDs](`MessageId`) of the sent `messages`.186#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_batch` instead.")]187#[track_caller]188pub fn send_batch(&mut self, messages: impl IntoIterator<Item = M>) -> WriteBatchIds<M> {189self.write_batch(messages)190}191192/// Sends the default value of the message. Useful when the message is an empty struct.193/// This method returns the [ID](`MessageId`) of the sent `message`.194#[deprecated(since = "0.17.0", note = "Use `Messages<E>::write_default` instead.")]195#[track_caller]196pub fn send_default(&mut self) -> MessageId<M>197where198M: Default,199{200self.write_default()201}202203/// Gets a new [`MessageCursor`]. This will include all messages already in the message buffers.204pub fn get_cursor(&self) -> MessageCursor<M> {205MessageCursor::default()206}207208/// Gets a new [`MessageCursor`]. This will ignore all messages already in the message buffers.209/// It will read all future messages.210pub fn get_cursor_current(&self) -> MessageCursor<M> {211MessageCursor {212last_message_count: self.message_count,213..Default::default()214}215}216217/// Swaps the message buffers and clears the oldest message buffer. In general, this should be218/// called once per frame/update.219///220/// If you need access to the messages that were removed, consider using [`Messages::update_drain`].221pub fn update(&mut self) {222core::mem::swap(&mut self.messages_a, &mut self.messages_b);223self.messages_b.clear();224self.messages_b.start_message_count = self.message_count;225debug_assert_eq!(226self.messages_a.start_message_count + self.messages_a.len(),227self.messages_b.start_message_count228);229}230231/// Swaps the message buffers and drains the oldest message buffer, returning an iterator232/// of all messages that were removed. In general, this should be called once per frame/update.233///234/// If you do not need to take ownership of the removed messages, use [`Messages::update`] instead.235#[must_use = "If you do not need the returned messages, call .update() instead."]236pub fn update_drain(&mut self) -> impl Iterator<Item = M> + '_ {237core::mem::swap(&mut self.messages_a, &mut self.messages_b);238let iter = self.messages_b.messages.drain(..);239self.messages_b.start_message_count = self.message_count;240debug_assert_eq!(241self.messages_a.start_message_count + self.messages_a.len(),242self.messages_b.start_message_count243);244245iter.map(|e| e.message)246}247248#[inline]249fn reset_start_message_count(&mut self) {250self.messages_a.start_message_count = self.message_count;251self.messages_b.start_message_count = self.message_count;252}253254/// Removes all messages.255#[inline]256pub fn clear(&mut self) {257self.reset_start_message_count();258self.messages_a.clear();259self.messages_b.clear();260}261262/// Returns the number of messages currently stored in the message buffer.263#[inline]264pub fn len(&self) -> usize {265self.messages_a.len() + self.messages_b.len()266}267268/// Returns true if there are no messages currently stored in the message buffer.269#[inline]270pub fn is_empty(&self) -> bool {271self.len() == 0272}273274/// Creates a draining iterator that removes all messages.275pub fn drain(&mut self) -> impl Iterator<Item = M> + '_ {276self.reset_start_message_count();277278// Drain the oldest messages first, then the newest279self.messages_a280.drain(..)281.chain(self.messages_b.drain(..))282.map(|i| i.message)283}284285/// Iterates over messages that happened since the last "update" call.286/// WARNING: You probably don't want to use this call. In most cases you should use an287/// [`MessageReader`]. You should only use this if you know you only need to consume messages288/// between the last `update()` call and your call to `iter_current_update_messages`.289/// If messages happen outside that window, they will not be handled. For example, any messages that290/// happen after this call and before the next `update()` call will be dropped.291///292/// [`MessageReader`]: super::MessageReader293pub fn iter_current_update_messages(&self) -> impl ExactSizeIterator<Item = &M> {294self.messages_b.iter().map(|i| &i.message)295}296297/// Get a specific message by id if it still exists in the messages buffer.298pub fn get_message(&self, id: usize) -> Option<(&M, MessageId<M>)> {299if id < self.oldest_message_count() {300return None;301}302303let sequence = self.sequence(id);304let index = id.saturating_sub(sequence.start_message_count);305306sequence307.get(index)308.map(|instance| (&instance.message, instance.message_id))309}310311/// Which message buffer is this message id a part of.312fn sequence(&self, id: usize) -> &MessageSequence<M> {313if id < self.messages_b.start_message_count {314&self.messages_a315} else {316&self.messages_b317}318}319}320321impl<E: Message> Extend<E> for Messages<E> {322#[track_caller]323fn extend<I>(&mut self, iter: I)324where325I: IntoIterator<Item = E>,326{327let old_count = self.message_count;328let mut message_count = self.message_count;329let messages = iter.into_iter().map(|message| {330let message_id = MessageId {331id: message_count,332caller: MaybeLocation::caller(),333_marker: PhantomData,334};335message_count += 1;336MessageInstance {337message_id,338message,339}340});341342self.messages_b.extend(messages);343344if old_count != message_count {345#[cfg(feature = "detailed_trace")]346tracing::trace!(347"Messages::extend() -> ids: ({}..{})",348self.message_count,349message_count350);351}352353self.message_count = message_count;354}355}356357#[derive(Debug)]358#[cfg_attr(feature = "bevy_reflect", derive(Reflect), reflect(Default))]359pub(crate) struct MessageSequence<E: Message> {360pub(crate) messages: Vec<MessageInstance<E>>,361pub(crate) start_message_count: usize,362}363364// Derived Default impl would incorrectly require E: Default365impl<E: Message> Default for MessageSequence<E> {366fn default() -> Self {367Self {368messages: Default::default(),369start_message_count: Default::default(),370}371}372}373374impl<E: Message> Deref for MessageSequence<E> {375type Target = Vec<MessageInstance<E>>;376377fn deref(&self) -> &Self::Target {378&self.messages379}380}381382impl<E: Message> DerefMut for MessageSequence<E> {383fn deref_mut(&mut self) -> &mut Self::Target {384&mut self.messages385}386}387388/// [`Iterator`] over written [`MessageIds`](`MessageId`) from a batch.389pub struct WriteBatchIds<E> {390last_count: usize,391message_count: usize,392_marker: PhantomData<E>,393}394395/// [`Iterator`] over sent [`MessageIds`](`MessageId`) from a batch.396#[deprecated(since = "0.17.0", note = "Use `WriteBatchIds` instead.")]397pub type SendBatchIds<E> = WriteBatchIds<E>;398399impl<E: Message> Iterator for WriteBatchIds<E> {400type Item = MessageId<E>;401402fn next(&mut self) -> Option<Self::Item> {403if self.last_count >= self.message_count {404return None;405}406407let result = Some(MessageId {408id: self.last_count,409caller: MaybeLocation::caller(),410_marker: PhantomData,411});412413self.last_count += 1;414415result416}417}418419impl<E: Message> ExactSizeIterator for WriteBatchIds<E> {420fn len(&self) -> usize {421self.message_count.saturating_sub(self.last_count)422}423}424425#[cfg(test)]426mod tests {427use crate::message::{Message, Messages};428429#[test]430fn iter_current_update_messages_iterates_over_current_messages() {431#[derive(Message, Clone)]432struct TestMessage;433434let mut test_messages = Messages::<TestMessage>::default();435436// Starting empty437assert_eq!(test_messages.len(), 0);438assert_eq!(test_messages.iter_current_update_messages().count(), 0);439test_messages.update();440441// Writing one message442test_messages.write(TestMessage);443444assert_eq!(test_messages.len(), 1);445assert_eq!(test_messages.iter_current_update_messages().count(), 1);446test_messages.update();447448// Writing two messages on the next frame449test_messages.write(TestMessage);450test_messages.write(TestMessage);451452assert_eq!(test_messages.len(), 3); // Messages are double-buffered, so we see 1 + 2 = 3453assert_eq!(test_messages.iter_current_update_messages().count(), 2);454test_messages.update();455456// Writing zero messages457assert_eq!(test_messages.len(), 2); // Messages are double-buffered, so we see 2 + 0 = 2458assert_eq!(test_messages.iter_current_update_messages().count(), 0);459}460}461462463