Hi All,
I have the following setup:
Kafka broker (3.9.0)
Kafka producer (for now, using the producer-console in kafka itself)
This setup works fine over plain TCP and TLS, and I have also tried SASL authentication using PLAIN and SHA256.
Now I am trying to set up OAuth2 SASL authentication on this setup, and I get an invalid_token error from the Kafka broker during SASL authentication.
This is what my configuration looks like (only the properties relevant to SASL OAuth are included):
server.properties:
sasl.enabled.mechanisms=OAUTHBEARER
# JWKS URL from the openid-configuration URL for the oauth2 host
sasl.oauthbearer.jwks.endpoint.url=https://:443/admin/v1/SigningCert/jwk
listener.name.sasl_plaintext.sasl.enabled.mechanisms=OAUTHBEARER
# Verified that these are in sync with the values for aud and iss from the access token
sasl.oauthbearer.expected.audience=""
sasl.oauthbearer.expected.issuer=""
listener.name.sasl_plaintext.oauthbearer.principal.builder.class=org.apache.kafka.common.security.authenticator.DefaultKafkaPrincipalBuilder
sasl.server.callback.handler.class=org.apache.kafka.common.security.oauthbearer.secured.OAuthBearerValidatorCallbackHandler
Server side jaas config file:
KafkaServer {
org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required;
};
producer.properties:
security.protocol=SASL_PLAINTEXT
sasl.mechanism=OAUTHBEARER
sasl.jaas.config=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required \
clientId="" \
clientSecret="" \
scope="";
sasl.oauthbearer.token.endpoint.url=https://:443/oauth2/v1/token
sasl.login.callback.handler.class=org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginCallbackHandler
Note: I am not interested in any custom functionality. I just want to use OAuth2 authentication for my Kafka client. In other words, given the OAuth2 credentials (client id and client secret), the client should be able to log in and then use Kafka normally.
This is what happens with the above configuration:
- I can see from the logs that the kafka producer is able to login to the oauth2 server and get the access token. I see logs like this on the producer console which tell me that the client can authenticate with the oauth2 server:
DEBUG getClaim - scope: all (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,852] DEBUG getClaim - exp: 1734098884 (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,853] DEBUG getClaim - sub (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,853] DEBUG getClaim - iat: 1734095284 (org.apache.kafka.common.security.oauthbearer.internals.secured.LoginAccessTokenValidator)
[2024-12-13 13:08:04,863] DEBUG Login succeeded; invoke commit() to commit it; current committed token count=0 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,864] TRACE Committing my token; current committed token count = 0 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,865] DEBUG Done committing my token; committed token count is now 1 (org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule)
[2024-12-13 13:08:04,866] INFO Successfully logged in.
- However, after that, when the producer tries to do SASL authentication with the kafka broker, it fails with the error: {"status":"invalid_token"}.
I see the following logs on the producer console:
DEBUG [Producer clientId=console-producer] Set SASL client state to INITIAL (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2024-12-13 13:08:08,076] DEBUG Setting SASL/OAUTHBEARER client state to RECEIVE_SERVER_FIRST_MESSAGE (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)
[2024-12-13 13:08:08,084] DEBUG [Producer clientId=console-producer] Set SASL client state to INTERMEDIATE (org.apache.kafka.common.security.authenticator.SaslClientAuthenticator)
[2024-12-13 13:08:08,085] TRACE [Producer clientId=console-producer] Found least loaded connecting node :9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)
[2024-12-13 13:08:08,086] TRACE For telemetry state SUBSCRIPTION_NEEDED, returning the value 0 ms; (org.apache.kafka.common.telemetry.internals.ClientTelemetryReporter)
[2024-12-13 13:08:08,086] TRACE [Producer clientId=console-producer] Found least loaded connecting node :9093 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient)
[2024-12-13 13:08:08,091] DEBUG Sending %%x01 response to server after receiving an error: {"status":"invalid_token"} (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslClient)
In the server log file, I see the following lines:
DEBUG Set SASL server state to HANDSHAKE_OR_VERSIONS_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,091] DEBUG Handling Kafka request API_VERSIONS during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,093] DEBUG Set SASL server state to HANDSHAKE_REQUEST during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,101] DEBUG Handling Kafka request SASL_HANDSHAKE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,138] DEBUG Using SASL mechanism 'OAUTHBEARER' provided by client (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,142] DEBUG Set SASL server state to AUTHENTICATE during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
[2024-12-13 12:28:42,168] DEBUG {"status":"invalid_token"} (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)
[2024-12-13 12:28:42,182] DEBUG Received %x01 response from client after it received our error (org.apache.kafka.common.security.oauthbearer.internals.OAuthBearerSaslServer)
[2024-12-13 12:28:42,191] DEBUG Set SASL server state to FAILED during authentication (org.apache.kafka.common.security.authenticator.SaslServerAuthenticator)
I searched online for clues and verified the following:
- The error appears to originate in the following file: https://github.com/a0x8o/kafka/blob/master/clients/src/main/java/org/apache/kafka/common/security/oauthbearer/OAuthBearerValidatorCallbackHandler.java in the method handleValidatorCallback(OAuthBearerValidatorCallback callback)
For some reason, the broker validates the token here and considers it invalid:
OAuthBearerToken token;
try {
token = accessTokenValidator.validate(callback.tokenValue());
callback.token(token);
} catch (ValidateException e) {
log.warn(e.getMessage(), e);
callback.error("invalid_token", null, null);
}
I verified that the JWKS URL configured in the server properties is accessible by calling it with curl and Postman. (However, I do not see anything in the server.log file to indicate that the Kafka broker contacted the JWKS URL.)
I even verified that the sub value in the access token is right. I see the log in producer console: DEBUG getClaim - sub
The iat and exp values in the access token are also appropriate. I even synced the clocks on the Kafka broker, the producer and the oauth2 server (all now use UTC).
I verified that the kid in the access token matches the kid in the JWKS JSON.
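For anyone who wants to repeat the claim checks above, here is a minimal sketch of how the header and payload of an access token can be inspected outside Kafka. It only base64url-decodes the first two segments of the token and prints the JSON, so that kid, iss, aud, sub, iat and exp can be compared by eye with the JWKS document and the broker settings. It does not verify the signature. The class name JwtPeek and the method decodePart are illustrative names of my own, not part of Kafka.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper for inspection only: it does NOT verify the token signature.
public class JwtPeek {

    // Decodes one base64url segment of a compact JWT to its JSON text.
    // index 0 = header (contains kid, alg), index 1 = payload (contains iss, aud, sub, iat, exp).
    static String decodePart(String jwt, int index) {
        String[] parts = jwt.split("\\.");
        if (parts.length < 2 || index < 0 || index > 1) {
            throw new IllegalArgumentException("expected a compact JWT and index 0 or 1");
        }
        // Java's URL-safe decoder accepts segments without '=' padding, as used in JWTs.
        byte[] raw = Base64.getUrlDecoder().decode(parts[index]);
        return new String(raw, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        if (args.length != 1) {
            System.err.println("usage: java JwtPeek <access-token>");
            return;
        }
        System.out.println("header:  " + decodePart(args[0], 0));
        System.out.println("payload: " + decodePart(args[0], 1));
    }
}
```

Running it as `java JwtPeek <access-token>` with the token obtained from the token endpoint prints the two JSON objects, which can then be compared against sasl.oauthbearer.expected.audience, sasl.oauthbearer.expected.issuer and the kid entries in the JWKS JSON.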
Any suggestions on what the issue could be? I have already set the log level to DEBUG on both the producer and the server.
Thanks in advance!
Iyer