Taxonomy Icon

Analytics

Apache Kafka is growing in popularity as a messaging and streaming platform in distributed systems. The addition of Kafka Streams has enabled Kafka to address a wider range of use cases, and support real-time streams in addition of batch-like ETL (Extract, Transform and Load) models.

Even though there are numerous articles and code snippets on Kafka Streams, there seems to be a shortage of end-to-end use cases that help developers better understand all the pieces that need to come together for a functioning data stream. This article intends to narrow that gap by implementing a simple and basic use case: real-time monitoring of website visitors. More advanced use cases then can be built on top of what is described here to address the specific requirements of similar systems that deal with streaming data.

Apache Kafka is a perfect choice when it comes to storing web server access logs:

  • It can store the logs for any desired period of time, and expire them when they are no longer needed
  • It can provide them for processing to multiple consumers while maintaining the consumption progress in each case
  • It can be used subsequently by those processes to store additional information (as this article will demonstrate)

Learning objectives

This article is intended for those who have a basic understanding of Apache Kafka concepts, and know how to set up a Kafka cluster and work with its basic tools.

After completing this how-to, a reader will:

  • Learn about Apache Kafka, Kafka Streams, and KSQL
  • Know how web site access logs can be stored and processed in Kafka, and presented two methods for monitoring

Prerequisites

The use case implemented in this article and the components involved are the following:

  1. A website hosted on Apache HTTP server where certain information about visitors access is captured and stored in Apache Kafka.
  2. An Apache HTTP log parser reads web server access logs from Kafka, parses them, and stores them in JSON formatted information back into Kafka.
  3. A number of Kafka Stream applications read the JSON formatted log and produce streams of number of page visits per country, number of page visits per US state, etc.
  4. As an alternative to Step 3, KSQL DDLs and queries are used to produce similar streams.

Steps to set up each of these components will be explained as this use case is gradually implemented in this article.

Even though in this article the above components will be implemented on a number of Ubuntu virtual machines and coded with IntelliJ IDEA, they can also be implemented on a different setting or operating system with minimal changes.

In this article, the Apache HTTP web server is installed and configured in Web Server VM, the Kafka cluster is installed in Kafka Cluster VM, and code development is done in Dev VM.

Estimated time

Installing and setting up the environments, and implementing the use case should take approximately 2 hours.

Steps

Setup Apache web server and the website (on Web Server VM)

This step is straight-forward. Simply follow these steps on the Web Server VM:

  1. Install Apache web server
sudo apt-get install apache2
  1. Verify it is running
sudo /etc/init.d/apache2 status
  1. Copy your website’s source files into the web server root folder (/var/www/html/). For the purposes of this how-to document download a website template from http://www.free-css.com (example), and unzip it directly under this folder.

  2. Direct a web browser on the VM to localhost and make sure the default Apache2 web page comes up.

Set up Apache Kafka to receive web server logs (on Kafka Cluster VM)

Set up a single-broker Kafka cluster using Kafka version 1.0.0. Follow these steps on the Kafka Cluster VM:

  1. Make sure Java 8 or higher is installed on the VM. To install Java:
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
  1. Download Apache Kafka 1.0.0 binary and untar it into ~/kafka folder.

  2. Make a note of the Kafka VM’s public IP. [KAFKA_VM_IP] is used as the placeholder for this IP in the rest of this document.

  3. Update ~/kafka/config/server.properties and replace the line

#advertised.listeners=PLAINTEXT://your.host.name:9092

with

advertised.listeners=PLAINTEXT://[KAFKA_VM_IP]:9092

This will allow cluster to be reached from other hosts, or VMs in this case. If you use a Unix-based environment you may also want to change the default Kafka log directory from /tmp/kafka-logs to preserve Kafka logs through potential server restarts. To do so, change the target of log.dirs, as shown below, to a permanent location outside /tmp/.

log.dirs=/tmp/kafka-logs
  1. Run Kafka
cd ~/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties &
bin/kafka-server-start.sh config/server.properties &
  1. On a different console session, create an access-log topic (here both replication and number of partitions are set to 1, but these number should be chosen carefully in a production environment).
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic access-log --partitions 1 --replication-factor 1

The topic access-log to receive web server access logs is now created and ready to receive messages.

Redirect web server access logs to Kafka (on Web Server VM)

Kafka Producer is used to redirect web server logs to Kafka. Therefore, it needs to be accessible on the Web Server VM to produce messages to the Kafka cluster:

  1. Make sure Java 8 or higher is installed on the VM. To install Java,
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
  1. Download Apache Kafka 1.0.0 binary and untar it into ~/kafka. This makes Kafka Producer client tool accessible on this VM for sending access log to the Kafka cluster. The access-log topic is also ready to receive them.

  2. Configure the web server to generate the logs in the desired format (what access log entries are needed to be captured and stored by the web server). A list of candidates can be found here. Update apache2.conf (it is likely in /etc/apache2) and create a new or modify an existing log format. An example is (refer to the above URL to see what each directive means):

LogFormat "%t [%h] [%m] [%U] [%B] [%b] [%D] [%q] [%s] [%{Referer}i] [%{User-Agent}i]" custom_user_tracking

This log format instructs the web server to record information such as access timestamp, visitor’s host name, requested URL path, size of response, … in the access log record.

  1. Redirect web server access logs to Kafka. In order to do this, edit the site config file /etc/apache2/sites-available/000-default.conf and replace the line
CustomLog ${APACHE_LOG_DIR}/access.log combined

with

CustomLog "| [HOME_DIR]/kafka/bin/kafka-console-producer.sh --topic access-log --broker-list [KAFKA_VM_IP]:9092" custom_user_tracking

Replace [HOME_DIR] with the home directory under which the Kafka folder was created, and [KAFKA_VM_IP] with the public IP of the Kafka VM. With this statement, the access logs, that are formatted using the log format defined earlier, will be produced into the access-log topic using a Kafka Producer.

  1. Restart the web server for the config changes to take effect:
sudo /etc/init.d/apache2 restart

With both web server and Kafka running, access logs should now appear in the access-log topic. To verify run

[HOME_DIR]/kafka/bin/kafka-console-consumer.sh --bootstrap-server [KAFKA_VM_IP]:9092 --topic access-log --from-beginning

and hit the web site.

Parse web server logs and break down log entries (on Dev VM)

In this step, each access log in the access-log topic is parsed and broken down and its pieces of interest are stored in other Kafka topics. This is done for a couple of information pieces, but the same approach can be used to extract other information too, if necessary. This step is performed in the Dev VM.

This is the workflow to implement:

  • A Kafka Consumer (or a set of Kafka Consumers) reads access logs from the access-log topic
  • A log parser application parses each access log and breaks it apart and extracts pieces of interest (visitor’s IP address in this case)
  • A Kafka Producer produces those pieces of interest or information based on them into their own Kafka topics (visitor’s geographic location in this case)
  • Kafka Stream applications perform necessary aggregations on these dedicated topics (e.g. real-time page visits per country)

For log parsing purposes, nielsbasjes’ logparser is used. It is open source, and parses NGINX as well Apache HTTPD access logs.

As mentioned earlier, in this article IntelliJ IDEA is the choice for IDE:

  1. Make sure Java 8 or higher is installed on the Dev VM:
sudo add-apt-repository ppa:webupd8team/java
sudo apt-get update
sudo apt-get install oracle-java8-installer
  1. Download and install IntelliJ IDEA (the Community edition will do) by following the instruction at this link. The rest of instructions assumes use of IntelliJ IDEA, but feel free to use IDE of your choice and follow along:

  2. Create a new Java project: WebServerLogAnalysis.

  3. Pre-built logparser library exists on Maven. Add it as a library to the project. In IDEA:

    1. Right-click on the project in Project view (View > Tool Windows > Project) and select Open Module Settings.
    2. In Project Structure window, choose Project Settings > Libraries from the left panel, then click the + sign on the adjacent panel, and select From Maven....
    3. In Download Library from Maven Repository window, enter nl.basjes.parse.httpdlog:httpdlog-parser in the drop down and click on the search button.
    4. From the retrieved list, choose the latest version of httplog-parser. (nl.basjes.parse.httpdlog:httpdlog-parser:4.0 at the time of publishing this article).
    5. Check Download to: so the libraries are downloaded locally, then select OK.
    6. In Choose Modules window, make sure the project module is selected, then press OK, and one more OK on the Project Structure window.
  4. As mentioned earlier, Kafka Consumer and Kafka Producer are used to implement this use case. Therefore, Kafka Libraries also need to be included in the project. Apache Kafka libraries exist on Maven too. Follow similar steps as above, and this time look for org.apache.kafka:kafka_2.12:1.0.0 (the latest Kafka version at the time of publishing this article), and include it in the project. The application will work by

    1. running a Kafka consumer that reads access logs from the access-log topic
    2. passing each consumer access log to the parser and extracting the corresponding visitor’s IP
    3. using the open source freegeoip project’s JSON API to extract the geographic location of each extracted IP
    4. storing the returned JSON-formatted geographic information back in Kafka in geo-location topic
  5. Create the geo-location topic on the Kafka Cluster VM:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic geo-location --partitions 1 --replication-factor 1
  1. The JSON interface of the freegeoip API provides the geographic location associated with an IP in JSON format. To parse the output JSON, use Google’s gson library. Download the library from Maven, similar to how other libraries were added, and by looking for com.google.code.gson.

  2. Create a class WebServerLogAnalyzer. In this class, and its helper classes, implement the simple use case above that works like this at a high level:

create a Kafka Consumer subscribed to 'access-log' topic
create a Kafka Producer to send geographic location of web site visitor to 'geo-location' topic
set up a log parser that reads httpd logs of the given format
while (true) {
. read messages from 'access-log' topic
. for each message {
.. parse the message and extract the visitor IP
.. find the location information associated with the IP
.. store the location information in 'geo-location' topic
. }
}

A Java implementation of this step can be found in the provided source code in com.ibm.code.samples.kafka.loganalysis.extract.WebsiteVisitorGeoLocExtractor. You can also run the JAR artifact:

java -jar WebsiteVisitorGeoLocExtractor.jar [KAFKA_VM_IP]:9092

Set up a stream application (on Dev VM)

In this step, Java is used again to create a Kafka Stream application to contain number of page visits by US state. To keep things simple, you can build this project in the same IDEA module. Since Kafka Streams related libraries will be used, they should also be import to the project. Look for org.apache.kafka:kafka-streams:1.0.0 maven repository and add it to the project libraries.

Since this implementation involves serializing and deserializing JSON objects Kafka Connect JSON library should also be imported. Look for org.apache.kafka:connect-json:1.0.0 maven repository and add it to the project libraries.

The goal is to create a Kafka stream to track the number of visits from each US state in real-time. First, create a Kafka topic for this stream:

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic us-visits-by-state --partitions 1 --replication-factor 1

A nice and descriptive example of creating a Kafka Streams application is provided here, and the application code in this step is based on it. The target stream is created based on the geo-location topic. Record values in geo-location topic are JSON strings. Perform these steps to achieve the page visits by US state stream:

  1. Filter out records that do not associate with a US visitor or those that associate with various web page resources (not the page itself with a .html extension):
geolocJson.get("uri").asText().toLowerCase().endsWith(".html") &&
geolocJson.get("country_code").asText().equals("US");
  1. Map key-values that are of the form (ip, geoloc JSON string) to key-values of the form (ip, US state).

  2. Select the US state as the key for aggregation:

selectKey((ip, state) -> state)
  1. Group records by key (which is the US state): groupByKey()

  2. Do a count aggregation of records by key: count()

These steps are depicted in the following code snippet:

// stream from Kafka
KStream<String, String> geoLocationInput = builder.stream(Constants.GEO_LOCATION_TOPIC);

KStream<String, Long> pageVisitByState = geoLocationInput
        // 1. filter out irrelevant records
        .filter((ip, geolocJsonString) -> {
            ObjectMapper mapper = new ObjectMapper();
            try {
                JsonNode geolocJson = mapper.readTree(geolocJsonString);
                return geolocJson != null && geolocJson.has("uri") &&
                        geolocJson.has("country_code") &&
                        geolocJson.get("uri").asText().toLowerCase().endsWith(".html") &&
                        geolocJson.get("country_code").asText().equals("US") &&
                        !geolocJson.get("region_name").asText().trim().isEmpty();
            } catch (IOException e) {
                e.printStackTrace();
                return false;
            }
        })
        // 2. map key-value pairs
        .map(new KeyValueMapper<String, String, KeyValue<String, String>>() {
            @Override
            public KeyValue<String, String> apply(String ip, String geolocJsonString) {
                ObjectMapper mapper = new ObjectMapper();
                String state = "";
                try {
                    JsonNode geolocJson = mapper.readTree(geolocJsonString);
                    state = geolocJson.get("region_name").asText();
                    System.out.println(state);
                } catch (IOException e) {
                    e.printStackTrace();
                }
                return new KeyValue<>(ip, state);
            }
        })
        // 3. select the aggregation key
        .selectKey((ip, state) -> state)
        // 4. group by the aggrgation key
        .groupByKey()
        // 5. perform the aggregation
        .count()
        .toStream();

pageVisitByState.to(Constants.US_VISITS_BY_STATE_TOPIC, Produced.with(Serdes.String(), Serdes.Long()));

A Java implementation of this step can also be found in the provided source code in com.ibm.code.samples.kafka.loganalysis.stream.VisitsByStateStreamProcessor. You can also run the JAR artifact:

java -jar VisitsByStateStreamProcessor.jar [KAFKA_VM_IP]:9092

To verify the behavior, run

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic visits-by-state --from-beginning --formatter kafka.tools.DefaultMessageFormatter --property print.key=true --property print.value=true --property key.deserializer=org.apache.kafka.common.serialization.StringDeserializer --property value.deserializer=org.apache.kafka.common.serialization.LongDeserializer

As the web site receives activity the above command should start outputting state-visits key-value pairs (for visits from US visitors):

...
New Jersey 33
Alabama 14
Massachusetts 27
Oklahoma 6
Connecticut 28
Mississippi 2
Virginia 34
Texas 46
District of Columbia 9
Missouri 12
New York 25
New Jersey 34
Michigan 33
Maryland 7
Missouri 13
California 123
Louisiana 3
Arizona 49
Pennsylvania 13
Texas 47
Tennessee 6
Michigan 34
...

Creating stream processors takes some time and coding (which in turn requires familiarity with the underlying programming language) to generate the intended stream. In the next step, KSQL is used to achieve a similar result.

Use KSQL to generate streams (on Kafka Cluster VM)

KSQL is an open source project meant to simplify working with Kafka Streams, similar to how SQL simplifies working with relational databases and relieves users of writing code to create or query the content in the database. KSQL is currently in developer preview mode and is not yet recommended for production use.

This step is not dependent on Step 5 above. In fact, the intention is to start with the result of Step 4 and show how KSQL helps in creating Kafka Streams without having to deal with any coding or knowledge of any programming language. The geo-location topic is the starting point for this step, and proper KSQL DDLs are written based on it. The goal is to achieve a similar outcome as in Step 5.

Perform this step on the Kafka VM, by first setting up and running KSQL, and then creating the necessary KSQL constructs:

  1. Download the latest release of KSQL (v0.3-temp at the time of publishing this article) and unzip it into its own folder:
wget https://github.com/confluentinc/ksql/archive/v0.3-temp.zip
unzip v0.3-temp.zip

KSQL source will be in ksql-master folder.

  1. Build KSQL:
sudo apt install maven
cd ksql-0.3-temp/
mvn clean compile install -DskipTests

The installation could take a while.

  1. Run KSQL Client:
bin/ksql-cli local
  1. Create a stream based on the geo-location topic:
ksql> create stream geo_location_stream (timestamp varchar, ip varchar, uri varchar, country_code varchar, country_name varchar, region_code varchar, region_name varchar, city varchar, zip_code varchar, time_zone varchar, latitude varchar, longitude varchar) with (value_format = 'json', kafka_topic = 'geo-location');

This stream takes apart the geographic location JSON. To verify the content run:

ksql> select * from geo_location_stream;
...
1515197461321 | 100.55.49.75 | 05/Jan/2018:16:10:49 -0800 | 100.55.49.75 | /images/slide_3.jpg | US | United States |  |  |  |  |  | 37.751 | -97.822
1515197461510 | 21.169.48.202 | 05/Jan/2018:16:10:49 -0800 | 21.169.48.202 | /index.html | US | United States |  |  |  |  |  | 37.751 | -97.822
1515197461683 | 211.73.250.201 | 05/Jan/2018:16:10:49 -0800 | 211.73.250.201 | /css/bootstrap.css | TW | Taiwan | TPE | Taipei City | Taipei |  | Asia/Taipei | 25.0418 | 121.4966
1515197461872 | 6.154.73.171 | 05/Jan/2018:16:10:49 -0800 | 6.154.73.171 | /css/style.css | US | United States | AZ | Arizona | Fort Huachuca | 85613 | America/Phoenix | 31.5273 | -110.3607
1515197462065 | 230.251.91.254 | 05/Jan/2018:16:10:49 -0800 | 230.251.91.254 | /js/jquery.easing.1.3.js |  |  |  |  |  |  |  | 0 | 0
1515197462266 | 238.122.42.105 | 05/Jan/2018:16:10:49 -0800 | 238.122.42.105 | /js/hoverIntent.js |  |  |  |  |  |  |  | 0 | 0
1515197462441 | 60.181.182.11 | 05/Jan/2018:16:10:49 -0800 | 60.181.182.11 | /js/main.js | CN | China | 33 | Zhejiang | Wenzhou |  | Asia/Shanghai | 27.9994 | 120.6668
...
  1. Now use this stream and build the target US visits by state table:
ksql> create table visits_by_state_table as select region_name, count(*) as page_visits from geo_location_stream where len(uri) > 5 and lcase(uri) like '%.html' and country_code = 'US' and region_name != '' group by region_name;

Make sure there is content in geo-location topic in order to see content in this table. It is created as a table because it is a view into a stream and does not represent sequence of events; in fact, it represents a report based on a stream. For more on the distinction between the stream and table concepts see this page. You can inspect this table by running:

ksql> select region_name, page_visits from visits_by_state_table;
...
Michigan | 30
Colorado | 19
California | 120
Alabama | 12
Pennsylvania | 12
Virginia | 29
Virginia | 30
Missouri | 11
Ohio | 8
Arizona | 46
Florida | 18
Florida | 19
New York | 22
Maryland | 6
Washington | 14
Massachusetts | 25
Michigan | 31
...

Comparing the steps takes in this and previous section, it is clear that KSQL makes working with Kafka Streams much simpler. It paves the way for non-developers to benefit from its capabilities without having to rely on code development.

Here are some other examples of tables for this use case:

  • Page visits by country:
create table page_visits_by_country_table as select country_name, count(*) as page_visits from geo_location_stream where len(uri) > 5 and lcase(uri) like '%.html' and country_name != '' group by country_name;
  • Malicious user detection (find IP addresses that hit the website more than 1000 times during any 60 second window):
create table malicious_ip as select ip, count(*) as hits from geo_location_stream window tumbling (size 60 seconds) group by ip having count(*) > 1000;

More advanced structures (e.g. involving joining streams and tables, etc.) are also possible with KSQL, but those will be studied in a future article.

Summary

This article presented a hands-on and end-to-end example of how Apache Kafka, Kafka Streams, and KSQL can help with common use cases such as monitoring visits to a web site. It demonstrated how web site access logs can be stored and processed in Kafka, and presented two methods for monitoring: developing stream processors, and using KSQL. Even though writing stream processors is still more powerful and perhaps something that makes developers much more entertained, KSQL is targeted for a wider audience and speeds up creating and manipulating streams.