
keigodasu


Transferring Commit Offset with MirrorMaker 2

MirrorMaker 2 (MM2) is a tool for replicating messages from one Kafka cluster to another. The current version of MM2 also supports transferring consumer commit offsets to the other cluster. Thanks to this feature, when failing over to another site, a consumer can resume consuming messages from the last committed offset mirrored by MM2.

Prerequisite for this article

Run MM2 on Kafka Connect.
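If a Connect worker is not running yet, one can be started in distributed mode. This is a minimal sketch; the paths assume a standard Kafka installation directory and an existing `connect-distributed.properties` configured for your environment:

```shell
# Start a Kafka Connect worker in distributed mode; the MM2 connectors
# below are registered against its REST API (port 8083 by default).
bin/connect-distributed.sh config/connect-distributed.properties
```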

Steps

  1. Register MM2 with Kafka Connect
  2. Write a consumer on the destination side with RemoteClusterUtils

1. Register MM2 with Kafka Connect

Two connectors are needed to transfer consumer commit offsets. For details about the parameters, see the MM2 documentation.

MirrorSourceConnector

Replicates a set of topics from a source cluster to a destination cluster.

curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://<kafka-connect>:8083/connectors -d'{
    "name": "mirror-source-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
      "source.cluster.alias": "kafka-source",
      "target.cluster.alias": "kafka-dist",
      "source.cluster.bootstrap.servers": "dc01-kafka01:9091",
      "target.cluster.bootstrap.servers": "dc02-kafka01:9091",
      "topics": ".*",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
    }
}'
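After registering, the Connect REST API can be used to confirm the connector and its task are running. The connector name matches the registration above; `<kafka-connect>` is your worker host:

```shell
# Check the connector state; expect "state": "RUNNING"
# for both the connector and its tasks
curl -s http://<kafka-connect>:8083/connectors/mirror-source-connector/status
```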

MirrorCheckpointConnector

Emits consumer offset checkpoints and syncs the offsets into the destination's __consumer_offsets topic. The synced consumer offsets are also stored in the <source-cluster-alias>.checkpoints.internal topic on the destination cluster.

curl -s -XPOST -H 'Content-Type: application/json' -H 'Accept: application/json' http://<kafka-connect>:8083/connectors -d'{
    "name": "mirror-checkpoint-connector",
    "config": {
      "connector.class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector",
      "sync.group.offsets.enabled": "true",
      "source.cluster.alias": "kafka-source",
      "target.cluster.alias": "kafka-dist",
      "exclude.internal.topics":"false",
      "tasks.max": "1",
      "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
      "source.cluster.bootstrap.servers": "dc01-kafka01:9091",
      "target.cluster.bootstrap.servers": "dc02-kafka01:9091"
    }
}'
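To verify that checkpointing is working, the topic list on the destination cluster should include the mirrored topics (prefixed with the source cluster alias under the default replication policy) and the checkpoints topic. A sketch, assuming the Kafka CLI tools are available on a host that can reach the destination cluster:

```shell
# List topics on the destination cluster; expect entries like
# "kafka-source.<topic>" and "kafka-source.checkpoints.internal"
bin/kafka-topics.sh --bootstrap-server dc02-kafka01:9091 --list
```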

2. Write a consumer on the destination side with RemoteClusterUtils

Add the dependency that provides RemoteClusterUtils:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>connect-mirror-client</artifactId>
    <version>3.0.0</version>
</dependency>

RemoteClusterUtils.translateOffsets internally fetches the synced offsets from <source-cluster-alias>.checkpoints.internal. Once the synced offsets are retrieved, the consumer on the destination side can continue operations by setting them via the seek API.

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeoutException;

public class RemoteUtilTest {
    private static final Properties props = new Properties();

    @SuppressWarnings("InfiniteLoopStatement")
    public static void main(final String[] args) {
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "dc02-kafka01:9091");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "dist-consumer-group");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        // config for MM2 (bootstrap.servers of the destination side is required)
        Map<String, Object> mmConfig = new HashMap<>();
        mmConfig.put("bootstrap.servers", "dc02-kafka01:9091");

        try (final KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // get the synced consumer offsets for the source-side group
            Map<TopicPartition, OffsetAndMetadata> destinationOffsetsMap =
                    RemoteClusterUtils.translateOffsets(mmConfig, "kafka-source", "source-consumer-group", Duration.ofMillis(10000));
            destinationOffsetsMap.forEach((topicPartition, offsetAndMetadata) ->
                    System.out.printf("topicPartition: %s, offsetAndMetadata: %s%n", topicPartition, offsetAndMetadata));

            // assign all translated partitions once, then seek each to its synced offset
            consumer.assign(destinationOffsetsMap.keySet());
            destinationOffsetsMap.forEach(consumer::seek);

            while (true) {
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                for (final ConsumerRecord<String, String> record : records) {
                    System.out.printf("key = %s, value = %s%n", record.key(), record.value());
                }
            }
        } catch (InterruptedException | TimeoutException e) {
            e.printStackTrace();
        }
    }
}

