DEV Community

Pallat Anchaleechamaikorn
Pallat Anchaleechamaikorn

Posted on

เขียน Go ต่อ Kafka ตอนที่ 1

เวลาเราจะเขียน Go เพื่อไปทำงานกับ Kafka เราก็ต้องเริ่มจากการเลือก library กันก่อน ซึ่งผมจะลองยกตัวอย่างมาให้ดูสัก 3 ตัวครับ
https://githu

  1. https://github.com/confluentinc/confluent-kafka-go ตัวนี้จะถือว่าเป็น official library ก็ว่าได้ เพราะเจ้าของก็คือ confluent เอง ซึ่งก็คือ kafka ในเวอร์ชั่นเสียเงินนั่นเอง แต่ว่า lib ตัวนี้ไม่ค่อยเป็นที่นิยม เนื่องจากมันทำตัวเป็นแค่ wrapper เป็นทางผ่านไปเรียก librdkafka อีกที ซึ่งเป็น lib ตัวจริงที่เขียนด้วย c ทำให้เราจำเป็นต้องติดตั้ง sdk ที่เขียนด้วย c ตัวนี้ด้วยเสมอ
  2. https://github.com/IBM/sarama ตัวนี้เป็นที่นิยมมาก แต่ต้องดูให้ดีเพราะเจ้าของเดิมคือ Shopify ได้โอนให้ IBM เป็นคนดูแลต่อ แต่ตัว lib ตัวเก่าที่ Shopify ก็ยังมีอยู่ด้วย ให้ดูว่าตัวล่าสุดจะอยู่กับ IBM นะครับ
  3. https://github.com/segmentio/kafka-go ตัวนี้เป็นน้องใหม่ที่เข้ามาเสนอทางเลือกว่าตัวเองเขียนง่ายกว่าใครๆ

แต่ในที่นี้ผมจะขอเลือก sarama มาใช้เนื่องจากเราค่อนข้างใช้กันเยอะ และพอเราใช้กันเยอะ คนที่เข้าทีมมาทีหลังก็อาจจะไม่รู้ว่าทำไปจะต้องไปเหนื่อยหาตัวใหม่ด้วย เราก็ใช้ให้มันเหมือนๆกันไปแหล่ะ

ก็ด้วยความที่เราก็ใช้กันไปแหล่ะนั่นแหล่ะครับ ผมก็เลยเห็นว่า เราอาจจะยังขาดความเข้าใจมันอยู่บ้าง เลยหยิบตัวนี้มาอธิบายกันสักหน่อย

ก่อนจะไปเล่าเรื่อง lib เรามาทำความเข้าใจ Kafka กันคร่าวๆก่อน สิ่งที่เราต้องรู้ก่อนคือคำศัพท์ต่างๆที่เกี่ยวข้องกับ Kafka เช่น

  • Producer: คือตัวที่จะส่ง message เข้าไปใน Kafka broker
  • Broker: เอาง่ายๆเลยก็คือ Kafka server นั่นแหล่ะ คือเป็นตัวรับ และ ส่งต่อ message
  • Consumer: ตัวที่จะรับ message ไปใช้งาน
  • Topic: ก็คือ channel ที่จะใช้ส่ง message หากัน ให้นึกถึงช่องใน youtube ว่าคนทำคอนเทนต์ก็คือ producer เขาเปิดช่องมาช่องหนึ่งก็คือ Topic แล้วเราก็ไปดู คนดูก็คือ consumer
  • Partition: เวลาเราส่งข้อมูลเข้าไปใน topic แล้วคนรับข้อมูลไปใช้ แล้วข้อมูลมีเยอะมาก จะทำให้การดึง message ไปใช้ จะใช้เวลานาน ถ้าอยากจะลดเวลาลง ก็ต้องกระจ่าย message ให้มันแบ่งช่องกันอยู่ เวลา consumer มาดึงไป จะได้ช่วยกันได้หลายๆ consumer เราก็จะใช้เทคนิคในการแบ่ง partition ออกไป อยากเร็วกี่เท่าก็สร้าง partition ตามนั้นเลยเช่น อยากเร็วขึ้น 10 เท่า ก็ทำ 10 partition
  • Replica: ก็คือการทำสำเนาข้อมูลไว้สำรอง เผื่อกรณีที่ broker มีปัญหา จะได้มีสำรองไว้
  • ISR (In-sync replica): คือจำนวนของ replica ที่ active อยู่ในขณะนั้น

เอาคร่าวๆเท่านี้ก่อนนะครับ
ทีนี้เราก็จะมาดู code ตัวอย่างที่ทาง sarama ทำไว้ให้ดูที่หน้า package โดยเราจะไปเริ่มที่ตัวอย่าง SyncProducer กันก่อนเลย

producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
Enter fullscreen mode Exit fullscreen mode

ปกติโค้ดตัวอย่างของ sarama เวลาเรา copy ลงมาแปะไว้ในเครื่องเรา มันจะใช้ไม่ได้สักตัวเลยนะครับ เพราะมันเขียนผิดบ้าง ไม่ใส่ sarama. บ้าง เราก็ต้องมานั่งแก้ๆกันก่อนครับ เช่น

producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
    log.Fatalln(err)
}
defer func() {
    if err := producer.Close(); err != nil {
        log.Fatalln(err)
    }
}()

msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
    log.Printf("FAILED to send message: %s\n", err)
} else {
    log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
Enter fullscreen mode Exit fullscreen mode

พอแก้เสร็จแล้วเราก็มาลองอ่านโค้ดกันดูว่าตัวอย่างมันเขียนอะไรไว้บ้าง เริ่มจาก

sarama.NewSyncProducer ตัวนี้คือการสร้าง producer instant โดยมันต้องการ parameter 2 ตัว ตัวแรกคือ []string ที่เราจะต้องระบุลงไปว่าเรามี broker อยู่กี่ตัว ให้ใส่ลงไปให้หมดเช่นถ้ามี 3 ตัวก็อาจจะใส่

[]string{"localhost:9092","localhost:9093","localhost:9094"}
Enter fullscreen mode Exit fullscreen mode

ส่วน parameter ตัวที่สองคือ config ซึ่งถ้าใส่เป็น nil มันก็จะไปใช้ default config ให้เอง

ทีนี้ตัว producer เมื่อใช้เสร็จก็ต้อง close เราก็เลยต้องทำ defer เอาไว้เลยตามตัวอย่าง จากนั้น

msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}

นี่ก็เป็นการสร้าง message instant ด้วยการระบุ topic ชื่อ my_topic และ value คือตัว message นั่นเอง
กรณีที่เราไม่เคยสร้าง topic ไว้ใน kafka มาก่อน ถ้าเรา produce เข้าไปเลย มันจะสร้าง topic ง่ายๆขึ้นมาให้ โดยจะไม่ได้แบ่ง partition และไม่มี replica ด้วย

สุดท้ายเราก็ส่ง message เข้าไปด้วยบรรทัดนี้

partition, offset, err := producer.SendMessage(msg)

เสร็จแล้วมันจะคืนค่ามาว่า message ที่ส่งเข้าไปนั้นไปลงที่ partition เลขอะไร และ message นั้นอยู่ในลำดับ(offset) ที่เท่าไร และมี error หรือไม่

เดี๋ยวคราวหน้าเราจะมาต่อกันที่ consumer นะครับ

Top comments (0)