Let’s imagine we have XML data on a queue in IBM MQ, and we want to ingest it into Kafka to then use downstream, perhaps in an application or maybe to stream to a NoSQL store like MongoDB.
Note
This same pattern for ingesting XML will work with other connectors such as JMS and ActiveMQ.
I’ve got a Docker Compose stack running that includes:
IBM MQ
Apache Kafka (deployed as Confluent Platform to include the all-important Schema Registry)
MongoDB
Loading some test data onto IBM MQ
Let’s load some messages onto the queue from an XML file:
docker exec --interactive ibmmq \
/opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1 < data/note.xml
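One thing to watch when preparing a file like data/note.xml: as far as I'm aware, the amqsput sample treats each line of standard input as a separate message and stops at the first blank line, so each XML document needs to sit on a single line. If your XML is pretty-printed, something like the sketch below can flatten it first. The helper name `flatten_xml` and the file name `pretty-note.xml` are made up for illustration, and note that it also collapses whitespace inside element content.

```shell
# Hypothetical helper: collapse one XML document read from stdin onto a
# single line, squeezing every run of whitespace into one space.
flatten_xml() (
  doc=$(tr -s '[:space:]' ' ')
  doc=${doc# }   # drop a leading space, if any
  doc=${doc% }   # drop the trailing space left by the final newline
  printf '%s\n' "$doc"
)

# Possible usage (pretty-note.xml is an assumed file name):
#   flatten_xml < pretty-note.xml | \
#     docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
```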
Streaming from IBM MQ to Kafka and translating the XML messages
Now we can ingest this into Kafka using Kafka Connect with the IbmMQSourceConnector plugin and the XML transformation:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-01/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-01",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"transforms": "extractPayload,xml",
"transforms.extractPayload.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extractPayload.field": "text",
"transforms.xml.type": "com.github.jcustenborder.kafka.connect.transform.xml.FromXml$Value",
"transforms.xml.schema.path": "file:///data/note.xsd",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
Note
ExtractField is needed; otherwise the XML transform will fail with java.lang.UnsupportedOperationException: STRUCT is not a supported type. This is because it would be trying to operate on the entire payload from IBM MQ, which includes fields other than the XML that we’re interested in.
The resulting Kafka topic holds the value of the text field in the messages, serialised in Avro:
docker exec kafkacat \
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-01 \
-C -o beginning -u -q -J | \
jq -c '.payload'
{"Note":{"to":"Tove","from":"Jani","heading":"Reminder 01","body":"Don't forget me this weekend!"}}
{"Note":{"to":"Jani","from":"Tove","heading":"Reminder 02","body":"Of course I won't!"}}
…
To understand more about the concepts around getting XML data into Kafka see here, and I’ve written about the specifics of Kafka Connect and the XML transformation here.
Streaming the data from Kafka to MongoDB
We can then add another Kafka Connect connector to the pipeline, using the official plugin for Kafka Connect from MongoDB, which will stream data straight from a Kafka topic into MongoDB:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/sink-mongodb-note-01/config \
-d '{
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
"topics":"ibmmq-note-01",
"connection.uri":"mongodb://mongodb:27017",
"database":"rmoff",
"collection":"notes",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
Check out the data in MongoDB:
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("9aae83c4-0e25-43a9-aca5-7278d366423b") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
bye
Let’s check that this is actually streaming by sending another record to the queue:
echo "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 05</heading> <body>Srsly?</body> </note>" | docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
Sample AMQSPUT0 start
target queue is DEV.QUEUE.1
Sample AMQSPUT0 end
And, behold, the new record in MongoDB:
docker exec --interactive mongodb mongo localhost:27017 <<EOF
use rmoff
db.notes.find()
EOF
MongoDB shell version v4.4.1
connecting to: mongodb://localhost:27017/test?compressors=disabled&gssapiServiceName=mongodb
Implicit session: session { "id" : UUID("2641e93e-9c5d-4270-8f64-e52295a60309") }
MongoDB server version: 4.4.1
switched to db rmoff
{ "_id" : ObjectId("5f77b64eee00df1cc80135a1"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 01", "body" : "Don't forget me this weekend!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a2"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 02", "body" : "Of course I won't!" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a3"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 03", "body" : "Where are you?" }
{ "_id" : ObjectId("5f77b64eee00df1cc80135a4"), "to" : "Jani", "from" : "Tove", "heading" : "Reminder 04", "body" : "I forgot 🤷‍♂️" }
{ "_id" : ObjectId("5f77b77cee00df1cc80135a6"), "to" : "Tove", "from" : "Jani", "heading" : "Reminder 05", "body" : "Srsly?" }
bye
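If you want to send more test records without hand-writing the XML each time, a small shell function can build the message for you. This is only a sketch: `xml_escape` and `make_note` are made-up helper names, the element layout simply mirrors the test message sent above, and only the three characters `&`, `<` and `>` are escaped.

```shell
# Hypothetical helper: escape the characters that would break element content.
xml_escape() {
  # & must be handled first so the entities added afterwards are not re-escaped
  printf '%s' "$1" | sed -e 's/&/\&amp;/g' -e 's/</\&lt;/g' -e 's/>/\&gt;/g'
}

# Hypothetical helper: build a single-line note message.
# Arguments: $1=to  $2=from  $3=heading  $4=body
make_note() {
  printf '<note> <to>%s</to> <from>%s</from> <heading>%s</heading> <body>%s</body> </note>\n' \
    "$(xml_escape "$1")" "$(xml_escape "$2")" "$(xml_escape "$3")" "$(xml_escape "$4")"
}

# Possible usage, reusing the amqsput command from above:
#   make_note Tove Jani "Reminder 06" "Still there?" | \
#     docker exec --interactive ibmmq /opt/mqm/samp/bin/amqsput DEV.QUEUE.1 QM1
```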
What if my data’s not in XML? What if we want other fields from the payload?
In the example above we’re taking data from the source system (IBM MQ), and Kafka Connect is applying a schema to the field called text within it (the XML transformation does this, based on the supplied XSD). When it’s written to Kafka it’s serialised using the selected converter, which, since it’s Avro, stores the schema in the Schema Registry. This is a Good Way of doing things, since we retain the schema for use by any consumer. We could use Protobuf or JSON Schema here too if we wanted. If this doesn’t all make sense to you then check out Schemas, Schmeeeemas / Why not just JSON?.
But the full payload that comes through from IBM MQ looks like this:
messageID=ID:414d5120514d3120202020202020202060e67a5f06352924
messageType=text
timestamp=1601893142430
deliveryMode=1
redelivered=false
expiration=0
priority=0
properties={JMS_IBM_Format=Struct{propertyType=string,string=MQSTR },
JMS_IBM_PutDate=Struct{propertyType=string,string=20201005},
JMS_IBM_Character_Set=Struct{propertyType=string,string=ISO-8859-1},
JMSXDeliveryCount=Struct{propertyType=integer,integer=1},
JMS_IBM_MsgType=Struct{propertyType=integer,integer=8},
JMSXUserID=Struct{propertyType=string,string=mqm },
JMS_IBM_Encoding=Struct{propertyType=integer,integer=546},
JMS_IBM_PutTime=Struct{propertyType=string,string=10190243},
JMSXAppID=Struct{propertyType=string,string=amqsput },
JMS_IBM_PutApplType=Struct{propertyType=integer,integer=6}}
text=<note> <to>Jani</to> <from>Tove</from> <heading>Reminder 02</heading> <body>Of course I won't!</body> </note>
If we want to retain some or all of these fields, we’re going to have to approach things a different way. As things stand, there is no Single Message Transform that I’m aware of that can take both the non-XML fields and the XML field and wrangle them into a single structured schema (which would be the ideal outcome; an alternative would be putting the non-XML fields into the Kafka message headers). By default the IBM MQ Source Connector writes the full payload as a structured record with its own schema. This means that you still use a schema-supporting serialisation method, but the text payload field remains unparsed.
Here’s an example:
curl -i -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/source-ibmmq-note-03/config \
-d '{
"connector.class": "io.confluent.connect.ibm.mq.IbmMQSourceConnector",
"kafka.topic":"ibmmq-note-03",
"mq.hostname":"ibmmq",
"mq.port":"1414",
"mq.queue.manager":"QM1",
"mq.transport.type":"client",
"mq.channel":"DEV.APP.SVRCONN",
"mq.username":"app",
"mq.password":"password123",
"jms.destination.name":"DEV.QUEUE.1",
"jms.destination.type":"queue",
"confluent.license":"",
"confluent.topic.bootstrap.servers":"broker:29092",
"confluent.topic.replication.factor":"1",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url":"http://schema-registry:8081"
}'
Now the full IBM MQ message is written to a Kafka topic, serialised with a schema. We can deserialise it with something like kafkacat:
kafkacat \
-b broker:29092 \
-r http://schema-registry:8081 \
-s key=s -s value=avro \
-t ibmmq-note-03 \
-C -c1 -o beginning -u -q -J | \
jq '.'
{
"topic": "ibmmq-note-03",
"partition": 0,
"offset": 0,
"tstype": "create",
"ts": 1601894073400,
"broker": 1,
"key": "Struct{messageID=ID:414d5120514d3120202020202020202060e67a5f033a2924}",
"payload": {
"messageID": "ID:414d5120514d3120202020202020202060e67a5f033a2924",
"messageType": "text",
"timestamp": 1601894073400,
"deliveryMode": 1,
"properties": {
"JMS_IBM_Format": {
"propertyType": "string",
"boolean": null,
"byte": null,
"short": null,
"integer": null,
"long": null,
"float": null,
"double": null,
"string": {
"string": "MQSTR "
}
},
…
"map": null,
"text": {
"string": "<note> <to>Tove</to> <from>Jani</from> <heading>Reminder 01</heading> <body>Don't forget me this weekend!</body> </note>"
}
}
}
Observe that the text field is just a string, holding [what happens to be] XML.
You can use ksqlDB to work with the data, to an extent, although there’s currently no support for handling the XML:
SELECT "PROPERTIES"['JMSXAppID']->STRING as JMSXAppID,
"PROPERTIES"['JMS_IBM_PutTime']->STRING as JMS_IBM_PutTime,
"PROPERTIES"['JMSXDeliveryCount']->INTEGER as JMSXDeliveryCount,
"PROPERTIES"['JMSXUserID']->STRING as JMSXUserID,
text
FROM IBMMQ_SOURCE
EMIT CHANGES;
+-----------+-----------------+-------------------+------------+------------------------------------+
|JMSXAPPID |JMS_IBM_PUTTIME |JMSXDELIVERYCOUNT |JMSXUSERID |TEXT |
+-----------+-----------------+-------------------+------------+------------------------------------+
|amqsput |10302905 |1 |mqm |<note> <to>Jani</to> <from>Tove</fro|
| | | | |m> <heading>Reminder 02</heading> <b|
| | | | |ody>Of course I won't!</body> </note|
| | | | |> |
|amqsput |10302905 |1 |mqm |<note> <to>Tove</to> <from>Jani</fro|
| | | | |m> <heading>Reminder 03</heading> <b|
| | | | |ody>Where are you?</body> </note> |
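If all you need downstream is one or two values from the unparsed text field, a consumer can pull them out of the string itself. The sketch below is deliberately naive and is not a real XML parser: `xml_element` is a made-up helper name, and it assumes single-line documents with no attributes, no namespaces, no repeated or nested elements of the same name, and no CDATA or entities. For anything beyond simple messages like these, a proper XML tool is the safer choice.

```shell
# Hypothetical helper: print the content of the element named $1 from a
# single-line XML document on stdin. Prints nothing if the element is absent.
xml_element() {
  sed -n "s:.*<$1>\(.*\)</$1>.*:\1:p"
}

# Possible usage, on the full-payload topic shown earlier:
#   kafkacat -b broker:29092 -r http://schema-registry:8081 \
#            -s key=s -s value=avro -t ibmmq-note-03 \
#            -C -o beginning -u -q -J | \
#     jq -r '.payload.text.string' | xml_element heading
```

The `jq` path `.payload.text.string` follows the JSON layout in the kafkacat output above, where the text field is wrapped in a `string` key.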
👾 Try it out!
You can find the code to run this for yourself using Docker Compose on GitHub.