强曰为道
与天地相似,故不违。知周乎万物,而道济天下,故不过。旁行而不流,乐天知命,故不忧.
文档目录

函数式编程艺术 / 12 函数式响应式编程

12 函数式响应式编程

“FRP 把事件流当作一等值来处理——你可以像操作数组一样操作事件。”


12.1 概述

函数式响应式编程(Functional Reactive Programming, FRP) 将时间维度引入函数式编程,用流(Stream)和信号(Signal)来表示随时间变化的值。

12.1.1 命令式 vs 响应式

方面 命令式 响应式
数据流 手动更新 自动传播
状态管理 可变变量 不可变流
事件处理 回调函数 声明式流
并发 手动管理 操作符处理

12.1.2 FRP 的两大学派

学派 代表 特点
经典 FRP Fran, Elm (早期) 连续时间信号/行为
反应式编程 RxJS, Reactor 离散事件流

12.2 信号与行为(Signals/Behaviors)

12.2.1 经典 FRP 模型

行为(Behavior) 是随时间连续变化的值:

Behavior a = Time → a

事件(Event) 是离散发生的值:

Event a = [(Time, a)]

Elm 风格:

-- 行为:鼠标位置随时间变化
mousePos : Behavior (Int, Int)
mousePos = Mouse.position

-- 事件:鼠标点击
clicks : Event ()
clicks = Mouse.clicks

-- 组合:点击时的鼠标位置
clickPositions : Event (Int, Int)
clickPositions = sampleOn clicks mousePos

-- 转换
countClicks : Behavior Int
countClicks = foldp (\_ acc -> acc + 1) 0 clicks

12.3 响应式流(Reactive Streams)

12.3.1 Observable 模型

Observable 是最流行的响应式流实现。

核心概念:

概念 说明
Observable 可观察的数据流,发出多个值
Observer 消费 Observable 发出的值
Subscription 连接 Observable 和 Observer
Operator 变换、组合流的函数

12.3.2 RxJS 示例

import { fromEvent, interval, merge, combineLatest } from 'rxjs';
import { map, filter, scan, debounceTime, switchMap, takeUntil } from 'rxjs/operators';

// 基本流
const numbers$ = from([1, 2, 3, 4, 5]);

// 事件流
const clicks$ = fromEvent(document, 'click');
const keys$ = fromEvent(document, 'keydown');

// 变换
const positions$ = clicks$.pipe(
  map(e => ({ x: e.clientX, y: e.clientY }))
);

// 过滤
const enterKeys$ = keys$.pipe(
  filter(e => e.key === 'Enter')
);

// 累积
const clickCount$ = clicks$.pipe(
  scan((count, _) => count + 1, 0)
);

// 防抖
const searchInput$ = fromEvent(searchBox, 'input').pipe(
  debounceTime(300),
  map(e => e.target.value),
  filter(query => query.length > 2)
);

// 搜索请求(取消前一个)
const results$ = searchInput$.pipe(
  switchMap(query => fetch(`/api/search?q=${query}`).then(r => r.json()))
);

// 订阅
results$.subscribe(results => {
  renderResults(results);
});

12.3.3 自动补全示例

// 经典应用:搜索自动补全
const searchBox = document.getElementById('search');
const results = document.getElementById('results');

const search$ = fromEvent(searchBox, 'input').pipe(
  // 取输入值
  map(e => e.target.value.trim()),
  // 防抖 300ms
  debounceTime(300),
  // 至少 2 个字符
  filter(query => query.length >= 2),
  // 去重
  distinctUntilChanged(),
  // 发起请求(取消前一个未完成的)
  switchMap(query =>
    from(fetch(`/api/search?q=${encodeURIComponent(query)}`).then(r => r.json())).pipe(
      // 错误处理:返回空结果
      catchError(() => of([]))
    )
  )
);

search$.subscribe(data => {
  results.innerHTML = data.map(item =>
    `<div class="result">${item.title}</div>`
  ).join('');
});

12.4 流操作符

12.4.1 创建操作符

操作符 说明 RxJS
of 从值创建 of(1, 2, 3)
from 从数组/可迭代创建 from([1, 2, 3])
interval 定时创建 interval(1000)
fromEvent 从事件创建 fromEvent(el, 'click')
create 自定义创建 new Observable(subscriber => {...})

12.4.2 变换操作符

操作符 说明
map 转换每个值
flatMap / mergeMap 展平内部 Observable
switchMap 切换到新的内部 Observable(取消旧的)
concatMap 顺序执行内部 Observable
scan 累积归约
buffer 收集值到数组

12.4.3 过滤操作符

操作符 说明
filter 根据谓词过滤
take 取前 N 个值
skip 跳过前 N 个值
distinctUntilChanged 去重连续相同值
debounceTime 防抖
throttleTime 节流
first / last 取第一个/最后一个

12.4.4 组合操作符

操作符 说明
merge 合并多个流
concat 顺序连接流
combineLatest 最新值组合
zip 按索引配对
forkJoin 等待所有完成
import { merge, combineLatest, zip } from 'rxjs';

// merge:任一流发出值都传播
const merged$ = merge(mouseClicks$, keyPresses$);

// combineLatest:任一流发出时,取其他流的最新值
const fullName$ = combineLatest([firstName$, lastName$]).pipe(
  map(([first, last]) => `${first} ${last}`)
);

// zip:严格配对
const paired$ = zip(names$, ages$).pipe(
  map(([name, age]) => ({ name, age }))
);

12.5 错误处理

import { of, EMPTY } from 'rxjs';
import { catchError, retry, retryWhen, delay } from 'rxjs/operators';

// 基本错误处理
const safeRequest$ = fetch('/api/data').pipe(
  catchError(error => {
    console.error('Request failed:', error);
    return of({ data: [], error: error.message });
  })
);

// 重试
const resilientRequest$ = fetch('/api/data').pipe(
  retry(3)
);

// 指数退避重试
const backoffRequest$ = fetch('/api/data').pipe(
  retryWhen(errors => errors.pipe(
    scan((retryCount, error) => {
      if (retryCount >= 3) throw error;
      return retryCount + 1;
    }, 0),
    delay(retryCount => Math.pow(2, retryCount) * 1000)
  ))
);

// 错误恢复
const requestWithFallback$ = primaryRequest$.pipe(
  catchError(() => fallbackRequest$),
  catchError(() => of(defaultValue))
);

12.6 状态管理

12.6.1 Redux 的流视角

// Redux 本质上是一个流
import { Subject } from 'rxjs';
import { scan, shareReplay } from 'rxjs/operators';

const createStore = (reducer, initialState) => {
  const actions$ = new Subject();
  const state$ = actions$.pipe(
    scan(reducer, initialState),
    shareReplay(1)
  );
  return {
    dispatch: (action) => actions$.next(action),
    select: (selector) => state$.pipe(map(selector)),
    state$,
  };
};

// 使用
const store = createStore(todoReducer, []);

store.select(state => state.filter(t => !t.completed)).subscribe(
  activeTodos => render(activeTodos)
);

store.dispatch({ type: 'ADD_TODO', text: 'Learn FRP' });

12.6.2 Elm Architecture

// Elm Architecture: Model → Update → View
const ElmApp = (init, update, view) => {
  const actions$ = new Subject();

  const model$ = actions$.pipe(
    scan((model, action) => update(model, action), init)
  );

  const vdom$ = model$.pipe(
    map(model => view(model, (action) => actions$.next(action)))
  );

  return { model$, vdom$ };
};

12.7 Haskell 中的 FRP

-- 使用 reflex 库
import Reflex

-- 基本 FRP
example :: Widget t m => m ()
example = do
  -- 创建事件
  rec textInput <- textInput def
      let keypressEvent = _textInput_keypress textInput

  -- 行为:累积按键次数
  count <- foldDyn (+) 0 (1 <$ keypressEvent)

  -- 显示
  display count

-- 动态文本
  dynText $ fmap show count

12.8 业务场景

12.8.1 实时仪表板

// 实时数据仪表板
const dashboard$ = combineLatest({
  cpu: interval(1000).pipe(switchMap(() => fetchMetric('cpu'))),
  memory: interval(2000).pipe(switchMap(() => fetchMetric('memory'))),
  requests: fromEventSource('/api/metrics/requests'),
  errors: fromEventSource('/api/metrics/errors').pipe(
    bufferTime(5000),
    filter(errors => errors.length > 0)
  )
}).pipe(
  // 状态转换
  scan((dashboard, metrics) => ({
    ...dashboard,
    ...metrics,
    lastUpdated: Date.now(),
    alerts: [
      ...dashboard.alerts,
      ...(metrics.cpu > 90 ? ['High CPU'] : []),
      ...(metrics.errors.length > 10 ? ['Error spike'] : []),
    ]
  }), { alerts: [] })
);

dashboard$.subscribe(renderDashboard);

12.8.2 表单验证

const formValidation$ = (form) => {
  const name$ = fromEvent(form.name, 'input').pipe(
    map(e => e.target.value),
    map(name => ({
      value: name,
      valid: name.length >= 2,
      error: name.length < 2 ? 'Name too short' : null
    }))
  );

  const email$ = fromEvent(form.email, 'input').pipe(
    map(e => e.target.value),
    map(email => ({
      value: email,
      valid: /^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email),
      error: !/^[^\s@]+@[^\s@]+\.[^\s@]+$/.test(email) ? 'Invalid email' : null
    }))
  );

  const age$ = fromEvent(form.age, 'input').pipe(
    map(e => parseInt(e.target.value)),
    map(age => ({
      value: age,
      valid: age >= 18 && age <= 120,
      error: age < 18 ? 'Must be 18+' : age > 120 ? 'Invalid age' : null
    }))
  );

  return combineLatest([name$, email$, age$]).pipe(
    map(([name, email, age]) => ({
      fields: { name, email, age },
      valid: name.valid && email.valid && age.valid
    })),
    distinctUntilChanged((a, b) => a.valid === b.valid)
  );
};

12.9 注意事项

注意事项 说明
内存泄漏 忘记取消订阅会导致内存泄漏
调试困难 流的异步性质使调试复杂
过度使用 简单场景不需要 FRP
背压处理 高速数据流需要背压策略
测试 使用 TestScheduler 进行虚拟时间测试

12.10 小结

要点 说明
FRP 函数式 + 响应式,流是一等公民
Signal/Behavior 连续时间变化的值
Observable 离散事件流,支持丰富的操作符
操作符 创建、变换、过滤、组合、错误处理
状态管理 Elm Architecture、Redux 的流视角

扩展阅读

  1. RxJS 文档 — 官方文档
  2. The Introduction to Reactive Programming You’ve Been Missing — André Staltz
  3. Elm Architecture — Elm 官方教程
  4. Reflex FRP — Haskell FRP 库

下一章13 函数式设计模式