文章目錄
- Dart中建立Stream
-
- 1.轉變已有的Stream
- 2.從頭開始建立流
- 3.流控制器
-
- 3.1等待訂閱
- 3.2遵守暫停狀态
- 總結
Dart中建立Stream
可以從以下幾個方法來建立流
- 轉變已有的Stream
- 使用
關鍵字建立流async*
- 使用
來建立流StreamController
1.轉變已有的Stream
在已經有了一個流,想基于原始流事件建立一個新的流。例如,希望通過UTF-8對輸入進行解碼,将一個位元組流轉換為字元串流。最常用的方法是建立一個新流,該流等待原始流上的事件,然後輸出新的事件。
Stream<String> lines(Stream<String> source) async* {
var partial = '';
await for (var chunk in source) {
var lines = chunk.split('\n');
lines[0] = partial + lines[0];
partial = lines.removeLast();
for (var line in lines) {
yield line;
}
}
if (partial.isNotEmpty) yield partial;
}
例如,假設你有一個流,
counterStream
,每秒釋放一個遞增的計數器。下面是如何實作的:
var counterStram =
Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
counterStram.forEach(print);
要轉換流事件,您可以在監聽流之前在流上調用轉換方法,如map()。該方法傳回一個新的流。
var counterStram =
Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
var doubleCounterStream = counterStram.map((event) => event * 2);
doubleCounterStream.forEach(print);
通常,你隻需要一個轉換方法。但是,如果您需要對轉換進行更多的控制,您可以使用Stream 的 transform()方法指定StreamTransformer。平台庫為許多常見任務提供流轉換器。例如,下面的代碼使用了由dart:convert庫提供的utf8.decoder和LineSplitter轉換器。
main(List<String> args) async {
Stream<List<int>> content = File(r'.\t6.dart').openRead();
List<String> lines =
await content.transform(utf8.decoder).transform(LineSplitter()).toList();
lines.forEach((element) {
print(element);
});
}
2.從頭開始建立流
建立新流的一種方法是使用異步生成器(async*)函數。流是在調用函數時建立的,當偵聽流時,函數的主體開始運作。當函數傳回時,流關閉。在函數傳回之前,它可以使用
yield
或
yield*
語句在流上發出事件。
下面是一個基本的示例,它定期發出數字.
Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
int i = 0;
while (true) {
await Future.delayed(interval);
yield i++;
if (i == maxCount) break;
}
}
main(List<String> args) async{
var stream = timedCounter(Duration(seconds: 2), 5);
await for (var i in stream) {
print(i);
}
}
當偵聽器取消(通過在listen()方法傳回的StreamSubscription對象上調用cancel())時,那麼下一次主體到達yield語句時,yield将充當傳回語句。執行任何封閉的finally塊,函數退出。如果函數試圖在退出前生成一個值,則會失敗。
當該函數最終退出時,cancel()方法傳回的future就完成了。如果函數因錯誤而退出,則将來會因該錯誤而結束;否則,它以null結束。
以下示例是将Future序列轉化為一個Stream
Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
for(var future in futures){
var result = await future;
yield result;
}
}
這個函數請求可疊代的future來獲得一個新的future,等待那個future,發出結果值,然後進行循環。如果一個future完成時出現了錯誤,那麼流也會随着該錯誤完成。
用一個async函數從零開始建構流是很少見的。它需要從某個地方擷取資料,而這個地方通常是另一個流。在某些情況下,如上面的Futures序列,資料來自其他異步事件源。然而,在許多情況下,async函數過于簡單,無法輕松處理多個資料源, 這就是StreamController(流控制器)類存在的原因。
3.流控制器
如果流的事件來自程式的不同部分,而不隻是來自可以由異步函數周遊的Stream或Future,則使用
StreamController
建立和填充流。
StreamController
提供了一個新流,以及在任何點和任何地方向流中添加事件的方法。流具有處理偵聽器和暫停所需的所有邏輯。傳回Stream或者保持Controller完全可以自己控制。
下面的示例(來自流控制器bad.dart)展示了流控制器的基本用法(盡管有缺陷),以實作前面示例中的timedCounter()函數。這段代碼建立一個要傳回的流,然後根據計時器事件(既不是Future事件也不是Stream
事件)将資料提供給它。
import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter);
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close();
}
}
Timer.periodic(interval, tick);
return controller.stream;
}
main(List<String> args) {
var countStream = timedCounter(Duration(seconds: 1), 10);
countStream.listen(print);
}
以上代碼存的
timedCounter()
存在兩個問題。
- 在訂閱者接收之前已經開始生産事件
- 當訂閱者暫停,它仍然持續生産事件
在接下來的内容中,将使用
onListen
和
onPause
來解決上述問題。
3.1等待訂閱
作為一個規則,流應該等待訂閱者開工作才開始生産事件。
import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
var controller = StreamController<int>();
int counter = 0;
void tick(Timer timer) {
counter++;
controller.add(counter);
if (maxCount != null && counter >= maxCount) {
timer.cancel();
controller.close();
}
}
Timer.periodic(interval, tick);
return controller.stream;
}
void listenAfterDelay() async {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
await Future.delayed(const Duration(seconds: 5));
await for (int n in counterStream) {
print(n);
}
}
main(List<String> args) {
listenAfterDelay();
}
當這段代碼運作時,在前5秒内沒有列印任何東西,但是流已經在工作中。是以當暫停5秒結束,流中已經緩存了5個事件。是以觀察輸出結果可以發現前5個結果幾乎同時輸出。
3.2遵守暫停狀态
為了避免在偵聽器請求暫停時産生事件,
async*
函數自動在
yield
處暫停。但是
StreamController
在訂閱者暫停時也持續生産事件,這樣可能會導緻流的緩沖區無限增長。
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
StreamSubscription<int> subscription;
subscription = counterStream.listen((counter) {
print(counter);
if (counter == 5) {
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
以上例子中,當
subscription
暫停5秒時,Stream仍然在持續生産。
以下版本的timedCounter()通過使用StreamController上的onListen、onPause、onResume和onCancel回調來實作暫停。
import 'dart:async';
Stream<int> timedCounter(Duration interval, [int maxCount]) {
StreamController<int> controller;
Timer timer;
int counter = 0;
void tick(_) {
counter++;
controller.add(counter);
if (counter == maxCount) {
timer.cancel();
controller.close();
}
}
void startTimer() {
timer = Timer.periodic(interval, tick);
}
void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int>(
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer);
return controller.stream;
}
void listenWithPause() {
var counterStream = timedCounter(const Duration(seconds: 1), 15);
StreamSubscription<int> subscription;
subscription = counterStream.listen((counter) {
print(counter);
if (counter == 5) {
subscription.pause(Future.delayed(const Duration(seconds: 5)));
}
});
}
main(List<String> args) {
listenWithPause();
}
上述例子中,當訂閱者執行pause時,Stream同時也會執行onPause方法實作暫停生産事件。
總結
當不使用
async*
函數來建立Stream時,需要記住以下幾點:
- 避免使用同步
:controller
。當向未暫停的同步控制器上發送事件時,改事件會立即送到所有的流偵聽器上。在添加偵聽器代碼完全傳回之前是不能調用偵聽器的,但是同步控制器可能會破壞這一規則。StreamController(sync: true)
- 如果使用StreamController,則在listen調用傳回StreamSubscription之前調用onListen回調。不要讓onListen回調依賴于已經存在的訂閱。
- StreamController定義的onListen、onPause、onResume和onCancel回調在流偵聽器狀态改變時被流調用,但不會在别的改變狀态的回調函數執行期間調用。在這些情況下,狀态更改回調被延遲,直到前一個回調完成。
- 不要嘗試自己實作流接口。在事件、回調以及添加和删除偵聽器之間進行互動很容易出錯。始終使用現有流(可能來自StreamController)來實作新流的listen調用。
- 盡管可以通過擴充Stream類并在上面實作listen方法和額外功能來建立具有更多功能的擴充Stream的類,但通常不建議這樣做,因為它引入了使用者必須考慮的新類型。通常的做法是建立一個類,該類中又一個stream成員變量。