DEV Community

loading...

ใช้ Stream.unfold และ Enum.reduce แทน loop ในแบบ imperative

Weerasak Chongnguluam
Software Developer/Love to code/Teaching to code
・2 min read

งานที่ทำวันนี้ต้องการเขียนโค้ดเพื่อ request API ที่เป็น pagination ซึ่งถ้าเราต้องการเอาข้อมูลทั้งหมด เราก็ต้อง request ไปทีละ page จนกว่าจะถึง page สุดท้าย

ทีนี้ถ้าเขียนแบบ imperative ที่มี loop ทั่วไปก็จะออกมาประมาณนี้ (ป.ล. โค้ดเป็น pseudo code)

result = []
page = 1
limit = 10
loop do
  resp = fetch(page: page, limit: limit)
  result = result ++ resp.list
  if page * limit >= resp.total do
    break
  end
  page = page + 1
end
Enter fullscreen mode Exit fullscreen mode

คือดึงข้อมูลจนกว่า page * limit จะมากกว่า total นั่นเอง

ทีนี้ถ้าเราจะทำในลักษณะนี้ใน Elixir เราสามารถเขียน recursive function เองก็ได้แล้วให้มี accumulator parameter ในการรวบรวม result ในแต่ละรอบที่เรียกซ้ำฟังก์ชัน

แต่ก็คิดว่า Elixir น่าจะมี function ที่ช่วยให้เราไม่ต้องเขียนเองเอาไว้แล้ว จนค้นไปเจอว่า Stream.unfold ใช้ร่วมกับ Enum.reduce นั่นสามารถทำได้

Stream คือ package ที่ช่วยให้เราจัดการกับข้อมูลที่ค่อยๆถูกสร้าง หรือค่อยๆถูกดึงออกมาโดยไม่ต้องมาเป็นก้อนใหญ่ๆก้อนเดียว ซึ่ง Stream.unfold นั้นก็ช่วยให้เราสร้าง Stream ของผลลัพธ์จากการเรียกฟังก์ชันใดๆ ตัวอย่างเช่นเราอยากสร้าง Stream ของการ fetch ข้อมูลทีละหน้าแบบด้านบนเราสามารถเขียนได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      %{list: list, total: total}
      when page * limit >= total ->
        {list, :halt}

      %{list: list} ->
        {list, [page: page + 1, limit: limit]}
    end

  :halt ->
    nil
end)
Enter fullscreen mode Exit fullscreen mode

จากโค้ดจะเห็นว่าเมื่อมีการทำให้ค่าจาก Stream emit ออกมานั้นจะเริ่มเอาค่าเริ่มต้นแรก [start_page: 1, limit: 10] ไปเรียก function ที่เราส่งให้กับ Stream.unfold ซึ่งเราก็จะเอาไปเรียก fetch(page, limit) จากนั้นก็เอาผลลัพธ์ที่ได้มาเช็คว่าถ้า page*limit >= total เราจะตอบกลับเป็น tuple ที่ค่าแรกเป็นผลลัพธ์ที่จะ emit ออกไปในครั้งนี้ และ ค่าที่สองคือ parameter ที่จะเอาไปเรียกฟังก์ชันในครั้งถัดไป ซึ่งเราก็กำหนดเป็น :halt เพื่อที่จะได้ไป match กับอีก case แล้วก็จะ return nil เพื่อบอก Stream.unfold ให้จบ stream เหมือนกับ break ใน imperative ที่เราเขียนให้ดูนั่นเอง ถ้าไม่เป็นตามเงื่อนไข เราก็ emit list ออกไปพร้อมกับส่ง [page: page+1, limit: limit] เป็น parameter ในรอบถัดไปเพื่อให้ fetch page ถัดไปนั่นเอง

การเรียกฟังก์ชันนี้จะยังไม่เกิดการเรียก fetch(page, limit) ทันทีเพราะมันจะแค่สร้าง Stream เตรียมเอาไว้จนกว่าเราจะเอา Stream ไปจัดการด้วยฟังก์ชันของ Enumerator อย่างในกรณีนี้เราต้องการรวบรวม ค่าของ list ที่ถูก emit ออกมาให้เป็น list เดียว เราเลยจะใช้ Enum.reduce ช่วย แล้วก็เอามา pipe ต่อกันได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      %{list: list, total: total}
      when page * limit >= total ->
        {list, :halt}

      %{list: list} ->
        {list, [page: page + 1, limit: limit]}
    end

  :halt ->
    nil
end)
|> Enum.reduce([], fn list, acc -> acc ++ list end)
Enter fullscreen mode Exit fullscreen mode

ทีนี้ถ้าฟังก์ชัน fetch ของเรานั้นมี error กลับออกมาด้วย โดยถ้าเรียกสำเร็จจะได้ {:ok, %{list: list, total: total}} ถ้าไม่สำเร็จจะได้ {:error, reason} แบบนี้ เราสามารถใช้ Enum.reduce_while เพื่อช่วย reduce จนกว่าจะเจอ pattern ที่เป็น error ได้ โดยในส่วนของ Stream.unfold ถ้าเราเจอ error ตอน fetch ก็ให้จบ stream เช่นกัน เขียนได้แบบนี้

Stream.unfold([start_page: 1, limit: 10], fn 
  [page: page, limit: limit] ->
    fetch(page, limit)
    |> case do
      {:ok, %{list: list, total: total}}
      when page * limit >= total ->
        {list, :halt}

      {:ok, %{list: list}} ->
        {list, [page: page + 1, limit: limit]}
      {:error, _reason} = error ->
        {error, :halt}
    end

  :halt ->
    nil
end)
|> Enum.reduce_while({:ok, []}, fn 
  {:ok, next_list}, {:ok, list} -> {:cont, {:ok, list ++ next_list}}
  {:error, _reason} = error, _acc -> {:halt, error}
end)
Enter fullscreen mode Exit fullscreen mode

คือถ้า fetch แล้วได้ error ก็จะ emit error เป็นค่าสุดท้ายแล้วรอบถัดไปก็ :halt จบ stream

ส่วน Enum.reduce_while นั้นตรงผลลัพธ์ของฟังก์ชันนั้นต้องตอบกลับเป็น tuple ถ้าจะให้จบ reduce tuple ค่าแรกต้องเป็น :halt แต่ถ้าจะต่อไปให้เป็น :cont ส่วนค่าที่สองใน tuple คือค่าที่เราจะให้เป็น accumulator ในแต่ละรอบ ส่วนถ้า stream ไม่มี error เลยก็จะ reduce จนจบ stream นั่นเอง

ขอฝาก Buy Me a Coffee

สำหรับท่านใดที่อ่านแล้วชอบโพสต์ต่างๆของผมที่นี่ ต้องการสนับสนุนค่ากาแฟเล็กๆน้อยๆ สามารถสนับสนุนผมได้ผ่านทาง Buy Me a Coffee คลิ๊กที่รูปด้านล่างนี้ได้เลยครับ

Buy Me A Coffee

ส่วนท่านใดไม่สะดวกใช้บัตรเครดิต หรือ Paypal สามารถสนับสนุนผมได้ผ่านทาง PromptPay โดยดู QR Code ได้จากโพสต์ที่พินเอาไว้ได้ที่ Page DevDose ครับ https://web.facebook.com/devdoseth

ขอบคุณครับ 🙏

Discussion (0)