Two built-in security features of Apache Kafka are user access control and data encryption. While a production Kafka cluster normally provides both of these features, they are not necessarily required in development, test, or experimental environments. In fact, some production environments don’t need these features either, such as when the cluster is behind a firewall. The cluster’s deployment environment and other considerations play a role in deciding which security features a Kafka cluster needs. In this how-to, you learn the ways user authentication and authorization can be implemented. If data encryption is also required, it can be configured on top of the user access control configuration explained here.

This article is intended for those who have a basic understanding of Apache Kafka concepts, know how to set up a Kafka cluster, and work with its basic tools.

Note: This article is up-to-date with Apache Kafka version 1.0.0. For older versions, refer to this article.

Terminology

Kafka provides authentication and authorization using Kafka Access Control Lists (ACLs), which can be managed through several interfaces (command line, API, etc.). Each Kafka ACL is a statement in this format:

Principal P is [Allowed/Denied] Operation O From Host H On Resource R.

In this statement,

  • Principal is a Kafka user.
  • Operation is one of Read, Write, Create, Describe, Alter, Delete, DescribeConfigs, AlterConfigs, ClusterAction, IdempotentWrite, All.
  • Host is a network address (IP) from which a Kafka client connects to the broker.
  • Resource is one of these Kafka resources: Topic, Group, Cluster, TransactionalId.
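For example, an ACL granting user alice write access to topic test from any host (an ACL created later in this article) reads:

Principal alice is Allowed Operation Write From Host * On Topic test.

The sections that follow show how statements like this are created and listed with Kafka's kafka-acls.sh tool.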

Not all operations apply to every resource. The current list of operations per resource is in the table below. All associations can be found in the source code:

Resource         Operations
Topic            Read, Write, Describe, Delete, DescribeConfigs, AlterConfigs, All
Group            Read, Describe, All
Cluster          Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe, All
TransactionalId  Describe, Write, All

Background

There are hands-on articles that talk about how to use Kafka ACLs (e.g. Apache Kafka Security 101), but they also bring encryption into the mix. This article shows how ACLs can be set up without having to worry about encryption, using the SASL_PLAINTEXT protocol (SASL authentication over a plaintext channel). Once SASL authentication is established between client and server, the session has the client’s principal as the authenticated user. There is no wire encryption in this case, as all channel communication is over plain text.

Kafka manages and enforces ACLs through an authorizer. An authorizer implements a specific interface and is pluggable. Kafka provides a default authorizer implementation (SimpleAclAuthorizer) that stores ACLs in ZooKeeper. The authorizer class name is provided via the broker configuration authorizer.class.name. If no such configuration exists, ACLs are not enforced and any user is authorized to access any resource.
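Because the default authorizer keeps ACLs in ZooKeeper, they can also be inspected there directly. As a quick illustration (the znode layout is an internal detail of SimpleAclAuthorizer and may change between releases), once the topic ACLs created later in this article exist, a command like the following dumps the raw ACL data for topic test:

bin/zookeeper-shell.sh localhost:2181 get /kafka-acl/Topic/test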

The typical workflow around Kafka authorization is depicted below. At startup, the Kafka broker initiates an ACL load. The populated ACL cache is maintained and used for authorization decisions whenever an API request comes through.

Figure 1. Kafka Authorization Workflow

In order to implement user authentication and authorization in a Kafka cluster, both brokers and clients need to be properly configured. Brokers need to know valid credentials, and clients need to provide valid credentials in order to properly execute the underlying commands. The following sections describe these configurations using examples.

Broker-side configuration

To enable authentication and authorization on the broker side, you need to perform two steps: configure valid credentials for each broker, and configure the broker with the proper security protocol and access authorizer implementation.

Set up broker-side credentials

Configure the broker with its own user credentials and with the client credentials it should accept. These credentials, along with the login module specification, are stored in a JAAS login configuration file. This is the JAAS file used to run the use cases in this article:

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="admin"
    password="admin"
    user_admin="admin"
    user_alice="alice"
    user_bob="bob"
    user_charlie="charlie";
};

This example defines the following for the KafkaServer entity:

  • the login module (PlainLoginModule) that is used for user authentication,
  • admin/admin as the username and password for inter-broker communication (i.e. the credentials the broker uses to connect to other brokers in the cluster),
  • admin/admin, alice/alice, bob/bob, and charlie/charlie as client user credentials. Note that each valid username and password pair is provided in this format: user_username="password". If the line user_admin="admin" were removed from this file, the broker could no longer authenticate the admin user; since admin is the user brokers use to connect to one another, inter-broker communication would fail.

Pass in this file as a JVM configuration option when running the broker, using -Djava.security.auth.login.config=[path_to_jaas_file]. [path_to_jaas_file] can be something like: config/jaas-kafka-server.conf. One way to enforce this JVM configuration is to make a copy of the script kafka-server-start.sh (in Kafka’s bin folder) and change the last line from

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS kafka.Kafka "$@"

to

exec $base_dir/kafka-run-class.sh $EXTRA_ARGS -Djava.security.auth.login.config=$base_dir/../config/jaas-kafka-server.conf kafka.Kafka "$@"

This modified broker start script is called sasl-kafka-server-start.sh. For other methods of providing the JAAS login configuration file, refer to this answer.
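For instance, one such method is the KAFKA_OPTS environment variable, which kafka-run-class.sh passes straight to the JVM, so the stock start script can run unmodified:

export KAFKA_OPTS="-Djava.security.auth.login.config=$(pwd)/config/jaas-kafka-server.conf"
bin/kafka-server-start.sh config/server.properties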

Configure broker-side protocol

Define the accepted protocol and the ACL authorizer used by the broker by adding the following configuration to the broker properties file (server.properties):

authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
listeners=SASL_PLAINTEXT://:9092
security.inter.broker.protocol=SASL_PLAINTEXT
sasl.mechanism.inter.broker.protocol=PLAIN
sasl.enabled.mechanisms=PLAIN

The other configuration worth adding is for Kafka super users: users with full access to all APIs. This configuration reduces the overhead of defining per-API ACLs for users who are meant to have full API access. From our list of users, let’s make admin a super user with the following configuration:

super.users=User:admin
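If more than one super user is needed, the list is semicolon-separated (principal names may themselves contain commas); for example, a hypothetical configuration granting both admin and alice full access would be:

super.users=User:admin;User:alice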

This modified properties file is named sasl-server.properties.

When the broker runs with this security configuration (bin/sasl-kafka-server-start.sh config/sasl-server.properties), only authenticated and authorized clients are able to connect to and use it. Note: Currently, there are exceptions to this statement. Topic and ACL related activities (i.e. CreateTopics, DescribeTopics, AlterTopics, DeleteTopics, CreateAcls, DescribeAcls, DeleteAcls) handled directly through ZooKeeper do not honor ACLs. To secure these APIs, other means can be put in place (e.g., network firewalls) to ensure anonymous users cannot make changes to Kafka topics or Kafka ACLs. Full API ACL support will be implemented in a future Kafka release.

Once you complete steps 1 and 2, the Kafka brokers are prepared to authenticate and authorize clients. In the next section, you’ll learn how to enable Kafka clients for authentication.

Client-side configuration

In the previous section, you defined a set of user credentials that are authenticated by the Kafka broker. In this section, you’ll learn how Kafka’s command line tools can be authenticated against the secured broker via a simple use case. The use case involves users alice, bob, and charlie where:

  • alice produces to topic test.
  • bob consumes from topic test in consumer group bob-group.
  • charlie queries the group bob-group to retrieve the group offsets.

So far, the broker is configured for authenticated access. Running a Kafka console producer or consumer not configured for authenticated and authorized access fails with messages like the following (assuming auto.create.topics.enable is true):

$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
[2017-10-24 15:44:18,530] WARN [Producer clientId=console-producer] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...

or

$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test
[2017-10-24 15:44:56,426] WARN [Consumer clientId=consumer-1, groupId=console-consumer-7511] Bootstrap broker localhost:9092 (id: -1 rack: null) disconnected (org.apache.kafka.clients.NetworkClient)
...

Kafka clients (producer, consumer, etc.) are configured to authenticate and authorize with a Kafka broker using two steps: provide valid credentials and specify the security protocol.

Set up client-side credentials

Specify a JAAS login configuration file to authenticate the credentials. The content of the JAAS file for user alice (e.g. jaas-kafka-client-alice.conf) looks like this:

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  username="alice"
  password="alice";
};

These credentials can also be provided to the client via a JVM configuration option. For example, Alice can use a copy of the console producer script that feeds her JAAS file to the JVM. In this copy (sasl-kafka-console-producer-alice.sh), the last line of the original script is modified to this:

exec $(dirname $0)/kafka-run-class.sh -Djava.security.auth.login.config=$(dirname $0)/../config/jaas-kafka-client-alice.conf kafka.tools.ConsoleProducer "$@"
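Alternatively, since Kafka 0.10.2 client credentials can be embedded directly in the client properties file through the sasl.jaas.config property, removing the need for a separate JAAS file and JVM option altogether:

sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
    username="alice" \
    password="alice";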

Configure client-side protocol

Specify the broker protocol to use on the client side. The configuration:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN

is placed inside the corresponding configuration file (sasl-producer-alice.properties) provided to the particular client. For example,

bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/sasl-producer-alice.properties
bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/sasl-consumer-bob.properties
bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/sasl-consumergroup-charlie.properties

Note that the files config/sasl-consumer-bob.properties and config/sasl-consumergroup-charlie.properties have the same two lines as above.

If you run these commands with the configuration so far, you’ll notice they don’t work as expected. For example,

$ bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/sasl-producer-alice.properties
>message1
[2017-10-24 16:20:52,259] WARN [Producer clientId=console-producer] Error while fetching metadata with correlation id 1 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-10-24 16:20:52,260] ERROR Error when sending message to topic test with key: null, value: 1 bytes with error: (org.apache.kafka.clients.producer.internals.ErrorLoggingCallback)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]
>

or

$ bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/sasl-consumer-bob.properties
[2017-10-24 16:27:01,431] WARN [Consumer clientId=consumer-1, groupId=bob-group] Error while fetching metadata with correlation id 2 : {test=TOPIC_AUTHORIZATION_FAILED} (org.apache.kafka.clients.NetworkClient)
[2017-10-24 16:27:01,435] ERROR Unknown error when running consumer:  (kafka.tools.ConsoleConsumer$)
org.apache.kafka.common.errors.TopicAuthorizationException: Not authorized to access topics: [test]

or

$ bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/sasl-consumergroup-charlie.properties
Note: This will not show information about old Zookeeper-based consumers.

Error: Executing consumer group command failed due to Not authorized to access group: Group authorization failed.

The security configuration still does not give specific permissions to our Kafka users (except for admin, who is a super user). These permissions are defined using the ACL command (bin/kafka-acls.sh). To verify existing ACLs, run:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list

This returns no ACL definitions. You have handled authentication, but have not yet provided any authorization rules to define which users can run specific APIs and access certain Kafka resources. This is covered in the next section.

ACL definitions

To allow a specific user to perform a certain Kafka function or access a certain Kafka resource, create a rule that lets that user, and only that user, do so. The rule should be as specific as possible so a user can’t reach any unintended Kafka functionality or resource. The generic ACL statement at the start of this article achieves this purpose. In this section, you use this generic statement to give users alice, bob, and charlie the access each needs for their portion of the earlier use case (i.e. alice produces to topic test, bob consumes from topic test in the consumer group bob-group, and charlie queries the consumer group bob-group). A listing of APIs with their required operation and resource is provided in the table below. Refer to this table to determine what permissions need to be granted to which users for the use case to run smoothly.

API Key  API                   Required Operation  Required Resource  Comments
0        Produce               Write               Topic
0        Produce               Write               TransactionalId    Transactional producer only
0        Produce               IdempotentWrite     Cluster            Idempotent producer only
1        Fetch                 Read                Topic
2        Offsets               Describe            Topic
3        Metadata              Describe            Topic
3        Metadata              Create              Cluster            Only if auto.create.topics.enable is true
4        LeaderAndIsr          ClusterAction       Cluster
5        StopReplica           ClusterAction       Cluster
6        UpdateMetadata        ClusterAction       Cluster
7        ControlledShutdown    ClusterAction       Cluster
8        OffsetCommit          Read                Group
8        OffsetCommit          Read                Topic
9        OffsetFetch           Describe            Group
9        OffsetFetch           Describe            Topic
10       FindCoordinator       Describe            Group              Group coordinator requests only
10       FindCoordinator       Describe            TransactionalId    Transaction coordinator requests only
11       JoinGroup             Read                Group
12       Heartbeat             Read                Group
13       LeaveGroup            Read                Group
14       SyncGroup             Read                Group
15       DescribeGroups        Describe            Group
16       ListGroups            Describe            Cluster
17       SaslHandshake
18       ApiVersions
19       CreateTopics          Create              Cluster
20       DeleteTopics          Delete              Topic
21       DeleteRecords         Delete              Topic
22       InitProducerId        Write               TransactionalId    Transactional producer only
22       InitProducerId        IdempotentWrite     Cluster            Idempotent producer only
23       OffsetForLeaderEpoch  ClusterAction       Cluster
24       AddPartitionsToTxn    Write               TransactionalId
24       AddPartitionsToTxn    Write               Topic              Internal topics only
25       AddOffsetsToTxn       Write               TransactionalId
25       AddOffsetsToTxn       Read                Group
26       EndTxn                Write               TransactionalId
27       WriteTxnMarkers       ClusterAction       Cluster
28       TxnOffsetCommit       Write               TransactionalId
28       TxnOffsetCommit       Read                Group
28       TxnOffsetCommit       Read                Topic
29       DescribeAcls          Describe            Cluster
30       CreateAcls            Alter               Cluster
31       DeleteAcls            Alter               Cluster
32       DescribeConfigs       DescribeConfigs     Cluster            Only for broker resource type
32       DescribeConfigs       DescribeConfigs     Topic              Only for topic resource type
33       AlterConfigs          AlterConfigs        Cluster            Only for broker resource type
33       AlterConfigs          AlterConfigs        Topic              Only for topic resource type
34       AlterReplicaLogDirs   Alter               Cluster
35       DescribeLogDirs       Describe            Cluster
36       SaslAuthenticate
37       CreatePartitions      Alter               Topic

Producing to topics

Assume that at this point a topic named test with three partitions has been created:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic test --partitions 3 --replication-factor 1

Start with user alice. Alice needs to be able to produce to topic test using the Produce API. For this exercise, users can connect to the broker from any host. If necessary, host restrictions can also be embedded into the Kafka ACLs discussed in this section. For this use case, the corresponding Kafka ACL is:

Principal alice is Allowed Operation Write From Host * On Topic test.

or

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --operation Write --topic test

The expected output is:

Adding ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Write from hosts: *

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Write from hosts: *

As a result of granting her this permission, Alice can now produce messages to topic test:

$ bin/sasl-kafka-console-producer-alice.sh --broker-list localhost:9092 --topic test --producer.config config/sasl-producer-alice.properties
>message1
>message2
>message3
...
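As an aside, kafka-acls.sh also has a --producer convenience option that creates the typical producer ACLs in one shot (in this Kafka version: Write and Describe on the topic, plus Create on the cluster). The following command would have covered Alice’s needs for this use case:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:alice --producer --topic test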

Consuming from topics

Next, you need to let user bob consume (or fetch) from topic test using the Fetch API, as a member of the bob-group consumer group. Bob’s ACL for fetching from topic test is:

Principal bob is Allowed Operation Read From Host * On Topic test.

or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --operation Read --topic test
Adding ACLs for resource `Topic:test`:
    User:bob has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Write from hosts: *
    User:bob has Allow permission for operations: Read from hosts: *

Bob needs a second ACL for committing offsets to group bob-group (using the OffsetCommit API):

Principal bob is Allowed Operation Read From Host * On Group bob-group.

or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --operation Read --group bob-group
Adding ACLs for resource `Group:bob-group`:
    User:bob has Allow permission for operations: Read from hosts: *

Current ACLs for resource `Group:bob-group`:
    User:bob has Allow permission for operations: Read from hosts: *

With these permissions granted, Bob can now consume messages from topic test as a member of bob-group:

$ bin/sasl-kafka-console-consumer-bob.sh --bootstrap-server localhost:9092 --topic test --group bob-group --consumer.config config/sasl-consumer-bob.properties --from-beginning
message3
message1
message2
...
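Bob’s two ACLs could likewise have been added with a single command using the matching --consumer convenience option, which grants Read and Describe on the topic together with Read on the group:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:bob --consumer --topic test --group bob-group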

Describing consumer groups

Lastly, user charlie needs permission to retrieve committed offsets from group bob-group (using the OffsetFetch API). According to the table above, Charlie’s first ACL for fetching offsets from this consumer group is:

Principal charlie is Allowed Operation Describe From Host * On Group bob-group.

or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:charlie --operation Describe --group bob-group
Adding ACLs for resource `Group:bob-group`:
    User:charlie has Allow permission for operations: Describe from hosts: *

Current ACLs for resource `Group:bob-group`:
    User:bob has Allow permission for operations: Read from hosts: *
    User:charlie has Allow permission for operations: Describe from hosts: *

This permission alone is not enough to meet the use case. If Charlie runs the consumer group command, he isn’t able to see any rows in the output. Charlie needs to read (fetch) offsets of topics in the consumer group. To do this, he must have Describe access to all topics in the group. According to the table above, this second ACL is:

Principal charlie is Allowed Operation Describe From Host * On Topic test.

or

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --add --allow-principal User:charlie --operation Describe --topic test
Adding ACLs for resource `Topic:test`:
    User:charlie has Allow permission for operations: Describe from hosts: *

Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Write from hosts: *
    User:bob has Allow permission for operations: Read from hosts: *
    User:charlie has Allow permission for operations: Describe from hosts: *

Now Charlie is able to get the proper listing of offsets in the group:

$ bin/sasl-kafka-consumer-groups-charlie.sh --bootstrap-server localhost:9092 --describe --group bob-group --command-config config/sasl-consumergroup-charlie.properties
Note: This will not show information about old Zookeeper-based consumers.

Consumer group 'bob-group' has no active members.

TOPIC          PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG    CONSUMER-ID          HOST           CLIENT-ID
test           1          1               1               0      -                    -              -
test           0          1               1               0      -                    -              -
test           2          1               1               0      -                    -              -

That does it. The ACLs above grant enough permissions for this use case to run. To summarize, the ACLs now in place are:

$ bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --list
Current ACLs for resource `Topic:test`:
    User:alice has Allow permission for operations: Write from hosts: *
    User:bob has Allow permission for operations: Read from hosts: *
    User:charlie has Allow permission for operations: Describe from hosts: *

Current ACLs for resource `Group:bob-group`:
    User:bob has Allow permission for operations: Read from hosts: *
    User:charlie has Allow permission for operations: Describe from hosts: *
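
Should any of these permissions need to be revoked later, kafka-acls.sh accepts a matching --remove flag; for example, to take away Alice’s write access:

bin/kafka-acls.sh --authorizer kafka.security.auth.SimpleAclAuthorizer --authorizer-properties zookeeper.connect=localhost:2181 --remove --allow-principal User:alice --operation Write --topic test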

Summary

Kafka provides the means to enforce user authentication and authorization to access its various resources and operations. It does so using its ACLs and pluggable authorizer entities. This article includes a walkthrough of how to set up this authentication and authorization in a Kafka cluster.