Publish-Subscribe Design Patterns

While developing embedded system, one frequently encounters a situation where many entities are interested in occurrence of a particular event. This introduces a strong coupling between the publisher and subscriber of this event change notification. Thus whenever a new entity needs the information, code for the publisher of the information also needs to be modified to accommodate the new request.

An Example

Consider the example of a system which manages terminals. Here many entities would be interested in knowing when a terminal's status changes. A few examples are:

As the design progresses, more and more applications would be interested in status change. Its easy to see that this complicates the design, as terminal status handling has to be modified on each occasion.

Publish-Subscribe Pattern

The Publish-Subscribe Pattern solves the tight coupling problem. Here the coupling is removed by the publisher of information supporting a generic interface for subscribers. Entities interested in the information subscribe to the publisher by registering for the information. With this interface, the publisher code does not need to be modified every time a subscriber is introduced.

Whenever information needs to be published, the publisher invokes the Publish method to inform all the subscribers.

This pattern comes in two flavors:

Local Publish-Subscribe Pattern

Here we develop the LocalStatusPublisher class. This class can be used as a helper class when this generic subscribe-publish interface is desired. The following important public methods are supported:

 
Local Status Publisher
class LocalStatusPublisher
{
    Subscriber *m_pSubscriber[MAX_SUBSCRIBERS];
    
    // Find() searches for an entry that matches
    // the pointer. It returns the index of the entry.
    // If the entry is not found, it returns NOT_FOUND
    int Find(Subscriber *pSubscriber) const
    {
       int index = NOT_FOUND
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           if (m_pSubscriber[i] == pSubscriber)
           {
               index = i;
               break;
           }
       }       
       return index;     
    }
            
public:

    // PublishStatus() notifies subscribers about status change
    void PublishStatus(int unitId, int status)
    {
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           // A valid subscriber exists only when the pointer is non NULL
           if (m_pSubscriber[i])
           {
               m_pSubscriber[i]->HandleStatusChange(termId, status);
           }
       }
    }
    
    // Register() is invoked by objects inheriting from Subscriber. Once
    // registered, the object is notified whenever status change is
    // detected. The method returns FAILURE in the following cases:
    // (1) Duplicate Request (2) Subscriber Table is full
    Status Register(Subscriber *pSubscriber)
    {
        Status status = FAILURE;
        // First check if the subscriber is already present
        // in the list
        int index = Find(pSubscriber);
                
        if (index == NOT_FOUND)
        {
            // Subscriber was not found, so this is not a duplicate request
            // Now look for a free entry by finding NULL
            index = Find(NULL);
            if (index != NOT_FOUND)
            {
               // A free entry has been found
               m_pSubscriber[index] = pSubscriber;
               status = SUCCESS;
            }
        }      
        return status
    }
    
    // Deregister() removes a subscriber. Returns FAILURE
    // if the subscriber was not registered in the first place
    Status Deregister(Subscriber *pSubscriber)
    {
        Status status = FAILURE;
        // Search for the entry
        int index = Find(pSubscriber);
        if (index != NOT_FOUND)
        {
           // Free the entry by marking as NULL
           m_pSubscriber[index] = NULL;
           status = SUCCESS;
        }
        return SUCCESS;
    }
    
};    

Remote Publish-Subscribe Pattern

Here we will look at the RemoteStatusPublisher class. This class supports a message based interface. Subscribers send registration request message to register for the status change. The source address of the sender is saved as a part of the registration process. Deregistration request is sent to stop receiving the status change notifications.

Whenever status change is detected, PublishStatus method is invoked. This method sends the status change message to all the registered subscribers using the address that was obtained during registration.

Remote Status Publisher
class RemoteStatusPublisher
{
    SubscriberAddress m_subscriberAddr[MAX_SUBSCRIBERS];
    
    // Find() searches for an entry that matches
    // the subscriber Address. It returns the index of the entry.
    // If the entry is not found, it returns NOT_FOUND
    int Find(SubscriberAddress subscriberAddr) const
    {
       int index = NOT_FOUND
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           if (m_subscriberAddr[i] == subscriberAddr)
           {
               index = i;
               break;
           }
       }       
       return index;     
    } 
       
    void SendRequestStatus(int requestType, SubscriberAddr 
                       subscriberAddr, Status registrationStatus);
public:

    // PublishStatus() notifies subscribers about status change
    // by sending a message
    void PublishStatus(int unitId, int unitStatus)
    {
       for (int i=0; i < MAX_SUBSCRIBERS; i++)
       {
           // A valid subscriber exists only when the pointer is non NULL
           if (m_subscriberAddr[i]!= FREE_ENTRY)
           {
               SendStatusChange(termId, unitStatus);
           }
       }
    }
    
    // This method handles the registration request message
    // from remote entities. The source address of the requester
    // is saved for notification. The requester is also notified
    // of the registration status
    void HandleRegisterRequest(const RegisterRequestMsg *pMsg)
    {
        Status status = FAILURE;
        // First check if the subscriber is already present
        // in the list
        int index = Find(pMsg->sourceAddr);
                
        if (index == NOT_FOUND)
        {
            // Subscriber was not found, so this is not a duplicate request
            // Now look for a free entry by finding FREE_ENTRY
            index = Find(FREE_ENTRY);
            if (index != NOT_FOUND)
            {
               // A free entry has been found
               m_subscriberAddr[index] = pMsg->sourceAddr;
               status = SUCCESS;
            }
        }      
    }
    
    // This method handles the deregistration request and responds back
    // with the status
  void HandleDeregisterRequest(const HandleDeregisterRequest *pMsg)
    {
        Status status = FAILURE;
        // Search for the entry
        int index = Find(pMsg->sourceAddr);
        if (index != NOT_FOUND)
        {
           // Free the entry by marking as FREE_ENTRY
           m_subscriberAddr[index] = FREE_ENTRY;
           status = SUCCESS;
        }
        SendRequestStatus(DEREGISTRATION, pMsg->sourceAddr, status);
    }
    
};