DEV Community πŸ‘©β€πŸ’»πŸ‘¨β€πŸ’»

Cover image for Go and ElasticSearch full-text search microservice in k8sπŸ‘‹βœ¨πŸ’«
Alexander
Alexander

Posted on

Go and ElasticSearch full-text search microservice in k8sπŸ‘‹βœ¨πŸ’«

πŸ‘¨β€πŸ’» Full list what has been used:

Elasticsearch client for Go
RabbitMQ Go RabbitMQ Client Library
Jaeger open source, end-to-end distributed tracing
Prometheus monitoring and alerting
Grafana for to compose observability dashboards with everything from Prometheus
Echo web framework
Kibana is user interface that lets you visualize your Elasticsearch
Docker and docker-compose
Kubernetes K8s
Helm The package manager for Kubernetes

Source code you can find in GitHub repository.
The main idea of this project is the implementation of a full-text search with support for synonyms, mistyping, and the wrong keyboard layout
using Elasticsearch and RabbitMQ.

All UI interfaces will be available on ports:

Grafana UI: http://localhost:3005

Grafana

Kibana UI: http://localhost:5601/app/home#/

Kibana

RabbitMQ UI: http://localhost:15672

RabbitMQ

Jaeger UI: http://localhost:16686

Jaeger

Prometheus UI: http://localhost:9090

Prometheus

Docker-compose file for this project:

version: "3.9"

services:
  rabbitmq:
    image: rabbitmq:3.9-management-alpine
    container_name: rabbitmq
    restart: always
    ports:
      - "5672:5672"
      - "15672:15672"
    networks: [ "microservices" ]

  node01:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.3.3
    container_name: node01
    restart: always
    environment:
      - node.name=node01
      - cluster.name=es-cluster-8
      - discovery.type=single-node
      - "ES_JAVA_OPTS=-Xms512m -Xmx512m"
      - xpack.license.self_generated.type=basic
      - xpack.security.enabled=false
    ulimits:
      memlock:
        soft: -1
        hard: -1
    volumes:
      - ./es-data01:/usr/share/elasticsearch/data
    ports:
      - "9200:9200"
      - "9300:9300"
    networks: [ "microservices" ]

  kibana:
    image: docker.elastic.co/kibana/kibana:8.3.3
    restart: always
    environment:
      ELASTICSEARCH_HOSTS: http://node01:9200
    ports:
      - "5601:5601"
    depends_on:
      - node01
    networks: [ "microservices" ]

  jaeger:
    container_name: jaeger_container
    restart: always
    image: jaegertracing/all-in-one:1.35
    environment:
      - COLLECTOR_ZIPKIN_HTTP_PORT=9411
    ports:
      - "5775:5775/udp"
      - "6831:6831/udp"
      - "6832:6832/udp"
      - "5778:5778"
      - "16686:16686"
      - "14268:14268"
      - "14250:14250"
      - "9411:9411"
    networks: [ "microservices" ]

  prometheus:
    image: prom/prometheus:latest
    container_name: prometheus
    ports:
      - "9090:9090"
    command:
      - --config.file=/etc/prometheus/prometheus.yml
    volumes:
      - ./monitoring/prometheus.yml:/etc/prometheus/prometheus.yml:ro
    networks: [ "microservices" ]

  node_exporter:
    container_name: node_exporter_container
    restart: always
    image: prom/node-exporter
    ports:
      - '9101:9100'
    networks: [ "microservices" ]

  grafana:
    container_name: grafana_container
    restart: always
    image: grafana/grafana
    ports:
      - '3005:3000'
    networks: [ "microservices" ]

volumes:
  es-data01:
    driver: local

networks:
  microservices:
    name: microservices
Enter fullscreen mode Exit fullscreen mode

Full-text search with auto-completion can be realized in different ways, it's up to you choose which is better for your case.
First, we have to create mappings for our index, of course, elasticsearch can create it for us, but it's not a production solution.
As data model used here simple abstract shopping product item with searchable title, description, and shop name fields, for this example it's enough.
The creation of the right mapping is very important and tricky,
to realize full-text search with synonyms, mistyping, and wrong keyboard layout we will configure our own analyzer,
which combines the character filters, tokenizer, and token filters.

Our ngram_filter is elastic builtin edge_ngram filter,
for understand how it works highly recommend read elasticsearch documentation,
We have to assign min and max values, it always depends on each unique case.
Next, let's specify name_synonym_filter, in synonyms field we have to add an array of strings, each string is a comma separate synonyms which will be used for search.

analyzer

In mapping properties let's create our document fields mappings, for full-text search field is "type": "text",
as an analyzer our newly created autocomplete_analyzer, it's used when elastic index documents, and for search queries, we don't need so complex analyzer it will only slow down our search queries,
so we add another one search_analyzer: standard
Handling of wrong keyboard language layout we can implement in different ways, on elasticsearch side by synonyms, but it makes mappings to huge
and on the application level by creating a mapping for keyboard language then map users search term to apposite language and send to elastic query for search any one of it,
so for example search the words "Apple, apple, яблоко, Π―Π±Π»ΠΎΠΊΠΎ, z,kjrj, Z,kjrj, Π€Π·Π·Π΄Ρƒ, Ρ„Π·Π·Π΄Ρƒ" will find the same for the all of it.
In real-world scenarios usually mapping is much more complicated but here for example it's enough:

mappings

For Go available two good libraries for elasticsearch, the official Elasticsearch client and another one from community olivere elastic,
both is good, but at this moment only the official client supports 8 version of elasticsearch and for serious production think it's the choice.

Our microservice interacts by HTTP using Echo web framework and RabbitMQ official client,
REST controller has index document and search methods:

func (h *productController) indexAsync() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.indexAsync")
        defer span.Finish()

        var product domain.Product
        if err := c.Bind(&product); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        product.ID = uuid.NewV4().String()

        if err := h.productUseCase.IndexAsync(ctx, product); err != nil {
            h.log.Errorf("(productUseCase.IndexAsync) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("created product: %+v", product)
        h.metrics.HttpSuccessIndexAsyncRequests.Inc()
        return c.JSON(http.StatusCreated, product)
    }
}

func (h *productController) index() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.index")
        defer span.Finish()

        var product domain.Product
        if err := c.Bind(&product); err != nil {
            h.log.Errorf("(Bind) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }
        product.ID = uuid.NewV4().String()

        if err := h.productUseCase.Index(ctx, product); err != nil {
            h.log.Errorf("(productUseCase.Index) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("created product: %+v", product)
        h.metrics.HttpSuccessIndexRequests.Inc()
        return c.JSON(http.StatusCreated, product)
    }
}

func (h *productController) search() echo.HandlerFunc {
    return func(c echo.Context) error {
        ctx, span := tracing.StartHttpServerTracerSpan(c, "productController.search")
        defer span.Finish()

        searchTerm := c.QueryParam("search")
        pagination := utils.NewPaginationFromQueryParams(c.QueryParam(constants.Size), c.QueryParam(constants.Page))

        searchResult, err := h.productUseCase.Search(ctx, searchTerm, pagination)
        if err != nil {
            h.log.Errorf("(productUseCase.Search) err: %v", tracing.TraceWithErr(span, err))
            return httpErrors.ErrorCtxResponse(c, err, h.cfg.Http.DebugErrorsResponse)
        }

        h.log.Infof("search result: %s", searchResult.PaginationResponse.String())
        h.metrics.HttpSuccessSearchRequests.Inc()
        span.LogFields(log.String("search result", searchResult.PaginationResponse.String()))
        return c.JSON(http.StatusOK, dto.SearchProductsResponse{
            SearchTerm: searchTerm,
            Pagination: searchResult.PaginationResponse,
            Products:   searchResult.List,
        })
    }
}
Enter fullscreen mode Exit fullscreen mode

Postman

The useCase for IndexAsync method serialize data and publish it to RabbitMQ:

func (p *productUseCase) IndexAsync(ctx context.Context, product domain.Product) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "productUseCase.IndexAsync")
    defer span.Finish()
    span.LogFields(log.Object("product", product))

    dataBytes, err := serializer.Marshal(&product)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
    }

    return p.amqpPublisher.Publish(
        ctx,
        p.cfg.ExchangeAndQueueBindings.IndexProductBinding.ExchangeName,
        p.cfg.ExchangeAndQueueBindings.IndexProductBinding.BindingKey,
        amqp.Publishing{
            Headers:   tracing.ExtractTextMapCarrierHeadersToAmqpTable(span.Context()),
            Timestamp: time.Now().UTC(),
            Body:      dataBytes,
        },
    )
}
Enter fullscreen mode Exit fullscreen mode

Jaeger

The RabbitMQ consumer is listening queue and processing messages using BulkIndexer for Bulk API
which has better performance for indexing documents.

func (c *consumer) ConsumeIndexDeliveries(ctx context.Context, deliveries <-chan amqp.Delivery, workerID int) func() error {
    return func() error {
        c.log.Infof("starting consumer workerID: %d, for queue deliveries: %s", workerID, c.cfg.ExchangeAndQueueBindings.IndexProductBinding.QueueName)

        for {
            select {
            case <-ctx.Done():
                c.log.Errorf("products consumer ctx done: %v", ctx.Err())
                return ctx.Err()

            case msg, ok := <-deliveries:
                if !ok {
                    c.log.Errorf("deliveries channel closed for queue: %s", c.cfg.ExchangeAndQueueBindings.IndexProductBinding.QueueName)
                    return errors.New("deliveries channel closed")
                }

                c.log.Infof("Consumer delivery: workerID: %d, msg data: %s, headers: %+v", workerID, string(msg.Body), msg.Headers)

                if err := c.bulkIndexProduct(ctx, msg); err != nil {
                    c.log.Errorf("bulkIndexProduct err: %v", err)
                    continue
                }

                c.log.Infof("Consumer <<<ACK>>> delivery: workerID: %d, msg data: %s, headers: %+v", workerID, string(msg.Body), msg.Headers)
                c.metrics.RabbitMQSuccessBatchInsertMessages.Inc()
            }
        }

    }
}

func (c *consumer) indexProduct(ctx context.Context, msg amqp.Delivery) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "consumer.indexProduct")
    defer span.Finish()

    var product domain.Product
    if err := serializer.Unmarshal(msg.Body, &product); err != nil {
        c.log.Errorf("indexProduct serializer.Unmarshal <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
        return msg.Reject(true)
    }
    if err := c.productUseCase.Index(ctx, product); err != nil {
        c.log.Errorf("indexProduct productUseCase.Index <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
        return msg.Reject(true)
    }
    return msg.Ack(true)
}

func (c *consumer) bulkIndexProduct(ctx context.Context, msg amqp.Delivery) error {
    ctx, span := tracing.StartRabbitConsumerTracerSpan(ctx, msg.Headers, "consumer.bulkIndexProduct")
    defer span.Finish()

    var product domain.Product
    if err := serializer.Unmarshal(msg.Body, &product); err != nil {
        c.log.Errorf("indexProduct serializer.Unmarshal <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
        return msg.Reject(true)
    }

    if err := c.bulkIndexer.Add(ctx, esutil.BulkIndexerItem{
        Index:      c.cfg.ElasticIndexes.ProductsIndex.Name,
        Action:     "index",
        DocumentID: product.ID,
        Body:       bytes.NewReader(msg.Body),
        OnSuccess: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem) {
            c.log.Debugf("bulk indexer onSuccess for index alias: %s", c.cfg.ElasticIndexes.ProductsIndex.Alias)
        },
        OnFailure: func(ctx context.Context, item esutil.BulkIndexerItem, item2 esutil.BulkIndexerResponseItem, err error) {
            if err != nil {
                c.log.Errorf("bulk indexer OnFailure err: %v", err)
            }
        },
    }); err != nil {
        c.log.Errorf("indexProduct bulkIndexer.Add <<<Reject>>> err: %v", tracing.TraceWithErr(span, err))
        return msg.Reject(true)
    }

    c.log.Infof("consumer <<<ACK>>> bulk indexer add product: %+v", product)
    return msg.Ack(true)
}
Enter fullscreen mode Exit fullscreen mode

Kibana

The repository has the same methods for index and search.
For the search method, we use should multi_match query where we pass the original term and mapped to the opposite keyboard language layout search term.
Good practice for Elasticsearch is always use alias for indexes.
The implementation for keyboard language layout converter is to load JSON file with mappings when the application starts, marshal it to the map and have one method for converting one language to another.

func (k *keyboardLayoutsManager) GetOppositeLayoutWord(originalWord string) string {
    sb := k.sbPool.Get().(*strings.Builder)
    defer k.sbPool.Put(sb)
    sb.Reset()

    for _, c := range []rune(originalWord) {
        lowerCasedChar := strings.ToLower(string(c))
        if char, ok := k.keyMappings[lowerCasedChar]; ok {
            sb.WriteString(char)
        } else {
            sb.WriteString(lowerCasedChar)
        }
    }
    return sb.String()
}
Enter fullscreen mode Exit fullscreen mode

Jaeger

func (e *esRepository) Index(ctx context.Context, product domain.Product) error {
    span, ctx := opentracing.StartSpanFromContext(ctx, "esRepository.Index")
    defer span.Finish()
    span.LogFields(log.Object("product", product))

    dataBytes, err := serializer.Marshal(&product)
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
    }

    response, err := e.esClient.Index(
        e.cfg.ElasticIndexes.ProductsIndex.Alias,
        bytes.NewReader(dataBytes),
        e.esClient.Index.WithPretty(),
        e.esClient.Index.WithHuman(),
        e.esClient.Index.WithTimeout(indexTimeout),
        e.esClient.Index.WithContext(ctx),
        e.esClient.Index.WithDocumentID(product.ID),
    )
    if err != nil {
        return tracing.TraceWithErr(span, errors.Wrap(err, "esClient.Index"))
    }
    defer response.Body.Close()

    if response.IsError() {
        return tracing.TraceWithErr(span, errors.Wrap(errors.New(response.String()), "esClient.Index response error"))
    }

    e.log.Infof("document indexed: %s", response.String())
    return nil
}

func (e *esRepository) Search(ctx context.Context, term string, pagination *utils.Pagination) (*domain.ProductSearch, error) {
    span, ctx := opentracing.StartSpanFromContext(ctx, "esRepository.Search")
    defer span.Finish()
    span.LogFields(log.String("term", term))

    shouldQuery := map[string]any{
        "query": map[string]any{
            "bool": map[string]any{
                "should": []map[string]any{
                    {
                        "multi_match": map[string]any{
                            "query":  term,
                            "fields": searchFields,
                        },
                    },
                    {
                        "multi_match": map[string]any{
                            "query":  e.keyboardLayoutManager.GetOppositeLayoutWord(term),
                            "fields": searchFields,
                        },
                    },
                    {
                        "range": map[string]any{
                            "count_in_stock": map[string]any{
                                "gte": 0,
                            },
                        },
                    },
                },
            },
        },
    }

    dataBytes, err := serializer.Marshal(&shouldQuery)
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Marshal"))
    }

    response, err := e.esClient.Search(
        e.esClient.Search.WithContext(ctx),
        e.esClient.Search.WithIndex(e.cfg.ElasticIndexes.ProductsIndex.Alias),
        e.esClient.Search.WithBody(bufio.NewReader(bytes.NewReader(dataBytes))),
        e.esClient.Search.WithPretty(),
        e.esClient.Search.WithHuman(),
        e.esClient.Search.WithTimeout(searchTimeout),
        e.esClient.Search.WithSize(pagination.GetSize()),
        e.esClient.Search.WithFrom(pagination.GetOffset()),
    )
    if err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrap(err, "esClient.Search"))
    }
    defer response.Body.Close()

    if response.IsError() {
        return nil, tracing.TraceWithErr(span, errors.Wrap(errors.New(response.String()), "esClient.Search error"))
    }

    hits := esclient.EsHits[*domain.Product]{}
    if err := serializer.NewDecoder(response.Body).Decode(&hits); err != nil {
        return nil, tracing.TraceWithErr(span, errors.Wrap(err, "serializer.Decode"))
    }

    responseList := make([]*domain.Product, len(hits.Hits.Hits))
    for i, source := range hits.Hits.Hits {
        responseList[i] = source.Source
    }

    e.log.Infof("repository search result responseList: %+v", responseList)
    return &domain.ProductSearch{
        List:               responseList,
        PaginationResponse: utils.NewPaginationResponse(hits.Hits.Total.Value, pagination),
    }, nil
}
Enter fullscreen mode Exit fullscreen mode

Lens

For Kubernetes prefer to use minikube with helm,
RabbitMQ, Elasticsearch, Jaeger, Kibana used same images as docker-compose file has, and prometheus community chart,
for correct working we have to add ServiceMonitor component for our microservice.
Working with k8s personally like to use lens it has a friendly UI and many useful features.

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ .Values.searchMicroserviceName }}
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  replicas: {{ .Values.searchMicroserviceReplicas }}
  template:
    metadata:
      name: {{ .Values.searchMicroserviceName }}
      labels:
        app: {{ .Values.searchMicroserviceName }}
    spec:
      containers:
        - name: {{ .Values.searchMicroserviceName }}
          image: {{.Values.searchMicroserviceImage }}
          imagePullPolicy: Always
          resources:
            requests:
              memory: {{.Values.resources.requests.memory }}
              cpu: {{.Values.resources.requests.cpu }}
            limits:
              memory: {{.Values.resources.limits.memory }}
              cpu: {{.Values.resources.limits.cpu }}
          livenessProbe:
            httpGet:
              path: {{.Values.searchMicroserviceLivenessProbePath }}
              port: {{.Values.searchMicroserviceLivenessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          readinessProbe:
            httpGet:
              path: {{.Values.searchMicroserviceReadinessProbePath }}
              port: {{.Values.searchMicroserviceReadinessProbePort }}
            initialDelaySeconds: {{ .Values.searchMicroserviceInitialDelaySeconds }}
            periodSeconds: {{ .Values.searchMicroservicePeriodSeconds }}
          ports:
            - containerPort: {{.Values.searchMicroserviceHttpPort }}
              name: http
            - containerPort: {{.Values.searchMicroserviceMetricsPort }}
              name: metrics
            - containerPort: {{.Values.searchMicroserviceHealthcheckPort }}
              name: healthcheck
          env:
            - name: JAEGER_HOST_PORT
              value: {{ .Values.jaegerHotPost }}
            - name: ELASTIC_URL
              value: {{ .Values.elasticSearchURL }}
            - name: RABBITMQ_URI
              value: {{ .Values.rabbitMqURI }}
            - name: CONFIG_PATH
              value: "/search-config/search-config.yaml"
          volumeMounts:
            - name: config
              mountPath: "/search-config"
      restartPolicy: Always
      terminationGracePeriodSeconds: {{ .Values.searchMicroserviceTerminationGracePeriodSeconds }}
      volumes:
        - name: config
          configMap:
            name: {{ .Values.searchMicroserviceName }}-config-map
            items:
              - key: search-config.yaml
                path: search-config.yaml
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ .Values.searchMicroserviceName }}-service
  labels:
    app: {{ .Values.searchMicroserviceName }}
spec:
  type: ClusterIP
  selector:
    app: {{ .Values.searchMicroserviceName }}
  ports:
    - name: http
      port: {{.Values.searchMicroserviceHttpPort }}
      protocol: TCP
    - name: healthcheck
      port: {{.Values.searchMicroserviceHealthcheckPort }}
      protocol: TCP
      targetPort: metrics
    - name: metrics
      port: {{.Values.searchMicroserviceMetricsPort }}
      protocol: TCP
      targetPort: metrics

---

apiVersion: monitoring.coreos.com/v1
kind: ServiceMonitor
metadata:
  labels:
    release: monitoring
  name: {{ .Values.searchMicroserviceName }}-service-monitor
  namespace: default
spec:
  selector:
    matchLabels:
      app: {{ .Values.searchMicroserviceName }}
  endpoints:
    - interval: 10s
      port: metrics
      path: {{.Values.prometheusPath }}
  namespaceSelector:
    matchNames:
      - default

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ .Values.searchMicroserviceName }}-config-map
data:
  search-config.yaml: |
    serviceName: search_microservice
    grpc:
      port: :5001
      development: true
    http:
      port: :{{ .Values.searchMicroserviceHttpPort }}
      development: {{ .Values.http.development }}
      basePath: {{ .Values.http.basePath }}
      productsPath: {{ .Values.http.productsPath }}
      debugErrorsResponse: {{ .Values.http.debugErrorsResponse }}
      ignoreLogUrls: {{ .Values.http.ignoreLogUrls }}
    probes:
      readinessPath: {{ .Values.searchMicroserviceReadinessProbePath }}
      livenessPath: {{ .Values.searchMicroserviceLivenessProbePath }}
      port: :{{ .Values.searchMicroserviceHealthcheckPort }}
      pprof: :6001
      prometheusPath: {{ .Values.prometheusPath }}
      prometheusPort: :{{.Values.searchMicroserviceMetricsPort }}
      checkIntervalSeconds: 10
    logger:
      level: {{ .Values.searchMicroserviceLogging.level }}
      devMode: {{ .Values.searchMicroserviceLogging.devMode }}
      encoder: {{ .Values.searchMicroserviceLogging.encoder }}
    jaeger:
      enable: true
      serviceName: {{ .Values.searchMicroserviceName }}
      hostPort: {{ .Values.jaegerHotPost }}
      logSpans: false
    timeouts:
      postgresInitMilliseconds: 1500
      postgresInitRetryCount: 3
    elasticSearch:
      addresses: [ {{ .Values.elasticSearchURL }} ]
      username: ""
      password: ""
      apiKey: ""
      enableLogging: false
    elasticIndexes:
      products:
        path: {{ .Values.elasticIndexes.products.path }}
        name: {{ .Values.elasticIndexes.products.name }}
        alias: {{ .Values.elasticIndexes.products.alias }}
    rabbitmq:
      uri: {{ .Values.rabbitMqURI }}
    exchangeAndQueueBindings:
      indexProductBinding:
        exchangeName: {{.Values.exchangeAndQueueBindings.indexProductBinding.exchangeName }}
        exchangeKind: {{.Values.exchangeAndQueueBindings.indexProductBinding.exchangeKind }}
        queueName: {{.Values.exchangeAndQueueBindings.indexProductBinding.queueName }}
        bindingKey: {{.Values.exchangeAndQueueBindings.indexProductBinding.bindingKey }}
        concurrency: {{.Values.exchangeAndQueueBindings.indexProductBinding.concurrency }}
        consumer: {{.Values.exchangeAndQueueBindings.indexProductBinding.consumer }}
    bulkIndexer:
      numWorkers: {{ .Values.bulkIndexer.numWorkers }}
      flushBytes: {{ .Values.bulkIndexer.flushBytes }}
      flushIntervalSeconds: {{ .Values.bulkIndexer.flushIntervalSeconds }}
      timeoutMilliseconds: {{ .Values.bulkIndexer.timeoutMilliseconds }}
Enter fullscreen mode Exit fullscreen mode

More details and source code of the full project you can find here,
of course, in real-world applications, full-text search and business requirements can be much more complicated and for example includes machine learning, etc.
I hope this article is usefully and helpfully, and be happy to receive any feedback or questions, feel free to contact me by email or any messengers :)

Top comments (0)

🌚 Life is too short to browse without dark mode