强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RTMP 协议精讲 / 05 - AMF 编码与命令

AMF 编码与命令

5.1 AMF 概述

AMF(Action Message Format,动作消息格式)是 RTMP 协议中用于序列化数据的二进制编码格式。它最初是 Adobe Flash 平台的数据交换格式,类似于 JSON 或 Protocol Buffers,但专门为实时通信优化。

AMF 的角色

应用层(开发者视角):
  connect("rtmp://live.example.com/app", "stream1")

         ↓ AMF 编码

协议层(网络传输):
  [02 00 07] [63 6F 6E 6E 65 63 74]  ← "connect" (AMF String)
  [02 00 24] [72 74 6D 70 3A 2F ...]  ← "rtmp://..." (AMF String)

         ↓ 块流层拆分

传输层(TCP):
  Chunk #1 | Chunk #2 | Chunk #3 | ...

AMF 版本

版本 引入时间 特点 消息类型
AMF0 Flash MX (2002) 基础类型,兼容性好 Type 18/20
AMF3 Flash 9 (2006) 支持更多类型,更紧凑 Type 15/17

5.2 AMF0 数据类型

AMF0 使用类型标记(Type Marker)+ 值的格式:

标记 类型 说明
0x00 0 Number 8 字节 IEEE 754 双精度浮点数
0x01 1 Boolean 0x00=false, 0x01=true
0x02 2 String 2 字节长度 + UTF-8 字符串
0x03 3 Object 键值对集合(ECMA Array)
0x04 4 MovieClip 保留(Flash 特有)
0x05 5 Null 空值(无附加数据)
0x06 6 Undefined 未定义(无附加数据)
0x07 7 Reference 对象引用(2 字节索引)
0x08 8 ECMA Array 关联数组(4 字节计数 + 键值对)
0x0A 10 Strict Array 索引数组(4 字节计数 + 值)
0x0B 11 Date 日期(8 字节毫秒 + 2 字节时区)
0x0C 12 Long String 长字符串(4 字节长度)
0x0D 13 Unsupported 不支持的类型
0x0E 14 Recordset 保留
0x0F 15 XML Document XML 文档
0x10 16 Typed Object 自定义类型对象
0x11 17 AMF3 Switch 切换到 AMF3 编码

5.3 AMF0 编码实现

#!/usr/bin/env python3
"""
AMF0 Encoder/Decoder
完整的 AMF0 编解码实现
"""

import struct
from datetime import datetime, timezone


class AMF0:
    """AMF0 类型标记常量"""
    NUMBER = 0x00
    BOOLEAN = 0x01
    STRING = 0x02
    OBJECT = 0x03
    NULL = 0x05
    UNDEFINED = 0x06
    ECMA_ARRAY = 0x08
    OBJECT_END = 0x09  # Object 结束标记 (0x00 0x00 0x09)
    STRICT_ARRAY = 0x0A
    DATE = 0x0B
    LONG_STRING = 0x0C
    XML_DOCUMENT = 0x0F
    TYPED_OBJECT = 0x10
    AMF3_SWITCH = 0x11


class AMF0Encoder:
    """AMF0 编码器"""

    def encode(self, *values) -> bytes:
        """编码多个 AMF0 值"""
        result = b''
        for value in values:
            result += self._encode_value(value)
        return result

    def _encode_value(self, value) -> bytes:
        """编码单个值"""
        if value is None:
            return bytes([AMF0.NULL])
        elif isinstance(value, bool):
            return self._encode_boolean(value)
        elif isinstance(value, (int, float)):
            return self._encode_number(float(value))
        elif isinstance(value, str):
            return self._encode_string(value)
        elif isinstance(value, dict):
            return self._encode_object(value)
        elif isinstance(value, list):
            return self._encode_strict_array(value)
        elif isinstance(value, datetime):
            return self._encode_date(value)
        else:
            return bytes([AMF0.UNDEFINED])

    def _encode_number(self, value: float) -> bytes:
        return bytes([AMF0.NUMBER]) + struct.pack('>d', value)

    def _encode_boolean(self, value: bool) -> bytes:
        return bytes([AMF0.BOOLEAN, 0x01 if value else 0x00])

    def _encode_string(self, value: str) -> bytes:
        encoded = value.encode('utf-8')
        if len(encoded) <= 0xFFFF:
            return bytes([AMF0.STRING]) + struct.pack('>H', len(encoded)) + encoded
        else:
            return bytes([AMF0.LONG_STRING]) + struct.pack('>I', len(encoded)) + encoded

    def _encode_object(self, value: dict) -> bytes:
        result = bytes([AMF0.OBJECT])
        for key, val in value.items():
            key_bytes = key.encode('utf-8')
            result += struct.pack('>H', len(key_bytes)) + key_bytes
            result += self._encode_value(val)
        result += b'\x00\x00' + bytes([AMF0.OBJECT_END])
        return result

    def _encode_strict_array(self, value: list) -> bytes:
        result = bytes([AMF0.STRICT_ARRAY]) + struct.pack('>I', len(value))
        for item in value:
            result += self._encode_value(item)
        return result

    def _encode_date(self, value: datetime) -> bytes:
        timestamp_ms = int(value.timestamp() * 1000)
        timezone_offset = 0  # UTC
        return (bytes([AMF0.DATE]) +
                struct.pack('>d', float(timestamp_ms)) +
                struct.pack('>h', timezone_offset))


class AMF0Decoder:
    """AMF0 解码器"""

    def __init__(self, data: bytes, offset: int = 0):
        self.data = data
        self.offset = offset

    def decode(self) -> list:
        """解码所有值,返回值列表"""
        results = []
        while self.offset < len(self.data):
            try:
                value = self._decode_value()
                results.append(value)
            except (struct.error, IndexError):
                break
        return results

    def _decode_value(self):
        """解码单个值"""
        if self.offset >= len(self.data):
            return None

        marker = self.data[self.offset]
        self.offset += 1

        if marker == AMF0.NUMBER:
            return self._decode_number()
        elif marker == AMF0.BOOLEAN:
            return self._decode_boolean()
        elif marker == AMF0.STRING:
            return self._decode_string()
        elif marker == AMF0.OBJECT:
            return self._decode_object()
        elif marker == AMF0.NULL:
            return None
        elif marker == AMF0.UNDEFINED:
            return None
        elif marker == AMF0.ECMA_ARRAY:
            return self._decode_ecma_array()
        elif marker == AMF0.STRICT_ARRAY:
            return self._decode_strict_array()
        elif marker == AMF0.DATE:
            return self._decode_date()
        elif marker == AMF0.LONG_STRING:
            return self._decode_long_string()
        elif marker == AMF0.AMF3_SWITCH:
            return self._decode_value()  # 递归解码后续值
        else:
            return f'[Unknown marker: 0x{marker:02x}]'

    def _decode_number(self) -> float:
        value = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
        self.offset += 8
        return value

    def _decode_boolean(self) -> bool:
        value = self.data[self.offset] != 0
        self.offset += 1
        return value

    def _decode_string(self) -> str:
        length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_long_string(self) -> str:
        length = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_object(self) -> dict:
        result = {}
        while self.offset < len(self.data):
            # 检查结束标记
            if (self.data[self.offset] == 0 and
                self.data[self.offset+1] == 0 and
                self.data[self.offset+2] == AMF0.OBJECT_END):
                self.offset += 3
                break

            key = self._decode_string_key()
            value = self._decode_value()
            result[key] = value
        return result

    def _decode_string_key(self) -> str:
        length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_ecma_array(self) -> dict:
        count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        # ECMA Array 使用与 Object 相同的键值对格式
        return self._decode_object()

    def _decode_strict_array(self) -> list:
        count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        result = []
        for _ in range(count):
            result.append(self._decode_value())
        return result

    def _decode_date(self) -> datetime:
        timestamp_ms = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
        self.offset += 8
        timezone_offset = struct.unpack('>h', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)

5.4 AMF3 数据类型

AMF3 引入了 引用机制(引用池),可以更高效地编码重复的对象和字符串:

标记 类型 说明
0x00 0 Undefined
0x01 1 Null
0x02 2 Boolean False
0x03 3 Boolean True
0x04 4 Integer 可变长度 29 位有符号整数
0x05 5 Double 8 字节浮点数
0x06 6 String 引用或 UTF-8 字符串
0x07 7 XML Document
0x08 8 Date
0x09 9 Array
0x0A 10 Object
0x0B 11 XML
0x0C 12 ByteArray

AMF3 Integer 编码

AMF3 使用 可变长度编码(Variable-Length Encoding)表示整数:

规则:
- 有效位为 29 位(0 ~ 2^28-1)
- 每字节的最高位为"继续位"
- 大端序

示例:
值 0x03:
  0x03                                              → 0x03

值 0x80 (128):
  0x81 0x00                                         → 1 0000001 0000000
                                                     = 0b10000000 = 128

值 0x20000 (131072):
  0x84 0x80 0x00                                    → 0b100 0000000 0000000
                                                     = 131072
def amf3_read_integer(data: bytes, offset: int) -> tuple:
    """读取 AMF3 可变长度整数,返回 (value, new_offset)"""
    result = 0
    b = data[offset]
    offset += 1

    if b < 0x80:
        return b, offset

    result = (b & 0x7F) << 7
    b = data[offset]; offset += 1
    if b < 0x80:
        return result | b, offset

    result = (result | (b & 0x7F)) << 7
    b = data[offset]; offset += 1
    if b < 0x80:
        return result | b, offset

    result = (result | (b & 0x7F)) << 8
    b = data[offset]; offset += 1
    return result | b, offset


def amf3_write_integer(value: int) -> bytes:
    """写入 AMF3 可变长度整数"""
    if value < 0x80:
        return bytes([value])
    elif value < 0x4000:
        return bytes([0x80 | (value >> 7), value & 0x7F])
    elif value < 0x200000:
        return bytes([0x80 | (value >> 14), 0x80 | ((value >> 7) & 0x7F), value & 0x7F])
    else:
        return bytes([0x80 | (value >> 22), 0x80 | ((value >> 15) & 0x7F),
                       0x80 | ((value >> 8) & 0x7F), value & 0xFF])

5.5 RTMP 命令消息

命令消息使用 AMF 编码,通过消息类型 20(AMF0 Command)或 17(AMF3 Command)传输。

命令消息结构

AMF0 Command (Type 20) Body 结构:
┌──────────────────────────────────────────────────┐
│  Command Name (AMF0 String)     ← "connect"      │
│  Transaction ID (AMF0 Number)   ← 1.0            │
│  Command Object (AMF0 Object)   ← {app, tcUrl}   │
│  Optional Args (AMF0 Value...)  ← ...             │
└──────────────────────────────────────────────────┘

5.5.1 connect 命令

客户端发送 connect 命令建立与应用程序的连接:

def create_connect_command(
    app: str,
    tc_url: str,
    flash_ver: str = "FMLE/3.0",
    fpad: bool = False,
    capabilities: int = 239,
    audio_codecs: int = 3575,
    video_codecs: int = 252,
    video_function: int = 1,
    object_encoding: int = 0,
    transaction_id: float = 1.0
) -> list:
    """构造 connect 命令的 AMF 值列表"""
    return [
        "connect",                # Command Name
        transaction_id,           # Transaction ID
        {                         # Command Object
            "app": app,
            "flashVer": flash_ver,
            "fpad": fpad,
            "capabilities": capabilities,
            "audioCodecs": audio_codecs,
            "videoCodecs": video_codecs,
            "videoFunction": video_function,
            "objectEncoding": object_encoding,
            "tcUrl": tc_url,
        }
    ]

connect 命令的 Command Object 字段

字段 类型 说明
app String 应用名称,如 “live”
flashVer String Flash Player 版本
fpad Boolean 是否支持代理
capabilities Number 客户端能力标志
audioCodecs Number 支持的音频编码
videoCodecs Number 支持的视频编码
videoFunction Number 视频功能标志
objectEncoding Number AMF 版本(0=AMF0, 3=AMF3)
tcUrl String RTMP 完整 URL

5.5.2 _result / _error 响应

服务端响应 connect 命令:

def create_connect_result(transaction_id: float) -> list:
    """构造 connect 成功响应"""
    return [
        "_result",                # Command Name
        transaction_id,           # Transaction ID (对应 connect 的 ID)
        {                         # Properties
            "fmsVer": "FMS/3,0,1,123",
            "capabilities": 31,
        },
        {                         # Information
            "level": "status",
            "code": "NetConnection.Connect.Success",
            "description": "Connection succeeded.",
            "objectEncoding": 0,
        }
    ]


def create_connect_error(transaction_id: float, code: str, desc: str) -> list:
    """构造 connect 错误响应"""
    return [
        "_error",
        transaction_id,
        None,
        {
            "level": "error",
            "code": code,
            "description": desc,
        }
    ]

5.5.3 createStream 命令

创建逻辑流:

def create_create_stream(transaction_id: float) -> list:
    """构造 createStream 命令"""
    return [
        "createStream",
        transaction_id,
        None,  # 无 Command Object
    ]

def create_create_stream_result(transaction_id: float, stream_id: float = 1.0) -> list:
    """构造 createStream 响应"""
    return [
        "_result",
        transaction_id,
        None,
        stream_id,  # 新建的流 ID
    ]

5.5.4 play 命令

请求播放流:

def create_play(
    stream_name: str,
    stream_id: int = 1,
    start: float = -2,
    duration: float = -1,
    reset: bool = True
) -> list:
    """构造 play 命令"""
    return [
        "play",
        0,  # Transaction ID (通常为 0)
        None,
        stream_name,  # 流名称
        start,        # 开始时间 (-2=直播, -1=从头)
        duration,     # 持续时间 (-1=到结束)
        reset,        # 是否重置
    ]

5.5.5 publish 命令

开始推流:

def create_publish(
    stream_name: str,
    publish_type: str = "live"
) -> list:
    """构造 publish 命令"""
    return [
        "publish",
        0,
        None,
        stream_name,    # 流名称
        publish_type,   # "live", "record", "append"
    ]

5.6 远程过程调用(RPC)

RTMP 支持两种 RPC 机制:

5.6.1 NetConnection.call

客户端可以直接调用服务端的方法:

Client → Server (AMF Command, Type 20):
┌────────────────────────────────────────────────┐
│  "call"                          ← 命令名      │
│  transaction_id                  ← 事务 ID     │
│  null                            ← 无对象      │
│  "getServerTime"                 ← 方法名      │
│  arg1, arg2, ...                 ← 参数        │
└────────────────────────────────────────────────┘

5.6.2 RPC 调用

def create_rpc_call(method: str, transaction_id: float, *args) -> list:
    """构造 RPC 调用命令"""
    return ["call", transaction_id, None, method] + list(args)

# 示例:调用服务端的 getServerTime
encoder = AMF0Encoder()
rpc_msg = create_rpc_call("getServerTime", 3.0)
encoded = encoder.encode(*rpc_msg)

5.7 onStatus 事件通知

服务端向客户端发送状态事件:

def create_on_status(
    level: str,
    code: str,
    description: str,
    info: dict = None
) -> list:
    """构造 onStatus 事件"""
    return [
        "onStatus",
        0,
        None,
        {
            "level": level,
            "code": code,
            "description": description,
            **(info or {}),
        }
    ]


# 常见事件代码:
# NetStream.Play.Start    - 开始播放
# NetStream.Play.Stop     - 停止播放
# NetStream.Play.Reset    - 播放重置
# NetStream.Publish.Start - 开始推流
# NetStream.Unpublish.Success - 停止推流
# NetStream.Play.StreamNotFound - 流未找到

5.8 完整命令流程

推流(Publish)完整流程

Client                                      Server
  │                                           │
  │════  握手完成  ════════════════════════════│
  │                                           │
  │─── Set Chunk Size (4096) ───────────────→│
  │─── connect("live") ────────────────────→│  Type 20
  │                                           │
  │←── Window Ack Size ─────────────────────│  Type 5
  │←── Set Peer Bandwidth ──────────────────│  Type 6
  │←── User Control (Stream Begin, 0) ──────│  Type 4
  │←── _result (NetConnection.Connect.Success)│  Type 20
  │                                           │
  │─── createStream() ─────────────────────→│  Type 20
  │←── _result (stream_id=1) ───────────────│  Type 20
  │                                           │
  │─── @setDataFrame (onMetaData) ─────────→│  Type 18
  │─── publish("mystream", "live") ────────→│  Type 20
  │                                           │
  │←── onStatus (Publish.Start) ────────────│  Type 20
  │                                           │
  │─── AAC Sequence Header ────────────────→│  Type 8
  │─── AVC Sequence Header ────────────────→│  Type 9
  │                                           │
  │─── Audio Data ─────────────────────────→│  Type 8
  │─── Video Data ─────────────────────────→│  Type 9
  │─── Audio Data ─────────────────────────→│  Type 8
  │─── Video Data ─────────────────────────→│  Type 9
  │       ... 持续发送 ...                    │

播放(Play)完整流程

Client                                      Server
  │                                           │
  │─── connect("live") ────────────────────→│
  │←── _result ─────────────────────────────│
  │                                           │
  │─── createStream() ─────────────────────→│
  │←── _result (stream_id=1) ───────────────│
  │                                           │
  │─── play("mystream") ───────────────────→│
  │                                           │
  │←── User Control (Stream Begin, 1) ──────│
  │←── onStatus (Play.Reset) ───────────────│
  │←── onStatus (Play.Start) ───────────────│
  │←── RtmpSampleAccess (false, false) ────│  Type 18
  │←── onMetaData ─────────────────────────│  Type 18
  │                                           │
  │←── AAC Sequence Header ─────────────────│  Type 8
  │←── AVC Sequence Header ─────────────────│  Type 9
  │                                           │
  │←── Audio Data ──────────────────────────│  Type 8
  │←── Video Data ──────────────────────────│  Type 9
  │       ... 持续接收 ...                    │

5.9 AMF 编解码测试

def test_amf0_codec():
    """AMF0 编解码测试"""
    encoder = AMF0Encoder()
    decoder = AMF0Decoder

    # 测试 connect 命令
    values = create_connect_command(
        app="live",
        tc_url="rtmp://localhost:1935/live"
    )
    encoded = encoder.encode(*values)
    print(f"Encoded {len(values)} values: {len(encoded)} bytes")
    print(f"Hex: {encoded.hex()}")

    # 解码验证
    d = decoder(encoded)
    decoded = d.decode()
    print(f"Decoded: {decoded}")

    # 验证
    assert decoded[0] == "connect"
    assert decoded[1] == 1.0
    assert decoded[2]["app"] == "live"
    print("✅ AMF0 codec test passed!")


if __name__ == '__main__':
    test_amf0_codec()

注意事项

  1. AMF0 vs AMF3 选择:大多数 RTMP 实现默认使用 AMF0,AMF3 在 Flash Remoting 中更常见
  2. Transaction ID 递增:每个命令的 Transaction ID 应递增,用于匹配请求和响应
  3. Object Encoding:connect 命令中的 objectEncoding 字段告诉服务端客户端使用的 AMF 版本
  4. 字符串编码:AMF0 String 使用 UTF-8,注意不要超过 65535 字节(Long String 例外)
  5. 数字精度:AMF0 Number 是 64 位浮点数,整数不要超过 2^53 以避免精度丢失
  6. onMetaData:此消息中的 metadata 对象不固定,不同编码器/版本可能包含不同字段

扩展阅读


上一章04 - 消息格式 下一章06 - 流操作 — 了解 RTMP 的流生命周期管理