DEV Community

Aleksander Parchomenko

.NET 5 Named Pipes

Intro

Named pipes provide interprocess communication between a pipe server and one or more pipe clients. They offer more functionality than anonymous pipes, which provide interprocess communication on a local computer. Named pipes support full duplex communication over a network and multiple server instances, message-based communication, and client impersonation, which enables connecting processes to use their own set of permissions on remote servers.
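
To make the server and client roles concrete, here is a minimal sketch that uses the raw NamedPipeServerStream and NamedPipeClientStream classes from System.IO.Pipes inside a single process. The class name RawPipeDemo, the method RoundTripAsync and the 5-second connect timeout are illustrative assumptions and are not part of the library described in this post.

```csharp
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public static class RawPipeDemo
{
    // Opens a server and a client on the same pipe name, sends one message
    // from the client and returns what the server read.
    public static async Task<string> RoundTripAsync(string pipeName, string message)
    {
        using var server = new NamedPipeServerStream(
            pipeName, PipeDirection.InOut, 1,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);
        using var client = new NamedPipeClientStream(
            ".", pipeName, PipeDirection.InOut, PipeOptions.Asynchronous);

        // The server starts waiting before the client connects.
        Task waitForClient = server.WaitForConnectionAsync();
        await client.ConnectAsync(5000);
        await waitForClient;

        // Start the write without awaiting it so the read below can run concurrently.
        byte[] payload = Encoding.UTF8.GetBytes(message);
        Task write = client.WriteAsync(payload, 0, payload.Length);

        // Demo shortcut: the expected length is known because both ends live in this method.
        var buffer = new byte[payload.Length];
        int total = 0;
        while (total < buffer.Length)
        {
            int n = await server.ReadAsync(buffer, total, buffer.Length - total);
            if (n == 0) break; // the other end closed the pipe
            total += n;
        }

        await write;
        return Encoding.UTF8.GetString(buffer, 0, total);
    }
}
```

Calling `await RawPipeDemo.RoundTripAsync("DemoPipe", "hello")` should hand the same string back. In a real setup the two ends live in different processes and need their own way to agree on message length or framing.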

Use cases

When can we use it?

  • Desktop applications: for example, to notify a GUI application from a background worker (e.g. a Windows Service). This is typically faster than setting up a self-hosted API in the GUI application and calling it over HTTP.
  • MVPs that will be developed and scaled later: you do not need to install queues or an event stream for asynchronous communication between processes.
  • As a replacement for WCF on one machine or over a local network for duplex interprocess communication. Since .NET Core and .NET 5+ do not support server-side WCF hosting (the recommendation is to migrate WCF to gRPC or use CoreWCF), named pipes can be used for communication instead.

Code

Let's send a command from one app to another. In the example below, a Web API app sends a message to a console application (it can be any app).
First, in our server (the console app), we create a PipeServer and set it up to listen for messages:

internal static class Program
{
    static void Main(string[] args)
    {
        var pipeServer = new PipeServer("TestPipe");
        // Subscribe before starting so that no early message is missed
        pipeServer.MessageReceivedEvent += ReceiveMessage;
        pipeServer.Start();
        Console.ReadKey();
    }

    private static void ReceiveMessage(object sender, MessageReceivedEventArgs args)
    {
        var message = JsonSerializer.Deserialize<Note>(args.Message);
        if (message is not null)
        {
            Console.WriteLine(args.Message);
        }
    }
}

And the client application (for simplicity, the client lives in an ApiController):

[ApiController]
[Route("notes")]
public class NotesController : ControllerBase
{
    private readonly IPipeClient _pipeClient;

    public NotesController(IPipeClient pipeClient)
    {
        _pipeClient = pipeClient;
        _pipeClient.Start();            
    }

    [HttpGet]
    public async Task<IEnumerable<Note>> Get()
    {
        var note = new Note { Id = 1, Title = "Title", Value = "Value" };
        await _pipeClient.SendMessage(JsonSerializer.Serialize(note));

        // Do not stop for future messages
        //_pipeClient.Stop();

        return new List<Note> { note };
    }        
}

The IPipeClient is registered in the dependency injection container:

services.AddSingleton<IPipeClient>(factory => new PipeClient("TestPipe"));

The pipe name here is TestPipe. The server must be created with the same name.

Here we simply send a message (or, more precisely, a command) to another process or thread. The command is serialized on the client and deserialized on the server. The pipe client and server act as a transport layer that is not bound to particular object types: the developer is responsible for properly serializing and deserializing the messages, which are defined in a contracts assembly shared between the applications.
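
As a rough illustration of such a shared contract, the sketch below round-trips a Note through System.Text.Json, the same serializer the examples above use. The Note properties mirror the ones set in the controller; the NoteContract helper and its method names are hypothetical and not part of Compentio.Pipes.

```csharp
using System.Text.Json;

// Contract type shared by client and server (assumed shape, based on the controller example).
public class Note
{
    public int Id { get; set; }
    public string Title { get; set; }
    public string Value { get; set; }
}

// Hypothetical helper that keeps serialization in one place for both sides.
public static class NoteContract
{
    // Client side: turn the contract object into the string sent over the pipe.
    public static string ToWire(Note note) => JsonSerializer.Serialize(note);

    // Server side: rebuild the contract object from the received string.
    public static Note FromWire(string payload) => JsonSerializer.Deserialize<Note>(payload);
}
```

Keeping both directions next to the contract type means a change to Note is picked up by the client and the server from the same assembly, provided both reference it.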

How it works

NamedPipeServerStream and NamedPipeClientStream are wrapped in IPipeServer and IPipeClient, which expose methods for sending messages and events that fire when messages are received on the PipeServer. Here is the client code:

public class PipeClient : IPipeClient
{
    private readonly NamedPipeClientStream _pipeClient;

    public PipeClient(string serverId)
    {
        _pipeClient = new NamedPipeClientStream(".", serverId, PipeDirection.InOut, PipeOptions.Asynchronous);
    }

    #region IPipeClient implementation

    /// <summary>
    /// Starts the client. Connects to the server.
    /// </summary>
    public void Start()
    {
        // 5 minutes
        const int tryConnectTimeout = 5 * 60 * 1000;
        if (!_pipeClient.IsConnected)
        {
            _pipeClient.Connect(tryConnectTimeout);
        }
    }

    /// <summary>
    /// Stops the client. Waits for pipe drain, closes and disposes it.
    /// </summary>
    public void Stop()
    {
        try
        {
            _pipeClient.WaitForPipeDrain();
        }
        finally
        {
            _pipeClient.Close();
            _pipeClient.Dispose();
        }
    }

    /// <summary>
    /// Sends string message to the server
    /// </summary>
    /// <param name="message">Message to be sent</param>
    /// <returns>Async result</returns>
    public Task<TaskResult> SendMessage(string message)
    {
        var taskCompletionSource = new TaskCompletionSource<TaskResult>();

        if (_pipeClient.IsConnected)
        {
            var buffer = Encoding.UTF8.GetBytes(message);
            _pipeClient.BeginWrite(buffer, 0, buffer.Length, asyncResult =>
            {
                try
                {
                    taskCompletionSource.SetResult(EndWriteCallBack(asyncResult));
                }
                catch (Exception ex)
                {
                    taskCompletionSource.SetException(ex);
                }
            }, null);
        }
        else
        {
            throw new IOException("pipe is not connected");
        }

        return taskCompletionSource.Task;
    }

    #endregion

    #region private methods

    /// <summary>
    /// This callback is called when the BeginWrite operation is completed.
    /// It can be called whether the connection is valid or not.
    /// </summary>
    /// <param name="asyncResult">Async Operation result</param>
    /// <returns>Task result</returns>
    private TaskResult EndWriteCallBack(IAsyncResult asyncResult)
    {
        _pipeClient.EndWrite(asyncResult);
        _pipeClient.Flush();

        return new TaskResult { IsSuccess = true };
    }

    #endregion
}

The server code:

public class PipeServer : IPipeServer
{
    private const int MaxNumberOfServerInstances = 10;
    private readonly string _pipeName;
    private readonly SynchronizationContext _synchronizationContext;
    private readonly IDictionary<string, IPipeServer> _servers;

    public PipeServer(string serverName)
    {
        _pipeName = serverName;
        _synchronizationContext = AsyncOperationManager.SynchronizationContext;
        _servers = new ConcurrentDictionary<string, IPipeServer>();
    }

    #region Events

    public event EventHandler<MessageReceivedEventArgs> MessageReceivedEvent;

    public event EventHandler<ClientConnectedEventArgs> ClientConnectedEvent;

    public event EventHandler<ClientDisconnectedEventArgs> ClientDisconnectedEvent;

    #endregion

    #region IPipeServer implementation

    public string ServerId => _pipeName;

    public void Start()
    {
        StartNamedPipeServer();
    }

    public void Stop()
    {
        foreach (var server in _servers.Values)
        {
            try
            {
                UnregisterFromServerEvents(server);
                server.Stop();
            }
            catch (Exception)
            {
                //Logger.Error("Failed to stop server");
            }
        }

        _servers.Clear();
    }

    #endregion

    #region Private methods

    /// <summary>
    /// Starts a new NamedPipeServerStream that waits for connection
    /// </summary>
    private void StartNamedPipeServer()
    {
        var server = new InternalPipeServer(_pipeName, MaxNumberOfServerInstances);
        _servers[server.Id] = server;

        server.ClientConnectedEvent += ClientConnectedHandler;
        server.ClientDisconnectedEvent += ClientDisconnectedHandler;
        server.MessageReceivedEvent += MessageReceivedHandler;

        server.Start();
    }

    /// <summary>
    /// Stops the server that belongs to the given id
    /// </summary>
    /// <param name="id">Server ID</param>
    private void StopNamedPipeServer(string id)
    {
        UnregisterFromServerEvents(_servers[id]);
        _servers[id].Stop();
        _servers.Remove(id);
    }

    /// <summary>
    /// Unregisters from the given server's events
    /// </summary>
    /// <param name="server">Server</param>
    private void UnregisterFromServerEvents(IPipeServer server)
    {
        server.ClientConnectedEvent -= ClientConnectedHandler;
        server.ClientDisconnectedEvent -= ClientDisconnectedHandler;
        server.MessageReceivedEvent -= MessageReceivedHandler;
    }

    /// <summary>
    /// Fires MessageReceivedEvent via the captured synchronization context
    /// </summary>
    /// <param name="eventArgs">Message event Args</param>
    private void OnMessageReceived(MessageReceivedEventArgs eventArgs)
    {
        _synchronizationContext.Post(e => MessageReceivedEvent.SafeInvoke(this, (MessageReceivedEventArgs)e), eventArgs);
    }

    /// <summary>
    /// Fires ClientConnectedEvent via the captured synchronization context
    /// </summary>
    /// <param name="eventArgs">Client connected event Args</param>
    private void OnClientConnected(ClientConnectedEventArgs eventArgs)
    {
        _synchronizationContext.Post(e => ClientConnectedEvent.SafeInvoke(this, (ClientConnectedEventArgs)e), eventArgs);
    }

    /// <summary>
    /// Fires ClientDisconnectedEvent via the captured synchronization context
    /// </summary>
    /// <param name="eventArgs">Client disconnected event Args</param>
    private void OnClientDisconnected(ClientDisconnectedEventArgs eventArgs)
    {
        _synchronizationContext.Post(e => ClientDisconnectedEvent.SafeInvoke(this, (ClientDisconnectedEventArgs)e), eventArgs);
    }

    /// <summary>
    /// Handles a client connection. Fires the relevant event and prepares for new connection.
    /// </summary>
    /// <param name="sender">Event sender</param>
    /// <param name="eventArgs">Client connected event Args</param>
    private void ClientConnectedHandler(object sender, ClientConnectedEventArgs eventArgs)
    {
        OnClientConnected(eventArgs);

        // Create an additional server in preparation for the next connection
        StartNamedPipeServer();
    }

    /// <summary>
    /// Handles a client disconnection. Fires the relevant event and removes its server from the pool
    /// </summary>
    /// <param name="sender">Event sender</param>
    /// <param name="eventArgs">Client disconnected event Args</param>
    private void ClientDisconnectedHandler(object sender, ClientDisconnectedEventArgs eventArgs)
    {
        OnClientDisconnected(eventArgs);

        StopNamedPipeServer(eventArgs.ClientId);
    }

    /// <summary>
    /// Handles a message that is received from the client. Fires the relevant event.
    /// </summary>
    /// <param name="sender">Event sender</param>
    /// <param name="eventArgs">Message received event Args</param>
    private void MessageReceivedHandler(object sender, MessageReceivedEventArgs eventArgs)
    {
        OnMessageReceived(eventArgs);
    }

    #endregion
}

InternalPipeServer wraps the details of reading from buffers, manages threads, and propagates events to the PipeServer.
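
InternalPipeServer is not listed in this post, so the following is only a sketch of what such a read loop could look like, not the library's actual implementation. It waits for one client on a NamedPipeServerStream and passes every chunk it reads to a callback. The names PipeReadLoopSketch, ListenOnceAsync and onMessage, and the 4096-byte buffer, are assumptions made for illustration.

```csharp
using System;
using System.IO.Pipes;
using System.Text;
using System.Threading.Tasks;

public static class PipeReadLoopSketch
{
    // Waits for a single client, then reports each chunk read until the client disconnects.
    public static async Task ListenOnceAsync(string pipeName, Action<string> onMessage)
    {
        using var server = new NamedPipeServerStream(
            pipeName, PipeDirection.InOut,
            NamedPipeServerStream.MaxAllowedServerInstances,
            PipeTransmissionMode.Byte, PipeOptions.Asynchronous);

        await server.WaitForConnectionAsync();

        var buffer = new byte[4096];
        int bytesRead;
        // ReadAsync returns 0 once the client has closed its end of the pipe.
        while ((bytesRead = await server.ReadAsync(buffer, 0, buffer.Length)) > 0)
        {
            // Byte mode does not preserve message boundaries, so a chunk
            // may hold part of a message or several messages at once.
            onMessage(Encoding.UTF8.GetString(buffer, 0, bytesRead));
        }
    }
}
```

A real implementation would also need some form of message framing (for example a length prefix) and would have to handle multi-byte UTF-8 characters that are split across two reads.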

Conclusion

With a wrapper around named pipes, it is easy to build asynchronous communication between different apps. It is especially useful for desktop apps, but it can also serve as an asynchronous channel for MVP apps that are planned to become cloud-native and can later be easily adapted to queues or event streams.

The full code is available on GitHub:

alekshura / Compentio.Pipes

.NET 5 Named Pipes interprocess communication library
