Summary: in this tutorial, you’ll learn about Dart streams and how to process them, including reading, handling errors, canceling, and transforming.
Introduction to the Dart Stream
A future represents a single value that will be returned by an asynchronous operation. A stream is like a list of futures, representing multiple values that will be returned in the future.
Dart uses the Stream<T> class to represent streams. Since Stream<T> is a generic class, you can have a stream of objects of any type.
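To make the difference concrete, here is a minimal sketch that contrasts a future with a stream. The helper names fetchAnswer() and countTo() are hypothetical and exist only for this illustration; Future.value() and Stream.fromIterable() are used here simply as convenient ways to produce values.

```dart
// Hypothetical helpers, used only for this illustration.

// A future completes once, with a single value.
Future<int> fetchAnswer() => Future.value(42);

// A stream can deliver many values over time.
Stream<int> countTo(int max) =>
    Stream.fromIterable([for (var i = 1; i <= max; i++) i]);

Future<void> main() async {
  print(await fetchAnswer()); // 42
  print(await countTo(3).toList()); // [1, 2, 3]
}
```

The toList() method collects every value of the stream into a list, which is handy for small examples but only completes when the stream closes.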
Typically, you’ll use streams to:
- Read data from a large file in chunks.
- Download a resource from a remote server.
- Listen for requests coming into a server.
In practice, you’ll often consume streams from libraries rather than creating new streams from scratch. But to understand how streams work, we’ll show you how to create a simple one.
Creating a stream
To create a stream, you use the StreamController<T> class. A StreamController<T> object creates a stream that others can listen to and lets you push events (or data) into it.
Here are the steps for creating a stream:
First, create an instance of the StreamController<T> class:
var controller = StreamController<int>();
Second, add an event to the stream via the sink property of the StreamController<T> object. The sink property has the type StreamSink<T>. For example, the following adds the number 1 to the stream:
controller.sink.add(1);
Third, access the stream via the stream property of the StreamController<T> object:
var stream = controller.stream;
Fourth, to raise an error, you use the addError() method:
controller.addError("An error occurred");
Finally, to close the stream, you use the close() method:
controller.close();
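Putting the steps together, a minimal sketch might look like the following. The runSteps() function, the log list, and the Completer are additions for illustration only; the listen() method used to observe the events is covered in the next section.

```dart
import 'dart:async';

// Hypothetical helper: runs the steps above and records what a listener sees.
Future<List<String>> runSteps() async {
  final log = <String>[];
  final done = Completer<void>();

  // Step 1: create the controller.
  final controller = StreamController<int>();

  // Step 3: access the stream and listen to it.
  final stream = controller.stream;
  stream.listen(
    (number) => log.add('data: $number'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => done.complete(),
  );

  // Step 2: add an event.
  controller.sink.add(1);
  // Step 4: raise an error.
  controller.addError('An error occurred');
  // Final step: close the stream.
  controller.close();

  // Wait until the listener has received the done event.
  await done.future;
  return log;
}

Future<void> main() async {
  print(await runSteps()); // [data: 1, error: An error occurred]
}
```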
The following number.dart file illustrates how to use the StreamController<T> class to create a stream that emits an integer every second:
import 'dart:async';

class Number {
  Number() {
    Timer.periodic(
      Duration(seconds: 1),
      (timer) {
        _controller.sink.add(_count);
        _count++;
      },
    );
  }

  // create a StreamController
  final _controller = StreamController<int>();
  var _count = 1;

  // property that returns the stream object
  Stream<int> get stream => _controller.stream;
}
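Note that the timer in the Number class starts as soon as the object is created and never stops. As one possible refinement, the StreamController<T> constructor accepts onListen and onCancel callbacks, which can be used to start the timer when someone subscribes and stop it when the subscription is canceled. The following is only a sketch: the class name LazyNumber and the interval parameter are hypothetical and not part of the tutorial's number.dart file.

```dart
import 'dart:async';

// Hypothetical variant of Number that only runs its timer while listened to.
class LazyNumber {
  LazyNumber({this.interval = const Duration(seconds: 1)}) {
    _controller = StreamController<int>(
      onListen: _start, // called when a listener subscribes
      onCancel: _stop, // called when the listener cancels
    );
  }

  final Duration interval;
  late final StreamController<int> _controller;
  Timer? _timer;
  var _count = 1;

  Stream<int> get stream => _controller.stream;

  void _start() {
    _timer = Timer.periodic(interval, (_) => _controller.add(_count++));
  }

  void _stop() {
    _timer?.cancel();
    _timer = null;
  }
}

Future<void> main() async {
  final numbers = LazyNumber(interval: const Duration(milliseconds: 50));
  // take(3) cancels its subscription after three events, which stops the timer.
  print(await numbers.stream.take(3).toList()); // [1, 2, 3]
}
```

This sketch does not handle pausing; while a subscription is paused, the timer keeps running and the events are buffered.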
Reading from a stream
To read data from a stream, you can use either a callback or an await for loop.
1) Reading from a stream example using callbacks
The following example shows how to read data from the number stream above using a callback:
import 'number.dart';

void main() {
  var stream = Number().stream;
  var subscription = stream.listen(
    (event) => print(event),
  );
}
The program prints an increasing number, starting from 1, every second:
1
2
3
...
How it works.
First, create an instance of the Number class and access the number stream via its stream property:
var stream = Number().stream;
Second, subscribe to the stream by calling the listen() method on the stream object:
var subscription = stream.listen(
  (event) => print(event),
);
Dart executes the anonymous function passed to the listen() method whenever a new number arrives on the stream.
By default, a stream accepts a single subscription. In other words, a stream only allows a single listener for its whole lifespan.
If you attempt to listen to a stream more than once, you’ll get an exception. For example:
import 'number.dart';

void main() {
  var stream = Number().stream;
  stream.listen(
    (event) => print(event),
  );
  // cause exception
  stream.listen(
    (event) => print(event),
  );
}
Exception:
Unhandled exception:
Bad state: Stream has already been listened to.
To allow multiple subscriptions, you convert the stream into a broadcast stream by using the asBroadcastStream() method of the Stream<T> object:
import 'number.dart';

void main() {
  var stream = Number().stream.asBroadcastStream();
  stream.listen(
    (event) => print('listener 1: $event'),
  );
  // a broadcast stream allows a second listener
  stream.listen(
    (event) => print('listener 2: $event'),
  );
}
Output:
listener 1: 1
listener 2: 1
listener 1: 2
listener 2: 2
listener 1: 3
listener 2: 3
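If you own the controller, another option is to create the broadcast stream directly with the StreamController<T>.broadcast() constructor instead of converting an existing stream. The following is a minimal sketch; the broadcastDemo() function and the log list are hypothetical names used only to collect what each listener receives.

```dart
import 'dart:async';

// Hypothetical helper: two listeners on one broadcast controller.
Future<List<String>> broadcastDemo() async {
  final log = <String>[];
  final controller = StreamController<int>.broadcast();

  // Both listeners subscribe before any event is added.
  controller.stream.listen((number) => log.add('listener 1: $number'));
  controller.stream.listen((number) => log.add('listener 2: $number'));

  controller.add(1);
  controller.add(2);
  controller.close();

  // Give the event loop a turn so that all pending events are delivered.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  final log = await broadcastDemo();
  log.forEach(print); // each listener receives both 1 and 2
}
```

Keep in mind that a broadcast stream does not replay events: a listener that subscribes late only receives the events added after it subscribed.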
2) Reading from a stream using an await-for statement
The following example uses an asynchronous for loop (await for) rather than a callback to read data from the stream:
import 'number.dart';

Future<void> main() async {
  // create a new stream
  var stream = Number().stream;
  // reading data from the stream
  await for (var number in stream) {
    print(number);
  }
}
To make a regular for loop asynchronous, you place the await keyword in front of it. The await for loop pauses until the stream emits the next number and ends when the stream closes.
Note that we mark the main() function as an async function because it uses the await keyword.
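Because the number stream never closes, the loop above never ends. To see an await for loop run to completion, here is a small sketch that sums a finite stream. The sumStream() function is a hypothetical helper, and Stream.fromIterable() is used only to produce a stream that closes after its last value.

```dart
// Hypothetical helper: adds up every value emitted by the stream.
Future<int> sumStream(Stream<int> stream) async {
  var total = 0;
  // The loop body runs once per event and the loop exits when the stream closes.
  await for (final number in stream) {
    total += number;
  }
  return total;
}

Future<void> main() async {
  final stream = Stream.fromIterable([1, 2, 3, 4]);
  print(await sumStream(stream)); // 10
}
```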
StreamSubscription<T>
The listen() method of the Stream<T> object returns an instance of StreamSubscription<T>:
var subscription = stream.listen(
  (event) => print(event),
);
The StreamSubscription<T> object has three useful methods:
- pause() – pauses a subscription.
- resume() – resumes a subscription after a pause.
- cancel() – cancels a subscription.
The following example illustrates how to pause a subscription after two seconds and then resume it three seconds later using two timers:
import 'dart:async';
import 'number.dart';

void main() {
  var stream = Number().stream;
  var subscription = stream.listen(
    (event) => print(event),
  );
  // pause after 2 seconds
  Timer(
    Duration(seconds: 2),
    () {
      print('Pausing the subscription');
      subscription.pause();
    },
  );
  // resume three seconds later (5 seconds after the start)
  Timer(
    Duration(seconds: 5),
    () {
      print('Resuming the subscription');
      subscription.resume();
    },
  );
}
Output:
1
2
Pausing the subscription
Resuming the subscription
3
4
5
...
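Note that while the subscription is paused, the timer keeps adding numbers to the controller. The stream buffers these numbers and delivers them once the subscription resumes, which is why no number is skipped in the output.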
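The cancel() method works in a similar way but ends the subscription for good. The following sketch cancels a subscription after three events. It uses Stream.periodic() with a short interval instead of the Number class so that it is self-contained; the firstThree() function and the Completer are hypothetical additions for illustration.

```dart
import 'dart:async';

// Hypothetical helper: listens until three events arrive, then cancels.
Future<List<int>> firstThree() async {
  final received = <int>[];
  final finished = Completer<void>();

  late StreamSubscription<int> subscription;
  subscription = Stream<int>.periodic(
    const Duration(milliseconds: 100),
    (tick) => tick + 1, // tick starts at 0, so this emits 1, 2, 3, ...
  ).listen((number) {
    received.add(number);
    if (received.length == 3) {
      // No more events are delivered after the subscription is canceled.
      subscription.cancel();
      finished.complete();
    }
  });

  await finished.future;
  return received;
}

Future<void> main() async {
  print(await firstThree()); // [1, 2, 3]
}
```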
Transforming a stream
The Stream<T> object provides you with many useful methods for transforming data in the stream.
For example, the following program uses the take() method to get the first 10 numbers, the where() method to get only even numbers, and the map() method to transform the numbers into strings:
import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number()
      .stream
      .take(10)
      .where((number) => number % 2 == 0)
      .map((number) => 'number $number')
      .listen(
        (number) => print(number),
        onDone: () => print('Done!'),
      );
}
Output:
number 2
number 4
number 6
number 8
number 10
Done!
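The order of the transformations matters. In the program above, take(10) runs before where(), so only the first 10 numbers are filtered. The sketch below compares both orders on a finite stream; the numbers(), takeThenFilter(), and filterThenTake() functions are hypothetical helpers built on Stream.fromIterable() for illustration.

```dart
// Hypothetical source: a finite stream of the numbers 1 to 30.
Stream<int> numbers() =>
    Stream.fromIterable([for (var i = 1; i <= 30; i++) i]);

// Take the first 10 numbers, then keep the even ones.
Future<List<int>> takeThenFilter() =>
    numbers().take(10).where((n) => n % 2 == 0).toList();

// Keep the even numbers, then take the first 10 of them.
Future<List<int>> filterThenTake() =>
    numbers().where((n) => n % 2 == 0).take(10).toList();

Future<void> main() async {
  print(await takeThenFilter()); // [2, 4, 6, 8, 10]
  print(await filterThenTake()); // [2, 4, 6, 8, 10, 12, 14, 16, 18, 20]
}
```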
Handling errors
The following modifies the number.dart file to raise an error in the number stream when the current number reaches 5:
import 'dart:async';

class Number {
  Number() {
    Timer.periodic(
      Duration(seconds: 1),
      (timer) {
        _controller.sink.add(_count);
        _count++;
        if (_count == 5) {
          _controller.addError("Limit reached");
          _controller.close();
          timer.cancel();
        }
      },
    );
  }

  // create a StreamController
  final _controller = StreamController<int>();
  var _count = 1;

  // property that returns the stream object
  Stream<int> get stream => _controller.stream;
}
In this example, we use the addError() method of the StreamController<T> class to raise an error and the close() method to close the stream.
To handle the errors of a stream, you can use either an onError callback or a try-catch block around an await for loop.
1) Using a callback to handle errors
The following example illustrates how to use a callback to handle an error that occurs in a stream:
import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number().stream.listen(
    (number) => print(number),
    onError: (err) => print('Error:$err'),
  );
}
Output:
1
2
3
4
Error:Limit reached
The listen() method also has an onDone parameter that accepts a callback. When the stream closes and sends a done event, the onDone callback is called. If onDone is null, nothing happens. For example:
import 'dart:async';
import 'number.dart';

Future<void> main() async {
  Number().stream.listen(
    (number) => print(number),
    onError: (err) => print('Error:$err'),
    onDone: () => print("Done."),
  );
}
Output:
1
2
3
4
Error:Limit reached
Done.
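The done event follows the error here only because the Number class closes the stream right after raising the error. An error by itself does not end a stream: by default, the listener keeps receiving events after an error. The listen() method has a cancelOnError parameter that cancels the subscription on the first error instead. The following sketch compares both behaviors; the run() function and the log list are hypothetical names used only to record the events.

```dart
import 'dart:async';

// Hypothetical helper: records the events a listener sees.
Future<List<String>> run({required bool cancelOnError}) async {
  final log = <String>[];
  final controller = StreamController<int>();

  controller.stream.listen(
    (number) => log.add('data $number'),
    onError: (Object error) => log.add('error $error'),
    onDone: () => log.add('done'),
    cancelOnError: cancelOnError,
  );

  controller.add(1);
  controller.addError('oops');
  controller.add(2); // only delivered if the subscription is still active
  controller.close();

  // Give the event loop a turn so that all pending events are delivered.
  await Future<void>.delayed(Duration.zero);
  return log;
}

Future<void> main() async {
  print(await run(cancelOnError: false)); // [data 1, error oops, data 2, done]
  print(await run(cancelOnError: true)); // [data 1, error oops]
}
```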
2) Using a try-catch block to handle errors
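Besides the onError handler, you can also use a try-catch block to handle errors on a stream. Because a try-catch block only catches errors thrown inside it, you need to read the stream with an await for loop rather than the listen() method. An error delivered to a listen() subscription that has no onError handler is reported as an unhandled exception, even if the listen() call sits inside a try-catch block. For example:
import 'number.dart';

Future<void> main() async {
  try {
    await for (var number in Number().stream) {
      print(number);
    }
  } catch (err) {
    print(err);
  }
}
Output:
1
2
3
4
Limit reached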
Summary
- A stream represents multiple values that will be returned in the future.
- Use the StreamController<T> class to create a simple stream that others can listen to and that you can push data into.
- Use the Stream<T> class to manage streams.
- Use the listen() method of the Stream<T> object to subscribe for notifications.