Process a Bigtable change stream

This tutorial shows how to deploy a data pipeline to Dataflow that processes a real-time stream of database changes from a Bigtable table's change stream. The pipeline writes its output to a series of files in Cloud Storage.

The tutorial provides an example dataset for a music listening application. You track the songs that users listen to and then rank the top five songs in each time window.

This tutorial is intended for technical users familiar with writing code and deploying data pipelines to Google Cloud.

Prepare the environment

Get the code

Clone the repository that contains the sample code. If you previously downloaded this repository, pull to get the latest version.

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/bigtable/beam/change-streams

Create a bucket

  • Create a Cloud Storage bucket:
    gcloud storage buckets create gs://BUCKET_NAME
    Replace BUCKET_NAME with a bucket name that meets the bucket naming requirements.

    Create a Bigtable instance

    You can use an existing instance for this tutorial or create an instance with the default configurations in a region near you.

    Create a table

    The sample application tracks the songs that users listen to and stores the listen events in Bigtable. Create a table that has a change stream enabled and one column family, cf. The application writes each song name to the song column in that family and uses user IDs as row keys.

    Create the table.

    gcloud bigtable instances tables create song-rank \
    --column-families=cf --change-stream-retention-period=7d \
    --instance=BIGTABLE_INSTANCE_ID --project=PROJECT_ID
    

    Replace the following:

    • PROJECT_ID: the ID of the project that you are using
    • BIGTABLE_INSTANCE_ID: the ID of the instance to contain the new table

    Start the pipeline

    This pipeline transforms the change stream by doing the following:

    1. Reads the change stream
    2. Gets the song name
    3. Groups the song listen events into N-second windows
    4. Counts the top five songs
    5. Outputs the results
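The five steps above can be illustrated without Beam. The following plain-Java sketch is not the SongRank code: it assumes a hypothetical ListenEvent record carrying an event timestamp, takes the window size as a parameter (the tutorial leaves N unspecified), and breaks count ties by song name, which the real pipeline may not do. It only shows what the pipeline computes for each window.

```java
import java.util.List;
import java.util.Map;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.stream.Collectors;

/** Plain-Java sketch of the SongRank logic (hypothetical types, no Beam). */
public class WindowedTopSongs {

  /** Hypothetical listen event: a song name and an event time in milliseconds. */
  record ListenEvent(String song, long timestampMillis) {}

  /** Returns, for each window start (ms), the top k songs by listen count. */
  static SortedMap<Long, List<Map.Entry<String, Long>>> topPerWindow(
      List<ListenEvent> events, long windowSeconds, int k) {
    long windowMillis = windowSeconds * 1000;
    // Step 3: assign each event to the fixed window that contains its timestamp.
    Map<Long, List<ListenEvent>> byWindow = events.stream()
        .collect(Collectors.groupingBy(
            e -> Math.floorDiv(e.timestampMillis(), windowMillis) * windowMillis));
    SortedMap<Long, List<Map.Entry<String, Long>>> result = new TreeMap<>();
    for (Map.Entry<Long, List<ListenEvent>> window : byWindow.entrySet()) {
      // Step 4a: count listens per song within the window.
      Map<String, Long> counts = window.getValue().stream()
          .collect(Collectors.groupingBy(ListenEvent::song, Collectors.counting()));
      // Step 4b: keep the k songs with the highest counts (ties broken by name).
      List<Map.Entry<String, Long>> top = counts.entrySet().stream()
          .sorted(Map.Entry.<String, Long>comparingByValue().reversed()
              .thenComparing(Map.Entry.<String, Long>comparingByKey()))
          .limit(k)
          .collect(Collectors.toList());
      result.put(window.getKey(), top);
    }
    return result;
  }

  public static void main(String[] args) {
    List<ListenEvent> events = List.of(
        new ListenEvent("Ode to Joy", 1_000),
        new ListenEvent("Ode to Joy", 2_000),
        new ListenEvent("Over the Rainbow", 3_000),
        new ListenEvent("Over the Rainbow", 12_000));
    // Step 5: print the top songs for each 10-second window.
    topPerWindow(events, 10, 5).forEach((start, top) -> System.out.println(start + " " + top));
  }
}
```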

    Run the pipeline.

    mvn compile exec:java -Dexec.mainClass=SongRank \
    "-Dexec.args=--project=PROJECT_ID --bigtableProjectId=PROJECT_ID \
    --bigtableInstanceId=BIGTABLE_INSTANCE_ID --bigtableTableId=song-rank \
    --outputLocation=gs://BUCKET_NAME/ \
    --runner=dataflow --region=BIGTABLE_REGION --experiments=use_runner_v2"
    

    Replace BIGTABLE_REGION with the ID of the region that your Bigtable instance is in, such as us-east5.

    Understand the pipeline

    The following code snippets from the pipeline show how the code you are running works.

    Reading the change stream

    This code configures the change stream source with the project, instance, table, and app profile IDs that are passed in as pipeline options.

    p.apply(
            "Stream from Bigtable",
            // Emits a PCollection<KV<ByteString, ChangeStreamMutation>> keyed by row key.
            BigtableIO.readChangeStream()
                .withProjectId(options.getBigtableProjectId())
                .withInstanceId(options.getBigtableInstanceId())
                .withTableId(options.getBigtableTableId())
                .withAppProfileId(options.getBigtableAppProfile()))

    Getting the song name

    When a user listens to a song, the application writes the song name to column family cf, column qualifier song. The code therefore extracts that value from each SetCell entry in the change stream mutation and outputs it to the next step of the pipeline.

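    // Emits the song name from each cf:song cell write in a change stream mutation.
    private static class ExtractSongName extends DoFn<KV<ByteString, ChangeStreamMutation>, String> {

      @DoFn.ProcessElement
      public void processElement(ProcessContext c) {
        // The element key is the row key (the user ID); the value is the mutation.
        for (Entry e : Objects.requireNonNull(Objects.requireNonNull(c.element()).getValue())
            .getEntries()) {
          // Only SetCell entries carry a written value; deletions are ignored.
          if (e instanceof SetCell) {
            SetCell setCell = (SetCell) e;
            // Keep only writes to column family "cf", column qualifier "song".
            if ("cf".equals(setCell.getFamilyName())
                && "song".equals(setCell.getQualifier().toStringUtf8())) {
              c.output(setCell.getValue().toStringUtf8());
            }
          }
        }
      }
    }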
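To see the filtering logic in isolation, the following sketch models a change stream mutation as a list of entries. The Entry, SetCell, and DeleteCells types here are hypothetical, simplified stand-ins for the Bigtable client's classes (the real SetCell exposes its qualifier and value as ByteString), and the artist column in the example is invented for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Plain-Java model of the ExtractSongName filter (hypothetical types). */
public class ExtractSongNameSketch {

  /** Hypothetical marker for one entry in a mutation. */
  interface Entry {}

  /** Hypothetical cell write with string fields instead of ByteString. */
  record SetCell(String familyName, String qualifier, String value) implements Entry {}

  /** Hypothetical cell deletion; the filter skips it. */
  record DeleteCells(String familyName, String qualifier) implements Entry {}

  /** Returns the song names written to cf:song, in mutation order. */
  static List<String> extractSongNames(List<Entry> entries) {
    List<String> songs = new ArrayList<>();
    for (Entry e : entries) {
      // Only cell writes carry a new value, and only cf:song holds song names.
      if (e instanceof SetCell setCell
          && "cf".equals(setCell.familyName())
          && "song".equals(setCell.qualifier())) {
        songs.add(setCell.value());
      }
    }
    return songs;
  }

  public static void main(String[] args) {
    List<Entry> mutation = List.of(
        new SetCell("cf", "song", "Ode to Joy"),
        new SetCell("cf", "artist", "Beethoven"),
        new DeleteCells("cf", "song"));
    System.out.println(extractSongNames(mutation));
  }
}
```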

    Counting the top five songs

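    You can use the built-in Beam transforms Count.perElement and Top.of to get the top five songs in the current window. Count.perElement outputs a KV of each distinct song and its listen count, and Top.of(5, ...) outputs a single list of the five largest pairs according to SongComparator, in decreasing order. Because the input is not in the global window, the code calls withoutDefaults(); Beam requires this for global combines over windowed input, and it means that windows with no listens produce no output instead of a default empty list.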

    .apply(Count.perElement())
    .apply("Top songs", Top.of(5, new SongComparator()).withoutDefaults())
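The following plain-Java sketch shows what Count.perElement and Top.of compute for a single window, without depending on Beam. The comparator is a hypothetical stand-in for SongComparator that orders pairs by count and then by song name. The bounded min-heap is one common way to keep the k largest elements; it is not necessarily how Beam implements Top internally.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;

/** Plain-Java sketch of Count.perElement followed by Top.of(k, comparator). */
public class TopSongsSketch {

  /** Hypothetical comparator: orders (song, count) pairs by count, then by song name. */
  static final Comparator<Map.Entry<String, Long>> SONG_COMPARATOR =
      Map.Entry.<String, Long>comparingByValue()
          .thenComparing(Map.Entry.<String, Long>comparingByKey());

  /** Counterpart of Count.perElement(): the number of occurrences of each song. */
  static Map<String, Long> countPerElement(List<String> songs) {
    Map<String, Long> counts = new HashMap<>();
    for (String song : songs) {
      counts.merge(song, 1L, Long::sum);
    }
    return counts;
  }

  /** Counterpart of Top.of(k, comparator): the k largest pairs, largest first. */
  static List<Map.Entry<String, Long>> top(Map<String, Long> counts, int k) {
    // Min-heap holding at most k pairs; the smallest pair is evicted first.
    PriorityQueue<Map.Entry<String, Long>> heap = new PriorityQueue<>(SONG_COMPARATOR);
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      heap.offer(Map.entry(e.getKey(), e.getValue()));
      if (heap.size() > k) {
        heap.poll();
      }
    }
    List<Map.Entry<String, Long>> result = new ArrayList<>(heap);
    result.sort(SONG_COMPARATOR.reversed());
    return result;
  }

  public static void main(String[] args) {
    List<String> listens = List.of("Ode to Joy", "Over the Rainbow", "Ode to Joy");
    System.out.println(top(countPerElement(listens), 5));
  }
}
```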

    Outputting the results

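    This pipeline writes the results to standard output and to files. For the files, it re-windows the results into the global window with a trigger that fires when a pane contains at least 10 elements or one minute of processing time after the first element in the pane arrives, whichever comes first. Because fired panes are discarded, each windowed write contains only the results that arrived since the previous firing.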

    .apply("Print", ParDo.of(new PrintFn()))
    .apply(
        "Collect at least 10 elements or 1 minute of elements",
        Window.<String>into(new GlobalWindows())
            .triggering(
                Repeatedly.forever(
                    AfterFirst.of(
                        AfterPane.elementCountAtLeast(10),
                        AfterProcessingTime.pastFirstElementInPane()
                            .plusDelayOf(Duration.standardMinutes(1)))))
            .discardingFiredPanes())
    .apply(
        "Output top songs",
        TextIO.write()
            .to(options.getOutputLocation() + "song-charts/")
            .withSuffix(".txt")
            .withNumShards(1)
            .withWindowedWrites());
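The trigger's behavior can be simulated in plain Java to show how elements are grouped into panes. This sketch is a simplification under stated assumptions: arrival times are processing times in seconds, sorted in ascending order; the one-minute timer counts as fired when an element arrives at or after its deadline; and a final partial pane is emitted when its timer expires. It does not model how Beam actually delivers timers.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Simplified model of Repeatedly.forever(AfterFirst.of(elementCountAtLeast(10),
 * pastFirstElementInPane().plusDelayOf(1 minute))) with discarding panes.
 */
public class TriggerSketch {

  static final int MAX_ELEMENTS = 10;
  static final long DELAY_SECONDS = 60;

  /** Returns the number of elements in each fired pane, in firing order. */
  static List<Integer> paneSizes(List<Long> arrivalSeconds) {
    List<Integer> panes = new ArrayList<>();
    int count = 0;     // elements buffered in the current pane
    long deadline = 0; // processing time at which the one-minute trigger fires
    for (long t : arrivalSeconds) {
      // If the pane's one-minute timer has expired, it fires before this element.
      if (count > 0 && t >= deadline) {
        panes.add(count);
        count = 0; // discardingFiredPanes: the next pane starts empty
      }
      if (count == 0) {
        // The first element of a new pane starts the one-minute delay.
        deadline = t + DELAY_SECONDS;
      }
      count++;
      // The element-count trigger fires as soon as the pane holds 10 elements.
      if (count >= MAX_ELEMENTS) {
        panes.add(count);
        count = 0;
      }
    }
    // Remaining elements are emitted when their one-minute timer expires.
    if (count > 0) {
      panes.add(count);
    }
    return panes;
  }

  public static void main(String[] args) {
    System.out.println(paneSizes(List.of(0L, 5L, 30L, 70L, 75L)));
  }
}
```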

    View the pipeline

    1. In the Google Cloud console, go to the Dataflow page.

    2. Click the job with a name that begins with song-rank.

    3. At the bottom of the screen, click Show to open the logs panel.

    4. Click Worker logs to monitor the output logs of the change stream.

    Stream writes

    Use the cbt CLI to import song listens for various users from song-rank-data.csv into the song-rank table. The import writes one row per batch (batch-size=1) and is designed to run over a few minutes, simulating song listens streaming in over time.

    cbt -instance=BIGTABLE_INSTANCE_ID -project=PROJECT_ID import \
    song-rank song-rank-data.csv column-family=cf batch-size=1
    

    View the output

    Read the output on Cloud Storage to see the most popular songs.

    gcloud storage cat gs://BUCKET_NAME/song-charts/GlobalWindow-pane-0-00000-of-00001.txt
    

    Example output:

    2023-07-06T19:53:38.232Z [KV{The Wheels on the Bus, 199}, KV{Twinkle, Twinkle, Little Star, 199}, KV{Ode to Joy , 192}, KV{Row, Row, Row Your Boat, 186}, KV{Take Me Out to the Ball Game, 182}]
    2023-07-06T19:53:49.536Z [KV{Old MacDonald Had a Farm, 20}, KV{Take Me Out to the Ball Game, 18}, KV{Für Elise, 17}, KV{Ode to Joy , 15}, KV{Mary Had a Little Lamb, 12}]
    2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, KV{The Wheels on the Bus, 17}, KV{Row, Row, Row Your Boat, 13}, KV{Happy Birthday to You, 12}, KV{Over the Rainbow, 9}]
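If you want to consume these files programmatically, the following sketch parses one output line. It assumes each line has the format shown in the example output: an ISO-8601 timestamp, a space, and a list of KV{song, count} pairs. Because song names can contain commas, it treats the last ", <digits>}" in each pair as the count. The ChartEntry and Chart records are hypothetical, and the parser trims trailing spaces in song names such as "Ode to Joy ".

```java
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Sketch of a parser for one line of the song-charts output (assumed format). */
public class SongChartParser {

  /** Hypothetical parsed pair: song name and listen count. */
  record ChartEntry(String song, long count) {}

  /** Hypothetical parsed line: output timestamp and ranked entries. */
  record Chart(Instant timestamp, List<ChartEntry> entries) {}

  // Non-greedy name, then ", <digits>}" so commas inside song names are kept.
  private static final Pattern KV = Pattern.compile("KV\\{(.*?), (\\d+)\\}");

  static Chart parseLine(String line) {
    int space = line.indexOf(' ');
    Instant timestamp = Instant.parse(line.substring(0, space));
    List<ChartEntry> entries = new ArrayList<>();
    Matcher m = KV.matcher(line.substring(space + 1));
    while (m.find()) {
      entries.add(new ChartEntry(m.group(1).strip(), Long.parseLong(m.group(2))));
    }
    return new Chart(timestamp, entries);
  }

  public static void main(String[] args) {
    Chart chart = parseLine("2023-07-06T19:53:50.425Z [KV{Twinkle, Twinkle, Little Star, 20}, "
        + "KV{The Wheels on the Bus, 17}]");
    for (ChartEntry e : chart.entries()) {
      System.out.println(e.song() + " -> " + e.count());
    }
  }
}
```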