As we discussed in the past, asynchronous programming brings many advantages when developing reactive and responsive applications. However, it also carries drawbacks and challenges, and one of the main ones is the backpressure problem.
What is backpressure?
In physics, it is "a resistance or force opposing the desired flow of fluid through pipes" (Wikipedia).
We can translate the problem to a familiar scenario: persisting messages polled from a bus. There is a huge number of messages on the bus and our application polls them really fast, but the database underneath is really slow.
How can funnel overflow be avoided?
In a synchronous scenario there is no backpressure issue: the synchronous nature of the computation blocks polling from the bus until the current message has been processed.
But in the async world, polling is executed with no clue about what is happening on the database. So if the database can't keep up with the messages coming from the bus, the messages remain "in between", which means in the memory of our service.
This can lead to failures or, at worst, bring the whole service down.
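To get a feel for how quickly that in-memory backlog can grow, here is a minimal back-of-the-envelope sketch in plain Java. The class name, the "tick" model and the rates (1000 messages polled versus 100 persisted per tick) are made-up assumptions for illustration, not measurements of a real system.

```java
public class BacklogSimulation {

    /**
     * Returns how many messages are still waiting in memory after the given
     * number of ticks, when the consumer polls faster than the database persists.
     */
    public static long backlogAfter(int ticks, int polledPerTick, int persistedPerTick) {
        long backlog = 0;
        for (int i = 0; i < ticks; i++) {
            backlog += polledPerTick;                       // the consumer keeps polling
            backlog -= Math.min(backlog, persistedPerTick); // the database drains what it can
        }
        return backlog;
    }

    public static void main(String[] args) {
        // Hypothetical rates: polling is ten times faster than persisting.
        System.out.println(backlogAfter(60, 1000, 100)); // prints 54000
        // When the two rates match, nothing accumulates.
        System.out.println(backlogAfter(60, 100, 100));  // prints 0
    }
}
```

The backlog grows linearly for as long as the mismatch lasts, which is why the polling rate has to be tied to the persistence rate.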
Let's try to develop an application that persists messages in a database, and make it evolve to handle backpressure.
Automatic polling
At first, our verticle performs these operations:
- initialize the JDBC client
- initialize the Kafka client
- subscribe to the topic
- persist the records
The code is quite simple, and it works well with small amounts of messages.
When the load gets bigger and bigger, a problem appears: using the handler of the Vert.x Kafka consumer means there is no control over the message rate, so the consumer polls continuously without considering the persistence rate, causing memory overload.
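import io.vertx.core.AbstractVerticle;
import io.vertx.core.Future;
import io.vertx.core.Promise;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import io.vertx.ext.jdbc.JDBCClient;
import io.vertx.ext.sql.UpdateResult;
import io.vertx.kafka.client.consumer.KafkaConsumer;
import io.vertx.kafka.client.consumer.KafkaConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import java.util.HashMap;
import java.util.Map;
import static org.apache.kafka.clients.consumer.ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG;
import static org.apache.kafka.clients.consumer.ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG;

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise)
      // The handler is invoked for every record, as fast as the consumer can poll
      .handler(record -> {
        persist(jdbc, record)
          .onSuccess(result -> System.out.println("Message persisted"))
          .onFailure(cause -> System.err.println("Message not persisted " + cause));
      });
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }

  // Wraps the callback-based JDBC call into a Future
  private Future<UpdateResult> persist(JDBCClient jdbc, KafkaConsumerRecord<String, String> record) {
    Promise<UpdateResult> promise = Promise.promise();
    JsonArray params = toParams(record);
    jdbc.updateWithParams("insert or update query to persist record", params, promise);
    return promise.future();
  }

  private JsonObject datasourceConfiguration() {
    // TODO datasource configuration
    return null;
  }

  private JsonArray toParams(KafkaConsumerRecord<String, String> record) {
    // TODO: convert the record into params for the sql command
    return null;
  }
}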
Explicit polling
To handle backpressure, explicit polling should be used: instead of setting the Kafka consumer's handler, we call poll manually (in the following case, with a timeout of 100 ms).
With this approach, each poll is performed only once the batch of previously polled messages has been persisted.
This behaviour is achieved by keeping the future of every message's persist operation and collecting all of them with CompositeFuture.all, which succeeds only when all of them have succeeded; only then is the next poll issued.
If at least one of the futures fails, the composite fails and polling stops.
Various solutions can be adopted to make the service handle the failure, e.g. sending the message to a Dead Letter Queue, but we will not cover this case.
The problem with this code is that if a message fails, we lose it: the consumer is set to auto-commit, so the topic offset is committed for us, whether or not the message was persisted.
// Additional imports compared to the previous listing
import io.vertx.core.CompositeFuture;
import io.vertx.kafka.client.consumer.KafkaConsumerRecords;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.IntStream;
import static java.util.stream.Collectors.toList;

public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        // Start persisting every record of the batch and keep all the futures
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());
        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .onSuccess(composite -> {
        System.out.println("All messages persisted");
        // Poll the next batch only now that the whole batch has been persisted
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}
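The "poll the next batch only when the previous one is done" loop can also be tried out without Kafka or a database. The following is a minimal sketch that swaps Vert.x futures for the JDK's CompletableFuture so it runs with no dependencies; the class BatchedPolling, the in-memory queue standing in for the topic and the 5 ms sleep standing in for a slow insert are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

public class BatchedPolling {
    private final Queue<String> source;   // stand-in for the Kafka topic
    private final int batchSize;          // stand-in for the size of a polled batch
    private final Function<String, CompletableFuture<Void>> persist;

    public BatchedPolling(Queue<String> source, int batchSize,
                          Function<String, CompletableFuture<Void>> persist) {
        this.source = source;
        this.batchSize = batchSize;
        this.persist = persist;
    }

    /** Polls one batch, waits until every persist has completed, then polls again. */
    public CompletableFuture<Void> run() {
        List<CompletableFuture<Void>> inFlight = new ArrayList<>();
        String message;
        while (inFlight.size() < batchSize && (message = source.poll()) != null) {
            inFlight.add(persist.apply(message));
        }
        if (inFlight.isEmpty()) {
            return CompletableFuture.completedFuture(null); // source drained, stop polling
        }
        // allOf plays the role of CompositeFuture.all: it gates the next poll.
        // thenComposeAsync keeps the recursion from growing the call stack.
        return CompletableFuture
            .allOf(inFlight.toArray(new CompletableFuture[0]))
            .thenComposeAsync(done -> run());
    }

    /** Runs the loop with a simulated slow database; returns {persisted count, peak in-flight}. */
    public static int[] demo(int messages, int batchSize) {
        Queue<String> source = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < messages; i++) {
            source.add("message-" + i);
        }
        AtomicInteger inFlight = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        AtomicInteger persisted = new AtomicInteger();
        Function<String, CompletableFuture<Void>> slowPersist = msg -> {
            peak.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
            return CompletableFuture.runAsync(() -> {
                try {
                    Thread.sleep(5); // hypothetical slow insert
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                persisted.incrementAndGet();
                inFlight.decrementAndGet();
            });
        };
        new BatchedPolling(source, batchSize, slowPersist).run().join();
        return new int[] {persisted.get(), peak.get()};
    }

    public static void main(String[] args) {
        int[] result = demo(10, 3);
        System.out.println("persisted=" + result[0] + ", peak in-flight=" + result[1]);
    }
}
```

However slow the simulated database is, the number of messages held in memory never exceeds the batch size, which is exactly the bound the explicit polling approach is after.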
Manual commit
By setting the ENABLE_AUTO_COMMIT_CONFIG property to false, the service takes ownership of the topic offset commit.
The commit is performed only once every message has been persisted; with this trick, at-least-once delivery is achieved.
public class MainVerticle extends AbstractVerticle {

  @Override
  public void start(Promise<Void> startPromise) throws Exception {
    JDBCClient jdbc = JDBCClient.create(vertx, datasourceConfiguration());
    KafkaConsumer<String, String> consumer = KafkaConsumer
      .<String, String>create(vertx, kafkaConsumerConfiguration())
      .subscribe("topic.name", startPromise);

    poll(jdbc, consumer);
  }

  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      .compose(records -> {
        List<Future<UpdateResult>> futures = IntStream.range(0, records.size())
          .mapToObj(records::recordAt)
          .map(record -> persist(jdbc, record))
          .collect(toList());
        return CompositeFuture.all(new ArrayList<>(futures));
      })
      .compose(composite -> {
        // Commit the offset only after the whole batch has been persisted
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }

  private Map<String, String> kafkaConsumerConfiguration() {
    final Map<String, String> config = new HashMap<>();
    config.put(BOOTSTRAP_SERVERS_CONFIG, "kafkahost:9092");
    config.put(ENABLE_AUTO_COMMIT_CONFIG, "false");
    config.put(KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    return config;
  }
  ...
}
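What "at least once" means in practice can be shown with a small, purely in-memory simulation. This is only a sketch under stated assumptions: ManualCommitSimulation is a hypothetical class, the list stands in for a topic partition, and every call to pollOnce restarts from the last committed offset, which models a consumer that is restarted after a failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

public class ManualCommitSimulation {
    private final List<String> topic;                          // stand-in for a topic partition
    private final List<String> persisted = new ArrayList<>();  // stand-in for the database
    private int committedOffset = 0;                           // stand-in for the committed offset

    public ManualCommitSimulation(List<String> topic) {
        this.topic = topic;
    }

    /**
     * Reads up to batchSize records starting at the committed offset and tries
     * to persist each of them. The offset is committed only if all succeed.
     */
    public boolean pollOnce(int batchSize, Predicate<String> persistSucceeds) {
        int end = Math.min(committedOffset + batchSize, topic.size());
        boolean allPersisted = true;
        for (int i = committedOffset; i < end; i++) {
            String record = topic.get(i);
            if (persistSucceeds.test(record)) {
                persisted.add(record);
            } else {
                allPersisted = false; // one failure is enough to skip the commit
            }
        }
        if (allPersisted) {
            committedOffset = end;
        }
        return allPersisted;
    }

    public int committedOffset() {
        return committedOffset;
    }

    public List<String> persisted() {
        return persisted;
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        topic.add("a");
        topic.add("b");
        topic.add("c");
        ManualCommitSimulation sim = new ManualCommitSimulation(topic);
        sim.pollOnce(3, record -> !record.equals("c")); // "c" fails: nothing is committed
        sim.pollOnce(3, record -> true);                // the same batch is read again
        System.out.println(sim.persisted());            // prints [a, b, a, b, c]
    }
}
```

No message is lost, but "a" and "b" are persisted twice. This is why the persist step should be idempotent, which fits the "insert or update" query used in the article.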
Bonus feature: achieve ordering
With a little effort it's possible to achieve ordering: future composition lets us force every persist operation to wait for the completion of the previous one.
This is achieved by chaining the async computations one after another, so each one is executed only when the previous future succeeds.
This is a smart pattern to use when serialization (strictly sequential execution) is needed.
public class MainVerticle extends AbstractVerticle {
  ...
  private void poll(JDBCClient jdbc, KafkaConsumer<String, String> consumer) {
    Promise<KafkaConsumerRecords<String, String>> pollPromise = Promise.promise();
    consumer.poll(100, pollPromise);

    pollPromise.future()
      // Fold the batch into a single chain: each persist starts when the previous one succeeds
      .compose(records -> IntStream.range(0, records.size())
        .mapToObj(records::recordAt)
        .reduce(Future.<UpdateResult>succeededFuture(),
          (acc, record) -> acc.compose(it -> persist(jdbc, record)),
          (a, b) -> a
        )
      )
      .compose(lastResult -> {
        Promise<Void> commitPromise = Promise.promise();
        consumer.commit(commitPromise);
        return commitPromise.future();
      })
      .onSuccess(any -> {
        System.out.println("All messages persisted and committed");
        poll(jdbc, consumer);
      })
      .onFailure(cause -> System.err.println("Error persisting and committing messages: " + cause));
  }
  ...
}
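The chaining trick is not specific to Vert.x. As a rough, dependency-free sketch, the same fold can be written with the JDK's CompletableFuture (a plain loop is used here instead of reduce, but the idea is the same). The SequentialChain class and the artificial delays are assumptions made for the demo.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;

public class SequentialChain {

    /** Runs the async task for each item, starting it only when the previous one has completed. */
    public static <T> CompletableFuture<Void> inOrder(List<T> items,
                                                      Function<T, CompletableFuture<Void>> task) {
        CompletableFuture<Void> chain = CompletableFuture.completedFuture(null);
        for (T item : items) {
            // thenCompose defers task.apply(item) until the chain built so far is done
            chain = chain.thenCompose(previous -> task.apply(item));
        }
        return chain;
    }

    /** Earlier items sleep longer, so running them concurrently would reverse the order. */
    public static List<Integer> demo() {
        List<Integer> completed = new CopyOnWriteArrayList<>();
        inOrder(Arrays.asList(30, 20, 10), delayMillis -> CompletableFuture.runAsync(() -> {
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            completed.add(delayMillis);
        })).join();
        return completed;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [30, 20, 10]
    }
}
```

The price of ordering is throughput: the batch now takes the sum of the individual persist times instead of roughly the slowest one.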
Conclusion
Backpressure is a fundamental topic to cover when working with async programming.
It does not come for free out of the Vert.x box, but it can be achieved with some simple tricks.
Top comments (3)
Hi. I can't see why there should be a need for a backpressure mechanism, because messages are consumed by a Consumer when it's not blocked/busy and when it has enough resources. I see a need for a backpressure mechanism when talking about a message queue based on the push principle, for example Redis. But here we are talking explicitly about Kafka. So why should I think of such a seemingly impossible issue? Let's assume a DB is slow and single inserts or updates take their time; then you could at least pause() and then resume() consuming. Where does one need the described mechanism? What may I have overlooked here?
Great article! You've given a really easy to understand example. However, I was wondering about something I didn't see you mention. In the first case, when you don't use the polling principle, upon failure you see exactly which record has failed. Is it true that with polling you can only get the batch of records in which a failing record appeared, and not the single record itself? And another question: could you explain what would happen if, for example, record 3 of the 10 in the batch fails? Does this mean that the whole batch of 10 records fails?
With polling you can get a single record by setting the config max.poll.records to 1, but it's not good for performance. With polling you get a batch of messages, but you can process them one at a time, so it's possible to know which one is failing; the advice is to avoid committing when at least one message fails.
So yes, with this approach, if one or more messages fail, the whole batch fails.