Mahboob Hussain

Configuring Apache Kafka MongoSinkConnector on Windows

I downloaded the connector from Confluent.
On the download page, click the blue Download button on the left to get the mongodb-kafka-connect-mongodb-1.0.1.zip file.

The zip contains an -all.jar file (mongo-kafka-1.0.1-all.jar) and, in its etc folder, the file MongoSinkConnector.properties.

Move the jar file to kafka_installation_folder\plugins.
Move the properties file to kafka_installation_folder\config.

My Kafka installation folder is E:\Tools\kafka_2.12-2.4.0.
So in my case, E:\Tools\kafka_2.12-2.4.0\plugins contains mongo-kafka-1.0.1-all.jar, and E:\Tools\kafka_2.12-2.4.0\config contains MongoSinkConnector.properties.

My connect-standalone.properties file has the following entries:

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter

key.converter.schemas.enable=false
value.converter.schemas.enable=false

offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=E:/Tools/kafka_2.12-2.4.0/plugins/mongo-kafka-1.0.1-all.jar
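
Since plugin.path is a comma-separated list of locations, a quick existence check can catch a mistyped entry before Connect is started. The following is a minimal Python sketch and not part of the original setup; the helper name missing_plugin_paths is hypothetical.

```python
import os


def missing_plugin_paths(plugin_path_value, exists=os.path.exists):
    """Return the plugin.path entries that do not exist on disk.

    plugin_path_value is the text after 'plugin.path=' (comma-separated).
    """
    entries = [p.strip() for p in plugin_path_value.split(",") if p.strip()]
    return [p for p in entries if not exists(p)]


if __name__ == "__main__":
    # Path taken from the connect-standalone.properties shown above.
    value = "E:/Tools/kafka_2.12-2.4.0/plugins/mongo-kafka-1.0.1-all.jar"
    for path in missing_plugin_paths(value):
        print("plugin.path entry not found:", path)
```

An empty result means every listed location exists; it does not prove that the jar itself is a valid connector plugin.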

My MongoSinkConnector.properties file has the following entries:

name=mongo-sink
topics=topic1,topic2
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1

connection.uri=mongodb://localhost:27017,localhost:27017,localhost:27017
database=test_kafka
collection=transactions
max.num.retries=3
retries.defer.timeout=5000

field.renamer.mapping=[]
field.renamer.regex=[]

max.batch.size=0
rate.limiting.timeout=0
rate.limiting.every.n=0
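
Kafka Connect in standalone mode rejects a connector file that has no name, no connector.class, or (for a sink) no topics or topics.regex setting. As a rough pre-flight check, the sketch below parses a properties file's text and lists what is missing. The function names are hypothetical, and the parser only handles plain key=value lines, not the full Java properties syntax.

```python
REQUIRED_KEYS = ("name", "connector.class")


def parse_properties(text):
    """Parse simple key=value lines; skip blanks and # or ! comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith(("#", "!")):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def missing_keys(props):
    """Return the required sink-connector settings that are absent or empty."""
    missing = [k for k in REQUIRED_KEYS if not props.get(k)]
    if not (props.get("topics") or props.get("topics.regex")):
        missing.append("topics or topics.regex")
    return missing


# A deliberately incomplete sample: it has no 'name' entry.
sample = """
topics=topic1,topic2
connector.class=com.mongodb.kafka.connect.MongoSinkConnector
tasks.max=1
"""
print(missing_keys(parse_properties(sample)))  # prints ['name']
```

To check a real file, pass its contents (for example, the text read from config\MongoSinkConnector.properties) to parse_properties.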

How To Run
Start MongoDB, ZooKeeper, and the Kafka server in three separate consoles.
In a fourth console, start Kafka Connect:
bin\windows\connect-standalone config\connect-standalone.properties config\MongoSinkConnector.properties
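
Before starting Kafka Connect, it can help to confirm that the three services are accepting connections. The sketch below is one way to do that from Python. Ports 27017 and 9092 come from this post; 2181 is ZooKeeper's default client port and is an assumption about the local setup.

```python
import socket


def port_open(host, port, timeout=1.0):
    """Return True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


if __name__ == "__main__":
    # 2181 is an assumed default; adjust if ZooKeeper listens elsewhere.
    services = {"MongoDB": 27017, "ZooKeeper": 2181, "Kafka": 9092}
    for name, port in services.items():
        state = "up" if port_open("localhost", port) else "not reachable"
        print(f"{name} (localhost:{port}): {state}")
```

An open port only shows that something is listening; it does not confirm that the service behind it is healthy.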

In a fifth console, send messages to a topic (I used topic1):
bin\windows\kafka-console-producer --broker-list localhost:9092 --topic topic1
>{"Hello":1}
>{"Mongo":2}
>{"World":3}
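
With JsonConverter and schemas.enable=false, each message typed into the producer has to be valid JSON. The sketch below checks candidate messages before they are sent. It also assumes the sink needs each value to be a JSON object so that it can become a MongoDB document; that is an assumption, and the function name check_messages is hypothetical.

```python
import json


def check_messages(lines):
    """Return (line_number, reason) for each line that is not a JSON object."""
    problems = []
    for n, line in enumerate(lines, start=1):
        try:
            value = json.loads(line)
        except json.JSONDecodeError as exc:
            problems.append((n, f"invalid JSON: {exc.msg}"))
            continue
        if not isinstance(value, dict):
            problems.append((n, "not a JSON object"))
    return problems


# The three messages used in this post.
messages = ['{"Hello":1}', '{"Mongo":2}', '{"World":3}']
print(check_messages(messages))  # prints []
```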

Open the mongo client in a new (sixth) console and check your database and collections. You will see the three messages above.

Originally posted at https://mh-github.github.io/kafka-connect2.html

Discussion (2)

munishsinghal

[2020-09-07 19:35:40,926] WARN could not create Dir using jarFile from url file:/C:/temp/kafka/kafka_2.12-2.2.0/plugins/mongo-kafka-1.2.0-all.jar. skipping. (org.reflections.Reflections)
java.lang.NullPointerException
at java.util.zip.ZipFile.<init>(Unknown Source)
at java.util.zip.ZipFile.<init>(Unknown Source)
at java.util.jar.JarFile.<init>(Unknown Source)
at java.util.jar.JarFile.<init>(Unknown Source)
at org.reflections.vfs.Vfs$DefaultUrlTypes$1.createDir(Vfs.java:214)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:104)
at org.reflections.vfs.Vfs.fromURL(Vfs.java:96)
at org.reflections.Reflections.scan(Reflections.java:257)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.scan(DelegatingClassLoader.java:452)
at org.reflections.Reflections.lambda$scan$0(Reflections.java:213)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)
[2020-09-07 19:35:41,007] WARN could not create Vfs.Dir from url. ignoring the exception and continuing (org.reflections.Reflections)
org.reflections.ReflectionsException: Could not open url connection
at org.reflections.vfs.JarInputDir$1.<init>(JarInputDir.java:34)
at org.reflections.vfs.JarInputDir.lambda$getFiles$0(JarInputDir.java:30)
at org.reflections.Reflections.scan(Reflections.java:260)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.scan(DelegatingClassLoader.java:452)
at org.reflections.Reflections.lambda$scan$0(Reflections.java:213)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

Caused by: java.lang.ClassNotFoundException: io.netty.internal.tcnative.CertificateVerifier
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)

[2020-09-07 19:35:41,937] WARN could not get type for name org.easymock.EasyMockSupport from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.easymock.EasyMockSupport
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: org.easymock.EasyMockSupport
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
[2020-09-07 19:35:41,957] WARN could not get type for name org.osgi.framework.SynchronousBundleListener from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.osgi.framework.SynchronousBundleListener
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: org.osgi.framework.SynchronousBundleListener
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
[2020-09-07 19:35:41,974] WARN could not get type for name org.eclipse.jetty.npn.NextProtoNego$ClientProvider from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.eclipse.jetty.npn.NextProtoNego$ClientProvider
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: org.eclipse.jetty.npn.NextProtoNego$ClientProvider
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
[2020-09-07 19:35:41,983] WARN could not get type for name org.conscrypt.AllocatedBuffer from any class loader (org.reflections.Reflections)
org.reflections.ReflectionsException: could not get type for name org.conscrypt.AllocatedBuffer
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:312)
at org.reflections.Reflections.expandSuperTypes(Reflections.java:382)
at org.reflections.Reflections.<init>(Reflections.java:140)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader$InternalReflections.<init>(DelegatingClassLoader.java:444)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:334)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:268)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:216)
at org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:209)
at org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:61)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:79)
Caused by: java.lang.ClassNotFoundException: org.conscrypt.AllocatedBuffer
at java.net.URLClassLoader.findClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at sun.misc.Launcher$AppClassLoader.loadClass(Unknown Source)
at java.lang.ClassLoader.loadClass(Unknown Source)
at org.reflections.ReflectionUtils.forName(ReflectionUtils.java:310)
... 9 more
[2020-09-07 19:35:42,093] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,093] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,095] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,095] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,095] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,101] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,101] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,483] WARN The configuration 'offset.flush.interval.ms' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,483] WARN The configuration 'key.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,485] WARN The configuration 'offset.storage.file.filename' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,490] WARN The configuration 'value.converter.schemas.enable' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,491] WARN The configuration 'plugin.path' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,491] WARN The configuration 'value.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
[2020-09-07 19:35:42,492] WARN The configuration 'key.converter' was supplied but isn't a known config. (org.apache.kafka.clients.admin.AdminClientConfig)
Sep 07, 2020 7:35:43 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.RootResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.RootResource will be ignored.
Sep 07, 2020 7:35:43 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.LoggingResource will be ignored.
Sep 07, 2020 7:35:43 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource will be ignored.
Sep 07, 2020 7:35:43 PM org.glassfish.jersey.internal.inject.Providers checkProviderRuntime
WARNING: A provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource registered in SERVER runtime does not implement any provider interfaces applicable in the SERVER runtime. Due to constraint configuration problems the provider org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource will be ignored.
Sep 07, 2020 7:35:43 PM org.glassfish.jersey.internal.Errors logErrors
WARNING: The following warnings have been detected: WARNING: The (sub)resource method listLoggers in org.apache.kafka.connect.runtime.rest.resources.LoggingResource contains empty path annotation.
WARNING: The (sub)resource method createConnector in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectors in org.apache.kafka.connect.runtime.rest.resources.ConnectorsResource contains empty path annotation.
WARNING: The (sub)resource method listConnectorPlugins in org.apache.kafka.connect.runtime.rest.resources.ConnectorPluginsResource contains empty path annotation.
WARNING: The (sub)resource method serverInfo in org.apache.kafka.connect.runtime.rest.resources.RootResource contains empty path annotation.

[2020-09-07 19:35:43,378] ERROR Failed to create job for ....\config\MongoSinkConnector.properties (org.apache.kafka.connect.cli.ConnectStandalone)
[2020-09-07 19:35:43,378] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone)
java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 1 error(s):
Configuration is not defined: topic
Missing required configuration "name" which has no default value.
You can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate
at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:115)
at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:99)
at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector configuration is invalid and contains the following 1 error(s):
Configuration is not defined: topic
Missing required configuration "name" which has no default value.
You can also find the above list of errors at the endpoint /connector-plugins/{connectorType}/config/validate
at org.apache.kafka.connect.runtime.AbstractHerder.maybeAddConfigErrors(AbstractHerder.java:613)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:215)
at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.lambda$null$1(StandaloneHerder.java:201)
at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
at java.util.concurrent.FutureTask.run(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(Unknown Source)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.lang.Thread.run(Unknown Source)

İlhan SAĞLIK

I am getting the same errors too.