Manage message processing and idempotency

Last updated on October 24, 2024

The AccelByte Gaming Services (AGS) Extend Event Handler app's messaging system uses the Kafka ecosystem to provide robust, scalable, and fault-tolerant message delivery. Extend relies on "at least once" delivery semantics, which means the Event Handler app is guaranteed to receive each message at least once and may receive the same message more than once. This ensures that no messages are lost, even if exceptions or other errors occur in the Event Handler or the network, because undelivered messages are redelivered.

Extend uses Kafka Connect to subscribe to Kafka topics, pull data, and transform messages into Event Handler gRPC invocations. To improve network throughput, Kafka Connect polls and processes messages in batches (up to a configurable limit). If a batch fails, it is retried as a whole, so messages that were already processed successfully are delivered again. Consequently, the Event Handler must be idempotent: processing the same message more than once must either be harmless or be detected and handled gracefully.
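
To illustrate why whole-batch retries produce duplicates, here is a minimal simulation. It is not Kafka Connect's actual implementation; `deliver_batch`, `handler`, and the event names are hypothetical, and a plain `RuntimeError` stands in for a failed gRPC call.

```python
from collections import Counter


def deliver_batch(batch, handler, max_attempts=5):
    """Redeliver the whole batch until the handler succeeds for every message."""
    for attempt in range(1, max_attempts + 1):
        try:
            for message in batch:
                handler(message)
            return attempt  # every message in the batch was processed
        except RuntimeError:
            continue  # one failure causes the entire batch to be retried
    raise RuntimeError("batch still failing after max_attempts")


seen = Counter()      # how many times each message reached the handler
failed_once = set()   # messages that have already failed one time


def handler(message):
    seen[message] += 1
    # Simulate a transient failure the first time "event-3" arrives.
    if message == "event-3" and message not in failed_once:
        failed_once.add(message)
        raise RuntimeError("transient failure")


attempts = deliver_batch(["event-1", "event-2", "event-3"], handler)
print(attempts, dict(seen))
```

In this run, "event-1" and "event-2" reach the handler twice even though they never failed, which is the situation an idempotent handler has to tolerate.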

Whether repeating an action is acceptable depends on the use case. In some cases it is harmless; in others it is not, such as granting 100 XP more than once because a message was replayed. The simplest way to achieve idempotency is to record the completed processing of each message under a unique identifier, such as the event ID, in a distributed database (e.g., Cloud Save), so that partially completed processing can be rolled back on failure and any subsequent duplicates can be discarded.
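
The event ID approach can be sketched as follows. This is an illustration under stated assumptions, not the Cloud Save API: `ProcessedEventStore` is a hypothetical in-memory stand-in for a distributed store, and `grant_xp_once` is a hypothetical handler step.

```python
class ProcessedEventStore:
    """Hypothetical in-memory stand-in for a distributed store such as Cloud Save."""

    def __init__(self):
        self._done = set()

    def is_done(self, event_id):
        return event_id in self._done

    def mark_done(self, event_id):
        self._done.add(event_id)


def grant_xp_once(store, xp_by_user, event_id, user_id, amount):
    """Grant XP unless this event ID was already processed. Returns True if granted."""
    if store.is_done(event_id):
        return False  # duplicate delivery: discard it
    xp_by_user[user_id] = xp_by_user.get(user_id, 0) + amount
    store.mark_done(event_id)  # record completion only after the grant succeeded
    return True


store = ProcessedEventStore()
xp_by_user = {}
first = grant_xp_once(store, xp_by_user, "evt-1", "user-a", 100)
replay = grant_xp_once(store, xp_by_user, "evt-1", "user-a", 100)
print(first, replay, xp_by_user)
```

Note that the check and the write here are separate steps. With a real distributed store, the grant and the completion record would need to be applied atomically (for example, through a transaction or a conditional write); otherwise a crash between the two steps could still lead to a duplicate grant on replay.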

Exceptions and error handling

The only way to guarantee that messages are delivered and fully processed by the Event Handler is for the Event Handler to acknowledge them. Without an acknowledgement, Kafka Connect redelivers the entire batch when the gRPC status code returned by the Event Handler or the gRPC framework is one of a set of codes deemed "retriable".

To signal success or failure correctly, catch and handle all possible exceptions, and return a retriable gRPC status code only if the message cannot be processed and a retry is desired. Keep in mind that a retry applies to the entire batch of messages, not only to the message that failed.

The retriable gRPC status codes from the gRPC Event Handler are as follows:

  • UNAVAILABLE: For example, the Event Handler gRPC Server isn't up.
  • RESOURCE_EXHAUSTED: For example, the server is temporarily out of resources.
  • INTERNAL: For example, an internal server error due to custom logic issues.
  • UNKNOWN: For example, the Event Handler app throws an exception or does something other than return a status code when ending the gRPC call.
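
One way to keep this decision explicit in handler code is to map each exception type to a status code in a single place. The sketch below is hedged: plain strings stand in for `grpc.StatusCode` members so that it runs without the gRPC library, `TransientError` and `PermanentError` are hypothetical exception types, and it assumes that codes outside the list above (such as INVALID_ARGUMENT) are not retried.

```python
# Status names taken from the list of retriable codes above.
RETRIABLE_CODES = frozenset(
    {"UNAVAILABLE", "RESOURCE_EXHAUSTED", "INTERNAL", "UNKNOWN"}
)


class TransientError(Exception):
    """Hypothetical: a failure that may succeed on retry (e.g., a backend timeout)."""


class PermanentError(Exception):
    """Hypothetical: a failure that retrying cannot fix (e.g., a malformed payload)."""


def status_for(exc):
    """Choose the status name to return for an exception raised by custom logic."""
    if isinstance(exc, TransientError):
        return "INTERNAL"          # retriable: the batch will be redelivered
    if isinstance(exc, PermanentError):
        return "INVALID_ARGUMENT"  # assumed non-retriable: not in the list above
    return "UNKNOWN"               # unhandled exceptions surface as UNKNOWN


def should_retry(code):
    return code in RETRIABLE_CODES


print(should_retry(status_for(TransientError("timeout"))))
print(should_retry(status_for(PermanentError("bad payload"))))
```

The last branch mirrors the UNKNOWN entry in the list: an exception that the handler does not classify still triggers a batch retry, so permanent failures should be caught and mapped deliberately.
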

The following Python example makes the handler idempotent by checking whether the user already owns the item before granting it, and aborts with INTERNAL so that the batch is retried if the grant fails:

from typing import Optional, Union

import grpc
from google.protobuf.empty_pb2 import Empty

# platform_service, platform_models, HttpResponse, and UserLoggedIn come from the
# AccelByte Python SDK and the generated event stubs; their imports are omitted here.
# The functions below are methods of the Event Handler service class.

def check_entitlement(self, user_id: str, item_id: str) -> bool:
    # Look through the user's successful fulfillments for the item.
    result, error = platform_service.query_fulfillment_histories(
        status="SUCCESS",
        user_id=user_id,
    )
    if not error and result.data:
        for fulfillment_history_info in result.data:
            if (
                hasattr(fulfillment_history_info, "granted_item_ids") and
                isinstance(fulfillment_history_info.granted_item_ids, list) and
                item_id in fulfillment_history_info.granted_item_ids
            ):
                return True
    return False

def grant_entitlement(
    self, user_id: str, item_id: str, count: int,
) -> Optional[Union[platform_models.ErrorEntity, HttpResponse]]:
    result, error = platform_service.fulfill_item(
        user_id=user_id,
        body=platform_models.FulfillmentRequest.create(
            quantity=count,
            item_id=item_id,
            source=platform_models.FulfillmentRequestSourceEnum.REWARD,
        )
    )
    if error:
        return error
    if len(result.entitlement_summaries) <= 0:
        # An unhandled exception ends the call with UNKNOWN, which is retriable.
        raise Exception("could not grant item to user")
    return None

async def OnMessage(self, request: UserLoggedIn, context):
    self.log_payload(f"{self.OnMessage.__name__} request: %s", request)

    item_id = self.item_id_to_grant
    user_id = request.userId

    # Idempotency check: a replayed message must not grant the item twice.
    has_entitlement = self.check_entitlement(user_id=user_id, item_id=item_id)
    if has_entitlement:
        if self.logger:
            self.logger.info("user already has the item, skipping...")
        return Empty()

    error = self.grant_entitlement(user_id, item_id, 1)
    if error:
        error_msg = str(error.to_dict()) if isinstance(error, platform_models.ErrorEntity) else str(error)
        # abort() raises, so execution stops here; INTERNAL triggers a retry.
        await context.abort(
            grpc.StatusCode.INTERNAL, f"could not grant item {item_id} to user {user_id}: {error_msg}",
        )

    response = Empty()
    self.log_payload(f"{self.OnMessage.__name__} response: %s", response)
    return response

A failed batch is retried indefinitely, with a 30-second interval between retries. The batch offset is committed to Kafka only when every gRPC call in the batch returns a non-retriable status code.
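
The retry and commit rule can be sketched as a loop. This is a simplified model rather than the connector's real code: `process_until_committable` and `invoke` are hypothetical names, status codes are plain strings, and `sleep` is injectable so that the example runs without waiting.

```python
import time

RETRIABLE_CODES = frozenset(
    {"UNAVAILABLE", "RESOURCE_EXHAUSTED", "INTERNAL", "UNKNOWN"}
)


def process_until_committable(batch, invoke, retry_interval_s=30.0, sleep=time.sleep):
    """Invoke the handler for each message; retry the whole batch until no
    call returns a retriable code. Returns the number of attempts made."""
    attempts = 0
    while True:  # no upper bound: the batch is retried indefinitely
        attempts += 1
        codes = [invoke(message) for message in batch]
        if not any(code in RETRIABLE_CODES for code in codes):
            return attempts  # at this point the batch offset would be committed
        sleep(retry_interval_s)


slept = []                          # records the waits instead of sleeping
failures_left = {"event-2": 2}      # "event-2" fails on its first two deliveries


def invoke(message):
    if failures_left.get(message, 0) > 0:
        failures_left[message] -= 1
        return "UNAVAILABLE"
    return "OK"


attempts = process_until_committable(["event-1", "event-2"], invoke, sleep=slept.append)
print(attempts, slept)
```

Because the loop has no exit other than a fully non-retriable batch, a handler that keeps returning a retriable code for a message it can never process would block the batch; this is why permanent failures should be reported with a non-retriable code.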

Reference

For more information about processing guarantees and idempotency under "at least once" semantics, see Kafka Connect's official documentation on message delivery guarantees.