In a previous post, we showed how to receive IoT device data from an MQTT broker. In this post, we will store that data in a database.
In a robust system, we might choose to store the raw data events in a data lake. Perhaps we'll explore that in the future, but for now we'll store them in Postgres for simplicity.
The previous post demonstrated receiving the raw data and unmarshalling it into a struct that was already annotated with gorm tags. Gorm is a popular ORM for Go. If you are not familiar with it, you can find more information here.
type IoTRawDeviceMessage struct {
	BaseModel
	Time       time.Time       `json:"time" gorm:"index"`
	DeviceID   string          `json:"device_id"`
	DeviceType string          `json:"device_type"`
	DeviceData json.RawMessage `json:"device_data"`
}
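As a quick reminder of how a payload maps onto these fields, here is a minimal sketch that uses only the standard library. The `deviceMessage` type, the `parseDeviceMessage` helper, and the sample payload are assumptions made for illustration; `BaseModel` is left out because its definition is not shown in this post.

```go
package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// deviceMessage mirrors the fields of the struct above without BaseModel,
// whose definition is not shown in this post.
type deviceMessage struct {
	Time       time.Time       `json:"time" gorm:"index"`
	DeviceID   string          `json:"device_id"`
	DeviceType string          `json:"device_type"`
	DeviceData json.RawMessage `json:"device_data"`
}

// parseDeviceMessage decodes one JSON payload. DeviceData stays as raw bytes,
// so device-specific decoding can happen in a later step.
func parseDeviceMessage(payload []byte) (deviceMessage, error) {
	var msg deviceMessage
	err := json.Unmarshal(payload, &msg)
	return msg, err
}

func main() {
	// Hypothetical payload; the real format comes from the publisher.
	payload := []byte(`{"time":"2024-01-02T03:04:05Z","device_id":"dev-1","device_type":"temp_rh","device_data":{"temp":21.5}}`)
	msg, err := parseDeviceMessage(payload)
	if err != nil {
		fmt.Println("decode failed:", err)
		return
	}
	fmt.Println(msg.DeviceID, msg.DeviceType, string(msg.DeviceData))
}
```

Keeping `DeviceData` as `json.RawMessage` means the device-specific part of the payload can be stored as-is, whatever shape it has.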
So all we need to do is configure the Postgres connection, then use gorm to save the event data.
func setupPostgres(logger *zerolog.Logger) *Repository {
	dbHost := os.Getenv("POSTGRES_HOST")
	dbName := os.Getenv("POSTGRES_DB")
	dbPort := os.Getenv("POSTGRES_PORT")
	dbUser := os.Getenv("POSTGRES_USER")
	dbPassword := os.Getenv("POSTGRES_PASSWORD")
	dsn := fmt.Sprintf("host=%s user=%s password=%s dbname=%s port=%s sslmode=disable TimeZone=UTC",
		dbHost, dbUser, dbPassword, dbName, dbPort)
	// Log only non-sensitive settings; the DSN itself contains the password.
	logger.Info().Msgf("Connecting to PostgreSQL at %s:%s/%s", dbHost, dbPort, dbName)
	db, err := gorm.Open(postgres.Open(dsn), &gorm.Config{})
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to connect to database")
	}
	// Auto-migrate the schema
	err = db.AutoMigrate(&IoTDataEvent{}, &IoTRawDeviceMessage{}, &DeviceModel{}, &TempRHDevice{})
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to migrate models")
	}
	// Configure the underlying connection pool
	sqlDB, err := db.DB()
	if err != nil {
		logger.Fatal().Err(err).Msg("failed to get database handle")
	}
	sqlDB.SetMaxIdleConns(10)
	sqlDB.SetMaxOpenConns(100)
	sqlDB.SetConnMaxLifetime(time.Hour)
	repo := NewRepository(db, logger)
	return repo
}
Here we set up the Postgres connection. Note that we are using environment variables to hold our sensitive information. This is good practice for production systems, whether they are containerized or not.
We are also initializing a struct called Repository. This struct contains our actual storage and retrieval methods, which gives us some separation from the Postgres configuration.
type Repository struct {
	db     *gorm.DB
	logger *zerolog.Logger
}
func NewRepository(db *gorm.DB, logger *zerolog.Logger) *Repository {
	return &Repository{db: db, logger: logger}
}
// Close releases the underlying database connections.
func (r *Repository) Close() {
	sqlDb, err := r.db.DB()
	if err != nil {
		r.logger.Error().Err(err).Msg("failed to get database handle")
		return
	}
	if err := sqlDb.Close(); err != nil {
		r.logger.Error().Err(err).Msg("failed to close database")
	}
}
...
// Message-related functions
func (r *Repository) CreateMessage(message *IoTRawDeviceMessage) error {
	return r.db.Create(message).Error
}
// DeviceID is a string on the model, and the Time field maps to the "time" column.
func (r *Repository) GetMessagesByDeviceID(deviceID string, limit int) ([]IoTRawDeviceMessage, error) {
	var messages []IoTRawDeviceMessage
	err := r.db.Where("device_id = ?", deviceID).Order("time desc").Limit(limit).Find(&messages).Error
	return messages, err
}
func (r *Repository) DeleteMessagesByDeviceID(deviceID string) error {
	return r.db.Where("device_id = ?", deviceID).Delete(&IoTRawDeviceMessage{}).Error
}
Now the message just needs to be persisted. Since we are using the pipeline pattern to process the messages, we will add the persist step as a new stage in the pipeline.
// pipeline stage to persist the message
func persistIoTEvent(ctx context.Context, logger *zerolog.Logger, repo *Repository, input <-chan IoTRawDeviceMessage) chan IoTRawDeviceMessage {
	out := make(chan IoTRawDeviceMessage)
	go func() {
		defer close(out)
		for iotMsg := range input {
			logger.Info().Msgf("Persist iot msg for device: %s", iotMsg.DeviceID)
			if err := repo.CreateMessage(&iotMsg); err != nil {
				logger.Error().Err(err).Msg("Error creating IoTRawDeviceMessage")
				continue // do not forward messages that were not persisted
			}
			// Forward the persisted message to the next stage,
			// or stop if the context has been cancelled.
			select {
			case out <- iotMsg:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out
}
...
finalChan := persistIoTEvent(ctx, logger, repo, processMsg(ctx, logger, mqttMsgChan))
for iotMsg := range finalChan {
	// now we have the IoTRawDeviceMessage that has been persisted
	logger.Info().Msg(fmt.Sprintf("Received iot msg: %+v", iotMsg))
	// do something like check for alert conditions
}
That's all there is to it.
The code for this can be found here. You can use it with the same publisher code from the previous post. Be sure to configure your Postgres settings as environment variables.