Vala 语言入门教程 / 07 - 异步编程
第 7 章:异步编程
异步编程是现代应用开发的核心能力。Vala 提供了优雅的
async/await语法,让异步代码看起来像同步代码一样清晰。
7.1 为什么需要异步编程
7.1.1 同步 vs 异步
同步执行:
任务A ──────────────→ 任务B ──────────────→ 任务C
| 阻塞 5 秒 | | 阻塞 3 秒 |
总耗时: 8 秒
异步执行:
任务A ──────────────→
任务B ──────────→
任务C ──────────→
总耗时: max(5, 3) ≈ 5 秒
7.1.2 GLib 主循环(Main Loop)
Vala 的异步编程基于 GLib 主循环(Main Loop)。主循环是一个事件循环,负责:
- 处理 I/O 事件
- 执行定时器回调
- 分发信号
- 运行异步任务
┌───────────────────────────────────┐
│ GLib Main Loop │
│ ┌─────────────────────────────┐ │
│ │ 1. 检查 I/O 事件 │ │
│ │ 2. 执行到期的定时器 │ │
│ │ 3. 分发 idle 回调 │ │
│ │ 4. 处理异步完成回调 │ │
│ │ 5. 等待下一个事件 │ │
│ └────────── 循环 ──────────────┘ │
└───────────────────────────────────┘
7.2 async/await 基础
7.2.1 定义异步方法
// 异步方法使用 async 关键字
public async void fetch_data () {
print ("开始获取数据...\n");
// 模拟耗时操作
// yield 关键字将控制权交还给主循环
yield do_some_work ();
print ("数据获取完成!\n");
}
// 模拟异步工作
public async void do_some_work () {
print (" 工作中...\n");
// 在实际应用中,这里会是网络请求、文件 I/O 等
}
void main () {
// 创建主循环
var loop = new MainLoop ();
// 启动异步任务
fetch_data.begin ((obj, res) => {
fetch_data.end (res);
print ("回调:异步任务完成\n");
loop.quit ();
});
// 运行主循环
loop.run ();
}
输出:
开始获取数据...
工作中...
数据获取完成!
回调:异步任务完成
7.2.2 async/await 语法详解
// async 方法可以有返回值
public async int calculate_async (int x, int y) {
// 模拟异步计算
int result = x + y;
return result;
}
// 使用 await 等待异步结果
public async void process () {
print ("计算开始\n");
// 使用 yield(Vala 的 await)等待结果
int result = yield calculate_async (10, 20);
print ("计算结果: %d\n", result);
// 可以连续等待多个异步操作
int r1 = yield calculate_async (1, 2);
int r2 = yield calculate_async (3, 4);
print ("r1=%d, r2=%d, 总和=%d\n", r1, r2, r1 + r2);
}
void main () {
var loop = new MainLoop ();
process.begin ((obj, res) => {
process.end (res);
loop.quit ();
});
loop.run ();
}
7.2.3 异步方法与错误处理
errordomain NetworkError {
TIMEOUT,
CONNECTION_REFUSED,
NOT_FOUND
}
// 异步方法可以抛出错误
public async string fetch_url (string url) throws NetworkError {
print ("请求: %s\n", url);
if (url.contains ("timeout")) {
throw new NetworkError.TIMEOUT ("请求超时");
}
if (url.contains ("refused")) {
throw new NetworkError.CONNECTION_REFUSED ("连接被拒绝");
}
return "响应内容: OK";
}
// 使用 try-catch 处理异步错误
public async void safe_fetch () {
try {
string result = yield fetch_url ("https://example.com");
print ("成功: %s\n", result);
} catch (NetworkError e) {
printerr ("网络错误: %s\n", e.message);
}
}
void main () {
var loop = new MainLoop ();
safe_fetch.begin ((obj, res) => {
safe_fetch.end (res);
loop.quit ();
});
loop.run ();
}
7.3 回调风格(Callbacks)
在 async/await 出现之前,Vala 使用回调风格进行异步编程:
7.3.1 基本回调
// 回调委托
delegate void AsyncCallback<T> (T result);
// 使用回调的异步方法
public void fetch_with_callback (AsyncCallback<string> callback) {
// 模拟异步操作
GLib.Timeout.add (1000, () => {
callback ("回调结果: Hello");
return GLib.Source.REMOVE;
});
}
void main () {
var loop = new MainLoop ();
fetch_with_callback ((result) => {
print ("%s\n", result);
loop.quit ();
});
loop.run ();
}
7.3.2 GIO 风格回调
// GIO 风格:begin/end 模式
public void async_operation.begin (GLib.AsyncReadyCallback? callback) {
// 启动异步操作
GLib.Timeout.add (500, () => {
callback (this, null);
return GLib.Source.REMOVE;
});
}
public string async_operation.end (GLib.AsyncResult result) {
return "操作完成";
}
void main () {
var loop = new MainLoop ();
// 使用 GIO 风格的回调
var obj = new Object ();
obj.begin ((source, result) => {
// 获取结果
print ("异步操作完成\n");
loop.quit ();
});
loop.run ();
}
7.4 GIO 异步操作
GIO 是 GLib 的 I/O 库,提供了丰富的异步操作。
7.4.1 异步文件读取
public async string read_file_async (string path) throws GLib.Error {
// 打开文件
var file = GLib.File.new_for_path (path);
// 异步读取
var stream = yield file.read_async (Priority.DEFAULT, null);
// 异步读取全部内容
var data_stream = new GLib.DataInputStream (stream);
var contents = new StringBuilder ();
string line;
size_t length;
while ((line = yield data_stream.read_line_async (
Priority.DEFAULT, null, out length)) != null) {
contents.append (line);
contents.append_c ('\n');
}
return contents.str;
}
void main () {
var loop = new MainLoop ();
read_file_async.begin ("/etc/hostname", (obj, res) => {
try {
string content = read_file_async.end (res);
print ("文件内容:\n%s\n", content);
} catch (GLib.Error e) {
printerr ("读取失败: %s\n", e.message);
}
loop.quit ();
});
loop.run ();
}
7.4.2 异步文件写入
public async void write_file_async (string path, string content)
throws GLib.Error
{
var file = GLib.File.new_for_path (path);
// 异步创建文件
var stream = yield file.replace_async (
null, // etag
false, // make_backup
GLib.FileCreateFlags.NONE,
Priority.DEFAULT,
null // cancellable
);
// 异步写入
yield stream.write_async (
content.data,
Priority.DEFAULT,
null
);
// 异步关闭
yield stream.close_async (Priority.DEFAULT, null);
print ("文件写入完成: %s\n", path);
}
void main () {
var loop = new MainLoop ();
write_file_async.begin ("/tmp/test.txt", "Hello, Vala!", (obj, res) => {
try {
write_file_async.end (res);
} catch (GLib.Error e) {
printerr ("写入失败: %s\n", e.message);
}
loop.quit ();
});
loop.run ();
}
7.4.3 异步网络请求
public async string http_get (string url) throws GLib.Error {
var session = new Soup.Session ();
var message = new Soup.Message ("GET", url);
// 异步发送请求
GLib.Bytes response = yield session.send_and_read_async (
message,
Priority.DEFAULT,
null
);
if (message.status_code != 200) {
throw new GLib.IOError.FAILED (
"HTTP 错误: %u".printf (message.status_code)
);
}
return (string) response.get_data ();
}
void main () {
var loop = new MainLoop ();
print ("发送 HTTP 请求...\n");
http_get.begin ("https://httpbin.org/get", (obj, res) => {
try {
string body = http_get.end (res);
print ("响应:\n%s\n", body.substring (0, 200));
} catch (GLib.Error e) {
printerr ("请求失败: %s\n", e.message);
}
loop.quit ();
});
loop.run ();
}
💡 使用 Soup 库需要在编译时链接
--pkg libsoup-3.0。
7.5 超时和取消
7.5.1 异步超时
public async string fetch_with_timeout (string url, uint timeout_ms)
throws GLib.Error
{
var cancellable = new GLib.Cancellable ();
// 设置超时
GLib.Timeout.add (timeout_ms, () => {
cancellable.cancel ();
return GLib.Source.REMOVE;
});
// 使用 cancellable
var file = GLib.File.new_for_uri (url);
var stream = yield file.read_async (
Priority.DEFAULT,
cancellable
);
return "请求完成";
}
void main () {
var loop = new MainLoop ();
fetch_with_timeout.begin ("https://example.com", 5000, (obj, res) => {
try {
string result = fetch_with_timeout.end (res);
print ("%s\n", result);
} catch (GLib.Error e) {
if (e is GLib.IOError.CANCELLED) {
printerr ("请求超时!\n");
} else {
printerr ("错误: %s\n", e.message);
}
}
loop.quit ();
});
loop.run ();
}
7.5.2 手动取消
public async void long_running_task (GLib.Cancellable? cancellable = null)
throws GLib.Error
{
for (int i = 0; i < 10; i++) {
// 检查是否被取消
if (cancellable != null && cancellable.is_cancelled ()) {
throw new GLib.IOError.CANCELLED ("任务被取消");
}
print ("处理步骤 %d/10\n", i + 1);
yield do_step_async ();
}
print ("任务完成!\n");
}
public async void do_step_async () {
// 模拟工作
GLib.usleep (500000); // 0.5 秒
}
void main () {
var loop = new MainLoop ();
var cancellable = new GLib.Cancellable ();
// 启动任务
long_running_task.begin (cancellable, (obj, res) => {
try {
long_running_task.end (res);
} catch (GLib.Error e) {
printerr ("任务失败: %s\n", e.message);
}
loop.quit ();
});
// 3 秒后取消任务
GLib.Timeout.add (3000, () => {
print ("正在取消任务...\n");
cancellable.cancel ();
return GLib.Source.REMOVE;
});
loop.run ();
}
7.6 并行异步操作
7.6.1 并行执行多个任务
public async int fetch_value (string name, int delay_ms) {
print ("[%s] 开始获取\n", name);
GLib.usleep (delay_ms * 1000);
print ("[%s] 获取完成\n", name);
return delay_ms;
}
public async void parallel_tasks () {
var results = new int[3];
// 并行启动多个异步任务
var task1 = fetch_value.begin ("任务A", 1000, (obj, res) => {
results[0] = fetch_value.end (res);
});
var task2 = fetch_value.begin ("任务B", 2000, (obj, res) => {
results[1] = fetch_value.end (res);
});
var task3 = fetch_value.begin ("任务C", 1500, (obj, res) => {
results[2] = fetch_value.end (res);
});
// 等待所有任务完成
// 注意:在 Vala 中需要手动管理多个任务的完成
GLib.usleep (3000000); // 等待足够时间
print ("结果: %d, %d, %d\n", results[0], results[1], results[2]);
}
void main () {
var loop = new MainLoop ();
parallel_tasks.begin ((obj, res) => {
parallel_tasks.end (res);
print ("所有并行任务完成\n");
loop.quit ();
});
loop.run ();
}
7.6.2 顺序执行 vs 并行执行
public async void sequential () {
print ("=== 顺序执行 ===\n");
int r1 = yield fetch_value ("A", 1000);
int r2 = yield fetch_value ("B", 1000);
int r3 = yield fetch_value ("C", 1000);
print ("总结果: %d\n", r1 + r2 + r3);
}
public async void parallel () {
print ("=== 并行执行 ===\n");
// 这里简化处理,实际需要更复杂的并发管理
int r1 = yield fetch_value ("X", 1000);
int r2 = yield fetch_value ("Y", 1000);
int r3 = yield fetch_value ("Z", 1000);
print ("总结果: %d\n", r1 + r2 + r3);
}
7.7 协程(Coroutines)
Vala 的 async/await 本质上是协程(Coroutines):
7.7.1 协程概念
协程是可以暂停和恢复的函数:
main() async_func()
| |
|--- begin -------->|
| | yield (暂停)
|<-- 返回控制权 ----|
| |
| (继续运行) | (等待)
| |
|<-- 完成回调 -------| (恢复)
| |
v v
7.7.2 生成器模式
// 使用 async 实现生成器风格
public async int? generate_fibonacci (int count) {
int a = 0, b = 1;
for (int i = 0; i < count; i++) {
yield; // 暂停,返回控制权
int temp = a;
a = b;
b = temp + b;
// 在实际实现中,这里会通过其他方式返回值
}
return a;
}
// 更实用的异步迭代
public class AsyncIterator<T> : Object {
private T[] items;
private int index = 0;
public AsyncIterator (T[] items) {
this.items = items;
}
public async T? next () {
if (index >= items.length) {
return null;
}
// 模拟异步延迟
GLib.usleep (100000); // 0.1 秒
return items[index++];
}
public bool has_next () {
return index < items.length;
}
}
void main () {
var loop = new MainLoop ();
var numbers = new int[] {1, 2, 3, 4, 5};
var iter = new AsyncIterator<int> (numbers);
iterate_async.begin (iter, (obj, res) => {
iterate_async.end (res);
loop.quit ();
});
loop.run ();
}
public async void iterate_async (AsyncIterator<int> iter) {
while (iter.has_next ()) {
int? val = yield iter.next ();
if (val != null) {
print ("值: %d\n", val);
}
}
}
7.8 业务场景:异步下载管理器
// 下载状态
public enum DownloadState {
PENDING,
DOWNLOADING,
COMPLETED,
FAILED
}
// 下载任务
public class DownloadTask : Object {
public string url { get; set; }
public string dest_path { get; set; }
public DownloadState state { get; set; default = DownloadState.PENDING; }
public int progress { get; set; default = 0; }
public string? error_message { get; set; }
public signal void state_changed (DownloadState new_state);
public signal void progress_updated (int percent);
public DownloadTask (string url, string dest_path) {
Object (url: url, dest_path: dest_path);
}
}
// 下载管理器
public class DownloadManager : Object {
private GLib.List<DownloadTask> queue = new GLib.List<DownloadTask> ();
private int max_concurrent = 3;
private int active_count = 0;
public signal void task_completed (DownloadTask task);
public signal void all_completed ();
public void add_task (DownloadTask task) {
queue.append (task);
}
public async void start_all () {
print ("开始下载 %u 个任务\n", queue.length ());
foreach (var task in queue) {
if (task.state == DownloadState.PENDING) {
yield download_task (task);
}
}
all_completed ();
}
private async void download_task (DownloadTask task) {
task.state = DownloadState.DOWNLOADING;
task.state_changed (DownloadState.DOWNLOADING);
print ("[下载] %s\n", task.url);
try {
// 模拟下载进度
for (int i = 0; i <= 100; i += 10) {
task.progress = i;
task.progress_updated (i);
GLib.usleep (100000); // 模拟下载
}
task.state = DownloadState.COMPLETED;
task.state_changed (DownloadState.COMPLETED);
print ("[完成] %s\n", task.url);
task_completed (task);
} catch (GLib.Error e) {
task.state = DownloadState.FAILED;
task.error_message = e.message;
task.state_changed (DownloadState.FAILED);
printerr ("[失败] %s: %s\n", task.url, e.message);
}
}
}
void main () {
var loop = new MainLoop ();
var manager = new DownloadManager ();
// 添加下载任务
manager.add_task (new DownloadTask (
"https://example.com/file1.zip", "/tmp/file1.zip"
));
manager.add_task (new DownloadTask (
"https://example.com/file2.zip", "/tmp/file2.zip"
));
manager.add_task (new DownloadTask (
"https://example.com/file3.zip", "/tmp/file3.zip"
));
// 连接信号
manager.task_completed.connect ((task) => {
print ("✅ 任务完成: %s\n", task.url);
});
manager.all_completed.connect (() => {
print ("\n🎉 所有下载完成!\n");
loop.quit ();
});
// 开始下载
manager.start_all.begin ();
loop.run ();
}
7.9 注意事项
⚠️ 异步编程常见陷阱
- 必须有主循环:异步操作需要
GLib.MainLoop运行 yield不是阻塞:yield将控制权交还给主循环,不是阻塞线程- 错误传播:异步方法的错误通过
end()方法抛出 - 生命周期管理:确保异步对象在回调执行期间仍然有效
- 嵌套异步:使用
yield调用其他异步方法 - 回调顺序:回调可能在主循环的下一次迭代中执行
7.10 扩展阅读
7.11 总结
| 要点 | 说明 |
|---|---|
async | 定义异步方法 |
yield | 暂停异步方法,交还控制权 |
begin() | 启动异步操作 |
end() | 获取异步结果 |
| 主循环 | 异步操作的运行基础 |
| 错误处理 | 通过 end() 抛出 |
| 取消 | 使用 GLib.Cancellable |
下一章我们将学习如何使用 Vala 开发 GTK 应用。→ 第 8 章:GTK 应用开发