Use BigQuery metastore with tables in BigQuery
This document explains how to use BigQuery metastore with BigQuery tables and Spark.
With BigQuery metastore, you can create and use standard (built-in) tables, BigQuery tables for Apache Iceberg, and BigLake external tables from BigQuery.
Before you begin
- Enable billing for your Google Cloud project. Learn how to check if billing is enabled on a project.
Enable the BigQuery, and Dataproc APIs.
Optional: Understand how BigQuery metastore works and why you should use it.
Required roles
To get the permissions that you need to use Spark and Dataproc with BigQuery metastore as a metadata store, ask your administrator to grant you the following IAM roles:
-
Create BigQuery metastore tables in Spark:
-
Dataproc Worker (
roles/dataproc.worker
) on the Dataproc Serverless service account in the project -
BigQuery Data Editor (
roles/bigquery.dataEditor
) on the Dataproc Serverless service account in the project -
Storage Object Admin (
roles/storage.objectAdmin
) on the Dataproc Serverless service account in the project
-
Dataproc Worker (
-
Query BigQuery metastore tables in BigQuery:
-
BigQuery Data Viewer (
roles/bigquery.dataViewer
) on the project -
BigQuery User (
roles/bigquery.user
) on the project -
Storage Object Viewer (
roles/storage.objectViewer
) on the project
-
BigQuery Data Viewer (
For more information about granting roles, see Manage access to projects, folders, and organizations.
You might also be able to get the required permissions through custom roles or other predefined roles.
Connect to a table
Create a dataset in the Google Cloud console.
CREATE SCHEMA `PROJECT_ID`.DATASET_NAME;
Replace the following:
PROJECT_ID
: the ID of the Google Cloud project to create the dataset.DATASET_NAME
: a name for your dataset.
Create a Cloud Resource Connection.
Create a standard BigQuery table.
CREATE TABLE `PROJECT_ID`.DATASET_NAME.TABLE_NAME (name STRING,id INT64);
Replace the following:
TABLE_NAME
: a name for your table.
Insert data into the standard BigQuery table.
INSERT INTO `PROJECT_ID`.DATASET_NAME.TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
Create a BigQuery table for Apache Iceberg.
For example, to create a table, run the following
CREATE
statement.CREATE TABLE `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME( name STRING,id INT64 ) WITH CONNECTION `CONNECTION_NAME` OPTIONS ( file_format = 'PARQUET', table_format = 'ICEBERG', storage_uri = 'STORAGE_URI');
Replace the following:
ICEBERG_TABLE_NAME
: a name for your Iceberg table. For example,iceberg_managed_table
.CONNECTION_NAME
: the name of your connection. You created this in the previous step. For example,myproject.us.myconnection
.STORAGE_URI
: a fully qualified Cloud Storage URI. For example,gs://mybucket/table
.
Insert data into the BigQuery table for Apache Iceberg.
INSERT INTO `PROJECT_ID`.DATASET_NAME.ICEBERG_TABLE_NAME VALUES ('test_name1', 123),('test_name2', 456),('test_name3', 789);
Create a read-only Iceberg table.
For example, to create a read-only Iceberg table, run the following
CREATE
statement.CREATE OR REPLACE EXTERNAL TABLE `PROJECT_ID`.DATASET_NAME.READONLY_ICEBERG_TABLE_NAME WITH CONNECTION `CONNECTION_NAME` OPTIONS ( format = 'ICEBERG', uris = ['BUCKET_PATH'], require_partition_filter = FALSE);
Replace the following:
READONLY_ICEBERG_TABLE_NAME
: a name for your read-only table.BUCKET_PATH
: the path to the Cloud Storage bucket that contains the data for the external table, in the format['gs://bucket_name/[folder_name/]file_name']
.
From PySpark, query the standard table, managed Iceberg table, and read-only Iceberg table.
from pyspark.sql import SparkSession # Create a spark session spark = SparkSession.builder \ .appName("BigQuery Metastore Iceberg") \ .config("spark.sql.catalog.CATALOG_NAME", "org.apache.iceberg.spark.SparkCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.catalog-impl", "org.apache.iceberg.gcp.bigquery.BigQueryMetastoreCatalog") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_project", "PROJECT_ID") \ .config("spark.sql.catalog.CATALOG_NAME.gcp_location", "LOCATION") \ .config("spark.sql.catalog.CATALOG_NAME.warehouse", "WAREHOUSE_DIRECTORY") \ .getOrCreate() spark.conf.set("viewsEnabled","true") # Use the bqms_catalog spark.sql("USE `CATALOG_NAME`;") spark.sql("USE NAMESPACE DATASET_NAME;") # Configure spark for temp results spark.sql("CREATE namespace if not exists MATERIALIZATION_NAMESPACE"); spark.conf.set("materializationDataset","MATERIALIZATION_NAMESPACE") # List the tables in the dataset df = spark.sql("SHOW TABLES;") df.show(); # Query a standard BigQuery table sql = """SELECT * FROM DATASET_NAME.TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Managed Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show() # Query a BigQuery Readonly Apache Iceberg table sql = """SELECT * FROM DATASET_NAME.READONLY_ICEBERG_TABLE_NAME""" df = spark.read.format("bigquery").load(sql) df.show()
Replace the following:
WAREHOUSE_DIRECTORY
: the URI of the Cloud Storage folder that contains your data warehouse.CATALOG_NAME
: the name of the catalog that you're using.MATERIALIZATION_NAMESPACE
: the namespace for storing temp results.
Run the PySpark script using Serverless Spark.
gcloud dataproc batches submit pyspark SCRIPT_PATH \ --version=2.2 \ --project=PROJECT_ID \ --region=REGION \ --deps-bucket=YOUR_BUCKET \ --jars=https://storage-download.googleapis.com/maven-central/maven2/org/apache/iceberg/iceberg-spark-runtime-3.5_2.12/1.5.2/iceberg-spark-runtime-3.5_2.12-1.5.2.jar,gs://spark-lib/bigquery/iceberg-bigquery-catalog-1.5.2-1.0.0-beta.jar
Replace the following:
SCRIPT_PATH
: the path to the script that the batch job uses.PROJECT_ID
: the ID of the Google Cloud project to run the batch job in.REGION
: the region where your workload runs.YOUR_BUCKET
: the location of the Cloud Storage bucket to upload workload dependencies. Thegs://
URI prefix of the bucket is not required. You can specify the bucket path or bucket name, for example,mybucketname1
.