Publish-Subscribe Design Patterns
While developing embedded system, one frequently encounters a situation where
many entities are interested in occurrence of a particular event. This
introduces a strong coupling between the publisher and subscriber of this event
change notification. Thus whenever a new entity needs the information, code for
the publisher of the information also needs to be modified to accommodate the
new request.
An Example
Consider the example of a system which manages terminals. Here many entities
would be interested in knowing when a terminal's status changes. A few examples
are:
- Call Processing module needs to know when the terminal goes out of service
so that calls on the terminal can be cleared.
- Alarm processing software needs to know of terminal status change, so that
alarms can be raised/cleared.
- Resource manager would be interested in terminal status change as it needs
to change the resource counts when terminal status changes.
As the design progresses, more and more applications would be interested in
status change. Its easy to see that this complicates the design, as
terminal status handling has to be modified on each occasion.
Publish-Subscribe Pattern
The Publish-Subscribe Pattern solves the tight coupling problem. Here the
coupling is removed by the publisher of information supporting a generic
interface for subscribers. Entities interested in the information subscribe to
the publisher by registering for the information. With this interface, the
publisher code does not need to be modified every time a subscriber is
introduced.
Whenever information needs to be published, the publisher invokes the Publish
method to inform all the subscribers.
This pattern comes in two flavors:
Here we develop the LocalStatusPublisher class.
This class can be used as a helper class when this generic subscribe-publish
interface is desired. The following important public methods are supported:
- PublishStatus: Invoke this method to inform all
registered subscribers about status change.
- Register: Invoke this method to register an
object for the status change notification.
- Der Invoke this method to deregister an object.
The object will not receive any notifications of status change after it
deregisters.
Local
Status Publisher |
class LocalStatusPublisher
{
Subscriber *m_pSubscriber[MAX_SUBSCRIBERS];
// Find() searches for an entry that matches
// the pointer. It returns the index of the entry.
// If the entry is not found, it returns NOT_FOUND
int Find(Subscriber *pSubscriber) const
{
int index = NOT_FOUND
for (int i=0; i < MAX_SUBSCRIBERS; i++)
{
if (m_pSubscriber[i] == pSubscriber)
{
index = i;
break;
}
}
return index;
}
public:
// PublishStatus() notifies subscribers about status change
void PublishStatus(int unitId, int status)
{
for (int i=0; i < MAX_SUBSCRIBERS; i++)
{
// A valid subscriber exists only when the pointer is non NULL
if (m_pSubscriber[i])
{
m_pSubscriber[i]->HandleStatusChange(termId, status);
}
}
}
// Register() is invoked by objects inheriting from Subscriber. Once
// registered, the object is notified whenever status change is
// detected. The method returns FAILURE in the following cases:
// (1) Duplicate Request (2) Subscriber Table is full
Status Register(Subscriber *pSubscriber)
{
Status status = FAILURE;
// First check if the subscriber is already present
// in the list
int index = Find(pSubscriber);
if (index == NOT_FOUND)
{
// Subscriber was not found, so this is not a duplicate request
// Now look for a free entry by finding NULL
index = Find(NULL);
if (index != NOT_FOUND)
{
// A free entry has been found
m_pSubscriber[index] = pSubscriber;
status = SUCCESS;
}
}
return status
}
// Deregister() removes a subscriber. Returns FAILURE
// if the subscriber was not registered in the first place
Status Deregister(Subscriber *pSubscriber)
{
Status status = FAILURE;
// Search for the entry
int index = Find(pSubscriber);
if (index != NOT_FOUND)
{
// Free the entry by marking as NULL
m_pSubscriber[index] = NULL;
status = SUCCESS;
}
return SUCCESS;
}
};
|
Here we will look at the RemoteStatusPublisher
class. This class supports a message based interface. Subscribers send
registration request message to register for the status change. The source
address of the sender is saved as a part of the registration process.
Deregistration request is sent to stop receiving the status change
notifications.
Whenever status change is detected, PublishStatus
method is invoked. This method sends the status change message to all the
registered subscribers using the address that was obtained during
registration.
Remote
Status Publisher |
class RemoteStatusPublisher
{
SubscriberAddress m_subscriberAddr[MAX_SUBSCRIBERS];
// Find() searches for an entry that matches
// the subscriber Address. It returns the index of the entry.
// If the entry is not found, it returns NOT_FOUND
int Find(SubscriberAddress subscriberAddr) const
{
int index = NOT_FOUND
for (int i=0; i < MAX_SUBSCRIBERS; i++)
{
if (m_subscriberAddr[i] == subscriberAddr)
{
index = i;
break;
}
}
return index;
}
void SendRequestStatus(int requestType, SubscriberAddr
subscriberAddr, Status registrationStatus);
public:
// PublishStatus() notifies subscribers about status change
// by sending a message
void PublishStatus(int unitId, int unitStatus)
{
for (int i=0; i < MAX_SUBSCRIBERS; i++)
{
// A valid subscriber exists only when the pointer is non NULL
if (m_subscriberAddr[i]!= FREE_ENTRY)
{
SendStatusChange(termId, unitStatus);
}
}
}
// This method handles the registration request message
// from remote entities. The source address of the requester
// is saved for notification. The requester is also notified
// of the registration status
void HandleRegisterRequest(const RegisterRequestMsg *pMsg)
{
Status status = FAILURE;
// First check if the subscriber is already present
// in the list
int index = Find(pMsg->sourceAddr);
if (index == NOT_FOUND)
{
// Subscriber was not found, so this is not a duplicate request
// Now look for a free entry by finding FREE_ENTRY
index = Find(FREE_ENTRY);
if (index != NOT_FOUND)
{
// A free entry has been found
m_subscriberAddr[index] = pMsg->sourceAddr;
status = SUCCESS;
}
}
}
// This method handles the deregistration request and responds back
// with the status
void HandleDeregisterRequest(const HandleDeregisterRequest *pMsg)
{
Status status = FAILURE;
// Search for the entry
int index = Find(pMsg->sourceAddr);
if (index != NOT_FOUND)
{
// Free the entry by marking as FREE_ENTRY
m_subscriberAddr[index] = FREE_ENTRY;
status = SUCCESS;
}
SendRequestStatus(DEREGISTRATION, pMsg->sourceAddr, status);
}
};
|