DEV Community

Nicola Apicella
Nicola Apicella

Posted on • Updated on

Go for Java developers (or is the Java concurrency that bad?!)

I am by no means an expert in Go, indeed quite the opposite. I am currently trying to get familiar with it. I started getting familiar with the syntax, the memory and the concurrency model. As usual for me, I am trying to contrast it with something I already know, like Java.
So I stumbled in this interesting talk in which the great Sajma introduced the Go concurrency model with some examples. The slides for the talk and the examples are here. Not far in the talk, a question popped up: think about what it would take to implement the same thing in other languages like Java.
Is it really that hard? I was not that sure, I mean, Java does not have 'select' statement, neither it has built-in channels, but it should not be difficult to replicate the examples in Java or is it?
So I though I could have some fun implementing the examples in Java.

Go concurrency

Before getting to the example, this is a streamlined recap of the talk (by the way, it's a cool talk, so I really suggest you to watch it).

  1. Go concurrency
    • The concurrency model is based on Communication sequential Processes (Hoare, 1978)
    • Concurrent programs are structured as independent processes that execute sequentially and communicate by passing messages.
    • "Don't communicate by sharing memory, share memory by communicating"
    • Go primitives: go routines, channels and the select statement
  2. Go routines
    • It's a lightweight thread (it's not a thread)
    • Channel provide communication between go routines (analougous to synchronized queue in Java)
    • Select multiplex communication among go routines

The example

In the examples we have to build an hypothetical client which queries google services (web, image and video services). Ideally, we would like to query those services in parallel and aggregate the answers. All the code for the examples is in github.com/napicella/go-for-java-programmers.
So let's get started.

First example: querying Google search in parallel

This is how the go code looks like:

        func Google(query string) (results []Result) {
            c := make(chan Result)
            go func() { c <- Web(query) }()
            go func() { c <- Image(query) }()
            go func() { c <- Video(query) }()

            for i := 0; i < 3; i++ {
                result := <-c
                results = append(results, result)
            }
            return
        }
Enter fullscreen mode Exit fullscreen mode

What about Java? My solution involves using CompletableFuture like the following.

    public void google(String query) throws ExecutionException, InterruptedException {
        CompletableFuture<String>[] futures = new CompletableFuture[] {
            supplyAsync(() -> web(query)),
            supplyAsync(() -> image(query)),
            supplyAsync(() -> video(query))
        };

        List<String> result = new ArrayList<>();

        allOf(futures)
            .thenAccept((ignore) -> Arrays.stream(futures)
                                          .map(this::safeGet)
                                          .forEach(result::add)).get();
        // Note: calling get is necessary only because I want to print the result 
        // before returning from the function
        System.out.println(result);
    }

    protected String safeGet(Future<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }

        return "";
    }
Enter fullscreen mode Exit fullscreen mode

The web, image and video services are just mocks with random sleeps.
So, what's the difference between the java code and go one? The java code is a bit more verbose and the code does not use message passing between threads like in Go, besides that they look really similar.

Let's move to the second example.

Second example: timeout

What if we don't want to wait for slow servers? We can use a timeout!
The idea is to wait until either all the servers replies to our request or the timeout goes off.


func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- Web(query) }()
    go func() { c <- Image(query) }()
    go func() { c <- Video(query) }()

    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
Enter fullscreen mode Exit fullscreen mode

Let's see how that would look like in java:

    public void googleWithTimeout(String query) throws ExecutionException, InterruptedException {
        // This is the first difference with the go example, the result array must             
        // be a synchronized list.
        // Go channel are completely thread safe, so it's totally okay to funnel 
        // data from multiple go routines to an array.
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        // this is not safe since it's possible that all the threads in the thread     
        // pool (default to ForkJoin) are busy, so the timer won't start
        CompletableFuture<Void> timeout = runAsync(() -> timeout(TIMEOUT_MILLIS));

        anyOf(
            allOf(runAsync(() -> result.add(web(query))),
                  runAsync(() -> result.add(image(query))),
                  runAsync(() -> result.add(video(query)))),
            timeout
        ).get();


        System.out.println(result);
    }

    protected Void timeout(int millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        return null;
    }
Enter fullscreen mode Exit fullscreen mode

In the Java example there is a substantial difference to the go one: the tasks share the result array, so for the java code to work, we need a synchronized array. On the other hand, Go channel are completely thread safe, so it's totally okay to funnel data from multiple go routines to an array.
As mentioned in the comment the use of timeout is not completely safe indeed it's possible that all the threads in the thread pool (default to ForkJoin) are busy so the timer won't start. We can obviously run a Thread with a different ExecutorService or just manually create a Thread and run it.

    protected CompletableFuture<Void> timeout(int millis) {
        ScheduledExecutorService executorService = Executors.newScheduledThreadPool(1);

        final CompletableFuture<Void> timeout = new CompletableFuture<>();
        executorService.schedule(() -> {
            timeout.complete(null);
        }, millis, TimeUnit.MILLISECONDS);

        return timeout;
    }
Enter fullscreen mode Exit fullscreen mode

Third example : Reduce tail latency using replicated search servers.

In go:

func Google(query string) (results []Result) {
    c := make(chan Result, 3)
    go func() { c <- First(query, Web1, Web2) }()
    go func() { c <- First(query, Image1, Image2) }()
    go func() { c <- First(query, Video1, Video2) }()
    timeout := time.After(80 * time.Millisecond)
    for i := 0; i < 3; i++ {
        select {
        case result := <-c:
            results = append(results, result)
        case <-timeout:
            fmt.Println("timed out")
            return
        }
    }
    return
Enter fullscreen mode Exit fullscreen mode

where the function First is defined as follow:

func First(query string, replicas ...Search) Result {
    c := make(chan Result, len(replicas))
    searchReplica := func(i int) { c <- replicas[i](query) }
    for i := range replicas {
        go searchReplica(i)
    }
    return <-c
}
Enter fullscreen mode Exit fullscreen mode

Let's see in Java

    public void googleWithReplicatedServers(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        // Unfortunately this does not work as expected because the inner anyOf 
        // won't stop the other calls, so the result might end up having 
        // duplicates, i.e [some-image, some-image, some-video]
        anyOf(
            allOf(
                anyOf(runAsync(() -> result.add(web(query))), runAsync(() -> result.add(webReplica(query)))),
                anyOf(runAsync(() -> result.add(image(query))), runAsync(() -> result.add(imageReplica(query)))),
                anyOf(runAsync(() -> result.add(video(query))), runAsync(() -> result.add(videoReplica(query))))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }
Enter fullscreen mode Exit fullscreen mode

Unfortunately the code does not quite work because when one of the future completes will add the result in the array but the execution of the other one will continue causing a duplicate in the result.
Let's correct that:

    // replicate servers and use the first response - fixing the problem mentioned 
    // earlier by using supplyAsync + thenAccept instead of runAsync
    public void googleWithReplicatedServers2(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                anyOf(supplyAsync(() -> web(query)),
                      supplyAsync(() -> webReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> image(query)),
                      supplyAsync(() -> imageReplica(query))).thenAccept((s) -> result.add((String) s)),
                anyOf(supplyAsync(() -> video(query)),
                      supplyAsync(() -> videoReplica(query))).thenAccept((s) -> result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }

    // same as above, but this time we use the function 'first', which is really 
    // just a wrapper around CompletableFuture.anyOf
    public void googleWithReplicatedServers3(String query) throws ExecutionException, InterruptedException {
        List<String> result = Collections.synchronizedList(new ArrayList<>());

        anyOf(
            allOf(
                first(query, Google::web, Google::webReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::image, Google::imageReplica).thenAccept((s) ->  result.add((String) s)),
                first(query, Google::video, Google::videoReplica).thenAccept((s) ->  result.add((String) s))
            ),
            timeout(TIMEOUT_MILLIS)
        ).get();

        System.out.println(result);
    }
Enter fullscreen mode Exit fullscreen mode

Conclusions

Besides the fact I had some fun with CompletableFuture, the clear advantage of Go is really the fact that the concurrency model is built in the language itself which simplifies the communication among different agents.
On the other side, I am not sure why they dropped OOP support, like classes for example. I mean, what's wrong with OOP?

Top comments (8)

Collapse
 
tobias_salzmann profile image
Tobias Salzmann • Edited

Nice comparison!

Having a sane built-in concurrency model is definitely a big plus for a language. That's definitely one of the things where go excels at.

I do like future-based async code, because it tends to be very clean and testable. Java's implementation however, is a big mess with very questionable design choices.

Here's a quick draft in scala, that intention wise is very close to your Java code, but using Scala's Future.sequence and Future.firstCompletedOf, avoiding low level concurrency completely.

def web(query: String): Future[String] = ???
def image(query: String): Future[String] = ???
def video(query: String): Future[String] = ???
def webReplicated(query: String): Future[String] = ???
def imageReplicated(query: String): Future[String] = ???
def videoReplicated(query: String): Future[String] = ???
def failAfter(timeOut: FiniteDuration): Future[Nothing] = ???


def google(query: String): Future[Seq[String]] = {
  Future.sequence(Seq(web(query), image(query), video(query)))
}

def googleWithTimeout(query: String, timeOut: FiniteDuration): Future[Seq[String]] = {
  Future.firstCompletedOf(Seq(
    Future.sequence(Seq(web(query), image(query), video(query))),
    failAfter(timeOut)
  ))
}

def googleWithTimeoutReplicated(query: String, timeOut: FiniteDuration): Future[Seq[String]] = {
  Future.firstCompletedOf(Seq(
    Future.sequence(Seq(
      Future.firstCompletedOf(Seq(web(query), webReplicated(query))),
      Future.firstCompletedOf(Seq(image(query), imageReplicated(query))),
      Future.firstCompletedOf(Seq(video(query), videoReplicated(query)))
    )),
    failAfter(timeOut)
  ))
}

If I were forced to implement this in Java, I would implement sequence (using allOf) and firstCompletedOf (using anyOf) instead of using concurrent datastructures.

Collapse
 
netvl profile image
Vladimir Matveev

I think you can write the third function in an even shorter way with Future.traverse instead of Future.sequence:

Future.firstCompletedOf(Seq(
  Future.traverse(Future.firstCompletedOf)(Seq(
    Seq(web(query), webReplicated(query)),
    Seq(image(query), imageReplicated(query)),
    Seq(video(query), videoReplicated(query))
  )),
  failAfter(timeOut)
))

I didn't expect this, but I discovered that Future.traverse is really handy in lots of situations.

Collapse
 
napicella profile image
Nicola Apicella

Thanks for writing down the Scala way.
I have not written any significant piece of code in Scala, but every time I stumble in something written in it, it looks just so elegant.
You made me think I should learn more about it :)

By the way, your scala code would make a great response post :)

Collapse
 
tterb profile image
Brett Stevenson • Edited

As someone who was introduced to programming using Java and a newfound interest in learning Go, this is an awesome introduction to the basic concepts of the language and has definitely inspired me to investigate further.
Down the rabbit hole I go!

Collapse
 
napicella profile image
Nicola Apicella

Thank you :)

Collapse
 
jervine791 profile image
john ervine

Just curious but whats more performant?

I also think the lines between concurrency and parallism are blurred here. Yes, the goroutines are allocated a thread of execution but CSP is synchronise and blocking by nature. You can only put something on the channel if the consumer is ready to accept it.

Collapse
 
Sloan, the sloth mascot
Comment deleted
Collapse
 
napicella profile image
Nicola Apicella

The post was not meant to describe alternative concurrency models, like the actor model you mentioned.
I had some fun writing concurrent code that we can write using the primitives included in java as version 8.
Like you said there are java libs which help to write concurrent code within the boundaries of those models, but those libs are not shipped together with the jdk.
Likewise, I am pretty sure there attempts to port the go concorrency model in java, for example cs.kent.ac.uk/projects/ofa/jcsp/ but again my post was not about that.

Anyway, your comment about reading (more) about akka or kotlin is valid, so I 'll take it as a constructive feedback, thanks!