ในบทที่แล้วเราสอนการสร้าง Stream แบบง่ายๆ ไปแล้ว แต่ในบางครั้งเราต้องการควบคุมข้อมูลใน Stream แบบคัสตอมมากๆ ซึ่งข้อมูลอาจจะเข้ามาจากหลายทางมากๆ การที่เรามีแค่ yield
จากฟังก์ชันเดียวอาจจะไม่เพียงพอ
ดังนั้นเลยเป็นที่มาของคลาสที่ชื่อว่า StreamController
ซึ่งเป็นคลาสที่เอาไว้ควบคุม Stream
อีกที
StreamController
การสร้าง Stream แบบธรรมดาก็จะเป็นประมาณนี้
Stream<int> getNumberStream() async* {
...
}
Stream<int> numberStream = getNumberStream();
ส่วนใหญ่เราจะสร้าง Stream จากฟังก์ชันประเภท async*
ซึ่งภายในฟังก์ชันก็จะมีการ yield
ข้อมูลกลับมา
ทีนี้ ถ้าเราจะเปลี่ยนไปใช้ StreamController แทน ก็จะเป็นแบบนี้
StreamController<int> controller = StreamController<int>();
นั่นคือ จากปกติที่เราจะสร้างข้อมูลในฟังก์ชันเท่านั้น การสร้างด้วย StreamController คือเราไม่ต้องกำหนดว่า Stream ตัวนี้จะสร้างอะไรเลย (แค่ new ตัวคอนโทรเลอร์ขึ้นมาเฉยๆ ก็พอแล้ว)
StreamController ไม่ใช่ Stream นะ ไม่สามารถเอาไปใช้แทน Stream ได้ แต่สามารถดึง Stream ออกมาจากตัวมันได้
ให้คิดง่ายๆ ว่ามันคือ Controller ที่ถือ Stream เอาไว้ข้างในอีกทีก็ได้
สำหรับอ็อบเจค StreamController ที่เราสร้างขึ้นมา จะมี Stream อยู่ข้างใน เราสามารถดึงออกมาใช้งานได้ด้วยการสั่ง .stream
ตรงๆ เลย
Stream stream = controller.stream;
stream.listen((value) {
print('value: $value');
});
//หรือ
controller.stream.listen((value) {
print('value: $value');
});
แล้วข้อมูลจะมาจากไหน? ใช้คำสั่ง add ยังไงล่ะ
StreamController นั้นไม่ได้เป็นฟังก์ชัน ดังนั้นมันจะไม่มีโลจิคการสร้างข้อมูลอะไรอยู่กับตัวมันทั้งนั้น!
StreamController เป็นแค่ตัวกลางสำหรับใส่ข้อมูลเข้า Stream เท่านั้น
import 'dart:async';
var controller = StreamController<int>();
void main() {
controller.stream.listen((x) {
print(x);
});
...
}
void addNumberToStream(int x) {
controller.add(x);
}
จากโค้ดด้านบน เรามีการสร้าง StreamController ขึ้นมา 1 ตัว แล้วจัดการ listen()
มันใน main (listen ไว้ก่อน ยังไม่มีข้อมูลอะไรเข้ามาทั้งนั้น)
ทีนี้ เราก็ไปสร้างฟังก์ชันอีกตัวหนึ่งชื่อ addNumberToStream(int x)
ซึ่งจะมีการเพิ่มค่าเข้า Stream ผ่านคำสั่ง add()
เมื่อฟังก์ชัน addNumberToStream(int x)
ถูกเรียก StreamController ก็จะส่งค่านั้นไปให้ listener ที่รอค่าอยู่นั่นเอง
เราสามารถส่งข้อมูลกี่ครั้งก็ได้ผ่านคำสั่ง add()
ส่วนใหญ่ use case ที่เราต้องใช้ StreamController แทนการสร้างฟังก์ชัน async* แบบธรรมดาจะเป็นการเขียนโปรแกรมแบบ Event-Driving เพราะเราไม่รู้ว่าข้อมูลจะเข้ามาเมื่อไหร่ ตอนไหนที่ผู้ใช้จะกดปุ่ม อะไรแบบนั้น เราเลยสร้างฟังก์ชันเตรียมไว้ก่อนไม่ได้ ทำได้แค่ให้ controller คอยส่งข้อมูลมาให้เมื่อเกิด Event เท่านั้น
คำเตือน! StreamController มักจะไม่รู้จุดสิ้นสุด!
การใช้งาน Stream ธรรมดา เราสามารถเปลี่ยนจากการใช้ callback ด้วย listen()
ไปใช้ await
เพื่อเปลี่ยนโค้ดให้อยู่ในรูปแบบ sync แทนได้
Stream<int> numberStream = getNumberStream();
await for(var number in numberStream) {
print('Receive: $number');
}
ซึ่งมีข้อควรระวังในการใช้ await
คือ Stream ตัวนั้นจะต้องเป็น "Finite Stream" หรือสตรีมที่รู้ว่ามีจุดสิ้นสุดแน่ๆ !
ไม่งั้นโค้ดของเราจะถูก await
บล็อกการทำงานอยู่ที่ลูปนั้น ไปต่อไม่ได้ เช่น
import 'dart:async';
var controller = StreamController<void>();
void main() {
print('start');
await for(var _ in controller.stream) {
print('clicked!');
}
print('end');
...
}
void onClick() {
controller.add(null);
}
ในกรณีแบบนี้ โค้ดจะถูกบล็อกอยู่ที่ลูป (โปรแกรมจะไม่หลุดไปยังคำสั่งปริ้น end เลย) เพราะไม่มีการปิดสตรีม
ดังนั้นถ้าฟังก์ชันเรามีคำสั่งให้ทำอะไรบ้างอย่างต่อไป เราไม่ควรเขียนมันในรูป await
นะ ใช้ listen()
จะดีกว่า
ยังมี addError และ close อีกนะ
นอกจากการ add ข้อมูลเข้า Stream แล้วเรายังสามารถสั่งอีก 2 คำสั่งกับคอรโทรเลอร์ได้ คือการโยน Exception หรือสั่งปิด Stream นั่นเอง
import 'dart:async';
var controller = StreamController<int>();
void main() {
controller.stream.listen(
(x) {
print(x);
},
onDone: () {
print('จบแล้วจ้า!');
},
onError: (e){
print('error is $e');
},
cancelOnError: false,
);
print('start');
controller.add(1);
controller.add(2);
controller.add(3);
controller.addError(Exception('ตู้มมม'));
controller.add(4);
controller.add(5);
controller.close();
print('end');
}
output:
start
end
1
2
3
error is Exception: ตู้มมม
4
5
จบแล้วจ้า!
การสั่ง addError
ไม่ได้ทำให้ Stream หยุดทำงานนะ ไม่งั้นต้องเพิ่มอ็อบชัน cancelOnError: true
ลงไปด้วย (ถ้าไม่เซ็ต ดีฟอลต์จะเป็น false
คือถึงเจอ Exception ฉันก็ไม่หยุดนะ)
ข้อสังเกตอีกอย่างคือ listen()
นั้นเป็น Isolates นะ มันจะปริ้น start และ end (อยู่ใน Isolates ของ ฟังก์ชัน main) ก่อน
ใครงงว่า Isolates คืออะไร ย้อนกลับไปอ่านบทความแรกของซีรีส์นี้อีกทีนะ
สุดท้าย การใช้งาน StreamController นั้นมีอีกเรื่องที่ต้องระวัง คือ...
Stream อาจจะไม่ได้ถูก subscription ในทันที
ลองดูตัวอย่างโค้ดนี้
Stream<int> counter1(int maxCount) {
var controller = StreamController<int>();
int counter = 0;
Timer.periodic(Duration(seconds: 1), (Timer timer) {
counter++;
controller.add(counter);
if (counter >= maxCount) {
timer.cancel();
controller.close();
}
});
return controller.stream;
}
เราตั้งให้ฟังก์ชัน counter1
ทำการ add ตัวเลขเข้าไปเรื่อยๆ ทุกๆ 1 วินาที (ฟังก์ชัน Timer.periodic()
เอาไว้สำหรับสั่งงานที่ทำงานซ้ๆ กันทุกๆ ช่วงเวลาหนึ่ง คล้ายๆ กับ setInterval
ใน JavaScript)
เวลาใช้งาน ก็ใช้ listen()
ตามปกติหรือใช้ for loop แล้วเติม await
ลงไปก็ได้
void main() {
//start!
var counterStream = counter1(5);
//listen
counterStream.listen((n){
print(n);
});
}
แบบนี้ถ้าเอาไปรัน ก็จะได้ output เป็นเลข 1, 2, 3, 4, 5 ตัวใน 5 วินาที ... อันนี้มาตราฐาน ตามคอมมอนเซ้นส์ ไม่มีอะไรแปลก
แต่ถ้าเรามีการสั่ง delay ก่อนที่เราจะ listen()
แบบนี้
void main() {
//start!
var counterStream = counter1(5);
//waiting 2 sec.
await Future.delayed(const Duration(seconds: 2));
//listen
counterStream.listen((n){
print(n);
});
}
คำถามคือถ้าเขียนโค้ดแบบนี้ เราคาดหวังว่า output จะเป็นยังไง?
- โปรแกรมก็น่าจะหยุดรอ 2 วินาที
- แล้วก็ค่อยๆ ปริ้นเลข 1, 2, 3, 4, 5 ทีละวิ
แต่มันไม่ใช่แบบนั้นน่ะสิ!
การทำงานของโปรแกรมคือหลังจาก delay 2 วินาทีแล้ว มันจะทำการปริ้นเลข 1, 2 ออกมาในทันที (ดูรูปข้างล่างประกอบนะ)
สำหรับการใช้งาน Stream นั้นมันจะทำงานทันที แม้ว่าจะยังไม่มีใครมารอ listen มันเลยก็ตาม
แบบในเคสนี้ เราเรียกฟังก์ชัน counter1()
ให้ทำงาน แล้วทำการ delay มันซะ 2 วินาทีแล้วค่อย listen แต่ตัว counter นั้นดันทำงานก่อนไปแล้ว ไม่ได้เริ่มทำงานตอนที่เรา listen
แต่ถ้าเราต้องการให้มันทำงานเมื่อสั่ง listen จังหวะนั้นเลย (แบบ counter2 ในรูปข้างบน) เราจะต้องเซ็ตค่าตอนสร้าง StreamController
ในการสร้าง StreamController เราสามารถกำหนดอ็อบชันเพิ่มได้คือ
- onListen
- onPause
- onResume
- onCancel
วิธีการก็คือเราจะต้องสร้าง Stream ที่ตอนแรกยังไม่เริ่มนับ counter แล้วสั่งให้มันเริ่มนับตอน onListen
นั่นเอง
Stream<int> counter2(int maxCount) {
StreamController<int> controller;
Timer timer;
int counter = 0;
//ฟังก์ชันสำหรับเริ่มรับเลข
void startTimer() {
timer = Timer.periodic(
Duration(seconds: 1),
(_) {
counter++;
controller.add(counter);
if (counter == maxCount) {
timer.cancel();
controller.close();
}
},
);
}
//ฟังก์ชันสำหรับหยุดนับเลข
void stopTimer() {
if (timer != null) {
timer.cancel();
timer = null;
}
}
controller = StreamController<int>(
//เริ่มทำงาน startTimer() เมื่อ listen นะ
onListen: startTimer,
onPause: stopTimer,
onResume: startTimer,
onCancel: stopTimer,
);
return controller.stream;
}
Top comments (0)