Introduction
Recently at work, I have been working a lot with gRPC and Golang, and I felt enjoy using gRPC as an application interface. It is simple to implement and works well.. Anyway about 2 weeks ago while reading about various technologies related to my work. I came across Cap'n Proto and was interested to learn that it was created by one of the primary authors of Protocol Buffers v2. Since I was already using Protocol Buffers with gRPC at work, I decided to try out Cap'n Proto + Cap'n Proto RPC with Rust.
What is Cap'n Proto?
Cap'n Proto is a incredibly fast data serialization format. If you want to know more about Cap'n Proto in general, please visit their website!. If you are familiar with Protocol buffers, then a majority of the following concepts will be familiar to you.
My Goal
My goal here is to create a simple API that accepts a list of parameters, and a source directory then return a job ID, or accepts a job ID and returns the results. See the following diagrams.
NOTE: The example built in this article will be split into the following files.
main.rs
,server.rs
,client.rs
,build.rs
,service.capnp
Cap'n Proto Interface Definition
With a goal in mind, I set out and defined my service.capnp
as follows:
@0xeb1bbbd418f18514;
interface Job {
register @0 Request -> RegistryItem;
getResult @1 RegistryItem -> Result;
struct Request {
parms @0 :List(Text);
source @1 :Text;
}
struct Result {
union {
link @0 :Text;
error @1 :Text;
}
}
struct RegistryItem {
# Single Job Identity
id @0 :Text;
}
}
NOTE: You can write your methods in a more verbose format, however this is not recommended as you will have to
pry!
multiple times in your server implementation.
Right away you can see we have defined a Job
interface with two methods register
and getResults
, and three data structures Request
, Result
, and RegistryItem
.
To actually use the defined RPC interface, we will need to generate Rust code from the above service.capnp
file.
Generate the Code
In order to generate the code, you will need to create a build.rs
file as follows.
extern crate capnpc;
fn main() {
::capnpc::CompilerCommand::new().file("service.capnp").run().unwrap();
}
This will generate the RPC code from the service.capnp
file defined in the previous section, it roughly translates into the following:
-
interface Job {...}
- Creates
pub mod job
for client/server communications
- Creates
-
register @0 Request -> RegistryItem
- Adds
register
method tojob::Server
trait - Implements
register_request
for thejob::Client
struct to generating a Cap'nProto Request object for the register method. - Creates type
job::RegisterParams
which controls access to incoming data on the server side. - creates type
job::RegisterResults
which controls access to outgoing data on the server side.
- Adds
-
getResult @1 RegistryItem -> Result
- Adds
get_result
method tojob::Server
trait - Implements
get_result_request
for thejob::Client
struct to generating a Cap'nProto Request object for the get_result method. - Creates type
job::GetResultParams
which controls access to incoming data on the server side. - creates type
job::GetResultResults
which controls access to outgoing data on the server side.
- Adds
-
struct Request {...}
- Creates a
pub mod request
(job::request) submodule - Creates type
job::request::Reader
containing getters for each capnp parameter - Creates type
job::request::Builder
containing setters for each capnp parameter
- Creates a
-
struct Result {...}
- Creates a
pub mod result
(job::result) submodule - Creates type
job::result::Reader
containing getters for each capnp parameter - Creates type
job::result::Builder
containing setters for each capnp parameter
- Creates a
-
struct RegistryItem {...}
- Creates a
pub mod registry_item
(job::registry_item) submodule - Creates type
job::registry_item::Reader
containing getters for each capnp parameter - Creates type
job::registry_item::Builder
containing setters for each capnp parameter
- Creates a
Importing the Generated Code
To use the code we just generated, we must import into our source as follows.
use crate::service_capnp::{job};
To build our service we still need to create an RPC client and server.
Creating the Server
The requires implementing 2 things.
- Create the local implementation of your interface
- Create your interface methods
Implement the local interface
First things first, we need to create a local struct
upon which we can implement our interface methods. This is easy.
struct Job;
Once this is finished we get to the interesting part. The implementation of our methods.
Implement the methods
Cap'nProto methods will always follow the below signature.
impl <imported_rpc_interface>::Server for <local_struct> {
fn <rpc_method_name>(&mut self, params: <imported_rpc_interface>::<sentence_case_rpc_method_name>Params, mut results: <imported_rpc_interface>::<sentence_case_rpc_method_name>Results)
-> Promise<(), capnp::Error> {
// Do Something
}
}
or in our case, the following..
impl job::Server for Job {
fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
-> Promise<(), capnp::Error> {
// Do Something
}
fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
-> Promise<(), capnp::Error> {
// Do Something
}
}
For the purposes of this article the business logic for the methods is scaled back to printing out received values, and sending back static responses.
impl job::Server for Job {
fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
-> Promise<(), capnp::Error> {
println!("Processing Register")
// get a reader object for the sent request
let request_reader = pry!(params.get());
// get the value for the sent parms parameter
let parms = request_reader.get_parms().unwrap();
// get the value for the sent source parameter
let source = request_reader.get_source().unwrap();
for item in parms.iter() {
// print each parms value
println!("Parameter: {}", item);
}
// print the source value
println!("DataSource: {}", source);
// set return value for id
results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
Promise::ok(())
}
fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
-> Promise<(), capnp::Error> {
println!("Processing Get Result");
// get a reader object for the sent request
let request_reader = pry!(params.get());
// get the send ID
let id = request_reader.get_id();
// print the sent ID
println!("ID: {}", id);
// set return value for link
results.get().set_link("https://link.to.your/output.csv");
Promise::ok(())
}
}
NOTE:
pry!
is a macro from thecapnp_rpc
crate that acts like try!(), but for functions that return a Promise rather than a Result.
We wrap it all together and build our server::main()
with the following.
use capnp::capability::Promise;
use capnp_rpc::{pry, RpcSystem};
use capnp_rpc::twoparty::{VatNetwork};
use capnp_rpc::rpc_twoparty_capnp::{Side};
use async_std::net::{TcpListener};
use futures::{AsyncReadExt, FutureExt, StreamExt};
use futures::task::LocalSpawn;
use crate::service_capnp::{job};
struct Job;
impl job::Server for Job {
fn register(&mut self, params: job::RegisterParams, mut results: job::RegisterResults)
-> Promise<(), capnp::Error> {
println!("Processing Register")
// get a reader object for the sent request
let request_reader = pry!(params.get());
// get the value for the sent parms parameter
let parms = request_reader.get_parms().unwrap();
// get the value for the sent source parameter
let source = request_reader.get_source().unwrap();
for item in parms.iter() {
// print each parms value
println!("Parameter: {}", item);
}
// print the source value
println!("DataSource: {}", source);
// set return value for id
results.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
Promise::ok(())
}
fn get_result(&mut self, params: job::GetResultParams, mut results: job::GetResultResults)
-> Promise<(), capnp::Error> {
println!("Processing Get Result");
// get a reader object for the sent request
let request_reader = pry!(params.get());
// get the send ID
let id = request_reader.get_id();
// print the sent ID
println!("ID: {}", id);
// set return value for link
results.get().set_link("https://link.to.your/output.csv");
Promise::ok(())
}
}
pub fn main(arg_addr: &String) {
use std::net::ToSocketAddrs;
// parse input address..
let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");
let mut exec = futures::executor::LocalPool::new();
let spawner = exec.spawner();
// Start Server
let _server: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
// Initialize TCP listener @ addr
let listener = TcpListener::bind(&addr).await?;
// Initialize local interface defined above
let job_client = job::ToClient::new(Job).into_client::<capnp_rpc::Server>();
// Initialize and execute async handler
let mut incoming = listener.incoming();
let handle_incoming = async move {
while let Some(socket_result) = incoming.next().await {
let sock = socket_result?;
println!("Accepted connection from {:?}", sock.peer_addr());
sock.set_nodelay(true)?;
let (reader, writer) = sock.split();
let network = VatNetwork::new(reader, writer, Side::Server, Default::default());
let rpc_system = RpcSystem::new(Box::new(network), Some(job_client.clone().client));
spawner.spawn_local_obj(Box::pin(rpc_system.map(|_| ())).into())?;
}
Ok::<(), Box<dyn std::error::Error>>(())
};
handle_incoming.await?;
Ok(())
});
}
Creating the Client
Creating the client only requires creating the main loop for sending our requests as follows.
use capnp_rpc::RpcSystem;
use capnp_rpc::twoparty::VatNetwork;
use capnp_rpc::rpc_twoparty_capnp::Side;
use async_std::net::TcpStream;
use futures::{AsyncReadExt, FutureExt};
use futures::task::LocalSpawn;
use crate::service_capnp::{job};
pub fn main(arg_addr: &String) {
use std::net::ToSocketAddrs;
// Resolve input server address
let addr = arg_addr.to_socket_addrs().unwrap().next().expect("Invalid Address");
// Create a pool for executing async requests
let mut exec = futures::executor::LocalPool::new();
let spawner = exec.spawner();
// Execute the client
let _client: Result<(), Box<dyn std::error::Error>> = exec.run_until(async move {
// Create RPC connection to the server
let stream = TcpStream::connect(&addr).await?;
stream.set_nodelay(true)?;
let (reader, writer) = stream.split();
let network = Box::new(
VatNetwork::new(reader, writer, Side::Client, Default::default())
);
let mut rpc_system = RpcSystem::new(network, None);
// Setup the client interface
let job_client: job::Client = rpc_system.bootstrap(Side::Server);
spawner.spawn_local_obj(Box::pin(rpc_system.map(|_|())).into())?;
{
println!("Sending Register Job Request...");
// Create a register request object
let mut request = job_client.register_request();
// Set parms - list parameters require initialization
let mut parms_setter = request.get().init_parms(2);
parms_setter.set(0, "dark");
parms_setter.set(1, "world");
// Set source - strings do not require prior initialization
request.get().set_source("/link/to/the/path");
// Send request, and await response
let response = request.send().promise.await?;
println!("Received JobID: {}", response.get().unwrap().get_id().unwrap())
}
{
println!("Sending Get Result Request...");
// Create get_result request object
let mut request = job_client.get_result_request();
// Set JobID
request.get().set_id("01E3S4Q9SN3VHVXB2KCAGD9P62");
// Send request, and await response
let response = request.send().promise.await?;
// The result is of the union type, so we must determine which type it is
match response.get()?.which()? {
job::result::Link(t) => {
println!("Received Link: {}", t?);
}
job::result::Error(t) => {
println!("Received Error: {}", t?);
}
}
}
Ok(())
});
}
Creating The main()
For the purposes of this article, both the client and server will be part of the same binary. As such, we have a dedicated main.rs
which looks as follows.
// Include capnp generated code
pub mod source_capnp {
include!(concat!(env!("OUT_DIR"), "/source_capnp.rs"));
}
pub mod client;
pub mod server;
fn main() {
let args: Vec<String> = ::std::env::args().collect();
if args.len() >= 2 {
match &args[1][..] {
"client" => return client::main(&args[2]),
"server" => return server::main(&args[2]),
_ => ()
}
}
println!("usage: {} [client | server] ADDRESS", args[0]);
}
Running the server
We will run our server on localhost:2020
, and should receive the following output (pre-client)
16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
Finished dev [unoptimized + debuginfo] target(s) in 2.90s
Running `target/debug/export-rust server 'localhost:2020'`
Running the client
Similarly the client will run against the server running at localhost:2020
. When the client is run however there should be output in both the server terminal and the client terminal as follows.
Client
16:50:53 🖎 export-rust master* ± cargo run client localhost:2020
Finished dev [unoptimized + debuginfo] target(s) in 0.04s
Running `target/debug/export-rust client 'localhost:2020'`
Sending Register Job Request...
Received JobID: 01E3S4Q9SN3VHVXB2KCAGD9P62
Sending Get Result Request...
https://link.to.your/output.csv
Server
16:46:22 🖎 export-rust master* ± cargo run server localhost:2020
Compiling export-rust v0.1.0 (/home/procyclinsur/test/export-rust)
Finished dev [unoptimized + debuginfo] target(s) in 2.90s
Running `target/debug/export-rust server 'localhost:2020'`
Accepted connection from Ok(V6([::1]:36374))
Processing Register
Parameter: dark
Parameter: world
DataSource: /link/to/the/path
Processing Get Result
ID: 01E3S4Q9SN3VHVXB2KCAGD9P62
As you can see, we are able to send and receive data via our API powered by Cap'n Proto RPC. If you love it, hate it, have any questions, or suggestions for improvement please let me know!
Top comments (1)
Thanks for this! Great resource.
Can I ask what you use to make your sequence diagrams? I really like the aesthetic :)