Cloud Data Fusion

Working with Cloud Data Fusion

Pipeline to redact PII (phone number)

Roles required

Service Account: Cloud Data Fusion service account
Role: Cloud Data Fusion API Service Agent
Description: Gives the Cloud Data Fusion service account access to Service Networking, Cloud Dataproc, Cloud Storage, BigQuery, Cloud Spanner, and Cloud Bigtable resources.

Service Account: Compute Engine default service account
Roles:
  • Cloud Data Fusion Runner: access to Data Fusion runtime resources.
  • Cloud Data Fusion API Service Agent Editor: gives the Cloud Data Fusion service account access to Service Networking, Cloud Dataproc, Cloud Storage, BigQuery, Cloud Spanner, and Cloud Bigtable resources.

Components:

  • Source

  • Transform

  • Analytics

  • Sink

  • Conditions and Actions

Pipeline Import

Transform

Wrangler
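The Wrangler transform applies directives to each record; for this pipeline the phone-number redaction amounts to a regex substitution over the PII column. A minimal Python sketch of that logic, where the phone pattern and the `[REDACTED]` placeholder are illustrative assumptions (Wrangler's actual masking directives differ):

```python
import re

# Matches common US-style phone numbers, e.g. 555-123-4567 or (555) 123-4567.
# Pattern and placeholder are assumptions for illustration only.
PHONE_RE = re.compile(r"\(?\d{3}\)?[\s.-]?\d{3}[\s.-]?\d{4}")

def redact_phone(text: str) -> str:
    """Replace any phone number found in the string with a fixed placeholder."""
    return PHONE_RE.sub("[REDACTED]", text)
```

For example, `redact_phone("Call me at (555) 123-4567")` returns `"Call me at [REDACTED]"`.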

Deploying the Pipeline

Templating

Pipeline configure

Deploy

Runtime arguments

Key: system.profile.name
Value: SYSTEM:dataproc
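Runtime arguments can also be referenced from plugin properties through `${...}` macros and are substituted when the run starts. A minimal sketch of that substitution, assuming a simple flat key lookup (real CDAP macro evaluation also supports nesting and macro functions):

```python
import re

def resolve_macros(value: str, runtime_args: dict) -> str:
    """Replace each ${key} placeholder with its value from the runtime arguments."""
    return re.sub(r"\$\{([^}]+)\}", lambda m: runtime_args[m.group(1)], value)
```

For example, `resolve_macros("gs://${bucket}/raw/", {"bucket": "landing"})` returns `"gs://landing/raw/"`.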

Airflow and Data Fusion

https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/datafusion.html

By populating the Data Fusion operators with a few parameters, we can deploy, start, and stop pipelines.

Start a data pipeline - https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/datafusion.html#start-a-datafusion-pipeline

from airflow.providers.google.cloud.operators.datafusion import (
    CloudDataFusionStartPipelineOperator,
)

start_pipeline = CloudDataFusionStartPipelineOperator(
    location=LOCATION,
    pipeline_name=PIPELINE_NAME,
    instance_name=INSTANCE_NAME,
    task_id="start_pipeline",
    # Optionally pass runtime arguments, e.g.
    # runtime_args={"system.profile.name": "SYSTEM:dataproc"},
)

Order of task flow using the bitshift operator

gcs_sensor_task >> start_pipeline
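The `>>` chaining works because Airflow task objects overload Python's right-shift operator to record dependencies. A toy illustration of the mechanism (these are not Airflow's actual classes):

```python
class Task:
    """Toy stand-in for an Airflow operator, only to show how >> chains tasks."""

    def __init__(self, task_id: str):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other: "Task") -> "Task":
        # a >> b records b as downstream of a and returns b,
        # so longer chains like a >> b >> c also work.
        self.downstream.append(other)
        return other

gcs_sensor_task = Task("gcs_sensor_task")
start_pipeline = Task("start_pipeline")
gcs_sensor_task >> start_pipeline  # start_pipeline now runs after the sensor
```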

Data Lineage

https://cloud.google.com/solutions/architecture-concept-data-lineage-systems-in-a-data-warehouse

Abstract

Lineage extraction flowchart

Example

Shipment data

Dataset level lineage

Field level lineage

This shows the history of changes a particular field has gone through.

Troubleshooting

If memory is insufficient, increase the executor memory in the pipeline configuration.

Example quota failure:

PROVISION task failed in REQUESTING_CREATE state for program run program_run:default.Reusable-Pipeline.-SNAPSHOT.workflow.DataPipelineWorkflow.a48de625-b92a-11eb-b814-66625c4294b9 due to Dataproc operation failure: INVALID_ARGUMENT: Insufficient 'DISKS_TOTAL_GB' quota. Requested 3000.0, available 1096.0.

Here the Dataproc provisioner requested more persistent disk (3000 GB) than the project's remaining DISKS_TOTAL_GB quota in that region (1096 GB). Either request a quota increase or shrink the cluster (fewer workers or smaller disks) in the compute profile.
