Note: For Kafka 1.0.0, and for a more recent version of this tutorial, please refer to this article.

Apache Kafka comes with Kafka ACLs, a mechanism for defining users and allowing or denying those users access to its various APIs. A Kafka ACL has the following format:

Principal P is [Allowed/Denied] Operation O From Host H On Resource R.

In this definition,

  • Principal is a Kafka user.
  • Operation is one of Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, Create, ClusterAction, IdempotentWrite, All.
  • Host is a network address (IP) from which a Kafka client connects to the broker.
  • Resource is one of these Kafka resources: Topic, Group, Cluster, TransactionalId.

Not all operations apply to every Kafka resource. You can look directly at the source code to see which operations are valid for which resource.
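For example, Read applies to Topic and Group resources, Write applies to Topic and TransactionalId resources, while operations such as ClusterAction and IdempotentWrite apply only to the Cluster resource.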

There are some hands-on articles that talk about how to use Kafka ACLs (e.g. Apache Kafka Security 101), but they also bring encryption into the mix. However, there is no comprehensive article that shows how ACLs can be set up without having to worry about encryption. This article attempts to fill that gap. We use the SASL_PLAINTEXT protocol: SASL authentication over a plaintext channel. Once SASL authentication is established between client and server, the session has the client's principal as the authenticated user. There is no wire encryption in this case, as all channel communication is over plaintext.

Authentication/Authorization Setup

We describe how the broker and the clients need to be set up properly for authenticated and authorized access.

Broker Setup

To run the secure broker, two steps need to be performed.

First, we need to let the broker know the credentials of authorized users. These are stored in a JAAS file. The following is an example of what goes in this file. Here, we describe the username and password under which the broker runs (admin/admin). We also define the users admin, alice, bob, and charlie, whose passwords are the same as their usernames.

KafkaServer {
   org.apache.kafka.common.security.plain.PlainLoginModule required
   username="admin"
   password="admin"
   user_admin="admin"
   user_alice="alice"
   user_bob="bob"
   user_charlie="charlie";
};

This file needs to be passed in as a JVM config option when running the broker, using -Djava.security.auth.login.config=[path_to_jaas_file], where [path_to_jaas_file] can be something like config/kafka_server_jaas.conf. One way to enforce this JVM config is to make a copy of the script kafka-server-start.sh and change its last line from

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

to

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/kafka_server_jaas.conf kafka.Kafka "$@"

We call this modified broker start script sasl-kafka-server-start.sh.
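Alternatively, instead of editing the script, the same JVM option can be supplied through the KAFKA_OPTS environment variable, which kafka-run-class.sh picks up; a minimal sketch using the stock start script:

export KAFKA_OPTS="-Djava.security.auth.login.config=config/kafka_server_jaas.conf"
bin/kafka-server-start.sh config/server.properties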

Second, the following needs to be added to the broker properties file (e.g. server.properties) to define the accepted protocol and the ACL authorizer used by the broker:

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

The other thing that can be added here is a list of Kafka super users, who have full access to all APIs. This config reduces the overhead of defining per-API ACLs for users who are meant to have full access. From our list of users, we make admin a super user with the following config:

super.users=User:admin
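Multiple super users can be listed here, separated by semicolons; for example, super.users=User:admin;User:root (where User:root is just a hypothetical second principal).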

We call this modified properties file server-sasl.properties.

When the broker runs with the above security configuration (e.g. bin/sasl-kafka-server-start.sh config/server-sasl.properties), only authenticated and authorized clients are able to connect to and use it. Note: Currently, there are exceptions to this statement. Topic-related activities (i.e. AlterTopics, CreateTopics, DeleteTopics, DescribeAcls, CreateAcls, DeleteAcls) that are handled directly through ZooKeeper do not honor ACLs. To secure these APIs, other means can be put in place (e.g. network firewalls) to make sure anonymous users cannot make changes to Kafka topics or Kafka ACLs. Full ACL support for these APIs will be implemented in a future Kafka release.
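One practical consequence is that the topic test used in the rest of this article can be created by any user directly through ZooKeeper; the tutorial assumes it already exists. For example (the partition count here is an assumption, chosen to match the group listing shown later):

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 3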

Client Setup

The use case we want to implement in this article is:

  • alice produces to topic test.
  • bob consumes from topic test in consumer-group bob-group.
  • charlie queries the group bob-group to retrieve the group offsets.

So far we have only set up the broker for authenticated access. If we run a Kafka console producer or consumer that is not configured for authentication, like this

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test

or

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test

they fail to produce/consume, and a message similar to this is reported:

[2017-05-31 10:51:55,195] WARN Bootstrap broker localhost:9092 disconnected (org.apache.kafka.clients.NetworkClient)

Kafka clients (producer, consumer, …) are set up to authenticate and authorize themselves with a Kafka broker in two steps.
First, to authenticate, their credentials need to be specified in a JAAS file. The content of the JAAS file for user alice (e.g. kafka_client_jaas_alice.conf) would look like this:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice";
};

These credentials are also provided via a JVM config option. For example, alice could use a copy of the console clients for herself, in which her JAAS file is fed to the client command. alice's console producer (sasl-kafka-console-producer-alice.sh) would then have its last line modified from the original script to this:

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/kafka_client_jaas_alice.conf kafka.tools.ConsoleProducer "$@"
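As a side note, Kafka clients at version 0.10.2 and later can skip the separate JAAS file and pass the login module configuration inline through the sasl.jaas.config client property instead; a sketch for alice:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="alice" password="alice";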

Second, we need to specify, on the client side, the security protocol used to talk to the broker. This config in our case would look like this

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

and is placed inside the config file (e.g. client-sasl.properties) that is provided to the particular client. For example,

bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/client-sasl.properties
bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-bob.properties
bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/client-sasl.properties

Note that the file config/consumer-bob.properties contains the same two lines above, plus an entry specifying the group id; e.g.

group.id=bob-group
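Putting it together, the full content of config/consumer-bob.properties would then be:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
group.id=bob-group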

If we run one of these commands we notice that they do not work as expected. For example,

$ bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/client-sasl.properties
>message1
[2017-05-31 16:09:52,001] WARN Error while fetching metadata with correlation id 1 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-05-31 16:09:52,114] WARN Error while fetching metadata with correlation id 3 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-05-31 16:09:52,218] WARN Error while fetching metadata with correlation id 4 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
...

or

$ bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-bob.properties
[2017-05-31 16:19:02,299] WARN Error while fetching metadata with correlation id 2 : {test=UNKNOWN_TOPIC_OR_PARTITION} (org.apache.kafka.clients.NetworkClient)
[2017-05-31 16:19:02,309] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.GroupAuthorizationException: Not authorized to access group: bob-group

or

$ bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/client-sasl.properties
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).
Error: Executing consumer group command failed due to Not authorized to access group: Group authorization failed.

The reason is that, with all the security configuration we have put in place so far, we still have not granted specific permissions to our Kafka users (except for admin, who is a super user). These permissions are defined using the ACL command (bin/kafka-acls.sh). To verify the existing ACLs we run

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list

which should return no ACL definitions.

ACL Definitions

We now add the necessary ACLs to our users so they can perform the use case mentioned above (i.e. alice producing to the topic, bob consuming from the topic, charlie describing the corresponding consumer group). A listing of APIs and their required permissions and resources, which can be found here, will help us in this section determine what permissions need to be granted to which users for the use case to run smoothly.

We start with alice. She needs to be able to produce to topic test. For this exercise we do not limit the hosts from which users can connect to the broker. If necessary, host restrictions can also be embedded into the Kafka ACLs discussed in this section. Based on the listing mentioned above, the Kafka ACL she needs would be:
Principal alice is Allowed Operation Write From Host * On Topic test, or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation Write --topic test
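As an aside, the ACL command also provides --producer and --consumer convenience options that add the typical set of producer or consumer ACLs in one shot. A roughly equivalent sketch for alice (besides Write, --producer also grants her Describe on the topic and, in this Kafka version, Create on the cluster):

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --producer --topic test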

As a result of granting her this permission, she can now produce messages to topic test:

$ bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/client-sasl.properties
message1
message2
message3
...

Next, we need to let bob consume (or fetch) from topic test as a member of the bob-group consumer group. His ACL for fetching from topic test would be:
Principal bob is Allowed Operation Read From Host * On Topic test, or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --operation Read --topic test

He would need a second ACL for committing offsets to the group bob-group:
Principal bob is Allowed Operation Read From Host * On Group bob-group, or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --operation Read --group bob-group
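Alternatively, the --consumer convenience option mentioned above would grant both of these ACLs (plus Describe on the topic) in a single command; a sketch:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --consumer --topic test --group bob-group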

As a result of granting these permissions to bob, he can now consume messages from topic test as a member of bob-group:

$ bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer-bob.properties --from-beginning
message1
message2
message3
...

Finally, we want to give charlie the permissions necessary to retrieve committed offsets in group bob-group. His ACL for fetching offsets from this consumer group would be:
Principal charlie is Allowed Operation Read From Host * On Group bob-group, or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:charlie --operation Read --group bob-group

However, this permission alone is not enough. If charlie runs the consumer group command, he will not see any rows in the output. This is because he needs to read (fetch) the offsets of topics in the consumer group, and therefore must have Describe access to all topics in the group. This ACL would be:
Principal charlie is Allowed Operation Describe From Host * On Topic test, or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:charlie --operation Describe --topic test

Now he should be able to get the proper listing of offsets in the group:

$ bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --group bob-group --describe --command-config config/client-sasl.properties
Note: This will only show information about consumers that use the Java consumer API (non-ZooKeeper-based consumers).

Consumer group 'bob-group' has no active members.

TOPIC                PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG        CONSUMER-ID        HOST            CLIENT-ID
test                 1          1               1               0          -                  -               -
test                 0          2               2               0          -                  -               -
test                 2          2               2               0          -                  -               -

That does it. The above ACLs grant enough permissions for this use case to run. To summarize, they are:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
Current ACLs for resource `Topic:test`: 
 	User:bob has Allow permission for operations: Read from hosts: *
	User:charlie has Allow permission for operations: Describe from hosts: *
	User:alice has Allow permission for operations: Write from hosts: *

Current ACLs for resource `Group:bob-group`:
 	User:bob has Allow permission for operations: Read from hosts: *
	User:charlie has Allow permission for operations: Read from hosts: *
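Finally, if any of these grants ever needs to be revoked, the same tool supports --remove with matching arguments; for example, to revoke charlie's Describe ACL:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:charlie --operation Describe --topic test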

15 comments on "Kafka ACLs in Practice – User Authentication and Authorization"

  1. Very good article, helpful and explanatory.

  2. That was a nice article. Can anyone help to achieve the same SASL setup with SSL encryption?

  3. Thank you for your nice way of explanation.
    I have a peculiar requirement where I use both SASL and non-secure listeners:
    listeners=PLAINTEXT://:9092, SASL_PLAINTEXT://:9093

    In this scenario, I would like to secure only a few topics (the 9093 port). For such topics, how can I impose security such that consumers are not able to read the topic from the non-secured port (9092)?
    Can you please advise on this?

  4. I followed the tutorial almost word for word. Server, producer and consumer start properly. However, when I send a message through the producer, I get the following error:

    [2018-07-09 18:56:54,557] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
    org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for test-2: 1519 ms has passed since batch creation plus linger time

    The issue does not appear when I revert the SASL and ACL config.
    I have already tried changing the batch size and request timeout in the producer config, but it doesn't help. Can anyone please help?

    • Vahid Hashemian July 09, 2018

      What version of Kafka are you using?

      • 0.11.0

        • Vahid Hashemian July 10, 2018

          The article assumes topic 'test' already exists. Please create the topic and try again: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --replication-factor 1 --partitions 1

  5. Hello,
    I tried to use all the information above, but when I launch consumer bob I can't get the message.
    Besides, I get a warning (Client part is missed in kafka_server_jaas.conf) and I can't see this in your screen capture.

    Kafka Version: 1.0.0
    Zookeeper: 3.4.10

    Thanks,

    • Vahid Hashemian August 09, 2018

      Note that for version 1.0.0 and later it’s better to use the newer version of this article as referenced at the beginning.
      What are the exact error/warning messages you’re getting (on both client and broker side) when things don’t go as expected?

  6. I’ve one simple question.

    Where do we create these users alice, bob and the other one? I don't see us creating them anywhere or providing passwords while connecting to any topic, etc.

    Am I missing anything?

    • Vahid Hashemian August 09, 2018

      They are in the file config/kafka_server_jaas.conf, as explained in Broker Setup section above.

  7. I was able to set this up without any issues.
    Do we need to restart the broker every time we add a new user?

    Regards,
    Anil Kumar

    • Vahid Hashemian August 14, 2018

      Yes, a server restart is required when users are added or removed. In a production-level Kafka cluster, rolling restarts should take care of this without an outage.
