天天看點

Dart中建立StreamDart中建立Stream

文章目錄

  • Dart中建立Stream
    • 1.轉變已有的Stream
    • 2.從頭開始建立流
    • 3.流控制器
      • 3.1等待訂閱
      • 3.2遵守暫停狀态
    • 總結

Dart中建立Stream

可以從以下幾個方法來建立流

  • 轉變已有的Stream
  • 使用

    async*

    關鍵字建立流
  • 使用

    StreamController

    來建立流

1.轉變已有的Stream

在已經有了一個流,想基于原始流事件建立一個新的流。例如,希望通過UTF-8對輸入進行解碼,将一個位元組流轉換為字元串流。最常用的方法是建立一個新流,該流等待原始流上的事件,然後輸出新的事件。

Stream<String> lines(Stream<String> source) async* {
  var partial = '';
  await for (var chunk in source) {
    var lines = chunk.split('\n');
    lines[0] = partial + lines[0];
    partial = lines.removeLast();
    for (var line in lines) {
      yield line;
    }
  }
  if (partial.isNotEmpty) yield partial;
}
           

例如,假設你有一個流,

counterStream

,每秒釋放一個遞增的計數器。下面是如何實作的:

var counterStram =
      Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
  counterStram.forEach(print);
           

要轉換流事件,您可以在監聽流之前在流上調用轉換方法,如map()。該方法傳回一個新的流。

var counterStram =
      Stream<int>.periodic(Duration(seconds: 1), (x) => x).take(15);
var doubleCounterStream = counterStram.map((event) => event * 2);
doubleCounterStream.forEach(print);
           

通常,你隻需要一個轉換方法。但是,如果您需要對轉換進行更多的控制,您可以使用Stream 的 transform()方法指定StreamTransformer。平台庫為許多常見任務提供流轉換器。例如,下面的代碼使用了由dart:convert庫提供的utf8.decoder和LineSplitter轉換器。

main(List<String> args) async {
  Stream<List<int>> content = File(r'.\t6.dart').openRead();
  List<String> lines =
      await content.transform(utf8.decoder).transform(LineSplitter()).toList();
  lines.forEach((element) {
    print(element);
  });
}
           

2.從頭開始建立流

建立新流的一種方法是使用異步生成器(async*)函數。流是在調用函數時建立的,當偵聽流時,函數的主體開始運作。當函數傳回時,流關閉。在函數傳回之前,它可以使用

yield

yield*

語句在流上發出事件。

下面是一個基本的示例,它定期發出數字.

Stream<int> timedCounter(Duration interval, [int maxCount]) async* {
  int i = 0;
  while (true) {
    await Future.delayed(interval);
    yield i++;
    if (i == maxCount) break;
  }
}

main(List<String> args) async{
  var stream = timedCounter(Duration(seconds: 2), 5);
  await for (var i in stream) {
    print(i);
  }
}
           

當偵聽器取消(通過在listen()方法傳回的StreamSubscription對象上調用cancel())時,那麼下一次主體到達yield語句時,yield将充當傳回語句。執行任何封閉的finally塊,函數退出。如果函數試圖在退出前生成一個值,則會失敗。

當該函數最終退出時,cancel()方法傳回的future就完成了。如果函數因錯誤而退出,則将來會因該錯誤而結束;否則,它以null結束。

以下示例是将Future序列轉化為一個Stream

Stream<T> streamFromFutures<T>(Iterable<Future<T>> futures) async* {
	for(var future in futures){
		var result = await future;
		yield result;
	}
}
           

這個函數請求可疊代的future來獲得一個新的future,等待那個future,發出結果值,然後進行循環。如果一個future完成時出現了錯誤,那麼流也會随着該錯誤完成。

用一個async函數從零開始建構流是很少見的。它需要從某個地方擷取資料,而這個地方通常是另一個流。在某些情況下,如上面的Futures序列,資料來自其他異步事件源。然而,在許多情況下,async函數過于簡單,無法輕松處理多個資料源, 這就是StreamController(流控制器)類存在的原因。

3.流控制器

如果流的事件來自程式的不同部分,而不隻是來自可以由異步函數周遊的Stream或Future,則使用

StreamController

建立和填充流。

StreamController

提供了一個新流,以及在任何點和任何地方向流中添加事件的方法。流具有處理偵聽器和暫停所需的所有邏輯。傳回Stream或者保持Controller完全可以自己控制。

下面的示例(來自流控制器bad.dart)展示了流控制器的基本用法(盡管有缺陷),以實作前面示例中的timedCounter()函數。這段代碼建立一個要傳回的流,然後根據計時器事件(既不是Future事件也不是Stream

事件)将資料提供給它。

import 'dart:async';

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close();
    }
  }

  Timer.periodic(interval, tick);
  return controller.stream;
}

main(List<String> args) {
  var countStream = timedCounter(Duration(seconds: 1), 10);
  countStream.listen(print);
}
           

以上代碼存的

timedCounter()

存在兩個問題。

  • 在訂閱者接收之前已經開始生産事件
  • 當訂閱者暫停,它仍然持續生産事件

在接下來的内容中,将使用

onListen

onPause

來解決上述問題。

3.1等待訂閱

作為一個規則,流應該等待訂閱者開工作才開始生産事件。

import 'dart:async';

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  var controller = StreamController<int>();
  int counter = 0;
  void tick(Timer timer) {
    counter++;
    controller.add(counter);
    if (maxCount != null && counter >= maxCount) {
      timer.cancel();
      controller.close();
    }
  }

  Timer.periodic(interval, tick);
  return controller.stream;
}

void listenAfterDelay() async {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  await Future.delayed(const Duration(seconds: 5));
  await for (int n in counterStream) {
    print(n);
  }
}

main(List<String> args) {
  listenAfterDelay();
}

           

當這段代碼運作時,在前5秒内沒有列印任何東西,但是流已經在工作中。是以當暫停5秒結束,流中已經緩存了5個事件。是以觀察輸出結果可以發現前5個結果幾乎同時輸出。

3.2遵守暫停狀态

為了避免在偵聽器請求暫停時産生事件,

async*

函數自動在

yield

處暫停。但是

StreamController

在訂閱者暫停時也持續生産事件,這樣可能會導緻流的緩沖區無限增長。

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;
  subscription = counterStream.listen((counter) {
    print(counter);
    if (counter == 5) {
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}
           

以上例子中,當

subscription

暫停5秒時,Stream仍然在持續生産。

以下版本的timedCounter()通過使用StreamController上的onListen、onPause、onResume和onCancel回調來實作暫停。

import 'dart:async';

Stream<int> timedCounter(Duration interval, [int maxCount]) {
  StreamController<int> controller;
  Timer timer;
  int counter = 0;
  void tick(_) {
    counter++;
    controller.add(counter);
    if (counter == maxCount) {
      timer.cancel();
      controller.close();
    }
  }

  void startTimer() {
    timer = Timer.periodic(interval, tick);
  }

  void stopTimer() {
    if (timer != null) {
      timer.cancel();
      timer = null;
    }
  }

  controller = StreamController<int>(
      onListen: startTimer,
      onPause: stopTimer,
      onResume: startTimer,
      onCancel: stopTimer);

  return controller.stream;
}

void listenWithPause() {
  var counterStream = timedCounter(const Duration(seconds: 1), 15);
  StreamSubscription<int> subscription;
  subscription = counterStream.listen((counter) {
    print(counter);
    if (counter == 5) {
      subscription.pause(Future.delayed(const Duration(seconds: 5)));
    }
  });
}

main(List<String> args) {
  listenWithPause();
}
           

上述例子中,當訂閱者執行pause時,Stream同時也會執行onPause方法實作暫停生産事件。

總結

當不使用

async*

函數來建立Stream時,需要記住以下幾點:

  • 避免使用同步

    controller

    StreamController(sync: true)

    。當向未暫停的同步控制器上發送事件時,改事件會立即送到所有的流偵聽器上。在添加偵聽器代碼完全傳回之前是不能調用偵聽器的,但是同步控制器可能會破壞這一規則。
  • 如果使用StreamController,則在listen調用傳回StreamSubscription之前調用onListen回調。不要讓onListen回調依賴于已經存在的訂閱。
  • StreamController定義的onListen、onPause、onResume和onCancel回調在流偵聽器狀态改變時被流調用,但不會在别的改變狀态的回調函數執行期間調用。在這些情況下,狀态更改回調被延遲,直到前一個回調完成。
  • 不要嘗試自己實作流接口。在事件、回調以及添加和删除偵聽器之間進行互動很容易出錯。始終使用現有流(可能來自StreamController)來實作新流的listen調用。
  • 盡管可以通過擴充Stream類并在上面實作listen方法和額外功能來建立具有更多功能的擴充Stream的類,但通常不建議這樣做,因為它引入了使用者必須考慮的新類型。通常的做法是建立一個類,該類中又一個stream成員變量。