强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

RTMP 协议精讲 / 05 - AMF 编码与命令

AMF 编码与命令

5.1 AMF 概述

AMF(Action Message Format,动作消息格式)是 RTMP 协议中用于序列化数据的二进制编码格式。它最初是 Adobe Flash 平台的数据交换格式,类似于 JSON 或 Protocol Buffers,但专门为实时通信优化。

AMF 的角色

应用层(开发者视角):
  connect("rtmp://live.example.com/app", "stream1")

         ↓ AMF 编码

协议层(网络传输):
  [02 00 07] [63 6F 6E 6E 65 63 74]  ← "connect" (AMF String)
  [02 00 24] [72 74 6D 70 3A 2F ...]  ← "rtmp://..." (AMF String)

         ↓ 块流层拆分

传输层(TCP):
  Chunk #1 | Chunk #2 | Chunk #3 | ...

AMF 版本

版本引入时间特点消息类型
AMF0Flash MX (2002)基础类型,兼容性好Type 18/20
AMF3Flash 9 (2006)支持更多类型,更紧凑Type 15/17

5.2 AMF0 数据类型

AMF0 使用类型标记(Type Marker)+ 值的格式:

标记类型说明
0x000Number8 字节 IEEE 754 双精度浮点数
0x011Boolean0x00=false, 0x01=true
0x022String2 字节长度 + UTF-8 字符串
0x033Object键值对集合(ECMA Array)
0x044MovieClip保留(Flash 特有)
0x055Null空值(无附加数据)
0x066Undefined未定义(无附加数据)
0x077Reference对象引用(2 字节索引)
0x088ECMA Array关联数组(4 字节计数 + 键值对)
0x0A10Strict Array索引数组(4 字节计数 + 值)
0x0B11Date日期(8 字节毫秒 + 2 字节时区)
0x0C12Long String长字符串(4 字节长度)
0x0D13Unsupported不支持的类型
0x0E14Recordset保留
0x0F15XML DocumentXML 文档
0x1016Typed Object自定义类型对象
0x1117AMF3 Switch切换到 AMF3 编码

5.3 AMF0 编码实现

#!/usr/bin/env python3
"""
AMF0 Encoder/Decoder
完整的 AMF0 编解码实现
"""

import struct
from datetime import datetime, timezone


class AMF0:
    """AMF0 类型标记常量"""
    NUMBER = 0x00
    BOOLEAN = 0x01
    STRING = 0x02
    OBJECT = 0x03
    NULL = 0x05
    UNDEFINED = 0x06
    ECMA_ARRAY = 0x08
    OBJECT_END = 0x09  # Object 结束标记 (0x00 0x00 0x09)
    STRICT_ARRAY = 0x0A
    DATE = 0x0B
    LONG_STRING = 0x0C
    XML_DOCUMENT = 0x0F
    TYPED_OBJECT = 0x10
    AMF3_SWITCH = 0x11


class AMF0Encoder:
    """AMF0 编码器"""

    def encode(self, *values) -> bytes:
        """编码多个 AMF0 值"""
        result = b''
        for value in values:
            result += self._encode_value(value)
        return result

    def _encode_value(self, value) -> bytes:
        """编码单个值"""
        if value is None:
            return bytes([AMF0.NULL])
        elif isinstance(value, bool):
            return self._encode_boolean(value)
        elif isinstance(value, (int, float)):
            return self._encode_number(float(value))
        elif isinstance(value, str):
            return self._encode_string(value)
        elif isinstance(value, dict):
            return self._encode_object(value)
        elif isinstance(value, list):
            return self._encode_strict_array(value)
        elif isinstance(value, datetime):
            return self._encode_date(value)
        else:
            return bytes([AMF0.UNDEFINED])

    def _encode_number(self, value: float) -> bytes:
        return bytes([AMF0.NUMBER]) + struct.pack('>d', value)

    def _encode_boolean(self, value: bool) -> bytes:
        return bytes([AMF0.BOOLEAN, 0x01 if value else 0x00])

    def _encode_string(self, value: str) -> bytes:
        encoded = value.encode('utf-8')
        if len(encoded) <= 0xFFFF:
            return bytes([AMF0.STRING]) + struct.pack('>H', len(encoded)) + encoded
        else:
            return bytes([AMF0.LONG_STRING]) + struct.pack('>I', len(encoded)) + encoded

    def _encode_object(self, value: dict) -> bytes:
        result = bytes([AMF0.OBJECT])
        for key, val in value.items():
            key_bytes = key.encode('utf-8')
            result += struct.pack('>H', len(key_bytes)) + key_bytes
            result += self._encode_value(val)
        result += b'\x00\x00' + bytes([AMF0.OBJECT_END])
        return result

    def _encode_strict_array(self, value: list) -> bytes:
        result = bytes([AMF0.STRICT_ARRAY]) + struct.pack('>I', len(value))
        for item in value:
            result += self._encode_value(item)
        return result

    def _encode_date(self, value: datetime) -> bytes:
        timestamp_ms = int(value.timestamp() * 1000)
        timezone_offset = 0  # UTC
        return (bytes([AMF0.DATE]) +
                struct.pack('>d', float(timestamp_ms)) +
                struct.pack('>h', timezone_offset))


class AMF0Decoder:
    """AMF0 解码器"""

    def __init__(self, data: bytes, offset: int = 0):
        self.data = data
        self.offset = offset

    def decode(self) -> list:
        """解码所有值,返回值列表"""
        results = []
        while self.offset < len(self.data):
            try:
                value = self._decode_value()
                results.append(value)
            except (struct.error, IndexError):
                break
        return results

    def _decode_value(self):
        """解码单个值"""
        if self.offset >= len(self.data):
            return None

        marker = self.data[self.offset]
        self.offset += 1

        if marker == AMF0.NUMBER:
            return self._decode_number()
        elif marker == AMF0.BOOLEAN:
            return self._decode_boolean()
        elif marker == AMF0.STRING:
            return self._decode_string()
        elif marker == AMF0.OBJECT:
            return self._decode_object()
        elif marker == AMF0.NULL:
            return None
        elif marker == AMF0.UNDEFINED:
            return None
        elif marker == AMF0.ECMA_ARRAY:
            return self._decode_ecma_array()
        elif marker == AMF0.STRICT_ARRAY:
            return self._decode_strict_array()
        elif marker == AMF0.DATE:
            return self._decode_date()
        elif marker == AMF0.LONG_STRING:
            return self._decode_long_string()
        elif marker == AMF0.AMF3_SWITCH:
            return self._decode_value()  # 递归解码后续值
        else:
            return f'[Unknown marker: 0x{marker:02x}]'

    def _decode_number(self) -> float:
        value = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
        self.offset += 8
        return value

    def _decode_boolean(self) -> bool:
        value = self.data[self.offset] != 0
        self.offset += 1
        return value

    def _decode_string(self) -> str:
        length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_long_string(self) -> str:
        length = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_object(self) -> dict:
        result = {}
        while self.offset < len(self.data):
            # 检查结束标记
            if (self.data[self.offset] == 0 and
                self.data[self.offset+1] == 0 and
                self.data[self.offset+2] == AMF0.OBJECT_END):
                self.offset += 3
                break

            key = self._decode_string_key()
            value = self._decode_value()
            result[key] = value
        return result

    def _decode_string_key(self) -> str:
        length = struct.unpack('>H', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        value = self.data[self.offset:self.offset+length].decode('utf-8')
        self.offset += length
        return value

    def _decode_ecma_array(self) -> dict:
        count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        # ECMA Array 使用与 Object 相同的键值对格式
        return self._decode_object()

    def _decode_strict_array(self) -> list:
        count = struct.unpack('>I', self.data[self.offset:self.offset+4])[0]
        self.offset += 4
        result = []
        for _ in range(count):
            result.append(self._decode_value())
        return result

    def _decode_date(self) -> datetime:
        timestamp_ms = struct.unpack('>d', self.data[self.offset:self.offset+8])[0]
        self.offset += 8
        timezone_offset = struct.unpack('>h', self.data[self.offset:self.offset+2])[0]
        self.offset += 2
        return datetime.fromtimestamp(timestamp_ms / 1000, tz=timezone.utc)

5.4 AMF3 数据类型

AMF3 引入了 引用机制(引用池),可以更高效地编码重复的对象和字符串:

标记类型说明
0x000Undefined
0x011Null
0x022Boolean False
0x033Boolean True
0x044Integer可变长度 29 位有符号整数
0x055Double8 字节浮点数
0x066String引用或 UTF-8 字符串
0x077XML Document
0x088Date
0x099Array
0x0A10Object
0x0B11XML
0x0C12ByteArray

AMF3 Integer 编码

AMF3 使用 可变长度编码(Variable-Length Encoding)表示整数:

规则:
- 有效位为 29 位(0 ~ 2^28-1)
- 每字节的最高位为"继续位"
- 大端序

示例:
值 0x03:
  0x03                                              → 0x03

值 0x80 (128):
  0x81 0x00                                         → 1 0000001 0000000
                                                     = 0b10000000 = 128

值 0x20000 (131072):
  0x84 0x80 0x00                                    → 0b100 0000000 0000000
                                                     = 131072
def amf3_read_integer(data: bytes, offset: int) -> tuple:
    """读取 AMF3 可变长度整数,返回 (value, new_offset)"""
    result = 0
    b = data[offset]
    offset += 1

    if b < 0x80:
        return b, offset

    result = (b & 0x7F) << 7
    b = data[offset]; offset += 1
    if b < 0x80:
        return result | b, offset

    result = (result | (b & 0x7F)) << 7
    b = data[offset]; offset += 1
    if b < 0x80:
        return result | b, offset

    result = (result | (b & 0x7F)) << 8
    b = data[offset]; offset += 1
    return result | b, offset


def amf3_write_integer(value: int) -> bytes:
    """写入 AMF3 可变长度整数"""
    if value < 0x80:
        return bytes([value])
    elif value < 0x4000:
        return bytes([0x80 | (value >> 7), value & 0x7F])
    elif value < 0x200000:
        return bytes([0x80 | (value >> 14), 0x80 | ((value >> 7) & 0x7F), value & 0x7F])
    else:
        return bytes([0x80 | (value >> 22), 0x80 | ((value >> 15) & 0x7F),
                       0x80 | ((value >> 8) & 0x7F), value & 0xFF])

5.5 RTMP 命令消息

命令消息使用 AMF 编码,通过消息类型 20(AMF0 Command)或 17(AMF3 Command)传输。

命令消息结构

AMF0 Command (Type 20) Body 结构:
┌──────────────────────────────────────────────────┐
│  Command Name (AMF0 String)     ← "connect"      │
│  Transaction ID (AMF0 Number)   ← 1.0            │
│  Command Object (AMF0 Object)   ← {app, tcUrl}   │
│  Optional Args (AMF0 Value...)  ← ...             │
└──────────────────────────────────────────────────┘

5.5.1 connect 命令

客户端发送 connect 命令建立与应用程序的连接:

def create_connect_command(
    app: str,
    tc_url: str,
    flash_ver: str = "FMLE/3.0",
    fpad: bool = False,
    capabilities: int = 239,
    audio_codecs: int = 3575,
    video_codecs: int = 252,
    video_function: int = 1,
    object_encoding: int = 0,
    transaction_id: float = 1.0
) -> list:
    """构造 connect 命令的 AMF 值列表"""
    return [
        "connect",                # Command Name
        transaction_id,           # Transaction ID
        {                         # Command Object
            "app": app,
            "flashVer": flash_ver,
            "fpad": fpad,
            "capabilities": capabilities,
            "audioCodecs": audio_codecs,
            "videoCodecs": video_codecs,
            "videoFunction": video_function,
            "objectEncoding": object_encoding,
            "tcUrl": tc_url,
        }
    ]

connect 命令的 Command Object 字段

字段类型说明
appString应用名称,如 “live”
flashVerStringFlash Player 版本
fpadBoolean是否支持代理
capabilitiesNumber客户端能力标志
audioCodecsNumber支持的音频编码
videoCodecsNumber支持的视频编码
videoFunctionNumber视频功能标志
objectEncodingNumberAMF 版本(0=AMF0, 3=AMF3)
tcUrlStringRTMP 完整 URL

5.5.2 _result / _error 响应

服务端响应 connect 命令:

def create_connect_result(transaction_id: float) -> list:
    """构造 connect 成功响应"""
    return [
        "_result",                # Command Name
        transaction_id,           # Transaction ID (对应 connect 的 ID)
        {                         # Properties
            "fmsVer": "FMS/3,0,1,123",
            "capabilities": 31,
        },
        {                         # Information
            "level": "status",
            "code": "NetConnection.Connect.Success",
            "description": "Connection succeeded.",
            "objectEncoding": 0,
        }
    ]


def create_connect_error(transaction_id: float, code: str, desc: str) -> list:
    """构造 connect 错误响应"""
    return [
        "_error",
        transaction_id,
        None,
        {
            "level": "error",
            "code": code,
            "description": desc,
        }
    ]

5.5.3 createStream 命令

创建逻辑流:

def create_create_stream(transaction_id: float) -> list:
    """构造 createStream 命令"""
    return [
        "createStream",
        transaction_id,
        None,  # 无 Command Object
    ]

def create_create_stream_result(transaction_id: float, stream_id: float = 1.0) -> list:
    """构造 createStream 响应"""
    return [
        "_result",
        transaction_id,
        None,
        stream_id,  # 新建的流 ID
    ]

5.5.4 play 命令

请求播放流:

def create_play(
    stream_name: str,
    stream_id: int = 1,
    start: float = -2,
    duration: float = -1,
    reset: bool = True
) -> list:
    """构造 play 命令"""
    return [
        "play",
        0,  # Transaction ID (通常为 0)
        None,
        stream_name,  # 流名称
        start,        # 开始时间 (-2=直播, -1=从头)
        duration,     # 持续时间 (-1=到结束)
        reset,        # 是否重置
    ]

5.5.5 publish 命令

开始推流:

def create_publish(
    stream_name: str,
    publish_type: str = "live"
) -> list:
    """构造 publish 命令"""
    return [
        "publish",
        0,
        None,
        stream_name,    # 流名称
        publish_type,   # "live", "record", "append"
    ]

5.6 远程过程调用(RPC)

RTMP 支持两种 RPC 机制:

5.6.1 NetConnection.call

客户端可以直接调用服务端的方法:

Client → Server (AMF Command, Type 20):
┌────────────────────────────────────────────────┐
│  "call"                          ← 命令名      │
│  transaction_id                  ← 事务 ID     │
│  null                            ← 无对象      │
│  "getServerTime"                 ← 方法名      │
│  arg1, arg2, ...                 ← 参数        │
└────────────────────────────────────────────────┘

5.6.2 RPC 调用

def create_rpc_call(method: str, transaction_id: float, *args) -> list:
    """构造 RPC 调用命令"""
    return ["call", transaction_id, None, method] + list(args)

# 示例:调用服务端的 getServerTime
encoder = AMF0Encoder()
rpc_msg = create_rpc_call("getServerTime", 3.0)
encoded = encoder.encode(*rpc_msg)

5.7 onStatus 事件通知

服务端向客户端发送状态事件:

def create_on_status(
    level: str,
    code: str,
    description: str,
    info: dict = None
) -> list:
    """构造 onStatus 事件"""
    return [
        "onStatus",
        0,
        None,
        {
            "level": level,
            "code": code,
            "description": description,
            **(info or {}),
        }
    ]


# 常见事件代码:
# NetStream.Play.Start    - 开始播放
# NetStream.Play.Stop     - 停止播放
# NetStream.Play.Reset    - 播放重置
# NetStream.Publish.Start - 开始推流
# NetStream.Unpublish.Success - 停止推流
# NetStream.Play.StreamNotFound - 流未找到

5.8 完整命令流程

推流(Publish)完整流程

Client                                      Server
  │                                           │
  │════  握手完成  ════════════════════════════│
  │                                           │
  │─── Set Chunk Size (4096) ───────────────→│
  │─── connect("live") ────────────────────→│  Type 20
  │                                           │
  │←── Window Ack Size ─────────────────────│  Type 5
  │←── Set Peer Bandwidth ──────────────────│  Type 6
  │←── User Control (Stream Begin, 0) ──────│  Type 4
  │←── _result (NetConnection.Connect.Success)│  Type 20
  │                                           │
  │─── createStream() ─────────────────────→│  Type 20
  │←── _result (stream_id=1) ───────────────│  Type 20
  │                                           │
  │─── @setDataFrame (onMetaData) ─────────→│  Type 18
  │─── publish("mystream", "live") ────────→│  Type 20
  │                                           │
  │←── onStatus (Publish.Start) ────────────│  Type 20
  │                                           │
  │─── AAC Sequence Header ────────────────→│  Type 8
  │─── AVC Sequence Header ────────────────→│  Type 9
  │                                           │
  │─── Audio Data ─────────────────────────→│  Type 8
  │─── Video Data ─────────────────────────→│  Type 9
  │─── Audio Data ─────────────────────────→│  Type 8
  │─── Video Data ─────────────────────────→│  Type 9
  │       ... 持续发送 ...                    │

播放(Play)完整流程

Client                                      Server
  │                                           │
  │─── connect("live") ────────────────────→│
  │←── _result ─────────────────────────────│
  │                                           │
  │─── createStream() ─────────────────────→│
  │←── _result (stream_id=1) ───────────────│
  │                                           │
  │─── play("mystream") ───────────────────→│
  │                                           │
  │←── User Control (Stream Begin, 1) ──────│
  │←── onStatus (Play.Reset) ───────────────│
  │←── onStatus (Play.Start) ───────────────│
  │←── RtmpSampleAccess (false, false) ────│  Type 18
  │←── onMetaData ─────────────────────────│  Type 18
  │                                           │
  │←── AAC Sequence Header ─────────────────│  Type 8
  │←── AVC Sequence Header ─────────────────│  Type 9
  │                                           │
  │←── Audio Data ──────────────────────────│  Type 8
  │←── Video Data ──────────────────────────│  Type 9
  │       ... 持续接收 ...                    │

5.9 AMF 编解码测试

def test_amf0_codec():
    """AMF0 编解码测试"""
    encoder = AMF0Encoder()
    decoder = AMF0Decoder

    # 测试 connect 命令
    values = create_connect_command(
        app="live",
        tc_url="rtmp://localhost:1935/live"
    )
    encoded = encoder.encode(*values)
    print(f"Encoded {len(values)} values: {len(encoded)} bytes")
    print(f"Hex: {encoded.hex()}")

    # 解码验证
    d = decoder(encoded)
    decoded = d.decode()
    print(f"Decoded: {decoded}")

    # 验证
    assert decoded[0] == "connect"
    assert decoded[1] == 1.0
    assert decoded[2]["app"] == "live"
    print("✅ AMF0 codec test passed!")


if __name__ == '__main__':
    test_amf0_codec()

注意事项

  1. AMF0 vs AMF3 选择:大多数 RTMP 实现默认使用 AMF0,AMF3 在 Flash Remoting 中更常见
  2. Transaction ID 递增:每个命令的 Transaction ID 应递增,用于匹配请求和响应
  3. Object Encoding:connect 命令中的 objectEncoding 字段告诉服务端客户端使用的 AMF 版本
  4. 字符串编码:AMF0 String 使用 UTF-8,注意不要超过 65535 字节(Long String 例外)
  5. 数字精度:AMF0 Number 是 64 位浮点数,整数不要超过 2^53 以避免精度丢失
  6. onMetaData:此消息中的 metadata 对象不固定,不同编码器/版本可能包含不同字段

扩展阅读


上一章04 - 消息格式 下一章06 - 流操作 — 了解 RTMP 的流生命周期管理