DEV Community

Rocky LIU Yan
Rocky LIU Yan

Posted on • Updated on

Smooth Streaming Control with System.Threading.Channels in GPT API Programming

In today’s digital world, real-time interactions are becoming increasingly essential, especially in applications that leverage powerful AI models like OpenAI’s GPT-4. One of the significant advancements in .NET that ensures smooth and efficient handling of streaming data is the System.Threading.Channels library. This blog explores how to harness this feature to improve streaming control when interacting.

What is System.Threading.Channels?

System.Threading.Channels is a modern concurrency primitive provided in .NET for handling asynchronous messaging between producer and consumer tasks. It allows developers to implement robust and scalable applications that require real-time updates without blocking the main execution flow.

Why Use Channels for Streaming?

  1. Asynchronous Message HandlingUtilizing channels helps decouple the production of messages from their consumption. In an interactive chat scenario, the producer (responsible for generating responses) can run independently of the consumer (which handles display logic). This means users can continue to interact with the application while responses are being processed in the background.
  2. Efficient Backpressure ManagementA critical feature of channels is their ability to manage backpressure effectively. In scenarios where the producer is generating messages faster than the consumer can process them, channels prevent overloading. This is crucial for maintaining application responsiveness and ensures that the system remains stable under varying loads.
  3. Flow ControlDevelopers can set constraints for the channel to regulate the maximum number of messages being queued. This enables precise control over resource allocation, preventing excessive memory consumption and ensuring that the application runs smoothly even during peak activity periods.

How It Works: A Breakdown of the Code

Let’s look at how to implement it.

Producer Task

The producer task fetches updates from the GPT model and sends them to the channel. Here’s a snippet demonstrating this:

var producer = Task.Run(async () =>
{
    int messageCount = 0;
    await foreach (var update in kernel.InvokePromptStreamingAsync(query))
    {
        if (messageCount == 0) stopwatch.Start();
        messageCount++;
        sourceText.Append(update);
        layout["Output"].Update(
            new Panel(
                Align.Left(new Markup(Markup.Escape(sourceText.ToString())))
            ).Expand()
        );
        ctx.Refresh();
        await channel.Writer.WriteAsync(update.ToString());
    }
    channel.Writer.Complete();
    stopwatch.Stop();
});
Enter fullscreen mode Exit fullscreen mode

In this code, the producer retrieves responses asynchronously and updates the display in real-time as messages are received, maintaining an engaging user experience.

Consumer Task

The consumer task reads messages from the channel and processes them:

var consumer = Task.Run(async () =>
{
    int totalLength = 0;
    await foreach (var message in channel.Reader.ReadAllAsync())
    {
        totalLength += message.Length;
        // Further processing of the message...
    }
});

By separating the logic for handling incoming messages, developers can keep the user interface responsive, allowing for fluid interactions without lag.

Performance Monitoring

Monitoring performance is essential in any real-time application. You can track key metrics like request duration and latency to optimize your workflow. For example:

layout["status"].Update(
    new Panel(
        Align.Center(new Markup($"Request Time: {stopwatch.Elapsed}"),
            VerticalAlignment.Middle)
        ).Expand()
);
Enter fullscreen mode Exit fullscreen mode

This feedback loop enhances the development and user experience by providing insights into application performance.

Components

  1. OpenAI Chat Completion(Microsoft.SemanticKernel): The application integrates with OpenAI’s API to leverage the GPT-4 model for generating responses based on user input.
  2. Spectre.Console: Used to create a visually appealing console interface, allowing for dynamic updates and rendering of text.

Conclusion

Using System.Threading.Channels for enhanced streaming control significantly improves performance, user experience, and stability. By implementing this powerful feature, developers can create real-time, interactive applications that seamlessly handle the demands of modern users.

If you’re building an application that requires real-time updates or interactions with AI models, consider adopting System.Threading.Channels for a smoother, more efficient experience. Happy coding!

Full code :

// See https://aka.ms/new-console-template for more information
using Microsoft.SemanticKernel;
using Spectre.Console;
using System;
using System.Text;
using System.Threading.Channels;
using System.Threading.Tasks;


using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;

using System.Threading.Tasks;
using System.Diagnostics;

// Create a kernel with OpenAI chat completion
var key = "sk-";
var model = "gpt-4o-mini";
var chat_url = "https://api.chatgpt.com/v1/chat/completions";

#pragma warning disable SKEXP0010 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
var kernel = Kernel
                   .CreateBuilder()
                   .AddOpenAIChatCompletion(modelId: model, endpoint: new Uri(chat_url), apiKey: key).Build();
#pragma warning restore SKEXP0010 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed.
//kernel.PromptRenderFilters.Add(new PromptRenderLoggingFilter());

// Create the layout
var layout = new Layout("Root")
    .SplitColumns(
        new Layout("Left"),
        new Layout("R").SplitRows(new Layout("Right"), new Layout("status"))
        );
layout["Right"].Ratio(11);
layout["Left"].Update(
new Panel(
    Align.Center(
        new Markup(""),
        VerticalAlignment.Middle))
    .Expand());

layout["Right"].Update(
new Panel(
    Align.Center(
        new Markup(""),
        VerticalAlignment.Middle))
    .Expand());
layout["status"].Update(
new Panel(
Align.Center(
    new Markup(""),
    VerticalAlignment.Middle))
.Expand());
Stopwatch gapWatch = new();
Stopwatch origanWatch = new();
AnsiConsole.Write(layout);
while (true)
{


    var q = AnsiConsole.Ask<string>("Let's talk, What do you want?:rocket: ");



    var channel = Channel.CreateUnbounded<string>();
    StringBuilder sourceText = new();
    StringBuilder targetText = new();




    await AnsiConsole.Live(layout)
         .StartAsync(async ctx =>
         {
             layout["Left"].Update(
                                     new Panel(
                                         Align.Center(
                                             new Markup("Loading:fire:"),
                                             VerticalAlignment.Middle))
                                         .Expand());
             ctx.Refresh();


             // AnsiConsole.Write(layout);

             var producer = Task.Run(async () =>
                     {
                         var i = 0;
                         await foreach (var update in kernel.InvokePromptStreamingAsync(q))
                         {
                             if (i == 0) origanWatch.Start();
                             i++;
                             sourceText.Append(update);
                             // Update the left column
                             layout["Left"].Update(
                                     new Panel(
                                         Align.Left(
                                             new Markup(Markup.Escape(sourceText.ToString())),
                                             VerticalAlignment.Middle))
                                         .Expand());
                             ctx.Refresh();


                             await channel.Writer.WriteAsync(update.ToString());
                         }
                         channel.Writer.Complete();
                         origanWatch.Stop();
                         gapWatch.Start();
                     });



             var consumer = Task.Run(async () =>
             {
                 //0.0014 rate/per 1k
                 int count = 0;
                 int baseDelay = 100;//400
                 int minDelay = 0;
                 int maxDelay = 20;//30
                 await foreach (var text in channel.Reader.ReadAllAsync())
                 {



                     count += text.Length;


                     var radio = (sourceText.Length - count) > 100 ? (sourceText.Length - count) / 30 : 1;

                     var smooth_delay = sourceText.Length == 0 ? minDelay : (baseDelay - (count * baseDelay / sourceText.Length)) / radio;


                     int delay = Math.Min(maxDelay, Math.Max(minDelay, smooth_delay));

                     layout["status"].Update(
                         new Panel(
                             Align.Center(
                                 new Markup($"delay:[red]{delay}[/] text length:[red]{sourceText.Length}[/] left:{sourceText.Length - count} speed: {smooth_delay} radio:{radio}  word_len: {text.Length} "),

                                 VerticalAlignment.Middle))
                             .Expand());



                     await Task.Delay(delay);  
                     targetText.Append(text);  
                                              //  if (!response.HttpContext.RequestAborted.IsCancellationRequested) 
                                              //  {
                                              //       
                                              //      await response.WriteAsync($"data: {text[i]}\n\n");
                                              //      await response.Body.FlushAsync();  
                                              //  }

                     layout["Right"].Update(
                         new Panel(
                             Align.Left(
                                 new Markup(Markup.Escape(targetText.ToString())),
                                 VerticalAlignment.Middle))
                             .Expand());
                     ctx.Refresh();



                 }

             });
             await Task.WhenAll(producer, consumer);

             gapWatch.Stop();
             Console.WriteLine(gapWatch.Elapsed);
             layout["status"].Update(
                      new Panel(
                          Align.Center(
                              new Markup($"origan:{origanWatch.Elapsed}gap: {gapWatch.Elapsed} "),

                              VerticalAlignment.Middle))
                          .Expand());

         });



}



Console.WriteLine("All text has been processed.");


Enter fullscreen mode Exit fullscreen mode

This code implements a dynamic text display system with variable delay, similar to a typewriter effect. Here's the summary:

  1. It processes text chunk by chunk with a dynamic delay between each chunk
  2. Key variables:

    • baseDelay: Starting delay (100ms)
    • minDelay: Minimum delay (0ms)
    • maxDelay: Maximum delay (20ms)
  3. The delay calculation:

    • Gets shorter as more text is displayed (count increases)
    • Uses a ratio (radio) to accelerate for longer remaining text
    • smooth_delay = (baseDelay - (progress * baseDelay / total)) / ratio
  4. Speed adjustments:

    • Speeds up when more than 100 characters remain
    • Maintains minimum and maximum delay boundaries
    • Updates progress info in UI panel

In simple terms: it starts slow and gradually speeds up as it approaches the end, ensuring smooth text display with adaptive pacing.

Top comments (1)

Collapse
 
rtom03 profile image
Tom

hey bro