The why and how of partitioning in Apache Iceberg

Apache Iceberg is an open source table format for storing huge data sets. Partitioning is an optimization technique that divides a table into smaller parts based on the values of one or more columns. Learn more about partitioning in Apache Iceberg, and follow along with an example to see how easy Iceberg makes partition evolution.

What is Apache Iceberg?

Apache Iceberg is a relatively new, open source table format for storing petabyte-scale data sets. Iceberg fits easily into the existing big data ecosystem and currently integrates with the Spark and Presto execution engines. Using a host of metadata kept on each table, Iceberg provides functions that are not traditionally available with other table formats. These include schema evolution, partition evolution, and table version rollback, all possible without costly table rewrites or table migration.

What is partitioning?

In general, partitioning is just a way to group items of a certain type or value for faster access. Partitioning in databases is no different: Large tables are divided into multiple smaller tables by grouping similar rows together. The benefit is faster read and load times for queries that access only part of the data. For example, a log table that keeps track of log events, messages, and event times might have millions of entries spanning many months. Partitioning these entries by day makes querying for the 100 or so log events that occurred from Dec. 11-19, 2019, much quicker.
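The idea can be sketched in a few lines of plain Scala (the names here are illustrative, not an Iceberg API): once rows are grouped by day, a date-range query only needs to read the matching groups.

```scala
// Toy model of partitioning: bucket log rows by day ordinal (days since epoch).
// LogEvent and the grouping below are illustrative, not Iceberg code.
case class LogEvent(epochSeconds: Long, id: Int, message: String)

val events = Seq(
  LogEvent(1576022400L, 1, "error"), // 2019-12-11
  LogEvent(1576108800L, 2, "warn"),  // 2019-12-12
  LogEvent(1577836800L, 3, "info")   // 2020-01-01
)

// "Partition" the data by day.
val byDay: Map[Long, Seq[LogEvent]] = events.groupBy(_.epochSeconds / 86400L)

// A query for Dec. 11-19, 2019, only touches two of the three groups.
val lo = 1576022400L / 86400L // 2019-12-11
val hi = 1576713600L / 86400L // 2019-12-19
val scanned = byDay.filter { case (day, _) => day >= lo && day <= hi }
```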

Iceberg seeks to improve upon conventional partitioning, such as that done in Apache Hive. In Hive, partitions are explicit and appear as a separate column in the table that must be supplied in every table write. Queries in Hive also must explicitly supply a filter for the partition column because Hive doesn’t keep track of the relationship between a partition column and its source column. Hive tables also do not support in-place partition evolution; to change a partition, the entire table must be completely rewritten with the new partition column. This is costly for large tables and can create data accuracy issues. Additionally, queries dependent on table partitions must now be rewritten for the new table. Iceberg, on the other hand, implements partitioning in a way that does not result in these issues.
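To make the contrast concrete, here is a hedged sketch of the two query styles (the table and column names are made up for illustration). The Hive-style query must also filter the user-maintained partition column to get pruning, while the Iceberg-style query filters only the source column:

```scala
// Hive-style: the table carries an extra, explicitly maintained partition
// column (event_date, illustrative) that queries must also filter on.
val hiveQuery =
  """SELECT * FROM logs
    |WHERE event_date = '2019-12-12'
    |  AND event_time >= '2019-12-12 00:00:00'""".stripMargin

// Iceberg-style: the table is partitioned on day(event_time) under the hood,
// so filtering the source column alone is enough for partition pruning.
val icebergQuery =
  """SELECT * FROM logs
    |WHERE event_time >= '2019-12-12 00:00:00'""".stripMargin
```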

Partitioning in Apache Iceberg

Iceberg makes partitioning simple for the user by implementing hidden partitioning. Rather than forcing the user to supply a separate partition filter at query time, Iceberg handles all the details of partitioning and querying under the hood. Users do not need to maintain partition columns or even understand the physical table layout to get accurate query results.

Iceberg has several partitioning options. Users can partition timestamps by year, month, day, and hour. Iceberg keeps track of the relationship between a column value and its partition without requiring additional columns. Queries on timestamp data with format “YYYY-MM-DD hh:mm:ss” partitioned by day, for example, need not include the “hh:mm:ss” portion in the query statement. Iceberg can also partition categorical column values by identity, hash buckets, or truncation.
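Under the hood, these transforms reduce each timestamp to an ordinal partition value. The following plain-Scala sketch (hypothetical helper names, not the Iceberg API) shows roughly what the month and day transforms compute:

```scala
import java.time.{Instant, LocalDate, ZoneOffset}

// Rough sketch of Iceberg-style timestamp transforms; the object and method
// names here are illustrative, not Iceberg's.
object Transforms {
  private def toDate(epochSeconds: Long): LocalDate =
    Instant.ofEpochSecond(epochSeconds).atZone(ZoneOffset.UTC).toLocalDate

  // Months since 1970-01.
  def month(epochSeconds: Long): Int = {
    val d = toDate(epochSeconds)
    (d.getYear - 1970) * 12 + (d.getMonthValue - 1)
  }

  // Days since 1970-01-01.
  def day(epochSeconds: Long): Long = toDate(epochSeconds).toEpochDay
}
```

Because a transform, not the user, derives the partition value, Iceberg can record the relationship between the time column and its partitions in metadata.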

Another benefit of hidden partitioning is that users do not need to supply partition layout information when querying Iceberg tables. Not only does this make Iceberg partitioning extremely user-friendly but it also allows partition layouts to be changed over time without breaking pre-written queries. When evolving a partition spec, the data in the table before the change is unaffected, as is its metadata. Only data that is written to the table after evolution is partitioned with the new spec, and the metadata for this new set of data is kept separately. When querying, each partition layout’s respective metadata is used to identify the files that it needs to access; this is called split-planning. Split-planning is one of many Iceberg features that are made possible due to the table metadata, which creates a separation between the physical and the logical. This concept is what makes Iceberg so versatile.
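A simplified model of split-planning might look like the following (plain Scala with invented names; real Iceberg manifests carry far more information). Each data file is tagged with the partition spec it was written under, and the query's time filter is translated separately for each layout before pruning:

```scala
// Illustrative model: one manifest entry per data file, tagged with the
// partition spec it was written under (0 = by month, 1 = by day).
case class DataFile(path: String, specId: Int, partitionValue: Long)

val manifest = Seq(
  DataFile("data/time_month=2008-12/f1.parquet", 0, 467L),   // month ordinal
  DataFile("data/time_day=2009-01-02/f2.parquet", 1, 14246L) // day ordinal
)

// The same time filter ("December 2008 onward"), translated into each
// spec's own partition values before pruning files.
def matches(f: DataFile): Boolean = f.specId match {
  case 0 => f.partitionValue >= 467L   // months since 1970-01
  case _ => f.partitionValue >= 14245L // days since 1970-01-01
}

val filesToScan = manifest.filter(matches)
```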

Evolving a partition in Apache Iceberg

In this section, we will go through an example of partition evolution and see how seamless it is to query across data with multiple partition layouts. But first, an explanation of how Iceberg tables are structured on disk might help clarify the big picture. Within each Iceberg table folder, there is a metadata folder and a data folder. The metadata folder contains, among other things, information about partition specs, their unique IDs, and manifests connecting individual data files with the appropriate partition spec IDs. The data folder contains all the table data files that make up the full Iceberg table. When writing data to a table with a partition, Iceberg creates several folders in the data folder. Each is named with the partition description and the value. For example, a column titled time and partitioned on the month will have folders time_month=2008-11, time_month=2008-12, and so on. We will see this firsthand in the following example. Data partitioned on multiple columns creates multiple layers of folders, with each top-level folder containing one folder for each of the second-level partition values.
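As a sketch of that naming scheme, the folder for a month-partitioned column named time could be derived from a timestamp like this (an illustrative helper, not Iceberg code):

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter

// Builds the "<column>_<transform>=<value>" folder name described above for a
// month partition on a column named time. The helper name is illustrative.
def monthFolder(epochSeconds: Long): String = {
  val fmt = DateTimeFormatter.ofPattern("yyyy-MM").withZone(ZoneOffset.UTC)
  s"time_month=${fmt.format(Instant.ofEpochSecond(epochSeconds))}"
}
```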

Now onto our example, which uses Iceberg’s HiveCatalog API to create and load Iceberg tables from a Hive metastore. For brevity, we use a trivial data set that mimics a log table for some software product developed by Company X, with columns for time, log_id, and log_message. If you are following along with this example on your machine, copy the following data and save it in a CSV file. Note that the timestamps in the data appear as long data types corresponding to their UNIX timestamp in seconds:

1225526400,1,a
1225699200,2,b
1225785600,3,c
1226476800,4,d
1226908800,5,e
1226995200,6,f
1227513600,7,g
1227772800,8,h
1228032000,9,i
1228118400,10,j
1228377600,11,k
1228809600,12,l
1228982400,13,m
1229673600,14,n
1230019200,15,o
1230278400,16,p
1230451200,17,q
1230624000,18,r
1230710400,19,s
1230796800,20,t
1230969600,21,u
1231747200,22,v
1232352000,23,w
1232784000,24,x
1233216000,25,y
1233302400,26,z

In this example, the table was created in 2008. Since then, Company X has gained several customers and now expects log events to occur more frequently. The company determines that, starting in 2009, partitioning log events by day would be more useful. We will walk through this scenario, manually adding the necessary data.

First, start the spark-shell with the following command. For this example, we use Spark 3.0.0, Hive metastore 2.3.0, and Iceberg package release 0.9.0. We recommend that you use the same. If you are using Spark 2.4.x, the mechanism to display tables is different, but instructions are provided as needed.

> $SPARK_HOME/bin/spark-shell --packages org.apache.iceberg:iceberg-spark3-runtime:0.9.0 \
    --conf spark.sql.catalog.my_catalog=org.apache.iceberg.spark.SparkCatalog \
    --conf spark.sql.catalog.my_catalog.type=hive

After you start the Spark shell, import the required Iceberg packages for this example:

import org.apache.iceberg.{PartitionSpec, Schema, Table}
import org.apache.iceberg.catalog.TableIdentifier
import org.apache.iceberg.hive.HiveCatalog
import org.apache.iceberg.types.Types

Now we create our table (logtable) in the namespace called default, originally partitioned by month of event. To do so, we create both a HiveCatalog object and a TableIdentifier object. We also must define the table schema and the initial partition spec to provide this information to the createTable function:

val namespace = "default"
val tableName = "logtable"
val catalog = new HiveCatalog(spark.sparkContext.hadoopConfiguration)
val name = TableIdentifier.of(namespace, tableName)

val tableschema = new Schema(
      Types.NestedField.optional(1, "time", Types.TimestampType.withZone()),
      Types.NestedField.optional(2, "id", Types.IntegerType.get()),
      Types.NestedField.optional(3, "data", Types.StringType.get())
   )

val spec = PartitionSpec.builderFor(tableschema).month("time").build()
val logtable = catalog.createTable(name, tableschema, spec)

Then add the data to the table. If you are using the data set provided here, set the file_location variable equal to the path to the data CSV file on your machine. In the following commands, we are only adding data for timestamps prior to Jan. 1, 2009, mimicking the scenario set forth in our Company X example. If you are using your own data set to loosely follow this example, make sure you are sorting the data on the partition column when you write to the table (as shown here):

val file_location = "/path/to/data.csv"

// Spark SQL type imports needed for the schema definition below
import org.apache.spark.sql.types._

val schema = StructType(Array(
                    StructField("time", IntegerType, true),
                    StructField("id", IntegerType, true),
                    StructField("data", StringType, true)
                ))

val data = spark.read.format("csv")
              .option("delimiter", ",")
              .schema(schema)
              .load(file_location)

data.select(
         data.col("time").cast(DataTypes.TimestampType),
         data.col("id"),
         data.col("data")
       ).where(data.col("time").lt(1230768000))
        .sort(data.col("time"))
        .write
        .format("iceberg")
        .mode("overwrite")
        .save(s"$namespace.$tableName")

You can see the current table version and its schema and spec. If you are working in Spark 2.4.x, rather than displaying the table with spark.table("default.logtable").show, use spark.read.format("iceberg").load("default.logtable").show.

Figure 1

Next, we define a new PartitionSpec object, specifying the schema on which to build it (defined earlier) and the wanted partition and source column:

val newspec = PartitionSpec.builderFor(logtable.schema()).day("time").build()

Then, we update the partition spec for the table by defining a TableOperations object. From this object, we can define the base metadata version and the new metadata version with the evolved partition spec. A commit must be performed on the TableOperations object to make the wanted changes official:

val metadata = catalog.newTableOps(name)
val baseMetadata = metadata.current()
val newMetadata = baseMetadata.updatePartitionSpec(newspec)
metadata.commit(baseMetadata, newMetadata)
logtable.refresh()

In our Company X scenario, a few days have passed since the company evolved the partition spec of its table, and new logs have been added. We mimic this by manually adding the new logs to the table with the following code. In this write operation, we add only the log events that occurred on or after Jan. 1, 2009:

data.select(
         data.col("time").cast(DataTypes.TimestampType),
         data.col("id"),
         data.col("data")
       ).where(data.col("time").geq(1230768000))
        .sort(data.col("time"))
        .write
        .format("iceberg")
        .mode("append")
        .save(s"$namespace.$tableName")

As you can see, there is no need to rewrite the entire table after partition evolution in Iceberg. If you navigate to the data folder of your logtable, you will see that Iceberg has organized the data files according to their partition values – timestamps prior to Jan. 1, 2009, are organized by month; timestamps on and after that date are organized by day.

Figure 2

The new table spec and data can be seen below. Again, if you are working in Spark 2.4.x, use spark.read.format("iceberg").load("default.logtable").show to display the table.

Figure 3

Company X now wants to query for all log events that happened while its employees were on holiday break, so it can ensure that no major errors were missed. The query, shown below, crosses multiple partition layouts, but it still works seamlessly without the user having to specify any additional information or know anything about the table's partitions:

spark.table("default.logtable").createOrReplaceTempView("logtable")
spark.sql("SELECT * FROM logtable WHERE time > '2008-12-14' AND time < '2009-01-14'").show

The result of the query is below.

Figure 4

Next steps for Apache Iceberg partitioning

Overall, Iceberg provides a great deal of functionality for partitioning and partition evolution. Most of the partition tests we performed worked exactly as claimed in the Iceberg documentation.

However, we did run into one issue when testing partition evolution. Attempting to display or access the manifest file metadata table after partition evolution results in an IndexOutOfBoundsException. Fortunately, despite this error, other table operations work as intended, and further changes to the manifest file table are still recorded appropriately; aside from the failure to display, no additional errors occur.

Also of note, Iceberg currently supports partition evolution only through its HiveCatalog interface; users of the HadoopTables interface cannot access the same functions. The community is working on adding all partition evolution features directly to the table API so they can be easily accessed from the Iceberg table object (the logtable variable in our example). You can see the open pull request for this feature here. This change ensures consistent partition evolution operations for all users, as the table object is used in both the HadoopTables and HiveCatalog implementations.

Conclusion

In summary, Apache Iceberg’s abstraction of the physical from the logical using extensive metadata makes hidden partitioning – and several other useful features – possible in data lake implementations. I hope this article was helpful to your understanding of partitioning and partition evolution in Apache Iceberg. Be sure to keep your eye out for the frequent new developments in the Iceberg community.