Manage message processing and idempotency
The AccelByte Gaming Services (AGS) Extend Event Handler app's messaging system is built on the Kafka ecosystem to provide robust, scalable, and fault-tolerant message delivery. Extend relies on "at least once" delivery semantics, which means the Event Handler app is guaranteed to receive every message at least once, and may receive the same message more than once. This ensures that no messages are lost, even if exceptions or other errors occur in the Event Handler or the network, because failed messages are redelivered.
Extend uses Kafka Connect to subscribe to Kafka topics, pull data, and transform messages into Event Handler gRPC invocations. To improve network throughput, Kafka Connect polls and processes messages in batches (up to a configurable limit). If a batch fails, it is retried as a whole. Consequently, the Event Handler must be idempotent: processing the same message more than once must either be harmless or be detected and handled gracefully.
Depending on the use case, performing an action multiple times may be acceptable, or it may not be, such as granting 100 XP more than once because a message was replayed. The simplest way to achieve idempotency is to record the completed processing of each message under a unique identifier, such as the event ID, in a distributed database (e.g., Cloud Save), so that work left incomplete by a failure during processing can be undone and any subsequent duplicates can be discarded.
Exceptions and error handling
The only way to guarantee that messages are delivered and fully processed by the Event Handler is to receive an acknowledgement from it. Without an acknowledgement from the Event Handler, Kafka Connect attempts to redeliver the entire batch when the gRPC status code returned by the Event Handler or the gRPC framework is one of a set of codes deemed "Retriable".
To signal success or failure and to control retries (the entire batch of messages is retried as a whole), it's important to catch and handle all possible exceptions, and to return a "Retriable" gRPC status code only if the message cannot be processed and a retry is desired.
The retriable gRPC status codes from the gRPC Event Handler are as follows:
- UNAVAILABLE: For example, the Event Handler gRPC Server isn't up.
- RESOURCE_EXHAUSTED: For example, the server is temporarily out of resources.
- INTERNAL: For example, an internal server error due to custom logic issues.
- UNKNOWN: For example, the Event Handler app throws an exception or executes an action other than returning a status code when ending the gRPC call.
- Python
- C#
- Go
- Java
def check_entitlement(self, user_id: str, item_id: str) -> bool:
    """Return True if a successful fulfillment already granted the item.

    Queries the user's fulfillment histories with status "SUCCESS" and
    looks for ``item_id`` among the granted item ids of each record. Only
    the records returned by this single query call are inspected.

    Args:
        user_id: The user whose fulfillment history is checked.
        item_id: The item id to look for.

    Returns:
        True when a returned record lists ``item_id`` as granted,
        otherwise False.

    Raises:
        RuntimeError: If the history query reports an error. A failed
            lookup must not be reported as "not granted", because the
            caller would then grant the item again on a replayed message.
    """
    result, error = platform_service.query_fulfillment_histories(
        status="SUCCESS",
        user_id=user_id,
    )
    if error:
        raise RuntimeError(
            f"could not query fulfillment histories for user {user_id}: {error}"
        )
    if not result or not result.data:
        return False
    for fulfillment_history_info in result.data:
        # Records without a list-valued granted_item_ids are skipped.
        granted_item_ids = getattr(
            fulfillment_history_info, "granted_item_ids", None
        )
        if isinstance(granted_item_ids, list) and item_id in granted_item_ids:
            return True
    return False
def grant_entitlement(
    self, user_id: str, item_id: str, count: int,
) -> Optional[Union[platform_models.ErrorEntity, HttpResponse]]:
    """Fulfill ``count`` units of ``item_id`` for ``user_id`` as a REWARD.

    Args:
        user_id: The user receiving the item.
        item_id: The item to fulfill.
        count: Quantity to fulfill.

    Returns:
        The error reported by the fulfillment call, or None on success.

    Raises:
        Exception: If the call reports no error but the response carries
            no entitlement summaries.
    """
    result, error = platform_service.fulfill_item(
        user_id=user_id,
        body=platform_models.FulfillmentRequest.create(
            quantity=count,
            item_id=item_id,
            source=platform_models.FulfillmentRequestSourceEnum.REWARD,
        ),
    )
    if error:
        return error
    # Truthiness (rather than len()) also covers a missing result or a
    # None entitlement_summaries, which would otherwise surface as an
    # unrelated TypeError/AttributeError instead of this message.
    if not getattr(result, "entitlement_summaries", None):
        raise Exception("could not grant item to user")
    return None
async def OnMessage(self, request: UserLoggedIn, context):
    """Handle a UserLoggedIn event by granting the configured item once.

    The grant is skipped when check_entitlement reports that the user
    already has the item. When grant_entitlement returns an error, the
    call is aborted with gRPC status INTERNAL.

    Args:
        request: The login event; only ``userId`` is read here.
        context: The gRPC context used to abort the call on failure.

    Returns:
        An ``Empty`` message.
    """
    method_name = self.OnMessage.__name__
    self.log_payload(f"{method_name} request: %s", request)

    item_id = self.item_id_to_grant
    user_id = request.userId

    # Idempotency guard: a replayed login event must not grant twice.
    if self.check_entitlement(user_id=user_id, item_id=item_id):
        if self.logger:
            self.logger.info("user already has the item, skipping...")
        return Empty()

    grant_error = self.grant_entitlement(user_id, item_id, 1)
    if grant_error:
        if isinstance(grant_error, platform_models.ErrorEntity):
            detail = str(grant_error.to_dict())
        else:
            detail = str(grant_error)
        await context.abort(
            grpc.StatusCode.INTERNAL,
            f"could not grant item {item_id} to user {user_id}: {detail}",
        )  # INTERNAL to retry

    reply = Empty()
    self.log_payload(f"{method_name} response: %s", reply)
    return reply
// Idempotency guard: look through the user's successful fulfillment
// histories (limit 10, offset 0) and fulfill the configured item only when
// none of the returned records already lists it as granted.
bool allowFulfillment = true;

var fulfillmentCheck = _ABProvider.Sdk.Platform.Fulfillment.QueryFulfillmentHistoriesOp
    .SetUserId(request.UserId)
    .SetLimit(10)
    .SetOffset(0)
    .SetStatus(QueryFulfillmentHistoriesStatus.SUCCESS)
    .Execute(targetNamespace);

if (fulfillmentCheck?.Data != null)
{
    foreach (var history in fulfillmentCheck.Data)
    {
        // Records without granted item ids cannot match.
        if (history.GrantedItemIds == null)
            continue;

        if (history.GrantedItemIds.Contains(_ABProvider.ItemIdToGrant))
        {
            // The item is already fulfilled for this user; stop searching.
            allowFulfillment = false;
            break;
        }
    }
}

if (allowFulfillment)
{
    // Grant one unit of the item as a REWARD.
    var fulfillmentResponse = _ABProvider.Sdk.Platform.Fulfillment.FulfillItemOp
        .SetBody(new FulfillmentRequest()
        {
            ItemId = _ABProvider.ItemIdToGrant,
            Quantity = 1,
            Source = FulfillmentRequestSource.REWARD
        })
        .Execute(targetNamespace, request.UserId);

    if (fulfillmentResponse != null)
    {
        // Log the id of every entitlement created by the fulfillment.
        foreach (var summary in fulfillmentResponse.EntitlementSummaries!)
        {
            _Logger.LogInformation($"EntitlementId: {summary.Id!}");
        }
    }
}
// checkEntitlement reports whether any successful fulfillment history
// returned for userID lists itemID among its granted item ids. The namespace
// is read from AB_NAMESPACE (default "accelbyte"). A query error is returned
// to the caller unchanged.
func (o *LoginHandler) checkEntitlement(userID string, itemID string) (bool, error) {
	successStatus := platformclientmodels.RequestHistoryStatusSUCCESS
	params := &fulfillment.QueryFulfillmentHistoriesParams{
		Namespace: common.GetEnv("AB_NAMESPACE", "accelbyte"),
		UserID:    &userID,
		Status:    &successStatus,
	}

	histories, err := o.fulfillment.QueryFulfillmentHistoriesShort(params)
	if err != nil {
		return false, err
	}
	if histories == nil {
		return false, nil
	}

	// Ranging over a nil slice is a no-op, so no explicit nil/len checks
	// are needed on Data or GrantedItemIds.
	for _, entry := range histories.Data {
		for _, granted := range entry.GrantedItemIds {
			if granted == itemID {
				return true, nil
			}
		}
	}

	return false, nil
}
// grantEntitlement fulfills count units of itemID for userID as a REWARD in
// the namespace read from AB_NAMESPACE (default "accelbyte"). It returns the
// fulfillment error unchanged, or a codes.Internal status error when the call
// succeeds but yields no entitlement summaries.
func (o *LoginHandler) grantEntitlement(userID string, itemID string, count int32) error {
	request := &platformclientmodels.FulfillmentRequest{
		ItemID:   itemID,
		Quantity: &count,
		Source:   platformclientmodels.EntitlementGrantSourceREWARD,
	}
	params := &fulfillment.FulfillItemParams{
		Namespace: common.GetEnv("AB_NAMESPACE", "accelbyte"),
		UserID:    userID,
		Body:      request,
	}

	granted, err := o.fulfillment.FulfillItemShort(params)
	switch {
	case err != nil:
		return err
	case granted == nil || len(granted.EntitlementSummaries) == 0:
		// len of a nil slice is 0, so this also covers a nil summary list.
		return status.Errorf(codes.Internal, "could not grant item to user")
	}

	return nil
}
// OnMessage handles a UserLoggedIn event by granting itemIdToGrant to the
// user unless checkEntitlement reports it was already granted. Every failure
// path (missing configuration, entitlement check, grant) is returned as a
// codes.Internal status error.
func (o *LoginHandler) OnMessage(ctx context.Context, msg *pb.UserLoggedIn) (*emptypb.Empty, error) {
	scope := common.GetScopeFromContext(ctx, "LoginHandler.OnMessage")
	defer scope.Finish()

	if itemIdToGrant == "" {
		return &emptypb.Empty{}, status.Errorf(codes.Internal, "Required envar ITEM_ID_TO_GRANT is not configured")
	}

	// Idempotency guard: a replayed login event must not grant twice.
	alreadyGranted, checkErr := o.checkEntitlement(msg.UserId, itemIdToGrant)
	switch {
	case checkErr != nil:
		return &emptypb.Empty{}, status.Errorf(codes.Internal, "failed to check entitlement: %v", checkErr)
	case alreadyGranted:
		logrus.Infof("User already has the item, skipping...")
		return &emptypb.Empty{}, nil
	}

	if grantErr := o.grantEntitlement(msg.UserId, itemIdToGrant, 1); grantErr != nil {
		return &emptypb.Empty{}, status.Errorf(codes.Internal, "failed to grant entitlement: %v", grantErr)
	}

	logrus.Infof("Entitlement granted successfully for user: %v", msg.UserId)
	return &emptypb.Empty{}, nil
}
/**
 * Checks whether a successful fulfillment already granted the item to the user.
 *
 * <p>Queries the 20 most recent... (limit 20, offset 0) fulfillment histories with
 * status SUCCESS in the configured namespace and looks for {@code itemId}
 * among each record's granted item ids.
 *
 * @param userId the user whose fulfillment history is checked
 * @param itemId the item id to look for
 * @return {@code true} if a returned record lists {@code itemId} as granted,
 *         {@code false} otherwise
 * @throws Exception if the fulfillment history query fails
 */
private boolean checkEntitlement(String userId, String itemId) throws Exception {
    QueryFulfillmentHistories queryFulfillmentBody = QueryFulfillmentHistories.builder()
            // Scope the query to the same namespace the grant is made in.
            .namespace(namespace)
            .userId(userId)
            .limit(20) // for our use case, checking the last 20 fulfillments is sufficient
            .offset(0)
            .status(QueryFulfillmentHistories.Status.SUCCESS.toString())
            .build();
    FulfillmentHistoryPagingSlicedResult fulfillmentCheck = fulfillment.queryFulfillmentHistories(queryFulfillmentBody);
    if (fulfillmentCheck == null || fulfillmentCheck.getData() == null) {
        return false;
    }
    for (FulfillmentHistoryInfo fulfillmentItem : fulfillmentCheck.getData()) {
        if (fulfillmentItem.getGrantedItemIds() != null
                && fulfillmentItem.getGrantedItemIds().contains(itemId)) {
            // item is already fulfilled for specified user
            return true;
        }
    }
    return false;
}
/**
 * Fulfills one unit of the item for the user as a REWARD in the configured
 * namespace and logs the id of each resulting entitlement.
 *
 * @param userId the user receiving the item
 * @param itemId the item to fulfill
 * @throws Exception if the fulfillment call fails
 */
private void grantEntitlement(String userId, String itemId) throws Exception {
    FulfillmentRequest body = FulfillmentRequest.builder()
            .itemId(itemId)
            .quantity(1)
            .source(EntitlementGrant.Source.REWARD.name())
            .build();
    FulfillItem fulfillItemParam = FulfillItem.builder()
            .namespace(namespace)
            .userId(userId)
            .body(body)
            .build();
    FulfillmentResult fulfillmentResult = fulfillment.fulfillItem(fulfillItemParam);
    // The loop only logs; a missing summary list must not throw and make an
    // already-completed fulfillment look like a failure to the caller.
    if (fulfillmentResult != null && fulfillmentResult.getEntitlementSummaries() != null) {
        for (EntitlementSummary entitlementItem : fulfillmentResult.getEntitlementSummaries()) {
            log.info("entitlementId: {}", entitlementItem.getId());
        }
    }
}
/**
 * Handles a UserLoggedIn event by granting {@code itemIdToGrant} to the user
 * unless it has already been fulfilled.
 *
 * <p>On success an empty response is emitted and the call is completed. Any
 * exception is logged and reported to the caller as gRPC status INTERNAL.
 *
 * @param request the login event; only the user id is read
 * @param responseObserver receives the empty response or the error
 */
@Override
public void onMessage(UserLoggedIn request, StreamObserver<Empty> responseObserver) {
    log.info("received a message: {}", request);
    final String userId = request.getUserId();

    try {
        // Idempotency guard: a replayed login event must not grant twice.
        final boolean alreadyGranted = checkEntitlement(userId, itemIdToGrant);
        if (!alreadyGranted) {
            grantEntitlement(userId, itemIdToGrant);
        } else {
            log.info("Item {} is already fulfilled, skipping", itemIdToGrant);
        }

        responseObserver.onNext(Empty.getDefaultInstance());
        responseObserver.onCompleted();
    } catch (Exception ex) {
        final String failure = String.format("could not grant item %s to user %s", itemIdToGrant, userId);
        log.error(failure, ex);
        // INTERNAL to retry
        responseObserver.onError(Status.INTERNAL.withDescription(failure).asException());
    }
}
Message batches are retried indefinitely, with a 30-second interval between retries. Once every gRPC call in a batch has returned a non-retriable status code, the offset for the entire batch is committed to Kafka.
Reference
For more information about processing guarantee and idempotency using the "at least once" semantic, see Kafka Connect's official documentation on message delivery guarantees.