Sergio Garcia Moratilla for Playtomic

BlockingQueue and ExecutorService

This is a quick and dirty post, but I promised to publish everything I research at Playtomic.

We were discussing how to limit the number of tasks an ExecutorService can enqueue. The goal was to
bound how much memory the service uses and so avoid out-of-memory errors. This service accepts messages from a Kafka topic
and from an API. Both paths end up in the same internal logic, which runs on a thread pool.

There is a kind of Queue, BlockingQueue, that can wait until a spot in the queue is free. It would seem that an ExecutorService
backed by a bounded BlockingQueue would make submit() wait until the queue is no longer full. But it does not: the ExecutorService rejects the task.
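The reason, as far as the JDK source shows, is that ThreadPoolExecutor hands tasks to its queue with the non-blocking offer() rather than the blocking put(). The sketch below only illustrates that difference on a plain queue; the class and method names are made up for this example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class, not part of the original service.
class OfferVsPutSketch {

    // offer() never blocks: on a full queue it simply returns false.
    static boolean[] offerTwice() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        boolean first = queue.offer("a");  // fits, the queue has one free slot
        boolean second = queue.offer("b"); // queue is full, returns immediately
        return new boolean[] {first, second};
    }

    // put() blocks: it returns only after another thread frees a slot.
    static long putWaitMillis() {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(1);
        try {
            queue.put("a");
            Thread consumer = new Thread(() -> {
                try {
                    Thread.sleep(200); // keep the queue full for a while
                    queue.take();      // then free the slot
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            consumer.start();
            long start = System.nanoTime();
            queue.put("b"); // waits here until the consumer takes "a"
            long waited = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
            consumer.join();
            return waited;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted during the demo", e);
        }
    }

    public static void main(String[] args) {
        boolean[] accepted = offerTwice();
        System.out.println("first offer accepted:  " + accepted[0]);
        System.out.println("second offer accepted: " + accepted[1]);
        System.out.println("put() waited about " + putWaitMillis() + " ms");
    }
}
```

So the queue being able to block is not enough: the executor never asks it to.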

You know that hours of trial and error can save you hours of reading the manual. I'm proud to say that I have read the manual first this time.

This test shows what happens:

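import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // assumption: JUnit 5; use org.junit.Test for JUnit 4
import org.slf4j.Logger;           // assumption: SLF4J, matching the log.info("... {}") call
import org.slf4j.LoggerFactory;

public class BlockingQueueExecutorServiceTest {

    private static final Logger log = LoggerFactory.getLogger(BlockingQueueExecutorServiceTest.class);

    @Test
    public void submitTest() {
        // Worst-case scenario: one worker thread and room for only one waiting task in the queue.
        int nThreads = 1;

        ExecutorService exService = new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(nThreads));

        // Fill it with tasks. Once one task is running and one is queued,
        // the next submit() is rejected.
        for (int i = 0; i < 10000; ++i) {
            WaitingTask t = new WaitingTask(i);
            exService.submit(t);
        }
    }

    private static class WaitingTask implements Runnable {

        private final int index;

        public WaitingTask(int index) {
            this.index = index;
        }

        @Override
        public void run() {
            try {
                log.info("Running task {}", index);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}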
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@550dbc7a[Not completed, task = java.util.concurrent.Executors$RunnableAdapter@4dbb42b7[Wrapped task = com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest$WaitingTask@66f57048]] rejected from java.util.concurrent.ThreadPoolExecutor@21282ed8[Running, pool size = 1, active threads = 1, queued tasks = 1, completed tasks = 0]

    at java.base/java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2055)
    at java.base/java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:825)
    at java.base/java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1355)
    at java.base/java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:118)
    at com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest.submitTest(BlockingQueueExecutorServiceTest.java:28)
... more boring stacktrace

If you want submit() to wait until the queue has room, you have to provide a RejectedExecutionHandler that does so; the default handler, AbortPolicy, throws the exception shown above. One example is Spring Integration's CallerBlocksPolicy.
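If pulling in Spring Integration is not an option, a handler with similar behavior can be sketched with the JDK alone. This is a minimal sketch under assumptions, not Spring's implementation: WaitForSpacePolicy, runAll and their parameters are hypothetical names invented for this example.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the original service.
class BlockingSubmitSketch {

    // Hypothetical handler: makes the submitting thread wait up to
    // maxWaitMillis for a free slot in the executor's queue.
    static class WaitForSpacePolicy implements RejectedExecutionHandler {
        private final long maxWaitMillis;

        WaitForSpacePolicy(long maxWaitMillis) {
            this.maxWaitMillis = maxWaitMillis;
        }

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (executor.isShutdown()) {
                throw new RejectedExecutionException("Executor has been shut down");
            }
            try {
                // Timed offer: blocks until space frees up or the wait expires.
                if (!executor.getQueue().offer(r, maxWaitMillis, TimeUnit.MILLISECONDS)) {
                    throw new RejectedExecutionException("Timed out waiting for queue space");
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException("Interrupted while waiting for queue space", e);
            }
        }
    }

    // Submits `tasks` sleeping tasks to a 1-thread pool with a 1-slot queue
    // and returns how many of them ran to completion.
    static int runAll(int tasks, long taskMillis, long maxWaitMillis) {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(1),
            new WaitForSpacePolicy(maxWaitMillis));
        AtomicInteger completed = new AtomicInteger();
        try {
            for (int i = 0; i < tasks; i++) {
                executor.submit(() -> {
                    try {
                        Thread.sleep(taskMillis);
                        completed.incrementAndGet();
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
        } finally {
            executor.shutdown();
            try {
                executor.awaitTermination(1, TimeUnit.MINUTES);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return completed.get();
    }

    public static void main(String[] args) {
        // Generous wait: every submit eventually finds room in the queue.
        System.out.println("completed: " + runAll(5, 50, 5_000));
    }
}
```

One caveat with this approach: a task placed directly in the queue bypasses the executor's own checks, so if the pool is shut down while the caller is waiting, that task may never run. With a wait shorter than the task duration, the handler gives up and the submit fails with RejectedExecutionException, just as with the default policy.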

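import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.junit.jupiter.api.Test; // assumption: JUnit 5; use org.junit.Test for JUnit 4
import org.slf4j.Logger;           // assumption: SLF4J, matching the log.info("... {}") call
import org.slf4j.LoggerFactory;
import org.springframework.integration.util.CallerBlocksPolicy;

public class BlockingQueueExecutorServiceTest {

    private static final Logger log = LoggerFactory.getLogger(BlockingQueueExecutorServiceTest.class);

    @Test
    public void submitTest() {
        // Worst-case scenario: one worker thread and room for only one waiting task in the queue.
        int nThreads = 1;

        // The caller waits up to 10 seconds for a free slot before the task is rejected.
        CallerBlocksPolicy policy = new CallerBlocksPolicy(10000);
        ExecutorService exService = new ThreadPoolExecutor(
            nThreads,
            nThreads,
            0L,
            TimeUnit.MILLISECONDS,
            new LinkedBlockingQueue<>(nThreads),
            policy);

        // Fill it with tasks. When the queue is full, submit() now blocks
        // until the worker frees a slot.
        for (int i = 0; i < 10000; ++i) {
            WaitingTask t = new WaitingTask(i);
            exService.submit(t);
        }
    }

    private static class WaitingTask implements Runnable {

        private final int index;

        public WaitingTask(int index) {
            this.index = index;
        }

        @Override
        public void run() {
            try {
                log.info("Running task {}", index);
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Restore the interrupt flag instead of swallowing the interruption.
                Thread.currentThread().interrupt();
            }
        }
    }
}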

And this time we get:

12:21:32.409 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 0
12:21:32.422 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:33.420 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 1
12:21:33.420 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Task execution queued
12:21:33.421 [main] DEBUG org.springframework.integration.util.CallerBlocksPolicy - Attempting to queue task execution for 10000 milliseconds
12:21:34.423 [pool-1-thread-1] INFO com.playtomic.anemone.matchmaker.service.BlockingQueueExecutorServiceTest - Running task 2

Header: https://unsplash.com/photos/Kj2SaNHG-hg