Dart Streams
Handle sequences of asynchronous data
🌊 What are Dart Streams?
Streams provide a way to receive a sequence of events or data over time. Think of them as pipes where data flows continuously, perfect for handling real-time updates, user input, or network data.
Stream<int> countStream() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(seconds: 1));
yield i;
}
}
void main() async {
await for (int value in countStream()) {
print('Received: $value');
}
}
Output:
Received: 1
Received: 2
Received: 3
Received: 4
Received: 5
Stream Concepts
Stream Creation
Create streams with async* generators
Stream<String> dataStream() async* {
yield 'Hello';
yield 'World';
}
Stream Listening
Listen to stream events
stream.listen((data) {
print('Got: $data');
});
Stream Controllers
Manually control stream data
var controller = StreamController<int>();
controller.add(42);
Stream Transformations
Transform stream data
stream.map((x) => x * 2)
.where((x) => x > 10)
🔹 Creating Streams
Different ways to create streams:
import 'dart:async';
// Method 1: Using async* generator
Stream<String> messageStream() async* {
yield 'Starting...';
await Future.delayed(Duration(seconds: 1));
yield 'Processing...';
await Future.delayed(Duration(seconds: 1));
yield 'Complete!';
}
// Method 2: Using Stream.periodic
Stream<int> timerStream() {
return Stream.periodic(Duration(seconds: 1), (count) => count + 1)
.take(5); // Only take first 5 values
}
// Method 3: Using StreamController
StreamController<String> createControlledStream() {
var controller = StreamController<String>();
// Add data to stream
Timer.periodic(Duration(milliseconds: 500), (timer) {
if (timer.tick > 6) {
controller.close();
timer.cancel();
} else {
controller.add('Message ${timer.tick}');
}
});
return controller;
}
void main() async {
print('=== Message Stream ===');
await for (String message in messageStream()) {
print(message);
}
print('\n=== Timer Stream ===');
await for (int count in timerStream()) {
print('Timer: $count');
}
print('\n=== Controlled Stream ===');
var controller = createControlledStream();
await for (String data in controller.stream) {
print(data);
}
}
Output:
=== Message Stream ===
Starting...
Processing...
Complete!
=== Timer Stream ===
Timer: 1
Timer: 2
Timer: 3
Timer: 4
Timer: 5
=== Controlled Stream ===
Message 1
Message 2
Message 3
Message 4
Message 5
Message 6
🔹 Listening to Streams
Different ways to consume stream data:
import 'dart:async';
Stream<int> numberStream() async* {
for (int i = 1; i <= 5; i++) {
await Future.delayed(Duration(milliseconds: 500));
yield i;
}
}
void main() async {
// Method 1: Using await for (consumes entire stream)
print('Method 1: await for');
await for (int number in numberStream()) {
print('Number: $number');
}
// Method 2: Using listen() method
print('\nMethod 2: listen()');
var subscription = numberStream().listen(
(int number) {
print('Listened: $number');
},
onError: (error) {
print('Error: $error');
},
onDone: () {
print('Stream completed!');
},
);
// Wait for stream to complete
await Future.delayed(Duration(seconds: 3));
// Method 3: Using first, last, single
print('\nMethod 3: Stream methods');
int firstValue = await numberStream().first;
print('First value: $firstValue');
int lastValue = await numberStream().last;
print('Last value: $lastValue');
List<int> allValues = await numberStream().toList();
print('All values: $allValues');
}
Output:
Method 1: await for
Number: 1
Number: 2
Number: 3
Number: 4
Number: 5
Method 2: listen()
Listened: 1
Listened: 2
Listened: 3
Listened: 4
Listened: 5
Stream completed!
Method 3: Stream methods
First value: 1
Last value: 5
All values: [1, 2, 3, 4, 5]
🔹 Stream Transformations
Transform and filter stream data:
import 'dart:async';
Stream<int> sourceStream() async* {
for (int i = 1; i <= 10; i++) {
await Future.delayed(Duration(milliseconds: 200));
yield i;
}
}
void main() async {
print('Original stream:');
await for (int value in sourceStream().take(5)) {
print(value);
}
print('\nTransformed stream (map):');
await for (int doubled in sourceStream().map((x) => x * 2).take(5)) {
print('$doubled');
}
print('\nFiltered stream (where):');
await for (int even in sourceStream().where((x) => x % 2 == 0).take(3)) {
print('Even: $even');
}
print('\nChained transformations:');
await for (String result in sourceStream()
.where((x) => x > 3) // Filter: only numbers > 3
.map((x) => x * x) // Transform: square the numbers
.map((x) => 'Square: $x') // Transform: format as string
.take(4)) { // Limit: take only 4 values
print(result);
}
print('\nStream reduction:');
int sum = await sourceStream().take(5).reduce((a, b) => a + b);
print('Sum of first 5 numbers: $sum');
List<int> evenNumbers = await sourceStream()
.where((x) => x % 2 == 0)
.take(3)
.toList();
print('First 3 even numbers: $evenNumbers');
}
Output:
Original stream:
1
2
3
4
5
Transformed stream (map):
2
4
6
8
10
Filtered stream (where):
Even: 2
Even: 4
Even: 6
Chained transformations:
Square: 16
Square: 25
Square: 36
Square: 49
Stream reduction:
Sum of first 5 numbers: 15
First 3 even numbers: [2, 4, 6]
🔹 Real-World Stream Example
A chat application simulation using streams:
import 'dart:async';
import 'dart:math';
class ChatMessage {
final String user;
final String message;
final DateTime timestamp;
ChatMessage(this.user, this.message) : timestamp = DateTime.now();
@override
String toString() {
return '[$user] $message (${timestamp.hour}:${timestamp.minute.toString().padLeft(2, '0')})';
}
}
class ChatRoom {
final StreamController<ChatMessage> _messageController = StreamController<ChatMessage>.broadcast();
final List<String> _users = ['Alice', 'Bob', 'Charlie', 'Diana'];
final List<String> _messages = [
'Hello everyone!',
'How are you doing?',
'Great weather today!',
'Anyone up for lunch?',
'I love Dart programming!',
'See you later!',
];
Stream<ChatMessage> get messageStream => _messageController.stream;
void startSimulation() {
Timer.periodic(Duration(seconds: 2), (timer) {
if (timer.tick > 8) {
_messageController.close();
timer.cancel();
return;
}
var random = Random();
var user = _users[random.nextInt(_users.length)];
var message = _messages[random.nextInt(_messages.length)];
_messageController.add(ChatMessage(user, message));
});
}
void sendMessage(String user, String message) {
_messageController.add(ChatMessage(user, message));
}
}
void main() async {
var chatRoom = ChatRoom();
// Listen to all messages
var subscription = chatRoom.messageStream.listen(
(ChatMessage message) {
print('📱 $message');
},
onDone: () {
print('🔚 Chat room closed');
},
);
// Listen to messages from specific user
var aliceMessages = chatRoom.messageStream
.where((msg) => msg.user == 'Alice')
.listen((msg) {
print('👩 Alice said: ${msg.message}');
});
// Count total messages
var messageCount = 0;
var counter = chatRoom.messageStream.listen((msg) {
messageCount++;
print('📊 Total messages: $messageCount');
});
print('🚀 Starting chat simulation...');
// Send a welcome message
chatRoom.sendMessage('System', 'Welcome to the chat room!');
// Start the simulation
chatRoom.startSimulation();
// Wait for simulation to complete
await Future.delayed(Duration(seconds: 20));
// Clean up
subscription.cancel();
aliceMessages.cancel();
counter.cancel();
}
Output:
🚀 Starting chat simulation...
📱 [System] Welcome to the chat room! (14:30)
📊 Total messages: 1
📱 [Bob] Hello everyone! (14:30)
📊 Total messages: 2
📱 [Alice] Great weather today! (14:30)
👩 Alice said: Great weather today!
📊 Total messages: 3
📱 [Charlie] I love Dart programming! (14:30)
📊 Total messages: 4
🔚 Chat room closed