
Dash One

Iteration - a Stream Generator for Recursive Minds

The Iterators

If you've been programming for a couple of years, chances are you've run into an interview question or an assignment like writing an iterator for a binary tree.

There are variations, such as "pre-order", "post-order" or "in-order" traversal; and sometimes you get an n-ary tree instead of a binary tree.

One reason questions of this kind are used in interviews is that they require meticulous bookkeeping, and there are plenty of edge cases to bite you if you aren't careful.

Fine, it's 2024 and we have Stream now. How about having some fun and creating an infinite stream to generate the Fibonacci sequence?

The one thing in common? They are all boringly painful to write and read. Playing with them for the interview is cool; but having to read them in real life is no fun.
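
To make the pain concrete, here is a rough sketch of what a hand-written in-order iterator tends to look like. The Node class and its field names are made up for the example; the point is only how much state has to be tracked by hand.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical minimal tree node, for illustration only.
class Node<T> {
  final Node<T> left;
  final T value;
  final Node<T> right;

  Node(Node<T> left, T value, Node<T> right) {
    this.left = left;
    this.value = value;
    this.right = right;
  }
}

// In-order iterator with an explicit stack standing in for the call stack.
class InOrderIterator<T> implements Iterator<T> {
  private final Deque<Node<T>> stack = new ArrayDeque<>();

  InOrderIterator(Node<T> root) {
    pushLeftSpine(root);
  }

  // Pushes the node and all of its left descendants; the top is the next to visit.
  private void pushLeftSpine(Node<T> node) {
    for (; node != null; node = node.left) {
      stack.push(node);
    }
  }

  @Override public boolean hasNext() {
    return !stack.isEmpty();
  }

  @Override public T next() {
    if (stack.isEmpty()) {
      throw new NoSuchElementException();
    }
    Node<T> node = stack.pop();
    pushLeftSpine(node.right);  // the right subtree comes after this node
    return node.value;
  }
}
```

It works, but the recursive structure of the problem is buried under stack management.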

Recursion is easy

Imagine you are writing in-order traversal the most naive way:

void traverseInOrder(Tree tree) {
  if (tree.left != null) traverseInOrder(tree.left);
  System.out.println(tree.value);
  if (tree.right != null) traverseInOrder(tree.right);
}
void fibonacci(int a, int b) {
  System.out.println(a);
  fibonacci(b, a + b);
}

fibonacci(0, 1);

Apart from the hardcoded System.out.println(), and never mind the stack overflow caused by the infinite recursion, at least it's easy, right?

Why can't we have some meat - scratch that - why can't we keep the simplicity and just fix the hardcoding and the stack overflow?

Design the API

This is my favorite part: to dream about what I would really really want, if everything just magically works.

Let's see... I don't want the hardcoding, so maybe this?

void traverseInOrder(Tree tree) {
  if (tree.left != null) traverseInOrder(tree.left);
  generate(tree.value);
  if (tree.right != null) traverseInOrder(tree.right);
}
void fibonacci(int a, int b) {
  generate(a);
  fibonacci(b, a + b);
}

fibonacci(0, 1);

The imagined generate() call will put the elements in the final stream.

It doesn't solve the stack overflow though.

The very nature of an infinite stream means that it has to be lazy. The caller could decide to use .limit(50) to only print the first 50 numbers and the recursive call shouldn't progress beyond the first 50 numbers.

So I need something that delays the recursive call. The idiomatic way in Java to model laziness is to wrap it in a callback interface. So let's create one:

interface Continuation {
  void evaluate();
}

Then I need a method similar to generate() that accepts a Continuation instead. The syntax should look like:

void traverseInOrder(Tree tree) {
  if (tree.left != null) {
    yield(() -> traverseInOrder(tree.left));
  }
  generate(tree.value);
  if (tree.right != null) {
    yield(() -> traverseInOrder(tree.right));
  }
}

I stole the yield keyword from languages with native generator support, such as C# (it means both yielding control to the runtime and yielding the evaluation result into the stream). Since Java 14, yield is a restricted identifier and an unqualified call to a method named yield no longer compiles, so you'll need to call it as this.yield() instead.

By wrapping the recursive call in a lambda passed to yield(), I should get the effect of lazy evaluation. The implementation's job will be to ensure the right order between the directly generated elements and the lazily generated ones.

In summary, the draft API we have come up with so far:

class Iteration<T> {
  void generate(T element);
  void yield(Continuation continuation);

  /** Turns the Iteration into a stream */
  Stream<T> iterate();
}

In case it wasn't obvious, we need the iterate() method to package it all up as a lazy stream.

The final syntax we want:

class InOrderTraversal<T> extends Iteration<T> {
  InOrderTraversal<T> traverseInOrder(Tree<T> tree) {
    if (tree.left != null) {
      this.yield(() -> traverseInOrder(tree.left));
    }
    generate(tree.value);
    if (tree.right != null) {
      this.yield(() -> traverseInOrder(tree.right));
    }
    return this;
  }
}
Stream<T> stream =
    new InOrderTraversal<T>().traverseInOrder(tree).iterate();
class Fibonacci extends Iteration<Integer> {
  Fibonacci startFrom(int a, int b) {
    generate(a);
    this.yield(() -> startFrom(b, a + b));
    return this;
  }
}

Stream<Integer> stream =
    new Fibonacci().startFrom(0, 1).iterate();

Making it

It's nice to have a dream once in a while. With such an API we'd be able to turn many recursive algorithms into lazy streams with little effort.

But aside from polishing up an API surface I like, there is nothing concrete yet. How do I make it actually work?

First of all, if the thing only needs to handle generate(), it'd be a pointlessly trivial wrapper around a Queue<T>.

But we need to handle yield() with its lazy evaluation. Imagine we are effectively running this sequence:

generate(1);
generate(2);
yield(() -> {  // line 3
  generate(3);
  generate(4);
});
generate(5); // line 7

At line 3, can we enqueue the Continuation into the same queue before moving on to line 7?

If I do so, after line 7, the queue will look like [1, 2, Continuation, 5].

Now if we call iterate() and start to consume elements from the stream, 1 and 2 will be popped out, and then the Continuation object, with the number 5 still remaining in the queue.

Once the Continuation object is consumed, it needs to be evaluated, which will in turn generate 3 and 4. The question is where to put the two numbers?

We can't enqueue them after the number 5 because they'd be out of order; we can't treat the queue as a stack and push them in LIFO order either, because then we'd get [4, 3, 5].

Stack or Queue?

There are several ways to go about this.

One possibility is to create a stack of queues: each time a Continuation is about to be evaluated, we push a new queue onto the stack. The stream always consumes elements from the top queue of the stack.

The downside is that you might end up creating and discarding many instances of ArrayDeque, which can be wasteful.
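
For comparison, here is a minimal sketch of how the stack-of-queues idea might look. The class name StackOfQueuesIteration and the demo() helper are hypothetical; this is not the version I ended up with.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

interface Continuation {
  void evaluate();
}

// Sketch of the stack-of-queues alternative; names are made up for illustration.
class StackOfQueuesIteration<T> {
  private final Deque<Deque<Object>> stack = new ArrayDeque<>();

  StackOfQueuesIteration() {
    stack.push(new ArrayDeque<>());  // the root queue, never popped
  }

  final void generate(T element) {
    stack.peek().add(element);  // append to the queue on top of the stack
  }

  final void yield(Continuation continuation) {
    stack.peek().add(continuation);
  }

  @SuppressWarnings("unchecked")  // only T elements and Continuations are enqueued
  final T consumeNextOrNull() {
    for (;;) {
      Object next = stack.peek().poll();
      if (next == null) {
        if (stack.size() == 1) {
          return null;  // root queue drained: nothing left
        }
        stack.pop();  // this continuation's queue is exhausted
      } else if (next instanceof Continuation) {
        stack.push(new ArrayDeque<>());  // its output must come before the rest
        ((Continuation) next).evaluate();
      } else {
        return (T) next;
      }
    }
  }

  // Replays the generate/yield sequence from the text and collects the output.
  static List<Integer> demo() {
    StackOfQueuesIteration<Integer> it = new StackOfQueuesIteration<>();
    it.generate(1);
    it.generate(2);
    it.yield(() -> {
      it.generate(3);
      it.generate(4);
    });
    it.generate(5);
    List<Integer> result = new ArrayList<>();
    for (Integer n = it.consumeNextOrNull(); n != null; n = it.consumeNextOrNull()) {
      result.add(n);
    }
    return result;  // [1, 2, 3, 4, 5]
  }
}
```

Note the new ArrayDeque allocated for every evaluated Continuation, which is exactly the waste mentioned above.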

With some micro-optimization in mind, another approach is to use the two-stacks-as-a-queue trick.

There are two stacks: the inbox for writing and the outbox for reading. When either generate() or yield() is called, we push onto the inbox stack; when the stream tries to consume, it flushes everything out of the inbox into the outbox and then consumes one by one from the outbox.

To put it in context, upon seeing [Continuation, 5] in the outbox, the Continuation is evaluated, which puts [4, 3] in the inbox stack.

On the other side, the stream tries to consume. It pops and pushes [4, 3] onto the outbox stack, resulting in [3, 4, 5].
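
The same walk-through can be replayed by hand with two plain ArrayDeques. This is only a trace of the idea: the string "Continuation" is a placeholder for the real lambda, and the TwoStackTrace class is made up for the example.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

class TwoStackTrace {
  // Moves everything from inbox to outbox; the double reversal restores insertion order.
  static <E> void flush(Deque<E> inbox, Deque<E> outbox) {
    while (!inbox.isEmpty()) {
      outbox.push(inbox.pop());
    }
  }

  static List<Object> trace() {
    Deque<Object> inbox = new ArrayDeque<>();
    Deque<Object> outbox = new ArrayDeque<>();
    List<Object> consumed = new ArrayList<>();

    // generate(1); generate(2); yield(continuation); generate(5);
    inbox.push(1);
    inbox.push(2);
    inbox.push("Continuation");  // placeholder standing in for the lambda
    inbox.push(5);

    flush(inbox, outbox);        // outbox, top first: [1, 2, Continuation, 5]
    consumed.add(outbox.pop());  // 1
    consumed.add(outbox.pop());  // 2
    outbox.pop();                // the Continuation is "evaluated" here...
    inbox.push(3);               // ...which calls generate(3)
    inbox.push(4);               // ...and generate(4); inbox, top first: [4, 3]

    flush(inbox, outbox);        // outbox, top first: [3, 4, 5]
    while (!outbox.isEmpty()) {
      consumed.add(outbox.pop());
    }
    return consumed;             // [1, 2, 3, 4, 5]
  }
}
```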

Implementation-wise, we allocate no more than two ArrayDeques.

import java.util.ArrayDeque;
import java.util.Deque;

public class Iteration<T> {
  private final Deque<Object> outbox = new ArrayDeque<>();
  private final Deque<Object> inbox = new ArrayDeque<>(8);

  public final void generate(T element) {
    if (element instanceof Continuation) {
      throw new IllegalArgumentException("Do not stream Continuation objects");
    }
    inbox.push(element);
  }

  public final void yield(Continuation continuation) {
    inbox.push(continuation);
  }

  @SuppressWarnings("unchecked")  // only T elements and Continuations are ever enqueued
  private T consumeNextOrNull() {
    for (; ; ) {
      Object top = poll();
      if (top instanceof Continuation) {
        ((Continuation) top).evaluate();
      } else {
        return (T) top;
      }
    }
  }

  private Object poll() {
    Object top = inbox.poll();
    if (top == null) {  // nothing in inbox
      return outbox.poll();
    }
    // flush inbox -> outbox
    for (Object second = inbox.poll();
         second != null;
         second = inbox.poll()) {
      outbox.push(top);
      top = second;
    }
    return top;
  }
}

The poll() private method does the "flush inbox into outbox" logic mentioned above.

The consumeNextOrNull() method consumes the next element from the two-stack-queue, and evaluates Continuation when it sees one.

Wrap it all up

If you are with me so far, all we are missing is the iterate() method that wraps it all up as a Stream.

I'll cheat by just using Mug's whileNotNull() convenience method. But it's not hard to create your own by implementing a Spliterator.
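
If you'd rather not pull in Mug, a stand-in for whileNotNull() could look roughly like this. It's a sketch under my own assumptions, not Mug's actual implementation, and the class name NullTerminatedStreams is made up.

```java
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

class NullTerminatedStreams {
  // Hypothetical helper: streams the supplier's results until it first returns null.
  static <T> Stream<T> whileNotNull(Supplier<? extends T> supplier) {
    Spliterator<T> spliterator =
        new Spliterators.AbstractSpliterator<T>(
            Long.MAX_VALUE, Spliterator.ORDERED | Spliterator.NONNULL) {
          @Override public boolean tryAdvance(Consumer<? super T> action) {
            T next = supplier.get();
            if (next == null) {
              return false;  // null marks the end of the stream
            }
            action.accept(next);
            return true;
          }
        };
    // Sequential on purpose: the supplier is stateful and not thread-safe.
    return StreamSupport.stream(spliterator, false);
  }
}
```

Because tryAdvance() only calls the supplier when the stream asks for another element, nothing is evaluated ahead of demand.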

public Stream<T> iterate() {
  return MoreStreams.whileNotNull(this::consumeNextOrNull);
}

And with that, our little generic recursive stream generator is complete.
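
To see it run end to end without any dependency, here is a condensed, self-contained variant. It assumes a hand-rolled iterate() in place of Mug, a simplified flush loop, and a made-up Tree class; the Continuation guard in generate() is left out for brevity.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Spliterator;
import java.util.Spliterators;
import java.util.function.Consumer;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;

interface Continuation {
  void evaluate();
}

// Condensed version of the class above, with a hand-rolled iterate().
class Iteration<T> {
  private final Deque<Object> outbox = new ArrayDeque<>();
  private final Deque<Object> inbox = new ArrayDeque<>();

  public final void generate(T element) {
    inbox.push(element);
  }

  public final void yield(Continuation continuation) {
    inbox.push(continuation);
  }

  public final Stream<T> iterate() {
    return StreamSupport.stream(
        new Spliterators.AbstractSpliterator<T>(Long.MAX_VALUE, Spliterator.ORDERED) {
          @Override public boolean tryAdvance(Consumer<? super T> action) {
            T next = consumeNextOrNull();
            if (next == null) {
              return false;
            }
            action.accept(next);
            return true;
          }
        },
        false);
  }

  @SuppressWarnings("unchecked")  // only T elements and Continuations are enqueued
  private T consumeNextOrNull() {
    for (;;) {
      while (!inbox.isEmpty()) {
        outbox.push(inbox.pop());  // flush inbox -> outbox, restoring insertion order
      }
      Object top = outbox.poll();
      if (top instanceof Continuation) {
        ((Continuation) top).evaluate();
      } else {
        return (T) top;  // null when both stacks are empty
      }
    }
  }
}

class Fibonacci extends Iteration<Long> {
  Fibonacci startFrom(long a, long b) {
    generate(a);
    this.yield(() -> startFrom(b, a + b));
    return this;
  }
}

// Hypothetical tree node for the demo.
class Tree<T> {
  final Tree<T> left;
  final T value;
  final Tree<T> right;
  Tree(Tree<T> left, T value, Tree<T> right) {
    this.left = left; this.value = value; this.right = right;
  }
}

class InOrderTraversal<T> extends Iteration<T> {
  InOrderTraversal<T> traverseInOrder(Tree<T> tree) {
    if (tree.left != null) {
      this.yield(() -> traverseInOrder(tree.left));
    }
    generate(tree.value);
    if (tree.right != null) {
      this.yield(() -> traverseInOrder(tree.right));
    }
    return this;
  }
}

class IterationDemo {
  public static void main(String[] args) {
    // The infinite recursion never runs past what limit() asks for.
    System.out.println(
        new Fibonacci().startFrom(0, 1).iterate().limit(10).collect(Collectors.toList()));
    Tree<Integer> tree = new Tree<Integer>(
        new Tree<Integer>(null, 1, null), 2, new Tree<Integer>(null, 3, null));
    System.out.println(
        new InOrderTraversal<Integer>().traverseInOrder(tree).iterate()
            .collect(Collectors.toList()));
  }
}
```

Running IterationDemo should print the first ten Fibonacci numbers followed by the tree values in order.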

Use it for real

Before we call it a day, let's see if we can use it to solve a more realistic problem.

Say, you are calling a ListAssets API that supports pagination. Request and response definitions are:

ListAssetsRequest {
  string userId;
  int page_size;
  string page_token;  // start from this page
}

ListAssetsResponse {
  List<Asset> assets;
  string next_page_token;  // empty if no more
}

If we were to naively fetch all pages and print them, it'd be as simple as sending requests over and over until response.next_page_token is empty:

void showAllAssets(String userId) {
  ListAssetsRequest.Builder request =
      ListAssetsRequest.newBuilder().setUserId(userId);
  for (; ;) {
    var response = assetsApi.listAssets(request.build());
    for (Asset asset : response.getAssets()) {
      System.out.println(asset);
    }
    if (response.getNextPageToken().isEmpty()) {
      return;  // no more pages
    }
    request.setPageToken(response.getNextPageToken());
  }
}

But we can do better! Let's wrap it up as a lazy Stream<Asset> to give callers more flexibility. For example, they can consume any number of assets and stop when needed, without fetching more pages than necessary.

Stream<Asset> listAssets(String userId) {
  class Pagination extends Iteration<ListAssetsResponse> {
    Pagination startFrom(String pageToken) {
      var response = assetsApi.listAssets(
          ListAssetsRequest.newBuilder()
              .setUserId(userId)
              .setPageSize(100)
              .setPageToken(pageToken)
              .build());
      generate(response);
      if (!response.getNextPageToken().isEmpty()) {
        this.yield(() -> startFrom(response.getNextPageToken()));
      }
      return this;
    }
  }
  return new Pagination().startFrom("").iterate()
      .flatMap(response -> response.getAssets().stream());
}

I trust you can read it alright, my friend. :)
