Montana Mendy

Start Maxwell with Namespaced Topic Kafka Producer (Look for Idle Listeners in Kafka)

After running through the prerequisites, you will have:

  • An AWS Aurora instance
  • A Maxwell image named osheroff/maxwell
  • A Kafka service named kafka, listening on kafka:9092

Start Maxwell with Namespaced Topic Kafka Producer
This is a slight variation of the prerequisite, AWS Aurora to Maxwell Kafka Producer.

In the prerequisite, we ran Maxwell with the default Kafka producer configuration, which produces messages on the maxwell topic.

To get the number of messages in a topic, sum the latest offset of every partition. Replace mytopic with the name of the topic you want to count:

bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic mytopic --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'
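To see what the awk step does without a running broker, here is a minimal sketch that feeds it sample lines. GetOffsetShell prints one topic:partition:offset line per partition; the two sample lines and the function names sum_offsets and sample_output are made up for illustration.

```shell
#!/bin/sh
# Sum the third colon-separated field (the offset) of GetOffsetShell-style
# lines read from stdin. Each line looks like: topic:partition:offset
sum_offsets() {
    # "sum + 0" makes awk print 0 instead of an empty line on empty input.
    awk -F ":" '{sum += $3} END {print sum + 0}'
}

# Hypothetical sample output for a topic with two partitions.
sample_output() {
    printf '%s\n' 'mytopic:0:42' 'mytopic:1:17'
}

sample_output | sum_offsets
```

Keep in mind that the sum of the latest offsets matches the message count only while nothing has been removed from the topic by retention or compaction.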

In this example we are overriding the MAXWELL_OPTIONS environment variable and specifying a dynamic topic name, so that Maxwell will route messages from each table to topics by the same name, namespaced by the database name.

docker run -it --rm \
    --env MYSQL_USERNAME=AURORA_USERNAME \
    --env MYSQL_PASSWORD=AURORA_PASSWORD \
    --env MYSQL_HOST=AURORA_HOST \
    --link kafka \
    --env KAFKA_HOST=kafka \
    --env KAFKA_PORT=9092 \
    --env MAXWELL_OPTIONS="--kafka_topic=maxwell_%{database}_%{table}" \
    --name maxwell \
    osheroff/maxwell
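
Maxwell expands the topic template itself, so you never run anything like the following in production. It is only a small sketch that mimics the substitution, to show which topic name a given database and table end up with. The function name expand_topic and the names shop and orders are hypothetical.

```shell
#!/bin/sh
# Expand a Maxwell-style topic template for one database/table pair.
# Usage: expand_topic TEMPLATE DATABASE TABLE
# Sketch only: names containing "/" or "&" would need escaping for sed.
expand_topic() {
    printf '%s\n' "$1" | sed -e "s/%{database}/$2/g" -e "s/%{table}/$3/g"
}

# "shop" and "orders" are hypothetical names used for illustration.
expand_topic 'maxwell_%{database}_%{table}' shop orders
```

With the template from the command above, changes to the orders table in the shop database would land on the topic maxwell_shop_orders.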

Image description

Here's a graphical way of looking at things when it comes to Consumers. Now let's get back to Maxwell.

This will be the output of Maxwell:

17:44:34,901 INFO  ProducerConfig - ProducerConfig values: 
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    buffer.memory = 33554432
    ssl.truststore.password = null
    batch.size = 16384
    ssl.keymanager.algorithm = SunX509
    receive.buffer.bytes = 32768
    ssl.cipher.suites = null
    ssl.key.password = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    ssl.provider = null
    sasl.kerberos.service.name = null
    max.in.flight.requests.per.connection = 5
    sasl.kerberos.ticket.renew.window.factor = 0.8
    bootstrap.servers = [kafka:9092]
    client.id = 
    max.request.size = 1048576
    acks = 1
    linger.ms = 0
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    metadata.fetch.timeout.ms = 60000
    ssl.endpoint.identification.algorithm = null
    ssl.keystore.location = null
    value.serializer = class org.apache.kafka.common.serialization.StringSerializer
    ssl.truststore.location = null
    ssl.keystore.password = null
    key.serializer = class org.apache.kafka.common.serialization.StringSerializer
    block.on.buffer.full = false
    metrics.sample.window.ms = 30000
    metadata.max.age.ms = 300000
    security.protocol = PLAINTEXT
    ssl.protocol = TLS
    sasl.kerberos.min.time.before.relogin = 60000
    timeout.ms = 30000
    connections.max.idle.ms = 540000
    ssl.trustmanager.algorithm = PKIX
    metric.reporters = []
    compression.type = none
    ssl.truststore.type = JKS
    max.block.ms = 60000
    retries = 0
    send.buffer.bytes = 131072
    partitioner.class = class org.apache.kafka.clients.producer.internals.DefaultPartitioner
    reconnect.backoff.ms = 50
    metrics.num.samples = 2
    ssl.keystore.type = JKS

17:44:34,952 INFO  AppInfoParser - Kafka version : 0.9.0.1
17:44:34,952 INFO  AppInfoParser - Kafka commitId : 23c69d62a0cabf06
17:44:35,012 INFO  Maxwell - Maxwell v1.7.0 is booting (MaxwellKafkaProducer), starting at BinlogPosition[mysql-bin-changelog.000002:84337]
17:44:35,680 INFO  MysqlSavedSchema - Restoring schema id 1 (last modified at BinlogPosition[mysql-bin-changelog.000002:3521])
17:44:38,991 INFO  OpenReplicator - starting replication at mysql-bin-changelog.000002:84337


The process is now waiting for new data events and looking for idle Kafka listeners.
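
If you want to confirm where replication started, the position can be pulled out of the last log line. This is a minimal sketch that assumes the log wording shown in the output above; the function name replication_start is hypothetical.

```shell
#!/bin/sh
# Print the binlog file and offset from Maxwell's
# "starting replication at FILE:OFFSET" log line, read from stdin.
replication_start() {
    sed -n 's/.*starting replication at \([^ ]*\)$/\1/p'
}

# Sample line copied from the Maxwell log output shown earlier.
printf '%s\n' '17:44:38,991 INFO  OpenReplicator - starting replication at mysql-bin-changelog.000002:84337' | replication_start
```

Against a running container, the same function could be fed from docker logs maxwell 2>&1, assuming the container was started with --name maxwell as in the command above.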

Start a consumer in another terminal window. This command starts an unnamed container from the spotify/kafka image, linked to the kafka service, and runs a console consumer that displays existing messages from the namespaced topic and then waits for new messages until you quit (which destroys the container). Replace {AURORA_DATABASE} and {AURORA_TABLE} with your own database and table names:

docker run -it --rm --link kafka spotify/kafka /opt/kafka_2.11-0.10.1.0/bin/kafka-console-consumer.sh --bootstrap-server kafka:9092 --topic maxwell_{AURORA_DATABASE}_{AURORA_TABLE} --from-beginning

Connect to the AWS Aurora instance, insert some records, and update some records. Data events from Maxwell will be printed in the Consumer terminal window:

{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606003,"xid":1655558,"commit":true,"data":{"id":4,"first_name":"Mendy","last_name":"Montana"},"old":{"first_name":"Montana"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606435,"xid":1658343,"commit":true,"data":{"id":4,"first_name":"Montana","last_name":"Mendy"},"old":{"first_name":"Tim"}}
{"database":"AURORA_DATABASE","table":"AURORA_TABLE","type":"update","ts":1484606451,"xid":1658455,"commit":true,"data":{"id":4,"first_name":"Tim","last_name":"Mendy"},"old":{"first_name":"Montana"}}