Dart Streams

Handle sequences of asynchronous data

🌊 What are Dart Streams?

Streams provide a way to receive a sequence of events or data over time. Think of them as pipes where data flows continuously, perfect for handling real-time updates, user input, or network data.


/// Emits the integers 1 through 5, pausing one second before each value.
Stream<int> countStream() async* {
  const pause = Duration(seconds: 1);
  for (final n in const [1, 2, 3, 4, 5]) {
    await Future.delayed(pause);
    yield n;
  }
}

/// Prints every value produced by [countStream] as it arrives.
void main() async {
  final counts = countStream();
  await for (final value in counts) {
    print('Received: $value');
  }
}
                                    

Output:

Received: 1

Received: 2

Received: 3

Received: 4

Received: 5

Stream Concepts

🎯

Stream Creation

Create streams with async* generators

/// Emits 'Hello' followed by 'World', then completes.
Stream<String> dataStream() async* {
  for (final word in const ['Hello', 'World']) {
    yield word;
  }
}
👂

Stream Listening

Listen to stream events

// Subscribe to the stream; the callback runs once for every data event.
stream.listen((data) {
  print('Got: $data');
});
🔄

Stream Controllers

Manually control stream data

// Create a controller for int events, then push the value 42 into it.
var controller = StreamController<int>();
controller.add(42);
⚡

Stream Transformations

Transform stream data

// Double every event, then keep only the results greater than 10.
stream.map((x) => x * 2)
      .where((x) => x > 10)

🔹 Creating Streams

Different ways to create streams:

import 'dart:async';

// Method 1: Using async* generator
/// Emits three status messages: the first immediately, then one per second.
Stream<String> messageStream() async* {
  const pause = Duration(seconds: 1);
  yield 'Starting...';
  for (final status in const ['Processing...', 'Complete!']) {
    await Future.delayed(pause);
    yield status;
  }
}

// Method 2: Using Stream.periodic
/// Emits 1 through 5, one value per second, then completes.
Stream<int> timerStream() {
  const interval = Duration(seconds: 1);
  // The periodic callback receives a zero-based count; shift it to start at 1.
  final ticks = Stream<int>.periodic(interval, (elapsed) => elapsed + 1);
  return ticks.take(5);
}

// Method 3: Using StreamController
/// Returns a controller that emits 'Message 1' through 'Message 6', one every
/// 500 milliseconds, and then closes.
///
/// The timer starts as soon as this function is called. Events added before a
/// listener subscribes are buffered by the (single-subscription) controller.
/// If the listener cancels early, the timer is stopped so it does not keep
/// firing with nobody to receive the events.
StreamController<String> createControlledStream() {
  Timer? ticker;
  final controller = StreamController<String>(
    // Stop producing once the subscription goes away.
    onCancel: () => ticker?.cancel(),
  );

  ticker = Timer.periodic(const Duration(milliseconds: 500), (timer) {
    if (timer.tick > 6) {
      // All six messages have been sent: stop ticking and signal completion.
      timer.cancel();
      controller.close();
    } else {
      controller.add('Message ${timer.tick}');
    }
  });

  return controller;
}

/// Runs the three stream-creation examples one after another, printing each
/// event as it arrives.
void main() async {
  print('=== Message Stream ===');
  await for (final status in messageStream()) {
    print(status);
  }

  print('\n=== Timer Stream ===');
  await for (final tick in timerStream()) {
    print('Timer: $tick');
  }

  print('\n=== Controlled Stream ===');
  // Only the stream side of the controller is needed for consumption.
  final controlled = createControlledStream().stream;
  await for (final entry in controlled) {
    print(entry);
  }
}

Output:

=== Message Stream ===

Starting...

Processing...

Complete!

=== Timer Stream ===

Timer: 1

Timer: 2

Timer: 3

Timer: 4

Timer: 5

=== Controlled Stream ===

Message 1

Message 2

Message 3

Message 4

Message 5

Message 6

🔹 Listening to Streams

Different ways to consume stream data:

import 'dart:async';

/// Emits 1 through 5, waiting 500 milliseconds before each value.
Stream<int> numberStream() async* {
  const gap = Duration(milliseconds: 500);
  var next = 1;
  while (next <= 5) {
    await Future.delayed(gap);
    yield next++;
  }
}

/// Demonstrates three ways of consuming [numberStream]: `await for`,
/// `listen()` with callbacks, and the Future-returning helpers.
void main() async {
  // Method 1: Using await for (consumes entire stream)
  print('Method 1: await for');
  await for (int number in numberStream()) {
    print('Number: $number');
  }

  // Method 2: Using listen() method
  print('\nMethod 2: listen()');
  // listen() returns immediately, so bridge the onDone callback to a Future
  // that can be awaited instead of guessing how long the stream will take.
  final listenDone = Completer<void>();
  numberStream().listen(
    (int number) {
      print('Listened: $number');
    },
    onError: (error) {
      print('Error: $error');
    },
    onDone: () {
      print('Stream completed!');
      listenDone.complete();
    },
  );

  // Wait for the stream's done event
  await listenDone.future;

  // Method 3: Using first, last, and toList
  print('\nMethod 3: Stream methods');
  int firstValue = await numberStream().first;
  print('First value: $firstValue');

  int lastValue = await numberStream().last;
  print('Last value: $lastValue');

  List<int> allValues = await numberStream().toList();
  print('All values: $allValues');
}

Output:

Method 1: await for

Number: 1

Number: 2

Number: 3

Number: 4

Number: 5

Method 2: listen()

Listened: 1

Listened: 2

Listened: 3

Listened: 4

Listened: 5

Stream completed!

Method 3: Stream methods

First value: 1

Last value: 5

All values: [1, 2, 3, 4, 5]

🔹 Stream Transformations

Transform and filter stream data:

import 'dart:async';

/// Emits 1 through 10, waiting 200 milliseconds before each value.
Stream<int> sourceStream() async* {
  const gap = Duration(milliseconds: 200);
  for (final n in List<int>.generate(10, (index) => index + 1)) {
    await Future.delayed(gap);
    yield n;
  }
}

/// Walks through take, map, where, chained transformations, and the
/// reducing helpers, each on a fresh [sourceStream].
void main() async {
  // Baseline: the first five raw events.
  print('Original stream:');
  final firstFive = sourceStream().take(5);
  await for (final n in firstFive) {
    print(n);
  }

  // map: double every event.
  print('\nTransformed stream (map):');
  final doubledValues = sourceStream().map((n) => n * 2).take(5);
  await for (final doubled in doubledValues) {
    print(doubled);
  }

  // where: keep only even events.
  print('\nFiltered stream (where):');
  final firstEvens = sourceStream().where((n) => n.isEven).take(3);
  await for (final even in firstEvens) {
    print('Even: $even');
  }

  // Filter to values above 3, square them, label them, and stop after four.
  print('\nChained transformations:');
  final labelledSquares = sourceStream()
      .where((n) => n > 3)
      .map((n) => 'Square: ${n * n}')
      .take(4);
  await for (final line in labelledSquares) {
    print(line);
  }

  // reduce / toList collapse a stream into a single Future result.
  print('\nStream reduction:');
  final total = await sourceStream().take(5).reduce((acc, n) => acc + n);
  print('Sum of first 5 numbers: $total');

  final evenNumbers =
      await sourceStream().where((n) => n.isEven).take(3).toList();
  print('First 3 even numbers: $evenNumbers');
}

Output:

Original stream:

1

2

3

4

5

Transformed stream (map):

2

4

6

8

10

Filtered stream (where):

Even: 2

Even: 4

Even: 6

Chained transformations:

Square: 16

Square: 25

Square: 36

Square: 49

Stream reduction:

Sum of first 5 numbers: 15

First 3 even numbers: [2, 4, 6]

🔹 Real-World Stream Example

A chat application simulation using streams:

import 'dart:async';
import 'dart:math';

/// A single chat entry: who wrote it, what they wrote, and when it was created.
class ChatMessage {
  /// Name of the sender.
  final String user;

  /// Text of the message.
  final String message;

  /// Moment this object was constructed.
  final DateTime timestamp;

  ChatMessage(this.user, this.message) : timestamp = DateTime.now();

  /// Formats as `[user] message (H:MM)`; only the minute is zero-padded.
  @override
  String toString() {
    final minutes = timestamp.minute.toString().padLeft(2, '0');
    final clock = '${timestamp.hour}:$minutes';
    return '[$user] $message ($clock)';
  }
}

/// A simulated chat room that publishes [ChatMessage]s on a broadcast stream.
///
/// Because the stream is a broadcast stream, any number of listeners may
/// subscribe, and messages sent while nobody is listening are not replayed.
class ChatRoom {
  static const List<String> _users = ['Alice', 'Bob', 'Charlie', 'Diana'];
  static const List<String> _messages = [
    'Hello everyone!',
    'How are you doing?',
    'Great weather today!',
    'Anyone up for lunch?',
    'I love Dart programming!',
    'See you later!',
  ];

  final StreamController<ChatMessage> _messageController =
      StreamController<ChatMessage>.broadcast();

  // One generator for the whole room instead of a new one on every tick.
  final Random _random = Random();

  /// Every message sent to the room, in the order it was added.
  Stream<ChatMessage> get messageStream => _messageController.stream;

  /// Posts a random message from a random user every two seconds.
  ///
  /// Eight messages are sent; on the ninth tick the timer stops and the
  /// room's stream is closed.
  void startSimulation() {
    Timer.periodic(const Duration(seconds: 2), (timer) {
      if (timer.tick > 8) {
        timer.cancel();
        _messageController.close();
        return;
      }

      final user = _users[_random.nextInt(_users.length)];
      final message = _messages[_random.nextInt(_messages.length)];

      _messageController.add(ChatMessage(user, message));
    });
  }

  /// Publishes [message] from [user] to all current listeners.
  ///
  /// Throws a [StateError] if the room has already been closed.
  void sendMessage(String user, String message) {
    _messageController.add(ChatMessage(user, message));
  }
}

/// Runs the chat simulation with three independent listeners on the same
/// broadcast stream, and exits once the room has closed.
void main() async {
  var chatRoom = ChatRoom();

  // Completed from onDone so the program can wait for the actual close event
  // instead of sleeping for a fixed, guessed duration.
  final roomClosed = Completer<void>();

  // Listen to all messages
  var subscription = chatRoom.messageStream.listen(
    (ChatMessage message) {
      print('📱 $message');
    },
    onDone: () {
      print('🔚 Chat room closed');
      roomClosed.complete();
    },
  );

  // Listen to messages from specific user
  var aliceMessages = chatRoom.messageStream
      .where((msg) => msg.user == 'Alice')
      .listen((msg) {
    print('👩 Alice said: ${msg.message}');
  });

  // Count total messages
  var messageCount = 0;
  var counter = chatRoom.messageStream.listen((msg) {
    messageCount++;
    print('📊 Total messages: $messageCount');
  });

  print('🚀 Starting chat simulation...');

  // Send a welcome message
  chatRoom.sendMessage('System', 'Welcome to the chat room!');

  // Start the simulation
  chatRoom.startSimulation();

  // Wait for simulation to complete
  await roomClosed.future;

  // Clean up; await the cancellations so any error is not silently dropped.
  await Future.wait([
    subscription.cancel(),
    aliceMessages.cancel(),
    counter.cancel(),
  ]);
}

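Output (abbreviated sample — users and messages are chosen at random, and timestamps reflect when you run it):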

🚀 Starting chat simulation...

📱 [System] Welcome to the chat room! (14:30)

📊 Total messages: 1

📱 [Bob] Hello everyone! (14:30)

📊 Total messages: 2

📱 [Alice] Great weather today! (14:30)

👩 Alice said: Great weather today!

📊 Total messages: 3

📱 [Charlie] I love Dart programming! (14:30)

📊 Total messages: 4

🔚 Chat room closed

🧠 Test Your Knowledge

What keyword is used to create a stream generator function?