DEV Community

loading...
Centrillion Technology

Async in Dart (3) ตอบค่าเป็นกระแสข้อมูลด้วย Stream

Ta
Introvert Developer who love to learn new Knowledge, Reading Books, Writing Blog, Drawing, play Badminton and Table-Tennis -- Founder and Writer at https://www.tamemo.com
Originally published at centrilliontech.co.th Updated on ・3 min read

ในการเขียนโปรแกรมทั่วๆ ไปเรามักจะส่งข้อมูลกันในรูปแบบของ sync เช่น

int getNumber() {
    return 1;
}
Enter fullscreen mode Exit fullscreen mode

การเรียกใช้งานก็ง่ายๆ แบบนี้

print(getNumber());
Enter fullscreen mode Exit fullscreen mode

แต่ในบทก่อนๆ เราเคยสอนเรื่องของ Generator ไปแล้ว นั่นคือการที่ส่งข้อมูลแบบหลายๆ ตัวกลับมาในรูปแบบของ sync*

ทบทวนแบบเร็วๆ! คือแทนที่จะตอบข้อมูลกลับแค่ครั้งเดียวด้วย return ก็เปลี่ยนเป็น yield แทน (yield ตอบได้หลายครั้งมากกว่า return)

Iterable<int> getNumbers() sync* {
    yield 1;
    yield 2;
    yield 3;
}
Enter fullscreen mode Exit fullscreen mode

เวลารับข้อมูลจากฟังก์ชันพวกนี้ไปใช้ ก็เลยต้องใช้การวนลูปนั่นเอง

for(var num in getNumbers()) {
    print(num);
}
Enter fullscreen mode Exit fullscreen mode

ถ้าสรุปเป็นรูป ก็จะได้แบบข้างล่างนี่

แต่ทั้ง 2 วิธีนี้มันเป็นการส่งข้อมูลแบบ Synchronous คือโค้ดยังทำงานเรียงๆ กันตามลำดับ และโค้ดทั้งหมดจะไม่ถูกแตกเป็นส่วนๆ เข้า Event Loop แบบ Asynchronous

ดังนั้นต่อไป... เรามาดู 2 กรณีนี้ แต่อยู่ในรูปของ Asynchronous กันบ้างดีกว่า!


ในบทที่แล้วเราพูดถึง Future กันไปแล้ว ซึ่งมันคือการเปลี่ยนจาก sync ให้กลายเป็น async นั่นเอง

แปลว่าเรายังขาดอยู่หนึ่งตัว คือ async* นั่นเอง

Single Value Zero or more Values
Sync int Iterable<int>
Async Future<int> Stream<int>

สรุปง่ายๆ คือ Stream ก็คือ Iterable ที่อยู่ในรูปของ Asynchronous นั่นเอง

วิธีการดึงค่าออกมาจาก Stream

การใช้งาน Stream ก็คล้ายๆ กับ Future นั่นแหละ แต่มีรายละเอียดเยอะกว่า เพราะมันตอบได้หลายค่ามากกว่า Future ยังไงล่ะ

ขอสมมุติว่าเรามีฟังก์ชันหนึ่ง ที่ตอบค่าเป็น Stream<int> ชื่อ getNumberStream() นะ

รอฟังค่าเรื่อยๆ ด้วย listen

สำหรับ Future เราจะใช้คำสั่ง then() ในการรอค่า แต่ Stream จะใช้ listen() แทน

Stream<int> numberStream = getNumberStream();

var subscription = numberStream.listen((num){
    print('Receive: $num');
});
Enter fullscreen mode Exit fullscreen mode

listen() จะตอบกลับเป็น subscription ซึ่งเดี๋ยวจะพูดถึงอีกที
แน่นอนว่าเวลาเอาไปรัน ผลที่ได้อาจจะเป็นแบบนี้

output:
Receive: 1
Receive: 2
Receive: 3
Receive: ...
Enter fullscreen mode Exit fullscreen mode

ก็คือเราได้รับตัวเลขหลายๆ ตัวนั่นเอง แต่อาจจะได้ตอนไหนก็ไม่รู้ แบบนี้

time            │     │    │
0|             1sec.  │    │
1| Receive: 1  ─┘    2sec. │
2| Receive: 2  ───────┘    │
3|                        4sec.
4| Receive: 3  ────────────┘
5|
Enter fullscreen mode Exit fullscreen mode

สำหรับตัว listen() นั้นมี options เสริมให้เราใช้งานได้หลายตัว คือ

  • onError: เมื่อเกิด error ให้ทำอะไร
  • onDone: เมื่อได้รับข้อมูลครบทุกตัวแล้วให้ทำอะไร (ทำงานเมื่อ Stream ตอบข้อมูลครบหมดทุกตัวแล้ว มันจะเรียก onDone ให้ทำงาน)
  • cancelOnError: เป็นตัวกำหนดว่าถ้าเกิด error ขึ้นแล้ว (ในกรณีที่ข้อมูลยังส่งกลับมาไม่ครบทุกตัว) จะเลือกที่จะหยุดการทำงานของ Stream ไปเลย หรือจะยังให้ทำงานต่อก็ได้
var subscription = numberStream.listen(
    (num){
        print('ได้รับข้อมูล $num');
    }, 
    onError: (err){
        print('มีข้อผิดพลาดเกิดขึ้นล่ะ $err');
    },
    onDone: (){
        print('ได้รับข้อมูลครบแล้วจ้า');
    },
    cancelOnError: false,
);
Enter fullscreen mode Exit fullscreen mode

ลดรูป Stream ด้วย await

เช่นเดียวกับตอน Future นะ คือเราสามารถเปลี่ยน listen() ให้ไปอยู่ในรูปแบบการเขียนแบบ Synchronous ได้ แต่ก็จะไม่ได้ตรงๆ แบบ Future นะเพราะเราต้องรับข้อมูลด้วยลูป

Stream<int> numberStream = getNumberStream();

await for(var number in numberStream) {
    print('Receive: $number');
}
Enter fullscreen mode Exit fullscreen mode

Broadcast Stream

ตามปกติแล้วการใช้งาน Stream จะมีการ listen() ได้แค่ครั้งเดียวเท่านั้น

เมื่อเรา subscription ไปครั้งหนึ่งแล้ว ถ้าจะ subscription ซึ่งกับ Stream ตัวเดิมมันจะเกิด Exception ขึ้นนะ!!

Stream<int> numberStream = getNumberStream();

var subscription1 = numberStream.listen((num){
    print('Receive: $num');
});

//second subscribe -> Exception!!
var subscription2 = numberStream.listen((num){
    print('Receive: $num');
});
Enter fullscreen mode Exit fullscreen mode

วิธีการแก้ (ถ้าอยาก subscription หลายครั้งจริงๆ) ให้เปลี่ยน Stream ตัวนั้นให้เป็นสิ่งที่เรียกว่า "Broadcast Stream" แทน

วิธีเปลี่ยนก็ง่ายๆ คือใช้ get property ที่ชื่อว่า asBroadcastStream แบบนี้

Stream<int> numberStream = getNumberStream().asBroadcastStream;

var subscription1 = numberStream.listen((num){
    print('First receive:  $num');
});

var subscription2 = numberStream.listen((num){
    print('Second receive: $num');
});
Enter fullscreen mode Exit fullscreen mode

ทีนี้ ถ้า Stream ของเรามีการส่งตัวเลข [1 เมื่อผ่านไป1วินาที] และ [3 เมื่อผ่านไป3วินาที] ก็จะได้ผลลัพธ์แบบนี้ (สังเกตดูว่าเราได้ข้อมูลตัวละ 2 ครั้ง เพราะมี subscription 2 ตัวนั่นเอง)

time                   │     │
0|                   1sec.   │
1| First receive:  1  ─┤     │
 | First receive:  1  ─┘    3sec.
2|                           │
3| Second receive: 3  ───────┤
 | Second receive: 3  ───────┘
4|
Enter fullscreen mode Exit fullscreen mode

Subscription

กลับมาที่สิ่งที่เราพูดค้างไว้ นั่นคือตอบที่เราสั่ง listen() เราจะได้สิ่งที่เรียกว่า subscription กลับมา

เพราะ Stream คือ "กระแสข้อมูล" ที่ไหลมาเรื่อยๆ มาตอนไหนก็ไม่รู้ เลยมี subscription เอาไว้ควบคุมกระแสนั้นอีกที

//หยุดการรับข้อมูล
subscription.pause();

//กลับรับข้อมูลใหม่
subscription.resume();

//แคนเซิล Stream ตัวนั้น
subscription.cancel();
Enter fullscreen mode Exit fullscreen mode

แต่มีข้อควรระวังอย่างนึง คือการ pause() ไม่ใช้การไม่รับข้อมูลในจังหวะนั้น แต่เป็นการหยุดรับข้อมูลชั่วคราวเท่านั้น แปลว่า "ข้อมูลไม่ได้หายไปนะ มันแค่เข้าคิวรอเรา resume() อีกทีนั่นเอง!"

ลองดูตัวอย่างข้างล่างประกอบ ขอสมมุติว่าเรามี stream อยู่หนึ่งตัวที่จะส่งตัวเลขกลับมาเรื่อยๆ ทุกๆ 1 วินาที [1, 2, 3, 4, 5, ...] แบบนี้นะ

var subscription = stream.listen((x){
    print('Receive: $x');
});
Future.delayed(Duration(seconds: 3), (){
    subscription.pause();
});
Future.delayed(Duration(seconds: 6), (){
    subscription.resume();
});
Future.delayed(Duration(seconds: 8), (){
    subscription.cancel();
});
Enter fullscreen mode Exit fullscreen mode

แล้วเราก็ตั้งค่า (โดยใช้ Future.delayed) ให้มัน..

  • pause: เมื่อผ่านไป 3 วินาที
  • resume: เมื่อผ่านไป 6 วินาที (จากตอนเริ่มโปรแกรม)
  • cancel: เมื่อผ่านไป 8 วินาที (จากตอนเริ่มโปรแกรม)

เราอาจจะคิดว่าระหว่างวินาทีที่ 3-6 เราจะไม่ได้รับข้อมูลช่วงนั้น (คือข้อมูล [4,5] หายไปเลย)

แต่ไม่ใช่นะ! เพราะเลข [4,5] นั้นยังอยู่นะ แค่มันเข้าคิวรอที่จะออกมาอยู่

เมื่อเรา resume ในวินาทีที่ 6 มันก็จะออกมาทีเดียวหมดเลย [4,5,6]

ดูอธิบายด้วย timeline ข้างล่างนี่น่าจะเข้าใจมากกว่า

time  
1| Receive: 1
2| Receive: 2
3| Receive: 3
4|            ├─ waiting until (6)
5|            │
6| Receive: 4 ┐
 | Receive: 5 ├─ 3 values in 1 sec.
 | Receive: 6 ┘
7| Receive: 7
8| Receive: 8
9|
Enter fullscreen mode Exit fullscreen mode

มาลองสร้าง Stream กันบ้าง

การสร้าง Stream ทำได้หลายวิธีมากๆ แต่แบบง่ายที่สุดคือใช้วิธีแบบ Generator

แปล Iterable เป็น Stream

เราเคยสอนการสร้าง Generator ไปแล้วในบท Dart 105: มันวนซ้ำได้นะ! มาสร้าง Generator และใช้งาน Iterable กันเถอะ แต่ตอนนั้นเราเขียนมันในรูปแบบของ sync

อย่างที่พูดไปตอนต้นว่า Stream ก็คือ Iterable ที่อยู่ในรูปของ Asynchronous เราจะมาแสดงให้เห็นกัน

ขอเปิดด้วยโค้ดแบบ Iterable ในรูปของ Synchronous

โจทย์คือ... เราต้องการสร้างฟังก์ชันสำหรับดึง Data ออกมาจำนวนหนึ่งตั้งแต่ from ถึง to

Data fetchOneData(int id){
    ...
}

Iterable<int> getData(int from, int to) sync* {
    var dataList = [];
    for(var i=from; i<=to; i++){
        dataList.add(fetchOneData(i))
    }
    return dataList;
}
Enter fullscreen mode Exit fullscreen mode

(ฟังก์ชัน fetchOneData() ทำอะไรไม่ต้องสนใจนะ ไม่ใช่ประเด็นของเรื่อนี้)

แต่เพื่อ performance ที่ดี เลยเราไม่โหลดข้อมูลในครั้งเดียว แต่เลือกที่จะเขียนมันในรูปแบบ Generator แทน ก็จะได้โค้ดแบบนี้..

Data fetchOneData(int id){
    ...
}

Iterable<int> getData(int from, int to) sync* {
    for(var i=from; i<=to; i++){
        yield fetchOneData(i);
    }
}
Enter fullscreen mode Exit fullscreen mode

เวลาเราจะใช้งาน ก็เอาไปวนลูปได้ธรรมดาๆ แบบนี้

var dataList = getData(1, 10);

for(var data in dataList) {
    print(data);
}
Enter fullscreen mode Exit fullscreen mode

ขึ้นต่อไปคือ แล้วถ้าข้อมูลของเราไม่ได้มาแบบ sync ล่ะ?

ถ้าฟังก์ชัน fetchOneData() ใช้เวลาโหลดข้อมูลนานขึ้น เราคงต้องเปลี่ยนมันเป็น Future แบบนี้

Future<Data> fetchData(int id) {
    ...
}
Enter fullscreen mode Exit fullscreen mode

เมื่อเป็นแบบนี้ เราก็มีปัญหาซะแล้ว!

เพราะฟังก์ชัน getData() ที่เป็น sync ไม่สามารถเรียก fetchOneData() ที่เป็น async ได้ล่ะ!!

วิธีการแก้ก็คือเปลี่ยนฟังก์ชัน fetchOneData() ให้หลายเป็น async ยังไงล่ะ

"ด้วยการเปลี่ยน Iterable ให้กลายเป็น Stream"

สิ่งที่เราต้องทำ มีอยู่ 3 อย่าง

  1. จากเดิมที่รีเทิร์นค่าเป็น Iterable<Data> เราต้องเปลี่ยนมันเป็น Stream<Data> แทน
  2. จากเดิมที่ฟังก์ชันเป็นชนิด sync* เราต้องเปลี่ยนมันเป็น async*
  3. จากเดิมที่เรา yield ค่าทันทีได้เลย แต่เมื่อมันเป็น Future แล้วเราก็ต้องสั่งให้มันรอด้วย await
Stream<Data> getData(int from, int to) async* {
    for(var id=from; id<=to; id++) {
        yield await fetchOneData(id);
    }
}

void main() {
    fetchAllData(1, 10).listen(print);
}
Enter fullscreen mode Exit fullscreen mode

จะเห็นว่า Iterable นั้นแปลงเป็น Stream ได้ง่ายๆ เลย

ลองมาดูกันอีกตัวอย่างกัน

int sumIt(List<int> items) {
    int sum = 0;
    for(var item in items) {
        sum += item;
    }
    return sum;
}
Enter fullscreen mode Exit fullscreen mode

เรามีฟังก์ชัน sumIt() สำหรับหาผลรวมทั้งหมดใน List แต่ถ้าลิสต์ตัวนี้ไม่ได้ค่ามาแบบ sync ล่ะ? จะทำยังไง?

คราวนี้ให้สังเกตว่า parameter นั้นรับ List (หรือก็คือ Iterable นั่นแหละนะ) เข้ามา แล้วมันรีเทิร์นค่าแบบสเกล่าธรรมดา (คือ int ธรรมดาๆ นี่แหละ)

สำหรับเคสนี้ สิ่งที่เราต้องทำก็คือ

  1. เปลี่ยนค่าแบบสเกล่าธรรมดาให้กลายเป็น Future
  2. เปลี่ยนให้ฟังก์ชันเป็น async (ระวัง! ไม่ได้เปลี่ยนเป็น async* นะ เพราะมันไม่ได้รีเทิร์น Stream)
  3. จุดที่เป็นปัญหาคือ for loop ที่วนค่าตัว Stream .. ให้เราเติม await ลงไปข้างหน้าเป็นอันจบ
Future<int> sumIt(Stream<int> items) async {
    int sum = 0;

    await for(var item in items) {
        sum += item;
    }

    return sum;
}
Enter fullscreen mode Exit fullscreen mode

เรื่องสุดท้ายคือในบางกรณี เราอาจจะมีฟังก์ชัน Stream ที่เรียกใช้งาน Stream อีกตัวก็เป็นได้นะ

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
    yield getFirstStream(); //Not Work!!
}
Enter fullscreen mode Exit fullscreen mode

วิธีการนี้โค้ดอาจคอมไพล์ได้ แต่เราจะไม่ได้ข้อมูลอะไรเลย (ถ้าใครยังไม่เข้าใจว่าทำไมไม่เวิร์ค ลองกลับไปอ่านบทที่เราพูดถึง Generator อีกทีนะ)

วิธีการแก้ก็เหมือนกับฟังก์ชัน sync* นั่นแหละ คือเราจะต้องวนลูปทีละตัว

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
    await for(var s in getFirstStream()){
        tield s;
    }
}
Enter fullscreen mode Exit fullscreen mode

หรือแบบง่ายกว่าคือการใช้ yield* ก็ได้

Stream<Data> getFirstStream() async* {
    ...
}

Stream<Data> getSecondStream() async* {
   yield* getFirstStream(); //Ok!!
}
Enter fullscreen mode Exit fullscreen mode

นอกจากวิธีสร้างแบบ Generator ที่เราใช้แนวคิดเดียวกับ Generator ใน sync แล้วยังมีอีกวิธีหนึ่งที่ให้เราคุมการไหลของข้อมูลใน Stream ได้คล่องตัวขึ้น นั่นคือใช้ StreamController

ซึ่งเราจะพูดถึงกันในบทต่อไปนะ (พร้อมกับเก็บตกเนื้อหาเกี่ยวกับ Stream กันอีกนิดหน่อยด้วย)

Discussion (0)