ในการเขียนโปรแกรมทั่วๆ ไปเรามักจะส่งข้อมูลกันในรูปแบบของ sync
เช่น
int getNumber() {
return 1;
}
การเรียกใช้งานก็ง่ายๆ แบบนี้
print(getNumber());
แต่ในบทก่อนๆ เราเคยสอนเรื่องของ Generator ไปแล้ว นั่นคือการที่ส่งข้อมูลแบบหลายๆ ตัวกลับมาในรูปแบบของ sync*
ทบทวนแบบเร็วๆ! คือแทนที่จะตอบข้อมูลกลับแค่ครั้งเดียวด้วย return
ก็เปลี่ยนเป็น yield
แทน (yield ตอบได้หลายครั้งมากกว่า return)
Iterable<int> getNumbers() sync* {
yield 1;
yield 2;
yield 3;
}
เวลารับข้อมูลจากฟังก์ชันพวกนี้ไปใช้ ก็เลยต้องใช้การวนลูปนั่นเอง
for(var num in getNumbers()) {
print(num);
}
ถ้าสรุปเป็นรูป ก็จะได้แบบข้างล่างนี่
แต่ทั้ง 2 วิธีนี้มันเป็นการส่งข้อมูลแบบ Synchronous คือโค้ดยังทำงานเรียงๆ กันตามลำดับ และโค้ดทั้งหมดจะไม่ถูกแตกเป็นส่วนๆ เข้า Event Loop แบบ Asynchronous
ดังนั้นต่อไป... เรามาดู 2 กรณีนี้ แต่อยู่ในรูปของ Asynchronous กันบ้างดีกว่า!
ในบทที่แล้วเราพูดถึง Future
กันไปแล้ว ซึ่งมันคือการเปลี่ยนจาก sync
ให้กลายเป็น async
นั่นเอง
แปลว่าเรายังขาดอยู่หนึ่งตัว คือ async*
นั่นเอง
Single Value | Zero or more Values | |
---|---|---|
Sync | int |
Iterable<int> |
Async | Future<int> |
Stream<int> |
สรุปง่ายๆ คือ
Stream
ก็คือIterable
ที่อยู่ในรูปของ Asynchronous นั่นเอง
วิธีการดึงค่าออกมาจาก Stream
การใช้งาน Stream ก็คล้ายๆ กับ Future นั่นแหละ แต่มีรายละเอียดเยอะกว่า เพราะมันตอบได้หลายค่ามากกว่า Future ยังไงล่ะ
ขอสมมุติว่าเรามีฟังก์ชันหนึ่ง ที่ตอบค่าเป็น Stream<int>
ชื่อ getNumberStream()
นะ
รอฟังค่าเรื่อยๆ ด้วย listen
สำหรับ Future เราจะใช้คำสั่ง then()
ในการรอค่า แต่ Stream จะใช้ listen()
แทน
Stream<int> numberStream = getNumberStream();
var subscription = numberStream.listen((num){
print('Receive: $num');
});
listen()
จะตอบกลับเป็น subscription ซึ่งเดี๋ยวจะพูดถึงอีกที
แน่นอนว่าเวลาเอาไปรัน ผลที่ได้อาจจะเป็นแบบนี้
output:
Receive: 1
Receive: 2
Receive: 3
Receive: ...
ก็คือเราได้รับตัวเลขหลายๆ ตัวนั่นเอง แต่อาจจะได้ตอนไหนก็ไม่รู้ แบบนี้
time │ │ │
0| 1sec. │ │
1| Receive: 1 ─┘ 2sec. │
2| Receive: 2 ───────┘ │
3| 4sec.
4| Receive: 3 ────────────┘
5|
สำหรับตัว listen()
นั้นมี options เสริมให้เราใช้งานได้หลายตัว คือ
-
onError
: เมื่อเกิด error ให้ทำอะไร -
onDone
: เมื่อได้รับข้อมูลครบทุกตัวแล้วให้ทำอะไร (ทำงานเมื่อ Stream ตอบข้อมูลครบหมดทุกตัวแล้ว มันจะเรียก onDone ให้ทำงาน) -
cancelOnError
: เป็นตัวกำหนดว่าถ้าเกิด error ขึ้นแล้ว (ในกรณีที่ข้อมูลยังส่งกลับมาไม่ครบทุกตัว) จะเลือกที่จะหยุดการทำงานของ Stream ไปเลย หรือจะยังให้ทำงานต่อก็ได้
var subscription = numberStream.listen(
(num){
print('ได้รับข้อมูล $num');
},
onError: (err){
print('มีข้อผิดพลาดเกิดขึ้นล่ะ $err');
},
onDone: (){
print('ได้รับข้อมูลครบแล้วจ้า');
},
cancelOnError: false,
);
ลดรูป Stream ด้วย await
เช่นเดียวกับตอน Future นะ คือเราสามารถเปลี่ยน listen()
ให้ไปอยู่ในรูปแบบการเขียนแบบ Synchronous ได้ แต่ก็จะไม่ได้ตรงๆ แบบ Future นะเพราะเราต้องรับข้อมูลด้วยลูป
Stream<int> numberStream = getNumberStream();
await for(var number in numberStream) {
print('Receive: $number');
}
Broadcast Stream
ตามปกติแล้วการใช้งาน Stream จะมีการ listen()
ได้แค่ครั้งเดียวเท่านั้น
เมื่อเรา subscription ไปครั้งหนึ่งแล้ว ถ้าจะ subscription ซึ่งกับ Stream ตัวเดิมมันจะเกิด Exception ขึ้นนะ!!
Stream<int> numberStream = getNumberStream();
var subscription1 = numberStream.listen((num){
print('Receive: $num');
});
//second subscribe -> Exception!!
var subscription2 = numberStream.listen((num){
print('Receive: $num');
});
วิธีการแก้ (ถ้าอยาก subscription หลายครั้งจริงๆ) ให้เปลี่ยน Stream ตัวนั้นให้เป็นสิ่งที่เรียกว่า "Broadcast Stream" แทน
วิธีเปลี่ยนก็ง่ายๆ คือใช้ get property ที่ชื่อว่า asBroadcastStream
แบบนี้
Stream<int> numberStream = getNumberStream().asBroadcastStream;
var subscription1 = numberStream.listen((num){
print('First receive: $num');
});
var subscription2 = numberStream.listen((num){
print('Second receive: $num');
});
ทีนี้ ถ้า Stream ของเรามีการส่งตัวเลข [1 เมื่อผ่านไป1วินาที] และ [3 เมื่อผ่านไป3วินาที] ก็จะได้ผลลัพธ์แบบนี้ (สังเกตดูว่าเราได้ข้อมูลตัวละ 2 ครั้ง เพราะมี subscription 2 ตัวนั่นเอง)
time │ │
0| 1sec. │
1| First receive: 1 ─┤ │
| First receive: 1 ─┘ 3sec.
2| │
3| Second receive: 3 ───────┤
| Second receive: 3 ───────┘
4|
Subscription
กลับมาที่สิ่งที่เราพูดค้างไว้ นั่นคือตอบที่เราสั่ง listen()
เราจะได้สิ่งที่เรียกว่า subscription
กลับมา
เพราะ Stream คือ "กระแสข้อมูล" ที่ไหลมาเรื่อยๆ มาตอนไหนก็ไม่รู้ เลยมี subscription เอาไว้ควบคุมกระแสนั้นอีกที
//หยุดการรับข้อมูล
subscription.pause();
//กลับรับข้อมูลใหม่
subscription.resume();
//แคนเซิล Stream ตัวนั้น
subscription.cancel();
แต่มีข้อควรระวังอย่างนึง คือการ pause()
ไม่ใช้การไม่รับข้อมูลในจังหวะนั้น แต่เป็นการหยุดรับข้อมูลชั่วคราวเท่านั้น แปลว่า "ข้อมูลไม่ได้หายไปนะ มันแค่เข้าคิวรอเรา resume()
อีกทีนั่นเอง!"
ลองดูตัวอย่างข้างล่างประกอบ ขอสมมุติว่าเรามี stream อยู่หนึ่งตัวที่จะส่งตัวเลขกลับมาเรื่อยๆ ทุกๆ 1 วินาที [1, 2, 3, 4, 5, ...] แบบนี้นะ
var subscription = stream.listen((x){
print('Receive: $x');
});
Future.delayed(Duration(seconds: 3), (){
subscription.pause();
});
Future.delayed(Duration(seconds: 6), (){
subscription.resume();
});
Future.delayed(Duration(seconds: 8), (){
subscription.cancel();
});
แล้วเราก็ตั้งค่า (โดยใช้ Future.delayed) ให้มัน..
- pause: เมื่อผ่านไป 3 วินาที
- resume: เมื่อผ่านไป 6 วินาที (จากตอนเริ่มโปรแกรม)
- cancel: เมื่อผ่านไป 8 วินาที (จากตอนเริ่มโปรแกรม)
เราอาจจะคิดว่าระหว่างวินาทีที่ 3-6 เราจะไม่ได้รับข้อมูลช่วงนั้น (คือข้อมูล [4,5] หายไปเลย)
แต่ไม่ใช่นะ! เพราะเลข [4,5] นั้นยังอยู่นะ แค่มันเข้าคิวรอที่จะออกมาอยู่
เมื่อเรา resume ในวินาทีที่ 6 มันก็จะออกมาทีเดียวหมดเลย [4,5,6]
ดูอธิบายด้วย timeline ข้างล่างนี่น่าจะเข้าใจมากกว่า
time
1| Receive: 1
2| Receive: 2
3| Receive: 3
4| ├─ waiting until (6)
5| │
6| Receive: 4 ┐
| Receive: 5 ├─ 3 values in 1 sec.
| Receive: 6 ┘
7| Receive: 7
8| Receive: 8
9|
มาลองสร้าง Stream กันบ้าง
การสร้าง Stream ทำได้หลายวิธีมากๆ แต่แบบง่ายที่สุดคือใช้วิธีแบบ Generator
แปล Iterable เป็น Stream
เราเคยสอนการสร้าง Generator ไปแล้วในบท Dart 105: มันวนซ้ำได้นะ! มาสร้าง Generator และใช้งาน Iterable กันเถอะ แต่ตอนนั้นเราเขียนมันในรูปแบบของ sync
อย่างที่พูดไปตอนต้นว่า Stream
ก็คือ Iterable
ที่อยู่ในรูปของ Asynchronous เราจะมาแสดงให้เห็นกัน
ขอเปิดด้วยโค้ดแบบ Iterable ในรูปของ Synchronous
โจทย์คือ... เราต้องการสร้างฟังก์ชันสำหรับดึง Data ออกมาจำนวนหนึ่งตั้งแต่ from ถึง to
Data fetchOneData(int id){
...
}
Iterable<int> getData(int from, int to) sync* {
var dataList = [];
for(var i=from; i<=to; i++){
dataList.add(fetchOneData(i))
}
return dataList;
}
(ฟังก์ชัน fetchOneData()
ทำอะไรไม่ต้องสนใจนะ ไม่ใช่ประเด็นของเรื่อนี้)
แต่เพื่อ performance ที่ดี เลยเราไม่โหลดข้อมูลในครั้งเดียว แต่เลือกที่จะเขียนมันในรูปแบบ Generator แทน ก็จะได้โค้ดแบบนี้..
Data fetchOneData(int id){
...
}
Iterable<int> getData(int from, int to) sync* {
for(var i=from; i<=to; i++){
yield fetchOneData(i);
}
}
เวลาเราจะใช้งาน ก็เอาไปวนลูปได้ธรรมดาๆ แบบนี้
var dataList = getData(1, 10);
for(var data in dataList) {
print(data);
}
ขึ้นต่อไปคือ แล้วถ้าข้อมูลของเราไม่ได้มาแบบ sync ล่ะ?
ถ้าฟังก์ชัน fetchOneData()
ใช้เวลาโหลดข้อมูลนานขึ้น เราคงต้องเปลี่ยนมันเป็น Future แบบนี้
Future<Data> fetchData(int id) {
...
}
เมื่อเป็นแบบนี้ เราก็มีปัญหาซะแล้ว!
เพราะฟังก์ชัน getData()
ที่เป็น sync ไม่สามารถเรียก fetchOneData()
ที่เป็น async ได้ล่ะ!!
วิธีการแก้ก็คือเปลี่ยนฟังก์ชัน fetchOneData()
ให้หลายเป็น async ยังไงล่ะ
"ด้วยการเปลี่ยน Iterable ให้กลายเป็น Stream"
สิ่งที่เราต้องทำ มีอยู่ 3 อย่าง
- จากเดิมที่รีเทิร์นค่าเป็น
Iterable<Data>
เราต้องเปลี่ยนมันเป็นStream<Data>
แทน - จากเดิมที่ฟังก์ชันเป็นชนิด
sync*
เราต้องเปลี่ยนมันเป็นasync*
- จากเดิมที่เรา
yield
ค่าทันทีได้เลย แต่เมื่อมันเป็น Future แล้วเราก็ต้องสั่งให้มันรอด้วยawait
Stream<Data> getData(int from, int to) async* {
for(var id=from; id<=to; id++) {
yield await fetchOneData(id);
}
}
void main() {
fetchAllData(1, 10).listen(print);
}
จะเห็นว่า Iterable นั้นแปลงเป็น Stream ได้ง่ายๆ เลย
ลองมาดูกันอีกตัวอย่างกัน
int sumIt(List<int> items) {
int sum = 0;
for(var item in items) {
sum += item;
}
return sum;
}
เรามีฟังก์ชัน sumIt()
สำหรับหาผลรวมทั้งหมดใน List แต่ถ้าลิสต์ตัวนี้ไม่ได้ค่ามาแบบ sync ล่ะ? จะทำยังไง?
คราวนี้ให้สังเกตว่า parameter นั้นรับ List (หรือก็คือ Iterable นั่นแหละนะ) เข้ามา แล้วมันรีเทิร์นค่าแบบสเกล่าธรรมดา (คือ int
ธรรมดาๆ นี่แหละ)
สำหรับเคสนี้ สิ่งที่เราต้องทำก็คือ
- เปลี่ยนค่าแบบสเกล่าธรรมดาให้กลายเป็น Future
- เปลี่ยนให้ฟังก์ชันเป็น
async
(ระวัง! ไม่ได้เปลี่ยนเป็นasync*
นะ เพราะมันไม่ได้รีเทิร์น Stream) - จุดที่เป็นปัญหาคือ for loop ที่วนค่าตัว Stream .. ให้เราเติม
await
ลงไปข้างหน้าเป็นอันจบ
Future<int> sumIt(Stream<int> items) async {
int sum = 0;
await for(var item in items) {
sum += item;
}
return sum;
}
เรื่องสุดท้ายคือในบางกรณี เราอาจจะมีฟังก์ชัน Stream ที่เรียกใช้งาน Stream อีกตัวก็เป็นได้นะ
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
yield getFirstStream(); //Not Work!!
}
วิธีการนี้โค้ดอาจคอมไพล์ได้ แต่เราจะไม่ได้ข้อมูลอะไรเลย (ถ้าใครยังไม่เข้าใจว่าทำไมไม่เวิร์ค ลองกลับไปอ่านบทที่เราพูดถึง Generator อีกทีนะ)
วิธีการแก้ก็เหมือนกับฟังก์ชัน sync*
นั่นแหละ คือเราจะต้องวนลูปทีละตัว
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
await for(var s in getFirstStream()){
tield s;
}
}
หรือแบบง่ายกว่าคือการใช้ yield*
ก็ได้
Stream<Data> getFirstStream() async* {
...
}
Stream<Data> getSecondStream() async* {
yield* getFirstStream(); //Ok!!
}
นอกจากวิธีสร้างแบบ Generator ที่เราใช้แนวคิดเดียวกับ Generator ใน sync แล้วยังมีอีกวิธีหนึ่งที่ให้เราคุมการไหลของข้อมูลใน Stream ได้คล่องตัวขึ้น นั่นคือใช้ StreamController
ซึ่งเราจะพูดถึงกันในบทต่อไปนะ (พร้อมกับเก็บตกเนื้อหาเกี่ยวกับ Stream กันอีกนิดหน่อยด้วย)
Top comments (0)