EAMessage is an intra-application messaging system which fundamentally implements the observer pattern.
The two fundamental elements of EAMessage are the observers (class IHandler) and the server (class Server). There are additional classes and utilities that provide extended functionality. We will discuss the primary messaging functionality in this document, and address the extended functionality in its own documents, such as EAMessageExtra and EAMessageDefer.
There are many views on how to implement event or messaging systems. A fundamental design aspect of EAMessage is that messages are void pointers and that it is up to the users to agree upon the meaning of the data. This allows for a simpler and more flexible implementation. This design is controversial with some, due to its lack of inherent type safety. However, years of experience with EAMessage-style messaging have shown this not to be an issue in practice.
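To make the "agreed meaning" idea concrete, here is a minimal sketch of a receiver that interprets a void pointer according to the message id. The names kMsgScoreChanged, ScoreChanged and TotalScore are hypothetical and exist only for this illustration; MessageId is assumed here to be a 32-bit integer.

```cpp
#include <cstdint>

// Hypothetical stand-ins for illustration; these are not the EAMessage declarations.
typedef uint32_t MessageId;

const MessageId kMsgScoreChanged = 0x00000001; // Hypothetical message id.

// Payload type that sender and receiver have agreed to use for kMsgScoreChanged.
struct ScoreChanged
{
    int mHomeScore;
    int mAwayScore;
};

// The receiver trusts the id-to-type convention and casts accordingly.
// Nothing in the language checks that the sender really passed a ScoreChanged.
int TotalScore(MessageId id, void* pMessage)
{
    if(id == kMsgScoreChanged)
    {
        const ScoreChanged* pScore = static_cast<const ScoreChanged*>(pMessage);
        return pScore->mHomeScore + pScore->mAwayScore;
    }
    return -1; // Unknown message id; the payload is left untouched.
}
```

The convention lives entirely in documentation or shared headers, which is the trade-off the paragraph above describes.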
Events in a given event system can be pushed, pulled, or polled. The differences are described here:
EAMessage is fundamentally a pushed messaging system. However, the EAMessage Server class uses a Queue class to implement a message queue, and this queue can be used independently of the Server in a pulled or polled way. Indeed, the Server itself pulls from the Queue and pushes the resulting messages onto clients.
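The relationship between pulling from a queue and pushing to clients can be sketched as follows. This is a simplified stand-in built on std::queue, not the EAMessage Queue or Server classes; QueuedMessage, Receiver and PumpQueue are hypothetical names.

```cpp
#include <cstddef>
#include <cstdint>
#include <queue>
#include <vector>

typedef uint32_t MessageId; // Assumed for this sketch.

struct QueuedMessage
{
    MessageId mId;
    void*     mpMessage;
};

// A client that has messages pushed onto it.
struct Receiver
{
    std::vector<MessageId> mReceived;
    void OnMessage(MessageId id, void* /*pMessage*/) { mReceived.push_back(id); }
};

// Pulls every pending message from the queue and pushes it to each receiver.
// Returns the number of messages taken from the queue.
size_t PumpQueue(std::queue<QueuedMessage>& queue, std::vector<Receiver*>& receivers)
{
    size_t count = 0;
    while(!queue.empty())                      // Poll: is anything pending?
    {
        const QueuedMessage msg = queue.front(); // Pull: take the next message.
        queue.pop();
        for(size_t i = 0; i < receivers.size(); ++i)
            receivers[i]->OnMessage(msg.mId, msg.mpMessage); // Push: call the client.
        ++count;
    }
    return count;
}
```

Code that prefers a pulled or polled model could run the loop body itself instead of registering a receiver.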
Here we provide a list of fundamental features of EAMessage:
Feature: Description
Immediate delivery (a.k.a. message send): Messages are routed to their destination immediately. This is the most common messaging mechanism.
Delayed delivery (a.k.a. message post): Messages are put into a queue to be delivered at a later designated time. Among other things, this removes the need for clients to be reentrant and thread-safe, but has its downsides as well.
Direct delivery (peer directly to peer): An entity can directly send a message to another entity without using a server.
Arbitrary sender: The sender doesn't have to be any special entity; anybody can send a message. Ideally, the sender doesn't have to register messages before the sender can send them. A message with no registered recipients simply goes into the bit bucket.
Multiple event listeners (event subscription): Messages get delivered to potentially multiple recipients. In this case the messages are usually better thought of as broadcast events.
Message listener priority: A listener can be designated as the first entity or last entity to receive the message, for example.
Reentrancy: You can send messages while you are in the middle of receiving a message, for example. Ditto for other operations.
Thread-safety: Multiple threads can use messaging simultaneously without restriction.
Auditing: An external entity can listen in on the activity of the system as a passive observer, primarily for debugging.
Private queues (per-thread queues): A feature that allows messages to be sent to a handler's own queue, which is typically handled within the handler's own thread.
Additional utility functionality (EAMessage, EAMessageExtra, EAMessageDefer):
Feature (Location): Description
AutoHandler (EAMessage): Implements automatic registration and unregistration for a set of messages for a class.
message_cast (EAMessage): Casts a message pointer to a given type.
QueuedHandler (EAMessageExtra): Implements the IQueuedHandler interface.
Handler (EAMessageExtra): This templated class allows you to have a class which receives messages via a user-defined member function as opposed to a function named HandleMessage.
HandlerMemberFunction (EAMessageExtra): Allows a C function to be used as a message handler.
MessageRC (EAMessageExtra): Implements a base for IMessageRC (reference counted message).
MessageBasic (EAMessageExtra): Implements a generic message type.
MessageBasicRC (EAMessageExtra): Implements a generic message type that's reference counted.
DeferredMessage (EAMessageDefer): Implements a message that is sent after a given amount of time. This is like a posted message, but the delivery doesn't just occur upon the next message management cycle but instead after a certain amount of time or events (e.g. vblanks).
ThreadSafeCallback (EAMessageDefer): Implements a callback that is done via EA::Callback but the callback occurs in the main thread instead of the Callback thread. A primary use of this is to allow a system to receive callbacks in a thread-safe way because Callback may occur outside the main thread but the recipient wants the callback to occur in the main thread.
CallbackMessage (EAMessageDefer): Implements a Callback (EA::Callback) that sends or posts a callback event to a given recipient instead of calling a member function. A primary use of this is to allow a system to receive Callbacks via messaging instead of via a specific member function. This allows some systems to be written in a more generic way and have reduced dependencies, assuming those systems are already messaging-savvy.
We provide some basic example usage here, but there is additional example usage in the code and in the unit tests.
Here's an example of setting up a Server in an application:
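EA::Messaging::Server gServer;

int main(int, char**)
{
    gServer.Init();                       // Prepare the Server for use.
    EA::Messaging::SetServer(&gServer);   // Make it the default server.

    RunApplication();

    gServer.Shutdown();                   // Must be called before destruction.
    return 0;
}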
Here's an example class that registers to receive messages with the message Server:
const uint32_t kTouchdownScored = 0x12345678;

struct TouchdownData {
    int mHomeScore;
    int mAwayScore;
};

class Listener : public EA::Messaging::IHandler {
public:
    void Init() {
        // Register for kTouchdownScored; false means this handler is not reference counted.
        gServer.AddHandler(this, kTouchdownScored, false);
    }

    void Shutdown() {
        gServer.RemoveHandler(this, kTouchdownScored);
    }

    bool HandleMessage(EA::Messaging::MessageId id, void* pMessage) {
        if(id == kTouchdownScored) {
            // Sender and receiver agree that this id carries a TouchdownData.
            const TouchdownData* const pTD = (TouchdownData*)pMessage;
            printf("Touchdown was scored. New score is %d : %d\n", pTD->mHomeScore, pTD->mAwayScore);
        }
        return true;
    }
};
Here's an example of sending a message:
void Simulator::Simulate() {
    if(playerPosition > 100) {
        TouchdownData td = { gHomeScore, gAwayScore + 6 };
        gServer.MessageSend(kTouchdownScored, &td); // Delivered immediately, so a stack object is fine.
    }
}
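The register-then-send flow above can be tried without the EAMessage library by using a small stand-in. The sketch below is not the EAMessage Server; MiniServer and ScoreBoard are hypothetical, the AddHandler signature is simplified, and MessageSend returns a recipient count purely so the behavior can be checked.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <map>
#include <vector>

typedef uint32_t MessageId; // Assumed for this sketch.

struct IHandler
{
    virtual ~IHandler() { }
    virtual bool HandleMessage(MessageId id, void* pMessage) = 0;
};

// Minimal stand-in for a message server: immediate delivery only.
class MiniServer
{
public:
    void AddHandler(IHandler* pHandler, MessageId id) { mHandlers[id].push_back(pHandler); }

    void RemoveHandler(IHandler* pHandler, MessageId id)
    {
        std::vector<IHandler*>& v = mHandlers[id];
        v.erase(std::remove(v.begin(), v.end(), pHandler), v.end());
    }

    // Delivers to every handler registered for id and returns the recipient count.
    size_t MessageSend(MessageId id, void* pMessage)
    {
        std::map<MessageId, std::vector<IHandler*> >::iterator it = mHandlers.find(id);
        if(it == mHandlers.end())
            return 0; // No recipients: the message is silently dropped.

        // Copy the list so that handlers may add or remove handlers during delivery.
        const std::vector<IHandler*> recipients = it->second;
        for(size_t i = 0; i < recipients.size(); ++i)
            recipients[i]->HandleMessage(id, pMessage);
        return recipients.size();
    }

private:
    std::map<MessageId, std::vector<IHandler*> > mHandlers;
};

const MessageId kTouchdownScored = 0x12345678;
struct TouchdownData { int mHomeScore; int mAwayScore; };

// Hypothetical listener that simply remembers the last score it was told about.
struct ScoreBoard : public IHandler
{
    int mHome;
    int mAway;
    ScoreBoard() : mHome(0), mAway(0) { }

    bool HandleMessage(MessageId id, void* pMessage)
    {
        if(id == kTouchdownScored)
        {
            const TouchdownData* pTD = static_cast<const TouchdownData*>(pMessage);
            mHome = pTD->mHomeScore;
            mAway = pTD->mAwayScore;
        }
        return true;
    }
};
```

Note that the sender never refers to ScoreBoard; the two sides share only the message id and the payload struct.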
We list the fundamental entities defined by the EAMessage interface. There are some additional functions and definitions which you can see by reading the header file itself. EAMessageExtra and EAMessageDefer have additional functionality.
Priority:
/// enum Priority
/// This enumeration defines messaging system priorities. In practice, priorities are implemented
/// simply as signed integers whereby higher values denote higher priority. As such, this enumeration
/// is more of a placeholder than a set of values that must be adhered to.
enum Priority
{
    kPriorityAny    = -9999,  /// This is a special value used to mean to match any priority during a RemoveHandler call.
    kPriorityLow    = -1000,
    kPriorityNormal =     0,
    kPriorityHigh   =  1000
};
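Because priorities are plain signed integers, any value between the named constants can be used. The sketch below shows one way a subscriber list could be kept ordered so that higher values are served first. Subscriber and InsertByPriority are hypothetical, and the tie-breaking rule (equal priorities keep insertion order) is an assumption of this sketch, not a documented EAMessage guarantee.

```cpp
#include <cstddef>
#include <vector>

// Values copied from the Priority enumeration above.
enum Priority { kPriorityLow = -1000, kPriorityNormal = 0, kPriorityHigh = 1000 };

// Hypothetical subscriber record for this sketch.
struct Subscriber
{
    int mId;
    int mPriority; // Any signed integer; higher values are delivered to first.
};

// Inserts s so that the list stays sorted from highest to lowest priority.
// Subscribers of equal priority keep their insertion order (assumed behavior).
void InsertByPriority(std::vector<Subscriber>& list, const Subscriber& s)
{
    std::vector<Subscriber>::iterator it = list.begin();
    while(it != list.end() && it->mPriority >= s.mPriority)
        ++it;
    list.insert(it, s);
}
```

A subscriber registered with priority 500, for example, would land between the kPriorityHigh and kPriorityNormal subscribers.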
IMessageRC:
/// IMessageRC
///
/// Provides an interface for reference counted messages. Only queue (delayed)
/// messages need to be IMessageRC objects. Regular sent messages can be of any
/// type of object, including IMessageRC objects.
///
/// Instances of this interface need to be created via some kind of heap and
/// cannot be created on the stack. This is because their lifetime is
/// necessarily indeterminate. However, such objects can be located in a special
/// nonfragmenting fixed size heap in order to optimize their allocation.
///
class IMessageRC
{
public:
    virtual ~IMessageRC() { }
    virtual int AddRef() = 0;
    virtual int Release() = 0;
};
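As a rough illustration of the heap-only requirement, here is a sketch of a reference counted message. ScoreMessage is hypothetical and is not one of the EAMessageExtra classes; the starting count of zero and the use of std::atomic are assumptions of this sketch, and the destruction flag exists only so the lifetime can be observed.

```cpp
#include <atomic>
#include <cstddef>

// Same shape as the IMessageRC interface shown above, repeated so the sketch is self-contained.
class IMessageRC
{
public:
    virtual ~IMessageRC() { }
    virtual int AddRef() = 0;
    virtual int Release() = 0;
};

// Hypothetical message type for this sketch.
class ScoreMessage : public IMessageRC
{
public:
    explicit ScoreMessage(int score, int* pDestroyedFlag = NULL)
      : mScore(score), mRefCount(0), mpDestroyedFlag(pDestroyedFlag) { }

    int AddRef() { return ++mRefCount; }

    int Release()
    {
        const int newCount = --mRefCount;
        if(newCount == 0)
            delete this; // The last owner is gone, so the message frees itself.
        return newCount;
    }

    int mScore;

private:
    // A private destructor prevents stack instances; lifetime is owned by the ref count.
    ~ScoreMessage() { if(mpDestroyedFlag) *mpDestroyedFlag = 1; }

    std::atomic<int> mRefCount;
    int*             mpDestroyedFlag;
};
```

A pooled implementation would return the object to its pool in Release instead of calling delete.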
IHandler:
/// IHandler
///
/// Defines a base class that can process messages. A user would register
/// such a class with an entity which generates or distributes messages;
/// Normally this is the Server class. However, IHandler instances can be
/// used outside of the Server class, including the case whereby some piece
/// of code simply directly calls the HandleMessage function of an IHandler.
///
/// The return value usage of the IHandler::HandleMessage function is user-defined.
/// The Server class does not use this return value, but other users of the
/// IHandler may do so. The return value is used in some systems to indicate
/// whether the handler handled the message or not.
///
/// Note that the HandleMessage function doesn't have a mechanism to identify
/// the message sender nor other information such as a timestamp. The reason
/// for this is that history has shown that 95+% of the time the message
/// recipient either doesn't care about who sent the message, or always knows
/// who sent it because there is only one sender. Thus sender identification is
/// something that should be added to the Message subclass on an as-needed basis.
/// As for time stamps and other similar metadata, history has shown that this
/// is best handled in the message server in an application-specific way, as there
/// is no single type of metadata that all users want or need.
///
class IHandler
{
public:
    virtual ~IHandler() { }
    virtual bool HandleMessage(MessageId messageId, void* pMessage) = 0;
};

class IHandlerRC : public IHandler
{
public:
    virtual int AddRef() = 0;
    virtual int Release() = 0;
};
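The comments above mention two things that are easy to show in code: an IHandler can be called directly without a Server, and the meaning of the return value is up to the caller. The sketch below adopts one possible convention, where true means "handled". PauseMenu, Logger, DispatchUntilHandled and the message ids are hypothetical.

```cpp
#include <cstddef>
#include <cstdint>

typedef uint32_t MessageId; // Assumed for this sketch.

class IHandler
{
public:
    virtual ~IHandler() { }
    virtual bool HandleMessage(MessageId messageId, void* pMessage) = 0;
};

const MessageId kMsgPause  = 0x00000001; // Hypothetical ids.
const MessageId kMsgResume = 0x00000002;

// Handles pause/resume and reports whether it recognized the message.
class PauseMenu : public IHandler
{
public:
    PauseMenu() : mbPaused(false) { }

    bool HandleMessage(MessageId id, void* /*pMessage*/)
    {
        if(id == kMsgPause)  { mbPaused = true;  return true; }
        if(id == kMsgResume) { mbPaused = false; return true; }
        return false; // Not handled, by this sketch's own convention.
    }

    bool mbPaused;
};

// Observes every message but never claims to have handled one.
class Logger : public IHandler
{
public:
    Logger() : mSeenCount(0) { }
    bool HandleMessage(MessageId, void*) { ++mSeenCount; return false; }
    int mSeenCount;
};

// Offers the message to each handler in order and stops at the first one that returns true.
// Returns the index of that handler, or count if nobody handled the message.
size_t DispatchUntilHandled(IHandler* const* ppHandlers, size_t count, MessageId id, void* pMessage)
{
    for(size_t i = 0; i < count; ++i)
    {
        if(ppHandlers[i]->HandleMessage(id, pMessage))
            return i;
    }
    return count;
}
```

This kind of chained dispatch is one example of a non-Server user of IHandler that gives the return value a meaning.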
Server:
/// class Server
///
/// Standard message server. This class implements a server which allows the
/// registration of handlers for messages and the sending and posting of messages
/// to those handlers.
///
/// Re-entrancy safety allows the message server to post messages and add/remove
/// handlers while in the middle of posting messages to handlers. This is rather
/// important in systems with complex message handling usage patterns.
///
/// To consider: Perhaps the Server should be a ref-countable object. Without it
/// there is the possibility of code using a Server after it has been destroyed.
///
class Server
{
public:
    /// Constructs an instance of a Server. This function doesn't do anything more
    /// than set member data to a known empty state. You must call the Init function
    /// separately in order to prepare the Server for use; see Init for information
    /// about this.
    ///
    Server();
    Server(EA::Allocator::ICoreAllocator* pCoreAllocator);
    Server(const Server&);
    Server& operator=(const Server&);

    /// Destructs an instance of a Server. You must call the Shutdown function
    /// before destructing an instance of Server. This is good practice and eliminates
    /// the possibility that a specific implementation of this class falls prey to
    /// chicken-and-egg dependency problems.
    ///
    ~Server();

    /// Initializes an instance of this class.
    ///
    bool Init();

    /// Shuts down an instance of this class. The instance will be restored to
    /// its state prior to initialization. All notification registrations will
    /// be unregistered and
    ///
    bool Shutdown();

    /// Sets the memory allocator to use with this class.
    /// This allocator is used to allocate mHandlerInfoMap elements.
    /// This function must be called right after Init().
    ///
    void SetAllocator(EA::Allocator::ICoreAllocator* pCoreAllocator);

    /// Specifies user-configurable options.
    ///
    enum Options
    {
        kOptionNone,             /// Refers to no options.
        kOptionThreadSafe,       /// Enabled by default. Enables thread safety. Thread safety allows the server to be used from multiple threads concurrently.
        kOptionReentrancy,       /// Enabled by default. Enables reentrancy. When the server is re-entrant-safe, users can call back into the message server while the message server is in the process of directly or indirectly calling the user.
        kOptionQueueProcessing,  /// Enabled by default. This option specifies whether queued messages are processed. You can use this option to temporarily delay processing. Multiple calls to disable processing must be matched with multiple calls to enable processing for processing to be resumed.
        kOptionClearQueue,       /// Enabled by default. This option forces the clearing of the posted message queue on DoProcessing. Otherwise, only messages in the queue before DoProcessing are processed.
        kOptionRefCount,         /// Enabled by default. Enables the ability to do reference counting of posted messages and subscribers. This doesn't mean that reference counting is forced on all clients; they still have a choice.
        kOptionTrace,            /// Enabled by default. Enables the propagation of traces or assertion failures in debug builds.
        kOptionCount             /// Count of options.
    };

    /// The input 'option' is an int instead of enum Options because having it
    /// as an int allows subclasses to extend the range of options. If the
    /// input option is an undefined option, the return value is zero.
    ///
    /// Example usage:
    ///     bool threadSafe = pServer->GetOption(Server::kOptionThreadSafe);
    ///
    int GetOption(int option) const;

    /// The input 'option' is an int instead of enum Options because having it
    /// as an int allows subclasses to extend the range of options. If the
    /// input option is an undefined option, the call has no effect.
    /// All options should be set before first use of the Server.
    ///
    void SetOption(int option, int value);

    /// Immediately delivers the given message to subscribers.
    /// The message can be any block of data. The format of that data is defined
    /// by the sender and all messages of the same MessageId must have the same
    /// format.
    ///
    /// Any MessageId is valid except for kMessageIdNone, kMessageIdAddRef, and
    /// kMessageIdRelease. A value of kMessageIdAll will result in the message
    /// being sent only to those who have registered for kMessageIdAll.
    ///
    /// Note that the message system is reentrant. Thus it is possible that while
    /// a message is being sent, one of the recipients may turn around and send
    /// additional messages or do other message system activity while processing
    /// a given message. As a result, you will want to take care to make sure that
    /// you account for this if you need to.
    ///
    /// If pHandler is non-NULL, the message is delivered to the given handler alone.
    /// In this case, the IHandler need not be a handler that has subscribed to
    /// messages but can be any arbitrary IHandler object.
    ///
    void MessageSend(MessageId id, void* pMessage, IHandler* pHandler = NULL);

    /// Puts the given message into a queue for delivery the next time the queue is processed.
    /// Among other things, usage of posted messages removes the need for clients to be
    /// reentrant and thread-safe, but has its downsides as well. Nevertheless, this is
    /// critical for the communication between threads in systems such as audio.
    ///
    /// Messages that are posted are based on the IMessageRC interface. The reason for this
    /// is that IMessageRC provides for reference counting which is used to allow the message
    /// to live for an unspecified time while in the queue. For performance reasons it is
    /// advised that IMessageRC-based objects avoid using the general heap and instead come
    /// from a pool of objects which are given back to the pool via IMessageRC::Release().
    ///
    /// If the input pHandlerRC is NULL, the message is delivered to all subscribers,
    /// else it is delivered only to the given message handler.
    ///
    /// The priority parameter allows the message to be moved higher in the queue. By default
    /// messages go at the back of the queue. But if a message is of higher priority than
    /// any messages at the back of the queue, it is put ahead of them. 99% of the time
    /// you want to use normal priority. The only time you want to use higher priority
    /// messages is when you have out-of-band items that are generally independent from
    /// other items and preservation of exact ordering is not important.
    ///
    /// The recipient list for a posted message is evaluated at the time the message will be
    /// delivered and not at the time of the MessagePost call.
    ///
    /// Any MessageId is valid except for kMessageIdNone, kMessageIdAddRef, and
    /// kMessageIdRelease. A value of kMessageIdAll will result in the message
    /// being sent only to those who have registered for kMessageIdAll.
    ///
    void MessagePost(MessageId id, IMessageRC* pMessage, int nPriority = kPriorityNormal, IHandlerRC* pHandlerRC = NULL);

    /// Similar to MessagePost for IHandlerRC, except it uses a standalone function as the override destination.
    ///
    void MessagePostFunction(MessageId id, IMessageRC* pMessage, int nPriority = kPriorityNormal,
                             HandlerFunction pHandlerFunction = NULL, void* pHandlerFunctionContext = NULL);

    /// Adds the given message handler to the subscription list for the given message id.
    /// A given handler can be registered more than once; this may be useful for the registration
    /// of a handler for both low and high priorities.
    ///
    /// If the bRefCounted argument is true, the IHandler input is assumed to be an IHandlerRC
    /// object and is reference counted by the message server via the IHandlerRC interface,
    /// which is an extension of the IHandler interface. Reference counting is useful in preventing
    /// problems which occur with highly reentrant usage patterns.
    ///
    /// The priority argument specifies whether the given handler is put ahead or behind other
    /// handlers in the subscriber list. A handler with a high priority receives messages before
    /// handlers with lower priorities.
    ///
    /// Any MessageId is valid except for kMessageIdNone, kMessageIdAddRef, and
    /// kMessageIdRelease. A value of kMessageIdAll is a special case which will
    /// result in the handler receiving all messages that are sent and posted via
    /// the Server. The priority value is used as with regular messages.
    ///
    void AddHandler(IHandler* pHandler, MessageId messageId, bool bRefCounted, int nPriority = kPriorityNormal);

    /// Similar to AddHandler, except posted messages get delivered to the IQueuedHandler::HandleQueuedMessage function.
    /// You can use an instance of IQueuedHandler (or its subclass QueuedHandler) to receive messages into a private
    /// queue, usually for the purpose of processing them in another thread.
    /// If you call AddQueuedHandler then your pHandler will receive posted messages via its HandleQueuedMessage function,
    /// and will receive sent messages directly via its HandleMessage function. You probably don't want to call AddHandler
    /// for an IHandler/MessageId pair that you're already calling AddQueuedHandler for.
    ///
    /// Example usage:
    ///     QueuedHandler myQueuedHandler(pCoreAllocator);  // Can optionally associate an IHandler with this.
    ///
    ///     pServer->AddQueuedHandler(&myQueuedHandler, 0x12345678, false);
    ///
    ///     MessageBasicRC<1>* pMessage = new MessageBasicRC<1>;
    ///     pServer->MessagePost(0x12345678, pMessage);
    ///     myQueuedHandler.ProcessQueue();                 // If an IHandler was associated with myQueuedHandler then the messages will be sent to it here.
    ///       or
    ///     myQueuedHandler.GetMessageQueue()->...;         // Directly read the queue. Make sure to Release the Message if you directly remove messages from the queue.
    ///
    void AddQueuedHandler(IQueuedHandler* pHandler, MessageId messageId, bool bRefCounted, int nPriority = kPriorityNormal);

    /// Adds the given message handling function to the subscription list for the given message id.
    /// It is similar to the IHandler-based AddHandler function in all respects except for the
    /// case of reference counting. In this case, if bRefCounted is true then reference counting
    /// is implemented via the kMessageIdAddRef and kMessageIdRelease messages sent to the function
    /// by the server.
    ///
    /// The pHandlerFunctionContext is a pointer to user-defined data (or is NULL) which gets passed
    /// to the HandlerFunction when it is called. This allows the user to attach contextual data to
    /// the callback function which the function can use to interpret how to operate.
    ///
    /// Example usage:
    ///     bool HandleMessage(MessageId id, void* pMsg, void* pContext);
    ///     pServer->AddHandlerFunction(HandleMessage, NULL, 0x12345678, false);
    ///
    void AddHandlerFunction(HandlerFunction pHandlerFunction, void* pHandlerFunctionContext,
                            MessageId messageId, bool bRefCounted, int nPriority = kPriorityNormal);

    /// Removes the given handler for the given message Id. A handler is removed only once for
    /// a given call to RemoveHandler in the case that the handler was added more than once.
    ///
    /// The priority parameter is used to match a handler to a given priority specified
    /// in AddHandler. If no priority is matched exactly, then any priority match is used.
    /// This allows you to remove a handler multiple times without having to know or care
    /// about any special priorities it was registered with.
    ///
    EAM_VIRTUAL bool RemoveHandler(IHandler* pHandler, MessageId messageId, int nPriority = kPriorityAny);

    /// RemoveQueuedHandler
    ///
    /// Removes the handler previously added via AddQueuedHandler.
    ///
    /// Example usage:
    ///     QueuedHandler myQueuedHandler(pCoreAllocator);  // Can optionally associate an IHandler with this.
    ///
    ///     pServer->AddQueuedHandler(&myQueuedHandler, 0x12345678, false);
    ///     pServer->RemoveQueuedHandler(&myQueuedHandler, 0x12345678);
    ///
    bool RemoveQueuedHandler(IQueuedHandler* pHandler, MessageId messageId, int nPriority = kPriorityAny);

    /// This function is just like RemoveHandler for IHandler except it works for a HandlerFunction.
    ///
    /// Example usage:
    ///     bool HandleMessage(MessageId id, void* pMsg, void* pContext);
    ///     pServer->RemoveHandlerFunction(HandleMessage, 0x12345678);
    ///
    bool RemoveHandlerFunction(HandlerFunction pHandlerFunction, MessageId messageId, int nPriority = kPriorityAny);

    /// Returns true if the given handler is registered by AddHandler to handle the
    /// given message id. If the message id is kMessageIdNone, then this function
    /// returns true if the handler is registered to handle any message ids.
    ///
    bool IsHandlerRegistered(IHandler* pHandler, MessageId id = kMessageIdNone) const;

    /// Does one cycle of queue processing.
    /// Returns the number of messages processed.
    ///
    /// If kOptionQueueProcessing is disabled then the queue is not processed
    /// and the return value is zero.
    ///
    /// If the kOptionClearQueue option is enabled then the queue will be processed
    /// until no messages remain, including messages posted while processing
    /// queued messages. If kOptionClearQueue is disabled, then the only messages
    /// sent will be those present in the queue upon the start of the processing.
    ///
    size_t ProcessQueue();

    /// Does multiple cycles of queue processing.
    /// Returns the number of messages which were processed.
    /// This function is useful for the case where you want to process the queue
    /// repeatedly in one shot. This can be useful because sometimes the delivery
    /// of a queued message results in the queuing of additional messages and
    /// the user may want to make sure that the additional messages are processed
    /// as well as the original ones.
    ///
    /// Example usage:
    ///     pServer->ProcessQueue(5, 10, 1000);
    ///
    EAM_VIRTUAL size_t ProcessQueue(size_t nMinCycleCount, size_t nMaxCycleCount = (size_t)-1, size_t nMaxTimeMilliseconds = (size_t)-1);

    /// Returns a copy of the message queue to the caller. This function may seem a little
    /// overly powerful in that it lets the caller inspect and modify the message queue,
    /// but a MessageQueue is a first class item and not a strictly internal implementation.
    /// Additionally, history has shown that there are times when systems truly need to
    /// do smart inspection of message queues in order to do the right thing in the most
    /// efficient way.
    ///
    /// In a multithreaded environment the caller needs to Lock the Server while the
    /// MessageQueue is being used.
    ///
    MessageQueue* GetMessageQueue();

    /// Allows you to reserve memory for management of MessageId handler registration.
    /// AddHandler potentially results in an increase in the size of the memory pool used
    /// to manage registered handlers. This function allows you to tell the Server up front
    /// how many message ids you expect to be using, so it can reserve that amount of memory
    /// right away, as opposed to repeatedly reallocating it at runtime as handler registration
    /// increases. This function can be called at any time, but it normally would be called
    /// before an increase in handler registration occurs, such as on startup.
    ///
    void ReserveMessageIdCapacity(size_t messageIdCount);

    /// Locks the message server for thread-safe access over a user-specified period of time.
    /// This is useful for doing multiple actions with the server without being interrupted.
    ///
    /// Returns the number of locks after the operation has completed. A call to Lock(false)
    /// when the lock count is already zero will be ignored.
    ///
    int Lock(bool bLock);

}; // class Server
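The difference that kOptionClearQueue makes to ProcessQueue is easiest to see with a handler that posts a follow-up message while it is being delivered to. The sketch below is a toy model of that documented behavior, not the Server implementation; MiniPostQueue is hypothetical, messages are reduced to plain integers, and the option is passed as a bool argument for brevity.

```cpp
#include <cstddef>
#include <deque>

// Toy model of a posted-message queue for this sketch.
class MiniPostQueue
{
public:
    void Post(int value) { mQueue.push_back(value); }

    // One cycle of queue processing. Returns the number of messages processed.
    // bClearQueue == true : keep going until the queue is empty, including follow-ups.
    // bClearQueue == false: process only the messages present when the call started.
    size_t ProcessQueue(bool bClearQueue)
    {
        size_t processed = 0;
        size_t remaining = mQueue.size(); // Snapshot of the messages present at the start.

        while(!mQueue.empty() && (bClearQueue || (remaining > 0)))
        {
            const int value = mQueue.front();
            mQueue.pop_front();
            if(remaining > 0)
                --remaining;

            Deliver(value);
            ++processed;
        }
        return processed;
    }

    size_t Pending() const { return mQueue.size(); }

private:
    // Stands in for a handler that posts while being called:
    // a message with value N > 0 posts a follow-up message with value N - 1.
    void Deliver(int value)
    {
        if(value > 0)
            Post(value - 1);
    }

    std::deque<int> mQueue;
};
```

With the option disabled, follow-up messages wait for the next cycle, which is the situation the multi-cycle ProcessQueue overload is meant to address.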