User authentication and authorization in Apache Kafka – IBM Developer



Two built-in security features of Apache Kafka are user access control and data encryption. While a production Kafka cluster normally provides both of these features, they are not necessarily required in development, test, or experimental environments. In fact, some production environments don’t need these features, such as when the cluster is behind a firewall. The cluster implementation environment and other considerations play a role when deciding which security features need to be implemented in a Kafka cluster.

In this tutorial, you learn the ways user authentication and authorization can be implemented. If data encryption is also required, it can be configured on top of the user access control configurations explained here, but this is not covered in this tutorial.

This tutorial is intended for those who have a basic understanding of Apache Kafka concepts, know how to set up a Kafka cluster, and work with its basic tools.

Terminology

Kafka provides authentication and authorization using Kafka Access Control Lists (ACLs), which can be managed through several interfaces (command line, API, and so on). Each Kafka ACL is a statement in this format:

Principal P is [Allowed/Denied] Operation O From Host H On Resource R.

In this statement,

  • Principal is a Kafka user.
  • Operation is one of Read, Write, Create, Describe, Alter, Delete, DescribeConfigs, AlterConfigs, ClusterAction, IdempotentWrite, All.
  • Host is a network address (IP) from which a Kafka client connects to the broker.
  • Resource is one of these Kafka resources: Topic, Group, Cluster, TransactionalId. These can be matched using wildcards.

Not all operations apply to every resource. The current list of operations per resource is shown in the table below. All associations can be found in the Kafka documentation:

Resource Operations
Topic Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All
Group Read, Describe, All
Cluster Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All
TransactionalId Describe, Write, All

Background

This article aims at demonstrating the authentication and authorization capabilities of Kafka. It is not a guide to configuring Kafka for production environments.

In particular, it does not cover encryption, which is usually a requirement in secured environments. We demonstrate how to perform authentication using the SASL_PLAINTEXT security protocol, which transmits all data, including passwords, as plain text. SASL_SSL encrypts data over the wire and should be used for production use cases. All the information included here also applies to environments using SASL_SSL.
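For reference, moving such a setup to SASL_SSL typically means changing the listener protocol and adding TLS settings on each broker. The following is a minimal, hypothetical sketch; the keystore path and passwords are placeholders, not values from this article:

```
listeners=SASL_SSL://:9093
security.inter.broker.protocol=SASL_SSL
ssl.keystore.location=/var/private/ssl/kafka.server.keystore.jks
ssl.keystore.password=keystore-password
ssl.key.password=key-password
```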

Authentication

Kafka uses SASL to perform authentication. It currently supports many mechanisms, including PLAIN, SCRAM, OAUTHBEARER, and GSSAPI, and it allows administrators to plug in custom implementations. Authentication can be enabled between brokers, between clients and brokers, and between brokers and ZooKeeper. It restricts access to only the parties that hold the required secrets.

Authorization

Kafka manages and enforces authorization via ACLs through an authorizer. An authorizer implements a specific interface and is pluggable. Kafka provides a default authorizer implementation (AclAuthorizer) that stores ACLs in ZooKeeper. The authorizer class name is provided via the broker configuration authorizer.class.name. If no such configuration exists, then everyone is authorized to access any resource.

The typical workflow around Kafka authorization is depicted below. At startup the Kafka broker initiates an ACL load. The populated ACL cache is maintained and used for authorization purposes whenever an API request comes through.

Figure 1. Kafka Authorization Workflow
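The decision logic behind this workflow can be illustrated with a short sketch. The following is a simplified model of the semantics, not Kafka's actual implementation: super users are always allowed, DENY entries take precedence over ALLOW entries, and the default when nothing matches is to deny (assuming allow.everyone.if.no.acl.found is false).

```python
from dataclasses import dataclass

@dataclass(frozen=True)
class Acl:
    principal: str   # e.g. "User:alice"
    host: str        # "*" matches any host
    operation: str   # e.g. "Write"; "All" matches any operation
    resource: str    # e.g. "Topic:test"
    allow: bool      # True for ALLOW, False for DENY

def matches(acl, principal, host, operation, resource):
    # An ACL applies if principal and resource match exactly,
    # and the host and operation match directly or via a wildcard.
    return (acl.principal == principal
            and acl.host in ("*", host)
            and acl.operation in (operation, "All")
            and acl.resource == resource)

def authorize(acls, super_users, principal, host, operation, resource):
    if principal in super_users:          # super users bypass ACL checks
        return True
    hits = [a for a in acls if matches(a, principal, host, operation, resource)]
    if any(not a.allow for a in hits):    # DENY wins over ALLOW
        return False
    return any(a.allow for a in hits)     # no matching ACL -> deny by default

acls = [Acl("User:alice", "*", "Write", "Topic:test", True)]
print(authorize(acls, {"User:admin"}, "User:alice", "10.0.0.1", "Write", "Topic:test"))  # True
print(authorize(acls, {"User:admin"}, "User:bob", "10.0.0.1", "Read", "Topic:test"))     # False
print(authorize(acls, {"User:admin"}, "User:admin", "10.0.0.1", "Alter", "Cluster:kafka-cluster"))  # True
```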

Setting it up

In order to enable authentication and authorization of clients in a Kafka cluster, both brokers and clients need to be properly configured. Brokers need to know valid credentials, and clients need to present valid credentials for the underlying commands to execute properly. The following sections describe these configurations using examples.

Broker-side configuration

To enable authentication and authorization on the broker side, you need to perform two steps on each broker:

  • Configure valid credentials
  • Configure the proper security protocol and authorizer implementation

Set up broker-side credentials

Configure the broker with its own user credentials and with the client user credentials it should accept. These credentials, along with the login module specification, are stored in a JAAS login configuration file.

This is the JAAS file used to run the use cases in this article:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice"
    user_bob="bob"
    user_charlie="charlie";
};

This example defines the following for the KafkaServer entity:

  • The login module that is used for user authentication (here, Kafka's built-in PlainLoginModule),
  • admin/admin as the username and password for inter-broker communication (i.e. the credentials the broker uses to connect to other brokers in the cluster),
  • admin/admin, alice/alice, bob/bob, and charlie/charlie as client user credentials. Note that each valid username and password pair is provided in this format: user_username="password". If the line user_admin="admin" is removed from this file, the broker can no longer authenticate and authorize the admin user; because admin is also the inter-broker user, brokers would then fail to connect to each other.

Pass this file in as a JVM option when running the broker, using -Djava.security.auth.login.config=[path_to_jaas_file], where [path_to_jaas_file] can be something like config/jaas-kafka-server.conf. This can be done by setting the KAFKA_OPTS environment variable, for example:

export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/jaas-kafka-server.conf"

The default login module for the PLAIN mechanism should not be used in production environments because it requires storing all credentials in a JAAS file in plain text. In a real environment, you should provide your own authentication callbacks via sasl.server.callback.handler.class.
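As a sketch, wiring in such a callback handler is a single broker setting, scoped to the listener and mechanism it applies to. The class name below is hypothetical; it stands for your own implementation of Kafka's AuthenticateCallbackHandler interface:

```
listener.name.sasl_plaintext.plain.sasl.server.callback.handler.class=com.example.auth.MyPlainServerCallbackHandler
```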

Configure broker-side settings

Define the accepted protocol and the ACL authorizer used by the broker by adding the following configuration to the broker properties file (server.properties):

authorizer.class.name=kafka.security.authorizer.AclAuthorizer
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

Another configuration that can be added is for Kafka super users: users with full access to all APIs. It reduces the overhead of defining per-API ACLs for users who are meant to have full API access. From our list of users, let's make admin a super user with the following configuration:

super.users=User:admin
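If more than one super user is needed, the entries are separated by semicolons. For example (ops is a hypothetical additional user, not one defined in this article):

```
super.users=User:admin;User:ops
```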

This modified properties file is named sasl-server.properties.

When the broker runs with this security configuration (bin/kafka-server-start.sh config/sasl-server.properties), only authenticated and authorized clients are able to connect to and use it.

Once you complete these two steps, the Kafka brokers are prepared to authenticate and authorize clients. In the next section, you'll learn how to enable Kafka clients for authentication.

Client-side configuration

In the previous section, you defined a set of user credentials that are authenticated by the Kafka broker. In this section, you'll learn how Kafka's command line tools can authenticate against the secured broker via a simple use case. The use case involves users alice, bob, and charlie, where:

  • alice creates and produces to topic test.
  • bob consumes from topic test in consumer group bob-group.
  • charlie queries the group bob-group to retrieve the group offsets.

So far, the broker is configured for authenticated access. Running a Kafka console producer or consumer that is not configured for authenticated and authorized access fails with messages like the following (assuming auto.create.topics.enable is true):

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
[2017-10-24 15:44:56,426] WARN [Consumer clientId=consumer-1, groupId=console-consumer-7511] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...

Let’s now look at the required configurations on the client side.

Configure client-side settings

Specify the broker protocol as well as the credentials to use on the client side. The following configuration is placed inside the corresponding configuration file (alice.properties) provided to the particular client:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";
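Alternatively, instead of the inline sasl.jaas.config property, a client can reference a JAAS file containing a KafkaClient section, passed via the same -Djava.security.auth.login.config JVM option used for the broker. A sketch for alice:

```
KafkaClient {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="alice"
    password="alice";
};
```

Either way, the client is pointed at its credentials; the commands below use the properties-file approach.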

For example:

bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/alice.properties
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/bob.properties
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/charlie.properties

Note that the files config/bob.properties, config/charlie.properties, and config/admin.properties have the same configurations as config/alice.properties with their respective credentials.
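Since these per-user files differ only in the credentials, one way to generate them is a small shell loop. This sketch recreates alice's template and substitutes the user name; it assumes the config/ directory layout used in this article:

```shell
# Generate per-user client configs from alice's template by
# substituting the user name (and hence the password, since
# each user's password equals their name in this article).
mkdir -p config
cat > config/alice.properties <<'EOF'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";
EOF
for user in bob charlie admin; do
    sed "s/alice/$user/g" config/alice.properties > "config/$user.properties"
done
```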

If you run these commands with the configuration so far, you’ll notice they don’t work as expected. For example:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/alice.properties
>message1
[2017-10-24 16:20:52,259] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-10-24 16:20:52,260] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
>

Or, for example:

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/bob.properties
[2017-10-24 16:27:01,431] WARN [Consumer clientId=consumer-1, groupId=bob-group] Error while fetching metadata with correlation id 2 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-10-24 16:27:01,435] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

Or, for example:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/charlie.properties

Error: Executing consumer group command failed due to Not authorized to access group: Group authorization failed.

The security configuration still does not grant any specific permissions to our Kafka users (except for admin, who is a super user). These permissions are defined using the ACL command (bin/kafka-acls.sh). To verify the existing ACLs, run:

./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --list

This returns no ACL definitions. Note that only admin can run this command, because only admin has the permissions to list and edit ACLs.

You have handled authentication, but have not yet provided any authorization rules defining which users can run specific APIs and access certain Kafka resources. This is covered in the next section.

Producing to topics

Start with user alice. Alice needs to be able to create and produce to topic test. For this exercise, users can connect to the broker from any host. If necessary, host restrictions can also be embedded into the Kafka ACLs discussed in this section. For this use case, the corresponding Kafka ACLs are:

Principal alice is Allowed Operation Write From Host * On Topic test.
Principal alice is Allowed Operation Create From Host * On Topic test.

To create them, run:

./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:alice --operation Write --operation Create --topic test

The expected output is:

Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)
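The ACLs above allow any host (host=*). If host restrictions were needed, the same command accepts an --allow-host option; for example (the IP address below is hypothetical):

```
./bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:alice --allow-host 10.0.0.5 --operation Write --topic test
```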

As a result of granting her this permission, Alice can now create the topic test:

$ bin/kafka-topics.sh --bootstrap-server localhost:9092 --command-config ./config/alice.properties --create --topic test --partitions 1 --replication-factor 1

And also produce messages to it:

$ bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test --producer.config config/alice.properties
>message1
>message2
>message3
...

Consuming from topics

Next, you need to let user bob consume from topic test, as a member of the bob-group consumer group. Bob’s ACL for fetching from topic test is:

Principal bob is Allowed Operation Read From Host * On Topic test.

To create the ACL, run:

$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:bob --operation Read --topic test

The output should look like:

Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Bob needs a second ACL for committing offsets to group bob-group (using the OffsetCommit API):

Principal bob is Allowed Operation Read From Host * On Group bob-group.

To create the ACL, run:

$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:bob --operation Read --group bob-group

The output should be:

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)

By granting these permissions to Bob, he can now consume messages from topic test as a member of bob-group.

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/bob.properties --from-beginning
message3
message1
message2
...

Describing consumer groups

Lastly, user charlie needs permission to retrieve committed offsets from group bob-group (using the OffsetFetch API). According to the table above, Charlie’s first ACL for fetching offsets from this consumer group is:

Principal charlie is Allowed Operation Describe From Host * On Group bob-group.

To create the ACL, run the following command (it grants Read which, in Kafka's ACL model, implies Describe):

$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:charlie --operation Read --group bob-group

The output should be:

Adding ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`:
     (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW)

This permission alone is not enough to meet the use case. If Charlie runs the consumer group command now, he won't see any rows in the output. To read (fetch) the offsets of topics in the consumer group, Charlie must also have Describe access to those topics. According to the table above, this second ACL is:

Principal charlie is Allowed Operation Describe From Host * On Topic test.

To create the ACL, run:

$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --add --allow-principal User:charlie --operation Describe --topic test

The output should be:

Adding ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Now Charlie is able to get the proper listing of offsets in the group:

$ bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/charlie.properties

Consumer group 'bob-group' has no active members.

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID          HOST           CLIENT-ID
test           1          1               1               0      -                    -              -
test           0          1               1               0      -                    -              -
test           2          1               1               0      -                    -              -

That does it. The ACLs above grant enough permissions for this use case to run. To summarize, you can list all the ACLs by running:

$ bin/kafka-acls.sh --bootstrap-server localhost:9092 --command-config ./config/admin.properties --list

The output should be:

Current ACLs for resource `ResourcePattern(resourceType=GROUP, name=bob-group, patternType=LITERAL)`:
     (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:charlie, host=*, operation=READ, permissionType=ALLOW)

Current ACLs for resource `ResourcePattern(resourceType=TOPIC, name=test, patternType=LITERAL)`:
     (principal=User:charlie, host=*, operation=DESCRIBE, permissionType=ALLOW)
    (principal=User:bob, host=*, operation=READ, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=CREATE, permissionType=ALLOW)
    (principal=User:alice, host=*, operation=WRITE, permissionType=ALLOW)

Summary

Kafka provides the means to enforce user authentication and authorization for access to its various resources and operations. Authentication is provided via SASL with multiple supported mechanisms, and authorization via ACLs and pluggable authorizer implementations. This article demonstrated how authentication and authorization can be set up in a Kafka cluster.