Summary
With the growth of the Hadoop ecosystem came a proliferation of implementations for the Hive table format. Unfortunately, with no formal specification, each project works slightly differently, which increases the difficulty of integration across systems. The Hive format is also built on the assumptions of a local filesystem, which results in painful edge cases when leveraging cloud object storage for a data lake. In this episode Ryan Blue explains how his work on the Iceberg table format specification and reference implementation has allowed Netflix to improve the performance and simplify operations for their S3 data lake. This is a highly detailed and technical exploration of how a well-engineered metadata layer can improve the speed, accuracy, and utility of large scale, multi-tenant, cloud-native data platforms.
Preamble
- Hello and welcome to the Data Engineering Podcast, the show about modern data management
- When you’re ready to build your next pipeline you’ll need somewhere to deploy it, so check out Linode. With private networking, shared block storage, node balancers, and a 40Gbit network, all controlled by a brand new API you’ve got everything you need to run a bullet-proof data platform. Go to dataengineeringpodcast.com/linode to get a $20 credit and launch a new server in under a minute.
- Go to dataengineeringpodcast.com to subscribe to the show, sign up for the mailing list, read the show notes, and get in touch.
- Join the community in the new Zulip chat workspace at dataengineeringpodcast.com/chat
- Your host is Tobias Macey and today I’m interviewing Ryan Blue about Iceberg, a Netflix project to implement a high performance table format for batch workloads
Interview
- Introduction
- How did you get involved in the area of data management?
- Can you start by explaining what Iceberg is and the motivation for creating it?
- Was the project built with open-source in mind or was it necessary to refactor it from an internal project for public use?
- How has the use of Iceberg simplified your work at Netflix?
- How is the reference implementation architected and how has it evolved since you first began work on it?
- What is involved in deploying it to a user’s environment?
- For someone who is interested in using Iceberg within their own environments, what is involved in integrating it with their existing query engine?
- Is there a migration path for pre-existing tables into the Iceberg format?
- How is schema evolution managed at the file level?
- How do you handle files on disk that don’t contain all of the fields specified in a table definition?
- One of the complicated problems in data modeling is managing table partitions. How does Iceberg help in that regard?
- What are the unique challenges posed by using S3 as the basis for a data lake?
- What are the benefits that outweigh the difficulties?
- What have been some of the most challenging or contentious details of the specification to define?
- What are some things that you have explicitly left out of the specification?
- What are your long-term goals for the Iceberg specification?
- Do you anticipate the reference implementation continuing to be used and maintained?
Contact Info
Parting Question
- From your perspective, what is the biggest gap in the tooling or technology for data management today?
Links
- Iceberg Reference Implementation
- Iceberg Table Specification
- Netflix
- Hadoop
- Cloudera
- Avro
- Parquet
- Spark
- S3
- HDFS
- Hive
- ORC
- S3mper
- Git
- Metacat
- Presto
- Pig
- DDL (Data Definition Language)
- Cost-Based Optimization
The intro and outro music is from The Hug by The Freak Fandango Orchestra / CC BY-SA
Hello, and welcome to the Data Engineering Podcast, the show about modern data management. When you're ready to build your next pipeline, you'll need somewhere to deploy it, so check out Linode. With private networking, shared block storage, node balancers, and a 40 gigabit network, all controlled by a brand new API, you've got everything you need to run a bulletproof data platform. Go to dataengineeringpodcast.com/linode today to get a $20 credit and launch a new server in under a minute. You can go to dataengineeringpodcast.com to subscribe to the show, sign up for the mailing list, read the show notes, and get in touch, and join the discussion at dataengineeringpodcast.com/chat.
Your host is Tobias Macey. And today, I'm interviewing Ryan Blue about Iceberg, a Netflix project to implement a high performance table format for batch workloads. So, Ryan, could you start by introducing yourself? Yeah. Sure. I'm Ryan Blue. I work for Netflix on the data platform team. And do you remember how you first got involved in the area of data management?
[00:01:08] Unknown:
Yeah. Mostly through distributed systems. So I've been working in the distributed systems space for, I guess, most of my career, starting with private projects for the company I was working for at that time. Eventually I moved into the Hadoop space and then got hooked up with a job at Cloudera. From Cloudera, I was working on file formats and data storage APIs and eventually met up with the guys at Netflix and decided to move over here. So Yeah. I saw that you were working on Avro and I believe Parquet as well when you were at Cloudera? Yeah. Exactly. I was actually the one-man file formats team there for a while.
So, yeah, I got to know Avro and Parquet pretty well, which transferred over here to Netflix nicely. It was a good move. And then once at Netflix, I started working on Spark, and basically my experience at Cloudera with data formats and the low-level data storage stuff was what landed me the projects for working with S3 tables. So, like, our batch pattern write projects and our S3 committers and stuff like that. So that's how I got into the format and table space.
[00:02:30] Unknown:
Yeah. I had a great conversation a while back with Doug Cutting and Julien Le Dem talking about Avro and Parquet and some of the finer points of data serialization, and when you want to use which one. So that was a fun conversation.
[00:02:43] Unknown:
Oh, yeah. Those are two of the best people to talk to on it, too. And so
[00:02:49] Unknown:
that brings us to the work you're doing now with Iceberg. So can you give a brief explanation about what Iceberg is at the high level, and what the original motivation was for creating it?
[00:03:00] Unknown:
So Iceberg is a table format, and that's not something that we think about very often in the Hadoop space, because we only have one, and it's sort of this ubiquitous table layout that we don't even think of; it's so pervasive. So Hive lays out data files in a directory structure, and then we basically go look at that directory structure and see, you know, this partition is this directory, and then all the files in that directory are in my table. Iceberg takes that concept, basically how you know which files are in your table, what files make up the dataset itself, and instead of tracking that the way Hive does it, in a directory structure and by listing partition directories, Iceberg tracks files in a very different way. Well, I guess it's not very different, but just a different way.
So, really, the original concept for Iceberg was about how do we improve on keeping track of what data is in a table. Then the project grew from there a little bit, because we quickly realized that if we're gonna fix this one aspect of how tables work, then we may as well fix some of the other, what would you call them, the other challenges that we were hitting with those tables. So things like schema evolution, which is really tricky and not well defined across file formats and in different situations, as well as how partitioning is done, you know, the hidden versus explicit partitions.
So we decided to expand the scope somewhat from there, but it's really about how do I track what data is in a table and the metadata for a table.
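To make the difference concrete, here is a minimal Java sketch, purely illustrative and not the Iceberg API, of the two ways of answering "which files are in this table": the Hive-style approach lists partition directories at planning time, while the Iceberg-style approach reads an explicit file list out of the table's own metadata.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class TableFileListing {

    // Hive-style: the table is whatever happens to be in the partition
    // directories, so query planning has to list the filesystem or object store.
    static List<Path> hiveStyle(Path tableRoot, String partitionDir) throws IOException {
        try (Stream<Path> files = Files.list(tableRoot.resolve(partitionDir))) {
            return files.collect(Collectors.toList());
        }
    }

    // Iceberg-style: table metadata explicitly names every data file in the
    // current snapshot, so planning is a metadata read, not a directory listing.
    // Here a manifest is pretended to be a plain text file with one path per line.
    static List<Path> icebergStyle(Path manifestFile) throws IOException {
        return Files.readAllLines(manifestFile).stream()
                .map(Paths::get)
                .collect(Collectors.toList());
    }
}
```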
[00:04:49] Unknown:
And given that Iceberg is primarily a metadata representation of the underlying files within these various directory hierarchies or partition hierarchies, how much of it is responsible for how the data is actually read through the query engine that's executing? Or is it merely a means of being able to locate those files for the query engine to then be able to retrieve and process them? So the main purpose is to keep track of the list of files in the table
[00:05:22] Unknown:
for passing off to a query engine. So Iceberg core itself is really just that metadata component. Now, because we also get into fixing things like schema evolution, basically, how do I take a file that was written a long time ago and read it using the table's current schema, we have to get involved somewhat in the engines themselves. So Iceberg core itself provides some helper functions for doing those projections, basically saying, hey, here's my Iceberg schema, I want to get records out of this file. But it also uses the same approach as the Parquet file format, in that you can bring your own object model: you implement some API and you can get records reconstructed in whatever your in-memory model is.
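The "bring your own object model" idea can be pictured as a small generic read helper: the engine supplies a function that converts each projected record into its own in-memory representation. This is a hypothetical interface for illustration only, not Iceberg core's actual classes.

```java
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ObjectModelBridge {

    // A projected record as a generic map from field ID to value (illustrative).
    // The engine supplies toEngineModel to rebuild rows in its own model,
    // e.g. a Spark InternalRow, a Pig Tuple, or a plain POJO.
    static <T> List<T> readProjected(List<Map<Integer, Object>> projectedRecords,
                                     Function<Map<Integer, Object>, T> toEngineModel) {
        return projectedRecords.stream()
                .map(toEngineModel)
                .collect(Collectors.toList());
    }
}
```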
[00:06:24] Unknown:
And so for the files that are contained within these hierarchies, do they have to be homogeneous, in that they're all of the same format and all of the same schema within a given table structure or within a given directory structure? Or is there room for heterogeneity in the serialization formats or in the state of the schema across individual files? And then, within those files themselves, is it one record per file? Or is each file a collection of multiple records, where the aggregation of those records is just determined by whatever process wrote them out to disk?
[00:07:07] Unknown:
Yeah. It's the latter. The files in the table are whatever aggregation was necessary or, you know, optimal for whatever wrote it out to disk. So that reflects, like, your runtime query engine. Often users are trying to meet some SLA for getting this data done and out to the next level in a pipeline or the next consumer. And we want to be able to use whatever parallelism is necessary to meet that deadline. So grouping records into files is all up to the query engine, and I should say at this point that I know you asked about schema as well, and that was a big question. What was the other one? Whether the
[00:07:54] Unknown:
files within
[00:08:04] Unknown:
We want the processing engines to be able to do whatever is necessary there. Where Iceberg differs slightly is that it uses a model that gives you snapshot isolation between writers. So it makes it easier to have something that needs to write out a lot of smaller files to meet some SLA, and then we can have another process go in and compact those files to be optimal for the table offline. And that way we get sort of the best of both worlds: you can write data files for an availability deadline, but then you can also come back and compact them and write them another time for long-term storage. And that kind of dovetails into your question on do these all have to be the same file formats, can they be different formats.
The answer there is they can be different formats. The format of every file is tracked individually. So we can have Avro files or ORC files or Parquet files all in the same table, and they should behave exactly the same way. That allows us to do things like have a streaming system writing data into a record-oriented format, because it makes sense for the memory requirements of that streaming system. And then again, we can come back and either merge and shift those into a different format, or simply merge the files in the same format.
We can clean up those artifacts of the initial processing later.
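As a hedged sketch of the write-fast-then-compact pattern being described (not Netflix's actual compaction service): a Spark job reads the small record-oriented files a streaming writer produced and rewrites them as fewer, larger columnar files. The paths are placeholders, and reading Avro this way assumes the spark-avro package is on the classpath.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

public class CompactSmallFiles {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("compact-small-files")
                .getOrCreate();

        // Small, row-oriented files written quickly to meet an availability SLA.
        Dataset<Row> landed = spark.read().format("avro")
                .load("s3://example-bucket/events/landing/");

        // Rewrite offline as a handful of larger Parquet files for long-term reads.
        landed.coalesce(8)
              .write()
              .format("parquet")
              .mode(SaveMode.Overwrite)
              .save("s3://example-bucket/events/compacted/");

        spark.stop();
    }
}
```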
[00:09:44] Unknown:
And it's also worth mentioning and pointing out that you were very careful to refer to Iceberg core during your earlier response. It's also worth pointing out the fact that you have written a fairly extensive specification for the table format itself. So I'm wondering if you can talk about your reasoning for committing all of that to a document, and what the current state of that specification is and its relation to the open source software project that bears the same name? Oh, yeah. So one of the problems that we identified with
[00:10:24] Unknown:
the way most tables work in the, for lack of a better name, de facto or Hive table format is that there's no specification for it. So this is kind of like when someone thinks they can write a CSV file by just adding commas to print statements. Right? It's going to work 99% of the time, but there are edge cases that you're not gonna think about ahead of time. So what we wanted was to eliminate the huge number of just slightly different implementations of the table format, or at least prevent that from happening for Iceberg. So our motivation here is that because we've taken the time to come up with what we think is a good and fairly future-proof design.
Shouldn't call it future proof. That's crazy. So our motivation here is that because we've taken the time to design this for a lot of the characteristics that we think we're gonna need in the next few years, we want to share it with other companies and hopefully make this a standard just like the Hive table standard is today. And as long as we're doing that, we wanna prevent sort of the accidental fragmentation that we've seen in the Hive table standard, where Presto and Hive have slightly different implementations, and things like how do I track nulls in partitions, or when do I escape partition values, all of those things aren't written down anywhere.
So it's really easy to come up with a 90% solution, but it has all these slight little bugs. And this is something that I've personally had to deal with here at Netflix, because, you know, we have things that have our own implementation or parse partition keys and things like that. And then inevitably, you realize that you weren't escaping partition values and you didn't handle those correctly, and all of these things just eventually come around to bite you. So we wanted to be really careful about that. So it sounds like it would be fair to say that the Hive table format is the Markdown of the big data world. Yeah. I think that might be a pretty apt analogy.
You know, everyone has a slightly different flavor of it. I think it works pretty well because there is a, sorry, a reference implementation you can come back to and point and say, hey, Hive does it this way. And Hive was usually the first to figure out these problems existed. So at least it has that going for it.
[00:13:25] Unknown:
And the software implementation that you are using as the reference for your day-to-day work in the project is, as I mentioned before, open source. And I'm wondering if that was the intent from day one, or if there was some effort necessary to clean up some of the code base to make it publicly consumable after the fact.
[00:13:49] Unknown:
Absolutely. It was our intent to open source this project. We knew that a lot of the challenges we were seeing are not specific to Netflix. In fact, I don't think any of these challenges are specific to Netflix. There are a couple reasons why we had, I think, a greater motivation than most people to solve the challenges, and that's primarily our use of S3. But even if you take a look at those, you know, S3 has more correctness issues if you're using the Hive table layout, but there are still correctness issues with the Hive table layout in HDFS, and there are still scale challenges with the Hive table layout in HDFS. And those are our two main motivating factors, both of which are made worse by using S3, but they're by no means just gone if you're using HDFS instead.
[00:14:44] Unknown:
And on that note, how has the use of Iceberg simplified your work at Netflix for those batch workloads?
[00:14:51] Unknown:
Well, we're still rolling it out, so I don't know if I'd say it's simplified any of my work yet. The main thing that it's going to do is it will replace a number of projects that we've been maintaining. Maybe projects is not the right word; it replaces a number of workarounds that we have for working with S3. So we've given talks in the past at conferences and, I think, done blog posts about things like S3mper, which is our consistent listing solution for S3, which is basically where we hijack file system calls and make sure that we check a consistent database for what we expect from those list calls.
So that when we say, I wanna read this partition, give me all the files in it, we know that we got all of the files instead of some of the files, because that's a correctness problem. Because Iceberg tracks all of the files in a table individually, we no longer have to list directories, so we no longer have consistent listing issues. And on that note, we no longer have the issues where we're listing too many directories. So the things that we've done to solve both of those challenges in the past, like S3mper for consistent listing and hacks in our S3 file system to make those listing calls faster, we no longer really need, because Iceberg is just not listing in S3, which we think is a better design for both S3 and HDFS based tables. So really, it's taking all of these things that we've done in the past and rethinking how we interact with the underlying data file system. So in the past, we've just tried to integrate with S3 at the file system level. How do we make our file system faster? How do we guarantee consistent listings? How do we avoid renames? Things like that.
And what we've realized the problem is, is that we're using a table format that requires listing and renames and all of those things that are slow. So by using Iceberg, we no longer need those things.
[00:17:10] Unknown:
And so how is the actual software implementation of Iceberg designed, and how has that architecture evolved since you first began working on it?
[00:17:24] Unknown:
So Iceberg is, at the core, just a metadata layer for keeping track of snapshots of file listings in the table. It's evolved quite a bit in that it came from that high level idea of let's track all the files in the table at, you know, time t equals 1, t equals 2, etcetera. We really had to do a lot of work to figure out how we're gonna make this feasible. We've had to think, how do we list these files and keep track of them? Then how do we make sure we're not writing too much metadata on every application-level write, and things like that. And it has grown significantly in scope as we realized that we need to do things like have an expression library, so that you can pass an expression to Iceberg and we can filter the files that you need, as well as pass that down into the file formats so that we can take advantage of things like Parquet row group pruning.
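A minimal sketch of what file-level filtering with an expression can look like: keep only the files whose per-column min/max ranges can possibly satisfy the predicate, and skip the rest without opening them. The classes here are illustrative, not Iceberg's expression library.

```java
import java.util.List;
import java.util.stream.Collectors;

public class MinMaxPruning {

    // Illustrative per-file metadata: path plus min/max for one long column.
    static class DataFileStats {
        final String path;
        final long min;
        final long max;
        DataFileStats(String path, long min, long max) {
            this.path = path; this.min = min; this.max = max;
        }
    }

    // Keep only files whose [min, max] range overlaps the predicate
    // "column BETWEEN lower AND upper"; everything else is skipped
    // without ever being opened.
    static List<DataFileStats> prune(List<DataFileStats> files, long lower, long upper) {
        return files.stream()
                .filter(f -> f.max >= lower && f.min <= upper)
                .collect(Collectors.toList());
    }
}
```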
[00:18:36] Unknown:
And for the actual deployment and integration with somebody's query engine, what's involved in getting it installed and set up and configured to be able to use within somebody's existing workflow?
[00:18:48] Unknown:
Good question. So right now, there's one main challenge, and that is how we keep track of the pointer to the top level metadata. So Iceberg manages metadata, well, I guess data files and metadata files, in a git-like tree structure. So you've got one metadata file at the top that keeps track of the valid snapshots of a table. Those valid snapshots point to manifest files that list the data files, and then those data files are stored somewhere as well. So you can think of it as a tree structure. And we guarantee atomic transactions on the table by simply writing a new version of this tree, just like you would with a new git commit, and then pointing the current reference to the new version of the tree, you know, the new root.
So here at Netflix, we have a system called Metacat that we use as a federated Hive metastore, and we use Metacat for that pointer to the root metadata of a table. So that's a challenge if you're not here, because we need to figure out some way to keep track of which metadata file is current. So that's the first challenge. And for that in the public space, we just committed an implementation that uses locking and a generic Hive metastore. So we're testing that out, and I think our Presto support is going to be based on that. So it'll use a regular Hive metastore and use that Hive metastore to do this pointer update from one version of table metadata to the next. We also have an implementation that uses either the local file system or HDFS, or basically any file system with atomic rename, to keep track of what the current metadata for a table is. So those two implementations are public and we're testing those out and trying to integrate them with the public parts of the project. And I should mention the only non-public part of the project is our own integration with Metacat.
I think Metacat's public, so even that might be public soon anyway. So that's the first challenge: how do we keep track of Iceberg tables. Then there's a challenge of how do I deliver Iceberg to the query engine that I'm using? We're building Iceberg integration with Presto and Spark, read-only integration in Pig, and then client libraries in Python and Java. Spark and Presto are really the two main ones that I think people will be using in the near future. Like I said, we just released the support for using a Hive metastore to keep track of tables, and that was the only thing that our Presto implementation was waiting for. So we're gonna be posting a pull request in the next week or two with integration for Presto that uses that Hive metastore to track your tables. Then in the project, there's also a module called iceberg-runtime, and that's a shaded jar that contains everything that you need in order to get Iceberg working with Spark. There's version 0.2.0, which you can use with Spark 2.3, and 0.3.0, which you can use with Spark 2.3.2.
So you just drop it into your Spark classpath, and then you can load Iceberg tables using the DataFrame API.
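A rough sketch of the commit model described above, assuming a filesystem with atomic rename: write a brand-new root metadata file for the new version of the tree, then swap the "current version" pointer in one atomic step, so readers see either the old table state or the new one, never a partial commit. The file names are illustrative, not a prescription of Iceberg's layout.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class MetadataPointerSwap {

    // Commit a new table version: the new metadata tree is written first,
    // then a single atomic rename moves the "current version" pointer.
    static void commit(Path tableDir, byte[] newRootMetadata, int newVersion) throws IOException {
        Path newRoot = tableDir.resolve("v" + newVersion + ".metadata.json");
        Files.write(newRoot, newRootMetadata);

        // Stage the pointer contents, then atomically replace the live pointer file.
        Path staged = tableDir.resolve("version-pointer.tmp");
        Files.write(staged, newRoot.getFileName().toString().getBytes(StandardCharsets.UTF_8));
        Files.move(staged, tableDir.resolve("version-pointer.txt"),
                StandardCopyOption.ATOMIC_MOVE);
    }
}
```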
[00:23:03] Unknown:
And do you have a migration path, either implemented or planned, for people who are moving from their existing Hive tables to using Iceberg within their query engines?
[00:23:18] Unknown:
Yes. There is a pretty easy migration path. Our internal Spark version has all the table DDL and things like that implemented, and we're working on getting those things out into the upstream Spark version. So we've basically built it out, tested it out here, and we're working on getting those features into upstream Spark through the new DataSourceV2 API in Spark. So once that's out, or once our Presto support is out, it'll be very easy to create tables using just regular SQL DDL. So the migration path would be to create an Iceberg table using normal CREATE TABLE DDL and then INSERT INTO or CREATE TABLE AS SELECT from one table to another. So that's the easiest path. It's slightly harder if you don't want to copy your data, and there are only some situations where you can avoid copying your data. So if your tables are currently in CSV or JSON or formats that we don't really think of as database formats, I shouldn't say database formats, but, you know, high performance database formats.
If your tables are in those formats, then you would need to forklift your code and your data and copy it in. There are some cases, if your data is already in ORC or Parquet or Avro, where you could just build your Iceberg metadata and point to all those existing files. Now, the only trick there is we're going to have to have some sort of mapping from the schemas in those files to an Iceberg schema. Iceberg schemas are different because we want the same schema implementation across all the different formats, so ORC, Avro, and Parquet. And we do schema evolution differently.
We use field IDs to keep track of every data field in a schema. That way we can do adds and drops and renames that match the behavior of traditional SQL databases.
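For the copy-based migration path described here, a hedged sketch of what the CTAS step could look like from Spark once an engine has Iceberg DDL support wired in; the exact syntax depends on the engine's integration, and the table names are made up.

```java
import org.apache.spark.sql.SparkSession;

public class MigrateByCopy {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("migrate-to-iceberg")
                .getOrCreate();

        // Hypothetical DDL: create a new Iceberg table and copy the rows over.
        // The "USING iceberg" clause assumes the engine's Iceberg integration
        // (e.g. Spark DataSourceV2, as discussed) is available.
        spark.sql("CREATE TABLE db.events_iceberg USING iceberg AS "
                + "SELECT * FROM db.events_hive");

        spark.stop();
    }
}
```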
[00:25:26] Unknown:
And in terms of the schema for the individual files on disk, how do you manage evolution of the schema that is managed at the Iceberg table level and propagate that down to the actual files on disk?
[00:25:44] Unknown:
So Iceberg has a pretty strict set of rules for schema evolution. It actually implements exactly what you can do in SQL DDL, so it aligns very nicely. You can add columns, drop columns, rename, etcetera. To track those changes, or basically to track the columns themselves, we use field IDs. So when we're writing data out to disk, we write the field IDs into the schemas of the files that we're writing. For Parquet and Avro, for example, we just embed the field IDs in their schemas, so that when we read the file we can do our resolution based on the field IDs and then project the correct columns.
And we use field IDs so that we can get rid of some of the edge cases where, in Avro, for example, if you delete a column and then re-add a column with the same name, you'll get the old data. So we have to use something unique. We just have a one-up counter that assigns a new ID for every column. That way, if you dropped column a where the ID was 6 and you add column a again, you get a new ID for that, say 7, and we consider those two unique columns in the data.
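A tiny sketch of the field-ID rule just described: each column gets an ID from a one-up counter, and dropping then re-adding a column with the same name produces a different ID, so old data for the dropped column can never leak back in. Illustrative classes only, not Iceberg's schema API.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldIdSchema {
    private final Map<Integer, String> columnsById = new LinkedHashMap<>();
    private int lastAssignedId = 0;

    // Adding a column always consumes a fresh ID, even if the name was used before.
    int addColumn(String name) {
        int id = ++lastAssignedId;
        columnsById.put(id, name);
        return id;
    }

    // Dropping a column retires its ID; the counter never reuses it.
    void dropColumn(int id) {
        columnsById.remove(id);
    }

    public static void main(String[] args) {
        FieldIdSchema schema = new FieldIdSchema();
        int firstA = schema.addColumn("a");   // e.g. ID 1
        schema.dropColumn(firstA);
        int secondA = schema.addColumn("a");  // a new ID: files written with the old
                                              // ID are never resolved to the new "a"
        System.out.println(firstA + " != " + secondA);
    }
}
```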
[00:27:05] Unknown:
And in the process of mutating the schema at the table level, do you actually modify the underlying files themselves? Or do you just modify what actually gets read out from those files when you issue a query to the Iceberg table for the query planning?
[00:27:23] Unknown:
Good question. So, using this ID system, we don't have to modify any of the underlying files. We can guarantee that schema evolution works exactly as you would expect it to without modifying the data itself. Like I said, when you drop a column, you really just say, hey, I'm never going to need ID 7 again, or something like that. When you add a column, you know that it's not going to conflict with any existing data because it uses a fresh ID. And for renames and things like that, because projection is always based on IDs, the name of a column always comes from the table's current schema. So whatever the table's current schema says the name of that column is, that's the name of that column.
[00:28:09] Unknown:
So the names from the files themselves don't matter. And so earlier, you had mentioned that one of the provisions built into Iceberg is the ability to compact the underlying data files from maybe Avro or Parquet into new representations of that data. So does that compaction rely on the currently defined schema, such that if you do that drop and re-add of a column, and the column ID that was present when the file was first written isn't the same as what is there now, will that information be discarded during that compaction process?
[00:28:50] Unknown:
So that actually depends on the engine that you're using to do that compaction process. You could choose the newer of the two schemas. So, you know, say I have file A and file B. If they have the same schema, then I can just continue using that schema and compact them together. The only guarantee that we make is that the newer of the two schemas can read the older schema. All of our schema transitions make that guarantee, so you know that the current table schema can read any data schema in the table. So if you wanted to preserve as much data as possible, you could use the newest of all of the schemas of the files that you're compacting.
So the schema that's actually used kind of depends on what's doing the compaction. The simple compaction case, where you just say, hey, Spark, go read this data and then write it back out and overwrite what I just read, would certainly use the table's current schema. So, yeah, that would discard data like you were saying. And in terms of the
[00:29:56] Unknown:
table definition for a heterogeneous combination of files within that directory structure, how do you handle the case where a given file doesn't contain all of the fields that are specified in a given table definition?
[00:30:11] Unknown:
What we do is, if you've added a new column, that column has to be optional, because we realize that not all of the data files in the table may have that column. So any new column has to be optional, just like I think Avro schema evolution requires. And so when the read happens, optional fields that aren't present in a particular file are filled in with nulls. So that's a slight departure from the way other systems act, where you can set a default. In Iceberg schemas, the default is always null, because we wanted to avoid the problem that you run into with Avro files, which is that you can't tell if a value was in the file or was the value of the default. So we always default to null, and at this time, what we're doing is allowing the query engine to use something like a coalesce to fill in values. I don't think the file format level is the right place to put the default values, and possibly not even the table format level, although I think the table format could do that.
I'm not really sure. But what we're trying to do is make sure that we have very clear and well-defined rules for schema evolution to avoid cases like that.
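A tiny sketch of the read-side rule just described: project by field ID, and any ID a file predates (an added optional column) simply comes back as null, leaving defaults to the engine, for example via coalesce in the query. Illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ProjectWithNulls {

    // Project one file row (keyed by field ID) to the table's current schema.
    // Field IDs the file doesn't know about come back as null, by definition.
    static Map<Integer, Object> project(Map<Integer, Object> fileRow, List<Integer> tableFieldIds) {
        Map<Integer, Object> projected = new LinkedHashMap<>();
        for (int fieldId : tableFieldIds) {
            projected.put(fieldId, fileRow.getOrDefault(fieldId, null));
        }
        return projected;
    }
}
```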
[00:31:33] Unknown:
And one of the most pernicious and complicated problems in data modeling, particularly when you're dealing with large volumes of data and distributed systems, is that of managing table partitions. So I'm wondering how Iceberg helps in that regard. Oh, so Iceberg fixes one of my pet peeves
[00:31:53] Unknown:
with tables, and that's that there is no relationship in a Hive table between partition columns and the data those partition columns are derived from. So that sounds kind of abstract, so let me give you a concrete example. We partition a lot of tables here by day, or day and hour. The simpler case is just by day, so I'll go with that. Now, this is a really common use case, right? Partitioning your data as it comes in from a streaming system by day or by timestamp. The problem with the way we do this currently is that when we store the data, we have something that goes through and takes a timestamp field and says, okay, I know how to derive a day from this timestamp. So it builds a date string and then stores that date string in a date column. So now we have a timestamp column and a date column. When you go and read a table, you can use the date column to prune all the partitions that you don't need. But you have this problem where if someone doesn't know the physical structure of the table, that it's partitioned by that date, or they don't know that they need to care about the physical structure of the table, they don't add the date column, because they see this timestamp column and they think, oh, well, I know which timestamps I need and a database is smart enough to figure it out. So what we end up with is novice users, or users that are used to SQL in other database systems, you know, more traditional, expensive databases.
They don't know that they need to query on partition columns. So what they end up with is a query that takes forever, because they didn't know to add a date filter. What we do in Iceberg is we keep track of the relationship between that timestamp column, the one we're deriving this date column from, and the date column that we're storing things by. That way, when that user comes and issues a query with a timestamp range, we know how to bake that down into a predicate that matches just the partitions, or just the files, that they actually need. So hiding those partitions from the user is really valuable on the query side. And it's also incredibly useful on the write side, because the query engine can be written just once to know, oh, here's how I derive a date from this timestamp, instead of pushing that problem onto users when they're storing data in their tables, because it's not easy to know that the date function relies on some time zone or might behave weirdly in different situations.
What we really want is to remove that decision, or that problem, from users so they don't have to think about it. Yeah. It's always great to have the computer do what it's best at, performing the same task repeatedly
[00:34:53] Unknown:
every time it gets the same type of input, rather than relying on humans to remember to do that, because humans are terrible at doing that. Oh, exactly.
[00:35:02] Unknown:
And, you know, there are more benefits to doing this as well. So our initial motivation for having these partition transforms instead of partition columns was to take the human error out of generating that partition data and take the human error out of using that partition data. Once we realized that there was this separation between the logical columns and the physical representation, though, you know, how we store it on disk, we realized that this allows us to change how it's stored on disk. So we can take a table that's partitioned by day and change it into a table that's partitioned by hour, and the queries don't have to change, because the queries are all written in terms of the timestamp column and not this weird derived column that you have to include for no reason. So it's really nice that the same principle gets us to the point where we can actually fix tables that have had data volume increase over time. Or maybe when a user is creating a proof of concept, right, we can take the table that they use for that proof of concept and fix the partitioning on it, instead of saying, okay, your proof of concept works, but you're gonna have to go plan for your data volume in a couple years before you can move this to production.
So it really smooths out that pipeline for our user base, where they can test something out in an unpartitioned table and then just go update the table settings. Or at least that's where we're headed. I don't think we have anything exposed other than me going out and changing the partitioning of a table by hand at this point, but that's definitely where we're headed.
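Hidden partitioning boils down to storing the transform with the table, applying it in one place at write time, and converting a timestamp predicate into a partition-value predicate at read time, so users only ever query the timestamp column. A sketch of a day transform under those assumptions, not Iceberg's implementation:

```java
import java.time.Instant;
import java.time.LocalDate;
import java.time.ZoneOffset;

public class DayTransform {

    // Write side: derive the partition value from the source column once,
    // in one place, instead of asking every writer to build a date string.
    static LocalDate partitionValue(Instant eventTime) {
        return eventTime.atZone(ZoneOffset.UTC).toLocalDate();
    }

    // Read side: a filter on the timestamp column is converted into a filter
    // on the derived partition values, so the user never mentions the partition.
    static boolean partitionMayMatch(LocalDate partition, Instant lower, Instant upper) {
        LocalDate lo = partitionValue(lower);
        LocalDate hi = partitionValue(upper);
        return !partition.isBefore(lo) && !partition.isAfter(hi);
    }
}
```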
[00:36:51] Unknown:
And as you mentioned before, your primary data lake implementation is running on top of S3, and it's becoming increasingly popular to use S3 and other object stores instead of the previous generation that relied on HDFS and other distributed file systems run by the data platform team. So I'm wondering, what are some of the unique challenges posed by the fact that you are using S3 as the basis for your data lake? So I think we actually touched on this a bit earlier, where S3 has two main
[00:37:27] Unknown:
difficulties for us. So the first one is correctness, due to S3's consistency model: making sure that when we go out and say, hey, what paths have this prefix, or what object keys have some prefix, we get the correct output there. We also have a unique challenge with S3 because it's a remote store, so a lot of the operations take maybe an order of magnitude longer than they do if you're hitting a namenode in a local HDFS cluster. You still have those challenges in HDFS, where if you're listing too many directories it's slow, and there are still times, because of how the Hive table layout works, when even if you get consistent listings, your query results can be incorrect. So you still have all these problems, but they're more pronounced for S3 tables. So that's why we were more motivated to fix these.
[00:38:31] Unknown:
And what are the benefits of using the S3 file store as the basis for your platform that outweigh those difficulties?
[00:38:40] Unknown:
First of all, it's really nice not having to have a static HDFS cluster. A lot of companies have to deal with problems where they have to keep their HDFS cluster up and running all the time. They have to worry about how do we back up this HDFS cluster, you know, all of the data in our cluster, and things like that. For us, because we have separate compute and storage, we view our clusters as just ephemeral compute resources. So we can lose a cluster, you know, lose a namenode, and we just bring up another one in a couple minutes. Well, it might take longer to get the nodes, but we can bring up a cluster very quickly. So what we found is that separating compute and storage is really important for us, because we can move a whole lot faster and don't have to maintain a lot of infrastructure. S3 is also great for us because they take care of the durability problems, they take care of worrying about things like erasure coding that, you know, the HDFS community is just now really grappling with. So those are just handled for us.
But I would definitely say that the major thing is not having to worry about long-lived clusters and what do we do with this persistent data that's sticky on those clusters.
[00:40:04] Unknown:
And in terms of the specification that you're defining for iceberg, you have published it as a Google doc that anybody can go and comment on and suggest edits. And from looking at it, there seems to be a healthy amount of activity and people involved in that process. So I'm curious what you have found to be some of the most challenging or contentious details of the specification to define and some of the discussions that have arisen out of those?
[00:40:34] Unknown:
So the way that Iceberg tracks field IDs is a departure from what's already there. And I think a lot of people thought, we've already got by-name field resolution, why change it? And, you know, we were going for: really make this so that users never have to care about a problem. If I go add an ID column to my table and I find out that there was one three years ago that got deleted and now I'm getting bad data, that's a problem that we're leaking to users. But we've had questions and some good discussions on why we are going to this length to make sure that those problems are eliminated. Why not just take a step back and say we'll use some existing schemas and get through this problem? You know, we've definitely had discussions with people that think, why go this far? But again, we really think it's a guarantee. We don't want our users to ever have to think, hey, did we have a column with this name three years back that's going to ruin our long-term queries.
[00:41:47] Unknown:
What are some of the things that you have explicitly left out of the specification and have there been any discussions or disagreements that arose out of those omissions?
[00:41:58] Unknown:
So there are a couple things. One that comes to mind is how to manage the manifest files that store all of the data files in the table. That's purposely ambiguous. A snapshot of a table, you know, the table state at some point in time, is the union of all the data files listed in one or more manifests. And that is purposely vague so that we have a lot of flexibility with how we manage those manifests. You know, how we compact them, how we write them out to optimize either quick writes or overall metadata size or things like that. And that's one of the areas where we think there's gonna be a lot of improvement. We didn't wanna preclude a lot of the designs that we could go with in the future.
So we get a few questions about that. There's also some confusion over the fact that we don't have explicit partitions. So in Iceberg, you do partition data, but the format really tracks partitions as a tuple of data values that are the same for all rows in a data file. So you can think of files having partitions instead of partitions having files in Iceberg. And there's a lot of confusion over the fact that a manifest file can store a whole bunch of data files in any partition, and not just a single partition. Then, you know, there are healthy discussions on why we use Avro as our underlying manifest format and things like that, and I think that we're pretty flexible on the long-term decisions here. We really want to go forward with something that works and that the community is going to rally behind in the long term.
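The "files have partitions" idea can be pictured as a manifest entry that records, for each data file, its location, its format, and the tuple of partition values shared by every row in that file. This is purely illustrative; the real manifests are Avro files with a richer schema.

```java
import java.util.List;

public class ManifestEntry {
    final String filePath;             // e.g. s3://bucket/db/table/data/0001.parquet
    final String fileFormat;           // "parquet", "orc", or "avro"
    final List<Object> partitionTuple; // values every row in the file shares, e.g. [2018-09-30]
    final long recordCount;            // per-file metric captured at write time

    ManifestEntry(String filePath, String fileFormat,
                  List<Object> partitionTuple, long recordCount) {
        this.filePath = filePath;
        this.fileFormat = fileFormat;
        this.partitionTuple = partitionTuple;
        this.recordCount = recordCount;
    }
}
```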
[00:43:46] Unknown:
And what are your long-term goals for the Iceberg specification?
[00:43:50] Unknown:
We really want Iceberg to take off in open source projects, with people using tables in the big data space. We think that this is a pretty significant step forward in terms of the guarantees that tables can make in big data. So like I said earlier, there are just unavoidable times, if you're using Hive tables, when a query against that table might produce incorrect results. And that's really just the fact that it's storing table state partially in the file system, and you can't have a transaction in HDFS or S3 that makes a whole bunch of files show up all at once. So unless you lock and make sure no one's reading the table until you're done writing and you've confirmed that all of those files have shown up in listings, there's just no way to have consistency and therefore correctness.
We think that having correctness solved, having schema evolution solved, having partition predicates and pruning solved, and some of these steps forward that Iceberg is delivering, we think that that's going to be pretty attractive to other companies and not just us. And we really hope that this becomes a community standard and a successful open source project that we can all use and benefit from. And do you think that the reference implementation
[00:45:16] Unknown:
that you're using today is going to continue to be used and maintained and evolved? Or do you think that once the specification has been, sort of ratified and confirmed by a community consensus that there will be other implementations that will eventually take its place?
[00:45:36] Unknown:
I don't know when we would stop maintaining the current one. After all, we're building it into Presto, we're building it into Spark, we've got a Pig library. So, you know, we're really using that one implementation in a lot of places. I think that's probably gonna make the reference implementation the center of gravity in the Java world. That said, if there's a better one out there, then that's great. I hope that we really do standardize on a table format that can have multiple correct implementations.
I still think that there is going to be one in the C++ world as well, and we're implementing something in Java. Sorry, of course we're implementing something in Java, and we're implementing one in Python as well. So there are definitely going to be multiple implementations of the table spec, and I just don't see a reason why the current one in the JVM space would be superseded. I mean, superseded if we stop maintaining it, certainly, but
[00:46:42] Unknown:
we're definitely all in on maintaining this. And do you have any sort of timeline in mind, or any criteria to determine when the specification itself can be deemed complete and brought to a community vote, or through some sort of confirmation process to say this is the complete version of this specification
[00:47:06] Unknown:
at this point in time, so that there can be a stable reference point to work from if there are any revisions in the future? Yeah. So I think that's gonna require a couple of things. First of all, we wanna get it to the point where there aren't things that we know we should probably change about it. And at this point, there are a couple things that we're thinking, yeah, we might wanna change those. And I should mention that the Java reference implementation will have backward compatibility. I've worked in file formats for a long time, so I understand the forward and backward compatibility concerns, and we'll make that transition super smooth if it needs to be done. But there are some completely compatible changes that we think we wanna do, as well as at least one incompatible change that I think we're going to need. We would want the format to be stable for at least some period of time. And the other blocker for us to declare a stable format is really once a community has formed around this. I think that the discussion on the doc has gone really well, and I'm very excited to see great people with good questions talking on that doc. Once we have that community of people that are involved in the project, that care about the project, it's really going to that group of people and saying, hey, do we think that this is a finished specification that we can stamp a 1.0 on? Right now, I don't think that we've filled out that community of people that care and that would take part in that vote.
So
[00:48:42] Unknown:
it's just a little premature at this point. And are there any other aspects of the Iceberg project or specification or the
[00:48:52] Unknown:
underlying issues that it's addressing that we didn't discuss yet which you think we should cover before we close out the show? Oh, so what gets me really excited about the project right now is all of the ways that we didn't see initially that it's changing the way we view what our platform can do. So I could just briefly mention some of the things that we're excited about for future work. One of them is, you know, we wanted to fix being able to trust writes to our tables. And it turns out once you can trust writing to your tables, you can write a whole lot more often, right? If you trust writing to your table, then you'll build things that actually do write more often. So we're building a process to automatically tune our tables.
You know, do that background compaction that we were talking about earlier. Not only that, we're building something to go take a look at a table, take an individual data file, rewrite it 10 times with different settings, and figure out what the optimal settings or tuning for Parquet is for this table, and then update the table configuration so that that table is always writing optimal files. Those sorts of things we just didn't anticipate at first, but having this central library where we can configure file tuning settings at a table level, having the ability to have a background process decide, hey, I'm gonna go rewrite files in this table for efficiency, those are really exciting things. Another area where we didn't really think of this ahead of time is cost-based optimization.
So right now, it's a challenge to get good metrics for cost-based optimization in Spark and Presto and some of the databases that are coming out with new cost-based optimizers. With Hive tables, because the data underlying the table can change at any time, your metrics may just be out of date. So when you're tracking at the per-file level, you can have really accurate stats on how many rows you wrote into that file, what the min and max values in that file were, and things like that. So we're getting really high-quality metrics for cost-based optimization. I'm really excited to see where that goes, because I think cost-based optimization is the next big thing for both Presto and Spark, and it's really gonna be a huge positive win for our user base.
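Because metrics are captured per data file when the file is written and kept in the table metadata, table-level statistics for an optimizer can be derived by aggregating the file-level values in the current snapshot rather than running a separate, possibly stale stats job. A rough sketch with made-up classes:

```java
import java.util.List;

public class TableStats {

    // Illustrative per-file metrics captured when the file was written.
    static class FileMetrics {
        final long rowCount;
        final long minId;   // min/max for one column of the file
        final long maxId;
        FileMetrics(long rowCount, long minId, long maxId) {
            this.rowCount = rowCount; this.minId = minId; this.maxId = maxId;
        }
    }

    // Table-level stats for the optimizer are just an aggregation of the
    // per-file metrics tracked in the current snapshot's metadata.
    static long totalRows(List<FileMetrics> files) {
        return files.stream().mapToLong(f -> f.rowCount).sum();
    }

    static long minId(List<FileMetrics> files) {
        return files.stream().mapToLong(f -> f.minId).min().orElse(Long.MIN_VALUE);
    }

    static long maxId(List<FileMetrics> files) {
        return files.stream().mapToLong(f -> f.maxId).max().orElse(Long.MAX_VALUE);
    }
}
```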
[00:51:21] Unknown:
Alright. For anybody who wants to get in touch with you and keep up to date with the work that you're doing, I'll have you add your preferred contact information to the show notes. And as a final question, I'd like to get your perspective on what you see as being the biggest gap in the tooling or technology that's available for data management
[00:51:39] Unknown:
today? You mean other than the table format?
[00:51:42] Unknown:
Yeah. Other than the work that we just discussed.
[00:51:46] Unknown:
I think the biggest challenge is the lack of cost-based optimization, like we were just talking about a minute ago. A lot of our users are coming from databases that can reorder joins and do some fairly simple things based on that. And if you assume that the database is gonna know how to prune your partitions, the database is gonna reorder joins and things like that, it's just a completely different mindset. And when you run a query, it's just slow and it seems awful. So I think that being able to really fix the plans automatically is going to widen the base of users we can serve with these systems.
And I think that's a general trend that we're seeing, right? Becoming easier and easier, better and better optimizations. And that's certainly what we want for our platform. You know, make it so that data engineers don't have to worry about something, because worrying about how to make Spark run faster is, I mean, it's an important part of their jobs today, but we want to free up their time so they can work on other things, more important data engineering tasks. And we wanna broaden our platform so that it can be used by anyone and not just data engineers who know how to do those things.
[00:53:12] Unknown:
Alright. Well, thank you very much for taking the time today to discuss the work that you're doing with Iceberg and building a reliable platform for your data needs at Netflix. It's definitely a very interesting project and one that seems like it's bearing a lot of fruit, so I'm excited to see how it progresses into the future. So thank you for that, and I hope you enjoy the rest of your evening. Thanks. You too. It was wonderful to talk to you. Thanks for the opportunity.
Introduction and Guest Introduction
Ryan Blue's Background and Journey
Introduction to Iceberg
Iceberg's Metadata and Query Engine Integration
Iceberg's Impact on Netflix's Data Management
Iceberg's Architecture and Evolution
Deployment and Integration of Iceberg
Schema Evolution and Compaction
Managing Table Partitions with Iceberg
Challenges and Benefits of Using S3
Iceberg Specification and Community Involvement
Future Goals and Community Consensus
Exciting Future Prospects for Iceberg
Biggest Gaps in Data Management Tooling
Closing Remarks