Bigtable change streams to Pub/Sub template

The Bigtable change streams to Pub/Sub template is a streaming pipeline that streams Bigtable data change records and publishes them to a Pub/Sub topic by using Dataflow.

A Bigtable change stream lets you subscribe to data mutations on a per-table basis. When you subscribe to table change streams, the following constraints apply:

  • Only modified cells and descriptors of delete operations are returned.
  • Only the new value of a modified cell is returned.

When data change records are published to a Pub/Sub topic, messages might be inserted out of order compared to the original Bigtable commit timestamp ordering.

Bigtable data change records that can't be published to Pub/Sub topics are temporarily placed in a dead-letter queue (unprocessed messages queue) directory in Cloud Storage. After the maximum number of unsuccessful retries, these records are indefinitely placed in the same dead-letter queue directory for human review or further processing by the user.

The pipeline requires that the destination Pub/Sub topic exists. The destination topic might be configured to validate messages using a schema. When a Pub/Sub topic specifies a schema, the pipeline only starts if the schema is valid. Depending on the schema type, use one of the following schema definitions for the destination topic:

Protocol buffers

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangeLogEntryProto";

message ChangelogEntryProto{
  required bytes rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional bytes column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional bytes value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
  

Avro

{
    "name" : "ChangelogEntryMessage",
    "type" : "record",
    "namespace" : "com.google.cloud.teleport.bigtable",
    "fields" : [
      { "name" : "rowKey", "type" : "bytes"},
      {
        "name" : "modType",
        "type" : {
          "name": "ModType",
          "type": "enum",
          "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
      },
      { "name": "isGC", "type": "boolean" },
      { "name": "tieBreaker", "type": "int"},
      { "name": "columnFamily", "type": "string"},
      { "name": "commitTimestamp", "type" : "long"},
      { "name" : "sourceInstance", "type" : "string"},
      { "name" : "sourceCluster", "type" : "string"},
      { "name" : "sourceTable", "type" : "string"},
      { "name": "column", "type" : ["null", "bytes"]},
      { "name": "timestamp", "type" : ["null", "long"]},
      { "name": "timestampFrom", "type" : ["null", "long"]},
      { "name": "timestampTo", "type" : ["null", "long"]},
      { "name" : "value", "type" : ["null", "bytes"]}
   ]
}
    

JSON

Use the following Protobuf schema with JSON message encoding:

syntax = "proto2";

package com.google.cloud.teleport.bigtable;

option java_outer_classname = "ChangelogEntryMessageText";

message ChangelogEntryText{
  required string rowKey = 1;
  enum ModType {
    SET_CELL = 0;
    DELETE_FAMILY = 1;
    DELETE_CELLS = 2;
    UNKNOWN = 3;
  }
  required ModType modType = 2;
  required bool isGC = 3;
  required int32 tieBreaker = 4;
  required int64 commitTimestamp = 5;
  required string columnFamily = 6;
  optional string column = 7;
  optional int64 timestamp = 8;
  optional int64 timestampFrom = 9;
  optional int64 timestampTo = 10;
  optional string value = 11;
  required string sourceInstance = 12;
  required string sourceCluster = 13;
  required string sourceTable = 14;
}
    

Each new Pub/Sub message includes one entry from a data change record returned by the change stream from its corresponding row in your Bigtable table. The Pub/Sub template flattens the entries in each data change record into individual cell-level changes.

Pub/Sub output message description

Field name Description
rowKey The row key of the changed row. Arrives in a form of a byte array. When JSON message encoding is configured, row keys are returned as strings. When useBase64Rowkeys is specified, row keys are Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string.
modType The type of the row mutation. Use one of the following values: SET_CELL, DELETE_CELLS, or DELETE_FAMILY.
columnFamily The column family affected by the row mutation.
column The column qualifier affected by the row mutation. For the DELETE_FAMILY mutation type, the column field isn't set. Arrives in a form of a byte array. When JSON message encoding is configured, columns are returned as strings. When useBase64ColumnQualifier is specified, the column field is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode row key bytes into a string.
commitTimestamp The time when Bigtable applies the mutation. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestamp The timestamp value of the cell affected by the mutation. For DELETE_CELLS and DELETE_FAMILY mutation types, timestamp is not set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestampFrom Describes an inclusive start of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampFrom isn't set. The time is measured in microseconds since the Unix epoch (January 1, 1970 at UTC).
timestampTo Describes an exclusive end of the timestamp interval for all cells deleted by the DELETE_CELLS mutation. For other mutation types, timestampTo isn't set.
isGC A boolean value that indicates whether the mutation is generated by a Bigtable garbage collection mechanism.
tieBreaker When two mutations are registered at the same time by different Bigtable clusters, the mutation with the highest tiebreaker value is applied to the source table. Mutations with lower tiebreaker values are discarded.
value The new value set by the mutation. Unless the stripValues pipeline option is set, the value is set for SET_CELL mutations. For other mutation types, the value isn't set. Arrives in a form of a byte array. When JSON message encoding is configured, values are returned as strings. When useBase64Values is specified, the value is Base64-encoded. Otherwise, a charset specified by bigtableChangeStreamCharset is used to decode value bytes into a string.
sourceInstance The name of the Bigtable instance that registered the mutation. Might be when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceCluster The name of the Bigtable cluster that registered the mutation. Might be used when multiple pipelines stream changes from different instances to the same Pub/Sub topic.
sourceTable The name of the Bigtable table that received the mutation. Might be used in case when a multiple pipelines stream changes from a different tables to the same Pub/Sub topic.

Pipeline requirements

  • The specified Bigtable source instance.
  • The specified Bigtable source table. The table must have change streams enabled.
  • The specified Bigtable application profile.
  • The specified Pub/Sub topic must exist.

Template parameters

Required parameters

  • pubSubTopic: The name of the destination Pub/Sub topic.
  • bigtableChangeStreamAppProfile: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
  • bigtableReadInstanceId: The source Bigtable instance ID.
  • bigtableReadTableId: The source Bigtable table ID.

Optional parameters

  • messageEncoding: The encoding of the messages to be published to the Pub/Sub topic. When the schema of the destination topic is configured, the message encoding is determined by the topic settings. The following values are supported: BINARY and JSON. Defaults to JSON.
  • messageFormat: The encoding of the messages to publish to the Pub/Sub topic. When the schema of the destination topic is configured, the message encoding is determined by the topic settings. The following values are supported: AVRO, PROTOCOL_BUFFERS, and JSON. The default value is JSON. When the JSON format is used, the rowKey, column, and value fields of the message are strings, the contents of which are determined by the pipeline options useBase64Rowkeys, useBase64ColumnQualifiers, useBase64Values, and bigtableChangeStreamCharset.
  • stripValues: When set to true, the SET_CELL mutations are returned without new values set. Defaults to false. This parameter is useful when you don't need a new value to be present, also known as cache invalidation, or when values are extremely large and exceed Pub/Sub message size limits.
  • dlqDirectory: The directory for the dead-letter queue. Records that fail to be processed are stored in this directory. Defaults to a directory under the Dataflow job temp location. In most cases, you can use the default path.
  • dlqRetryMinutes: The number of minutes between dead-letter queue retries. Defaults to 10.
  • dlqMaxRetries: The dead letter maximum retries. Defaults to 5.
  • useBase64Rowkeys: Used with JSON message encoding. When set to true, the rowKey field is a Base64-encoded string. Otherwise, the rowKey is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
  • pubSubProjectId: The Bigtable project ID. The default is the project of the Dataflow job.
  • useBase64ColumnQualifiers: Used with JSON message encoding. When set to true, the column field is a Base64-encoded string. Otherwise, the column is produced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
  • useBase64Values: Used with JSON message encoding. When set to true, the value field is a Base64-encoded string. Otherwise, the value isproduced by using bigtableChangeStreamCharset to decode bytes into a string. Defaults to false.
  • disableDlqRetries: Whether or not to disable retries for the DLQ. Defaults to: false.
  • bigtableChangeStreamMetadataInstanceId: The Bigtable change streams metadata instance ID. Defaults to empty.
  • bigtableChangeStreamMetadataTableTableId: The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
  • bigtableChangeStreamCharset: The Bigtable change streams charset name. Defaults to: UTF-8.
  • bigtableChangeStreamStartTimestamp: The starting timestamp (https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example, 2022-05-05T07:59:59Z. Defaults to the timestamp of the pipeline start time.
  • bigtableChangeStreamIgnoreColumnFamilies: A comma-separated list of column family name changes to ignore. Defaults to empty.
  • bigtableChangeStreamIgnoreColumns: A comma-separated list of column name changes to ignore. Defaults to empty.
  • bigtableChangeStreamName: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
  • bigtableChangeStreamResume: When set to true, a new pipeline resumes processing from the point at which a previously running pipeline with the same bigtableChangeStreamName value stopped. If the pipeline with the given bigtableChangeStreamName value has never run, a new pipeline doesn't start. When set to false, a new pipeline starts. If a pipeline with the same bigtableChangeStreamName value has already run for the given source, a new pipeline doesn't start. Defaults to false.
  • bigtableReadProjectId: The Bigtable project ID. The default is the project for the Dataflow job.

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. Go to Create job from template
  3. In the Job name field, enter a unique job name.
  4. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  5. From the Dataflow template drop-down menu, select the Bigtable change streams to Pub/Sub template.
  6. In the provided parameter fields, enter your parameter values.
  7. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub \
    --parameters \
bigtableReadInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableReadTableId=BIGTABLE_TABLE_ID,\
bigtableChangeStreamAppProfile=BIGTABLE_APPLICATION_PROFILE_ID,\
pubSubTopic=PUBSUB_TOPIC

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance id.
  • BIGTABLE_TABLE_ID: your Bigtable table id.
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile id.
  • PUBSUB_TOPIC: the Pub/Sub destination topic name

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_PubSub",
    "parameters": {
        "bigtableReadInstanceId": "BIGTABLE_INSTANCE_ID",
        "bigtableReadTableId": "BIGTABLE_TABLE_ID",
        "bigtableChangeStreamAppProfile": "BIGTABLE_APPLICATION_PROFILE_ID",
        "pubSubTopic": "PUBSUB_TOPIC"
    }
  }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • BIGTABLE_INSTANCE_ID: your Bigtable instance id.
  • BIGTABLE_TABLE_ID: your Bigtable table id.
  • BIGTABLE_APPLICATION_PROFILE_ID: your Bigtable application profile id.
  • PUBSUB_TOPIC: the Pub/Sub destination topic name

What's next