Message AMGI Message Format
The Message AMGI sub-specification outlines how messages are sent and received within AMGI.
It is deliberately designed to be protocol-agnostic where possible. Terminology is taken from AsyncAPI, which takes the same protocol-agnostic approach.
A simple implementation would be:
    async def app(scope, receive, send):
        if scope["type"] == "message":
            try:
                headers = scope["headers"]
                payload = scope.get("payload")
                bindings = scope.get("bindings", {})
                ...  # Do some message handling here!
                await send(
                    {
                        "type": "message.ack",
                    }
                )
            except Exception as e:
                await send(
                    {
                        "type": "message.nack",
                        "message": str(e),
                    }
                )
        else:
            pass  # Handle other types
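An application of this shape can be exercised without a real server by calling it once with a hand-built scope and recording what it sends. The following is a minimal sketch under stated assumptions: the `call_once` helper, the stub `receive`, the toy "reject empty payloads" rule, and the `spec_version` value `"1.0"` are all hypothetical and not part of the specification.

```python
import asyncio
from typing import Any


async def app(scope, receive, send):
    # Same shape as the example above, with a toy rule: empty payloads are rejected.
    if scope["type"] == "message":
        try:
            if scope.get("payload") is None:
                raise ValueError("empty payload")
            await send({"type": "message.ack"})
        except Exception as e:
            await send({"type": "message.nack", "message": str(e)})


async def call_once(scope: dict[str, Any]) -> list[dict[str, Any]]:
    """Call the app for a single message and collect the events it sends."""
    sent: list[dict[str, Any]] = []

    async def receive() -> dict[str, Any]:
        # Hypothetical stub: the app in this sketch never calls receive().
        raise RuntimeError("receive() is not used in this sketch")

    async def send(event: dict[str, Any]) -> None:
        sent.append(event)

    await app(scope, receive, send)
    return sent


# The spec_version value is an assumption made for illustration.
base_scope = {
    "type": "message",
    "amgi": {"version": "2.0", "spec_version": "1.0"},
    "address": "orders",
    "headers": [],
}

acked = asyncio.run(call_once({**base_scope, "payload": b"{}"}))
nacked = asyncio.run(call_once(base_scope))
```

Here `acked` holds a single `message.ack` event and `nacked` holds a single `message.nack` event carrying the exception text.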
Message
A message has a single message scope. Your application will be called once per message.
The message scope information passed in scope contains:
- scope["type"] (Literal['message'])
- scope["amgi"]["spec_version"] (str) – Version of the AMGI message spec this server understands
- scope["amgi"]["version"] (Literal['2.0']) – Version of the AMGI spec
- scope["address"] (str) – The address of the batch of messages, for example, in Kafka this would be the topic
- scope["headers"] (Sequence[tuple[bytes, bytes]]) – Includes the headers of the message
- scope["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None
- scope["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings, for example, when receiving a Kafka message the bindings could include the key: {"kafka": {"key": b"key"}}
- scope["state"] (NotRequired[dict[str, Any]]) – A copy of the namespace passed into the lifespan corresponding to this batch. Optional; if missing the server does not support this feature.
- scope["extensions"] (NotRequired[dict[str, dict[str, Any]]]) – Extensions allow AMGI servers to advertise optional capabilities to applications. Extensions are provided via scope and are opt-in: applications MUST assume an extension is unsupported unless it is explicitly present.
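As a rough illustration of how the optional keys and their defaults might be read, here is a small sketch. The `describe_scope` helper, the example address `"orders"`, the `spec_version` value `"1.0"`, and the choice of latin-1 for decoding header bytes are assumptions for this sketch; the specification itself does not name a header encoding.

```python
from typing import Any


def describe_scope(scope: dict[str, Any]) -> dict[str, Any]:
    """Summarise a message scope, applying the documented defaults."""
    # Headers arrive as (bytes, bytes) pairs. Decoding is for display only;
    # latin-1 is an assumption, and repeated header names would collapse here.
    headers = {
        name.decode("latin-1"): value.decode("latin-1")
        for name, value in scope["headers"]
    }
    return {
        "address": scope["address"],
        "headers": headers,
        # A missing payload defaults to None.
        "payload": scope.get("payload"),
        # Bindings are optional; a missing key is treated as "no bindings".
        "bindings": scope.get("bindings", {}),
        # state is optional; if missing, the server does not support it.
        "has_state": "state" in scope,
        # Extensions are opt-in: only those explicitly present count as supported.
        "extensions": sorted(scope.get("extensions", {})),
    }


example_scope = {
    "type": "message",
    "amgi": {"version": "2.0", "spec_version": "1.0"},  # spec_version is illustrative
    "address": "orders",
    "headers": [(b"content-type", b"application/json")],
    "bindings": {"kafka": {"key": b"key"}},
}

summary = describe_scope(example_scope)
```

Using `scope.get(...)` for every NotRequired key keeps the application working against servers that omit them.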
Response message ack - send() event
Sent by the application to signify that it has successfully acknowledged a message.
Keys:
- message["type"] (Literal['message.ack'])
Response message nack - send() event
Sent by the application to signify that it could not process a message.
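Keys:
- message["type"] (Literal['message.nack'])
- message["message"] (str) – Text describing why the message could not be processed, as in the example implementation above, which passes str(e)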
Response message send - send() event
Sent by the application to send a message. If the server fails to send the message, the server should raise a
server-specific subclass of OSError.
Keys:
- message["type"] (Literal['message.send'])
- message["address"] (str) – Address to send the message to
- message["headers"] (Sequence[tuple[bytes, bytes]]) – Headers of the message
- message["payload"] (NotRequired[Optional[bytes]]) – Payload of the message, which can be None or bytes. If missing, it defaults to None.
- message["bindings"] (NotRequired[dict[str, dict[str, Any]]]) – Protocol specific bindings to send. This can be bindings for multiple protocols, allowing the server to decide to handle them, or ignore them.
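One way an application might build a message.send event and cope with a failed send is sketched below. The `forward` helper, the `FakeSendError` class, the two stand-in `send` callables, and the address `"orders.out"` are hypothetical; a real server would supply its own `send` and its own OSError subclass.

```python
import asyncio
from typing import Any, Awaitable, Callable

Send = Callable[[dict[str, Any]], Awaitable[None]]


async def forward(send: Send, address: str, payload: bytes) -> bool:
    """Try to send one message; return False if the server reports a failure."""
    event = {
        "type": "message.send",
        "address": address,
        "headers": [(b"content-type", b"application/octet-stream")],
        "payload": payload,
        # Bindings may cover several protocols; the server decides which to use.
        "bindings": {"kafka": {"key": b"key"}},
    }
    try:
        await send(event)
    except OSError:
        # Per the text above, a failed send should surface as an OSError subclass.
        return False
    return True


class FakeSendError(OSError):
    """Hypothetical server-specific error, used only by this sketch."""


sent: list[dict[str, Any]] = []


async def recording_send(event: dict[str, Any]) -> None:
    # Stand-in for a server send() that succeeds.
    sent.append(event)


async def failing_send(event: dict[str, Any]) -> None:
    # Stand-in for a server send() that cannot deliver the message.
    raise FakeSendError("broker unavailable")


ok = asyncio.run(forward(recording_send, "orders.out", b"hello"))
failed = asyncio.run(forward(failing_send, "orders.out", b"hello"))
```

Catching OSError rather than a concrete class keeps the application independent of which server it runs under.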
Bindings Object
Both "message.receive" and "message.send" can contain a bindings object. These are defined as per protocol
specifications.
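Since a bindings object is keyed by protocol name, a server or application can pick out the entry it understands and ignore the rest. The sketch below assumes that layout; the `select_bindings` helper and the `"example_protocol"` entry are made up for illustration, and only the Kafka `key` binding comes from the text above.

```python
from typing import Any


def select_bindings(
    bindings: dict[str, dict[str, Any]], protocol: str
) -> dict[str, Any]:
    """Return a copy of the bindings for one protocol, or an empty dict."""
    return dict(bindings.get(protocol, {}))


outgoing = {
    "kafka": {"key": b"key"},
    # "example_protocol" and "priority" are made-up names for illustration only.
    "example_protocol": {"priority": 5},
}

kafka_bindings = select_bindings(outgoing, "kafka")
unknown_bindings = select_bindings(outgoing, "not_configured")
```

Returning an empty dict for an unknown protocol mirrors the rule that bindings a server does not handle can simply be ignored.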