OpenClovis Logo

API Usage Examples
Event Service

Code Examples. More...

Code Examples.

The Event Service invovles communincation between pulishers and subscribers and this is accomplished over an event channel. Here we illustrate how a typical subscriber and publisher would look like emphasizing on the various API available with the Event Service.

The example is based on the event generation on a water mark hit in the Execution Object(EO). The event generated has attributes like the EO name, Library ID, Water Mark ID, Water Mark Type and Water Mark Value. The publisher will publish with some specific values for each of these event attributes. A subscriber can choose the particular attributes it is interested in say a particular EO with the name "COR". In our example we have take the NULL filter which ensures delivery of any event generated on water mark hit irrespective of the attributes to the subscriber.

Common defintions: The filter/pattern information should be shared between the subscriber and publisher. These are passed as byte streams to Event Service and it is the responsibility of application to ensure these are endian neurtal by using marshalling (XDR) or some such mechanism. This would make it possible to filter events across different platforms.

typedef enum{
CL_EO_LIB_ID_OSAL,
CL_EO_LIB_ID_MEM,
CL_EO_LIB_ID_HEAP,
CL_EO_LIB_ID_BUFFER,
CL_EO_LIB_ID_TIMER,
CL_EO_LIB_ID_IOC,
CL_EO_LIB_ID_RMD,
CL_EO_LIB_ID_EO,
CL_EO_LIB_ID_RES,
CL_EO_LIB_ID_POOL = CL_EO_LIB_ID_RES,
CL_EO_LIB_ID_CPM,
CL_EO_LIB_ID_MAX
} ClEoLibIdT;

Subscriber Side:

Typical Event Service would provide with three different handles. The user may want to group them up as below:

typedef struct ClSubsEventInfo
{
ClEventInitHandleT initHandle;
ClEventChannelHandleT channelHandle;
ClEventHandleT eventHandle;
} ClSubsEventInfoT;
static ClSubsEventInfoT gSubsEventInfo;

If the application chooses to open the channel asynchronously then it needs to register a callback that shall be invoked on the creation of the channel. A typical callback would contain the subcription calls or event publish related calls. This has been illustrated in the publisher later.

void clSubsAsyncChannelOpenCb(ClInvocationT invocation,
ClEventChannelHandleT channelHandle, ClRcT retCode)
{
ClRcT rc = CL_OK;
clOsalPrintf("*******************************************************\n");
clOsalPrintf("************* Async Channel Open Callback *************\n");
clOsalPrintf("*******************************************************\n");
clOsalPrintf(" Invocation : %#X\n", invocation);
clOsalPrintf(" Channel Handle : %p\n", (ClPtrT)channelHandle);
clOsalPrintf(" API Return Code : %#X\n", retCode);
clOsalPrintf("*******************************************************\n");
/* Check if the Channel Open Asyn was successful */
if(CL_OK != retCode)
{
goto failure;
}
rc = clEventSubscribe(channelHandle,
CL_EVENT_DEFAULT_SUBS_FILTER, UNIQUE_SUBSCRIPTION_ID,
"User Specified Argument (cookie) for the event delivery callback");
if(CL_OK != rc)
{
goto failure;
}
failure:
return;
}

The subscriber is required to register a callback during the Event Library initialize which is to invoked when an event is delivered. For our example the callback is as follows:

static void clSubEoEventWaterMarkCb( ClEventSubscriptionIdT subscriptionId,
ClEventHandleT eventHandle, ClSizeT eventDataSize )
{
ClRcT rc = CL_OK;
ClEventPriorityT priority = 0;
ClTimeT retentionTime = 0;
ClNameT publisherName = { 0 };
ClEventIdT eventId = 0;
ClEventPatternArrayT patternArray = { 0 };
ClTimeT publishTime = 0;
ClPtrT pCookie = NULL;
ClPtrT pEventData = NULL;
ClUint8T *pEventPayload = NULL;
/* Allocate memory for the event payload.*/
pEventData = clHeapAllocate(eventDataSize);
if (pEventData == NULL)
{
clOsalPrintf("Allocation for event data failed. rc[%#X]\n", rc);
goto failure;
}
/* Fetch the event payload. */
rc = clEventDataGet (eventHandle, pEventData, &eventDataSize);
if (rc != CL_OK)
{
clOsalPrintf("Event Data Get failed. rc[%#X]\n", rc);
goto failure;
}
pEventPayload = pEventData;
/* Fetch the cookie specified during subscribe. */
rc = clEventCookieGet(eventHandle, &pCookie);
if (rc != CL_OK)
{
clOsalPrintf("Event Cookie Get failed. rc[%#X]\n", rc);
goto failure;
}
/* Fetch the attributes of the event.*/
rc = clEventAttributesGet(eventHandle, &patternArray, &priority,
&retentionTime, &publisherName,
&publishTime, &eventId);
if (rc != CL_OK)
{
clOsalPrintf("Event Attributes Get failed. rc[%#X]\n", rc);
goto failure;
}
/* Free the allocated Event */
rc = clEventFree(eventHandle);
clOsalPrintf("-------------------------------------------------------\n");
clOsalPrintf("!!!!!!!!!!!!!!! Event Delivery Callback !!!!!!!!!!!!!!!\n");
clOsalPrintf("-------------------------------------------------------\n");
clOsalPrintf(" Subscription ID : %#X\n", subscriptionId);
clOsalPrintf(" Event Priority : %#X\n", priority);
clOsalPrintf(" Retention Time : 0x%llX\n",
retentionTime);
clOsalPrintf(" Publisher Name : %.*s\n",
publisherName.length, publisherName.value);
clOsalPrintf(" EventID : %#X\n", eventId);
clOsalPrintf(" Event Payload : %s\n", pEventPayload);
clOsalPrintf(" Event Cookie : %s\n", (ClUint8T*)pCookie);
/*
Display the Water Mark Details and Free the patterns.
Note that each pattern needs to be freed and then the
pattern array in that order.
*/
clOsalPrintf(" EO Name : %.*s\n",
(ClInt32T)patternArray.pPatterns[0].patternSize,
(ClCharT *)patternArray.pPatterns[0].pPattern);
clHeapFree(patternArray.pPatterns[0].pPattern);
clOsalPrintf(" Library ID : %#X\n",
*(ClEoLibIdT *)patternArray.pPatterns[1].pPattern);
clHeapFree(patternArray.pPatterns[1].pPattern);
clOsalPrintf(" Water Mark ID : %#X\n",
*(ClWaterMarkIdT *)patternArray.pPatterns[2].pPattern);
clHeapFree(patternArray.pPatterns[2].pPattern);
clOsalPrintf(" Water Mark Type : %s\n",
(*(ClEoWaterMarkFlagT *)patternArray.pPatterns[3].pPattern ==
CL_WM_HIGH_LIMIT) ? "HIGH" : "LOW");
clHeapFree(patternArray.pPatterns[3].pPattern);
clOsalPrintf(" Water Mark Value : %u\n",
*(ClUint32T *)patternArray.pPatterns[4].pPattern);
clHeapFree(patternArray.pPatterns[4].pPattern);
clHeapFree(patternArray.pPatterns);
clOsalPrintf("-------------------------------------------------------\n");
failure:
return;
}

Any component that wishes to be a subscriber has to initialize the Event Library with appropriate arguments. The following code snippet shows how Event Library is initialized and set for the subscription. Typically, the following function would reside in the clCompAppInitialize() registered in the clEoConfig structure of an EO. There could be multiple subscriptions depending on the requirement each with a unique subscription id (unique per initialization of Event Library).

static ClRcT clSubsEventLibrayInitialize(void)
{
ClRcT rc = CL_OK;
ClNameT channelName = {sizeof(CL_EO_EVENT_CHANNEL_NAME)-1,
const ClEventCallbacksT evtCallbacks =
{
NULL, // clSubsAsyncChannelOpenCb for Async Channel Open
clSubEoEventWaterMarkCb, // Event Delivery Callback
};
rc = clEventInitialize(&gSubsEventInfo.initHandle, &evtCallbacks,
&version);
if(CL_OK != rc)
{
clOsalPrintf("clEventInitialize() failed [%#X]\n",rc);
goto failure;
}
rc = clEventChannelOpen(gSubsEventInfo.initHandle, &channelName,
&gSubsEventInfo.channelHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventChannelOpen() failed [%#X]\n",rc);
goto init_done;
}
rc = clEventSubscribe(gSubsEventInfo.channelHandle,
CL_EVENT_DEFAULT_SUBS_FILTER, UNIQUE_SUBSCRIPTION_ID,
"User Specified Argument (cookie) for the event delivery callback");
if(CL_OK != rc)
{
clOsalPrintf("clEventSubscribe() failed [%#X]\n",rc);
goto channel_opened;
}
return CL_OK;
channel_opened:
clEventChannelClose(gSubsEventInfo.channelHandle);
init_done:
clEventFinalize(gSubsEventInfo.initHandle);
failure:
return rc;
}

The Event Library is typically finalized from within clCompAppTerminate() registered with the Component Manager (CPM). The following code snippet illustrates the steps involved in finalization of a subscriber.

static ClRcT clSubsEventLibrayFinalize(void)
{
ClRcT rc = CL_OK;
rc = clEventUnsubscribe(gSubsEventInfo.channelHandle,
UNIQUE_SUBSCRIPTION_ID);
if(CL_OK != rc)
{
clOsalPrintf("clEventUnsubscribe() failed [%#X]\n",rc);
}
rc = clEventChannelClose(gSubsEventInfo.channelHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventChannelClose() failed [%#X]\n",rc);
}
rc = clEventFinalize(gSubsEventInfo.initHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventFinalize() failed [%#X]\n",rc);
}
return CL_OK;
}

Publisher Side:

The Publisher would unlike a subscriber allocate events, set their attributes appropriately and publish them as illustrated below:

static ClRcT clPubsTriggerEvent(ClEoLibIdT libId, ClWaterMarkIdT wmId,
ClUint32T wmValue, ClEoWaterMarkFlagT wmFlag)
{
ClRcT rc = CL_OK;
ClEventIdT eventId = 0;
ClNameT publisherName = {sizeof(CL_EVENT_PUBLISHER_NAME)-1,
CL_EVENT_PUBLISHER_NAME};
ClEventPatternT patterns[5] = {{0}};
ClEventPatternArrayT patternArray = {
0,
CL_SIZEOF_ARRAY(patterns),
patterns
};
rc = clEventAllocate(gPubsEventInfo.channelHandle,
&gPubsEventInfo.eventHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventAllocate() failed [%#X]\n",rc);
goto failure;
}
patterns[0].patternSize = strlen(CL_EO_NAME);
patterns[0].pPattern = (ClUint8T *)CL_EO_NAME;
patterns[1].patternSize = sizeof(libId);
patterns[1].pPattern = (ClUint8T *)&libId;
patterns[2].patternSize = sizeof(wmId);
patterns[2].pPattern = (ClUint8T *)&wmId;
patterns[3].patternSize = sizeof(wmFlag);
patterns[3].pPattern = (ClUint8T *)&wmFlag;
patterns[4].patternSize = sizeof(wmValue);
patterns[4].pPattern = (ClUint8T *)(&wmValue);
rc = clEventAttributesSet(gPubsEventInfo.eventHandle, &patternArray,
CL_EVENT_HIGHEST_PRIORITY, 0, &publisherName);
if(CL_OK != rc)
{
clOsalPrintf("clEventAttributesSet() failed [%#X]\n",rc);
goto event_allocated;
}
rc = clEventPublish(gPubsEventInfo.eventHandle,
"Event Payload passed in endian neutral way",
sizeof("Event Payload passed in endian neutral way.")-1,
&eventId);
if(CL_OK != rc)
{
clOsalPrintf("clEventPublish() failed [%#X]\n",rc);
goto event_allocated;
}
event_allocated:
rc = clEventFree(gPubsEventInfo.eventHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventFree() failed [%#X]\n",rc);
}
failure:
return rc;
}

Like in the subscriber the publisher needs to do the Event Library Initialize as suggested before. In publisher we open the channel asynchronously, the callback for which is as below:

void clPubsAsyncChannelOpenCb(ClInvocationT invocation,
ClEventChannelHandleT channelHandle, ClRcT retCode)
{
ClRcT rc = CL_OK;
clOsalPrintf("*******************************************************\n");
clOsalPrintf("************* Async Channel Open Callback *************\n");
clOsalPrintf("*******************************************************\n");
clOsalPrintf(" Invocation : %#X\n", invocation);
clOsalPrintf(" Channel Handle : %p\n", (ClPtrT)channelHandle);
clOsalPrintf(" API Return Code : %#X\n", retCode);
clOsalPrintf("*******************************************************\n");
/*
Check if the Channel Open Asyn was successful
*/
if(CL_OK != retCode)
{
clOsalPrintf("clEventChannelOpenAsync() failed [%#X]\n",rc);
goto failure;
}
gPubsEventInfo.channelHandle = channelHandle;
rc = clPubsTriggerEvent(CL_EO_LIB_ID_HEAP, CL_WM_HIGH, CL_WM_HIGH_LIMIT,
CL_WM_LOW_LIMIT);
if(CL_OK != rc)
{
clOsalPrintf("Publish trigger failed [%#X]\n",rc);
goto channel_opened;
}
channel_opened:
clEventChannelClose(gPubsEventInfo.channelHandle);
failure:
return;
}

Much of the code is similar between the subscriber and publisher as can be seen below. The only difference being a publisher may optionally choose to be a subscriber also in which case it will need to register an event delivery callback. Note that event publish related API that appear in channel open callback above would appear right after the channel open if it were synchrounous. The event can be allocated each time it is published or it can be re-used with special care taken about protection in multi threaded environment when the event handle is shared between the threads.

typedef struct ClPubsEventInfo
{
ClEventInitHandleT initHandle;
ClEventChannelHandleT channelHandle;
ClEventHandleT eventHandle;
} ClPubsEventInfoT;
static ClPubsEventInfoT gPubsEventInfo;
static ClRcT clPubsEventLibrayInitialize(void)
{
ClRcT rc = CL_OK;
ClNameT channelName = {sizeof(CL_EO_EVENT_CHANNEL_NAME)-1,
const ClEventCallbacksT evtCallbacks =
{
clPubsAsyncChannelOpenCb, // Can be NULL if sync call is used
NULL, // Since it is not a subscriber
};
// To identify the callback
ClInvocationT invocation = UNIQUE_INVOCATION_ID;
rc = clEventInitialize(&gPubsEventInfo.initHandle,
&evtCallbacks, &version);
if(CL_OK != rc)
{
clOsalPrintf("clEventInitialize() failed [%#X]\n",rc);
goto failure;
}
rc = clEventChannelOpenAsync(gPubsEventInfo.initHandle, invocation,
&channelName, evtFlags);
if(CL_OK != rc)
{
clOsalPrintf("clEventChannelOpen() failed [%#X]\n",rc);
goto init_done;
}
return CL_OK;
init_done:
clEventFinalize(gPubsEventInfo.initHandle);
failure:
return rc;
}
static ClRcT clPubsEventLibrayFinalize(void)
{
ClRcT rc = CL_OK;
rc = clEventChannelClose(gPubsEventInfo.channelHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventChannelClose() failed [%#X]\n",rc);
}
rc = clEventFinalize(gPubsEventInfo.initHandle);
if(CL_OK != rc)
{
clOsalPrintf("clEventFinalize() failed [%#X]\n",rc);
}
return CL_OK;
}

Generated on Tue Jan 10 10:29:15 PST 2012 for OpenClovis SDK using Doxygen