Starting Point
Let's start with the example from the io-library-tour on Streaming file Contents:
import 'dart:async';
import 'dart:io';
import 'dart:convert';
Future main() async {
var config = File('config.txt');
Stream<List<int>> inputStream = config.openRead();
var lines = inputStream
.transform(utf8.decoder)
.transform(LineSplitter());
try {
await for (var line in lines) {
print('Got ${line.length} characters from stream');
}
print('file is now closed');
} catch (e) {
print(e);
}
}
What you can see there is that a 'config.txt' file is processed in a streamed fashion. As part of the processing there are 2 transformations going on.
- utf8.decoder that converts a list of unsigned 8-bit integers to a string
- LineSplitter that splits the one string into single pieces line by line
The await for
will then process the stream basically line by line, where as the EOL-String is part of the yielded list.
Let's dive in
So how is this transform
working? For this we going to write a small transformator that will transform every string
to a UPPER CASED string.
Cool, how to start this?
Let's check the API for transform on Stream. There we find a StreamTransformer<T, S>
that needs to be passed over. But after checking we figure out that there is higher level concept that implements this interface and simplifies a lot. It's called a Converter<S, T>
. So our implementation could like this:
class UpperCase extends Converter<String, String> {
@override
String convert(String input) => input.toUpperCase();
}
Well, that was easy! Let's run the whole program and check how it looks:
import 'dart:async';
import 'dart:io';
import 'dart:convert';
class UpperCase extends Converter<String, String> {
@override
String convert(String input) => input.toUpperCase();
}
Future main() async {
var config = File(Platform.script.toFilePath());
Stream<List<int>> inputStream = config.openRead();
var lines = inputStream
.transform(utf8.decoder)
.transform(LineSplitter())
.transform(UpperCase());
try {
await for (var line in lines) {
print('Got ${line.length} characters from stream');
print(line);
}
print('file is now closed');
} catch (e) {
print(e);
}
}
$ dart io_expedition_iter0.dart
Unsupported operation: This converter does not support chunked conversions: Instance of 'UpperCase'
Oooops!
What the hell are chunked conversions?
Let's find out where this exception is originated. That is Converter<S, T>
:
/**
* Starts a chunked conversion.
*
* The returned sink serves as input for the long-running conversion. The
* given [sink] serves as output.
*/
Sink<S> startChunkedConversion(Sink<T> sink) {
throw new UnsupportedError(
"This converter does not support chunked conversions: $this");
}
It shows us at least that for some reason a Converter
seems to operate in 2 ways:
- like normal where only the
convert
method is involved - like chunked
The doc block indicates that this is for long-running conversion used. Still unclear how or why this is the choosen path by the runtime.
Let's focus on how to solve that
As you can see from the signature a Sink<S>
is expected to be returned. In our case a Sink<String>
that is simply a destination for sending Strings to. So let's intercept the streaming with a small decorator class like below:
class UpperCaseConversionSink extends StringConversionSinkBase {
EventSink<String> wrapped;
UpperCaseConversionSink(this.wrapped);
@override
void addSlice(String str, int start, int end, bool isLast) {
wrapped.add(str.toUpperCase());
}
@override
void close() {
wrapped.close();
}
}
and let's implement the start of chunked conversion in the UpperCase
Converter like this:
@override
Sink<String> startChunkedConversion(Sink<String> sink) {
return UpperCaseConversionSink(sink);
}
$ dart io_expedition_iter1.dart
Got 19 characters from stream
LIBRARY IO_TESTING;
Got 0 characters from stream
Got 20 characters from stream
IMPORT 'DART:ASYNC';
Got 17 characters from stream
IMPORT 'DART:IO';
Got 22 characters from stream
# [...]
Nice! That works.
Let's refactor a bit
As you can see the small decorator sink UpperCaseConversionSink
has now also knowledge about the conversion technique as well as the UpperCase
converter itself. That duplication can be cleaned by introducing a more generic sink that accepts a converter and delegates the concrete conversion back to the converter. Let's see how this might looks:
class StringEventConverterSink extends StringConversionSinkBase {
EventSink<String> innerSink;
Converter<String, String> converter;
// [sink] is wrapped and [converter] knows about the concrete conversion algorithm
StringEventConverterSink(Sink<String> sink, Converter<String, String> converter) {
this.innerSink = sink;
this.converter = converter;
}
@override
void addSlice(String str, int start, int end, bool isLast) {
innerSink.add(converter.convert(str));
}
@override
void close() {
innerSink.close();
}
}
the usage of this looks then like:
class UpperCaseConverter extends Converter<String, String> {
@override
String convert(String input) => input.toUpperCase();
@override
Sink<String> startChunkedConversion(Sink<String> sink) {
return StringEventConverterSink(sink, this);
}
}
The full final code can be found on my github page.
What about closures
Sure, we can even simplify further and make the Converter itself more generic in a way that it only accepts a closure to do the job. So that our usage would look as simple as this
.transform(StringConverter((String x) => x.toUpperCase()));
So we will introduce a generic StringConverter
that accepts this closure:
class StringConverter extends Converter<String, String> {
String Function(String x) convertFunction;
StringConverter(this.convertFunction);
@override
String convert(String input) =>
convertFunction(input);
@override
Sink<String> startChunkedConversion(Sink<String> sink) =>
StringEventConverterSink(sink, this);
}
The full code is on my github page too
Round up
- Dart streams come with build in support for transformators
-
Converter
are used for those transformations - Long running transformations are processed in chunks
- String chunk processing can be achieved by subclassing from
StringConversionSinkBase
- Decorator pattern can help to intercept with the source and destination sink
- Converter can be passed over to the interceptors to keep the logic in one place
- Even closures can be used to simplify things further
For me the only open question is: dow does Dart decide whether a conversion can happen direct or in a chunked fashion.
If you can clarify this, feel free to leave a comment or share resources that illustrate that further.
Thanks for reading
$ dart --version
Dart VM version: 2.0.0 (Fri Aug 3 10:53:23 2018 +0200) on "macos_x64"
Top comments (0)