DEV Community

loading...
Centrillion Technology

Async in Dart (4) ควบคุมข้อมูลในstreamอย่างเหนือชั้นด้วย StreamController

Ta
Introvert Developer who love to learn new Knowledge, Reading Books, Writing Blog, Drawing, play Badminton and Table-Tennis -- Founder and Writer at https://www.tamemo.com
Originally published at centrilliontech.co.th Updated on ・2 min read

Async in Dart (4) ควบคุมข้อมูลในstreamอย่างเหนือชั้นด้วย StreamController

ในบทที่แล้วเราสอนการสร้าง Stream แบบง่ายๆ ไปแล้ว แต่ในบางครั้งเราต้องการควบคุมข้อมูลใน Stream แบบคัสตอมมากๆ ซึ่งข้อมูลอาจจะเข้ามาจากหลายทางมากๆ การที่เรามีแค่ yield จากฟังก์ชันเดียวอาจจะไม่เพียงพอ

ดังนั้นเลยเป็นที่มาของคลาสที่ชื่อว่า StreamController ซึ่งเป็นคลาสที่เอาไว้ควบคุม Stream อีกที

StreamController

การสร้าง Stream แบบธรรมดาก็จะเป็นประมาณนี้

Stream<int> getNumberStream() async* {
    ...
}

Stream<int> numberStream = getNumberStream();
Enter fullscreen mode Exit fullscreen mode

ส่วนใหญ่เราจะสร้าง Stream จากฟังก์ชันประเภท async* ซึ่งภายในฟังก์ชันก็จะมีการ yield ข้อมูลกลับมา

ทีนี้ ถ้าเราจะเปลี่ยนไปใช้ StreamController แทน ก็จะเป็นแบบนี้

StreamController<int> controller = StreamController<int>();
Enter fullscreen mode Exit fullscreen mode

นั่นคือ จากปกติที่เราจะสร้างข้อมูลในฟังก์ชันเท่านั้น การสร้างด้วย StreamController คือเราไม่ต้องกำหนดว่า Stream ตัวนี้จะสร้างอะไรเลย (แค่ new ตัวคอนโทรเลอร์ขึ้นมาเฉยๆ ก็พอแล้ว)

StreamController ไม่ใช่ Stream นะ ไม่สามารถเอาไปใช้แทน Stream ได้ แต่สามารถดึง Stream ออกมาจากตัวมันได้

ให้คิดง่ายๆ ว่ามันคือ Controller ที่ถือ Stream เอาไว้ข้างในอีกทีก็ได้

สำหรับอ็อบเจค StreamController ที่เราสร้างขึ้นมา จะมี Stream อยู่ข้างใน เราสามารถดึงออกมาใช้งานได้ด้วยการสั่ง .stream ตรงๆ เลย

Stream stream = controller.stream;
stream.listen((value) {
    print('value: $value');
});

//หรือ

controller.stream.listen((value) {
    print('value: $value');
});
Enter fullscreen mode Exit fullscreen mode

แล้วข้อมูลจะมาจากไหน? ใช้คำสั่ง add ยังไงล่ะ

StreamController นั้นไม่ได้เป็นฟังก์ชัน ดังนั้นมันจะไม่มีโลจิคการสร้างข้อมูลอะไรอยู่กับตัวมันทั้งนั้น!

StreamController เป็นแค่ตัวกลางสำหรับใส่ข้อมูลเข้า Stream เท่านั้น

import 'dart:async';

var controller = StreamController<int>();

void main() {
    controller.stream.listen((x) {
        print(x);
    });

    ...
}

void addNumberToStream(int x) {
    controller.add(x);
}
Enter fullscreen mode Exit fullscreen mode

จากโค้ดด้านบน เรามีการสร้าง StreamController ขึ้นมา 1 ตัว แล้วจัดการ listen() มันใน main (listen ไว้ก่อน ยังไม่มีข้อมูลอะไรเข้ามาทั้งนั้น)

ทีนี้ เราก็ไปสร้างฟังก์ชันอีกตัวหนึ่งชื่อ addNumberToStream(int x) ซึ่งจะมีการเพิ่มค่าเข้า Stream ผ่านคำสั่ง add()

เมื่อฟังก์ชัน addNumberToStream(int x) ถูกเรียก StreamController ก็จะส่งค่านั้นไปให้ listener ที่รอค่าอยู่นั่นเอง

เราสามารถส่งข้อมูลกี่ครั้งก็ได้ผ่านคำสั่ง add()

ส่วนใหญ่ use case ที่เราต้องใช้ StreamController แทนการสร้างฟังก์ชัน async* แบบธรรมดาจะเป็นการเขียนโปรแกรมแบบ Event-Driving เพราะเราไม่รู้ว่าข้อมูลจะเข้ามาเมื่อไหร่ ตอนไหนที่ผู้ใช้จะกดปุ่ม อะไรแบบนั้น เราเลยสร้างฟังก์ชันเตรียมไว้ก่อนไม่ได้ ทำได้แค่ให้ controller คอยส่งข้อมูลมาให้เมื่อเกิด Event เท่านั้น

คำเตือน! StreamController มักจะไม่รู้จุดสิ้นสุด!

การใช้งาน Stream ธรรมดา เราสามารถเปลี่ยนจากการใช้ callback ด้วย listen() ไปใช้ await เพื่อเปลี่ยนโค้ดให้อยู่ในรูปแบบ sync แทนได้

Stream<int> numberStream = getNumberStream();

await for(var number in numberStream) {
    print('Receive: $number');
}
Enter fullscreen mode Exit fullscreen mode

ซึ่งมีข้อควรระวังในการใช้ await คือ Stream ตัวนั้นจะต้องเป็น "Finite Stream" หรือสตรีมที่รู้ว่ามีจุดสิ้นสุดแน่ๆ !

ไม่งั้นโค้ดของเราจะถูก await บล็อกการทำงานอยู่ที่ลูปนั้น ไปต่อไม่ได้ เช่น

import 'dart:async';

var controller = StreamController<void>();

void main() {

    print('start');

    await for(var _ in controller.stream) {
        print('clicked!');
    }

    print('end');

    ...
}

void onClick() {
    controller.add(null);
}
Enter fullscreen mode Exit fullscreen mode

ในกรณีแบบนี้ โค้ดจะถูกบล็อกอยู่ที่ลูป (โปรแกรมจะไม่หลุดไปยังคำสั่งปริ้น end เลย) เพราะไม่มีการปิดสตรีม

ดังนั้นถ้าฟังก์ชันเรามีคำสั่งให้ทำอะไรบ้างอย่างต่อไป เราไม่ควรเขียนมันในรูป await นะ ใช้ listen() จะดีกว่า

ยังมี addError และ close อีกนะ

นอกจากการ add ข้อมูลเข้า Stream แล้วเรายังสามารถสั่งอีก 2 คำสั่งกับคอรโทรเลอร์ได้ คือการโยน Exception หรือสั่งปิด Stream นั่นเอง

import 'dart:async';

var controller = StreamController<int>();

void main() {
    controller.stream.listen(
        (x) {
            print(x);
        },
        onDone: () {
            print('จบแล้วจ้า!');
        },
        onError: (e){
            print('error is $e');
        },
        cancelOnError: false,
    );

    print('start');
    controller.add(1);
    controller.add(2);
    controller.add(3);
    controller.addError(Exception('ตู้มมม'));
    controller.add(4);
    controller.add(5);
    controller.close();
    print('end');
}
Enter fullscreen mode Exit fullscreen mode

output:

start
end
1
2
3
error is Exception: ตู้มมม
4
5
จบแล้วจ้า!
Enter fullscreen mode Exit fullscreen mode

การสั่ง addError ไม่ได้ทำให้ Stream หยุดทำงานนะ ไม่งั้นต้องเพิ่มอ็อบชัน cancelOnError: true ลงไปด้วย (ถ้าไม่เซ็ต ดีฟอลต์จะเป็น false คือถึงเจอ Exception ฉันก็ไม่หยุดนะ)

ข้อสังเกตอีกอย่างคือ listen() นั้นเป็น Isolates นะ มันจะปริ้น start และ end (อยู่ใน Isolates ของ ฟังก์ชัน main) ก่อน

ใครงงว่า Isolates คืออะไร ย้อนกลับไปอ่านบทความแรกของซีรีส์นี้อีกทีนะ

สุดท้าย การใช้งาน StreamController นั้นมีอีกเรื่องที่ต้องระวัง คือ...

Stream อาจจะไม่ได้ถูก subscription ในทันที

ลองดูตัวอย่างโค้ดนี้

Stream<int> counter1(int maxCount) {
    var controller = StreamController<int>();
    int counter = 0;

    Timer.periodic(Duration(seconds: 1), (Timer timer) {
        counter++;
        controller.add(counter);
        if (counter >= maxCount) {
            timer.cancel();
            controller.close();
        }
    });

    return controller.stream;
}
Enter fullscreen mode Exit fullscreen mode

เราตั้งให้ฟังก์ชัน counter1 ทำการ add ตัวเลขเข้าไปเรื่อยๆ ทุกๆ 1 วินาที (ฟังก์ชัน Timer.periodic() เอาไว้สำหรับสั่งงานที่ทำงานซ้ๆ กันทุกๆ ช่วงเวลาหนึ่ง คล้ายๆ กับ setInterval ใน JavaScript)

เวลาใช้งาน ก็ใช้ listen() ตามปกติหรือใช้ for loop แล้วเติม await ลงไปก็ได้

void main() {
    //start!
    var counterStream = counter1(5);

    //listen
    counterStream.listen((n){
        print(n);
    });
}
Enter fullscreen mode Exit fullscreen mode

แบบนี้ถ้าเอาไปรัน ก็จะได้ output เป็นเลข 1, 2, 3, 4, 5 ตัวใน 5 วินาที ... อันนี้มาตราฐาน ตามคอมมอนเซ้นส์ ไม่มีอะไรแปลก

แต่ถ้าเรามีการสั่ง delay ก่อนที่เราจะ listen() แบบนี้

void main() {
    //start!
    var counterStream = counter1(5);

    //waiting 2 sec.
    await Future.delayed(const Duration(seconds: 2));

    //listen
    counterStream.listen((n){
        print(n);
    });
}
Enter fullscreen mode Exit fullscreen mode

คำถามคือถ้าเขียนโค้ดแบบนี้ เราคาดหวังว่า output จะเป็นยังไง?

  • โปรแกรมก็น่าจะหยุดรอ 2 วินาที
  • แล้วก็ค่อยๆ ปริ้นเลข 1, 2, 3, 4, 5 ทีละวิ

แต่มันไม่ใช่แบบนั้นน่ะสิ!

การทำงานของโปรแกรมคือหลังจาก delay 2 วินาทีแล้ว มันจะทำการปริ้นเลข 1, 2 ออกมาในทันที (ดูรูปข้างล่างประกอบนะ)

สำหรับการใช้งาน Stream นั้นมันจะทำงานทันที แม้ว่าจะยังไม่มีใครมารอ listen มันเลยก็ตาม

แบบในเคสนี้ เราเรียกฟังก์ชัน counter1() ให้ทำงาน แล้วทำการ delay มันซะ 2 วินาทีแล้วค่อย listen แต่ตัว counter นั้นดันทำงานก่อนไปแล้ว ไม่ได้เริ่มทำงานตอนที่เรา listen

แต่ถ้าเราต้องการให้มันทำงานเมื่อสั่ง listen จังหวะนั้นเลย (แบบ counter2 ในรูปข้างบน) เราจะต้องเซ็ตค่าตอนสร้าง StreamController

ในการสร้าง StreamController เราสามารถกำหนดอ็อบชันเพิ่มได้คือ

  • onListen
  • onPause
  • onResume
  • onCancel

วิธีการก็คือเราจะต้องสร้าง Stream ที่ตอนแรกยังไม่เริ่มนับ counter แล้วสั่งให้มันเริ่มนับตอน onListen นั่นเอง

Stream<int> counter2(int maxCount) {
    StreamController<int> controller;
    Timer timer;
    int counter = 0;

    //ฟังก์ชันสำหรับเริ่มรับเลข
    void startTimer() {
        timer = Timer.periodic(
            Duration(seconds: 1), 
            (_) {
                counter++;
                controller.add(counter);
                if (counter == maxCount) {
                    timer.cancel();
                    controller.close();
                }
            },
        );
    }

    //ฟังก์ชันสำหรับหยุดนับเลข
    void stopTimer() {
        if (timer != null) {
            timer.cancel();
            timer = null;
        }
    }

    controller = StreamController<int>(

        //เริ่มทำงาน startTimer() เมื่อ listen นะ
        onListen: startTimer,

        onPause: stopTimer,
        onResume: startTimer,
        onCancel: stopTimer,
    );

    return controller.stream;
}
Enter fullscreen mode Exit fullscreen mode

Discussion (0)