The spark-bigquery-connector is used with Apache Spark to read and write data from and to BigQuery. This tutorial provides example code that uses the spark-bigquery-connector within a Spark application. For instructions on creating a cluster, see the Dataproc Quickstarts.
Make the connector available to your application
You can make the spark-bigquery-connector available to your application in one of the following ways:
- Install the spark-bigquery-connector in the Spark jars directory of every node by using the Dataproc connectors initialization action when you create your cluster.
- Provide the connector URI when you submit your job:
  - Google Cloud console: Use the Spark job Jars files item on the Dataproc Submit a job page.
  - gcloud CLI: Use the gcloud dataproc jobs submit spark --jars flag.
  - Dataproc API: Use the SparkJob.jarFileUris field.
- Include the jar in your Scala or Java Spark application as a dependency (see Compiling against the connector).
How to specify the connector jar URI
Spark-BigQuery connector versions are listed in the GitHub GoogleCloudDataproc/spark-bigquery-connector repository.
Specify the connector jar by substituting the Scala and connector version
information in the following URI string:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_SCALA_VERSION-CONNECTOR_VERSION.jar
Use Scala 2.12 with Dataproc image versions 1.5+:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-CONNECTOR_VERSION.jar
gcloud CLI example:
    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.12-0.23.2.jar \
        -- job-args
Use Scala 2.11 with Dataproc image versions 1.4 and earlier:
gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-CONNECTOR_VERSION.jar
gcloud CLI example:
    gcloud dataproc jobs submit spark \
        --jars=gs://spark-lib/bigquery/spark-bigquery-with-dependencies_2.11-0.23.2.jar \
        -- job-args
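The substitution described above can be sketched in a few lines of Python. This is a minimal illustration, not part of the connector: the function names are hypothetical, the image version is assumed to be a plain "major.minor" string, and the Scala selection mirrors only the rule stated in this section (2.12 for image versions 1.5+, 2.11 for 1.4 and earlier).

```python
"""Build the Cloud Storage URI of the spark-bigquery-connector jar."""

URI_TEMPLATE = (
    "gs://spark-lib/bigquery/"
    "spark-bigquery-with-dependencies_{scala}-{connector}.jar"
)


def scala_version_for_image(image_version):
    """Return "2.12" for Dataproc images 1.5+ and "2.11" for 1.4 and earlier.

    Assumes a plain "major.minor" string such as "1.5" (hypothetical helper).
    """
    major, minor = (int(part) for part in image_version.split(".")[:2])
    return "2.12" if (major, minor) >= (1, 5) else "2.11"


def connector_jar_uri(image_version, connector_version):
    """Substitute the Scala and connector versions into the URI template."""
    return URI_TEMPLATE.format(
        scala=scala_version_for_image(image_version),
        connector=connector_version,
    )


print(connector_jar_uri("1.5", "0.23.2"))
print(connector_jar_uri("1.4", "0.23.2"))
```

The resulting string is what you would pass to the --jars flag or the SparkJob.jarFileUris field.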
Calculating costs
In this document, you use the following billable components of Google Cloud:
- Dataproc
- BigQuery
- Cloud Storage
To generate a cost estimate based on your projected usage,
use the pricing calculator.
Reading and writing data from BigQuery
This example reads data from BigQuery into a Spark DataFrame to perform a word count using the standard data source API.
The connector writes the data to BigQuery by
first buffering all the data into a temporary location in Cloud Storage. It
then copies all the data into BigQuery in one operation. The
connector attempts to delete the temporary files once the BigQuery
load operation has succeeded, and again when the Spark application terminates.
If the job fails, remove any remaining temporary
Cloud Storage files. Typically, the temporary
files are located in gs://[bucket]/.spark-bigquery-[jobid]-[UUID].
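When cleaning up after a failed job, it can help to pick out the connector's temporary directories from a listing of the bucket. The following is a rough sketch under stated assumptions: the object names are expected to come from a listing you obtained separately, the function name is hypothetical, and the sample names are invented for illustration. It only identifies candidate directories by the .spark-bigquery- prefix; it does not delete anything.

```python
import re

# Matches the first path component of a connector temporary directory,
# following the gs://[bucket]/.spark-bigquery-[jobid]-[UUID] layout.
TEMP_DIR = re.compile(r"^(\.spark-bigquery-[^/]+)(?:/|$)")


def leftover_temp_dirs(bucket, object_names):
    """Return the distinct gs:// URIs of connector temporary directories."""
    found = set()
    for name in object_names:
        match = TEMP_DIR.match(name)
        if match:
            found.add("gs://{}/{}".format(bucket, match.group(1)))
    return sorted(found)


# Hypothetical object names, for illustration only.
names = [
    ".spark-bigquery-app1-1111/part-00000.parquet",
    ".spark-bigquery-app1-1111/part-00001.parquet",
    "data/input.csv",
]
print(leftover_temp_dirs("my-bucket", names))
```

Review the returned URIs before removing them, since another running job may still be using its own temporary directory.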
Configuring billing
By default, the project associated with the credentials or service account is
billed for API usage. To bill a different project, set the following
configuration: spark.conf.set("parentProject", "<BILLED-GCP-PROJECT>").
It can also be added to a read/write operation, as follows:
.option("parentProject", "<BILLED-GCP-PROJECT>").
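As an illustration of the per-operation form, the sketch below wraps a read in a small helper. It assumes an existing SparkSession is passed in as spark; the function name and the billed_project parameter are hypothetical and exist only for this example. The "table" and "parentProject" option names are the ones used elsewhere in this tutorial.

```python
def read_bigquery_table(spark, table, billed_project=None):
    """Read a BigQuery table, optionally billing a different project.

    `spark` is an existing SparkSession; `billed_project` is a
    hypothetical parameter name used only in this sketch.
    """
    reader = spark.read.format("bigquery").option("table", table)
    if billed_project is not None:
        # Per-operation equivalent of spark.conf.set("parentProject", ...).
        reader = reader.option("parentProject", billed_project)
    return reader.load()
```

For example, read_bigquery_table(spark, "bigquery-public-data:samples.shakespeare", "<BILLED-GCP-PROJECT>") bills that one read to the named project without changing the session-wide configuration.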
Running the code
Before running this example, create a dataset named "wordcount_dataset" or change the output dataset in the code to an existing BigQuery dataset in your Google Cloud project.
Use the bq command to create the wordcount_dataset:
bq mk wordcount_dataset
Use the Google Cloud CLI command to create a Cloud Storage bucket, which will be used to export to BigQuery:
gcloud storage buckets create gs://[bucket]
Scala
- Examine the code and replace the [bucket] placeholder with
the Cloud Storage bucket you created earlier.
    /*
     * Remove comment if you are not running in spark-shell.
     *
    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .appName("spark-bigquery-demo")
      .getOrCreate()
    */

    // Use the Cloud Storage bucket for temporary BigQuery export data used
    // by the connector.
    val bucket = "[bucket]"
    spark.conf.set("temporaryGcsBucket", bucket)

    // Load data in from BigQuery. See
    // https://github.com/GoogleCloudDataproc/spark-bigquery-connector/tree/0.17.3#properties
    // for option information.
    val wordsDF =
      (spark.read.format("bigquery")
      .option("table","bigquery-public-data:samples.shakespeare")
      .load()
      .cache())

    wordsDF.createOrReplaceTempView("words")

    // Perform word count.
    val wordCountDF = spark.sql(
      "SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word")
    wordCountDF.show()
    wordCountDF.printSchema()

    // Saving the data to BigQuery.
    (wordCountDF.write.format("bigquery")
      .option("table","wordcount_dataset.wordcount_output")
      .save())
- Run the code on your cluster
  - Use SSH to connect to the Dataproc cluster master node
    - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
    - On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node

    A browser window opens at your home directory on the master node

        Connected, host fingerprint: ssh-rsa 2048
        ...
        ...
        user@clusterName-m:~$
  - Create wordcount.scala with the pre-installed vi, vim, or nano text editor, then paste in the Scala code from the Scala code listing

        nano wordcount.scala
  - Launch the spark-shell REPL.

        $ spark-shell --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar
        ...
        Using Scala version ...
        Type in expressions to have them evaluated.
        Type :help for more information.
        ...
        Spark context available as sc.
        ...
        SQL context available as sqlContext.
        scala>
  - Run wordcount.scala with the :load wordcount.scala command to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.

        :load wordcount.scala
        ...
        +---------+----------+
        |     word|word_count|
        +---------+----------+
        |     XVII|         2|
        |    spoil|        28|
        |    Drink|         7|
        |forgetful|         5|
        |   Cannot|        46|
        |    cures|        10|
        |   harder|        13|
        |  tresses|         3|
        |      few|        62|
        |  steel'd|         5|
        | tripping|         7|
        |   travel|        35|
        |   ransom|        55|
        |     hope|       366|
        |       By|       816|
        |     some|      1169|
        |    those|       508|
        |    still|       567|
        |      art|       893|
        |    feign|        10|
        +---------+----------+
        only showing top 20 rows

        root
         |-- word: string (nullable = false)
         |-- word_count: long (nullable = true)
    To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.
PySpark
- Examine the code and replace the [bucket] placeholder with
the Cloud Storage bucket you created earlier.
    #!/usr/bin/env python

    """BigQuery I/O PySpark example."""

    from pyspark.sql import SparkSession

    spark = SparkSession \
      .builder \
      .master('yarn') \
      .appName('spark-bigquery-demo') \
      .getOrCreate()

    # Use the Cloud Storage bucket for temporary BigQuery export data used
    # by the connector.
    bucket = "[bucket]"
    spark.conf.set('temporaryGcsBucket', bucket)

    # Load data from BigQuery.
    words = spark.read.format('bigquery') \
      .option('table', 'bigquery-public-data:samples.shakespeare') \
      .load()
    words.createOrReplaceTempView('words')

    # Perform word count.
    word_count = spark.sql(
        'SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word')
    word_count.show()
    word_count.printSchema()

    # Save the data to BigQuery
    word_count.write.format('bigquery') \
      .option('table', 'wordcount_dataset.wordcount_output') \
      .save()
- Run the code on your cluster
  - Use SSH to connect to the Dataproc cluster master node
    - Go to the Dataproc Clusters page in the Google Cloud console, then click the name of your cluster
    - On the Cluster details page, select the VM Instances tab. Then, click SSH to the right of the name of the cluster master node

    A browser window opens at your home directory on the master node

        Connected, host fingerprint: ssh-rsa 2048
        ...
        ...
        user@clusterName-m:~$
  - Create wordcount.py with the pre-installed vi, vim, or nano text editor, then paste in the PySpark code from the PySpark code listing

        nano wordcount.py
  - Run wordcount.py with spark-submit to create the BigQuery wordcount_output table. The output listing displays 20 lines from the wordcount output.

        spark-submit --jars gs://spark-lib/bigquery/spark-bigquery-latest.jar wordcount.py
        ...
        +---------+----------+
        |     word|word_count|
        +---------+----------+
        |     XVII|         2|
        |    spoil|        28|
        |    Drink|         7|
        |forgetful|         5|
        |   Cannot|        46|
        |    cures|        10|
        |   harder|        13|
        |  tresses|         3|
        |      few|        62|
        |  steel'd|         5|
        | tripping|         7|
        |   travel|        35|
        |   ransom|        55|
        |     hope|       366|
        |       By|       816|
        |     some|      1169|
        |    those|       508|
        |    still|       567|
        |      art|       893|
        |    feign|        10|
        +---------+----------+
        only showing top 20 rows

        root
         |-- word: string (nullable = false)
         |-- word_count: long (nullable = true)
    To preview the output table, open the BigQuery page, select the wordcount_output table, and then click Preview.
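To make the aggregation in the listings concrete, the sketch below reproduces what the query SELECT word, SUM(word_count) AS word_count FROM words GROUP BY word computes, using plain Python on a handful of in-memory rows. The rows are invented for illustration and are not taken from the shakespeare table; the function name is hypothetical. Spark performs the same grouping and summing, but distributed across the cluster.

```python
from collections import Counter

# Made-up (word, word_count) rows; the same word can appear in several rows.
rows = [("hope", 3), ("art", 2), ("hope", 5), ("few", 1)]


def sum_word_counts(rows):
    """Group rows by word and sum their counts, like the SQL GROUP BY."""
    totals = Counter()
    for word, count in rows:
        totals[word] += count
    return dict(totals)


print(sum_word_counts(rows))  # {'hope': 8, 'art': 2, 'few': 1}
```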
For more information
- BigQuery Storage & Spark SQL - Python
- Creating a table definition file for an external data source
- Querying externally partitioned data
- Spark job tuning tips