Este documento mostra como criar tarefas de qualidade de dados do Dataplex que permitem programar e executar verificações de qualidade de dados para tabelas internas e externas do BigQuery.
Para mais informações, consulte Visão geral das tarefas de qualidade de dados.
Antes de começar
Este documento pressupõe que você tenha um lake do Dataplex para criar a tarefa de qualidade de dados.
Ativar APIs e serviços do Google
Ativar a API Dataproc.
Ative o Acesso privado do Google para sua rede e sub-rede. Ative o Acesso privado do Google na rede que você planeja usar com as tarefas de qualidade de dados do Dataplex. Se você não especificar uma rede ou sub-rede ao criar a tarefa de qualidade de dados do Dataplex, vai ser usada a sub-rede padrão. Nesse caso, é necessário ativar o Acesso privado do Google na sub-rede padrão.
Criar um arquivo de especificação
O Dataplex usa o CloudDQ de código aberto como programa do driver. Os requisitos de verificação de qualidade de dados do Dataplex são definidos nos arquivos de especificação YAML do CloudDQ.
Como entrada para a tarefa de qualidade de dados, é possível ter um único arquivo YAML ou um único arquivo ZIP contendo um ou mais arquivos YAML. É recomendado capturar os requisitos de verificação de qualidade de dados em arquivos de especificação YAML separados, com um arquivo para cada seção.
Para preparar um arquivo de especificação, faça o seguinte:
-
Crie um ou mais arquivos de especificação YAML do CloudDQ que definam os requisitos de verificação de qualidade de dados. Para mais informações sobre a sintaxe necessária, consulte a seção Sobre o arquivo de especificação deste documento.
Salve o arquivo de especificação YAML no formato
.yml
ou.yaml
. Se você criar vários arquivos de especificação YAML, salve todos em um único arquivo ZIP. - Crie um bucket do Cloud Storage.
- Faça upload do arquivo de especificação para o bucket do Cloud Storage.
Sobre o arquivo de especificação
Seu arquivo de especificação YAML do CloudDQ precisa ter estas três seções:
Regras (definidas sob o nó YAML
rules
de nível superior): uma lista de regras a serem executadas. É possível criar essas regras com base em tipos de regras predefinidos, comoNOT_NULL
eREGEX
, ou ampliá-las com instruções SQL personalizadas, comoCUSTOM_SQL_EXPR
eCUSTOM_SQL_STATEMENT
. A instruçãoCUSTOM_SQL_EXPR
sinaliza qualquer linha quecustom_sql_expr
avaliou para oFalse
como falha. A instruçãoCUSTOM_SQL_STATEMENT
sinaliza qualquer valor retornado pela instrução inteira como uma falha.Filtros de linha (definidos sob o nó YAML
row_filters
de nível superior): expressões SQL que retornam um valor booleano que define filtros para buscar um subconjunto de dados do assunto da entidade subjacente para validação.Vinculações de regras (definidas no nó YAML
rule_bindings
de nível superior): definerules
erule filters
a serem aplicados às tabelas.Dimensões da regra (definidas no nó YAML
rule_dimensions
): define a lista de dimensões de regra de qualidade de dados permitidas que uma regra pode definir no campodimension
correspondente.Exemplo:
rule_dimensions: - consistency - correctness - duplication - completeness - conformance
O campo
dimension
é opcional para uma regra. A seção de dimensões da regra será obrigatória sedimension
estiver listado em qualquer regra.
Para mais informações, consulte o guia de referência do CloudDQ e os arquivos de especificação de amostra.
Crie um conjunto de dados para armazenar os resultados
-
Para armazenar os resultados, crie um conjunto de dados do BigQuery.
O conjunto de dados precisa estar na mesma região que as tabelas em que você executa a tarefa de qualidade de dados.
O Dataplex usa esse conjunto de dados e cria ou reutiliza uma tabela de sua escolha para armazenar os resultados.
Crie uma conta de serviço
Crie uma conta de serviço com os seguintes papéis e permissões do Identity and Access Management (IAM):
- Acesso de leitura ao caminho do Cloud Storage que contém as especificações do YAML. É possível usar o papel Leitor de objetos do Storage (
roles/storage.objectViewer
) no bucket do Cloud Storage. - Acesso de leitura a conjuntos de dados do BigQuery com dados a serem validados. É possível usar o papel Leitor de dados do BigQuery.
- Acesso de gravação ao conjunto de dados do BigQuery para criar uma tabela (se necessário) e gravar os resultados nessa tabela. É possível usar o papel de Editor de dados do BigQuery (
roles/bigquery.dataEditor
) no nível do conjunto de dados. - Papel usuário de jobs do BigQuery (
roles/bigquery.jobUser
) no nível do projeto para criar jobs do BigQuery em um projeto. - Papel de leitor de metadados do Dataplex (
roles/dataplex.metadataReader
) no nível do projeto ou do lake. - Papel consumidor do Service Usage (
roles/serviceusage.serviceUsageConsumer
) para envolvidos no projeto. - Papel de worker do Dataproc.
- Permissão
iam.serviceAccounts.actAs
concedida ao usuário que envia o job. - Papel de usuário da conta de serviço concedido à conta de serviço do lake do Dataplex. É possível ver a conta de serviço do lake do Dataplex no console do Google Cloud .
Opcional: usar configurações avançadas
Estas etapas são opcionais:
Por padrão, o BigQuery executa verificações de qualidade de dados no projeto do usuário atual. Como alternativa, é possível escolher um projeto diferente para executar os jobs do BigQuery usando o argumento
--gcp_project_id
TASK_ARGS
para a propriedade--execution-args
da tarefa.Se o ID do projeto especificado para executar consultas do BigQuery for diferente do projeto em que a conta de serviço (especificada pelo
--execution-service-account
) foi criada, verifique se a política da organização que desativa o uso da conta de serviço entre projetos (iam.disableServiceAccountCreation
) está desativada. Além disso, verifique se a conta de serviço pode acessar a programação de jobs do BigQuery no projeto em que as consultas do BigQuery estão sendo executadas.
Limitações
- Todas as tabelas especificadas para uma determinada tarefa de qualidade de dados precisam pertencer à mesma região do Google Cloud .
Programar uma tarefa de qualidade de dados
Console
- No console do Google Cloud , acesse a página Processo do Dataplex.
- Clique em Criar tarefa.
- No card Verificar qualidade de dados, clique em Criar tarefa.
- Em Dataplex lake, escolha seu lake.
- Em ID, insira um ID.
- Na seção Especificação da qualidade de dados, faça o seguinte:
- No campo Selecionar arquivo GCS, clique em Procurar.
Selecione o bucket do Cloud Storage.
Clique em Selecionar.
Na seção Tabela de resultados, faça o seguinte:
No campo Selecionar conjunto de dados do BigQuery, clique em Procurar.
Selecione o conjunto de dados do BigQuery para armazenar os resultados da validação.
Clique em Selecionar.
No campo Tabela do BigQuery, insira o nome da tabela para armazenar os resultados. Se a tabela não existir, o Dataplex vai criar uma para você. Não use o nome
dq_summary
, porque ele é reservado para tarefas de processamento interno.
Na seção Conta de serviço, selecione uma conta de serviço no menu Conta de serviço de usuário.
Clique em Continuar.
Na seção Definir programação, configure a programação para executar a tarefa de qualidade de dados.
Clique em Criar.
CLI da gcloud
Veja a seguir um exemplo de execução de uma tarefa de qualidade de dados que usa o comando da CLI gcloud de tarefas do Dataplex:
export USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH="USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH" # Google Cloud project where the Dataplex task is created. export GOOGLE_CLOUD_PROJECT="GOOGLE_CLOUD_PROJECT" # Google Cloud region for the Dataplex lake. export DATAPLEX_REGION_ID="DATAPLEX_REGION_ID" # Public Cloud Storage bucket containing the prebuilt data quality executable artifact. There is one bucket for each Google Cloud region. export DATAPLEX_PUBLIC_GCS_BUCKET_NAME="dataplex-clouddq-artifacts-${DATAPLEX_REGION_ID}" # The Dataplex lake where your task is created. export DATAPLEX_LAKE_NAME="DATAPLEX_LAKE_NAME" # The service account used for running the task. Ensure that this service account has sufficient IAM permissions on your project, including BigQuery Data Editor, BigQuery Job User, Dataplex Editor, Dataproc Worker, and Service Usage Consumer. # The BigQuery dataset used for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # If you want to use a different dataset for storing the intermediate data quality summary results and the BigQuery views associated with each rule binding, use the following: export CLOUDDQ_BIGQUERY_DATASET=$TARGET_BQ_DATASET # The BigQuery dataset where the final results of the data quality checks are stored. This could be the same as CLOUDDQ_BIGQUERY_DATASET. export TARGET_BQ_DATASET="TARGET_BQ_DATASET" # The BigQuery table where the final results of the data quality checks are stored. export TARGET_BQ_TABLE="TARGET_BQ_TABLE" # The unique identifier for the task. export TASK_ID="TASK_ID" gcloud dataplex tasks create \ --location="${DATAPLEX_REGION_ID}" \ --lake="${DATAPLEX_LAKE_NAME}" \ --trigger-type=ON_DEMAND \ --execution-service-account="$DATAPLEX_TASK_SERVICE_ACCOUNT" \ --spark-python-script-file="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq_pyspark_driver.py" \ --spark-file-uris="gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip","gs://${DATAPLEX_PUBLIC_GCS_BUCKET_NAME}/clouddq-executable.zip.hashsum","${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}" \ --execution-args=^::^TASK_ARGS="clouddq-executable.zip, ALL, ${USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH}, --gcp_project_id='GOOGLE_CLOUD_PROJECT', --gcp_region_id='${DATAPLEX_REGION_ID}', --gcp_bq_dataset_id='${TARGET_BQ_DATASET}', --target_bigquery_summary_table='${GOOGLE_CLOUD_PROJECT}.${TARGET_BQ_DATASET}.${TARGET_BQ_TABLE}'," \ "$TASK_ID"
Parâmetro | Descrição |
---|---|
USER_CLOUDDQ_YAML_CONFIGS_GCS_PATH |
O caminho do Cloud Storage para a entrada de configurações do YAML de qualidade de dados para a tarefa de qualidade de dados. É possível ter um único arquivo YAML no formato .yml ou .yaml ou um arquivo ZIP com vários arquivos YAML. |
GOOGLE_CLOUD_PROJECT |
O projeto do Google Cloud em que a tarefa do Dataplex e os jobs do BigQuery são criados. |
DATAPLEX_REGION_ID |
A região do lake do Dataplex em que a tarefa de qualidade de dados é criada. |
SERVICE_ACCOUNT |
A conta de serviço usada para executar a tarefa. Verifique se essa conta de serviço tem permissões do IAM suficientes, conforme descrito na seção Antes de começar. |
Para --execution-args
, os argumentos a seguir precisam ser transmitidos como argumentos posicionados e, portanto, nesta ordem:
Argumento | Descrição |
---|---|
clouddq-executable.zip |
Um executável pré-compilado que foi transmitido no spark-file-uris de um bucket público do Cloud Storage. |
ALL |
Execute todas as vinculações de regras. Também é possível fornecer vinculações de regras específicas como uma lista separada por vírgulas.
Por exemplo, RULE_1,RULE_2 . |
gcp-project-id |
ID do projeto que executa as consultas do BigQuery. |
gcp-region-id |
Região para executar os jobs do BigQuery para validação da qualidade de dados. Essa região precisa ser igual à do gcp-bq-dataset-id e do target_bigquery_summary_table . |
gcp-bq-dataset-id |
Conjunto de dados do BigQuery usado para armazenar as visualizações rule_binding e os resultados resumidos da qualidade de dados intermediários. |
target-bigquery-summary-table |
Referência do ID da tabela da tabela do BigQuery em que os resultados finais das verificações de qualidade de dados são armazenados. Não use o valor do ID
dq_summary , porque ele é reservado para tarefas de processamento interno. |
--summary_to_stdout |
(Opcional) Quando essa sinalização é transmitida, todas as linhas de resultados de validação
criadas na tabela dq_summary na última execução são
registradas como registros JSON no Cloud Logging e stdout . |
API
Substitua:
PROJECT_ID = "Your Dataplex Project ID" REGION = "Your Dataplex lake region" LAKE_ID = "Your Dataplex lake ID" SERVICE_ACC = "Your service account used for reading the data" DATAPLEX_TASK_ID = "Unique task ID for the data quality task" BUCKET_NAME = "Your Cloud Storage bucket name containing the CloudDQ configs or YAML specification" GCP_BQ_BILLING_PROJECT_ID = "Your BigQuery billing project" GCP_BQ_REGION_ID = "Your BigQuery dataset region ID" #Optional GCP_BQ_DATASET_ID = "Your BigQuery dataset to store the data quality summary results" TARGET_TABLE_NAME = "Your target table name to store the results in BigQuery dataset"
- Envie uma solicitação POST HTTP:
POST https://dataplex.googleapis.com/v1/projects/${PROJECT_ID}/locations/${REGION}/lakes/${LAKE_ID}/tasks?task_id=${DATAPLEX_TASK_ID} { "spark": { "python_script_file": f"gs://dataplex-clouddq-artifacts-us-central1/clouddq_pyspark_driver.py", "file_uris": [ f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip", f"gs://dataplex-clouddq-artifacts-us-central1/clouddq-executable.zip.hashsum", f"gs://dataplex-clouddq-artifacts-us-central1/your-clouddq-configs.zip" ] }, "execution_spec": { "args": { "TASK_ARGS":f"clouddq-executable.zip, ALL, gs://BUCKET_NAME/your-clouddq-configs.zip, --gcp_project_id=${GCP_BQ_BILLING_PROJECT_ID}, --gcp_region_id=${GCP_BQ_REGION_ID}, --gcp_bq_dataset_id=${GCP_BQ_DATASET_ID}, --target_bigquery_summary_table=${GCP_BQ_BILLING_PROJECT_ID}.${GCP_BQ_DATASET_ID}.${TARGET_TABLE_NAME}" }, "service_account": "SERVICE_ACC" }, "trigger_spec": { "type": "ON_DEMAND" }, "description": "${DATAPLEX_TASK_DESCRIPTION}" }
Consulte também Exemplo de DAG do Airflow para tarefa de qualidade de dados do Dataplex.
Monitorar uma tarefa programada de qualidade de dados
Veja como monitorar sua tarefa.
Ver os resultados
Os resultados das validações de qualidade de dados são armazenados no conjunto de dados e na tabela de resumo do BigQuery que você especificou, conforme descrito em Criar um conjunto de dados para armazenar os resultados. A tabela de resumo contém o resumo da saída de cada combinação de vinculação de regras e regra para cada execução de validação. A saída na tabela de resumo inclui as seguintes informações:
Nome da coluna | Descrição |
---|---|
dataplex_lake |
(string) ID do lake do Dataplex que contém a tabela que está sendo validada. |
dataplex_zone |
(string) ID da zona do Dataplex que contém a tabela que está sendo validada. |
dataplex_asset_id |
(string) ID do recurso do Dataplex que contém a tabela que está sendo validada. |
execution_ts |
(carimbo de data/hora) Carimbo de data/hora de quando a consulta de validação foi executada. |
rule_binding_id |
(string) ID da vinculação de regra para a qual os resultados de validação são informados. |
rule_id |
(string) ID da regra sob a vinculação de regra para a qual os resultados de validação são informados. |
dimension |
(string) Dimensão de qualidade de dados do rule_id . Esse valor só pode ser um dos valores especificados no nó do YAML rule_dimensions . |
table_id |
(string) ID da entidade para que os resultados de validação são informados.
Esse ID é especificado no parâmetro entity da vinculação da respectiva regra. |
column_id |
(string) ID da coluna para que os resultados da validação informados.
Esse ID é especificado no parâmetro column da vinculação da respectiva regra. |
last_modified |
(carimbo de data/hora) O último carimbo de data/hora modificado do table_id que está sendo validado. |
metadata_json_string |
(string) Pares de chave-valor do conteúdo do parâmetro de metadados especificado sob a vinculação de regra ou durante a execução da qualidade de dados. |
configs_hashsum |
(string) A soma de hash do documento JSON que contém a vinculação de regra e todas as regras associadas, vinculações de regra, filtros de linha e configurações de entidade.
configs_hashsum permite rastrear quando o conteúdo de um
ID rule_binding ou uma das configurações referenciadas é alterado. |
dq_run_id |
(string) ID exclusivo do registro. |
invocation_id |
(string) ID da execução da qualidade de dados. Todos os registros de resumo da qualidade de dados gerados na mesma instância de execução de qualidade de dados compartilham o mesmo invocation_id . |
progress_watermark |
(booleano) Determina se esse registro específico é considerado pela verificação de qualidade de dados ao determinar a alta marca d'água para validação incremental. Se FALSE , o respectivo registro será ignorado ao estabelecer o valor de marca-d'água alto. Essas informações são úteis ao executar validações de qualidade de dados de teste que não devem avançar a marca d'água alta. O Dataplex preenche esse campo com TRUE por padrão, mas isso pode ser substituído se o argumento --progress_watermark tiver um valor de FALSE .
|
rows_validated |
(número inteiro) Número total de registros validados após a aplicação de row_filters e quaisquer filtros de marca-d'água alta na coluna incremental_time_filter_column_id , se especificado. |
complex_rule_validation_errors_count |
(float) Número de linhas retornadas por uma regra CUSTOM_SQL_STATEMENT . |
complex_rule_validation_success_flag |
(booleano) Status de sucesso das regras CUSTOM_SQL_STATEMENT .
|
success_count |
(número inteiro) Número total de registros que passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT .
|
success_percentage |
(ponto flutuante) A porcentagem do número de registros que passaram na validação dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT . |
failed_count |
(número inteiro) Número total de registros que não passaram na validação. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT .
|
failed_percentage |
(ponto flutuante) A porcentagem do número de registros que não foram validados dentro do número total de registros validados. Esse campo está definido como NULL para regras CUSTOM_SQL_STATEMENT . |
null_count |
(número inteiro) Número total de registros que retornaram o valor nulo durante a validação.
Esse campo é definido como NULL para regras NOT_NULL e CUSTOM_SQL_STATEMENT . |
null_percentage |
(ponto flutuante) A porcentagem do número de registros que retornaram nulo durante a validação dentro do número total de registros validados. Esse campo está definido como NULL para as regras NOT_NULL e CUSTOM_SQL_STATEMENT . |
failed_records_query |
Para cada regra que falha, esta coluna armazena uma consulta que pode ser usada para receber registros com falha. Neste documento, consulte
Resolver problemas de regras com falhas usando
failed_records_query . |
Para entidades do BigQuery, uma visualização é criada para cada rule_binding
que contém a lógica de validação SQL da execução mais recente. Essas visualizações podem ser encontradas no conjunto de dados do BigQuery especificado no argumento --gcp-bq-dataset-id
.
Otimizações de custo
Você pode ajudar a reduzir custos com as otimizações a seguir.
Validações incrementais
Muitas vezes, você tem tabelas atualizadas rotineiramente com novas partições (novas linhas). Se não quiser revalidar as partições antigas em cada execução, use validações incrementais.
Para validações incrementais, é necessário ter uma coluna do tipo TIMESTAMP
ou DATETIME
na tabela em que o valor da coluna aumenta monotonicamente. Use as colunas em que a tabela do BigQuery está particionada.
Para especificar a validação incremental, especifique um valor para incremental_time_filter_column_id=TIMESTAMP/DATETIME type column
como parte de uma vinculação de regra.
Quando você especifica uma coluna, a tarefa de qualidade de dados considera apenas as linhas com um valor de TIMESTAMP
maior que o carimbo de data/hora da última tarefa de qualidade de dados executada.
Exemplos de arquivos de especificação
Para usar esses exemplos, crie um conjunto de dados do BigQuery
chamado sales
. Em seguida, crie uma tabela de fatos chamada sales_orders
e adicione dados de amostra executando uma consulta com as seguintes instruções do GoogleSQL:
CREATE OR REPLACE TABLE sales.sales_orders
(
id STRING NOT NULL,
last_modified_timestamp TIMESTAMP,
customer_id STRING,
item_id STRING,
amount NUMERIC,
transaction_currency STRING
);
INSERT INTO sales.sales_orders
(id, last_modified_timestamp, customer_id, item_id, amount, transaction_currency)
VALUES
("order1",CURRENT_TIMESTAMP(),"customer1","ASDWQ123456789012345",100,"USD"),
("order1",CURRENT_TIMESTAMP(),"customer2","bad_item_id",-10,"XXX"),
("order2",CURRENT_TIMESTAMP(),"customer3","INTNL987654321098765",50,"GBP"),
("order3",CURRENT_TIMESTAMP(),"customer4","INTNL932716428593847",50,"GBP")
Exemplo 1
O exemplo de código a seguir cria verificações de qualidade de dados para validar esses valores:
amount
: os valores são zero ou números positivos.item_id
: uma string alfanumérica de cinco caracteres alfabéticos, seguida de 15 dígitos.transaction_currency
: um tipo de moeda permitido, conforme definido por uma lista estática. A lista estática dessa amostra permite GBP e JPY como tipos de moeda. Essa validação se aplica somente às linhas marcadas como internacionais.
# The following `NONE` row filter is required.
row_filters:
NONE:
filter_sql_expr: |-
True
# This filters for rows marked as international (INTNL).
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# Rule dimensions are optional but let you aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can apply to multiple tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
# Rule bindings associate rules to columns within tables.
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Substitua:
PROJECT_ID
: o ID do projeto.DATASET_ID
: o código do conjunto de dados.
Sample 2
Se a tabela a ser verificada fizer parte de um lake do Dataplex, você poderá especificar as tabelas usando a notação de lake ou zona. Isso permite agregar os resultados por lago ou zona. Por exemplo, é possível gerar uma pontuação no nível da zona.
Para usar este exemplo, crie um lake do Dataplex com o ID operations
e o ID da zona procurement
. Em seguida, adicione a tabela sales_orders
como um recurso à zona.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Substitua:
- PROJECT_ID: o ID do projeto.
- REGION_ID: o ID da região do lago do Dataplex em que a tabela existe, como
us-central1
.
Exemplo 3
Este exemplo aprimora o Exemplo 2 adicionando uma verificação SQL personalizada para saber se os valores de ID são exclusivos.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
rule_ids:
- VALID_CURRENCY_ID
Exemplo 4
Este exemplo melhora o Exemplo 3 adicionando validações incrementais usando a coluna last_modified_timestamp
. É possível adicionar validações incrementais para uma
ou mais vinculações de regras.
# This is a convenience section that allows you to shorten the entity_uri
metadata_registry_defaults:
dataplex:
projects: PROJECT_ID
locations: REGION_ID
lakes: operations
zones: procurement
# You have to define a NONE row filter
row_filters:
NONE:
filter_sql_expr: |-
True
INTERNATIONAL_ITEMS:
filter_sql_expr: |-
REGEXP_CONTAINS(item_id, 'INTNL')
# rule dimensions are optional but allow you to aggregate reporting.
rule_dimensions:
- consistency
- correctness
- duplication
- completeness
- conformance
- integrity
# Rules can be shared across tables or columns.
rules:
# This rule is parameterized with column_names as parameter
NO_DUPLICATES_IN_COLUMN_GROUPS:
rule_type: CUSTOM_SQL_STATEMENT
dimension: duplication
params:
custom_sql_arguments:
- column_names
custom_sql_statement: |-
select a.*
from data a
inner join (
select
$column_names
from data
group by $column_names
having count(*) > 1
) duplicates
using ($column_names)
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
dimension: correctness
params:
custom_sql_expr: |-
$column >= 0
VALID_ITEM_ID:
rule_type: REGEX
dimension: conformance
params:
pattern: |-
[A-Z]{5}[0-9]{15}
VALID_CURRENCY_ID:
rule_type: CUSTOM_SQL_EXPR
dimension: integrity
params:
custom_sql_expr: |-
$column in ('GBP', 'JPY')
#rule bindings associate rules to {table, column}
rule_bindings:
TRANSACTIONS_UNIQUE:
entity_uri: dataplex://projects/PROJECT_ID/locations/REGION_ID/lakes/operations/zones/procurement/entities/sales_orders
column_id: id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- NO_DUPLICATES_IN_COLUMN_GROUPS:
column_names: "id"
TRANSACTION_AMOUNT_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders # omitting projects/locations/lakes from uri path to use the default values specified in metadata_registry_defaults
column_id: amount
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALUE_ZERO_OR_POSITIVE
TRANSACTION_VALID_ITEM_ID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: item_id
row_filter_id: NONE
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_ITEM_ID
TRANSACTION_CURRENCY_VALID:
entity_uri: dataplex://zones/procurement/entities/sales_orders
column_id: transaction_currency
row_filter_id: INTERNATIONAL_ITEMS
incremental_time_filter_column_id: last_modified_timestamp
rule_ids:
- VALID_CURRENCY_ID
Resolver problemas de regras com falhas usando failed_records_query
Para cada regra que falha, a tabela de resumo armazena uma consulta na coluna failed_records_query
que pode ser usada para receber registros com falha.
Para depurar, também é possível usar reference columns
no arquivo YAML, o que permite mesclar a saída de failed_records_query
com os dados originais para acessar o registro inteiro. Por exemplo, é possível especificar uma coluna primary_key
ou uma coluna primary_key
composta como uma coluna de referência.
Especificar as colunas de referência
Para gerar colunas de referência, adicione o seguinte à sua especificação YAML:
Seção
reference_columns
. Nesta seção, é possível criar um ou mais conjuntos de colunas de referência, cada um especificando uma ou mais colunas.Seção
rule_bindings
. Nesta seção, é possível adicionar uma linha a uma vinculação de regra que especifica um ID de coluna de referência (reference_columns_id
) a ser usado pelas regras nessa vinculação de regra. Precisa ser um dos conjuntos de colunas de referência especificados na seçãoreference_columns
.
Por exemplo, o arquivo YAML a seguir especifica uma seção reference_columns
e define três colunas: id
, last_modified_timestamp
e item_id
como parte do conjunto ORDER_DETAILS_REFERENCE_COLUMNS
. O
exemplo a seguir usa a tabela de exemplo sales_orders
.
reference_columns:
ORDER_DETAILS_REFERENCE_COLUMNS:
include_reference_columns:
- id
- last_modified_timestamp
- item_id
rules:
VALUE_ZERO_OR_POSITIVE:
rule_type: CUSTOM_SQL_EXPR
params:
custom_sql_expr: |-
row_filters:
NONE:
filter_sql_expr: |-
True
rule_bindings:
TRANSACTION_AMOUNT_VALID:
entity_uri: bigquery://projects/PROJECT_ID/datasets/DATASET_ID/tables/sales_orders
column_id: amount
row_filter_id: NONE
reference_columns_id: ORDER_DETAILS_REFERENCE_COLUMNS
rule_ids:
- VALUE_ZERO_OR_POSITIVE
Como usar a consulta de registros com falha
A consulta de registros com falha gera uma linha para cada registro com uma regra que falhou. Isso inclui o nome da coluna que acionou a falha, o valor que acionou a falha e os valores das colunas de referência. Também inclui metadados que podem ser usados para estabelecer uma relação à execução da tarefa de qualidade de dados.
Veja a seguir um exemplo de saída de uma consulta de registros com falha para o arquivo YAML descrito em Especificar colunas de referência. O exemplo mostra uma falha para a coluna amount
e um valor com falha de -10
. Também registra o valor correspondente para a coluna de referência.
_dq_validation_invocation_id | _dq_validation_rule_binding_id | _dq_validation_rule_id | _dq_validation_column_id | _dq_validation_column_value | _dq_validation_dimension | _dq_validation_simple_rule_row_is_valid | _dq_validation_complex_rule_validation_errors_count | _dq_validation_complex_rule_validation_success_flag | id | last_modified_timestamp | item_id |
---|---|---|---|---|---|---|---|---|---|---|---|
10a25be9-8dfa-446c-a42c-75f6bb4a49d9 | TRANSACTION_AMOUNT_VALID | VALUE_ZERO_OR_POSITIVE | amount | -10 | FALSO | order1 | 2022-01-22T02:30:06.321Z | bad_item_id |
Usar consultas de registros com falha para regras CUSTOM_SQL_STATEMENT
Para regras CUSTOM_SQL_STATEMENT
, as consultas de registro com falha incluem a coluna
custom_sql_statement_validation_errors
. A coluna custom_sql_statement_validation_errors
é aninhada com campos que correspondem à saída da instrução SQL. As colunas
de referência não estão incluídas nas consultas de registros com falha para regras CUSTOM_SQL_STATEMENT
.
Por exemplo, sua regra CUSTOM_SQL_STATEMENT
pode ser assim:
rules: TEST_RULE: rule_type: CUSTOM_SQL_STATEMENT custom_sql_arguments: - existing_id - replacement_id params: CUSTOM_SQL_STATEMENT: |- (SELECT product_name, product_key FROM data where $existing_id != $replacement_id)
custom_sql_statement_validation_errors
, com uma linha para cada ocorrência em que existing_id!=replacement_id
.
Quando renderizada em JSON, o conteúdo de uma célula na coluna pode ter a seguinte aparência:
{ "custom_sql_statement_valdation_errors" :{ "product_name"="abc" "product_key"="12345678" "_rule_binding_id"="your_rule_binding" } }
É possível mesclar esses resultados com a tabela original com uma referência aninhada, como join on custom_sql_statement_valdation_errors.product_key
.
A seguir
- Consulte a referência da especificação YAML do CloudDQ.
- Para conferir exemplos de regras de qualidade de dados, consulte Regras simples e Regras avançadas.
- Consulte Exemplo de DAG do Airflow para tarefa de qualidade de dados do Dataplex.