Message AMGI Message Format

The Message AMGI sub-specification outlines how messages are sent and received within AMGI.

It is deliberately designed to be protocol-agnostic where possible. Terminology is taken from AsyncAPI, which follows the same protocol-agnostic approach.

A simple implementation would be:

async def app(scope, receive, send):
    if scope["type"] == "message":
        try:
            headers = scope["headers"]
            payload = scope.get("payload")
            bindings = scope.get("bindings", {})
            ...  # Do some message handling here!
            await send(
                {
                    "type": "message.ack",
                }
            )
        except Exception as e:
            await send(
                {
                    "type": "message.nack",
                    "message": str(e),
                }
            )
    else:
        pass  # Handle other types

Message

A message has a single message scope. Your application will be called once per message.

The message scope information passed in scope contains:

  • scope["type"] (Literal['message'])

  • scope["amgi"]["spec_version"] (str) – Version of the AMGI message spec this server understands

  • scope["amgi"]["version"] (Literal['2.0']) – Version of the AMGI spec

  • scope["address"] (str) – The address of the message, for example, in Kafka this would be the topic

  • scope["headers"] (Sequence[tuple[bytes, bytes]]) – Includes the headers of the message

  • scope["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None

  • scope["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: {"kafka": {"key": b"key"}}

  • scope["state"] (NotRequired[dict[str, Any]]) – A copy of the namespace passed into the lifespan corresponding to this message. Optional; if missing, the server does not support this feature.

  • scope["extensions"] (NotRequired[dict[str, dict[str, Any]]]) – Extensions allow AMGI servers to advertise optional capabilities to applications. Extensions are provided via scope and are opt-in: applications MUST assume an extension is unsupported unless it is explicitly present.
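
As an illustration of the keys above, the following minimal sketch reads a message scope and applies the documented defaults for the optional keys. The helper name describe_message_scope, the "1.0" spec_version value, the "orders" address, and the "example.extension" name are assumptions made for this example only; they are not defined by the specification.

```python
from typing import Any, Optional


def describe_message_scope(scope: dict[str, Any]) -> dict[str, Any]:
    """Collect the documented scope fields, applying the documented defaults."""
    if scope["type"] != "message":
        raise ValueError(f"unexpected scope type: {scope['type']!r}")

    # Optional keys: a missing payload is None, missing bindings are treated as empty.
    payload: Optional[bytes] = scope.get("payload")
    bindings = scope.get("bindings", {})
    # Extensions are opt-in: an absent entry means the extension is unsupported.
    extensions = scope.get("extensions", {})

    return {
        "address": scope["address"],
        "headers": list(scope["headers"]),
        "payload": payload,
        "kafka_key": bindings.get("kafka", {}).get("key"),
        # A missing "state" key means the server does not support the feature.
        "has_state": "state" in scope,
        # "example.extension" is a hypothetical extension name.
        "supports_example_ext": "example.extension" in extensions,
    }


example_scope = {
    "type": "message",
    "amgi": {"version": "2.0", "spec_version": "1.0"},  # spec_version value assumed
    "address": "orders",
    "headers": [(b"content-type", b"application/json")],
    "bindings": {"kafka": {"key": b"key"}},
}
summary = describe_message_scope(example_scope)
```

Because the example scope omits payload, state, and extensions, the summary reports a payload of None and treats both optional features as unavailable.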

Response message ack - send() event

Sent by the application to signify that it has successfully processed a message.

Keys:

  • message["type"] (Literal['message.ack'])

Response message nack - send() event

Sent by the application to signify that it could not process a message.

Keys:

  • message["type"] (Literal['message.nack'])

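  • message["message"] (str) – Description of why the message could not be processed, for example the text of the raised exception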
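
One way to observe the ack and nack events is to call an application with stand-in callables that record what is sent. This is a sketch for local experimentation, not a server implementation: the run_once helper, the rule that an empty payload is rejected, and the "orders" address are assumptions for this example.

```python
import asyncio
from typing import Any


async def app(scope, receive, send):
    # Hypothetical handler: rejects messages that carry no payload.
    try:
        if scope.get("payload") is None:
            raise ValueError("empty payload")
        await send({"type": "message.ack"})
    except Exception as exc:
        await send({"type": "message.nack", "message": str(exc)})


async def run_once(scope: dict[str, Any]) -> list[dict[str, Any]]:
    """Call the app once for one message scope and record what it sends."""
    sent: list[dict[str, Any]] = []

    async def send(event):
        sent.append(event)

    async def receive():
        # This sketch delivers no events to the application.
        raise RuntimeError("receive() is not used in this sketch")

    await app(scope, receive, send)
    return sent


base = {"type": "message", "address": "orders", "headers": []}
acked = asyncio.run(run_once({**base, "payload": b"{}"}))
nacked = asyncio.run(run_once({**base}))
```

Here acked holds a single message.ack event, while nacked holds a single message.nack event whose message key carries the exception text.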

Response message send - send() event

Sent by the application to send a message. If the server fails to send the message, the server should raise a server-specific subclass of OSError.

Keys:

  • message["type"] (Literal['message.send'])

  • message["address"] (str) – Address to send the message to

  • message["headers"] (Sequence[tuple[bytes, bytes]]) – Headers of the message

  • message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None.

  • message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings to send. This can include bindings for multiple protocols, allowing the server to decide whether to handle or ignore each one.
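
The sketch below shows one possible way for an application to emit a message.send event and react to a failed send. The forward and make_send helpers, the "orders.processed" address, the "source" header, and the "example" protocol binding are assumptions for illustration. The stand-in send() raises ConnectionError, a built-in subclass of OSError, to simulate a server-side failure; a real server would raise its own subclass.

```python
import asyncio


async def forward(scope, send, target="orders.processed"):
    """Republish the incoming payload to `target`, then ack or nack."""
    try:
        await send(
            {
                "type": "message.send",
                "address": target,
                "headers": [(b"source", scope["address"].encode())],
                "payload": scope.get("payload"),
                # Bindings for more than one protocol; the server may ignore
                # the ones it does not handle. "example" is hypothetical.
                "bindings": {"kafka": {"key": b"key"}, "example": {"priority": 1}},
            }
        )
    except OSError as exc:
        await send({"type": "message.nack", "message": f"send failed: {exc}"})
    else:
        await send({"type": "message.ack"})


def make_send(fail: bool):
    """Build a stand-in send() that records events and can simulate a failure."""
    events = []

    async def send(event):
        if fail and event["type"] == "message.send":
            raise ConnectionError("broker unavailable")
        events.append(event)

    return send, events


scope = {"type": "message", "address": "orders", "headers": [], "payload": b"{}"}

ok_send, ok_events = make_send(fail=False)
asyncio.run(forward(scope, ok_send))

bad_send, bad_events = make_send(fail=True)
asyncio.run(forward(scope, bad_send))
```

Whether a failed send should lead to a nack is an application decision; this sketch chooses to nack so that the original message is not reported as successfully processed.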

Bindings Object

Both "message.receive" and "message.send" can contain a bindings object. The contents of each binding are defined by the specification of the corresponding protocol.
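
Since a bindings object is keyed by protocol name, a server could pick out the entry for the protocol it speaks and ignore the rest. The select_bindings helper and the "example" protocol below are hypothetical and only illustrate the shape of the data; the specification does not prescribe how a server performs this selection.

```python
from typing import Any


def select_bindings(event: dict[str, Any], protocol: str) -> dict[str, Any]:
    """Return only the bindings for the given protocol, or an empty dict."""
    return event.get("bindings", {}).get(protocol, {})


event = {
    "type": "message.send",
    "address": "orders",
    "headers": [],
    "bindings": {"kafka": {"key": b"key"}, "example": {"priority": 1}},
}

kafka_bindings = select_bindings(event, "kafka")
unknown_bindings = select_bindings(event, "other")
no_bindings = select_bindings({"type": "message.send"}, "kafka")
```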