เวลาเราจะเขียน Go เพื่อไปทำงานกับ Kafka เราก็ต้องเริ่มจากการเลือก library กันก่อน ซึ่งผมจะลองยกตัวอย่างมาให้ดูสัก 3 ตัวครับ
https://githu
-
https://github.com/confluentinc/confluent-kafka-go
ตัวนี้จะถือว่าเป็น official library ก็ว่าได้ เพราะเจ้าของก็คือ confluent เอง ซึ่งก็คือ kafka ในเวอร์ชั่นเสียเงินนั่นเอง แต่ว่า lib ตัวนี้ไม่ค่อยเป็นที่นิยม เนื่องจากมันทำตัวเป็นแค่ wrapper เป็นทางผ่านไปเรียก
librdkafka
อีกที ซึ่งเป็น lib ตัวจริงที่เขียนด้วยc
ทำให้เราจำเป็นต้องติดตั้ง sdk ที่เขียนด้วยc
ตัวนี้ด้วยเสมอ - https://github.com/IBM/sarama ตัวนี้เป็นที่นิยมมาก แต่ต้องดูให้ดีเพราะเจ้าของเดิมคือ Shopify ได้โอนให้ IBM เป็นคนดูแลต่อ แต่ตัว lib ตัวเก่าที่ Shopify ก็ยังมีอยู่ด้วย ให้ดูว่าตัวล่าสุดจะอยู่กับ IBM นะครับ
- https://github.com/segmentio/kafka-go ตัวนี้เป็นน้องใหม่ที่เข้ามาเสนอทางเลือกว่าตัวเองเขียนง่ายกว่าใครๆ
แต่ในที่นี้ผมจะขอเลือก sarama
มาใช้เนื่องจากเราค่อนข้างใช้กันเยอะ และพอเราใช้กันเยอะ คนที่เข้าทีมมาทีหลังก็อาจจะไม่รู้ว่าทำไปจะต้องไปเหนื่อยหาตัวใหม่ด้วย เราก็ใช้ให้มันเหมือนๆกันไปแหล่ะ
ก็ด้วยความที่เราก็ใช้กันไปแหล่ะนั่นแหล่ะครับ ผมก็เลยเห็นว่า เราอาจจะยังขาดความเข้าใจมันอยู่บ้าง เลยหยิบตัวนี้มาอธิบายกันสักหน่อย
ก่อนจะไปเล่าเรื่อง lib เรามาทำความเข้าใจ Kafka กันคร่าวๆก่อน สิ่งที่เราต้องรู้ก่อนคือคำศัพท์ต่างๆที่เกี่ยวข้องกับ Kafka เช่น
- Producer: คือตัวที่จะส่ง message เข้าไปใน Kafka broker
- Broker: เอาง่ายๆเลยก็คือ Kafka server นั่นแหล่ะ คือเป็นตัวรับ และ ส่งต่อ message
- Consumer: ตัวที่จะรับ message ไปใช้งาน
- Topic: ก็คือ channel ที่จะใช้ส่ง message หากัน ให้นึกถึงช่องใน youtube ว่าคนทำคอนเทนต์ก็คือ producer เขาเปิดช่องมาช่องหนึ่งก็คือ Topic แล้วเราก็ไปดู คนดูก็คือ consumer
- Partition: เวลาเราส่งข้อมูลเข้าไปใน topic แล้วคนรับข้อมูลไปใช้ แล้วข้อมูลมีเยอะมาก จะทำให้การดึง message ไปใช้ จะใช้เวลานาน ถ้าอยากจะลดเวลาลง ก็ต้องกระจ่าย message ให้มันแบ่งช่องกันอยู่ เวลา consumer มาดึงไป จะได้ช่วยกันได้หลายๆ consumer เราก็จะใช้เทคนิคในการแบ่ง partition ออกไป อยากเร็วกี่เท่าก็สร้าง partition ตามนั้นเลยเช่น อยากเร็วขึ้น 10 เท่า ก็ทำ 10 partition
- Replica: ก็คือการทำสำเนาข้อมูลไว้สำรอง เผื่อกรณีที่ broker มีปัญหา จะได้มีสำรองไว้
- ISR (In-sync replica): คือจำนวนของ replica ที่ active อยู่ในขณะนั้น
เอาคร่าวๆเท่านี้ก่อนนะครับ
ทีนี้เราก็จะมาดู code ตัวอย่างที่ทาง sarama ทำไว้ให้ดูที่หน้า package โดยเราจะไปเริ่มที่ตัวอย่าง SyncProducer
กันก่อนเลย
producer, err := NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("FAILED to send message: %s\n", err)
} else {
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
ปกติโค้ดตัวอย่างของ sarama เวลาเรา copy ลงมาแปะไว้ในเครื่องเรา มันจะใช้ไม่ได้สักตัวเลยนะครับ เพราะมันเขียนผิดบ้าง ไม่ใส่ sarama.
บ้าง เราก็ต้องมานั่งแก้ๆกันก่อนครับ เช่น
producer, err := sarama.NewSyncProducer([]string{"localhost:9092"}, nil)
if err != nil {
log.Fatalln(err)
}
defer func() {
if err := producer.Close(); err != nil {
log.Fatalln(err)
}
}()
msg := &sarama.ProducerMessage{Topic: "my_topic", Value: sarama.StringEncoder("testing 123")}
partition, offset, err := producer.SendMessage(msg)
if err != nil {
log.Printf("FAILED to send message: %s\n", err)
} else {
log.Printf("> message sent to partition %d at offset %d\n", partition, offset)
}
พอแก้เสร็จแล้วเราก็มาลองอ่านโค้ดกันดูว่าตัวอย่างมันเขียนอะไรไว้บ้าง เริ่มจาก
sarama.NewSyncProducer
ตัวนี้คือการสร้าง producer instant โดยมันต้องการ parameter 2 ตัว ตัวแรกคือ []string ที่เราจะต้องระบุลงไปว่าเรามี broker อยู่กี่ตัว ให้ใส่ลงไปให้หมดเช่นถ้ามี 3 ตัวก็อาจจะใส่
[]string{"localhost:9092","localhost:9093","localhost:9094"}
ส่วน parameter ตัวที่สองคือ config ซึ่งถ้าใส่เป็น nil
มันก็จะไปใช้ default config ให้เอง
ทีนี้ตัว producer เมื่อใช้เสร็จก็ต้อง close เราก็เลยต้องทำ defer เอาไว้เลยตามตัวอย่าง จากนั้น
msg := &ProducerMessage{Topic: "my_topic", Value: StringEncoder("testing 123")}
นี่ก็เป็นการสร้าง message instant ด้วยการระบุ topic ชื่อ my_topic
และ value คือตัว message นั่นเอง
กรณีที่เราไม่เคยสร้าง topic ไว้ใน kafka มาก่อน ถ้าเรา produce เข้าไปเลย มันจะสร้าง topic ง่ายๆขึ้นมาให้ โดยจะไม่ได้แบ่ง partition และไม่มี replica ด้วย
สุดท้ายเราก็ส่ง message เข้าไปด้วยบรรทัดนี้
partition, offset, err := producer.SendMessage(msg)
เสร็จแล้วมันจะคืนค่ามาว่า message ที่ส่งเข้าไปนั้นไปลงที่ partition เลขอะไร และ message นั้นอยู่ในลำดับ(offset) ที่เท่าไร และมี error หรือไม่
เดี๋ยวคราวหน้าเราจะมาต่อกันที่ consumer นะครับ
Top comments (0)