Development Guide
Overview
This guide introduces you to key concepts and practices to help you build efficient data pipelines and leverage the various features and advanced capabilities of DataZen.
Basic Rules
Let's start with basic rules and syntax. These conventions are necessary to ensure that the pipelines are well constructed.
- A pipeline must have either a SELECT or a LOAD operation to be saved; if LOAD is found first, the pipeline is saved as a Writer; otherwise, it is saved as a Reader.
- A pipeline may have multiple SELECT operations but only one PUSH operation.
- DECLARE operations (if any) must be specified at the top of the pipeline script, and TRIGGER operations must be placed at the very end.
- Every operation must end with a semicolon.
- Connection keys must be specified within square brackets and are case-sensitive.
- Parameters of SQL CDC commands can be provided either in parentheses or in single quotes. Inside single quotes, a literal single quote must be escaped by doubling it (ex: PRINT 'o''hare';), and the content is usually trimmed. Inside parentheses, single quotes do not need to be escaped, and the content is never trimmed.
Scripting Convention
You can include comments anywhere in your SQL CDC script.
Using Parentheses
When content is specified within parentheses, it is treated as-is.
PRINT (this will be printed normally);
However, additional nested parentheses must be closed. This will generate a compilation error:
PRINT (#now(s));)
Additional parentheses within single quotes are ignored:
PRINT ('1) What time is it?');
When additional parentheses need to appear within the content, consider using the '...' notation instead.
Using Square Brackets
Content within square brackets is always treated as-is
SELECT * FROM DB [connection]
Using Single Quotes
Inside single quotes, two consecutive single quotes are read as one literal single quote.
PRINT 'Name: O''Hare';
Parentheses within single quotes have no effect.
PRINT '1) do this, 2) do that';
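The quoting rules above can be sketched in a few lines of Python. This is only an illustration of the described behavior, not DataZen's parser: `read_parameter` is a hypothetical helper, and it assumes single-quoted content is always trimmed, whereas this guide only says it is usually trimmed.

```python
def read_parameter(raw: str) -> str:
    """Return the content of a parameter written as '...' or (...).

    Hypothetical helper that mirrors the rules described in this guide.
    """
    if len(raw) >= 2 and raw.startswith("'") and raw.endswith("'"):
        # Single-quoted: '' stands for one literal quote.
        # Assumption: the content is always trimmed.
        return raw[1:-1].replace("''", "'").strip()
    if len(raw) >= 2 and raw.startswith("(") and raw.endswith(")"):
        # Parenthesized: content is taken as-is, no unescaping, no trimming.
        return raw[1:-1]
    raise ValueError("parameter must be wrapped in single quotes or parentheses")


print(read_parameter("'Name: O''Hare'"))
print(repr(read_parameter("( it's kept as-is )")))
```

The second call shows why parentheses are convenient for text that contains apostrophes or meaningful leading and trailing spaces.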
Job Parameters
A pipeline may access job-specific parameters that represent both metadata and runtime variables about the pipeline. Not all variables are available all the time; for example, Writer pipelines have additional variables that provide metadata about the source data being processed, while other variables are only meaningful in the read pipeline.
Pipeline variables are read-only and case-sensitive.
Variables
- @agentid: the cloud agent identifier currently executing the pipeline
- @executionid: the current execution id of the pipeline (Unix Timestamp)
- @executionguid: the current execution session id of the pipeline (Guid)
- @guid: the unique identifier of the pipeline executing (Guid)
- @ispreview: 1 during preview operations; 0 when the pipeline is executing live
- @jobkey: the name of the pipeline executing (TEST in preview)
- @peakmb: the peak memory used by the pipeline in MB
- @maxpeakmb: the maximum peak memory a pipeline can consume in MB
- @rundate: the current execution date/time
- @upsertcolumns: when CDC is engaged using the CAPTURE operation, this variable holds the key columns that make a record unique
Reader-Only Variables
- @pagingmarker: the current paging marker used during SELECT HTTP operations
- @pagingcount: the current 0-based page count used during SELECT HTTP operations
- @pagingindex: the current 1-based page index used during SELECT HTTP operations (pagingcount + 1)
- @recordcount: the current 0-based record count retrieved so far by a SELECT HTTP operation
- @recordindex: the current 1-based record index retrieved so far by a SELECT HTTP operation (recordcount + 1)
- @sourcebatchindex: the current 1-based page index being processed by a SELECT BATCH DB operation
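To make the relationship between the 0-based and 1-based paging counters concrete, here is a small Python sketch that simulates a paged read. The function name `paging_variables` is hypothetical, and the sketch assumes the counters are read before the records of the current page are added; the actual timing inside DataZen may differ.

```python
def paging_variables(page_sizes):
    """Yield the counter values visible while each page is requested.

    Assumption: the counters are read before the page's records are added.
    """
    record_count = 0
    for paging_count, size in enumerate(page_sizes):
        yield {
            "@pagingcount": paging_count,       # 0-based page count
            "@pagingindex": paging_count + 1,   # 1-based page index
            "@recordcount": record_count,       # records retrieved so far
            "@recordindex": record_count + 1,   # 1-based index of the next record
        }
        record_count += size


for variables in paging_variables([100, 100, 40]):
    print(variables)
```

An API that expects a page number starting at 1 would typically use the index variant, while an offset-style API would use the 0-based count.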
Writer-Only Variables
- @batchid: a unique Guid representing a batch of records when BATCH N is used in PUSH operations
- @packagefile: the name of the change being executed
- @sourceexecutionid: the original source execution id of the Read pipeline that created the change log being executed (Unix Timestamp)
- @sourceobject: the name of the source object of a SELECT operation
- @sourceobjectname: when the source object is a file name or URL, the name of the resource or file
- @startindex: 1-based index value of the record count processed so far (@totalwrite + 1)
- @status: the push operation status (1: success, 0: failure)
- '@status': the push operation status as a string (success, failure)
- @targetobject: the name of the target object of a PUSH operation
- @totalwrite: total number of records written so far
Count Variables
- @@colcount: the number of columns in the data pipeline
- @@rowcount: the number of records in the data pipeline
For example, you can enter an IF block if you detect 0 records are left in the data pipeline:
SELECT * FROM DB [sql2017] (SELECT * FROM sys.databases);
APPLY FILTER '1=0';
IF (@@rowcount = 0) BEGIN
  PRINT 'No records were found... exiting pipeline @jobkey.' LEVEL 'warning';
  EXIT KEEP 0;
END;
This example shows how to enter an IF block only when debugging the pipeline in preview mode:
SELECT * FROM DB [sql2017] (SELECT * FROM sys.databases);
IF (@ispreview = 1) BEGIN
  PRINT 'Current pipeline in-memory footprint: #data.sizekb() (KB)';
END;
DataZen Functions
DataZen provides a number of functions that can be applied almost anywhere in the SQL CDC script. Functions are evaluated before the start of each SQL command or for every row of a data pipeline, depending on the decorator used. The list of DataZen functions is available here.
Calling a function can be done in one of the following ways:
- #function() - The function is evaluated immediately before the execution of an SQL command
- @function() - The function is evaluated during the execution of an SQL command for every row in the data pipeline
For example, the following uses both a script-level and row-level (late-bound) function call:
APPLY SCHEMA (
string globalGuid = '#rndguid()' -- same guid for all rows
string rowGuid = '@rndguid()' -- different guid for each row
);
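The difference between the two decorators can be mimicked in Python. This is a rough analogy rather than DataZen's implementation: the `rows` list is made-up sample data, and `uuid.uuid4()` stands in for the rndguid function.

```python
import uuid

rows = [{"id": 1}, {"id": 2}, {"id": 3}]

# '#rndguid()' analogue: evaluated once, before the command runs.
global_guid = str(uuid.uuid4())

for row in rows:
    row["globalGuid"] = global_guid
    # '@rndguid()' analogue: evaluated again for every row.
    row["rowGuid"] = str(uuid.uuid4())

for row in rows:
    print(row["id"], row["globalGuid"], row["rowGuid"])
```

Every row ends up with the same globalGuid and its own rowGuid, which is the distinction between early-bound and late-bound evaluation.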
The following SQL commands can operate per row:
- APPLY HTTP when using the left join or inner join PROCESSING mode
- EXEC ON DB when using the PER_ROW processing option
- PUSH commands with BATCH 1
- PUSH commands with BATCH N and combined with one of the array operators (@concatjson, @concatjsonarr, @concatxml) [see Payload Formatting below]
Data Pipeline Functions
In addition to the DataZen functions discussed above, pipeline data functions are also available. The difference is that they operate on the pipeline dataset directly, either by row and column or as a batch operation.
- #data.value(i,j) or #data.value(i,field) - returns the value of a specific cell given the row index (i) and the column index (j) or name
- #data.calc_hwm(col1, lastmark) - compares the values found in column col1 with a previous known value and returns the highest value of the comparison using a string comparer
- #data.calc_hwmdate(col1, lastmark) - compares the values found in column col1 with a previous known value and returns the highest value of the comparison using a datetime comparer
- #data.calc_hwmnumber(col1, lastmark) - compares the values found in column col1 with a previous known value and returns the highest value of the comparison using a numeric comparer
- #data.count() or #data.count(filter) - returns the number of records found in the pipeline dataset with or without a SQL filter
- #data.sizekb() - returns the size of the pipeline dataset in memory in KB
- #data.calc(operation, filter) - performs a set-based calculation on the pipeline dataset using the SQL operation provided, with an optional SQL filter (ex: #data.calc("AVG([age])", "age > 0"))
For example, you can conditionally enter an IF-block like this:
IF (#data.value(0,statusCode) >= 300) BEGIN
  PRINT 'Http call was unsuccessful... exiting' LEVEL 'warning';
  EXIT KEEP 0;
END;
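The three high watermark functions listed above differ only in the comparer they use, and the choice matters. The Python sketch below shows why; `calc_hwm` here is a hypothetical stand-in written for illustration, with the `key` argument playing the role of the string, datetime, or numeric comparer.

```python
from datetime import datetime


def calc_hwm(values, last_mark, key=str):
    """Return the highest of the column values and the previous mark.

    `key` converts each value before comparison (the "comparer").
    """
    candidates = [v for v in values if v is not None]
    if last_mark not in (None, ""):
        candidates.append(last_mark)
    return max(candidates, key=key) if candidates else last_mark


ids = ["9", "10", "100"]
print(calc_hwm(ids, "8"))              # string comparer  -> 9
print(calc_hwm(ids, "8", key=float))   # numeric comparer -> 100

stamps = ["2024-01-05T10:00:00", "2023-12-31T23:59:59"]
print(calc_hwm(stamps, "2024-01-01T00:00:00", key=datetime.fromisoformat))
```

With a string comparer, "9" sorts above "100", so a numeric key column tracked as a string would produce a watermark that skips records.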
Pipeline Field Values
After selecting data from a source system or loading a change log, the pipeline contains data that can be accessed from the SQL CDC script directly. Some SQL operations automatically operate "per row", such as APPLY HTTP with 'left join' processing, or a PUSH operation with BATCH 1. In these cases, accessing the pipeline data is assumed to be in the context of a single record using the field notation below. In other scenarios, such as the PRINT operation, accessing the pipeline data requires using the #data.value() DataZen function since the PRINT operation does not operate per row.
- {{field}} - reads the 'field' column name in the current row being processed
- {{{{field}}}} - escapes the 'field' column name (normally used when creating pipelines that create other pipelines)
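The field notation can be pictured as a template substitution over the current row. The sketch below is a simplified model, not DataZen's engine: `render` is a hypothetical function, and it assumes that the escaped form {{{{field}}}} is emitted as the literal text {{field}}.

```python
import re

# The escaped form is listed first so it wins over the plain form.
_TOKEN = re.compile(r"\{\{\{\{(\w+)\}\}\}\}|\{\{(\w+)\}\}")


def render(template: str, row: dict) -> str:
    """Substitute {{field}} with row values; keep {{{{field}}}} as {{field}}."""
    def substitute(match):
        escaped, field = match.groups()
        if escaped is not None:
            # Assumption: the escape produces the literal token text.
            return "{{" + escaped + "}}"
        return str(row[field])
    return _TOKEN.sub(substitute, template)


print(render("Hello {{name}}, keep {{{{name}}}}", {"name": "Ann"}))
```

Under that assumption, a pipeline that generates another pipeline can pass a field token through untouched so that the generated script resolves it later.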
For byte array processing, such as raw file content, you can use a decorator in front of the field name to specify the encoding to be used when converting the byte array to a string or payload (by default, utf8 is assumed):
- {{base64:field}} - converts the byte[] into a base64 encoded string
- {{hex:field}} - converts the byte[] into a hexadecimal representation of the string
- {{ascii:field}} - converts the byte[] into an ascii encoded string
- {{utf8:field}} - converts the byte[] into a utf8 encoded string
- {{unicode:field}} - converts the byte[] into a unicode encoded string
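As a rough guide to what each decorator produces, the Python sketch below applies the equivalent conversions to a byte array. Two points are assumptions rather than documented behavior: "unicode" is taken to mean UTF-16 little-endian, and the hexadecimal output is shown in lower case. The function name `render_bytes` is hypothetical.

```python
import base64


def render_bytes(data: bytes, decorator: str = "utf8") -> str:
    """Convert a byte array to text according to the decorator name."""
    converters = {
        "base64": lambda b: base64.b64encode(b).decode("ascii"),
        "hex": lambda b: b.hex(),  # assumption: lower-case hex digits
        "ascii": lambda b: b.decode("ascii", errors="replace"),
        "utf8": lambda b: b.decode("utf-8", errors="replace"),
        # Assumption: "unicode" means UTF-16 little-endian.
        "unicode": lambda b: b.decode("utf-16-le", errors="replace"),
    }
    return converters[decorator](data)


content = "héllo".encode("utf-8")
for name in ("base64", "hex", "ascii", "utf8"):
    print(name, "->", render_bytes(content, name))
```

The ascii line shows replacement characters for the accented letter, which is the usual symptom of picking an encoding that does not match the file content.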
High Watermark Values
Built-in high watermark support is provided for the first SELECT operation in a Read pipeline, for HTTP and DB readers.
- @highwatermark - returns the last known high watermark value or empty string if none exists
- @highwatermarknull - returns the last known high watermark value or null if none exists
The high watermark is automatically calculated and saved by the pipeline. Choosing between @highwatermark and @highwatermarknull depends on the database engine. For example, for MySQL, the following operation automatically keeps a high watermark on the last_update field.
SELECT * FROM DB [mysqldb] (
SELECT * FROM Film WHERE last_update > '@highwatermark'
) WITH HWM 'last_update';
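The read cycle implied by this operation can be sketched in Python. This is an assumption-laden model, not DataZen's code: `run_once` is a hypothetical function, the rows are invented sample data, and timestamps are compared as strings for brevity.

```python
def run_once(rows, stored_mark, column="last_update"):
    """Select rows above the stored mark and return them with the new mark."""
    mark = stored_mark or ""  # @highwatermark: empty string when none exists
    selected = [r for r in rows if r[column] > mark]
    # The new mark is the highest value seen; it is unchanged if nothing matched.
    new_mark = max((r[column] for r in selected), default=mark)
    return selected, new_mark


films = [
    {"id": 1, "last_update": "2024-01-01 10:00:00"},
    {"id": 2, "last_update": "2024-01-02 09:30:00"},
]
first, mark = run_once(films, None)          # first run: everything is new
films.append({"id": 3, "last_update": "2024-01-03 08:00:00"})
second, mark = run_once(films, mark)         # later run: only the new row
print([r["id"] for r in first], [r["id"] for r in second], mark)
```

On the first run the mark is an empty string, so the comparison has to tolerate that value. An engine that rejects an empty string in a date comparison would presumably be the case for the null variant.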
Pipeline Dataset
When executing an EXEC ON DB command, the pipeline dataset is available directly within the native SQL script provided in the body of the operation.
- @pipelinedata() - the serialized dataset of the data pipeline accessible directly within the native SQL script provided
For example, the following operation runs a native SQL command against a cloud database. The @pipelinedata() notation is detected automatically: DataZen first serializes the current pipeline dataset into a temporary table, modifies the script to use the temporary table, and drops the temporary table automatically upon completion of the script.
EXEC ON DB [snowflakedb] (
SELECT * FROM @pipelinedata() WHERE state = 'FL' ORDER BY lastname
) WITH REPLACE;
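The serialize, rewrite, run, and drop sequence described above can be sketched with Python's built-in sqlite3 module standing in for the cloud database. The function `exec_on_db` and the temporary table name are hypothetical, and the sketch returns the query result rather than modeling what WITH REPLACE does to the pipeline.

```python
import sqlite3


def exec_on_db(conn, script, rows, temp_name="tmp_pipelinedata"):
    """Run `script` with @pipelinedata() backed by a temporary table."""
    if not rows:
        return []
    columns = list(rows[0].keys())
    col_list = ", ".join(f'"{c}"' for c in columns)
    placeholders = ", ".join("?" for _ in columns)
    # 1. Serialize the pipeline dataset into a temporary table.
    conn.execute(f'CREATE TEMP TABLE "{temp_name}" ({col_list})')
    try:
        conn.executemany(
            f'INSERT INTO "{temp_name}" VALUES ({placeholders})',
            [tuple(r[c] for c in columns) for r in rows],
        )
        # 2. Rewrite the script to point at the temporary table, then run it.
        rewritten = script.replace("@pipelinedata()", f'"{temp_name}"')
        cursor = conn.execute(rewritten)
        names = [d[0] for d in cursor.description]
        return [dict(zip(names, rec)) for rec in cursor.fetchall()]
    finally:
        # 3. Drop the temporary table whether or not the script succeeded.
        conn.execute(f'DROP TABLE "{temp_name}"')


conn = sqlite3.connect(":memory:")
people = [
    {"lastname": "Smith", "state": "FL"},
    {"lastname": "Adams", "state": "FL"},
    {"lastname": "Young", "state": "NY"},
]
query = "SELECT * FROM @pipelinedata() WHERE state = 'FL' ORDER BY lastname"
print(exec_on_db(conn, query, people))
```

Dropping the table in a finally block mirrors the automatic cleanup: the temporary table should not outlive the script even when the script fails.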
Payload Formatting
When executing a PUSH operation on HTTP, BIGDATA, or DB endpoints, you may need to format JSON or XML payloads. In some cases, the endpoint expects a single object at a time, and in other cases you may be able to send an array of objects for JSON. The following payload formatting operators are available. All these operators accept field names using the field notation previously discussed.
- @concatjson(...) - generates a JSON object (when multiple rows are detected, a comma is automatically added to separate each object)
- @concatjsonarr(...) - generates an array of JSON objects or values by automatically adding surrounding square brackets
- @concatxml(...) - generates an XML document
- @sql(...) - generates an inline SQL operation typically when BATCH is set to 1 (this is usually meant to build an SQL command inside the payload sent to an HTTP endpoint)
- @sql_concat(...) - generates an inline SQL operation typically when BATCH is greater than 1 and concatenates the output on a new line for each row
- @sql_union(...) - generates an inline SQL operation typically when BATCH is greater than 1 and adds a UNION operator for each row
- @sql_unionall(...) - generates an inline SQL operation typically when BATCH is greater than 1 and adds a UNION ALL operator for each row
- @sql_values(...) - builds a list of field values on a new line for each row; you are responsible for adding the necessary quotes around field values
Nesting payload formatting operations is not currently supported and may yield unexpected results. For example, this is an unsupported operation: @concatjson(@concatjson({{name}}))
The following example sends a batch of up to 50 JSON documents as an array of objects to a Medius APA endpoint to import purchase orders. The various field values are pre-calculated in the pipeline.
PUSH INTO HTTP [Medius]
ON_UPSERT (POST /integration/masterdata/v1/purchaseorderImportBatches)
CONTENT_TYPE 'application/json'
PAYLOAD (
[@concatjson({
"ErpSourceId": "{{ErpSourceId}}",
"ExternalSystemID": "company[{{companyid}}];{{ExternalSystemID}}",
"companyid": "company[{{companyid}}]",
"Amount": "{{Amount}}",
"CurrencyCode": "{{CurrencyCode}}",
"Orderidentifier": "{{Orderidentifier}}",
"RegisterDate": "{{RegisterDate}}",
"DueDate": "{{DueDate}}",
"IsActive":"true",
"Supplier": "company[{{companyid}}];{{Supplier}}",
"Reference": "{{Reference}}",
"PaymentTerm": "company[{{companyid}}];{{PaymentTerm}}",
"OrderType": "{{OrderType}}",
"reference2":"{{Shiplocation}}",
"PurchaseOrderLines": {{lines}}
})]
)
WITH BATCH 50;
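To see how a batched payload of this shape comes together, the Python sketch below renders one JSON object per row, joins the objects with commas, and wraps each batch in square brackets. It is a simplified model under stated assumptions: `render`, `concat_json`, and `build_payloads` are hypothetical names, the order data is invented, and field values are inserted without JSON escaping.

```python
import json
import re


def render(template, row):
    """Replace each {{field}} token with the row's value (no JSON escaping)."""
    return re.sub(r"\{\{(\w+)\}\}", lambda m: str(row[m.group(1)]), template)


def concat_json(template, rows):
    """Render one object per row, separated by commas."""
    return ",".join(render(template, row) for row in rows)


def build_payloads(template, rows, batch_size):
    """Yield one JSON array payload per batch of at most batch_size rows."""
    for start in range(0, len(rows), batch_size):
        batch = rows[start:start + batch_size]
        yield "[" + concat_json(template, batch) + "]"


template = '{"Orderidentifier": "{{Orderidentifier}}", "Amount": "{{Amount}}"}'
orders = [{"Orderidentifier": f"PO-{n}", "Amount": n * 10} for n in range(1, 6)]

for payload in build_payloads(template, orders, batch_size=2):
    print(json.loads(payload))  # each payload parses as a JSON array
```

Five rows with a batch size of 2 produce three payloads, the last holding a single object, which corresponds to "up to N" documents per request.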
APPLY TX
The APPLY TX operation is a powerful in-memory transformation of data (JSON, XML, CSV) into rows and columns.
For information on how to transform XML and JSON documents using this operator, see the Documentation Path help section.
CSV Options
In addition to transforming JSON and XML documents, this operator also supports flat files (comma-delimited and fixed-length). For information on how to transform CSV documents using this operator, see the APPLY TX help section.
SELECT * FROM DRIVE [adls] (*.txt) WITH FORMAT 'csv';
APPLY TX 'headers:1 delimiter:, trim:1';
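The effect of these options on a flat file can be approximated with Python's csv module. The interpretation of each option is an assumption made for this sketch: headers:1 is read as "the first line holds column names", delimiter as the field separator, and trim:1 as "strip surrounding whitespace from every value". The function name `apply_tx_csv` and the generated column names are hypothetical.

```python
import csv
import io


def apply_tx_csv(text, headers=1, delimiter=",", trim=1):
    """Turn delimited text into a list of row dictionaries."""
    reader = csv.reader(io.StringIO(text, newline=""), delimiter=delimiter)
    records = [
        [cell.strip() if trim else cell for cell in record]
        for record in reader
        if record  # skip blank lines
    ]
    if not records:
        return []
    if headers:
        names, body = records[0], records[1:]
    else:
        # Assumption: generated column names when no header line exists.
        names = [f"col{i + 1}" for i in range(len(records[0]))]
        body = records
    return [dict(zip(names, record)) for record in body]


sample = "id, name \n1, Ann \n2, Bob \n"
print(apply_tx_csv(sample))
print(apply_tx_csv(sample, trim=0))
```

Comparing the two printed results shows how untrimmed values keep the padding that often appears in exported files.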
Date Tokens
Date tokens are primarily used to name files or other objects using the current timestamp or a known field in the data pipeline when creating files. The implementation is slightly different for lower-case and upper-case: when using lower-case, the final output may be either 1 or 2 characters, while upper-case will add leading zeros if needed.
- [yyyy] or [YYYY] - 4-character year
- [yy] - 1 or 2-character year
- [YY] - 2-character year
- [mm] - 1 or 2-character month
- [MM] - 2-character month
- [dd] - 1 or 2-character day
- [DD] - 2-character day
- [doy] - 1, 2 or 3-character day of year
- [DOY] - 3-character day of year
- [dow] or [DOW] - 1-character day of week
- [hh] - 1 or 2-character hour
- [HH] - 2-character hour
- [nn] - 1 or 2-character minutes
- [NN] - 2-character minutes
- [ss] - 1 or 2-character seconds
- [SS] - 2-character seconds
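The padding rule for lower-case and upper-case tokens can be illustrated with a short Python sketch. Several details are assumptions: hours are taken as 24-hour values, the day of week is numbered with 0 for Sunday, and bracketed text that is not a date token is left untouched. The function name `expand_date_tokens` and the file names are hypothetical.

```python
import re
from datetime import datetime


def expand_date_tokens(name, when):
    """Replace supported [token] markers in `name` with parts of `when`."""
    day_of_year = when.timetuple().tm_yday
    day_of_week = (when.weekday() + 1) % 7  # assumption: 0 = Sunday
    parts = {
        "yy": when.year % 100, "mm": when.month, "dd": when.day,
        "hh": when.hour, "nn": when.minute, "ss": when.second,
    }
    values = {
        "yyyy": f"{when.year:04d}", "YYYY": f"{when.year:04d}",
        "doy": str(day_of_year), "DOY": f"{day_of_year:03d}",
        "dow": str(day_of_week), "DOW": str(day_of_week),
    }
    for token, number in parts.items():
        values[token] = str(number)               # lower-case: no padding
        values[token.upper()] = f"{number:02d}"   # upper-case: zero-padded
    # Unknown bracketed text (such as a connection key) is left untouched.
    return re.sub(r"\[(\w+)\]", lambda m: values.get(m.group(1), m.group(0)), name)


stamp = datetime(2024, 3, 5, 7, 4, 9)
print(expand_date_tokens("sales_[YYYY][MM][DD]_[HH][NN][SS].csv", stamp))
print(expand_date_tokens("[yy]-[mm]-[dd] day [doy]", stamp))
```

For file names, the upper-case tokens are usually the safer choice because fixed-width values sort correctly as text.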
