函数式编程艺术 / 17 实际应用
17 实际应用
“函数式编程不是象牙塔——它在数据处理、编译器、DSL 设计等实际场景中有广泛应用。”
17.1 数据处理与 ETL
17.1.1 函数式 ETL 管道
ETL(Extract-Transform-Load)是函数式编程最自然的应用场景。
// ETL 管道:CSV → 清洗 → 转换 → 聚合 → 输出
const R = require('ramda');
// 提取:读取 CSV
const extract = (csvText) =>
csvText
.split('\n')
.filter(line => line.trim())
.map(line => line.split(','))
.slice(1) // 跳过表头
.map(([date, product, quantity, price]) => ({
date,
product: product.trim(),
quantity: parseInt(quantity),
price: parseFloat(price),
}));
// 转换:清洗和计算
const transform = R.pipe(
// 过滤无效记录
R.filter(row => !isNaN(row.quantity) && !isNaN(row.price) && row.quantity > 0),
// 计算小计
R.map(row => ({ ...row, subtotal: row.quantity * row.price })),
// 按产品分组
R.groupBy(row => row.product),
// 聚合统计
R.map(rows => ({
totalQuantity: R.sum(R.map(R.prop('quantity'), rows)),
totalRevenue: R.sum(R.map(R.prop('subtotal'), rows)),
avgPrice: R.mean(R.map(R.prop('price'), rows)),
orderCount: rows.length,
}))
);
// 加载:格式化输出
const load = (data) =>
Object.entries(data).map(([product, stats]) => ({
product,
...stats,
formattedRevenue: `$${stats.totalRevenue.toFixed(2)}`,
}));
// 完整管道
const etl = R.pipe(extract, transform, load);
// 使用
const csvData = `date,product,quantity,price
2024-01-01,Laptop,2,999.99
2024-01-01,Mouse,5,29.99
2024-01-02,Laptop,1,999.99
2024-01-02,Keyboard,3,79.99`;
console.log(etl(csvData));
Python:
from functools import reduce
from itertools import groupby
from operator import itemgetter
import csv
from io import StringIO
from collections import defaultdict
# 提取
def extract(csv_text):
reader = csv.DictReader(StringIO(csv_text))
return [
{
'date': row['date'],
'product': row['product'].strip(),
'quantity': int(row['quantity']),
'price': float(row['price']),
}
for row in reader
]
# 转换
def transform(records):
# 过滤无效记录
valid = [r for r in records if r['quantity'] > 0 and r['price'] > 0]
# 计算小计
enriched = [{**r, 'subtotal': r['quantity'] * r['price']} for r in valid]
# 分组聚合
groups = defaultdict(list)
for r in enriched:
groups[r['product']].append(r)
return {
product: {
'total_quantity': sum(r['quantity'] for r in rows),
'total_revenue': sum(r['subtotal'] for r in rows),
'avg_price': sum(r['price'] for r in rows) / len(rows),
'order_count': len(rows),
}
for product, rows in groups.items()
}
# 加载
def load(data):
return [
{'product': product, **stats, 'formatted_revenue': f"${stats['total_revenue']:.2f}"}
for product, stats in data.items()
]
# 管道
from toolz import pipe
result = pipe(csv_data, extract, transform, load)
17.1.2 大数据处理
# 使用 itertools 处理大数据文件
import itertools
from functools import reduce
def process_large_file(filepath):
"""惰性处理大型日志文件"""
def read_lines():
with open(filepath, 'r') as f:
for line in f:
yield line.strip()
def parse(line):
parts = line.split(' ', 3)
return {'timestamp': parts[0], 'level': parts[1],
'module': parts[2], 'message': parts[3]} if len(parts) >= 4 else None
def is_error(entry):
return entry and entry['level'] == 'ERROR'
# 惰性管道
errors = itertools.islice(
filter(is_error, map(parse, read_lines())),
1000 # 只处理前 1000 个错误
)
# 聚合
return reduce(
lambda acc, e: {**acc, e['module']: acc.get(e['module'], 0) + 1},
errors,
{}
)
17.2 解析器组合子(Parser Combinators)
解析器组合子是函数式编程的经典应用——用小的解析器组合成大的解析器。
17.2.1 基本概念
| 概念 | 说明 |
|---|---|
| 解析器 | String → [(a, String)](输入 → [(结果, 剩余输入)]) |
| 组合子 | 将小解析器组合成大解析器的函数 |
| 成功 | 返回结果和剩余输入 |
| 失败 | 返回空列表 |
17.2.2 Haskell 解析器组合子
import Text.Parsec
import Text.Parsec.String
-- 基本解析器
char' :: Char -> Parser Char
char' c = satisfy (== c)
digit :: Parser Char
digit = satisfy isDigit
-- 组合
string' :: String -> Parser String
string' = mapM char'
-- JSON 解析器
data JsonValue
= JsonNull
| JsonBool Bool
| JsonNumber Double
| JsonString String
| JsonArray [JsonValue]
| JsonObject [(String, JsonValue)]
deriving (Show, Eq)
jsonParser :: Parser JsonValue
jsonParser = spaces *> (jsonNull <|> jsonBool <|> jsonNumber <|> jsonString <|> jsonArray <|> jsonObject)
jsonNull :: Parser JsonValue
jsonNull = string "null" *> pure JsonNull
jsonBool :: Parser JsonValue
jsonBool = (string "true" *> pure (JsonBool True))
<|> (string "false" *> pure (JsonBool False))
jsonNumber :: Parser JsonValue
jsonNumber = JsonNumber . read <$> many1 (digit <|> oneOf ".-eE")
jsonString :: Parser JsonValue
jsonString = JsonString <$> (char '"' *> manyTill anyChar (char '"'))
jsonArray :: Parser JsonValue
jsonArray = JsonArray <$> (char '[' *> spaces *> jsonParser `sepBy` (char ',' <* spaces) <* char ']')
jsonObject :: Parser JsonValue
jsonObject = JsonObject <$> (char '{' *> spaces *> pair `sepBy` (char ',' <* spaces) <* char '}')
where
pair = do
key <- jsonString >>= \(JsonString s) -> return s
spaces *> char ':' *> spaces
value <- jsonParser
return (key, value)
17.2.3 JavaScript 解析器组合子
// 简单的解析器组合子库
const Parser = (run) => ({
run,
map: (fn) => Parser(input => {
const result = run(input);
return result ? { value: fn(result.value), rest: result.rest } : null;
}),
flatMap: (fn) => Parser(input => {
const result = run(input);
return result ? fn(result.value).run(result.rest) : null;
}),
then: (other) => Parser(input => {
const result = run(input);
return result ? other.run(result.rest) : null;
}),
});
// 基本解析器
const char = (c) => Parser(input =>
input[0] === c ? { value: c, rest: input.slice(1) } : null
);
const digit = Parser(input =>
/^[0-9]/.test(input) ? { value: input[0], rest: input.slice(1) } : null
);
const many = (parser) => Parser(input => {
const results = [];
let rest = input;
while (true) {
const result = parser.run(rest);
if (!result) break;
results.push(result.value);
rest = result.rest;
}
return { value: results, rest };
});
const many1 = (parser) => parser.flatMap(first =>
many(parser).map(rest => [first, ...rest])
);
const string = (s) => Parser(input =>
input.startsWith(s) ? { value: s, rest: input.slice(s.length) } : null
);
// JSON 数字解析器
const numberParser = many1(digit).map(digits => parseInt(digits.join('')));
// 使用
numberParser.run('123abc'); // { value: 123, rest: 'abc' }
17.2.4 Rust nom 库
use nom::{
IResult,
bytes::complete::{tag, take_while1},
character::complete::{digit1, multispace0},
combinator::{map, opt},
sequence::{delimited, tuple},
branch::alt,
};
// 解析整数
fn parse_integer(input: &str) -> IResult<&str, i64> {
let (input, sign) = opt(tag("-"))(input)?;
let (input, digits) = digit1(input)?;
let number: i64 = digits.parse().unwrap();
Ok((input, if sign.is_some() { -number } else { number }))
}
// 解析字符串
fn parse_string(input: &str) -> IResult<&str, &str> {
delimited(
tag("\""),
take_while1(|c| c != '"'),
tag("\""),
)(input)
}
// 组合:解析键值对
fn parse_key_value(input: &str) -> IResult<&str, (&str, i64)> {
let (input, key) = parse_string(input)?;
let (input, _) = tag(":")(input)?;
let (input, _) = multispace0(input)?;
let (input, value) = parse_integer(input)?;
Ok((input, (key, value)))
}
17.3 DSL 设计
17.3.1 内部 DSL
// 查询 DSL
const query = QueryBuilder
.select('name', 'email', 'age')
.from('users')
.where('age', '>', 18)
.where('active', '=', true)
.orderBy('name', 'ASC')
.limit(10)
.build();
// 更函数式的 DSL
const buildQuery = (options) => {
const { select, from, where, orderBy, limit } = options;
let sql = `SELECT ${select.join(', ')} FROM ${from}`;
if (where.length > 0) {
sql += ` WHERE ${where.map(([col, op, val]) =>
`${col} ${op} ${typeof val === 'string' ? `'${val}'` : val}`
).join(' AND ')}`;
}
if (orderBy) sql += ` ORDER BY ${orderBy[0]} ${orderBy[1]}`;
if (limit) sql += ` LIMIT ${limit}`;
return sql;
};
// 使用
buildQuery({
select: ['name', 'email', 'age'],
from: 'users',
where: [['age', '>', 18], ['active', '=', true]],
orderBy: ['name', 'ASC'],
limit: 10,
});
// SELECT name, email, age FROM users WHERE age > 18 AND active = true ORDER BY name ASC LIMIT 10
17.3.2 Haskell 外部 DSL
-- 用 Parsec 定义外部 DSL
-- 配置文件格式:
-- database {
-- host = "localhost"
-- port = 5432
-- name = "mydb"
-- }
data Config = Config
{ dbHost :: String
, dbPort :: Int
, dbName :: String
} deriving (Show)
configParser :: Parser Config
configParser = do
string "database" *> spaces *> char '{' *> spaces
host <- string "host" *> spaces *> char '=' *> spaces *> quotedString <* spaces
port <- string "port" *> spaces *> char '=' *> spaces *> int <* spaces
name <- string "name" *> spaces *> char '=' *> spaces *> quotedString <* spaces
char '}'
return $ Config host port name
where
quotedString = char '"' *> manyTill anyChar (char '"')
int = read <$> many1 digit
17.3.3 Clojure DSL
;; Clojure 的数据即代码特性非常适合 DSL
;; SQL DSL
(defn sql-select [& fields]
{:select fields})
(defn sql-from [query table]
(assoc query :from table))
(defn sql-where [query condition]
(update query :where conj condition))
(defn sql-limit [query n]
(assoc query :limit n))
;; 组合 DSL
(-> (sql-select :name :email :age)
(sql-from :users)
(sql-where [:age :> 18])
(sql-where [:active := true])
(sql-limit 10))
;; 结果
;; {:select [:name :email :age]
;; :from :users
;; :where [[:age :> 18] [:active := true]]
;; :limit 10}
;; 宏实现更自然的 DSL
(defmacro select [& fields]
`{:select ~(vec fields)})
(defmacro from [query table]
`(assoc ~query :from ~table))
;; 使用
(-> (select :name :email)
(from :users))
17.4 编译器/解释器
17.4.1 简单表达式语言
-- 表达式语言的编译器
data Expr
= Num Double
| Var String
| Add Expr Expr
| Mul Expr Expr
| Let String Expr Expr
deriving (Show, Eq)
-- 类型检查
typeCheck :: Expr -> Either String Type
typeCheck (Num _) = Right TNum
typeCheck (Var _) = Right TNum -- 简化:所有变量都是数值
typeCheck (Add a b) = case (typeCheck a, typeCheck b) of
(Right TNum, Right TNum) -> Right TNum
_ -> Left "Type error in addition"
typeCheck (Mul a b) = case (typeCheck a, typeCheck b) of
(Right TNum, Right TNum) -> Right TNum
_ -> Left "Type error in multiplication"
typeCheck (Let _ expr body) = case typeCheck expr of
Right _ -> typeCheck body
Left err -> Left err
-- 求值
eval :: Map String Double -> Expr -> Double
eval _ (Num x) = x
eval env (Var name) = env Map.! name
eval env (Add a b) = eval env a + eval env b
eval env (Mul a b) = eval env a * eval env b
eval env (Let name expr body) =
let value = eval env expr
newEnv = Map.insert name value env
in eval newEnv body
-- 编译到汇编(简化示例)
compile :: Expr -> [Instruction]
compile (Num x) = [Push x]
compile (Var name) = [Load name]
compile (Add a b) = compile a ++ compile b ++ [AddOp]
compile (Mul a b) = compile a ++ compile b ++ [MulOp]
compile (Let name expr body) =
compile expr ++ [Store name] ++ compile body
Rust:
#[derive(Debug, Clone, PartialEq)]
enum Expr {
Num(f64),
Var(String),
Add(Box<Expr>, Box<Expr>),
Mul(Box<Expr>, Box<Expr>),
Let(String, Box<Expr>, Box<Expr>),
}
impl Expr {
fn eval(&self, env: &std::collections::HashMap<String, f64>) -> Result<f64, String> {
match self {
Expr::Num(x) => Ok(*x),
Expr::Var(name) => env.get(name)
.cloned()
.ok_or_else(|| format!("Undefined variable: {}", name)),
Expr::Add(a, b) => Ok(a.eval(env)? + b.eval(env)?),
Expr::Mul(a, b) => Ok(a.eval(env)? * b.eval(env)?),
Expr::Let(name, expr, body) => {
let value = expr.eval(env)?;
let mut new_env = env.clone();
new_env.insert(name.clone(), value);
body.eval(&new_env)
}
}
}
}
17.5 Web API 设计
17.5.1 函数式 Web 框架
// 函数式中间件
const middleware = {
cors: (handler) => (req) => {
const response = handler(req);
return {
...response,
headers: {
...response.headers,
'Access-Control-Allow-Origin': '*',
},
};
},
auth: (handler) => (req) => {
if (!req.headers.authorization) {
return { status: 401, body: { error: 'Unauthorized' } };
}
const user = verifyToken(req.headers.authorization);
return handler({ ...req, user });
},
validate: (schema) => (handler) => (req) => {
const errors = schema.validate(req.body);
if (errors.length > 0) {
return { status: 400, body: { errors } };
}
return handler(req);
},
};
// 路由定义(函数组合)
const routes = {
'GET /users': compose(
middleware.cors,
middleware.auth,
)((req) => {
const users = db.getUsers();
return { status: 200, body: users };
}),
'POST /users': compose(
middleware.cors,
middleware.auth,
middleware.validate(userSchema),
)((req) => {
const user = db.createUser(req.body);
return { status: 201, body: user };
}),
};
17.6 机器学习预处理
# 函数式数据预处理管道
from functools import reduce, partial
from toolz import pipe, curry, compose
@curry
def normalize_column(df, column):
"""Min-Max 归一化"""
col = df[column]
return df.assign(**{column: (col - col.min()) / (col.max() - col.min())})
@curry
def fill_missing(df, column, strategy='mean'):
"""填充缺失值"""
fill_value = df[column].mean() if strategy == 'mean' else df[column].median()
return df.assign(**{column: df[column].fillna(fill_value)})
@curry
def encode_categorical(df, column):
"""One-hot 编码"""
dummies = pd.get_dummies(df[column], prefix=column)
return pd.concat([df.drop(column, axis=1), dummies], axis=1)
@curry
def filter_outliers(df, column, n_std=3):
"""过滤异常值"""
mean, std = df[column].mean(), df[column].std()
return df[(df[column] >= mean - n_std * std) & (df[column] <= mean + n_std * std)]
# 组合预处理管道
preprocess = compose(
normalize_column('age'),
normalize_column('income'),
fill_missing('age'),
fill_missing('income'),
encode_categorical('city'),
filter_outliers('income'),
)
# 使用
clean_df = preprocess(raw_df)
17.7 业务场景
17.7.1 日志分析系统
# 大规模日志分析
import json
from datetime import datetime
from itertools import islice
from collections import Counter
def analyze_logs(log_stream, time_range=None, level='ERROR'):
"""惰性日志分析管道"""
def parse_lines(stream):
for line in stream:
try:
yield json.loads(line.strip())
except json.JSONDecodeError:
continue
def filter_by_time(entries):
if time_range is None:
return entries
start, end = time_range
return (
e for e in entries
if start <= datetime.fromisoformat(e['timestamp']) <= end
)
def filter_by_level(entries, target_level):
return (e for e in entries if e.get('level') == target_level)
def extract_modules(entries):
return (e.get('module', 'unknown') for e in entries)
# 惰性管道
entries = parse_lines(log_stream)
entries = filter_by_time(entries)
entries = filter_by_level(entries, level)
modules = extract_modules(entries)
# 计数(只消耗需要的元素)
return Counter(islice(modules, 10000))
# 使用
with open('app.log') as f:
error_counts = analyze_logs(f, level='ERROR')
print(error_counts.most_common(10))
17.8 注意事项
| 注意事项 | 说明 |
|---|---|
| 渐进采用 | 在现有项目中逐步引入 FP 概念 |
| 团队培训 | 确保团队理解 FP 基本概念 |
| 库选择 | 选择成熟、有良好文档的 FP 库 |
| 性能测试 | 在关键路径上进行性能测试 |
| 混合风格 | 不要强求所有代码都是纯函数式 |
17.9 小结
| 要点 | 说明 |
|---|---|
| ETL/数据处理 | 管道化、无副作用、易测试 |
| 解析器组合子 | 用小组件构建复杂解析器 |
| DSL 设计 | 内部/外部 DSL,函数组合是关键 |
| 编译器 | 模式匹配 + 递归处理 AST |
| Web API | 函数式中间件组合 |
扩展阅读
- Parsing in Haskell — Haskell 解析器资源
- Domain Specific Languages — Martin Fowler
- nom - Rust 解析器组合子库
- Clojure for Data Science — Henry Garner
下一章:18 最佳实践