
# Building a Kubernetes Dashboard: Implementing a real-time Logview with Server-Sent Events and React-Window

Last year, my colleague and I started working on a Kubernetes dashboard called kdd. The goal of the dashboard is to make troubleshooting easier and to provide insights into the overall health of a Kubernetes cluster. One of the most important features for troubleshooting is the logview: it streams logs in real time, lets you follow them as they arrive, supports highlighting and filtering, aggregates logs from all pods within a workload, and lets you select a specific pod while streaming logs from the whole workload.

In this blog post, I will explain how we implemented a real-time logview using Server-Sent Events, React-Window, and Go as the backend language. I will provide code snippets and explanations for the backend and frontend implementation, as well as the challenges we faced and how we solved them. Our goal is to make kdd an easy-to-use troubleshooting solution for developers, even before observability tools like Jaeger or Prometheus are set up.

You can find the source code here: https://github.com/perber/kdd
Please be aware that the project is still at an early stage.

This is what the logview looks like:

*(Screenshot: kdd logview)*

## Requirements

Before diving into the implementation details, let's outline the requirements for our logview:

- Real-time logs: The logview should show logs streaming in real time from the Kubernetes API server.
- Follow logs: The logview should allow users to follow the logs as they are generated.
- Highlighting and filtering: The logview should highlight and filter logs based on search terms entered by the user.
- Logs from multiple pods: The logview should display logs from all pods within a workload.
- Pod selection: The logview should allow users to select a specific pod while streaming logs from all pods of the workload.

## Backend Implementation

We chose Go as the backend language for the Kubernetes Dashboard project. The backend doesn't just access the Kubernetes API: it also stores metrics for a longer period of time, and we want to implement a history feature as well. The aim of the dashboard is to be an easy-to-use troubleshooting solution that gives the user the most important insights into the cluster without the need to customize dashboards. It should be easy to install and should provide valuable insights before monitoring solutions like Prometheus are set up.

To enable real-time log streaming, we implemented Server-Sent Events (SSE) using the Gin framework. We set the necessary headers for the SSE response in the setHeaders() function:

```go
// setHeaders sets headers for the SSE response
func (sa *StreamAPI) setHeaders(c *gin.Context) {
    c.Writer.Header().Set("Content-Type", "text/event-stream")
    c.Writer.Header().Set("Cache-Control", "no-cache")
    c.Writer.Header().Set("Connection", "keep-alive")
    c.Writer.Header().Set("Transfer-Encoding", "chunked")
    c.Writer.Header().Set("X-Accel-Buffering", "no")
}
```
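
For reference, this is roughly what a single `log` event produced by `c.SSEvent("log", log)` looks like on the wire. The JSON field names depend on the `LogMessage` struct tags, so treat the payload below as illustrative:

```
event:log
data:{"type":"log","timestamp":"2023-04-05T12:34:56.789Z","pod":"my-pod-abc123","container":"app","message":"server started","unix":1680698096}
```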

The GetLogs() endpoint validates and extracts the required path parameters, starts streaming logs from the Kubernetes API server in a goroutine, and forwards them to the client using SSE. The streamLogsToClient() function handles the forwarding to the client.

```go
// GetLogs is the endpoint that streams logs to the client via SSE
func (sa *StreamAPI) GetLogs(c *gin.Context) {
    sa.setHeaders(c)

    // Validate and extract parameters
    params := map[string]string{
        "workloadType": c.Param("workloadType"),
        "namespace":    c.Param("namespace"),
        "name":         c.Param("name"),
        "container":    c.Param("container"),
    }
    for paramName, paramValue := range params {
        if paramName != "container" && paramValue == "" {
            c.JSON(http.StatusBadRequest, gin.H{"error": "missing parameter " + paramName})
            return
        }
    }

    // Stream logs from Kubernetes API server
    logs := make(chan models.LogMessage)
    done := make(chan struct{})
    errCh := make(chan error)
    go func() {
        err := sa.logStreamClient.StreamLogs(c.Request.Context(), params, logs, done)
        if err != nil {
            errCh <- err
        }
    }()

    // Stream logs to client using Server-Sent Events (SSE)
    streamLogsToClient(c, logs, errCh, done)
}

func streamLogsToClient(c *gin.Context, logs chan models.LogMessage, errCh chan error, done chan struct{}) {
    for {
        select {
        // received a new log line on the channel
        case log := <-logs:
            c.SSEvent(log.Type, log)
            c.Writer.Flush()
        case err := <-errCh:
            c.SSEvent("error", err.Error())
            return
        // the client closed the connection; signal the producer to stop
        case <-c.Writer.CloseNotify():
            close(done)
            return
        }
    }
}
```
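
A quick way to check the stream by hand is curl with buffering disabled. The route below is hypothetical; the actual path depends on how the router registers GetLogs():

```bash
# -N disables curl's output buffering so events appear as they arrive
curl -N "http://localhost:8080/api/stream/logs/deployment/default/my-app"
```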

The StreamContainerLogs() function streams logs of a container in a Kubernetes pod to a channel until the done channel is closed. Each log message is sent as a streaming.Event with a log type and a LogMessage body containing the name of the pod, the name of the container, and the log message itself. If an error occurs while opening or reading the log stream, the method returns an error.

```go
// StreamContainerLogs streams the logs of a container in a Kubernetes pod to a channel
// until the "done" channel is closed. Each log message is sent as a "streaming.Event"
// with a "log" type and a "LogMessage" body containing the name of the pod, the name of
// the container, and the log message itself. If an error occurs while opening or reading
// the log stream, the method returns an error.
func (lsc *LogStreamClient) StreamContainerLogs(ctx context.Context, namespace, name, container string, ch chan models.LogMessage, done <-chan struct{}) error {
    tailLines := int64(5000) // we don't need more than 5000 lines; if more are required, tools like Kibana are better suited
    options := &core_v1.PodLogOptions{
        Container:  container, // name of the container to stream from
        Follow:     true,
        Timestamps: true, // include timestamps
        TailLines:  &tailLines,
    }

    podClient := lsc.cfg.ClientSet.CoreV1().Pods(namespace)
    // open the log stream
    req := podClient.GetLogs(name, options)

    stream, err := req.Stream(ctx)
    if err != nil {
        return fmt.Errorf("error while opening stream: %w", err)
    }
    defer stream.Close()

    // read the logs line by line with a scanner
    scanner := bufio.NewScanner(stream)
    for scanner.Scan() {
        select {
        case <-done:
            return nil
        default:
            text := scanner.Text()
            // the timestamp prefix ends with a Z, so we split the line there
            parts := strings.SplitN(text, "Z", 2)
            if len(parts) < 2 {
                continue // skip lines without a parsable timestamp prefix
            }

            timestamp, err := time.Parse(time.RFC3339Nano, fmt.Sprintf("%sZ", parts[0]))
            if err != nil {
                zap.L().Info("Error parsing timestamp", zap.Error(err))
                return fmt.Errorf("error parsing timestamp: %w", err)
            }

            // forward the parsed log line to the consumer channel
            message := strings.TrimSpace(parts[1])
            ch <- models.LogMessage{
                Type:      "log",
                Timestamp: timestamp,
                Pod:       name,
                Container: container,
                Message:   message,
                Unix:      timestamp.Unix(),
            }
        }
    }
    if err := scanner.Err(); err != nil {
        return fmt.Errorf("error while scanning log stream: %w", err)
    }

    return nil
}
```
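
The StreamLogs() function called from GetLogs() is not shown here; it resolves the pods belonging to the selected workload and fans out one StreamContainerLogs() call per pod. Below is a minimal sketch of that fan-out, assuming a hypothetical resolvePods() helper that returns the pod names of the workload:

```go
// Sketch: stream logs from every pod of a workload into one shared channel.
// resolvePods is a hypothetical helper that lists the pod names of the
// selected workload (e.g. via label selectors on the apps/v1 API).
func (lsc *LogStreamClient) StreamLogs(ctx context.Context, params map[string]string, ch chan models.LogMessage, done <-chan struct{}) error {
	pods, err := lsc.resolvePods(ctx, params["workloadType"], params["namespace"], params["name"])
	if err != nil {
		return fmt.Errorf("error resolving pods: %w", err)
	}

	var wg sync.WaitGroup
	for _, pod := range pods {
		wg.Add(1)
		go func(podName string) {
			defer wg.Done()
			// an error in a single pod stream should not stop the others
			if err := lsc.StreamContainerLogs(ctx, params["namespace"], podName, params["container"], ch, done); err != nil {
				zap.L().Info("pod stream ended", zap.String("pod", podName), zap.Error(err))
			}
		}(pod)
	}
	wg.Wait()
	return nil
}
```

Because all goroutines write into the same channel, the client receives a single interleaved stream of log lines from every pod of the workload.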

You can check out the code here: https://github.com/perber/kdd

## Frontend Implementation
The frontend of our logview component is responsible for rendering the log messages in real time and for letting the user follow, highlight, and filter the logs.

Axios does not support streaming responses, so we could not use it for Server-Sent Events (SSE). Instead, we use the browser-native EventSource API to listen to the stream and an EventEmitter to forward the received data to the component.

```typescript
import { EventEmitter } from "events";

export function createLogStream(url: string): EventEmitter & { startLogStream: () => void, stopLogStream: () => void } {
    const logStream = new EventEmitter();
    let sse: EventSource | null = null

    function startLogStream() {
        const eventSource = new EventSource(url);
        sse = eventSource

        eventSource.addEventListener("open", (e) => {
            console.log("connection established")
        })
        eventSource.addEventListener("log", (e) => {
            const logMessage = JSON.parse(e.data) as LogMessage;
            logStream.emit("log", logMessage)
        });

        eventSource.addEventListener("error", (e) => {
            console.error("an error occurred", e)
            logStream.emit("error", e)
        });
    }

    function stopLogStream(): void {
        if (sse) {
            sse.close() // closes connection
        }
        logStream.removeAllListeners(); // removes all event listeners
    }

    return Object.assign(logStream, { startLogStream, stopLogStream });
}

```
Once we receive log messages in the component, we buffer them before updating the logs state to prevent the UI from freezing while processing incoming logs. We also debounce the state update so the list isn't re-rendered for every single message, and we use virtualization to keep performance stable when many logs are loaded.

```typescript
    // Ref to buffer messages before updating logs
    const bufferRef = useRef<Array<LogViewMessage>>([]);
...

    // updates logs
    const updateLogs = useCallback(() => {
        if (bufferRef.current.length > 0) {
            setLogs((prevLogs) =>
                [...prevLogs, ...bufferRef.current].sort((a, b) => a.unix - b.unix)
            );
            bufferRef.current = [];
            setLoading(false)
        }
    }, [])

    const debouncedUpdateLogs = useMemo(() => _.debounce(updateLogs, 1000, {
        leading: false,
        trailing: true,
    }), [updateLogs])

    // debounced function that updates logs
    const debouncedSetLogs = useCallback(() => {
        debouncedUpdateLogs()
    }, [debouncedUpdateLogs])

    // hook to cleanup debounced function
    useEffect(() => {
        return () => {
            debouncedUpdateLogs.cancel()
        }
    }, [debouncedUpdateLogs])

...
    // useEffect hook to handle streaming logs from the server for a given workload.

...
        const emitter: any = client().getLogs(workloadType, namespace, workloadName, container)
        emitter.on("log", (log: LogMessage) => {
            const logViewMessage: LogViewMessage = {
                ...log,
            };
...
            bufferRef.current.push(logViewMessage);
            debouncedSetLogs();
        });

        emitter.on("error", ((error: any) => {
            setStatus({ message: "Oops! An error occurred during streaming.", success: false })
            console.error(error)
            emitter.stopLogStream()
        }))

        emitter.startLogStream()

        return () => {
            emitter.stopLogStream()
            setLogs([])
            setSearchTerm("")
        }

    }, [debouncedSetLogs, workloadType, namespace, workloadName, container, open])
```

We use the react-window library to implement virtualization, specifically a FixedSizeList component, which renders only those rows that are in the visible area.
```typescript
<FixedSizeList
    onItemsRendered={handleListReady}
    width="100%"
    height={containerHeight}
    itemCount={logs.length}
    itemSize={21}
    ref={listRef}
>
    {Row}
</FixedSizeList>
```

Finally, to highlight logs that match a search term, we pass an array of indexes of those log lines that match the search term to the LogLine component to render them differently.

```typescript
// Memoized Row component that renders a single log line
const Row = React.memo(({ index, style }: { index: number, style: React.CSSProperties }) => {
    const isHighlighted = highlightedLines.indexOf(index) !== -1;
    const log = logs[index];

    return <LogLine key={log.id} log={log} highlighted={isHighlighted} sx={style} />;
});
```
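
The highlightedLines array referenced by Row is not shown above. Here is a minimal sketch of one way to derive it, assuming the logs and searchTerm state from the earlier snippets and a message field on each log entry:

```typescript
// Sketch: collect the indexes of all log lines matching the search term.
const highlightedLines = useMemo<number[]>(() => {
    if (searchTerm === "") return [];
    const term = searchTerm.toLowerCase();
    return logs.reduce<number[]>((matches, log, index) => {
        if (log.message.toLowerCase().includes(term)) {
            matches.push(index);
        }
        return matches;
    }, []);
}, [logs, searchTerm]);
```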


## Challenges & Solutions
**Challenges**
- Consuming SSE in the browser, since axios does not support streaming responses
- Unresponsive UI when many logs are loading
- Rendering many logs can lead to performance issues
- Combining the follow-logs feature with the search feature

**Solutions**
- Used the browser-native EventSource API to implement SSE in the frontend
- Implemented a buffer and debouncing to keep the UI responsive while many logs are loading, and used react-window for virtualization to avoid rendering bottlenecks
- Implemented a memoized Row component that renders a single log line to improve rendering performance
- Follow logs and search: When a user is searching for a specific log message, we disable the "follow logs" feature so the log view stops scrolling to the bottom automatically. New log messages are still streamed to the client in real time (see the sketch below).
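
As a minimal sketch of that behavior, assuming a follow state flag (not shown above) alongside the searchTerm state and the listRef attached to the FixedSizeList:

```typescript
// Sketch: auto-scroll to the newest line while following and not searching.
useEffect(() => {
    if (follow && searchTerm === "" && logs.length > 0) {
        listRef.current?.scrollToItem(logs.length - 1, "end");
    }
}, [logs, follow, searchTerm]);
```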

## Conclusion
Overall, we're really excited about the progress we've made with kdd and the real-time logview feature we've implemented. We hope that this feature will help many developers and DevOps teams save time and will make debugging easier.

Of course, we know that there's still more work to be done. In the future, we're planning to add features like the ability to see previous logs and to display only those logs that match a search string. We're also planning to define a clear roadmap for the project and to start implementing tests to ensure that everything works as expected.

If you're interested in contributing to the project or have any questions about our implementation, please don't hesitate to reach out. You can find the project on GitHub at https://github.com/perber/kdd. We're always happy to collaborate and learn from others in the community.
