Raw Data Ingestion into Delta Lake Bronze tables using Azure Synapse Mapping Data Flow

By: Fikrat Azizov   |   Updated: 2021-11-26   |   Related: Azure Synapse Analytics


Problem

This tutorial is part of a series of posts dedicated to building a Lakehouse solution based on Delta Lake and Azure Synapse Analytics technologies. In the previous post (see Data Ingestion Into Landing Zone Using Azure Synapse Analytics), we built a Synapse Analytics pipeline that deposits JSON and Parquet files into the landing zone. The next logical step is to read from the landing zone and push the data into the Bronze-layer Delta Lake tables, and that's what we'll cover here.

Solution

Bronze Ingestion Design Considerations

Just like the Landing Zone ingestion, Bronze Delta Lake ingestion doesn't involve many transformations. However, I'd like to discuss a few design considerations that are part of any data warehousing project:

  • Metadata capture. It's a good practice to include metadata, like the ingestion timestamp and source file names, in the Bronze tables.
  • Tooling considerations. The pipeline should be able to read from the sources and write to the destination locations. Although some data sources can be accessed by many tools, there are some (like Delta Lake sources) that can be accessed by only a few.
  • Re-usability and modular design. Typically, ingestion pipelines operate on many tables with similar ingestion requirements. Therefore, it's worth creating nested, parameterized pipelines that reduce the development effort.
  • Change history maintenance. The ingestion pipelines should handle the rows changed in the source on repetitive ingestions. There are a few common approaches to handle that (a minimal sketch of both patterns follows this list):
    • Updating changed target rows. This approach requires upsert (merge) functionality to update the changed rows. As a result, the target table will contain a single version of each row, reflecting the latest state of the corresponding source row. This approach also requires a slightly more complex ingestion design, as you'd need to provide primary keys for each table.
    • Appending changed source rows into the target table as new versions. Preserving change history is one of the key requirements when building a solution that is subject to strict financial and security restrictions. The Bronze ingestion pipelines designed with this approach are simpler compared to the ones that require updates. However, because the destination table will contain multiple versions of each row, this approach requires some kind of deduplication in the next phases of ingestion. Optionally, this approach allows you to choose whether or not to truncate the target table.
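
To make the difference concrete, here's a minimal Spark SQL sketch of both patterns, using hypothetical table and key names (bronze.Address as the target, staged_address as the incoming batch, AddressID as the primary key). In our pipelines the append will be performed by the Mapping Data Flow sink rather than by hand-written SQL, so treat this only as an illustration of the two options:

-- Upsert (merge) pattern: the target keeps a single, latest version of each row.
-- AddressID is an assumed primary key; staged_address is a hypothetical staging table.
MERGE INTO bronze.Address AS tgt
USING staged_address AS src
ON tgt.AddressID = src.AddressID
WHEN MATCHED THEN UPDATE SET *
WHEN NOT MATCHED THEN INSERT *;

-- Append pattern: every ingestion adds new row versions; no keys are required.
INSERT INTO bronze.Address
SELECT * FROM staged_address;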

Mapping Data Flow is capable of addressing all of the above-mentioned needs, so we'll use it extensively throughout the next few posts. Although I'll explain all of the configuration steps in great detail, familiarity with the basics of Mapping Data Flow, as well as Azure Synapse Studio, will certainly help you understand the material better, so I encourage you to review the material included at the end of this post.

Our pipeline will have two layers:

  • Child layer, based on Mapping Data Flow technology. We'll create two data flows, one for each data format. These flows will be parameterized, with the source and destination table names as parameters.
  • Parent layer, based on Synapse data integration pipeline technology. We'll create a single pipeline, which will loop through the required table lists and call the child data flows.

As to change history maintenance, I've chosen the append approach, to keep the Bronze ingestion simple. I'll also demonstrate deduplication techniques for the Bronze tables when we discuss the next transformation phases.
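
Since every pipeline run appends new row versions, the downstream phases will need to pick the latest version of each row. As a quick preview, such a deduplication query in Spark SQL could look like the sketch below, assuming a hypothetical AddressID business key and the DateInserted metadata column we'll add to the Bronze tables later in this post:

-- AddressID is an assumed business key; DateInserted is the ingestion timestamp column added below.
SELECT * FROM (
  SELECT *,
    ROW_NUMBER() OVER (PARTITION BY AddressID ORDER BY DateInserted DESC) AS rn
  FROM bronze.Address
) AS latest
WHERE rn = 1;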

Building the first child data flow

Now that we've made our design choices, let's get our hands dirty and start building the first child flow to ingest JSON files! Open Synapse Analytics Studio and add a new data flow:

Open Synapse Analytics Studio and add a new data flow

Figure 1

Let's add the following parameters, which will be supplied by the parent pipeline:

  • SourceTableName
  • TargetTableName

Here's the screenshot:

add parameters to data flow

Figure 2

Before we proceed, let's turn on the debugging option, so that we can interactively validate the output from each component as we add it to the data flow. Once it's turned on, the Debug settings button will be available:

enable debugging

Figure 3

Open Debug settings, switch to the Parameters tab and enter the following sample parameter values:

  • SourceTableName: "SalesLTAddress"
  • TargetTableName: "Address"

These parameters will be needed only for debugging purposes, as we build our data flow.

debug settings

Figure 4

Next, navigate to the design surface and add a source named JsonRaw. Create a new dataset of Azure Blob Storage type, as follows:

new dataset azure blob storage

Figure 5

Next, select JSON as a file format:

select json format

Figure 6

Next, assign a name to the data source and create an Azure Blob Storage linked service (I've named it AzureBlobStorage1), pointing to the storage account created earlier. Click the folder button and navigate to the raw/json folder that contains the JSON source files:

set json properties

Figure 7

Next, switch to the Source options tab, open the expression builder for the Wildcard paths textbox and add the following expression:

concat("raw/json/",$SourceTableName,".json")

This expression constructs the path to the source file, based on the table name parameter. For example, with the SourceTableName debug value of "SalesLTAddress", it resolves to raw/json/SalesLTAddress.json.

Next, enter SourceFileName in the Column to store file name textbox, to capture the source file paths. Select the Delete source files option, to ensure the source files are removed after successful ingestion. Here's the screenshot with the required settings:

set source options

Figure 8

Now that the source component is configured, let's switch to the Data preview window and validate the data:

data preview

Figure 9

Next, let's add a derived column transformation and name it MetadataColumns. Add a calculated column DateInserted with the following expression:

currentTimestamp()

Here's the screenshot with the required settings:

build expression

Figure 10

Switch to the Data preview tab again, to ensure that the newly added columns look good:

data preview tab

Figure 11

Finally, add a sink component and name it Delta. Select the Inline sink type, then Delta as the Inline dataset type, and select the linked service AzureBlobStorage1, as follows:

add a sink for delta

Figure 12

Navigate to the Settings tab, open the expression builder for the Folder path field and enter the following expression:

concat("delta/bronze/",$TargetTableName,"/")

This expression constructs the destination path for the Delta Lake table, based on the TargetTableName parameter. With the debug value of "Address", for example, it resolves to delta/bronze/Address/.

Here's the relevant screenshot:

settings values

Figure 13

Notice that I've selected the Allow insert checkbox as the update method.

This concludes the data flow for JSON files, so navigate to the Data preview tab to ensure the data looks good, and commit your work.

Building the second child data flow

Our second data flow, which fetches the Parquet files, will be similar to the first one. So, let's clone the DataflowLandingBronzeJson flow and rename it DataflowLandingBronzeParquet. Create a new dataset of Parquet format for its source:

source settings

Figure 14

Navigate to the Source options tab and enter the following expression in the Wildcard paths textbox:

concat("raw/parquet/",$SourceTableName,".parquet")

Building the parent pipeline

Let's navigate to Synapse Studio's Data Integration design page, add a pipeline and name it CopyRawToDelta.

Next, add an array-type pipeline variable and name it JsonTables. This variable will contain the source/destination parameter values for each JSON file. Assign the following value to the JsonTables variable:

[{"source":"SalesLTCustomer","destination":"Customer"},{"source":"SalesLTAddress","destination":"Address"},{"source":"SalesLTCustomerAddress","destination":"CustomerAddress"}]

This array contains one set of parameters for each table, with its respective source and destination parts. Similarly, add a variable ParquetTables with the following value, to handle the Parquet file parameter sets:

[{"source":"SalesLTProduct","destination":"Product"},{"source":"SalesLTProductCategory","destination":"ProductCategory"},{"source":"SalesLTProductDescription","destination":"ProductDescription"},{"source":"SalesLTProductModel","destination":"ProductModel"},{"source":"SalesLTSalesOrderDetail","destination":"SalesOrderDetail"},{"source":"SalesLTSalesOrderHeader","destination":"SalesOrderHeader"}]

Next, add a ForEach activity and enter the following expression in the Items field:

@variables('JsonTables')

Here's the screenshot with the required settings:

for each activity

Figure 15

Open the configuration of the ForEach activity, add a data flow activity and select DataflowLandingBronzeJson flow, as follows:

for each settings

Figure 16

Navigate to the Parameters tab and enter the following expressions for the data flow parameters:

  • SourceTableName: @item().source
  • TargetTableName: @item().destination

These expressions extract the source and destination parts from each table-specific parameter set. For example, the first JsonTables entry produces SourceTableName = SalesLTCustomer and TargetTableName = Customer.

Here's the screenshot:

data flow parameters

Figure 17

Now, let's add another ForEach activity, following similar steps, to call the DataflowLandingBronzeParquet data flow, this time iterating over the ParquetTables variable. Here's the screenshot with the final design:

for each activity

Figure 18

This concludes the parent pipeline configuration. Let's execute the pipeline in debug mode and ensure that it succeeds:

execute pipeline in debug mode

Figure 19

Registering Delta Lake tables

Delta Lake tables can be read and written using the Delta Lake APIs, and that's the method used by the Mapping Data Flow. However, we can also register these tables in the Hive metastore, which allows us to query them using Spark SQL. So, we'll create Spark tables to browse and validate our data.
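
For example, once the data flow has written the delta/bronze/Address/ folder, Spark SQL can query it directly by path, while metastore registration (which we'll set up below) gives it a friendlier two-part name. Here's a minimal sketch, assuming the Bronze folders sit on the storage account attached to the Spark pool:

-- Query the Delta folder directly by path (no metastore registration needed).
SELECT * FROM delta.`/delta/bronze/Address/` LIMIT 10;

-- Query the same data through the registered table, once the steps below are completed.
SELECT * FROM bronze.Address LIMIT 10;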

Let's navigate to the Develop tab and create a notebook:

register new notebook

Figure 20

Select Spark SQL as the language, attach the Spark pool created earlier, and enter the following command in the first cell, to create a Bronze database:

CREATE DATABASE IF NOT EXISTS bronze LOCATION "/delta/bronze/"

Here's a related screenshot:

create database

Figure 21

Add the following commands in the next cell, to create Spark tables on top of the folders where we've ingested the Bronze data:

USE bronze;
CREATE TABLE IF NOT EXISTS Address USING DELTA LOCATION "/delta/bronze/Address/";
CREATE TABLE IF NOT EXISTS Customer USING DELTA LOCATION "/delta/bronze/Customer/";       
CREATE TABLE IF NOT EXISTS CustomerAddress USING DELTA LOCATION "/delta/bronze/CustomerAddress/";
CREATE TABLE IF NOT EXISTS Product USING DELTA LOCATION "/delta/bronze/Product/";  
CREATE TABLE IF NOT EXISTS ProductCategory USING DELTA LOCATION "/delta/bronze/ProductCategory/"; 
CREATE TABLE IF NOT EXISTS ProductDescription USING DELTA LOCATION "/delta/bronze/ProductDescription/";
CREATE TABLE IF NOT EXISTS ProductModel USING DELTA LOCATION "/delta/bronze/ProductModel/";       
CREATE TABLE IF NOT EXISTS SalesOrderDetail USING DELTA LOCATION "/delta/bronze/SalesOrderDetail/";
CREATE TABLE IF NOT EXISTS SalesOrderHeader USING DELTA LOCATION "/delta/bronze/SalesOrderHeader/";      

Finally, add the following query in the next cell, to browse the table content:

SELECT * FROM bronze.SalesOrderDetail;

Let's execute both cells and examine the sample data:

create tables

Figure 22
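
Because our Bronze ingestion appends data, each pipeline run adds a new version to each table's Delta transaction log. If you're curious, you can inspect those versions with Delta's history command; this is an optional check, shown here for one of the tables:

DESCRIBE HISTORY bronze.SalesOrderDetail;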

We can also run some basic validation queries, to ensure that row counts match in the source and destination:

query tables

Figure 23
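
As a reference, here's a minimal sketch of this kind of check, run against one of the Bronze tables; its count can be compared with the row count of the corresponding source table, and the metadata columns confirm when and from which file the rows were ingested:

SELECT COUNT(*) AS RowCnt,
  MIN(DateInserted) AS FirstIngested,
  MAX(DateInserted) AS LastIngested,
  COUNT(DISTINCT SourceFileName) AS SourceFiles
FROM bronze.SalesOrderDetail;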

Next Steps

  • If you haven't built the landing zone pipeline yet, check out the previous post in this series, Data Ingestion Into Landing Zone Using Azure Synapse Analytics.
  • Stay tuned for the next posts in this series, where we'll cover the subsequent transformation phases, including deduplication of the data appended to the Bronze tables.




About the author
Fikrat Azizov has been working with SQL Server since 2002 and has earned two MCSE certifications. He's currently working as a Solutions Architect at Slalom Canada.

This author pledges the content of this article is based on professional experience and not AI generated.



Article Last Updated: 2021-11-26

Comments For This Article




Friday, July 15, 2022 - 6:39:20 PM - Amanda
I've followed these steps and was able to create the delta files (parquet files, _delta_log transaction files). When I try to run the CREATE TABLE scripts, I consistently get an error message that says I am trying to create an external table but the schema is not specified when the input path is empty. The path is not empty - it has parquet files and json files in the delta log folder. I have tried this from Synapse and from a Databricks workspace and consistently get this error.

I've tried specifying the column list for one of my sources and it was able to specify the table but when I query the table, it is empty. When I use serverless SQL to query the parquet file directly I can see that it is not empty. Do you have any suggestions?

Sunday, December 5, 2021 - 10:08:49 PM - Jordan
Great series! Can’t wait for the rest













