DEV Community

Guilherme Silva

Dart's future.asStream() quick overview

Dart makes it easy to transform a future into a stream, but what happens after that? For example, if we cancel the stream, what happens to the future? What if the future throws an exception? Can a try-catch block help?

TLDR

stream/dart

Intro

Problem: in Dart, you can't cancel (or stop) a future.

Possible solution: you can cancel a stream's subscription. So the idea is to transform the future into a stream and cancel the subscription when needed.

We need to understand what will happen when the future:

  • completes normally
    • before the stream is canceled
    • after the stream was already canceled
  • throws
    • before the stream is canceled
      • onError is defined
        • and cancelOnError is false (default)
        • and cancelOnError is true
      • onError is NOT defined
    • after the stream is canceled

Situation 1 - the future completes normally before the stream is canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print(' stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() will be called even when an error happens
      // cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  // throw Exception('BAL');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
future has FINISHED
  stream.onData(): future finished with value: true
  stream.onDone(): done
Exited

conclusions:

  • both onDone and onData callbacks are called
  • onData is called with the future's result

Situation 2 - the future completes normally, but the stream was already canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() will be called even when an error happens
      // cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    await sub.cancel();
    print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  // throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
   stream.cancel() was ALREADY cancelled!
future is STILL running
future has FINISHED
Exited

conclusions:

  • the future continues running, even after we canceled the stream
  • neither onData nor onDone is called (the result is completely ignored)

Situation 3 - the future throws before the stream is canceled, onError is defined and cancelOnError is false (default)

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      cancelOnError: false, // default - onDone() will be called even when an error happens
      // cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));

  throw Exception('HEHE');

  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onError(): Exception: HEHE
  stream.onDone(): done
Exited

conclusions:

  • both onDone and onError callbacks are called

Situation 4 - the future throws before the stream is canceled, onError is defined and cancelOnError is true

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() will be called even when an error happens
      cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onError(): Exception: HEHE
Exited

conclusions:

  • the onError callback is called
  • onDone is NOT called
  • the catch block of the try-catch is NOT reached
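
As a contrast to the listen() examples above, here is a minimal sketch of one way a try-catch can take effect: consuming the stream with await for makes a stream error surface as a thrown exception at the await point. The names failingCall and consume are hypothetical (they are not part of the original example), and the delay is shortened so the sketch runs quickly.

```dart
// Hypothetical stand-in for longAPICall(), with a short delay.
Future<bool> failingCall() async {
  await Future.delayed(Duration(milliseconds: 10));
  throw Exception('HEHE');
}

// Consumes the stream with `await for`, so a stream error is rethrown
// inside this function and the surrounding try-catch can see it.
Future<String> consume() async {
  try {
    await for (final v in failingCall().asStream()) {
      return 'data: $v';
    }
    return 'done without data';
  } catch (e) {
    return 'caught: $e';
  }
}

Future<void> main() async {
  print(await consume()); // caught: Exception: HEHE
}
```

With listen(), the callbacks run later, after the try block has already been left, which is why the catch block in these situations never fires.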

Situation 5 - the future throws before the stream is canceled, onError is NOT defined

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      // onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() will be called even when an error happens
      // cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    // await sub.cancel();
    // print('   stream.cancel() was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('BAL'); // message matches the recorded output below
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
future is STILL running
  stream.onDone(): done
Unhandled exception:
Exception: BAL
#0      longAPICall
bin/future_playground.dart:29
<asynchronous suspension>
#1      new Stream.fromFuture.<anonymous closure> (dart:async/stream.dart:247:17)
<asynchronous suspension>

Exited (255)

conclusions:

  • onDone is still called (because cancelOnError's default is false)
  • try-catch is ignored (aahá! 👀)
  • nobody handles the exception, so it reaches the Zone's uncaught-error handler (here it ends the program with exit code 255)
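
To make the zone part more concrete, here is a minimal sketch that runs the same kind of listen() call (no onError) inside runZonedGuarded from dart:async, so the unhandled stream error lands in our own handler instead of ending the program. The names failingCall and listenWithoutOnError are hypothetical, and the delays are assumptions chosen only to keep the run short.

```dart
import 'dart:async';

// Hypothetical stand-in for longAPICall(), with a short delay.
Future<bool> failingCall() async {
  await Future.delayed(Duration(milliseconds: 10));
  throw Exception('HEHE');
}

// Listens without onError inside a guarded zone and records what happens.
Future<List<String>> listenWithoutOnError() async {
  final log = <String>[];
  await runZonedGuarded(() async {
    failingCall().asStream().listen(
      (v) => log.add('onData: $v'),
      onDone: () => log.add('onDone'),
    );
    // Give the future and the stream time to finish.
    await Future.delayed(Duration(milliseconds: 100));
  }, (error, stack) {
    // Errors that nobody handled inside the zone end up here.
    log.add('zone: $error');
  });
  return log;
}

Future<void> main() async {
  print(await listenWithoutOnError());
}
```

The log should contain both an onDone entry and a zone entry carrying the exception, and no onData entry. Note that the future is created inside the guarded zone on purpose: Dart does not let errors cross error-zone boundaries.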

Situation 6 - the future throws after the stream is canceled

void main(List<String> args) async {
  try {
    final stream = longAPICall().asStream();
    var sub = stream.listen(
      (v) => print('  stream.onData(): future finished with value: $v'),
      onDone: () => print('  stream.onDone(): done'),
      // onError: (e) => print('  stream.onError(): ${e.toString()}'),
      // cancelOnError: false, // default - onDone() will be called even when an error happens
      // cancelOnError: true, // onDone() will NOT be called when an error happens
    );
    await sub.cancel();
    print('   stream was ALREADY cancelled!');
  } catch (e) {
    // this is never reached, even if we don't define onError()
    // in that case the error is handled by the zone
    print('caught: ${e.toString()}');
  }
}

Future<bool> longAPICall() async {
  print('future has STARTED...');
  await Future.delayed(Duration(seconds: 2));
  print('future is STILL running');
  await Future.delayed(Duration(seconds: 2));
  throw Exception('HEHE');
  print('future has FINISHED');
  return true;
}

output:

future has STARTED...
   stream was ALREADY cancelled!
future is STILL running
Exited

conclusions:

  • the future continues running even after we canceled the stream
  • try-catch is ignored
  • onError was not declared, and it didn't make a difference (the error is simply ignored and isn't sent to the zone's handler) (aahá(2)! 👀)
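
The "error is simply ignored" observation can be checked in a more controlled way. The sketch below is an assumption-laden variation, not the original example: it uses a Completer instead of longAPICall() so the failure happens exactly after the cancel, and wraps everything in runZonedGuarded so we would notice if the error reached the zone. The name failAfterCancel is hypothetical.

```dart
import 'dart:async';

// Cancels the subscription first, then fails the future, and records
// every callback (stream or zone) that fires afterwards.
Future<List<String>> failAfterCancel() async {
  final log = <String>[];
  await runZonedGuarded(() async {
    // Created inside the zone so the error stays in this error zone.
    final completer = Completer<bool>();
    final sub = completer.future.asStream().listen(
      (v) => log.add('onData: $v'),
      onDone: () => log.add('onDone'),
    );
    await sub.cancel();
    completer.completeError(Exception('HEHE'));
    // Give any pending callbacks a chance to run.
    await Future.delayed(Duration(milliseconds: 50));
  }, (error, stack) => log.add('zone: $error'));
  return log;
}

Future<void> main() async {
  print(await failAfterCancel());
}
```

If the behavior matches the output above, the printed list should be empty: no onData, no onDone, and no zone entry. So canceling the subscription also silences errors, which may or may not be what you want.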
