注册

【Flutter 异步编程 - 叁】 | 初步认识 Stream 类的使用

一、分析 Stream 对象


要了解一个事物,最好去思考它存在的 价值 。当你可以意识到某个事物的作用,缺少它会有什么弊端,自然会有兴趣去了解它。而不是稀里糊涂的看别人怎么用,自己死记硬背 API 有哪些,分别表示什么意思。一味的堆砌知识点,这样无论学什么都是流于表面,不得要领。




1. Stream 存在的必要性

可能很多朋友都没有在开发中使用过 Stream 对象,知道它挺重要,但又不知道他的具体的用途。有种只可远观,不可亵玩的距离感。Stream 可以弥补 Future 的短板,它对于异步来说是一块很重要的版块。


一个 Future 对象诞生的那一刻,无论成败,它最终注定只有一个结果。就像一个普通的网络接口,一次请求只会有一个响应结果。应用开发在绝大多数场景是一个 ,对应一个 ,所以和 Future 打交道比较多。


image.png


但有些场景,任务无法一次完成,对于 一次 请求,会有 若干次 响应。比如现实生活中,你追更一部小说,在你订阅后,作者每次新时,都会通知你。在这个场景下,小说完结代表任务结束,期间会触发多次响应通知,这是 Future 无法处理的。


另外,事件通知的时间不确定的,作者创作的过程也是非常耗时的,所以机体没有必要处于同步等待 的阻塞状态。像这种 异步事件序列 被形象的称之为 Stream 流



在人类科学中,一件重要事物的存在,必然有其发挥效用的场所,在这片领域之下,它是所向披靡的王。在接触新知识、新概念时,感知这片领域非常重要,一个工具只有在合适的场景下,才能发挥最大的效力。




2.从读取文件认识 Stream 的使用

File 对象可以通过 readAsString 异步方法读取文件内容,返回 Future<String> 类型对象。而 Future 异步任务只有一次响应机会,通过 then 回调,所以该方法会将文件中的 所有字符 读取出来。


---->[File#readAsString]---
Future<String> readAsString({Encoding encoding = utf8});

但有些场景中没有必要不能 全部读取。比如,想要在一个大文件中寻找一些字符,找到后就 停止读取 ;想要在读取文件时 显示 读取进度。这时,只能响应一次事件的 Future 就爱莫能助了,而这正是 Stream 大显身手的领域。在 File 类中有 openRead 方法返回 Stream 对象,我们先通过这个方法了解一下 Stream 的使用方式。


Stream<List<int>> openRead([int? start, int? end]);



现在的场景和上面 追更小说 是很相似的:



  • 小说作者 无需一次性向 读者 提供所有的章节;小说是 一章章 进行更新的,每次更新章节,都需要 通知读者 进行阅读。
  • 操作系统 不用一次性读取全部文件内容,返回给请求的 机体;文件是 一块块 进行读取的,每块文件读取完,需要 通知机体 进行处理。


在对 Stream 的理解中,需要认清两个角色: 发布者订阅者 。其中发布者是真正处理任务的机体,是结果的生产者,比如 作者操作系统服务器 等,它们有 发送通知 的义务。订阅者是发送请求的机体,对于异步任务,其本身并不参与到执行过程中,可以监听通知来获取需要的结果数据。


代码处理中 Stream 对象使用 listen 方法 监听通知 ,该方法的第一入参是回调函数,每次通知时都会被触发。回调函数的参数类型是 Stream 的泛型,表示此次通知时携带的结果数据。


StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});



如下是通过 Stream 事件读取文件,显示读取进度的处理逻辑。当 openRead 任务分发之后,操作系统会一块一块地对文件进行读取,每读一块会发送通知。Dart 代码中通过 _onData 函数进行监听,回调的 bytes 就是读取的字节数组结果。


image.png


_onData 函数中根据每次回调的字节数,就可以很轻松地计算出读取的进度。 onDone 指定的函数,会在任务完成时被触发,任务完成也就表示不会再有事件通知了。


void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
stream.listen(_onData,onDone: _onDone);
}

void _onData(List<int> bytes) {
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
}

void _onDone() {
print("读取 Jane Eyre.txt 结束");
}



3.初步认识 StreamSubscription

Stream#listen 方法监听后,会返回一个 StreamSubscription 对象,表示此次对流的订阅。


StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});

通过这个订阅对象,可以暂停 pause 或恢复 resume 对流的监听,以及通过 cancel 取消对流的监听。


---->[StreamSubscription]----
void pause([Future<void>? resumeSignal]);
void resume();
Future<void> cancel();



比如下面当进度大于 50 时,取消对流的订阅:通过打印日志可以看出 54.99% 时,订阅取消,流也随之停止,可以注意一个细节。此时 onDone 回调并未触发,表示当 Stream 任务被取消订阅时,不能算作完成。


image.png


late StreamSubscription<List<int>> subscription;

void readFile() async {
File file = File(path.join(Directory.current.path, "assets", "Jane Eyre.txt"));
print("开始读取 Jane Eyre.txt ");
fileLength = await file.length();
Stream<List<int>> stream = file.openRead();
// listen 方法返回 StreamSubscription 对象
subscription = stream.listen(_onData,onDone: _onDone);
}


void _onData(List<int> bytes) async{
counter += bytes.length;
double progress = counter * 100 / fileLength;
DateTime time = DateTime.now();
String timeStr = "[${time.hour}:${time.minute}:${time.second}:${time.millisecond}]";
print(timeStr + "=" * (progress ~/ 2) + '[${progress.toStringAsFixed(2)}%]');
if(progress > 50){
subscription.cancel(); // 取消订阅
}
}



二、结合应用理解 Stream 的使用


单看 Dart 代码在控制台打印,实在有些不过瘾。下面通过一个有趣的小例子,介绍 StreamFlutter 项目中的使用。这样可以更形象地认识 Stream 的用途,便于进一步理解。




1. 场景分析

现实生活中如果细心观察,会发现很多 Stream 概念的身影。比如在银行办理业务时,客户可以看作 Stream 中的一个元素,广播依次播报牌号,业务员需要对某个元素进行处理。在餐馆中,每桌的客人可以看作 Stream 中的一个元素,客人下单完成,厨师根据请求准备饭菜进行处理。这里,通过模拟 红绿灯 的状态变化,来说明 Stream 的使用。


16.gif


可以想象,在一个时间轴上,信号灯的变化是一个连续不断的事件。我们可以将每次的变化视为 Stream 中的一个元素,信号灯每秒的状态信息都会不同。也就是说,这个 Stream 每秒会产出一个状态,要在应用中模拟红绿灯,只需要监听每次的通知,更新界面显示即可。


befcfa2a7819d9739bcba435aff62d1.png


这里将信号灯的状态信息通过 SignalState 类来封装,成员变量有当前秒数 counter 和信号灯类型 type 。 其中信号灯类型通过 SignalType 枚举表示,有如下三种类型:


const int _kAllowMaxCount = 10;
const int _kWaitMaxCount = 3;
const int _kDenialMaxCount = 10;

class SignalState {
final int counter;
final SignalType type;

SignalState({
required this.counter,
required this.type,
});
}

enum SignalType {
allow, // 允许 - 绿灯
denial, // 拒绝 - 红灯
wait, // 等待 - 黄灯
}



2. 信号灯组件的构建

如下所示,信号灯由三个 Lamp 组件和数字构成。三个灯分别表示 红、黄、绿 ,某一时刻只会量一盏,不亮的使用灰色示意。三个灯水平排列,有一个黑色背景装饰,和文字呈上下结构。


image.png




先看灯 Lamp 组件的构建:逻辑非常简单,使用 Container 组件显示圆形,构造时可指定颜色值,为 null 时显示灰色。


class Lamp extends StatelessWidget {
final Color? color;

const Lamp({Key? key, required this.color}) : super(key: key);

@override
Widget build(BuildContext context) {
return Container(
width: 40,
height: 40,
decoration: BoxDecoration(
color: color ?? Colors.grey.withOpacity(0.8),
shape: BoxShape.circle,
),
);
}
}



如下是 SignalLamp 组件的展示效果,其依赖于 SignalState 对象进行显示。根据 SignalType 确定显示的颜色和需要点亮的灯,状态中的 counter 成员用于展示数字。


image.png


class SignalLamp extends StatelessWidget {
final SignalState state;

const SignalLamp({Key? key, required this.state}) : super(key: key);

Color get activeColor {
switch (state.type) {
case SignalType.allow:
return Colors.green;
case SignalType.denial:
return Colors.red;
case SignalType.wait:
return Colors.amber;
}
}

@override
Widget build(BuildContext context) {
return Column(
children: [
Container(
padding: const EdgeInsets.symmetric(horizontal: 15, vertical: 10),
decoration: BoxDecoration(
color: Colors.black, borderRadius: BorderRadius.circular(30),),
child: Wrap(
alignment: WrapAlignment.center,
crossAxisAlignment: WrapCrossAlignment.center,
spacing: 15,
children: [
Lamp(color: state.type == SignalType.denial ? activeColor : null),
Lamp(color: state.type == SignalType.wait ? activeColor : null),
Lamp(color: state.type == SignalType.allow ? activeColor : null),
],
),
),
Text(
state.counter.toString(),
style: TextStyle(
fontWeight: FontWeight.bold, fontSize: 50, color: activeColor,
),
)
],
);
}
}



4. Stream 事件的添加与监听

这样,指定不同的 SignalState 就会呈现相应的效果,如下是黄灯的 2 s


SignalLamp(
state: SignalState(counter: 2, type: SignalType.wait),
)

image.png


在使用 Stream 触发更新之前,先说一下思路。Stream 可以监听一系列事件的触发,每次监听会获取新的信号状态,根据新状态渲染界面即可。如下在 SignalState 中定义 next 方法,便于产出下一状态。逻辑很简单,如果数值大于一,类型不变,数值减一,比如 红灯 6 的下一状态是 红灯 5 ; 如果数值等于一,会进入下一类型的最大数值,比如 红灯 1 的下一状态是 黄灯 3


---->[SignalState]----
SignalState next() {
if (counter > 1) {
return SignalState(type: type, counter: counter - 1);
} else {
switch (type) {
case SignalType.allow:
return SignalState(
type: SignalType.denial, counter: _kDenialMaxCount);
case SignalType.denial:
return SignalState(type: SignalType.wait, counter: _kWaitMaxCount);
case SignalType.wait:
return SignalState(type: SignalType.allow, counter: _kAllowMaxCount);
}
}
}



把每个事件通知看做元素,Stream 应用处理事件序列,只不过序列中的元素在此刻是未知的,何时触发也是不定的。Stream 基于 发布-订阅 的思想通过监听来处理这些事件。 其中两个非常重要的角色: 发布者 是元素的生产者,订阅者 是元素的消费者。


在引擎中的 async 包中封装了 StreamController 类用于控制元素的添加操作,同时提供 Stream 对象用于监听。代码处理如下,tag1 处,监听 streamControllerstream 对象。事件到来时触发 emit 方法 ( 方法名任意 ),在 emit 中会回调出 SignalState 对象,根据这个新状态更新界面即可。然后延迟 1s 继续添加下一状态。


---->[_MyHomePageState]----
final StreamController<SignalState> streamController = StreamController();
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);

@override
void initState() {
super.initState();
streamController.stream.listen(emit); // tag1
streamController.add(_signalState);
}

@override
void dispose() {
super.dispose();
streamController.close();
}

void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
streamController.add(state.next());
}

这样 streamController 添加元素,作为 发布者;添加的元素可以通过 StreamControllerstream 成员进行监听。





5. Stream 的控制与异常监听

在前面介绍过 Stream#listen 方法会返回一个 StreamSubscription 的订阅对象,通过该对象可以暂停、恢复、取消对流的监听。如下所示,通过点击按钮执行 _toggle 方法,可以达到 暂停/恢复 切换的效果:



---->[_MyHomePageState]----
late StreamSubscription<SignalState> _subscription;

@override
void initState() {
super.initState();
_subscription = streamController.stream.listen(emit);
streamController.add(_signalState);
}

void _toggle() {
if(_subscription.isPaused){
_subscription.resume();
}else{
_subscription.pause();
}
setState(() {});
}



另外,StreamController 在构造时可以传入四个函数来监听流的状态:


image.png


final StreamController<SignalState> streamController = StreamController(
onListen: ()=> print("=====onListen====="),
onPause: ()=> print("=====onPause====="),
onResume: ()=> print("=====onResume====="),
onCancel: ()=> print("=====onCancel====="),
);

onListen 会在 stream 成员被监听时触发一次;onPauseonResumeonCancel 分别对应订阅者的 pauseresumecancel 方法。如下是点击暂停和恢复的日志信息:


image.png




Stream#listen 方法中还有另外两个可选参数用于异常的处理。 onError 是错误的回调函数,cancelOnError 标识用于控制触发异常时,是否取消 Stream


StreamSubscription<T> listen(void onData(T event)?,
{Function? onError, void onDone()?, bool? cancelOnError});

如下所示,在 emit 中故意在 红 7 时通过 addError 添加一个异常元素。这里界面简单显示错误信息,在 3 s 后异常被修复,继续添加新元素。



void emit(SignalState state) async {
_signalState = state;
setState(() {});
await Future.delayed(const Duration(seconds: 1));
SignalState nextState = state.next();
if (nextState.counter == 7 && nextState.type == SignalType.denial) {
streamController.addError(Exception('Error Signal State'));
} else {
streamController.add(nextState);
}
}



listen 方法中使用 onError 监听异常事件,进行处理:其中逻辑是渲染错误界面,三秒后修复异常,继续产出下一状态:


_subscription = streamController.stream.listen(
emit,
onError: (err) async {
print(err);
renderError();
await Future.delayed(const Duration(seconds: 3));
fixError();
emit(_signalState.next());
},
cancelOnError: false,
);

关于异常的处理,这里简单地提供 hasError 标识进行构建逻辑的区分:



bool hasError = false;

void renderError(){
hasError = true;
setState(() {});
}
void fixError(){
hasError = false;
}



最后说一下 listencancelOnError 的作用,它默认是 false 。如果 cancelOnError = true ,在监听到异常之后,就会取消监听 stream ,也就是说之后控制器添加的元素就会监听了。这样异常时 StreamController 会触发 onCancel 回调:


image.png




三、异步生成器函数与 Stream


前面介绍了通过 StreamController 获取 Stream 进行处理的方式,下面再来看另一种获取 Stream 的方式 - 异步生成器函数




1. 思考 Stream 与 Iterable

通过前面对 Stream 的认识,我们知道它是在 时间线 上可拥有若干个可监听的事件元素。而 Iterable 也可以拥有多个元素,两者之间是有很大差距的。Iterable时间空间 上都对元素保持持有关系;而 Stream 只是在时间上监听若干元素的到来,并不在任意时刻都持有元素,更不会在空间上保持持有关系。


对于一个 Type 类型的数据,在异步任务中,Stream<T>Future<T> 就是多值和单值的区别,它们的结果都不能在 当前时刻 得到,只能通过监听在 未来 得到值。 与之相对的就是 Iterable<Type>Type ,它们代表此时此刻,实实在在的对象,可以随时使用。






















单值多值
同步TypeIterable<Type>
异步Future<Type>Stream<Type>



2. 通过异步生成器函数获取 Stream 对象

Future 对象可以通过 async/awiat 关键字,简化书写,更方便的获取异步任务结果。 对于 Stream 也有类似的 async*/yield 关键字。 如下所示, async* 修饰的方法需要返回一个 Stream 对象。


在方法体中通过 yield 关键字 产出 泛型结果对象,如下是对 信号状态流 元素产生出的逻辑:遍历 count 次,每隔 1 s 产出一个状态。


class SignalStream{
SignalState _signalState = SignalState(counter: 10, type: SignalType.denial);

Stream<SignalState> createStream({int count = 100}) async*{
for(int i = 0 ; i < count; i++){
await Future.delayed(const Duration(seconds: 1));
_signalState = _signalState.next();
yield _signalState;
}
}
}



这样,在 _MyHomePageState 中通过 signalStream.createStream() 就可以创建一个有 100 个元素的流,进行监听。每次接收到新状态时,更新界面,也可以达到目的:



---->[_MyHomePageState]---
final SignalStream signalStream = SignalStream();

_subscription = signalStream.createStream().listen(
emit,
);

void emit(SignalState state) async {
_signalState = state;
setState(() {});
}



到这里,关于 Stream 的初步认识就结束了,当然 Stream 的知识还有很多,在后面会陆续介绍。通过本文,你只需要明白 Stream 是什么,通过它我们能干什么就行了。下一篇我们将分析一下 FutureBuilderStreamBuilder 组件的使用和源码实现。它们是 Flutter 对异步对象的封装组件,通过对它们的认识,也能加深我们对 FutureStream 的立即。 那本文就到这里,谢谢观看 ~


作者:张风捷特烈
链接:https://juejin.cn/post/7147881475688366093
来源:稀土掘金
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

0 个评论

要回复文章请先登录注册