Write from Dataflow to Pub/Sub

This document describes how to write text data from Dataflow to Pub/Sub by using the Apache Beam PubSubIO I/O connector.

Overview

To write data to Pub/Sub, use the PubSubIO connector. The input elements can be either Pub/Sub messages or just the message data. If the input elements are Pub/Sub messages, you can optionally set attributes or an ordering key on each message.

You can use the Java, Python, or Go version of the PubSubIO connector, as follows:

Java

To write to a single topic, call the PubsubIO.writeMessages method. This method takes an input collection of PubsubMessage objects. The connector also defines convenience methods for writing strings, binary-encoded Avro messages, or binary-encoded protobuf messages. These methods convert the input collection into Pub/Sub messages.

To write to a dynamic set of topics based on the input data, call writeMessagesDynamic. Specify the destination topic for each message by calling PubsubMessage.withTopic on the message. For example, you can route messages to different topics based on the value of a particular field in your input data.

For more information, see the PubsubIO reference documentation.

Python

Call the pubsub.WriteToPubSub method. By default, this method takes an input collection of type bytes, representing the message payload. If the with_attributes parameter is True, the method takes a collection of PubsubMessage objects.

For more information, see the pubsub module reference documentation.

Go

To write data to Pub/Sub, call the pubsubio.Write method. This method takes an input collection of either PubSubMessage objects or byte slices that contain the message payloads.

For more information, see the pubsubio package reference documentation.

For more information about Pub/Sub messages, see Message format in the Pub/Sub documentation.

Timestamps

Pub/Sub sets a timestamp on each message. This timestamp represents the time when the message is published to Pub/Sub. In a streaming scenario, you might also care about the event timestamp, which is the time when the message data was generated. You can use the Apache Beam element timestamp to represent event time. Sources that create an unbounded PCollection often assign each new element a timestamp that corresponds to the event time.

For Java and Python, the Pub/Sub I/O connector can write each element's timestamp as a Pub/Sub message attribute. Message consumers can use this attribute to get the event timestamp.

Java

Call PubsubIO.Write<T>.withTimestampAttribute and specify the name of the attribute.

Python

Specify the timestamp_attribute parameter when you call WriteToPubSub.
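
On the consumer side, reading the event timestamp back might look like the following minimal sketch. It assumes that the attribute value is the number of milliseconds since the Unix epoch, encoded as a decimal string; verify this against the messages that your pipeline actually produces. The attribute name event_ts and the helper event_time are hypothetical and are not part of any Pub/Sub or Apache Beam API.

```python
from datetime import datetime, timedelta, timezone
from typing import Mapping

# Hypothetical attribute name. Use the name that the writing pipeline
# was configured with.
TIMESTAMP_ATTRIBUTE = "event_ts"

_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def event_time(attributes: Mapping[str, str]) -> datetime:
    """Return the event time stored in a message's attributes.

    Assumption: the value is milliseconds since the Unix epoch,
    encoded as a decimal string.
    """
    millis = int(attributes[TIMESTAMP_ATTRIBUTE])
    return _EPOCH + timedelta(milliseconds=millis)
```

A consumer would pass the attributes of each received message to event_time and use the result instead of the Pub/Sub publish time.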

Message delivery

Dataflow supports exactly-once processing of messages within a pipeline. However, the Pub/Sub I/O connector can't guarantee exactly-once delivery of messages through Pub/Sub.

For Java and Python, you can configure the Pub/Sub I/O connector to write each element's unique ID as a message attribute. Message consumers can then use this attribute to deduplicate messages.

Java

Call PubsubIO.Write<T>.withIdAttribute and specify the name of the attribute.

Python

Specify the id_label parameter when you call WriteToPubSub.
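
The following consumer-side sketch illustrates one way to deduplicate on such an attribute. It is an in-memory illustration only, not a Pub/Sub or Apache Beam API: the attribute name unique_id, the Message type alias, and the deduplicate function are all hypothetical, and messages are modeled as plain (data, attributes) pairs.

```python
from typing import Iterable, Iterator, Mapping, Set, Tuple

# Hypothetical attribute name. Use the name that the writing pipeline
# was configured with.
ID_ATTRIBUTE = "unique_id"

# A message is modeled here as a (payload, attributes) pair.
Message = Tuple[bytes, Mapping[str, str]]


def deduplicate(messages: Iterable[Message]) -> Iterator[Message]:
    """Yield each message once per distinct value of the ID attribute."""
    seen: Set[str] = set()
    for data, attributes in messages:
        message_id = attributes.get(ID_ATTRIBUTE)
        if message_id is None:
            # Without an ID there is nothing to deduplicate on, so
            # pass the message through unchanged.
            yield data, attributes
            continue
        if message_id in seen:
            # Duplicate delivery of an element that was already seen.
            continue
        seen.add(message_id)
        yield data, attributes
```

The set of seen IDs grows without bound in this sketch. A long-running consumer would more likely keep the IDs in a store that expires old entries.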

Direct output

If you enable at-least-once streaming mode in your pipeline, then the I/O connector uses direct output. In this mode, the connector doesn't checkpoint messages, which enables faster writes. However, retries in this mode might cause duplicate messages with different message IDs, possibly making it harder for message consumers to deduplicate the messages.

For pipelines that use exactly-once mode, you can enable direct output by setting the streaming_enable_pubsub_direct_output service option. Direct output reduces write latency and results in more efficient processing. Consider this option if your message consumers can handle duplicate messages with non-unique message IDs.
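
As a rough sketch, the service option can be added to the arguments that a Python pipeline receives before they are parsed. This assumes that the pipeline runs on the Dataflow runner and that the option is passed through the Python SDK's --dataflow_service_options flag. The helper name with_direct_output is hypothetical.

```python
from typing import List

# Name of the service option, as given in the text above.
DIRECT_OUTPUT_OPTION = "streaming_enable_pubsub_direct_output"


def with_direct_output(argv: List[str]) -> List[str]:
    """Return a copy of argv that enables Pub/Sub direct output.

    Assumption: the option is passed to the Dataflow runner through
    the --dataflow_service_options pipeline flag.
    """
    flag = f"--dataflow_service_options={DIRECT_OUTPUT_OPTION}"
    # Avoid adding the flag twice if the caller already set it.
    return list(argv) if flag in argv else [*argv, flag]
```

The returned list can then be passed to the pipeline options class in place of the original arguments.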

Examples

The following example creates a PCollection of Pub/Sub messages and writes them to a Pub/Sub topic. The topic is specified as a pipeline option. Each message contains payload data and a set of attributes.

Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Video game", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see Set up authentication for a local development environment.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        # Define a custom pipeline option to specify the Pub/Sub topic.
        @classmethod
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
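    # Pass argv through so that callers can supply arguments explicitly.
    # When argv is None, the options are parsed from sys.argv.
    options = MyOptions(argv)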

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")