Flink Queries🔗
Iceberg supports streaming and batch reads with Apache Flink's DataStream API and Table API.
Reading with SQL🔗
Iceberg supports both streaming and batch reads in Flink. Execute the following SQL commands to switch the execution mode from streaming to batch, and vice versa:
-- Execute the flink job in streaming mode for current session context
SET execution.runtime-mode = streaming;
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
Flink batch read🔗
Submit a Flink batch job using the following statements:
-- Execute the flink job in batch mode for current session context
SET execution.runtime-mode = batch;
SELECT * FROM sample;
Flink streaming read🔗
Iceberg supports processing incremental data in Flink streaming jobs, starting from a historical snapshot-id:
-- Submit the flink job in streaming mode for current session.
SET execution.runtime-mode = streaming;
-- Enable this switch because the streaming read SQL passes a few job options through Flink SQL hint options.
SET table.dynamic-table-options.enabled=true;
-- Read all the records from the iceberg current snapshot, and then read incremental data starting from that snapshot.
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
-- Read all incremental data starting from the snapshot-id '3821550127947089987' (records from this snapshot will be excluded).
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;
Some options can be set through Flink SQL hints for streaming jobs; see read options for details.
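When queries are generated programmatically, the hint can be assembled from a map of options. The following is a minimal sketch using only the JDK; the class and method names (SqlHintSketch, optionsHint) are hypothetical and not part of Iceberg or Flink, and the helper assumes that keys and values contain no single quotes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class SqlHintSketch {
    // Hypothetical helper: render read options as a Flink SQL OPTIONS hint.
    // Keys and values are assumed to contain no single quotes; nothing is escaped.
    public static String optionsHint(Map<String, String> options) {
        return options.entrySet().stream()
            .map(e -> "'" + e.getKey() + "'='" + e.getValue() + "'")
            .collect(Collectors.joining(", ", "/*+ OPTIONS(", ") */"));
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the options in insertion order.
        Map<String, String> options = new LinkedHashMap<>();
        options.put("streaming", "true");
        options.put("monitor-interval", "1s");
        System.out.println("SELECT * FROM sample " + optionsHint(options) + ";");
    }
}
```

The resulting string still has to be submitted through a Flink SQL session with table.dynamic-table-options.enabled set to true, as in the statements shown earlier.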
FLIP-27 source for SQL🔗
Here is the SQL setting to opt in or out of the FLIP-27 source.
-- Opt out of the FLIP-27 source.
-- Default is false for Flink 1.19 and below, and true for Flink 1.20 and above.
SET table.exec.iceberg.use-flip27-source = false;
All other SQL settings and options documented above are applicable to the FLIP-27 source.
Reading branches and tags with SQL🔗
Branches and tags can be read via SQL by specifying options. For more details, refer to Flink Configuration.
-- Read from branch b1
SELECT * FROM table /*+ OPTIONS('branch'='b1') */ ;
-- Read from tag t1
SELECT * FROM table /*+ OPTIONS('tag'='t1') */;
-- Incremental scan from tag t1 to tag t2
SELECT * FROM table /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-tag'='t1', 'end-tag'='t2') */;
Reading with DataStream🔗
Iceberg supports both streaming and batch reads through the Java API.
Batch Read🔗
This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> batch = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(false)
     .build();
// Print all records to stdout.
batch.print();
// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
Streaming read🔗
This example reads incremental records starting from snapshot-id '3821550127947089987' and prints them to stdout in a Flink streaming job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream = FlinkSource.forRowData()
     .env(env)
     .tableLoader(tableLoader)
     .streaming(true)
     .startSnapshotId(3821550127947089987L)
     .build();
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
Other options can also be set; see FlinkSource#Builder.
Reading with DataStream (FLIP-27 source)🔗
The FLIP-27 source interface was introduced in Flink 1.12. It aims to solve several shortcomings of the old SourceFunction streaming source interface, and it unifies the source interfaces for batch and streaming execution. Most source connectors in the Flink repo (such as Kafka and file) have migrated to the FLIP-27 interface. Flink is planning to deprecate the old SourceFunction interface in the near future.
A FLIP-27 based Flink IcebergSource is added in the iceberg-flink module. The FLIP-27 IcebergSource is currently an experimental feature.
Batch Read🔗
This example reads all records from an Iceberg table and prints them to stdout in a Flink batch job:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .build();
DataStream<RowData> batch = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));
// Print all records to stdout.
batch.print();
// Submit and execute this batch read job.
env.execute("Test Iceberg Batch Read");
Streaming read🔗
This example starts the streaming read from the latest table snapshot (inclusive). Every 60s, it polls the Iceberg table to discover new append-only snapshots. CDC read is not supported yet.
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
IcebergSource<RowData> source = IcebergSource.forRowData()
    .tableLoader(tableLoader)
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .monitorInterval(Duration.ofSeconds(60))
    .build();
DataStream<RowData> stream = env.fromSource(
    source,
    WatermarkStrategy.noWatermarks(),
    "My Iceberg Source",
    TypeInformation.of(RowData.class));
// Print all records to stdout.
stream.print();
// Submit and execute this streaming read job.
env.execute("Test Iceberg Streaming Read");
Other options can be set through the Java API; see IcebergSource#Builder.
Reading branches and tags with DataStream🔗
Branches and tags can also be read via the DataStream API:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
// Read from branch
DataStream<RowData> branchBatch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .branch("test-branch")
    .streaming(false)
    .build();
// Read from tag
DataStream<RowData> tagBatch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .tag("test-tag")
    .streaming(false)
    .build();
// Streaming read from start-tag
DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startTag("test-tag")
    .build();
Read as Avro GenericRecord🔗
The FLIP-27 Iceberg source provides AvroGenericRecordReaderFunction, which converts
Flink RowData to Avro GenericRecord. You can use the converter to read from
an Iceberg table as an Avro GenericRecord DataStream.
Make sure the flink-avro jar is included in the classpath.
The iceberg-flink-runtime shaded bundle jar can't be used,
because the runtime jar shades the avro package.
Use the non-shaded iceberg-flink jar instead.
TableLoader tableLoader = ...;
Table table;
try (TableLoader loader = tableLoader) {
    loader.open();
    table = loader.loadTable();
}
AvroGenericRecordReaderFunction readerFunction = AvroGenericRecordReaderFunction.fromTable(table);
IcebergSource<GenericRecord> source =
    IcebergSource.<GenericRecord>builder()
        .tableLoader(tableLoader)
        .readerFunction(readerFunction)
        .assignerFactory(new SimpleSplitAssignerFactory())
        ...
        .build();
DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(),
    "Iceberg Source as Avro GenericRecord", new GenericRecordAvroTypeInfo(avroSchema));
Emitting watermarks🔗
Emitting watermarks from the source itself can be beneficial for several purposes, such as harnessing Flink Watermark Alignment or preventing windows from triggering too early when reading multiple data files concurrently.
Enable watermark generation for an IcebergSource by setting the watermarkColumn.
The supported column types are timestamp, timestamptz and long.
Iceberg timestamp and timestamptz columns inherently carry the time precision, so there is no need
to specify the time unit. A long column doesn't carry time unit information; use
watermarkTimeUnit to configure the conversion for long columns.
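As a rough illustration of why a long column needs a unit, the sketch below converts the same raw value to epoch milliseconds (the unit Flink watermarks use) under two different assumed units. It uses only the JDK's java.util.concurrent.TimeUnit; the class and method names (WatermarkUnitSketch, toEpochMillis) are hypothetical and are not part of the Iceberg API.

```java
import java.util.concurrent.TimeUnit;

public class WatermarkUnitSketch {
    // Hypothetical helper: interpret a raw long column value in the given unit
    // and convert it to epoch milliseconds.
    public static long toEpochMillis(long rawValue, TimeUnit unit) {
        return unit.toMillis(rawValue);
    }

    public static void main(String[] args) {
        long raw = 1_633_086_034_192L;
        // Read as milliseconds: the value is used unchanged.
        System.out.println(toEpochMillis(raw, TimeUnit.MILLISECONDS));
        // Read as microseconds: the value is divided by 1000 (truncating).
        System.out.println(toEpochMillis(raw, TimeUnit.MICROSECONDS));
    }
}
```

A wrong unit shifts every watermark by orders of magnitude, so the unit should match how the column was written.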
The watermarks are generated based on column metrics stored for data files and emitted once per split.
If multiple smaller files with different time ranges are combined into a single split, this can increase
out-of-orderliness and cause extra data buffering in the Flink state. The main purpose of watermark alignment
is to reduce out-of-orderliness and excess data buffering in the Flink state. It is therefore recommended to
set read.split.open-file-cost to a very large value to prevent combining multiple smaller files into a
single split. The downside of not combining small files into a single split is lower read throughput,
especially if there are many small files. In typical stateful processing jobs, source read throughput is not
the bottleneck, so this is probably a reasonable tradeoff.
This feature requires column-level min-max stats. Make sure stats are generated for the watermark column
during the write phase. By default, column metrics are collected for the first 100 columns of the table.
If the watermark column doesn't have stats enabled by default, enable them with the
write properties starting with write.metadata.metrics.
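The effect of combining files into one split can be sketched in plain Java. The model below is an assumption made for illustration, not Iceberg's implementation: each file carries min and max values for the watermark column, a split's watermark is taken as the smallest min among its files, and the split's time span is the distance from that min to the largest max. The class and member names are hypothetical.

```java
import java.util.Arrays;
import java.util.List;

public class SplitWatermarkSketch {
    // Hypothetical stand-in for the per-file min/max stats of the watermark column.
    public static final class FileStats {
        final long minMillis;
        final long maxMillis;

        public FileStats(long minMillis, long maxMillis) {
            this.minMillis = minMillis;
            this.maxMillis = maxMillis;
        }
    }

    // Assumed model: a split's watermark is the smallest lower bound of its files.
    public static long splitWatermark(List<FileStats> files) {
        return files.stream().mapToLong(f -> f.minMillis).min().getAsLong();
    }

    // Time range covered by the split, measured from its watermark to its largest max.
    public static long splitSpan(List<FileStats> files) {
        long max = files.stream().mapToLong(f -> f.maxMillis).max().getAsLong();
        return max - splitWatermark(files);
    }

    public static void main(String[] args) {
        FileStats a = new FileStats(0L, 1_000L);
        FileStats b = new FileStats(60_000L, 61_000L);
        // One file per split: the span stays at the file's own range.
        System.out.println(splitSpan(Arrays.asList(a)));
        // Two files combined: the span grows to cover both ranges.
        System.out.println(splitSpan(Arrays.asList(a, b)));
    }
}
```

Under this model, records from the later file are emitted while the split's watermark still reflects the earlier file, which is the extra out-of-orderliness the paragraph describes.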
The following example is useful when watermarks are used for windowing. The source reads Iceberg data files in order, using a timestamp column, and emits watermarks:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Watermark using timestamp column
            .watermarkColumn("timestamp_column")
            .build(),
        // Watermarks are generated by the source, no need to generate it manually
        WatermarkStrategy.<RowData>noWatermarks()
            // Extract event timestamp from records
            .withTimestampAssigner((record, eventTime) -> record.getTimestamp(pos, precision).getMillisecond()),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));
This example reads an Iceberg table using a long event column for watermark alignment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
DataStream<RowData> stream =
    env.fromSource(
        IcebergSource.forRowData()
            .tableLoader(tableLoader)
            // Disable combining multiple files to a single split
            .set(FlinkReadOptions.SPLIT_FILE_OPEN_COST, String.valueOf(TableProperties.SPLIT_SIZE_DEFAULT))
            // Watermark using long column
            .watermarkColumn("long_column")
            .watermarkTimeUnit(TimeUnit.MILLISECONDS)
            .build(),
        // Watermarks are generated by the source, no need to generate it manually
        WatermarkStrategy.<RowData>noWatermarks()
            .withWatermarkAlignment(watermarkGroup, maxAllowedWatermarkDrift),
        SOURCE_NAME,
        TypeInformation.of(RowData.class));
Options🔗
Read options🔗
Flink read options are passed when configuring the Flink IcebergSource:
IcebergSource.forRowData()
    .tableLoader(TableLoader.fromCatalog(...))
    .assignerFactory(new SimpleSplitAssignerFactory())
    .streaming(true)
    .streamingStartingStrategy(StreamingStartingStrategy.INCREMENTAL_FROM_LATEST_SNAPSHOT)
    .startSnapshotId(3821550127947089987L)
    .monitorInterval(Duration.ofSeconds(10L)) // or .set("monitor-interval", "10s") or .set(FlinkReadOptions.MONITOR_INTERVAL, "10s")
    .build();
For Flink SQL, read options can be passed in via SQL hints like this:
SELECT * FROM tableName /*+ OPTIONS('monitor-interval'='10s') */ ;
Options can also be passed in via Flink configuration, in which case they apply to the current session. Note that not all options support this mode.
Check out all the options here: read-options
Inspecting tables🔗
To inspect a table's history, snapshots, and other metadata, Iceberg supports metadata tables.
Metadata tables are identified by adding the metadata table name after the original table name. For example, history for db.table is read using db.table$history.
History🔗
To show table history:
SELECT * FROM prod.db.table$history;
| made_current_at | snapshot_id | parent_id | is_current_ancestor | 
|---|---|---|---|
| 2019-02-08 03:29:51.215 | 5781947118336215154 | NULL | true | 
| 2019-02-08 03:47:55.948 | 5179299526185056830 | 5781947118336215154 | true | 
| 2019-02-09 16:24:30.13 | 296410040247533544 | 5179299526185056830 | false | 
| 2019-02-09 16:32:47.336 | 2999875608062437330 | 5179299526185056830 | true | 
| 2019-02-09 19:42:03.919 | 8924558786060583479 | 2999875608062437330 | true | 
| 2019-02-09 19:49:16.343 | 6536733823181975045 | 8924558786060583479 | true | 
Info
This shows a commit that was rolled back. In this example, snapshot 296410040247533544 and 2999875608062437330 have the same parent snapshot 5179299526185056830. Snapshot 296410040247533544 was rolled back and is not an ancestor of the current table state.
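The is_current_ancestor column can be reproduced by walking parent pointers back from the current snapshot. The sketch below is plain Java over the snapshot_id and parent_id pairs from the history table; it assumes that the last row (6536733823181975045) is the current snapshot, and the class and method names are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

public class SnapshotAncestrySketch {
    // Walk parent pointers from the current snapshot back to the root.
    // parentOf maps snapshot_id -> parent_id (null for the first snapshot).
    public static Set<Long> currentAncestors(long currentId, Map<Long, Long> parentOf) {
        Set<Long> ancestors = new LinkedHashSet<>();
        Long id = currentId;
        // Stop at the root (null parent) or if an id repeats.
        while (id != null && ancestors.add(id)) {
            id = parentOf.get(id);
        }
        return ancestors;
    }

    // snapshot_id -> parent_id pairs copied from the history table.
    public static Map<Long, Long> exampleHistory() {
        Map<Long, Long> parentOf = new HashMap<>();
        parentOf.put(5781947118336215154L, null);
        parentOf.put(5179299526185056830L, 5781947118336215154L);
        parentOf.put(296410040247533544L, 5179299526185056830L);
        parentOf.put(2999875608062437330L, 5179299526185056830L);
        parentOf.put(8924558786060583479L, 2999875608062437330L);
        parentOf.put(6536733823181975045L, 8924558786060583479L);
        return parentOf;
    }

    public static void main(String[] args) {
        Set<Long> ancestors = currentAncestors(6536733823181975045L, exampleHistory());
        // The rolled-back snapshot is not reachable from the current one.
        System.out.println(ancestors.contains(296410040247533544L));
    }
}
```

Snapshots that never appear on this walk, such as 296410040247533544, are the ones reported with is_current_ancestor = false.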
Metadata Log Entries🔗
To show table metadata log entries:
SELECT * FROM prod.db.table$metadata_log_entries;
| timestamp | file | latest_snapshot_id | latest_schema_id | latest_sequence_number | 
|---|---|---|---|---|
| 2022-07-28 10:43:52.93 | s3://.../table/metadata/00000-9441e604-b3c2-498a-a45a-6320e8ab9006.metadata.json | null | null | null | 
| 2022-07-28 10:43:57.487 | s3://.../table/metadata/00001-f30823df-b745-4a0a-b293-7532e0c99986.metadata.json | 170260833677645300 | 0 | 1 | 
| 2022-07-28 10:43:58.25 | s3://.../table/metadata/00002-2cc2837a-02dc-4687-acc1-b4d86ea486f4.metadata.json | 958906493976709774 | 0 | 2 | 
Snapshots🔗
To show the valid snapshots for a table:
SELECT * FROM prod.db.table$snapshots;
| committed_at | snapshot_id | parent_id | operation | manifest_list | summary | 
|---|---|---|---|---|---|
| 2019-02-08 03:29:51.215 | 57897183625154 | null | append | s3://.../table/metadata/snap-57897183625154-1.avro | { added-records -> 2478404, total-records -> 2478404, added-data-files -> 438, total-data-files -> 438, flink.job-id -> 2e274eecb503d85369fb390e8956c813 } | 
You can also join snapshots to table history. For example, this query shows table history, with the Flink job ID that wrote each snapshot:
select
    h.made_current_at,
    s.operation,
    h.snapshot_id,
    h.is_current_ancestor,
    s.summary['flink.job-id']
from prod.db.table$history h
join prod.db.table$snapshots s
  on h.snapshot_id = s.snapshot_id
order by made_current_at;
| made_current_at | operation | snapshot_id | is_current_ancestor | summary[flink.job-id] | 
|---|---|---|---|---|
| 2019-02-08 03:29:51.215 | append | 57897183625154 | true | 2e274eecb503d85369fb390e8956c813 | 
Files🔗
To show a table's current data files:
SELECT * FROM prod.db.table$files;
| content | file_path | file_format | spec_id | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id | 
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3:/.../table/data/00000-3-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 01} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> c] | [1 -> , 2 -> c] | null | [4] | null | null | 
| 0 | s3:/.../table/data/00001-4-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 02} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> b] | [1 -> , 2 -> b] | null | [4] | null | null | 
| 0 | s3:/.../table/data/00002-5-8d6d60e8-d427-4809-bcf0-f5d45a4aad96.parquet | PARQUET | 0 | {1999-01-01, 03} | 1 | 597 | [1 -> 90, 2 -> 62] | [1 -> 1, 2 -> 1] | [1 -> 0, 2 -> 0] | [] | [1 -> , 2 -> a] | [1 -> , 2 -> a] | null | [4] | null | null | 
Manifests🔗
To show a table's current file manifests:
SELECT * FROM prod.db.table$manifests;
| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries | 
|---|---|---|---|---|---|---|---|
| s3://.../table/metadata/45b5290b-ee61-4788-b324-b1e2735c0e10-m0.avro | 4479 | 0 | 6668963634911763636 | 8 | 0 | 0 | [[false,null,2019-05-13,2019-05-15]] | 
Note:
- Fields within the partition_summaries column of the manifests table correspond to field_summary structs within the manifest list, in the following order:
    - contains_null
    - contains_nan
    - lower_bound
    - upper_bound
- contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.
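Because the summary values are positional, it can help to attach the field names when inspecting a row. A minimal sketch in plain Java, assuming the values have already been extracted as strings; the class and method names are hypothetical:

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class FieldSummarySketch {
    // Field order as stated in the note on partition_summaries.
    static final String[] FIELD_ORDER = {"contains_null", "contains_nan", "lower_bound", "upper_bound"};

    // Hypothetical helper: attach field names to one positional summary entry.
    public static Map<String, String> label(String... values) {
        if (values.length != FIELD_ORDER.length) {
            throw new IllegalArgumentException("expected " + FIELD_ORDER.length + " values");
        }
        Map<String, String> labeled = new LinkedHashMap<>();
        for (int i = 0; i < FIELD_ORDER.length; i++) {
            labeled.put(FIELD_ORDER[i], values[i]);
        }
        return labeled;
    }

    public static void main(String[] args) {
        // Values taken from the example manifests row: [false,null,2019-05-13,2019-05-15]
        System.out.println(label("false", null, "2019-05-13", "2019-05-15"));
    }
}
```

In the example row, contains_nan stays null, which matches the V1 table case described in the note.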
Partitions🔗
To show a table's current partitions:
SELECT * FROM prod.db.table$partitions;
| partition | spec_id | record_count | file_count | total_data_file_size_in_bytes | position_delete_record_count | position_delete_file_count | equality_delete_record_count | equality_delete_file_count | last_updated_at(μs) | last_updated_snapshot_id | 
|---|---|---|---|---|---|---|---|---|---|---|
| {20211001, 11} | 0 | 1 | 1 | 100 | 2 | 1 | 0 | 0 | 1633086034192000 | 9205185327307503337 | 
| {20211002, 11} | 0 | 4 | 3 | 500 | 1 | 1 | 0 | 0 | 1633172537358000 | 867027598972211003 | 
| {20211001, 10} | 0 | 7 | 4 | 700 | 0 | 0 | 0 | 0 | 1633082598716000 | 3280122546965981531 | 
| {20211002, 10} | 0 | 3 | 2 | 400 | 0 | 0 | 1 | 1 | 1633169159489000 | 6941468797545315876 | 
Note: For unpartitioned tables, the partitions table will not contain the partition and spec_id fields.
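The last_updated_at column is labeled in microseconds (μs). A small sketch for turning such a value into a readable timestamp, using only java.time; it assumes the value counts microseconds since the Unix epoch, and the class and method names are hypothetical:

```java
import java.time.Instant;

public class LastUpdatedAtSketch {
    // Convert microseconds since the Unix epoch to an Instant without losing precision.
    public static Instant fromEpochMicros(long micros) {
        long seconds = Math.floorDiv(micros, 1_000_000L);
        long nanos = Math.floorMod(micros, 1_000_000L) * 1_000L;
        return Instant.ofEpochSecond(seconds, nanos);
    }

    public static void main(String[] args) {
        // Value from the first row of the partitions table.
        System.out.println(fromEpochMicros(1_633_086_034_192_000L)); // 2021-10-01T11:00:34.192Z
    }
}
```

The result falls on 2021-10-01 in hour 11 (UTC), which lines up with the {20211001, 11} partition of that row.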
All Metadata Tables🔗
These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots.
Danger
The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.
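If downstream logic needs each file only once, the repeated rows can be collapsed by file_path after the query. A minimal sketch in plain Java, assuming the file_path values have been collected into a list; the paths in the example are hypothetical placeholders, as are the class and method names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;

public class AllFilesDedupSketch {
    // Keep the first occurrence of each file_path, preserving row order.
    public static List<String> distinctPaths(List<String> filePaths) {
        return new ArrayList<>(new LinkedHashSet<>(filePaths));
    }

    public static void main(String[] args) {
        // Hypothetical file_path values: a.parquet is referenced by two snapshots.
        List<String> rows = Arrays.asList("data/a.parquet", "data/b.parquet", "data/a.parquet");
        System.out.println(distinctPaths(rows));
    }
}
```

The same effect can usually be achieved in SQL with DISTINCT on file_path; the Java form is shown only to keep the example self-contained.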
All Data Files🔗
To show all of the table's data files and each file's metadata:
SELECT * FROM prod.db.table$all_data_files;
| content | file_path | file_format | partition | record_count | file_size_in_bytes | column_sizes | value_counts | null_value_counts | nan_value_counts | lower_bounds | upper_bounds | key_metadata | split_offsets | equality_ids | sort_order_id | 
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | s3://.../dt=20210102/00000-0-756e2512-49ae-45bb-aae3-c0ca475e7879-00001.parquet | PARQUET | {20210102} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210102} | {1 -> 2, 2 -> 20210102} | null | [4] | null | 0 | 
| 0 | s3://.../dt=20210103/00000-0-26222098-032f-472b-8ea5-651a55b21210-00001.parquet | PARQUET | {20210103} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210103} | {1 -> 3, 2 -> 20210103} | null | [4] | null | 0 | 
| 0 | s3://.../dt=20210104/00000-0-a3bb1927-88eb-4f1c-bc6e-19076b0d952e-00001.parquet | PARQUET | {20210104} | 14 | 2444 | {1 -> 94, 2 -> 17} | {1 -> 14, 2 -> 14} | {1 -> 0, 2 -> 0} | {} | {1 -> 1, 2 -> 20210104} | {1 -> 3, 2 -> 20210104} | null | [4] | null | 0 | 
All Manifests🔗
To show all of the table's manifest files:
SELECT * FROM prod.db.table$all_manifests;
| path | length | partition_spec_id | added_snapshot_id | added_data_files_count | existing_data_files_count | deleted_data_files_count | partition_summaries | 
|---|---|---|---|---|---|---|---|
| s3://.../metadata/a85f78c5-3222-4b37-b7e4-faf944425d48-m0.avro | 6376 | 0 | 6272782676904868561 | 2 | 0 | 0 | [{false, false, 20210101, 20210101}] | 
Note:
- Fields within the partition_summaries column of the manifests table correspond to field_summary structs within the manifest list, in the following order:
    - contains_null
    - contains_nan
    - lower_bound
    - upper_bound
- contains_nan could return null, which indicates that this information is not available from the file's metadata. This usually occurs when reading from a V1 table, where contains_nan is not populated.
References🔗
To show a table's known snapshot references:
SELECT * FROM prod.db.table$refs;
| name | type | snapshot_id | max_reference_age_in_ms | min_snapshots_to_keep | max_snapshot_age_in_ms | 
|---|---|---|---|---|---|
| main | BRANCH | 4686954189838128572 | 10 | 20 | 30 | 
| testTag | TAG | 4686954189838128572 | 10 | null | null |