DEV Community

Stefan  🚀
Stefan 🚀

Posted on • Originally published at wundergraph.com on

Effective Integration Testing for Distributed Systems: Mastering Cosmo Router with GraphQL Federation

Cosmo Router is a GraphQL Gateway that allows you to combine multiple GraphQL APIs into a unified Graph using GraphQL Federation. But that's really just the tip of the iceberg when it comes to the complexity of the system.

Are you looking for an Open Source Graph Manager? Cosmo is the most complete solution including Schema Registry, Router, Studio, Metrics, Analytics, Distributed Tracing, Breaking Change detection and more.

When we say "Federation", we mean that Cosmo Router integrates Subgraphs, which are GraphQL APIs that are managed by a separate team. Subgraphs can be written in any language, they can support Requests over HTTP and Subscriptions over WebSockets via SSE. Clients can make HTTP requests to the Cosmo Router, or initiate Subscriptions over WebSockets or SSE, with multiple protocols and transports supported. In addition, we've recently added support for Event-Driven Federated Subscriptions, which allows you to drive Subscriptions from events in your system.

As you can imagine, this is a lot of moving parts, and we need to make sure that everything works together seamlessly. But how do we efficiently test such a complex system?

In this article, we'll show you how we test the Cosmo Router using advanced techniques in GraphQL Federation, including the utilization of subgraphs and subscriptions, to ensure seamless end-to-end functionality, correctness and superior system performance.

What is GraphQL & GraphQL Federation?

GraphQL is a query language for APIs and a runtime for fulfilling those queries with your existing data. It provides a complete and understandable description of the data in your API, gives clients the power to ask for exactly what they need and nothing more, makes it easier to evolve APIs over time, and enables powerful developer tools.

GraphQL Federation is a set of specifications that allow you to build a single GraphQL API from multiple GraphQL APIs (Subgraphs). It allows you to build a single unified Graph that can be queried by clients using a single GraphQL endpoint, while each Subgraph can be managed by a separate team and written in any language.

What is Cosmo Router?

Cosmo Router is an Open Source GraphQL API Gateway that implements the GraphQL Federation specification. It handles client requests, plans the execution of those requests across multiple Subgraphs, and then executes those requests in parallel, combining the results into a single response.

What is Cosmo?

Cosmo is a complete GraphQL Platform that includes Cosmo Router, Cosmo CLI, and Cosmo Studio. Together, these tools allow you to build, deploy, and manage your Federated GraphQL APIs and Subgraphs. Cosmo is Open Source and available on GitHub. You can self-host Cosmo, e.g. if you have to comply with strict data privacy regulations, or you can use our Soc2 compliant SaaS offering, Cosmo Cloud.

Integration Testing challenges for GraphQL Federation

Integration testing is a type of testing that tests the integration between different components of a system. In our case, we want to test the integration between the Cosmo Router, the Subgraphs, Event-Driven Federated Subscriptions, and the various client protocols and transports.

Let's start by lising some of the criteria that is important to us when we set out to build our integration testing framework:

  • tests...

  • we need to be able to...

Integration Testing Cosmo Router, the Open Source GraphQL Federation Gateway

How to achieve fast and parallel end-to-end tests for Distributed Systems

Let's tackle the first few requirement to set the foundation for all other requirements. We want our tests to be fast, deterministic, reproducible, and not flaky at all. This might seem trivial, but it was quite a challenge to achieve this for a distributed system like Cosmo Router.

In addition, we want our tests to be easy to debug, which means that we would need to run all services in a single process.

We also learned that it's important to run our tests in isolation. We're using GraphQL Subscriptions, a mechanism that allows clients to subscribe to events in the system. If you're looking to test Subscriptions in a deterministic way, you will realize that you need to run these tests in isolation or they might interfere with each other.

As we wanted to make tests as easy to write as possible, we decided to create a test framework that abstracts away all the complexity of setting up an isolated environment, including Subgraphs, Event-Driven Federated Subscriptions using NATS, and the Cosmo Router itself.

Luckily, we can use gqlgen to implement our Subgraphs and run them in the same process as our tests. This means that we can debug into our Subgraphs if necessary. NATS is also written in Go, so we can use the testserver provided by NATS, as well as the NATS client, to define Event-Driven Federated Subscriptions tests. Lastly, Cosmo Router is also written in Go, so we can run everything together in a single process and debug all components if necessary.

Let's take a look at how a simple test looks like:

func TestIntegration(t *testing.T) {
    testenv.Run(t, &testenv.Config{}, func(t *testing.T, xEnv *testenv.Environment) {
        res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
            Query: `query { employees { id } }`,
        })
        require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":9},{"id":10},{"id":11},{"id":12}]}}`, res.Body)
    })
}

Enter fullscreen mode Exit fullscreen mode

From within the top level test function, we call testenv.Run to start the test environment. All components, like the Subgraphs, NATS, and Cosmo Router, are started behind the scenes using the httptest package. We'll take a look at the testenv.Run function in a bit.

The testenv.Run function takes a testenv.Config struct as an argument, which allows us to configure the test environment. For this simple test, we don't need to configure anything, so we pass an empty struct.

In addition to the *testing.T argument, the testenv.Run function also provides us with an *testenv.Environment argument. This argument allows us to interact with the test environment and abstracts away redundant code, like making a GraphQL request and asserting that the response comes back with a 200 OK status code, like we do in the test above.

If anything goes wrong during the test, we're using t.Cleanup to make sure that all components are shut down and cleaned up properly.

Achieving parallelism with complex tests in Go

First, when you want to run tests in parallel in Go, you need to call t.Parallel() at the beginning of your test function, like in this example:

func TestAnonymousQuery(t *testing.T) {
    t.Parallel()
    testenv.Run(t, &testenv.Config{}, func(t *testing.T, xEnv *testenv.Environment) {
        res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
            Query: `{ employees { id } }`,
        })
        require.JSONEq(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":9},{"id":10},{"id":11},{"id":12}]}}`, res.Body)
    })
}

Enter fullscreen mode Exit fullscreen mode

The Go testing framework runs all tests in parallel that call t.Parallel(). All other tests are run sequentially.

But to achieve parallelism without flakiness, we need to ensure that we're not accidentally trying to use the same port twice. We need to get two free ports per test, one for the Router and one for NATS. All Subgraphs are running using the httptest package, which automatically assigns a free port.

Initially, getting two free ports in parallel tests was sometimes flaky, so we had to implement a simple solution to prevent multiple tests running in parallel from using the same port.

This is how the testenv package internally starts the test environment:


var (
    envCreateMux sync.Mutex
)

func createTestEnv(t testing.TB, cfg *Config) (*Environment, error) {

    // Ensure that only one test environment is created at a time
    // We use freeport to get a free port for NATS and the Router
    // If we don't lock here, two parallel tests might get the same port
    envCreateMux.Lock()
    defer envCreateMux.Unlock()

    ctx, cancel := context.WithCancelCause(context.Background())

    natsPort, err := freeport.GetFreePort()
    if err != nil {
        t.Fatalf("could not get free port: %s", err)
    }

    opts := natsserver.Options{
        Host: "localhost",
        Port: natsPort,
        NoLog: true,
        NoSigs: true,
    }

    ns := natstest.RunServer(&opts)
    if ns == nil {
        t.Fatalf("could not start NATS test server")
    }

  // ...

Enter fullscreen mode Exit fullscreen mode

All tests run in the same process, so a simple mutex is enough to ensure that we're only creating one test environment at a time. It's not a big slowdown, but it ensures that freeport.GetFreePort() doesn't return the same port twice.

Let's take a look at how we're starting the Subgraphs:

employees := &Subgraph{
        handler: subgraphs.EmployeesHandler(subgraphOptions(t, ns)),
        middleware: cfg.Subgraphs.Employees.Middleware,
        globalMiddleware: cfg.Subgraphs.GlobalMiddleware,
        globalCounter: counters.Global,
        localCounter: counters.Employees,
        globalDelay: cfg.Subgraphs.GlobalDelay,
        localDelay: cfg.Subgraphs.Employees.Delay,
}

employeesServer := httptest.NewServer(employees)

Enter fullscreen mode Exit fullscreen mode

The package subgraphs contains all the implementations of our Subgraphs. It's using gqlgen to generate the Subgraph code from a GraphQL schema. We import the Subgraph Handlers and pass them to the Subgraph struct. In addition, you can see that we're passing quite a few options to the Subgraph struct, like middleware, counters, and delays.

Let's take a look at the implementation of the Subgraph struct to see how we're using these options:

type Subgraph struct {
    handler http.Handler
    globalMiddleware func(http.Handler) http.Handler
    middleware func(http.Handler) http.Handler

    globalDelay time.Duration
    localDelay time.Duration

    globalCounter *atomic.Int64
    localCounter *atomic.Int64
}

func (s *Subgraph) ServeHTTP(w http.ResponseWriter, r *http.Request) {
    s.globalCounter.Inc() // increment global counter for all requests
    s.localCounter.Inc() // increment local counter for this subgraph

    if s.globalDelay > 0 {
        time.Sleep(s.globalDelay) // enforce global delay (e.g. to simulate network latency) across all subgraphs
    }
    if s.localDelay > 0 {
        time.Sleep(s.localDelay) // enforce local delay (e.g. to simulate network latency) for this subgraph
    }

  // run the request through a global middleware (for all subgraphs) or a local middleware (only for this subgraph)
  // if no middleware is configured, we just run the request through the handler

    if s.globalMiddleware != nil {
        s.globalMiddleware(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
            if s.middleware != nil {
                s.middleware(s.handler).ServeHTTP(w, r)
                return
            }
            s.handler.ServeHTTP(w, r)
        })).ServeHTTP(w, r)
        return
    }
    if s.middleware != nil {
        s.middleware(s.handler).ServeHTTP(w, r)
        return
    }
    s.handler.ServeHTTP(w, r)
}

Enter fullscreen mode Exit fullscreen mode

As you can see, we've got quite a bit of flexibility here, which is absolutely necessary to test all the different scenarios that we want to test. E.g. we might want to test when a Subgraph has some artificial latency, or when a Subgraph returns an error.

The counters, global and local, are used to assert that the correct Subgraphs are called for a given request. The Cosmo Router has a feature called Singleflight / Origin Request Deduplication, which ensures that the exact same request is only sent to a Subgraph once at a time. Using the counters, we can assert that there are no duplicate requests.

Now that you understand the foundation of our integration testing framework, let's take a look at some of the more complex tests that we're running and how they're implemented.

End-to-end testing of Event-Driven Federated GraphQL Subscriptions, deterministic and reproducible

Event-Driven Federated Subscriptions (EDFS) allow you to drive Subscriptions from events in your system. This is a very powerful feature, but it's also quite complex to test.

The first challenge is to ensure that the test is deterministic and reproducible. You'll see that this is actually quite hard to achieve due to the nature of Subscriptions, or WebSockets more specifically.

Let's take a look at the simplest test that we can write for Subscriptions. As the code is quite complex, I'll add inline comments and some explanations below:

t.Run("subscribe async", func(t *testing.T) {
  t.Parallel()
  testenv.Run(t, &testenv.Config{}, func(t *testing.T, xEnv *testenv.Environment) {

    // just a struct to create the Subscription
    var subscription struct {
      employeeUpdated struct {
        ID float64 `graphql:"id"`
        Details struct {
          Forename string `graphql:"forename"`
          Surname string `graphql:"surname"`
        } `graphql:"details"`
      } `graphql:"employeeUpdated(employeeID: 3)"`
    }

    // we want to use an external GraphQL client to subscribe to the Subscription
    // so we get the subscription URL from the test environment
    surl := xEnv.GraphQLSubscriptionURL()
    // init the GraphQL client
    client := graphql.NewSubscriptionClient(surl)
    t.Cleanup(func() {
      _ = client.Close()
    })

    // we want to wait for two messages to be sent to the client
    // so we create a WaitGroup with a counter of 2
    wg := &sync.WaitGroup{}
    wg.Add(2)

    subscriptionID, err := client.Subscribe(&subscription, nil, func(dataValue []byte, errValue error) error {
      require.NoError(t, errValue)
      require.JSONEq(t, `{"employeeUpdated":{"id":3,"details":{"forename":"Stefan","surname":"Avram"}}}`, string(dataValue))
      defer wg.Done()
      return nil
    })
    require.NoError(t, err)
    require.NotEqual(t, "", subscriptionID)

    go func() {
      err := client.Run()
      require.NoError(t, err)
    }()

    go func() {
      wg.Wait()
      err = client.Unsubscribe(subscriptionID)
      require.NoError(t, err)
      err := client.Close()
      require.NoError(t, err)
    }()

    // this is where the test environment shows its power
    // we can wait until one Subscription is fully registered and ready to receive messages from NATS
    xEnv.WaitForSubscriptionCount(1, time.Second*5)

    // Send a mutation to trigger the subscription

    // Make a GraphQL request to update the availability of an employee
    // Internally, this will send a message to NATS
    res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
      Query: `mutation { updateAvailability(employeeID: 3, isAvailable: true) { id } }`,
    })
    require.JSONEq(t, `{"data":{"updateAvailability":{"id":3}}}`, res.Body)

    // Trigger the subscription directly via NATS
    // For convenience, we're adding a nats client (NC) to the test environment
    // This allows us to send messages to NATS directly
    err = xEnv.NC.Publish("employeeUpdated.3", []byte(`{"id":3,"__typename": "Employee"}`))
    require.NoError(t, err)

    err = xEnv.NC.Flush()
    require.NoError(t, err)

    // wait until the server has sent two messages to the client
    // the client will then trigger the wait group and unsubscribe from the subscription
    xEnv.WaitForMessagesSent(2, time.Second*10)
    // as we expect the client to unsubscribe, we can await until the subscription count is 0
    xEnv.WaitForSubscriptionCount(0, time.Second*10)
    // in addition, we can await until the client has closed the connection
    xEnv.WaitForConnectionCount(0, time.Second*10)
    // we could have just ended the test before,
    // but we wanted to ensure that Subscriptions and Connections are cleaned up properly
  })
})

Enter fullscreen mode Exit fullscreen mode

There are a few things to note here:

Due to the asynchronous nature of Subscriptions, it's almost impossible to write a deterministic test. You can establish a connection, wait 5 seconds, and "hope" that the Subscription is ready to receive messages, but that's not a reliable way to test Subscriptions. What if it takes 1 second on you local machine, but 6 seconds on GitHub Actions? In the first case, you will waste 4 seconds, in the second case, your test will fail, but only in the CI environment.

To solve this problem, we've added an important feature to the Router. We count the number of active client connections and the number of active Subscriptions using atomic counters. In addition, we added an "event-bus" to the test environment, which allows us to wait for certain events to happen. In this case, we're waiting for events like ConnectionCount and SubscriptionCount to reach a certain value.

Furthermore, we've embedded a NATS client into the test environment, making it easy to send messages to NATS directly.

Once we switched to this approach, we were able to write deterministic and reproducible tests for Subscriptions, which previously was quite a challenge. We usually had to re-run the tests multiple times to make sure they succeed.

Testing the Cosmo Router with advanced request tracing (ART)

One very powerful feature of the Cosmo Router is the ability to get a detailed view of the execution of a GraphQL request. For more info, check out the ART Documentation.

There are a couple of problems we needed to solve to test ART:

The trace contains a lot of information that is not deterministic, like timestamps, timings, generated UUIDs, etc. We needed to find a way to make the trace deterministic, so that we can compare it to a static trace.

Let's take a look at the test:

func TestTracing(t *testing.T) {
    t.Parallel()
    testenv.Run(t, &testenv.Config{
        ModifyEngineExecutionConfiguration: func(cfg *config.EngineExecutionConfiguration) {
            cfg.EnableRequestTracing = true
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        res, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
            Query: bigEmployeesQuery,
            Header: http.Header{
                "X-WG-Trace": []string{"true", "enable_predictable_debug_timings"},
            },
        })
        require.NoError(t, err)
        require.Equal(t, http.StatusOK, res.Response.StatusCode)
        tracingJsonBytes, err := os.ReadFile("testdata/tracing.json")
        require.NoError(t, err)
        // we generate a random port for the test server, so we need to replace the port in the tracing json
        rex, err := regexp.Compile(`http://127.0.0.1:\d+/graphql`)
        require.NoError(t, err)
        tracingJson := string(rex.ReplaceAll(tracingJsonBytes, []byte("http://localhost/graphql")))
        resultBody := rex.ReplaceAllString(res.Body, "http://localhost/graphql")
        // all nodes have UUIDs, so we need to replace them with a static UUID
        rex2, err := regexp.Compile(`"id":"[a-f0-9\-]{36}"`)
        require.NoError(t, err)
        tracingJson = rex2.ReplaceAllString(tracingJson, `"id":"00000000-0000-0000-0000-000000000000"`)
        resultBody = rex2.ReplaceAllString(resultBody, `"id":"00000000-0000-0000-0000-000000000000"`)
        require.Equal(t, prettifyJSON(t, tracingJson), prettifyJSON(t, resultBody))
        if t.Failed() {
            t.Log(resultBody)
        }
        // make the request again, but with "enable_predictable_debug_timings" disabled
        // compare the result and ensure that the timings are different
        res2, err := xEnv.MakeGraphQLRequest(testenv.GraphQLRequest{
            Query: bigEmployeesQuery,
            Header: http.Header{
                "X-WG-Trace": []string{"true"},
            },
        })
        require.NoError(t, err)
        require.Equal(t, http.StatusOK, res2.Response.StatusCode)
        body := []byte(res2.Body)
        data, _, _, err := jsonparser.Get(body, "data")
        require.NoError(t, err)
        require.NotNilf(t, data, "data should not be nil: %s", body)
        tracing, _, _, err := jsonparser.Get(body, "extensions", "trace")
        require.NoError(t, err)
        require.NotNilf(t, tracing, "tracing should not be nil: %s", body)
        require.NotEqual(t, prettifyJSON(t, tracingJson), prettifyJSON(t, string(body)))
    })
}

Enter fullscreen mode Exit fullscreen mode

First, we need to enable ART in the test environment. We can do this by passing a function to the ModifyEngineExecutionConfiguration field of the testenv.Config struct.

Next, we need to make sure that the trace is deterministic. For this, we've added a special "mode" to ART called enable_predictable_debug_timings, which ensures that all timings are deterministic, as we're not using the real time, but a fake time that is incremented for each step in the trace.

In addition, you might remember that we wanted to run all tests in parallel. This means that we spin up isolated test-Subgraphs for each test. As the trace contains the URL of the Subgraph, we need to replace the port in the trace with a static port, so that we can compare the trace to the static trace that we've stored in a file.

Lastly, we need to make sure that the UUIDs in the trace are static. We're using a regular expression to replace all UUIDs with a static UUID.

Now that we've got a deterministic trace, we can compare it to the static trace that we've stored in a file.

However, you might be thinking that this prevents us from testing "real" time measurements. That's true, but we can do something about it. We can run the same test again, but this time without the enable_predictable_debug_timings mode. This will give us a trace with real timings. We can then compare the two traces and ensure that the timings are different.

Testing the Cosmo Router with Subgraph Errors

Next, we want to test what happens when a Subgraph returns errors. There are different types of errors that might occur:

  • A GraphQL Server Error for a root request, which is a valid GraphQL response, but with errors
  • A GraphQL Server Error for a nested request, which is a valid GraphQL response, but with errors
  • A HTTP Error, which is an invalid GraphQL response, but with a valid HTTP status code
  • No Response

Let's start with a test where the Origin Subgraph doesn't return a response at all:

func TestWithOriginErrors(t *testing.T) {
    testenv.Run(t, &testenv.Config{
        Subgraphs: testenv.SubgraphsConfig{
            Employees: testenv.SubgraphConfig{
                CloseOnStart: true,
            },
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
            Query: `{ employees { id details { forename surname } notes } }`,
        })
        require.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph '0' at path 'query'."}],"data":null}`, res.Body)
    })
}

Enter fullscreen mode Exit fullscreen mode

We can achieve this by passing a custom SubgraphsConfig to the testenv.Config struct. We can set the CloseOnStart field to true to make sure that the Subgraph is closed immediately after it has been started. We need to start the Subgraph in order to build a valid Cosmo Router configuration, but using this flag, we can immediately close it again to simulate a Subgraph that doesn't respond.

As you can see in the assertions, we expect a GraphQL Server Error with a message that indicates that the Subgraph didn't respond. We were not able to resolve any data, so we return null for the data.

What if the same problem occurs for a nested request?

func TestPartialOriginErrors(t *testing.T) {
    testenv.Run(t, &testenv.Config{
        Subgraphs: testenv.SubgraphsConfig{
            Products: testenv.SubgraphConfig{
                CloseOnStart: true,
            },
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
            Query: `{ employees { id details { forename surname } notes } }`,
        })
        require.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph '3' at path 'query.employees.@'."}],"data":{"employees":[{"id":1,"details":{"forename":"Jens","surname":"Neuse"},"notes":null},{"id":2,"details":{"forename":"Dustin","surname":"Deus"},"notes":null},{"id":3,"details":{"forename":"Stefan","surname":"Avram"},"notes":null},{"id":4,"details":{"forename":"Björn","surname":"Schwenzer"},"notes":null},{"id":5,"details":{"forename":"Sergiy","surname":"Petrunin"},"notes":null},{"id":7,"details":{"forename":"Suvij","surname":"Surya"},"notes":null},{"id":8,"details":{"forename":"Nithin","surname":"Kumar"},"notes":null},{"id":9,"details":{"forename":"Alberto","surname":"Garcia Hierro"},"notes":null},{"id":10,"details":{"forename":"Eelco","surname":"Wiersma"},"notes":null},{"id":11,"details":{"forename":"Alexandra","surname":"Neuse"},"notes":null},{"id":12,"details":{"forename":"David","surname":"Stutt"},"notes":null}]}}`, res.Body)
    })
}

Enter fullscreen mode Exit fullscreen mode

We're using the same technique here, but this time we're closing the Products Subgraph, which is responsible for the notes field of the Employee type. In the response, you can see that we're expecting an error, but also a partial response with the data that we were able to resolve, and null for the notes field.

Let's say we'd like to test what happens when one of the Subgraphs returns a HTTP error:

func TestPartialOriginErrors500(t *testing.T) {
    testenv.Run(t, &testenv.Config{
        Subgraphs: testenv.SubgraphsConfig{
            Products: testenv.SubgraphConfig{
                Middleware: func(handler http.Handler) http.Handler {
                    return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
                        w.WriteHeader(http.StatusInternalServerError)
                    })
                },
            },
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
            Query: `{ employees { id details { forename surname } notes } }`,
        })
        require.Equal(t, `{"errors":[{"message":"Failed to fetch from Subgraph '3' at path 'query.employees.@'."}],"data":{"employees":[{"id":1,"details":{"forename":"Jens","surname":"Neuse"},"notes":null},{"id":2,"details":{"forename":"Dustin","surname":"Deus"},"notes":null},{"id":3,"details":{"forename":"Stefan","surname":"Avram"},"notes":null},{"id":4,"details":{"forename":"Björn","surname":"Schwenzer"},"notes":null},{"id":5,"details":{"forename":"Sergiy","surname":"Petrunin"},"notes":null},{"id":7,"details":{"forename":"Suvij","surname":"Surya"},"notes":null},{"id":8,"details":{"forename":"Nithin","surname":"Kumar"},"notes":null},{"id":9,"details":{"forename":"Alberto","surname":"Garcia Hierro"},"notes":null},{"id":10,"details":{"forename":"Eelco","surname":"Wiersma"},"notes":null},{"id":11,"details":{"forename":"Alexandra","surname":"Neuse"},"notes":null},{"id":12,"details":{"forename":"David","surname":"Stutt"},"notes":null}]}}`, res.Body)
    })
}

Enter fullscreen mode Exit fullscreen mode

We can achieve this by passing a custom SubgraphConfig to the testenv.Config struct, which allows us to pass a custom middleware to the Subgraph. In this middleware, we're simply returning a 500 Internal Server Error for all requests. We explicitly don't call the handler function, so the Subgraph won't return a valid GraphQL response.

If you asked yourself earlier why we added the middleware feature to the Subgraph struct, this is the reason. For flexible testing, we need to be able to override the default behavior of the Subgraph.

Testing Singleflight / Origin Request Deduplication

Another important feature of the Cosmo Router is Singleflight / Origin Request Deduplication. This feature ensures that the exact same request is only sent to a Subgraph once at a time. If you have a lot of traffic, this can save you a lot of resources, reducing the load on your Subgraphs, and improving the overall performance of your system.

But how do we know that the Router is actually deduplicating origin requests? What might be simple for a REST API is a bit more complex for GraphQL APIs, as we need to make sure that the exact same request is sent to a Subgraph.

Let's take a look at the test:

func TestSingleFlight(t *testing.T) {
    testenv.Run(t, &testenv.Config{
        Subgraphs: testenv.SubgraphsConfig{
            GlobalDelay: time.Millisecond * 100,
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        var (
            numOfOperations = 10
            wg sync.WaitGroup
        )
        wg.Add(numOfOperations)
        trigger := make(chan struct{})
        for i := 0; i < numOfOperations; i++ {
            go func() {
                defer wg.Done()
                <-trigger
                res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
                    Query: `{ employees { id } }`,
                })
                require.Equal(t, `{"data":{"employees":[{"id":1},{"id":2},{"id":3},{"id":4},{"id":5},{"id":7},{"id":8},{"id":9},{"id":10},{"id":11},{"id":12}]}}`, res.Body)
            }()
        }
        close(trigger)
        wg.Wait()
        // We expect that the number of requests is less than the number of operations
        require.NotEqual(t, int64(numOfOperations), xEnv.SubgraphRequestCount.Global.Load())
    })
}

Enter fullscreen mode Exit fullscreen mode

In this test, we're using the GlobalDelay option to add some artificial latency to all Subgraphs. Without this delay, the test would be too fast to reliably test Singleflight. Next, we're creating 10 goroutines (threads) that will all make the same request to the Router. As goroutines are executed concurrently and randomly, we need to make sure that all goroutines are ready to make the request, so we're using a semaphore pattern to synchronize the goroutines.

After all goroutines have made the request, we're asserting that the number of requests is less than the number of operations. This is a simple way to assert that the Router is deduplicating origin requests.

However, there's a catch with Singleflight. We must not deduplicate requests that are Mutations. If we did, we would break the Mutations, as they would only be executed once.

Let's take a look at how we're testing this:

func TestSingleFlightMutations(t *testing.T) {
    testenv.Run(t, &testenv.Config{
        Subgraphs: testenv.SubgraphsConfig{
            GlobalDelay: time.Millisecond * 100,
        },
    }, func(t *testing.T, xEnv *testenv.Environment) {
        var (
            numOfOperations = 10
            wg sync.WaitGroup
        )
        wg.Add(numOfOperations)
        trigger := make(chan struct{})
        for i := 0; i < numOfOperations; i++ {
            go func() {
                defer wg.Done()
                <-trigger
                res := xEnv.MakeGraphQLRequestOK(testenv.GraphQLRequest{
                    Query: `mutation { updateEmployeeTag(id: 1, tag: "test") { id tag } }`,
                })
                require.Equal(t, `{"data":{"updateEmployeeTag":{"id":1,"tag":"test"}}}`, res.Body)
            }()
        }
        close(trigger)
        wg.Wait()
        // We expect that the number of requests is less than the number of operations
        require.Equal(t, int64(numOfOperations), xEnv.SubgraphRequestCount.Global.Load())
    })
}

Enter fullscreen mode Exit fullscreen mode

This test is very similar to the previous test, but this time we're making a Mutation request. As you can see in the assertion, we expect that the number of requests is equal to the number of operations.

Key takeaways for testing distributed systems

In this article, we've taken a look at how we're testing the Cosmo Router. Let's summarize the key takeaways on how to test a distributed system like Cosmo Router, Federation, Subgraphs and an Event Source:

  1. You should be able to start and clean up all components of the system in a short amount of time

Being able to run tests fast is crucial for a good developer experience. If you have to wait 5 minutes for your tests to run, you will be less likely to run them, and you're less likely to add new tests, which will lead to a less stable system.

We want to build a system that is stable and reliable, so we need to make sure that we can run all tests in a short amount of time.

  1. We need to be able to modify / mock / override the behavior of all components

In order to test all the different scenarios that we want to test, we need to be able to modify the behavior of all components, especially the Subgraphs.

  1. Our system needs to be event driven and easy to debug

Being able to debug both the Router and the Subgraphs is crucial for a good developer experience and developer productivity. We've had scenarious where something wasn't exactly working as expected in the Subgraphs, but the problem was easy to debug as we were able to set breakpoints in the Subgraph code.

Implementing the Router as an event-driven system is another huge enabler for testability. Thanks to the event-bus in the test environment, we can wait for certain events to happen, which allows us to write deterministic and reproducible tests.

  1. Our tests need to be deterministic and reproducible

Which brings us to the final point. We need to be able to write deterministic and reproducible tests. If tests are flaky in CI, we're more likely to ignore them, write less tests, or even disable them. Overall, this will lead to a less stable system and drive down developer productivity, as we will spend more time manually testing and debugging problems.

Conclusion

This article gave you an overview of how we're testing a distributed system like the Cosmo Router. We've taken a look at the test environment that we've built to test the Router, and we've taken a look at some of the more complex tests that we're running.

I hope that this article gave you some ideas on how to test your own distributed systems, how to improve the developer experience of your own tests, and how to make your tests more deterministic and reproducible.

For us, testing is a crucial part of our development process. Velocity is everything to us, but not at the cost of quality. Building the test framework for the Cosmo Router was a huge investment, but we're confident that it will pay off in the long run. We're not aiming for short-term gains, but rather for long-term stability and reliability.

If you're interested in learning more about the Cosmo Router, check out the code on GitHub. You can find the integration tests in the router-tests package.

Top comments (0)