Processors

Processors are built-in operators that process data by performing various transformations and analytical operations. Each processor performs a specific action. You can drag and drop multiple processors and stitch them together to implement the business logic in a pipeline.


Advanced Sort

The Advanced Sort processor sorts the data by one or more fields before it is processed further in the pipeline. Each field can be sorted in ascending or descending order. You can specify multiple sort criteria on multiple fields to achieve secondary, tertiary, and n-level sorting of the input data.

Configuring AdvancedSort processor for Spark pipelines

Select the columns in Column Order and, for each column, set the corresponding Sort Order to either ascending or descending.

The processor outputs the column data arranged in the selected sort order.
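As an illustration of n-level sorting, the following Python sketch sorts plain dictionaries by a list of (column, order) pairs, where the first pair is the primary sort key. The `advanced_sort` function and the pair format are hypothetical and are not Gathr's API; the sketch relies on Python's stable sort and applies the keys from the lowest priority to the highest.

```python
from typing import Any


def advanced_sort(records: list[dict[str, Any]],
                  criteria: list[tuple[str, str]]) -> list[dict[str, Any]]:
    """Sort records by several (column, order) pairs; the first pair is primary.

    `order` is "asc" or "desc". Python's sort is stable (also with reverse=True),
    so sorting by the least significant key first and the primary key last
    yields n-level ordering.
    """
    result = list(records)
    for column, order in reversed(criteria):
        result.sort(key=lambda r: r[column], reverse=(order == "desc"))
    return result


rows = [
    {"dept": "IT", "salary": 500},
    {"dept": "HR", "salary": 400},
    {"dept": "IT", "salary": 700},
    {"dept": "HR", "salary": 450},
]
# Primary: dept ascending; secondary: salary descending.
for row in advanced_sort(rows, [("dept", "asc"), ("salary", "desc")]):
    print(row)
```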

Alert

The Alert processor notifies users about the occurrence of a specific event, as configured by the user, during pipeline execution.

Configuring Alert processor for Spark pipelines

To add an Alert processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Alert Name

Specify a name for the alert. The alert is triggered with this name when the criteria are met.

Alert Type

Select an Alert Type depending upon the severity of the alert:

General: low severity

Critical: high severity

Enable

Enables or disables the alert.

If enabled, alerts are generated based on the criteria specified.

If disabled, no alerts are generated.

Aggregation

Accumulates incoming records for a specific window interval.

You can enable or disable the Aggregation property.

Enabled: When Aggregation is selected, a single alert is generated for a specific time window and frequency when the alert criteria are matched.

Disabled: When Aggregation is deselected, an alert is generated every time a record matches the alert criteria.

Occurs/frequency

Number of occurrences of a repeating event.

times in

Window duration of the event.

Grouping

The Grouping field appears only if the Aggregation checkbox is selected. When a grouping field (for example, a device ID) is selected, the alert criteria are evaluated separately for each of its values.

Example:

Suppose you want to generate an alert for a device when its temperature goes above 1000 F five times in the last 30 seconds.

Specify the alert criteria as: temperature > 1000 F

Then select the Aggregation checkbox and specify the values as occurs “5” times in “30” seconds.

The system checks the specified criteria and evaluates the devices one by one.

If device MAC001 registers a temperature above 1000 F five times within 30 seconds, the system generates the alert once.

If neither Aggregation nor Grouping is selected and the criteria specified is “temperature > 1000 F”, an alert is generated every time the condition is met.

ADD CONFIGURATION

Enables you to configure additional properties.
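The Aggregation and Grouping behavior in the temperature example can be sketched as follows. This is a hypothetical Python model, not Gathr's implementation: per device, it keeps the timestamps of matching events from the last `window_seconds`, raises one alert when the count reaches `occurs`, and, as an assumption the text does not state, restarts the count for that device after alerting.

```python
from collections import defaultdict, deque


def aggregated_alerts(events, threshold=1000, occurs=5, window_seconds=30):
    """Yield (device, time) once each time a device matches the criteria
    `occurs` times within `window_seconds`.

    events: iterable of (device_id, epoch_seconds, temperature) in time order.
    """
    hits = defaultdict(deque)  # device -> timestamps of matching events
    for device, ts, temperature in events:
        if temperature <= threshold:  # criteria: temperature > threshold
            continue
        window = hits[device]
        window.append(ts)
        while ts - window[0] >= window_seconds:
            window.popleft()  # forget matches older than the window
        if len(window) >= occurs:
            yield device, ts
            window.clear()  # assumption: counting restarts after an alert


events = [("MAC001", t, 1005) for t in range(0, 25, 5)] + [("MAC002", 21, 1200)]
print(list(aggregated_alerts(events)))  # [('MAC001', 20)]
```

Without Aggregation, every event that passes the `temperature > threshold` check would produce its own alert.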

Click on the NEXT button. Specify the Alert criteria here.

 

Field

Description

Criteria

Criteria provides the condition for the alert to be generated. Multiple conditions on the same or different fields can be configured and combined with the AND and OR operations. The supported conditions for creating an expression are as follows (applicable as per data type: Text/Number/Decimal):

If you select a text field, the following functions appear:

• contains

• does not contain

• starts with

• does not start with

• ends with

• does not end with

• equals

• not equal to

• less than

Add Group/ Add Rule

Multiple rules and groups can be configured on the same or different message fields.

Send Alerts

Specify the email addresses to which notifications should be sent when an alert is generated. You can send mail to multiple recipients and use the CC and BCC fields.

Frequency

Occurrence of the filter condition when the specified rule criteria are met.

Click on the NEXT button. Enter the notes in the space provided. Click Save for saving the configuration details.
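A criteria tree of groups and rules, combined with AND and OR, can be evaluated recursively. The Python sketch below models only the text operators listed above; the dictionary layout (`combinator`, `rules`, `field`, `operator`, `value`) is a hypothetical representation, not the format in which Gathr stores criteria.

```python
from typing import Any, Callable

# Text operators from the list above, mapped to Python predicates.
TEXT_OPS: dict[str, Callable[[str, str], bool]] = {
    "contains": lambda a, b: b in a,
    "does not contain": lambda a, b: b not in a,
    "starts with": lambda a, b: a.startswith(b),
    "does not start with": lambda a, b: not a.startswith(b),
    "ends with": lambda a, b: a.endswith(b),
    "does not end with": lambda a, b: not a.endswith(b),
    "equals": lambda a, b: a == b,
    "not equal to": lambda a, b: a != b,
    "less than": lambda a, b: a < b,
}


def evaluate(node: dict[str, Any], record: dict[str, Any]) -> bool:
    """Evaluate a single rule or a nested group of rules against one record."""
    if "rules" in node:  # a group: combine its children with AND or OR
        results = (evaluate(child, record) for child in node["rules"])
        return all(results) if node["combinator"] == "AND" else any(results)
    field_value = str(record[node["field"]])
    return TEXT_OPS[node["operator"]](field_value, node["value"])


# device starts with "MAC" AND (status equals "FAILED" OR message contains "overheat")
criteria = {
    "combinator": "AND",
    "rules": [
        {"field": "device", "operator": "starts with", "value": "MAC"},
        {"combinator": "OR", "rules": [
            {"field": "status", "operator": "equals", "value": "FAILED"},
            {"field": "message", "operator": "contains", "value": "overheat"},
        ]},
    ],
}
print(evaluate(criteria, {"device": "MAC001", "status": "OK", "message": "overheat detected"}))
```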

How to View Generated Alerts

To view generated Alerts, go to your Workspace > Alerts > Information.

The bell icon at the top right of the page shows notifications of generated alerts. For example, 1 next to the bell icon indicates that one alert has been generated.

Click the bell icon to view the generated alerts. The system displays the last twenty alerts at a time.

Refresh Interval: The time interval after which the page is refreshed or reloaded. For example, if the refresh interval is 5 minutes, the page is refreshed every 5 minutes.

Actions:

The following two types of actions can be performed on the Alerts.

1. View Description: The Alert description which you entered at the time of creation of Alert can be viewed by clicking on the eye icon.

2. View Message Source: The source record for which alert is generated can be viewed by clicking on the View Message Source icon.

Aggregation

The Aggregation processor performs operations such as min, max, average, sum, and count over streaming data.

For example, suppose an IoT device generates events comprising three fields: id, time, and action.

You wish to calculate how many actions of each type happened in a window of 15 seconds. You can use the Aggregation (COUNT) operation to calculate the number of actions of each type from the total events received in each 15-second window.
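The COUNT-per-window example can be modeled in a few lines of Python. This sketch assumes fixed (tumbling) 15-second windows aligned to multiples of the window length, which is how Spark aligns windows by default; the function name and event layout are illustrative only.

```python
from collections import Counter, defaultdict


def count_actions_per_window(events, window_seconds=15):
    """Count actions of each type in fixed (tumbling) windows.

    events: iterable of dicts with "id", "time" (epoch seconds) and "action".
    Returns {window_start: Counter({action: count})}.
    """
    windows = defaultdict(Counter)
    for event in events:
        # Each event falls into exactly one window starting at a multiple of the length.
        start = event["time"] - event["time"] % window_seconds
        windows[start][event["action"]] += 1
    return dict(windows)


events = [
    {"id": 1, "time": 0, "action": "open"},
    {"id": 1, "time": 3, "action": "open"},
    {"id": 2, "time": 7, "action": "close"},
    {"id": 2, "time": 16, "action": "open"},
]
print(count_actions_per_window(events))
```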

The following table lists the functions supported by Aggregation processor.

 

Average

Calculates the average of the selected fields and assigns it to the provided output field.

Sum

Calculates the sum of the selected fields and assigns it to the provided output field.

Count

Counts the values of the selected input fields and assigns the count to the provided output field.

Minimum

Calculates the minimum value of the selected fields and assigns it to the provided output field.

Maximum

Calculates the maximum value of the selected fields and assigns it to the provided output field.

First

Returns the first value of `expr` for a group of rows. If `isIgnoreNull` is true, returns only non-null values.

Last

Returns the last value of `expr` for a group of rows. If `isIgnoreNull` is true, returns only non-null values.

Median

Returns the median value calculated from the values of a group.

Percentile

Returns the exact percentile value array of numeric column `col` at the given percentage(s). Each value of the percentage array must be between 0.0 and 1.0. The value of frequency should be a positive integer.

Standard Deviation

Returns the sample standard deviation calculated from values of a group.

Variance

Returns the sample variance calculated from values of a group.

Collect List

Collects and returns a list of non-unique elements.

Collect Set

Collects and returns a set of unique elements.
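To make the function descriptions concrete, the following Python sketch computes each aggregate over one group of values. The wording of several descriptions above matches Spark SQL's built-in aggregates (`first(expr, isIgnoreNull)`, `percentile`, `stddev_samp`, `var_samp`, `collect_list`, `collect_set`), so the sketch follows those semantics, including linear interpolation for exact percentiles. That Gathr behaves identically is an assumption, and the helper names are illustrative.

```python
import math
import statistics


def first(values, ignore_nulls=False):
    """First value of the group; skips None values when ignore_nulls is True."""
    for value in values:
        if value is not None or not ignore_nulls:
            return value
    return None


def last(values, ignore_nulls=False):
    """Last value of the group, with the same null handling as first()."""
    return first(list(reversed(values)), ignore_nulls)


def percentile(values, percentages):
    """Exact percentiles, interpolating linearly between neighboring ranks."""
    ordered = sorted(values)
    results = []
    for p in percentages:
        if not 0.0 <= p <= 1.0:
            raise ValueError("each percentage must be between 0.0 and 1.0")
        position = p * (len(ordered) - 1)
        lower, upper = math.floor(position), math.ceil(position)
        results.append(ordered[lower] + (ordered[upper] - ordered[lower]) * (position - lower))
    return results


group = [None, 4, 8, 15, 16, 23, 42, None]
values = [v for v in group if v is not None]  # most aggregates ignore nulls

summary = {
    "average": statistics.mean(values),
    "sum": sum(values),
    "count": len(values),
    "minimum": min(values),
    "maximum": max(values),
    "first": first(group),                       # None: the first row is null
    "first (ignore nulls)": first(group, True),  # 4
    "last (ignore nulls)": last(group, True),    # 42
    "median": statistics.median(values),
    "percentile": percentile(values, [0.25, 0.5]),
    "standard deviation": statistics.stdev(values),  # sample, divides by n - 1
    "variance": statistics.variance(values),         # sample, divides by n - 1
    "collect list": list(values),
    "collect set": set(values),
}
print(summary)
```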

Configuring an Aggregation processor for Spark pipelines:

To add an Aggregation processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Functions

All Aggregation functions are listed here. Select the function to be applied to the input fields.

Input Fields

Select the fields from the list on which Aggregation has to be applied.

Output Field

Outcome of the Aggregation function is stored in this field.

Time Window

Type of time window that you want to apply.

Time window options:

• Fixed Length: When selected, configured aggregation functions get applied over a fixed window duration. 

• Sliding Window: When selected, configured aggregation functions get applied over the moving window.

Window Duration

Time window duration in seconds.

Slide Duration

Window slide duration in seconds.

Event Column

Timestamp column of incoming records.

Watermarking

Watermarking handles data that arrives late. Data is considered late when it arrives at the system after the end of its window.

The watermark mechanism keeps the window open for the specified watermark duration in addition to the initially defined window duration.

• Yes: Enables watermarking.

• No: Disables watermarking.

Watermark duration

The duration for which the window is kept open, in addition to the initially defined window duration, to accept late-arriving data.

Group By

Applies grouping based on the selected fields.

• Yes: Enables selection of grouping fields.

• No: Disables selection of grouping fields. This is the default option.

Grouping Fields

Select fields on which grouping is to be applied.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.
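The window options and watermarking can be sketched as below. The Python functions are hypothetical and loosely model event-time windows and watermarks as they work in Spark Structured Streaming (`window` and `withWatermark`): an event belongs to every window whose range covers its timestamp, and the watermark is the latest event time seen minus the watermark duration. Whether Gathr treats late events exactly this way is an assumption.

```python
def windows_for(event_time, window_duration, slide_duration=None):
    """Return the [start, end) windows that contain event_time.

    Without a slide duration the windows are fixed-length (tumbling); with one,
    a new sliding window starts every slide_duration seconds.
    """
    slide = slide_duration or window_duration
    start = event_time - event_time % slide  # latest window start <= event_time
    windows = []
    while start > event_time - window_duration:
        windows.append((start, start + window_duration))
        start -= slide
    return sorted(windows)


def open_windows(event_time, max_event_time, watermark_duration,
                 window_duration, slide_duration=None):
    """Windows that still accept the event: those ending after the watermark."""
    watermark = max_event_time - watermark_duration
    return [w for w in windows_for(event_time, window_duration, slide_duration)
            if w[1] > watermark]


print(windows_for(12, 10))               # fixed length: [(10, 20)]
print(windows_for(12, 10, 5))            # sliding: [(5, 15), (10, 20)]
print(open_windows(27, 40, 10, 10, 5))   # watermark is 30: [(25, 35)]
```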

App ID Generator

The App ID Generator processor adds a new column to the dataset with a value in ddmmyyyy format, for example, 06012021.

Configuring App ID Generator processor for Spark pipelines

To add the processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Column Name

Name of the column where the generated application ID is stored. If the provided column name already exists, its values are overwritten with the generated application ID.

You can add further configurations by clicking the ADD CONFIGURATION button and entering values. Click Next to proceed.
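A minimal Python sketch of the described behavior follows. It assumes the ddmmyyyy value is derived from a date supplied by the caller (the hypothetical `run_date` parameter, defaulting to today), because the text does not say which date Gathr uses. As described above, an existing column with the same name is overwritten.

```python
from datetime import date


def add_app_id(rows, column_name, run_date=None):
    """Add (or overwrite) column_name with a ddmmyyyy string such as 06012021."""
    app_id = (run_date or date.today()).strftime("%d%m%Y")
    return [{**row, column_name: app_id} for row in rows]


print(add_app_id([{"a": 1}, {"app_id": "old"}], "app_id", date(2021, 1, 6)))
```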

Binary Avro Parser

Add the Binary Avro Parser processor into your pipeline to parse binary Avro data into a JSON string. Drag the processor to the canvas and click on it to configure:

Field

Description

Binary Data Column

Specify the column containing the binary data to be parsed into string format.

Avro Schema

Provide the Avro schema details in JSON format to convert data from binary format.

Parsed Column Name

Provide a name for the new column that stores the parsed data, converted from binary format to a JSON string.

ADD CONFIGURATION

User can add further configurations by providing values after clicking the ADD CONFIGURATION option.

Environment Params

ADD PARAMS

User can add further environment parameters as per requirement.
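To show what the parser does with the Avro Schema field, here is a deliberately minimal Python decoder for a flat Avro record. It handles only `int`, `long`, `string`, `boolean`, and `double` fields, following the Avro binary encoding (zig-zag variable-length integers, length-prefixed UTF-8 strings, single-byte booleans, little-endian doubles). Real pipelines would use an Avro library; the function names here are illustrative.

```python
import io
import json
import struct


def _read_long(buf):
    """Read an Avro int/long: a zig-zag encoded variable-length integer."""
    shift = 0
    acc = 0
    while True:
        byte = buf.read(1)
        if not byte:
            raise EOFError("unexpected end of Avro data")
        b = byte[0]
        acc |= (b & 0x7F) << shift
        if not b & 0x80:  # high bit clear: last byte of this number
            break
        shift += 7
    return (acc >> 1) ^ -(acc & 1)  # undo zig-zag encoding


def _read_value(buf, avro_type):
    if avro_type in ("int", "long"):
        return _read_long(buf)
    if avro_type == "string":
        length = _read_long(buf)
        return buf.read(length).decode("utf-8")
    if avro_type == "boolean":
        return buf.read(1) == b"\x01"
    if avro_type == "double":
        return struct.unpack("<d", buf.read(8))[0]
    raise NotImplementedError(f"type not handled in this sketch: {avro_type}")


def parse_binary_avro(data: bytes, schema_json: str) -> str:
    """Decode one flat record and return it as a JSON string."""
    schema = json.loads(schema_json)
    buf = io.BytesIO(data)
    record = {f["name"]: _read_value(buf, f["type"]) for f in schema["fields"]}
    return json.dumps(record)


schema = json.dumps({
    "type": "record", "name": "Reading",
    "fields": [{"name": "device", "type": "string"},
               {"name": "temperature", "type": "long"}],
})
# "MAC001" = length 6 (zig-zag 0x0c) + bytes; 1001 = zig-zag 2002 = 0xd2 0x0f
print(parse_binary_avro(b"\x0cMAC001\xd2\x0f", schema))
```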

Cache

Configuring the Cache for Spark pipelines.

To add the processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Storage Level

Select the storage level for the RDD.

The below options are available in the drop down list:

Memory Only

Stores the RDD as deserialized Java objects in the JVM.

If the RDD does not fit in memory, some partitions are not cached and are recomputed on the fly each time they are needed. This is the default.


Memory Only_2

Same as Memory Only but replicates each partition on two cluster nodes.


Memory Only SER

Stores RDD as serialized Java objects for space efficiency (one byte array per partition).


Memory Only SER_2

Same as Memory Only SER, but replicates each partition on two cluster nodes.


Disk Only

Stores the RDD partitions only on disk.


Disk Only_2

Same as Disk Only, but replicates each partition on two cluster nodes.


Memory and Disk

Stores the RDD as deserialized Java objects in the JVM. If the RDD does not fit in memory, the partitions that don't fit are stored on disk and read from there when needed.


Memory and Disk_2

Same as Memory and Disk, but replicates each partition on two cluster nodes.


Memory and Disk SER

Similar to Memory Only SER, but spills partitions that don't fit in memory to disk instead of recomputing them.


Memory and Disk SER_2

Same as Memory and Disk SER, but replicates each partition on two cluster nodes.

Enable Cache

Option to enable caching operation.

Refresh Interval

The time interval after which the application's page is to be reloaded or refreshed.

ADD CONFIGURATION

Additional properties can be added using + ADD CONFIGURATION button.

Environment Params

ADD PARAMS

User can add further environment parameters as per requirement.

Click Next to proceed.

Note: Caching cannot be applied to streaming data.
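The drop-down options correspond to Spark's storage levels. The Python sketch below records, for each option, the flags of the matching `org.apache.spark.storage.StorageLevel` constant (for example, `MEMORY_AND_DISK_SER_2`), omitting the off-heap flag. The `StorageLevel` tuple here is a local stand-in, not the Spark class; in Spark code, a level is applied with a call such as `persist(StorageLevel.MEMORY_AND_DISK)`.

```python
from typing import NamedTuple


class StorageLevel(NamedTuple):
    use_disk: bool
    use_memory: bool
    deserialized: bool
    replication: int = 1


# Drop-down option -> flags of the corresponding Spark StorageLevel constant.
STORAGE_LEVELS = {
    "Memory Only": StorageLevel(False, True, True),
    "Memory Only_2": StorageLevel(False, True, True, 2),
    "Memory Only SER": StorageLevel(False, True, False),
    "Memory Only SER_2": StorageLevel(False, True, False, 2),
    "Disk Only": StorageLevel(True, False, False),
    "Disk Only_2": StorageLevel(True, False, False, 2),
    "Memory and Disk": StorageLevel(True, True, True),
    "Memory and Disk_2": StorageLevel(True, True, True, 2),
    "Memory and Disk SER": StorageLevel(True, True, False),
    "Memory and Disk SER_2": StorageLevel(True, True, False, 2),
}


def describe(option):
    """Summarize how an option stores cached partitions."""
    level = STORAGE_LEVELS[option]
    places = [name for name, used in (("memory", level.use_memory),
                                      ("disk", level.use_disk)) if used]
    form = "deserialized" if level.deserialized else "serialized"
    return f"{option}: {form}, {' and '.join(places)}, copies: {level.replication}"


for option in STORAGE_LEVELS:
    print(describe(option))
```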

Custom

The Custom processor enables you to implement custom business logic for components that are not built into Gathr.

Implement the com.Gathr.framework.api.processor.JSONProcessor interface and provide your custom business logic in the implemented methods.

You will be able to perform operations on datasets which are not supported by other processors.

Prerequisites

To use a custom processor, you need to create a jar file containing your custom code. Then upload the jar file in a pipeline or as a registered component.

To write custom logic for your custom processor, download the sample project.

Import the downloaded sample project as a Maven project in Eclipse. Ensure that Apache Maven is installed on your machine and that it is added to your PATH.

Implement your custom code. Build the project to create a jar file containing your code, using the following command:

mvn clean install -DskipTests


If the Maven build is successful, upload the jar on the pipeline canvas.

Custom processor for Spark

Custom processor for Spark supports writing code in Java programming language.

While using the Custom processor in a pipeline, you need to create a class that implements the com.Gathr.framework.api.spark.processor.CustomProcessor interface. Add the unimplemented methods to this class and write your business logic in them.

Shown below is a sample class structure:

There are three methods to implement.

1. Init: Contains any initialization calls.

2. Process: Contains the actual business logic. This method is called for every tuple.

3. Cleanup: All resource cleanup occurs in this method.
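The init/process/cleanup lifecycle can be illustrated in Python. Everything below is hypothetical: `LifecycleProcessor`, `UpperCaseField`, and `run` are stand-ins that only mimic the calling order described above (init once, process once per tuple, cleanup once). They are not the Gathr `CustomProcessor` interface, which is implemented in Java.

```python
from abc import ABC, abstractmethod


class LifecycleProcessor(ABC):
    """Hypothetical stand-in for a custom processor contract (not Gathr's API)."""

    @abstractmethod
    def init(self, config):
        """Called once before any record is processed."""

    @abstractmethod
    def process(self, record):
        """Called once for every tuple; returns the transformed record."""

    @abstractmethod
    def cleanup(self):
        """Called once at the end to release resources."""


class UpperCaseField(LifecycleProcessor):
    def init(self, config):
        self.field = config["field"]  # one-time setup, e.g. opening connections
        self.processed = 0
        self.closed = False

    def process(self, record):
        self.processed += 1
        return {**record, self.field: record[self.field].upper()}

    def cleanup(self):
        self.closed = True  # e.g. close connections here


def run(processor, config, records):
    """Drive the lifecycle: init once, process each record, always clean up."""
    processor.init(config)
    try:
        return [processor.process(record) for record in records]
    finally:
        processor.cleanup()


print(run(UpperCaseField(), {"field": "name"}, [{"name": "mike"}, {"name": "rosy"}]))
```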

Configuring Custom processor for Spark pipelines

To add a Custom processor into your Spark pipeline, drag the Custom processor on the canvas. Right click on it to configure as explained below.

Field

Description

Implementation Class

Fully qualified name of the class which implements custom processor and has custom logic.


The control will be passed to this class to process incoming dataset.

ADD CONFIGURATION

Additional properties can be added using ADD CONFIGURATION link.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Container

The Container processor loads data into a Couchbase server and reads data from it.

This processor is used as a caching container which reads both the aggregated data and raw data from the bucket.

Configuring Container processor for Spark pipelines

To add a Container processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Load From Template

Select the template from the list.

Only templates created globally or within the current workspace are visible in the drop-down list.

Sync With Source Template

This option is displayed when you use a template configuration. If any change is made in the source template, the corresponding changes are reflected in the Container's configuration.

Connection Name

Provide a connection name for creating a connection.

Bucket Name

Couchbase bucket name. Select the Couchbase bucket that is generated with the Couchbase connection.

You can also create a new bucket by specifying the bucket name. In that case, you also need to specify the bucket password.

Bucket Password

Specify the bucket password if a new bucket is created.

Memory Quota

Memory Quota in megabytes per Couchbase server node. Memory Quota cannot be less than 100 MB.

Replicate View Indexes

Select the Replicate View Indexes checkbox to ensure that view indexes, as well as data, are replicated.

Flush

When flushed, all items in the bucket are removed as soon as possible.

Document Id

Unique identifier for newly created documents of Couchbase.

Retention Enable

When selected, each newly created item lives for the number of seconds specified by the retention policy. After the expiration time is reached, the item is deleted by the Couchbase server.

Record Limit

Limits the number of records fetched from Couchbase.

When selected Yes, specify number of records to be fetched from the Couchbase server.

When selected No, all records will be fetched from the server.

Max No of Records

Number of records to be pulled from the server.

Overflow Condition

Determines which records are fetched, based on the sort order of the selected column: ascending order fetches the records with the minimum values, and descending order fetches the records with the maximum values.

Note: With aggregation, select the grouping field in the Overflow condition.

Fetch Data

Raw Data:

Fetch the raw data from the Couchbase bucket.

Aggregated Data:

Fetch the aggregated data from the Couchbase. A new field is populated, where you can group the fields from the schema.

Output Fields

Fields in the message that need to be part of the output data.

Grouping Fields

Field of selected message on which group-by is applied.

Fields

Select the fields on which function is to be applied.

Click on the NEXT button. An option to Save as Template will be available. Add notes in the space provided and click on Save as Template.

Choose the scope of the Template and Click SAVE for saving the Template and configuration details.

Decoder

The Decoder processor decodes fields that were encoded, for example, to hide sensitive data.

Base64 is a group of binary-to-text encoding schemes that represent binary data in an ASCII string format by translating it into a radix-64 representation.

Base64 encoding schemes are commonly used when binary data needs to be stored and transferred over media that are designed to deal with textual data. This ensures that the data remains intact without modification during transport.

Configuring Decoder for Spark pipelines

To add a Decoder processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Output Field

The list of columns in which the decoded values of the selected columns are stored. You can also enter a new column name here; that column is added to the dataset.

Scheme

Base64 is supported.

Input Field

The list of columns whose values you want to decode.

Add Field

Add multiple columns for decoding.

Click Next to detect the schema. Click Next again to add notes, and then save the configuration.
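A Python sketch of Base64-decoding selected input columns into output columns, using the standard `base64` module. It assumes the decoded bytes are UTF-8 text; `decode_columns` and the mapping format are illustrative, not part of Gathr.

```python
import base64


def decode_columns(row, mapping):
    """Base64-decode input columns into output columns.

    mapping: {input_field: output_field}; a new output name adds a column.
    """
    decoded = dict(row)
    for input_field, output_field in mapping.items():
        decoded[output_field] = base64.b64decode(row[input_field]).decode("utf-8")
    return decoded


print(decode_columns({"ssn_b64": "MTIzLTQ1LTY3ODk="}, {"ssn_b64": "ssn"}))
```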

Decryption

The Decryption processor is used to decrypt data coming from a data source.

Configuring Decryption processor for Spark pipelines

To add a Decryption processor into your pipeline, drag the processor on to the canvas and right click on it to configure.

Field

Description

Output Field

The decrypted data field.

Algorithm Type

The algorithm that was used to encrypt the data.

Input Field

The field to be decrypted.

Secret Key

A valid secret key that was used to encrypt the data.

Key Format

Format of the secret key: hexadecimal or Base64 encoded.

It is important to encode the binary data to ensure that it is intact without modification when it is stored or transferred.

Algorithm Types

Algorithm | Key Size | Algorithm Value to be Used
AES | 128 bit | AES
DES | 56 bit | DES
Triple DES | 192 bit | DESede
Blowfish | 32-448 bit | Blowfish
RSA | 1024, 2048, or 4096 bit | RSA

After the decryption, the Detect Schema window will show the decrypted data.
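The Key Format field determines how the Secret Key text is turned into raw key bytes. The following Python sketch (function and table names are hypothetical) decodes a hexadecimal or Base64 key and, for the two fixed-size algorithms in the table, checks its length: 128 bits (16 bytes) for AES and 192 bits (24 bytes) for DESede. Decryption itself is out of scope, since the Python standard library has no AES or DES implementation.

```python
import base64

# Expected key sizes, in bytes, for the fixed-size algorithms in the table.
KEY_BYTES = {"AES": 16, "DESede": 24}


def parse_secret_key(key_text, key_format, algorithm):
    """Convert the Secret Key text into raw bytes according to Key Format."""
    if key_format == "hexadecimal":
        key = bytes.fromhex(key_text)
    elif key_format == "base64":
        key = base64.b64decode(key_text, validate=True)
    else:
        raise ValueError(f"unsupported key format: {key_format}")
    expected = KEY_BYTES.get(algorithm)
    if expected is not None and len(key) != expected:
        raise ValueError(f"{algorithm} needs a {expected * 8}-bit key, got {len(key) * 8} bits")
    return key


hex_key = "00112233445566778899aabbccddeeff"
b64_key = base64.b64encode(bytes.fromhex(hex_key)).decode("ascii")
print(parse_secret_key(hex_key, "hexadecimal", "AES") == parse_secret_key(b64_key, "base64", "AES"))
```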

Distinct

Distinct is a core operation of Apache Spark over streaming data. The Distinct processor eliminates duplicate records from a dataset.

Configuring Distinct processor for Spark pipelines

To add a Distinct processor into your pipeline, drag the processor on to the canvas and right click on it to configure.

Enter the fields on which distinct operation is to be performed.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Note: Distinct cannot be used immediately after the Aggregation or Pivot processor.

The following example demonstrates how Distinct works.

If you apply Distinct on any two fields: Name and Age, then the output for the given fields will be as shown below:

 

Input Set

{Name:Mike,Age:7}

{Name:Rosy,Age:9}

{Name:Jack,Age:5}


{Name:Mike,Age:6}

{Name:Rosy,Age:9}

{Name:Jack,Age:5}

Output Set

{Name:Mike,Age:7}

{Name:Mike,Age:6}

{Name:Rosy,Age:9}

{Name:Jack,Age:5}
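A Python sketch of this example, assuming Distinct behaves like selecting the chosen fields and removing duplicate combinations (similar to `select(...).distinct()` in Spark). The function name is illustrative. Spark does not guarantee output order, which is why the order shown above can differ from the order produced here.

```python
def distinct(records, fields):
    """Keep one copy of each unique combination of the selected fields."""
    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique = dict.fromkeys(tuple(record[f] for f in fields) for record in records)
    return [dict(zip(fields, key)) for key in unique]


input_set = [
    {"Name": "Mike", "Age": 7}, {"Name": "Rosy", "Age": 9}, {"Name": "Jack", "Age": 5},
    {"Name": "Mike", "Age": 6}, {"Name": "Rosy", "Age": 9}, {"Name": "Jack", "Age": 5},
]
for row in distinct(input_set, ["Name", "Age"]):
    print(row)
```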

Dedup

Applications often encounter large datasets that contain duplicate records. To make the data consistent and accurate, you need to remove the duplicates, keeping only one copy of each record.

Dedup processor returns a new dataset after removing all duplicate records.

Configuring Dedup processor for Spark pipelines

To add Dedup processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

De-Dup Columns

Columns used for determining the duplicate values.

Watermarking

Yes: When selected Yes, watermarking will be applied.

No: When selected No, watermarking will not be applied.

Watermark Duration

Specify the watermark duration.

eventColumn

Message field of type timestamp.

ADD CONFIGURATION

Additional properties can be added using ADD CONFIGURATION link.

Click on the NEXT button. Enter the notes in the space provided.

Click Save for saving the configuration details.

The following example demonstrates how Dedup works:

Suppose you have a dataset with the following rows:

[Row(name='Alice', age=5, height=80),
Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)]

If the Dedup columns are [age, height], the Dedup processor returns the following dataset:

[Row(name='Alice', age=5, height=80),
Row(name='Alice', age=10, height=80)]

If the Dedup columns are [name, height], the Dedup processor returns the following dataset:

[Row(name='Alice', age=5, height=80)]
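The same results can be reproduced with a short Python sketch that keeps the first row for each distinct combination of the Dedup columns. The `Row` named tuple is a local stand-in for Spark's `Row`; in PySpark the comparable call is `DataFrame.dropDuplicates(subset)`, although Spark does not guarantee which of the duplicate rows it keeps.

```python
from collections import namedtuple

Row = namedtuple("Row", ["name", "age", "height"])


def dedup(rows, columns):
    """Keep the first row for each distinct combination of `columns`."""
    seen = set()
    kept = []
    for row in rows:
        key = tuple(getattr(row, c) for c in columns)
        if key not in seen:
            seen.add(key)
            kept.append(row)
    return kept


rows = [Row("Alice", 5, 80), Row("Alice", 5, 80), Row("Alice", 10, 80)]
print(dedup(rows, ["age", "height"]))
print(dedup(rows, ["name", "height"]))
```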

Data Cleansing Processor

The Data Cleansing Processor is used to cleanse the dataset using the metadata. To add a Data Cleansing Processor into your pipeline, drag the processor to the canvas and right click on it to configure:

Field

Description

Columns included while Extract Schema

Column names mentioned here will be used in the data cleansing process.

Connection Type

Select the type of connection from which the metadata files are read. The available connection types are RDS and S3.

Connection Name

Select the connection name to fetch the metadata file.

S3 Protocol

Select the S3 protocol from the drop-down list. The following protocols are supported for the various versions when the S3 connection type is selected:

- For HDP versions, the S3a protocol is supported.

- For CDH versions, the S3a protocol is supported.

- For Apache versions, the S3n protocol is supported.

- For GCP, the S3n and S3a protocols are supported.

- For EMR, the S3, S3n, and S3a protocols are supported.

- For AWS Databricks, the S3a protocol is supported.

Bucket Name

Provide the bucket name if the user selects S3 connection.

Path

Provide the path or sub-directories of the bucket specified above where the metadata file is located, if the S3 connection is selected.

Schema Name

Select the schema name from the drop-down list in case the RDS connection is selected.

Table Name

Select the table name from the drop-down list in case the RDS connection is selected.

Note: Metadata should be in tabular form.

Feed ID

Provide the feed ID by which the metadata is to be filtered.

Remove Duplicate

Select the checkbox to remove duplicate records.

Include Extra Input Columns

Select the checkbox to include extra input columns.

Note: You can add further configurations by clicking the ADD CONFIGURATION button.

DQM (Data Quality Management)

DQM stands for Data Quality Management. DQM processor defines the quality rules for data and performs necessary action on the faulty data.

Note: This processor does not work on Mozilla Firefox.

It performs a series of validations on individual data fields and triggers action functions if an applied validation fails. A variety of validations is available for each data type, such as Not Null, Contains, Starts With, and Ends With.

DQM processor has seven categories of Validation functions, one for each Data Type.

Data Type

1. Array

Note: Array represents a JSON (not nested) array of primitive types (String, Number, Double, and Boolean).

2. String

3. Number

4. Decimal

5. Date

6. Boolean

7. Timestamp

8. Opening Braces and Closing Braces

Use these braces to group or separate two or more expressions.

DQM processor has three Action functions that are triggered when Validation fails:

1. Send To Error: Persists the invalid records in the configured error handler target and discards them from further processing in the pipeline. Right click the Send To Error button to open the error handler configuration. When data validation fails, a JSON message is sent to the configured error handler target. This JSON contains the key “errorMessage”, which holds the expressions that caused the validation to fail.

Note: Only the Error Log Target configuration that was saved last on Send To Error is retained.

2. Assign Value: Assigns static or dynamic values to any message field.

a. Static Values:

Select any message field and apply validation on it. If the validation criteria is not met, assign static values to that field.

Example:

Let us apply validation on field “id” (id equals 10).

Click on the Save button for saving the validation criteria.

Whenever the value of the field “id” is equal to “10”, the record will be processed as it is.

If the value of the field “id” is not equal to “10”, static value 100 will be assigned to that id.

Click on the Save button for saving the configuration settings.

Click Next and write the notes. The changes are reflected in the Auto Inspect window.

In the Auto Inspect window, wherever the id value was not 10, it is replaced with the static value 100.

b. Dynamic Values:

• Dynamic String Values

• Dynamic Numeric Values

Dynamic String Values

Dynamic values can be assigned using the @ (at sign) operator.

Select any message field and apply validation on it. If the validation criteria is not met, assign dynamic values to that field.

Example:

Let us apply validation on field “id” (id equals 10).

Wherever the value of the field id is equal to 10, the record is processed as it is. If this validation criterion is not met, a dynamic value is assigned to the field id.

You can dynamically assign the value of any other message field, such as crop, soil, climate, pest, or cost_per_hectare, to the field id wherever the validation criterion is not met.

Type @ in the values text box of Configuration Settings - Assign Value.

All the message fields will be displayed in the drop down list. Select the field whose value is to be dynamically assigned to field id.

For example, let us select field “soil” from the drop down list.

Write the expression as: @{soil}

The selected field name must be enclosed in curly braces.

After writing the expression, click on the Save button.

Wherever the value of the field id is 10, the record will be processed as it is.

Otherwise, the value of the field soil is assigned to the field id.

Click on the Save button for saving the configuration settings.

Click Next and write the notes. The changes are reflected in the Auto Inspect window.

In the Auto Inspect window, wherever the value of field id was not 10, the value of field soil is assigned to it.

Note: Take data types into consideration when assigning dynamic values. Except for the String data type, both columns must have the same data type.

Dynamic Numeric value: For numeric fields, mathematical operations can be used for assigning the values.

Example:

Let us apply validation on field “id” (id equals 10).

Wherever the value of the field id is equal to 10, the record will be processed as it is. If this validation criteria is not met, assign dynamic value to the field id using arithmetic operators.

Let us apply a simple mathematical operation on the field “cost_per_hect”.

Write the expression as @{cost_per_hect+100}

The expression adds 100 to the value of cost_per_hect. The result is assigned to the field id wherever its value is not 10.

Click on the Save button for saving the configuration settings.

Click Next and write the notes. The changes are reflected in the Auto Inspect window.

Let us evaluate the columns “id” and “cost_per_hect”.

The value of the cost_per_hect in the second row is 7193; if 100 is added to it, the output is 7293, and this value gets assigned to the field id.

The value of the cost_per_hect in the third row is 7403; if 100 is added to it, the output is 7503, and this value gets assigned to the field id.

Accordingly, all id values get updated except where the id is 10.
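The Assign Value modes above (a static value, `@{field}`, and `@{expression}`) can be modeled with the following Python sketch. It is hypothetical: the function names are illustrative, the validation is fixed to "field equals expected value" as in the examples, and expressions are limited to field names, numbers, and + - * /, evaluated through Python's `ast` module rather than `eval`.

```python
import ast
import operator
import re

_OPS = {ast.Add: operator.add, ast.Sub: operator.sub,
        ast.Mult: operator.mul, ast.Div: operator.truediv}


def _eval(node, record):
    """Evaluate a tiny arithmetic AST whose names refer to record fields."""
    if isinstance(node, ast.Expression):
        return _eval(node.body, record)
    if isinstance(node, ast.Constant) and isinstance(node.value, (int, float)):
        return node.value
    if isinstance(node, ast.Name):
        return record[node.id]
    if isinstance(node, ast.BinOp) and type(node.op) in _OPS:
        return _OPS[type(node.op)](_eval(node.left, record), _eval(node.right, record))
    raise ValueError("unsupported expression")


def resolve_value(value, record):
    """Static values pass through; @{...} is evaluated against the record."""
    match = re.fullmatch(r"@\{(.+)\}", value.strip()) if isinstance(value, str) else None
    if match is None:
        return value  # static value
    return _eval(ast.parse(match.group(1), mode="eval"), record)


def assign_value(records, field, expected, value):
    """Where record[field] != expected (validation fails), assign the value."""
    return [r if r[field] == expected else {**r, field: resolve_value(value, r)}
            for r in records]


records = [
    {"id": 10, "soil": "loam", "cost_per_hect": 5000},
    {"id": 2, "soil": "clay", "cost_per_hect": 7193},
    {"id": 3, "soil": "silt", "cost_per_hect": 7403},
]
print([r["id"] for r in assign_value(records, "id", 10, 100)])                     # [10, 100, 100]
print([r["id"] for r in assign_value(records, "id", 10, "@{soil}")])               # [10, 'clay', 'silt']
print([r["id"] for r in assign_value(records, "id", 10, "@{cost_per_hect+100}")])  # [10, 7293, 7503]
```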

3. Discard: Discards the record from further processing in the pipeline.

If a record is eligible for more than one action, the actions are executed in the following order:

1. Send To Error (highest priority)

2. Discard (lowest priority)

Configuring DQM Processor:

To add a DQM processor into your pipeline, drag the DQM processor to the canvas and connect it to a Data Source or processor. Right click on it to configure.

On the configuration canvas of Data Quality processor, select fields, then drag and drop the Validation and Action functions.

Choose a field in the message panel of the configuration; the validations for its data type are expanded in the function panel.

After selecting the message, drag and drop the validation functions and connect them.

Each connecting line shows its logical operator, AND or OR.

By default, the AND operator is selected, but you can toggle it to OR by right-clicking it.

Most validation functions in the Data Quality processor, such as Not Null, Is Empty, and Upper Case, do not require user input, so no error icon is shown on them. Validation functions that require user input show a red error icon until they are configured.

Right click on the field (message) to configure the required input for the validation function. The configuration panel also shows the Negate option, so that the same validation function can be used with the reverse condition.

Note: Schema field names should not match the field names of any chosen function.

Example: If a schema field is named start date and a function with a field of the same name is selected, the value in that function's field is filled from the schema field. In such a scenario, for the expression to succeed, rename the schema field so that it does not match the function's field name.

Once all of the validations are configured, you can add an Action function at the end. This action function gets triggered if validation fails.

You can select multiple fields and define the validations.

Note: If you are using any Timestamp field for configuration, specify the value in following format:

“yyyy-MM-dd HH:mm:ss”

For Date field, use below format:

“yyyy-MM-dd”
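How validations, AND/OR connectors, Negate, and action priority fit together can be sketched in Python. This is a hypothetical model, not Gathr's implementation: it assumes connectors are applied strictly left to right (other groupings would need the braces described earlier), and it covers only the two actions whose priority is listed above.

```python
def validate(value, validations, connectors):
    """Combine validation results left to right with AND/OR connectors.

    validations: list of (check, negate) pairs; connectors: "AND"/"OR" between pairs.
    """
    def run(check, negate):
        return check(value) != negate  # Negate reverses the condition

    result = run(*validations[0])
    for connector, (check, negate) in zip(connectors, validations[1:]):
        outcome = run(check, negate)
        result = (result and outcome) if connector == "AND" else (result or outcome)
    return result


ACTION_PRIORITY = ["Send To Error", "Discard"]  # highest priority first


def pick_action(applicable_actions):
    """Return the action that runs when a record qualifies for several."""
    ranked = sorted(applicable_actions, key=ACTION_PRIORITY.index)
    return ranked[0] if ranked else None


def not_null(v):
    return v is not None


def starts_with_mac(v):
    return str(v).startswith("MAC")


print(validate("MAC001", [(not_null, False), (starts_with_mac, False)], ["AND"]))  # True
print(validate("PC01", [(not_null, False), (starts_with_mac, False)], ["AND"]))    # False
print(pick_action(["Discard", "Send To Error"]))                                    # Send To Error
```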

Drools

The Drools processor performs rule-based processing on incoming data using the Drools rule engine.

To use this processor, create a jar either manually or by using Drools Workbench (KIE jar), and upload it to the processor.

Configuring Drools processor for Spark pipelines

To add a Drools processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Drool Source

Specify the source of the Drools jar: if the jar was created using Workbench, select the Kie JAR radio button; otherwise, select Other JAR.

Upload File

Upload the jar whose type is selected in Drool Source.

Select Files

If the Drool Source is Other JAR, this option lists all Drools-related files found in the uploaded jar. Select the files that you want to use in the pipeline.

Fact Class Names

Displays all the Fact classes auto-detected from the uploaded JAR file. Select the Fact classes you want to use in the pipeline. If a Fact class you want to use is not listed in the drop-down, type its fully qualified class name here.

Output Fact Class

The final Fact class, whose data is passed to the next component in the pipeline. Select the Fact class that you want to process further.

ADD CONFIGURATION

Lets you configure additional properties.

Drop

The Drop processor removes the selected fields from the data set, so they no longer appear in the schema.

Configuring Drop processor for Spark pipelines

To add a Drop processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Fields

Message fields to be removed from the output records.

Click NEXT and add notes if required.

Click SAVE to save the configuration details.

Encoder

The Encoder processor encodes fields, for example to hide sensitive data.

Base64 is a group of binary-to-text encoding schemes that represent binary data in an ASCII string format by translating it into a radix-64 representation.

Base64 encoding schemes are commonly used when binary data needs to be stored and transferred over media designed to handle textual data. This ensures that the data remains intact, without modification, during transport.
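The Python sketch below (standard library only) shows a Base64 round trip. Decoding needs no key, so Base64 obscures values rather than securing them. The helper names are hypothetical, and encoding strings as UTF-8 before Base64 is an assumption of the sketch.

```python
import base64


def encode_field(value: str) -> str:
    """Base64-encode the UTF-8 bytes of a string value."""
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


def decode_field(encoded: str) -> str:
    """Reverse the encoding; no secret is required."""
    return base64.b64decode(encoded).decode("utf-8")


encoded = encode_field("Spark SQL")
print(encoded)                # U3BhcmsgU1FM
print(decode_field(encoded))  # Spark SQL
```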

Configuring Encoder for Spark pipelines

To add an Encoder processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Output Field

The list of columns in which the encoded value of the selected column can be stored. You can also enter a new column name; the new column is added to the data set.

Scheme

Base64 is supported.

Input Field

The list of columns that you want to encode.

Add Field

Add multiple columns for encoding.

Click Next to detect the schema.

Click Next to add Notes and save the configuration.

Encryption

The Encryption processor encrypts data. When data coming from a data source needs further processing as well as protection, this processor hides the actual data during transformation or processing.

This is useful for hiding or transforming sensitive or confidential data, such as credit card details, bank details, contact details, and personal details. The Encryption processor lets you encrypt such sensitive information.

Configuring Encryption for Spark pipelines

To add an Encryption processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Output Field

Encrypted data field.

Algorithm Type

Encryption algorithm used to encrypt data.

Input Field

The field to be encrypted.

Secret Key

A valid secret key used to encrypt data.

Key Format

Format of the secret key, which is hexadecimal or Base64 encoded.

It is important to encode binary data so that it remains intact, without modification, when it is stored or transferred.
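Hexadecimal and Base64 are two text representations of the same raw key bytes. The sketch below decodes a key given in either format and checks its length. The 16/24/32-byte check assumes an AES-style algorithm, which is not stated above; `decode_secret_key` and the sample key are hypothetical.

```python
import base64


def decode_secret_key(key: str, key_format: str) -> bytes:
    """Turn a hex or Base64 key string into raw key bytes."""
    if key_format == "hex":
        raw = bytes.fromhex(key)
    elif key_format == "base64":
        raw = base64.b64decode(key, validate=True)
    else:
        raise ValueError(f"unsupported key format: {key_format}")
    # Assumption: an AES-style algorithm accepting 128-, 192- or 256-bit keys.
    if len(raw) not in (16, 24, 32):
        raise ValueError(f"unexpected key length: {len(raw)} bytes")
    return raw


hex_key = "00112233445566778899aabbccddeeff"  # hypothetical 16-byte key in hex
b64_key = base64.b64encode(bytes.fromhex(hex_key)).decode("ascii")
print(b64_key)  # the same 16 bytes written in Base64
print(decode_secret_key(hex_key, "hex") == decode_secret_key(b64_key, "base64"))  # True
```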

Eviction Processor

The Eviction processor deletes data from the containers configured in JDBC Container processors. The pipeline that contains the JDBC Container processor must have retention enabled.

You can use an Eviction processor to view and delete retention data from Oracle JDBC containers.

An Eviction processor has the following prerequisites:

• At least one pipeline contains a JDBC Container processor with retention enabled. To set this up, create a pipeline with a JDBC Container processor and set a retention policy.

• A successful Oracle database connection. The Eviction processor works only with Oracle databases.

• A configured dummy channel.

Configuring Eviction processor for Spark pipelines

To add an Eviction processor into your pipeline, drag the processor to the canvas and right click on it to configure.

The details configured here, which the Eviction processor uses to delete data, include:

• Table Name (Container)

• Retention Policy

• Retention Column

• Record Limit

• Grouping Field.

Click NEXT.

Click Save if you do not want to create a new template.

Use data eviction processor to delete the data according to the retention policy from the respective Oracle JDBC container.

For data eviction, select the data eviction processor and attach it to the configured dummy channel.

Select the Oracle connection that contains the required containers.

The UI lists the Oracle JDBC containers (tables).

Select or deselect the JDBC container as per the requirement.

Data eviction Processor

The data of the selected containers will be deleted according to the retention information (policy) configured for the respective container.
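Conceptually, a time-based retention policy removes rows whose retention column falls outside the retention window. The Python sketch below models that idea only; it is not how the Eviction processor is implemented. The `retention_days` and `record_limit` parameters are hypothetical stand-ins for the Retention Policy and Record Limit settings, and evicting the oldest rows first is an assumption.

```python
from datetime import datetime, timedelta
from typing import Dict, List


def select_evictable(
    rows: List[Dict],
    retention_column: str,
    retention_days: int,
    now: datetime,
    record_limit: int,
) -> List[Dict]:
    """Return up to `record_limit` rows older than the retention window."""
    cutoff = now - timedelta(days=retention_days)
    expired = [row for row in rows if row[retention_column] < cutoff]
    expired.sort(key=lambda row: row[retention_column])  # assumption: oldest first
    return expired[:record_limit]


now = datetime(2024, 1, 31)
rows = [
    {"id": 1, "created": datetime(2023, 12, 1)},
    {"id": 2, "created": datetime(2024, 1, 20)},
    {"id": 3, "created": datetime(2023, 11, 15)},
]
evict = select_evictable(rows, "created", retention_days=30, now=now, record_limit=10)
print([row["id"] for row in evict])  # [3, 1]
```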

Expression Evaluator

This processor performs transformation operations on the incoming data set, e.g., replace, format, trim, uppercase, and lowercase. It uses the Spark SQL expression language to build the transformation query.

Configuring Expression Evaluator for Spark pipelines

To add an Expression Evaluator processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Expression

Provide a SQL expression, using Spark SQL functions, that evaluates to a column. The evaluated value can update an existing column or be assigned to a new column.

Note: Enclose string literals and constant values in straight single quotes, e.g. 'John', 'Suite 130 Los Gatos, CA 95032, US'.

Validate

Validate the expressions applied to the column.

All the columns of the schema are listed, and you can apply any transformation to them. All the functions that can be applied are listed in the table on the right, as shown in the figure.

Note: Refer to the Expression Evaluator section in the Data Preparation section.

Listed Functions:

Field

Description

ABS

Description

Computes the absolute value.

Parameters

arg0: The column for which the absolute value is to be calculated.

Returns

Returns the computed absolute value.

Throws

ApplicationException

Example

abs(77.76) will return 77.76

Field

Description

ACOS

Description

Computes the cosine inverse of the given value; the returned angle is in the range 0.0 through pi.

Parameters

arg0: The column for which the cosine inverse is to be calculated.

Returns

Returns the computed cosine inverse in the range 0.0 through pi.

Throws

ApplicationException

Example

acos(0.45) will return 1.104031001096478

Field

Description

ADD_MONTHS

Description

Computes the date that is 'arg1' months after 'arg0'.

Parameters

arg0: The date to which months are to be added.

arg1: Number of months to be added.

Returns

Returns the date that is 'arg1' months after 'arg0'.

Throws

ApplicationException

Example

add_months("2009-03-01",2) will return "2009-05-01"

Field

Description

ARRAY

Description

Returns an array with the given elements

Parameters

arg0: The columns used to create the array column.

Returns

Returns an array with the given elements

Throws

ApplicationException

Example

array(1, 2, 3) will return [1,2,3]

Field

Description

ARRAY_CONTAINS

Description

Returns TRUE if the array contains value.

Parameters

arg0:An array column

arg1:A value to be checked

Returns

A boolean true/false

Throws

ApplicationException

Example

We have taken column1.colors as ["black","red"]

array_contains(@{column.schema.column1.colors},"red") will return true

Field

Description

ARRAY_DISTINCT

Description

Removes duplicate values from the array

Parameters

arg0: The given array column

Returns

Returns the array with duplicate values removed

Throws

ApplicationException

Example

array_distinct(array(1, 2, 3, null, 3)) will return [1,2,3,null]

Field

Description

ARRAY_EXCEPT

Description

Returns an array of the elements in array1 but not in array2, without duplicates

Parameters

arg0: First array column

arg1: Second array column

Returns

Returns an array of the elements in array1 but not in array2, without duplicates

Throws

ApplicationException

Example

array_except(array(1, 2, 3), array(1, 3, 5)) will return [2]

Field

Description

ARRAY_INTERSECT

Description

Performs intersection of array1 and array2, without duplicates.

Parameters

arg0: First array column

arg1: Second array column

Returns

Returns an array of the elements in the intersection of array1 and array2, without duplicates

Throws

ApplicationException

Example

array_intersect(array(1, 2, 3), array(1, 3, 5)) will return [1,3]

Field

Description

ARRAY_JOIN

Description

Concatenates the elements of the given array using the delimiter and an optional string to replace nulls. If no value is set for nullReplacement, null values are filtered out.

Parameters

arg0: array column

arg1: delimiter

arg2: nullReplacement

Returns

Returns the concatenated array

Throws

ApplicationException

Example

array_join(array('hello', null ,'world'), ' ', ',') will return hello , world
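The null handling of `array_join` can be modeled in a few lines of Python, with `None` standing in for SQL NULL. This is an illustrative sketch, not Spark's implementation.

```python
from typing import List, Optional


def array_join(items: List[Optional[str]], delimiter: str,
               null_replacement: Optional[str] = None) -> str:
    """Nulls are replaced when a replacement is given, otherwise dropped."""
    if null_replacement is None:
        parts = [item for item in items if item is not None]
    else:
        parts = [null_replacement if item is None else item for item in items]
    return delimiter.join(parts)


print(array_join(["hello", None, "world"], " ", ","))  # hello , world
print(array_join(["hello", None, "world"], " "))       # hello world
```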

Field

Description

ARRAY_MAX

Description

Returns the maximum value in the array. NULL elements are skipped

Parameters

arg0: The array column

Returns

Returns the maximum value in the array. NULL elements are skipped

Throws

ApplicationException

Example

array_max(array(1, 20, null, 3)) will return 20

Field

Description

ARRAY_MIN

Description

Returns the minimum value in the array. NULL elements are skipped

Parameters

arg0: The array column

Returns

Returns the minimum value in the array. NULL elements are skipped

Throws

ApplicationException

Example

array_min(array(1, 20, null, 3)) will return 1

Field

Description

ARRAY_POSITION

Description

Returns the (1-based) position of the first occurrence of arg1 in the array, as a long. Returns 0 if the element is not found.

Parameters

arg0: The array column.

arg1: The element to locate.

Returns

Returns the (1-based) position of the first occurrence of the element, or 0 if it is not found.

Throws

ApplicationException

Example

array_position(array(3, 2, 1), 1) will return 3
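A Python model of the 1-based lookup, with 0 meaning "not found", is sketched below. `array_position` here is a plain Python function, and NULL inputs are not modeled.

```python
from typing import Any, List


def array_position(items: List[Any], element: Any) -> int:
    """1-based position of the first occurrence of `element`, or 0 if absent."""
    for index, item in enumerate(items, start=1):
        if item == element:
            return index
    return 0


print(array_position([3, 2, 1], 1))  # 3
print(array_position([3, 2, 1], 7))  # 0
```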

Field

Description

array_remove

Description

Removes all elements equal to arg1 from the array.

Parameters

arg0: The array column.

arg1: The element to remove.

Returns

Returns the array with elements removed.

Throws

ApplicationException

Example

array_remove(array(1, 2, 3, null, 3), 3) will return [1,2,null]

Field

Description

array_repeat

Description

Returns an array containing arg0 repeated arg1 times.

Parameters

arg0: The element to be repeated.

arg1: The count.

Returns

Returns an array containing the element repeated count times.

Throws

ApplicationException

Example

array_repeat('123', 2) will return ["123","123"]

Field

Description

array_sort

Description

Sorts the input array in ascending order. The elements of the input array must be orderable. Null elements will be placed at the end of the returned array.

Parameters

arg0: The array column.

Returns

Returns the sorted array.

Throws

ApplicationException

Example

array_sort(array('b', 'd', null, 'c', 'a')) will return ["a","b","c","d",null]

Field

Description

array_union

Description

Returns an array of the elements in the union of array1 and array2, without duplicates.

Parameters

arg0: The first array column.

arg1: The second array column.

Returns

Returns an array of the elements in the union of array1 and array2, without duplicates.

Throws

ApplicationException

Example

array_union(array(1, 2, 3), array(1, 3, 5)) will return [1,2,3,5]

Field

Description

arrays_overlap

Description

Returns true if arg0 contains at least one non-null element that is also present in arg1. If the arrays have no common element, both are non-empty, and either of them contains a null element, null is returned; otherwise, false is returned.

Parameters

arg0: The first array column.

arg1: The second array column.

Returns

Returns true, false, or null.

Throws

ApplicationException

Example

arrays_overlap(array(1, 2, 3), array(3, 4, 5)) will return true
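The three-valued result of `arrays_overlap` is easy to misread, so here is a Python model of the rule described above, with `None` standing in for SQL NULL. It is an illustrative sketch and assumes hashable elements.

```python
from typing import Any, List, Optional


def arrays_overlap(a1: List[Any], a2: List[Any]) -> Optional[bool]:
    """Three-valued overlap test: True, None (SQL NULL) or False."""
    non_null_2 = {x for x in a2 if x is not None}
    if any(x is not None and x in non_null_2 for x in a1):
        return True
    if a1 and a2 and (None in a1 or None in a2):
        return None  # no common element, but a null makes the answer unknown
    return False


print(arrays_overlap([1, 2, 3], [3, 4, 5]))  # True
print(arrays_overlap([1, None], [3, 4]))    # None
print(arrays_overlap([1, 2], [3, 4]))        # False
```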

Field

Description

arrays_zip

Description

Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

Parameters

arg0: The Columns to be zipped.

Returns

Returns a merged array of structs in which the N-th struct contains all N-th values of input arrays.

Throws

ApplicationException

Example

arrays_zip(array(1, 2, 3), array(2, 3, 4)) will return [{"0":1,"1":2},{"0":2,"1":3},{"0":3,"1":4}]

Field

Description

ascii

Description

Computes the numeric value of the first character of the string column, and returns the result as an int column.

Parameters

arg0: The string whose first character's numeric value is to be calculated.

Returns

Returns the ascii value as an int column.

Throws

ApplicationException

Example

ascii("An apple") will return 65

Field

Description

asin

Description

Computes the sine inverse of the given value; the returned angle is in the range -pi/2 through pi/2

Parameters

arg0: The column for which the sine inverse is to be calculated.

Returns

Returns the computed sine inverse in the range -pi/2 through pi/2.

Throws

ApplicationException

Example

asin(0.45) will return 0.4667653256984187

Field

Description

atan

Description

Computes the tangent inverse of the given value.

Parameters

arg0: The column for which the tangent inverse is to be calculated.

Returns

Returns the computed tangent inverse.

Throws

ApplicationException

Example

atan(0.45) will return 0.42285391621948626

Field

Description

atan2

Description

Computes the angle theta from the conversion of rectangular coordinates (arg0, arg1) to polar coordinates (r, theta).

Parameters

arg0: The x rectangular coordinate.

arg1: The y rectangular coordinate.

Returns

Returns the computed angle theta.

Throws

ApplicationException

Example

atan2(12, 71.21) will return 1.403849169952035
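The example value can be reproduced with Python's `math.atan2`, which, like Java's `Math.atan2`, takes the y coordinate first. Treating arg0 = 12 as x and arg1 = 71.21 as y, as in the parameter list above, gives the documented angle. Spark SQL's built-in `atan2(exprY, exprX)` takes y first, so it is worth confirming the argument order on your platform version.

```python
import math

# Per the parameter list above: arg0 = x, arg1 = y.
x, y = 12.0, 71.21

# Python's math.atan2 takes the y coordinate first.
theta = math.atan2(y, x)
print(round(theta, 6))  # 1.403849

# Swapping the arguments gives a very different angle.
print(round(math.atan2(x, y), 6))
```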

Field

Description

base64

Description

Converts the argument from binary to a Base64 string.

Parameters

arg0: The Column to be converted to base64 string.

Returns

Returns the base64 string.

Throws

ApplicationException

Example

base64('Spark SQL') will return U3BhcmsgU1FM

Field

Description

bigint

Description

Casts the value expr to the target data type bigint.

Parameters

arg0:The column or expression.

Returns

Returns the bigint column or value.

Throws

ApplicationException

Example

SELECT bigint(column); will cast column to bigint

Field

Description

bin

Description

Returns the string representation of the long value expr represented in binary

Parameters

arg0: The numeric column to be represented in binary.

Returns

Returns the binary representation.

Throws

ApplicationException

Example

bin(13) will return 1101

Field

Description

boolean

Description

Casts the value expr to the target data type boolean.

Parameters

arg0:The column or expression.

Returns

Returns the boolean column or value.

Throws

ApplicationException

Example

SELECT boolean(expr); will cast expression to boolean

Field

Description

bround

Description

Computes the value of the column arg0 rounded to 0 decimal places with HALF_EVEN round mode.

Parameters

arg0: The column to be rounded to 0 decimal places with HALF_EVEN rounding mode.

Returns

Returns the computed value.

Throws

ApplicationException

Example

bround(71.21) will return 71.0
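HALF_EVEN (banker's) rounding sends ties to the nearest even digit. Python's `decimal` module exposes the same rounding mode, which makes the difference from HALF_UP easy to see. The `bround` helper and its optional `scale` argument belong to this sketch only; decimal strings are used to avoid binary floating-point artifacts.

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP


def bround(value: str, scale: int = 0) -> Decimal:
    """Round to `scale` decimal places using HALF_EVEN (banker's) rounding."""
    quantum = Decimal(1).scaleb(-scale)  # 1, 0.1, 0.01, ...
    return Decimal(value).quantize(quantum, rounding=ROUND_HALF_EVEN)


print(bround("71.21"))  # 71
print(bround("2.5"))    # 2  (tie goes to the even neighbour)
print(bround("3.5"))    # 4
print(Decimal("2.5").quantize(Decimal(1), rounding=ROUND_HALF_UP))  # 3, for comparison
```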

Field

Description

cast

Description

Casts the value expr to the target data type.

Parameters

arg0:The column or expression.

Returns

Returns the value cast to the target data type.

Throws

ApplicationException

Example

SELECT cast('10' as int); will cast 10 to int

Field

Description

cbrt

Description

Computes the cube-root of the given value.

Parameters

arg0: The column for which the cube root is to be calculated.

Returns

Returns the computed cube-root.

Throws

ApplicationException

Example

cbrt(80.89) will return 4.324789202233814

Field

Description

ceil

Description

Computes the ceiling of the given value.

Parameters

arg0: The column for which the ceiling is to be calculated.

Returns

Returns the computed ceiling.

Throws

ApplicationException

Example

ceil(77.76) will return 78
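The ceiling and floor of the example values can be checked with Python's `math.ceil` and `math.floor`:

```python
import math

# Ceiling rounds up to the next integer; floor rounds down.
for value in (77.76, 71.21, -0.1):
    print(value, math.ceil(value), math.floor(value))
# 77.76 78 77
# 71.21 72 71
# -0.1 0 -1
```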

Field

Description

ceiling

Description

Computes the ceiling of the given value.

Parameters

arg0: The column for which the ceiling is to be calculated.

Returns

Returns the computed ceiling.

Throws

ApplicationException

Example

SELECT ceiling(-0.1); will return 0

Field

Description

char

Description

Converts the ASCII value to the equivalent character. If arg0 is larger than 256, the result is equivalent to chr(arg0 % 256).

Parameters

arg0: The ASCII value.

Returns

Returns the ASCII character having the binary equivalent of arg0.

Throws

ApplicationException

Example

SELECT char(65); will return A

Field

Description

char_length

Description

Calculates the character length of string data or number of bytes of binary data. The length of string data includes the trailing spaces. The length of binary data includes binary zeros.

Parameters

arg0: The string column or expression.

Returns

Returns the character length of string data or number of bytes of binary data.

Throws

ApplicationException

Example

SELECT char_length('Spark SQL '); will return 10

Field

Description

chr

Description

Returns the ASCII character having the binary equivalent of arg0. If arg0 is larger than 256, the result is equivalent to chr(arg0 % 256).

Parameters

arg0: The ASCII value.

Returns

Returns the ASCII character having the binary equivalent of arg0.

Throws

ApplicationException

Example

SELECT chr(65); will return A

Field

Description

coalesce

Description

Returns the first non-null argument if exists. Otherwise, null.

Parameters

arg0: The columns representing expressions.

Returns

Returns the first non-null argument if exists. Otherwise, null.

Throws

ApplicationException

Example

coalesce(NULL, 1, NULL) will return 1

Field

Description

concat

Description

Concatenates multiple input string columns together into a single string column.

Parameters

arg0: The String columns to be concatenated.

Returns

Returns the concatenated string as a single string column.

Throws

ApplicationException

Example

concat("format","string") will return "formatstring"

Field

Description

concat_ws

Description

Concatenates multiple input string columns together into a single string column, using the given separator.

Parameters

arg0: The separator to be used.

arg1: The String columns to be concatenated.

Returns

Returns the concatenated strings using the given separator as a single string column.

Throws

ApplicationException

Example

concat_ws("-","format","string") will return "format-string"

Field

Description

conv

Description

Converts a number from a given base to another.

Parameters

arg0: A number or numeric-string column.

arg1: The integer base from which the number is to be converted.

arg2: The integer base to which the number is to be converted.

Returns

A string value.

Throws

ApplicationException

Example

We have taken column1 as '258'

conv(@{column.schema.column1},10,2) will return 100000010
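The base conversion in `conv` (and the fixed-base cases covered by `bin` and `hex`) can be modeled with Python's `int(text, base)` plus repeated division. This sketch handles only non-negative integers in bases 2 to 36 and uses upper-case digits; the `conv` function here is a plain Python model, not the platform function.

```python
def conv(number: str, from_base: int, to_base: int) -> str:
    """Convert a non-negative integer string between bases 2..36."""
    digits = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
    value = int(number, from_base)
    if value == 0:
        return "0"
    out = []
    while value:
        value, rem = divmod(value, to_base)
        out.append(digits[rem])
    return "".join(reversed(out))


print(conv("258", 10, 2))   # 100000010  (the conv example)
print(conv("13", 10, 2))    # 1101       (same as bin(13))
print(conv("258", 10, 16))  # 102        (same as hex(258))
```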

Field

Description

cos

Description

Computes the cosine of the given value.

Parameters

arg0: The column for which the cosine is to be calculated.

Returns

Returns the computed cosine.

Throws

ApplicationException

Example

cos(76.56) will return 0.3977126102073901

Field

Description

cosh

Description

Computes the hyperbolic cosine of the given value.

Parameters

arg0: The column for which the hyperbolic cosine is to be calculated.

Returns

Returns the computed hyperbolic cosine.

Throws

ApplicationException

Example

cosh(0) will return 1.0

Field

Description

crc32

Description

Computes a cyclic redundancy check value for string.

Parameters

arg0:A string argument

Returns

Returns a bigint value.

Throws

ApplicationException

Example

We have taken column1 as 'ABC'

crc32(@{column.schema_id.column1}) will return 2743272264
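Python's `zlib.crc32` computes the standard CRC-32 checksum and reproduces the example value. Encoding the string as UTF-8 before hashing is an assumption of this sketch.

```python
import zlib


def crc32(text: str) -> int:
    """CRC-32 of the UTF-8 bytes of `text`, as an unsigned 32-bit value."""
    return zlib.crc32(text.encode("utf-8")) & 0xFFFFFFFF


print(crc32("ABC"))  # 2743272264
```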

Field

Description

current_date

Description

Computes the current date as a date column.

Parameters

-

Returns

Returns the current date as a date column

Throws

ApplicationException

Example

current_date() will return the current date.

Field

Description

current_timestamp

Description

Computes the current timestamp as a timestamp column.

Parameters

-

Returns

Returns the current timestamp as a timestamp column.

Throws

ApplicationException

Example

current_timestamp() will return the current timestamp.

Field

Description

date_add

Description

Computes the date that is 'arg1' days after 'arg0'.

Parameters

arg0: The date to which days are to be added.

arg1: Number of days to be added.

Returns

Returns the computed date.

Throws

ApplicationException

Example

date_add("2009-03-01",2) will return "2009-03-03"

Field

Description

date_format

Description

Converts a date/timestamp/string to a value of string in the format specified by the date format given by the second argument.

Parameters

arg0: The date/timestamp/string to be converted.

arg1: The format to which the date/timestamp/string to be converted.

Returns

Returns the converted string.

Throws

ApplicationException

Example

date_format("2009-03-01","MM-dd-yyyy") will return "03-01-2009"

Field

Description

date_sub

Description

Computes the date that is 'arg1' days before 'arg0'.

Parameters

arg0: The date from which days are to be subtracted.

arg1: Number of days to be subtracted.

Returns

Returns the computed date.

Throws

ApplicationException

Example

date_sub("2009-03-02",1) will return "2009-03-01"

Field

Description

date_trunc

Description

Returns timestamp ts truncated to the unit specified by the format model fmt. fmt should be one of ["YEAR" "YYYY" "YY" "MON" "MONTH" "MM" "DAY" "DD" "HOUR" "MINUTE" "SECOND" "WEEK" "QUARTER"]

Parameters

arg0: The fmt format.

arg1: The ts timestamp.

Returns

Returns timestamp ts truncated to the unit specified by the format model fmt.

Throws

ApplicationException

Example

date_trunc('YEAR', '2015-03-05T09:32:05.359') will return 2015-01-01 00:00:00

Field

Description

datediff

Description

Computes the number of days from the 'arg1' date to the 'arg0' date.

Parameters

arg0: The end date.

arg1: The start date.

Returns

Returns the computed number of days.

Throws

ApplicationException.

Example

datediff("2009-03-01","2009-02-27") will return 2
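The date arithmetic behind `date_add`, `date_sub`, and `datediff` maps directly onto Python's `date` and `timedelta`; `datediff` is the end date minus the start date. A Python sketch of the three examples:

```python
from datetime import date, timedelta

start = date(2009, 3, 1)
print(start + timedelta(days=2))                     # 2009-03-03  (date_add)
print(date(2009, 3, 2) - timedelta(days=1))          # 2009-03-01  (date_sub)
print((date(2009, 3, 1) - date(2009, 2, 27)).days)   # 2           (datediff: end - start)
```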

Field

Description

day

Description

Extracts and returns the day of month of the given date/timestamp.

Parameters

arg0: The column or expression.

Returns

Returns the day of month of the date/timestamp.

Throws

ApplicationException

Example

SELECT day('2009-07-30'); will return 30.

Field

Description

dayofmonth

Description

Extracts the day of the month as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the day of the month is to be extracted.

Returns

Returns the extracted day as an integer.

Throws

ApplicationException

Example

dayofmonth("2009-03-01") will return 1

Field

Description

dayofweek

Description

Returns the day of the week for date/timestamp (1 = Sunday).

Parameters

arg0: The date column.

Returns

Returns the day of the week for date/timestamp

Throws

ApplicationException

Example

dayofweek('2009-07-30') will return 5

Field

Description

dayofyear

Description

Extracts the day of the year as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the day of the year is to be extracted.

Returns

Returns the extracted day as an integer.

Throws

ApplicationException

Example

dayofyear("2017-12-15") will return 349
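Python numbers weekdays differently (`isoweekday()` returns 1 for Monday through 7 for Sunday), so matching the 1 = Sunday convention of `dayofweek` needs a small shift. The helper names below are hypothetical.

```python
from datetime import date


def dayofweek(d: date) -> int:
    """Map ISO weekday (Mon=1..Sun=7) to 1=Sunday..7=Saturday."""
    return d.isoweekday() % 7 + 1


def dayofyear(d: date) -> int:
    """1-based day of the year."""
    return d.timetuple().tm_yday


print(dayofweek(date(2009, 7, 30)))   # 5 (a Thursday)
print(dayofyear(date(2017, 12, 15)))  # 349
```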

Field

Description

decimal

Description

Casts the value expr to the target data type decimal.

Parameters

arg0:The column or expression.

Returns

Returns the decimal column or value.

Throws

ApplicationException

Example

SELECT decimal(column); will cast column to decimal

Field

Description

decode

Description

Decodes the first argument using the second argument character set.

Parameters

arg0:Column to be decoded.

arg1: The charset.

Returns

Returns the decoded column.

Throws

ApplicationException

Example

decode(encode('abc', 'utf-8'), 'utf-8') will return abc

Field

Description

degrees

Description

Converts an angle measured in radians to an approximately equivalent angle measured in degrees.

Parameters

arg0: The angle, in radians, to be converted to degrees.

Returns

Returns the converted angle measured in degrees.

Throws

ApplicationException.

Example

degrees(71.21) will return 4080.0324066707394

Field

Description

dense_rank

Description

Computes the rank of a value in a group of values. Unlike the function rank, dense_rank will not produce gaps in the ranking sequence.

Parameters

arg0:Not applicable

Returns

The calculated dense rank.

Throws

ApplicationException.

Example

select dense_rank() OVER (order by col) will return 1,2,3,4...

Field

Description

element_at

Description

For an array column, returns the element at the given (1-based) index. If the index is negative, elements are accessed from the last to the first. Returns NULL if the index exceeds the length of the array. For a map column, returns the value for the given key, or NULL if the key is not in the map.

Parameters

arg0: Array or map column.

arg1: Index or key.

Returns

Returns the element.

Throws

ApplicationException.

Example

element_at(array(1, 2, 3), 2) will return 2 and element_at(map(1, 'a', 2, 'b'), 2) will return b
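A Python model of the indexing rules for `element_at`, with `None` standing in for NULL, is sketched below. Raising an error for index 0 is an assumption of this sketch, based on the 1-based indexing.

```python
from typing import Any, Dict, List, Optional, Union


def element_at(container: Union[List[Any], Dict[Any, Any]], key: Any) -> Optional[Any]:
    """Arrays: 1-based index, negative counts from the end. Maps: lookup by key."""
    if isinstance(container, dict):
        return container.get(key)  # None (NULL) if the key is missing
    if key == 0:
        raise ValueError("array index starts at 1")
    if abs(key) > len(container):
        return None  # out of range -> NULL
    return container[key - 1] if key > 0 else container[key]


print(element_at([1, 2, 3], 2))         # 2
print(element_at([1, 2, 3], -1))        # 3
print(element_at([1, 2, 3], 5))         # None
print(element_at({1: "a", 2: "b"}, 2))  # b
```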

Field

Description

encode

Description

Encodes the first argument using the second argument character set.

Parameters

arg0:Column to be encoded.

arg1: The charset.

Returns

Returns the encoded column.

Throws

ApplicationException.

Example

encode('abc', 'utf-8') will return abc

Field

Description

exp

Description

Computes the exponential of the given value.

Parameters

arg0: The column for which the exponential is to be calculated.

Returns

Returns the computed exponential.

Throws

ApplicationException.

Example

exp(0.78) will return 2.18147220308578

Field

Description

explode

Description

Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns.

Parameters

arg0: The expr Column.

Returns

Returns the exploded column.

Throws

ApplicationException.

Example

explode(array(10, 20)) will return 10, 20 in a new column.

Field

Description

explode_outer

Description

Separates the elements of array expr into multiple rows, or the elements of map expr into multiple rows and columns. Unlike explode, if the array or map is null or empty, a single row with null is produced.

Parameters

arg0: The expr Column.

Returns

Returns the exploded column.

Throws

ApplicationException.

Example

explode_outer(array(10, 20)) will return 10, 20
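The difference between `explode` and `explode_outer` shows up only for null or empty arrays. The Python sketch below models just the exploded column, with `None` standing in for NULL; the function names mirror the SQL functions but are plain Python.

```python
from typing import Any, List, Optional


def explode(rows: List[Optional[List[Any]]]) -> List[Any]:
    """One output row per element; null or empty arrays produce no rows."""
    return [item for arr in rows if arr for item in arr]


def explode_outer(rows: List[Optional[List[Any]]]) -> List[Any]:
    """Like explode, but a null or empty array produces one null row."""
    out: List[Any] = []
    for arr in rows:
        if arr:
            out.extend(arr)
        else:
            out.append(None)
    return out


data = [[10, 20], [], None]
print(explode(data))        # [10, 20]
print(explode_outer(data))  # [10, 20, None, None]
```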

Field

Description

expm1

Description

Computes the exponential of the given value minus one.

Parameters

arg0: The column for which the exponential minus one is to be calculated.

Returns

Returns the computed value.

Throws

ApplicationException.

Example

expm1(0.23) will return 0.2586000151807663

Field

Description

expr

Description

Parses the expression string into the column that it represents.

Parameters

arg0: The expression string to be parsed.

Returns

Returns the parsed expression string.

Throws

ApplicationException.

Example

expr("colA", "colB as newName") will return two columns colA and newName

Field

Description

factorial

Description

Computes the factorial of the given value.

Parameters

arg0: The column for which the factorial is to be calculated.

Returns

Returns the computed factorial.

Throws

ApplicationException.

Example

factorial(11) will return 39916800

Field

Description

flatten

Description

Transforms an array of arrays into a single array.

Parameters

arg0: The array of array Column.

Returns

Returns the flattened array.

Throws

ApplicationException.

Example

flatten(array(array(1, 2), array(3, 4))) will return [1,2,3,4]

Field

Description

float

Description

Casts the value expr to the target data type float.

Parameters

arg0:The column or expression.

Returns

Returns the float column or value.

Throws

ApplicationException

Example

SELECT float(column); will cast column to float

Field

Description

floor

Description

Computes the floor of the given value.

Parameters

arg0: The column for which the floor is to be calculated.

Returns

Returns the computed floor.

Throws

ApplicationException.

Example

floor(71.21) will return 71

Field

Description

format_number

Description

Formats numeric column arg0 to a format like '#,###,###.##', rounded to arg1 decimal places, and returns the result as a string column.

Parameters

arg0: The column to be formatted.

arg1: The integer specifying the decimal places to be used for rounding.

Returns

Returns the formatted result as a string column.

Throws

ApplicationException.

Example

format_number(7120.12, 1) will return 7,120.1
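Python's format specification mini-language produces the same comma-grouped output for these inputs. `format_number` here is a hypothetical helper, and its handling of exact ties may differ from the platform's rounding.

```python
def format_number(value: float, decimals: int) -> str:
    """Group thousands with commas and keep `decimals` decimal places."""
    return f"{value:,.{decimals}f}"


print(format_number(7120.12, 1))      # 7,120.1
print(format_number(1234567.891, 2))  # 1,234,567.89
```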

Field

Description

format_string

Description

Formats the arguments in printf-style and returns the result as a string column.

Parameters

arg0: The printf-style format.

arg1: The columns to be formatted.

Returns

Returns the formatted arguments as a string column.

Throws

ApplicationException.

Example

We have taken column1 as "cow", column2 as "moon" and column3 as 2

format_string("the %s jumped over the %s, %d times",@{column.schema.column1},@{column.schema.column2},@{column.schema.column3}) will return "the cow jumped over the moon, 2 times"
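Like `format_string`, Python's `%` operator uses printf-style conversions such as `%s` and `%d`, so the template from the example can be checked directly:

```python
template = "the %s jumped over the %s, %d times"
result = template % ("cow", "moon", 2)
print(result)  # the cow jumped over the moon, 2 times
```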

Field

Description

from_json

Description

Parses the JSON string in arg0 using the schema in arg1 and returns a struct value.

Parameters

arg0:The Json string column.

arg1: The schema column.

arg2: The properties map.

Returns

Returns the struct value.

Throws

ApplicationException.

Example

from_json('{"a":1, "b":0.8}', 'a INT, b DOUBLE') will return {"a":1, "b":0.8} and from_json('{"time":"26/08/2015"}', 'time Timestamp', map('timestampFormat', 'dd/MM/yyyy')) will return {"time":"2015-08-26 00:00:00.0"}

Field

Description

from_unixtime

Description

Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string representing the timestamp of that moment in the current system time zone in the given format.

Parameters

arg0: The number of seconds from unix epoch.

arg1: The format for timestamp in current system timezone to which conversion has to be done.

Returns

Returns the converted string.

Throws

ApplicationException.

Example

from_unixtime(1255033470,"yyyy-dd-MM") will return 2009-09-10
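The output of `from_unixtime` depends on the system time zone. The Python sketch below renders the example timestamp, which is 2009-10-08 20:24:30 UTC, with the example's `yyyy-dd-MM` pattern (day before month, written `%Y-%d-%m` in Python). In UTC it gives 2009-08-10, while in a zone several hours ahead of UTC, such as UTC+05:30, it gives the documented 2009-09-10. The `from_unixtime` helper and the chosen offset are assumptions of this sketch.

```python
from datetime import datetime, timedelta, timezone

SECONDS = 1255033470


def from_unixtime(seconds: int, fmt: str, tz: timezone) -> str:
    """Render epoch seconds in the given time zone with a strftime pattern."""
    return datetime.fromtimestamp(seconds, tz=tz).strftime(fmt)


utc = timezone.utc
ist = timezone(timedelta(hours=5, minutes=30))  # example zone: UTC+05:30

print(from_unixtime(SECONDS, "%Y-%m-%d %H:%M:%S", utc))  # 2009-10-08 20:24:30
print(from_unixtime(SECONDS, "%Y-%d-%m", utc))           # 2009-08-10
print(from_unixtime(SECONDS, "%Y-%d-%m", ist))           # 2009-09-10
```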

Field

Description

from_utc_timestamp

Description

Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in UTC, and renders that time as a timestamp in the given time zone.

Parameters

arg0:timestamp column.

arg1: Timezone column.

Returns

Returns the timestamp.

Throws

ApplicationException.

Example

from_utc_timestamp('2016-08-31', 'Asia/Seoul') will return 2016-08-31 09:00:00

Field

Description

get_json_object

Description

Extracts a JSON object from arg0 at the given path.

Parameters

arg0: The JSON text column.

arg1: The path.

Returns

Returns the extracted json object.

Throws

ApplicationException.

Example

get_json_object('{"a":"b"}', '$.a') will return b

Field

Description

greatest

Description

Returns the greatest value in the list of values. This function takes at least 2 parameters.

Parameters

arg0:A column from the schema

arg1:A column from the schema

Returns

A Column.

Throws

ApplicationException.

Example

We have taken column1 as '258', column2 as '259'

greatest(@{column.schema.column1},@{column.schema.column2}) will return column2's values

Field

Description

hash

Description

Returns a hash value of the arguments.

Parameters

arg0:The columns for which hash to be calculated.

Returns

Returns a hash value of the arguments.

Throws

ApplicationException.

Example

hash('Spark', array(123), 2) will return -1321691492

Field

Description

hex

Description

If the argument is an INT, hex returns the number as a STRING in hexadecimal format. Otherwise, if the argument is a STRING, it converts each character into its hexadecimal representation and returns the resulting STRING.

Parameters

arg0: An int/string column.

Returns

A string value.

Throws

ApplicationException.

Example

We have taken column1 as 258,

hex(@{column.schema.column1}) will return 102

Field

Description

hour

Description

Extracts the hours as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the hour is to be extracted.

Returns

Returns the extracted hours as an integer.

Throws

ApplicationException.

Example

hour("2017-12-15 11:02:03") will return 11

Field

Description

hypot

Description

Computes sqrt(arg0^2 + arg1^2) without intermediate overflow or underflow.

Parameters

arg0: The first value.

arg1: The second value.

Returns

Returns the computed value of sqrt(arg0^2 + arg1^2).

Throws

ApplicationException.

Example

hypot(71.21, 10.5) will return 71.97995533209642
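"Without intermediate overflow" means the squares are never formed directly. A common way to achieve this is to factor out the larger magnitude, as in the plain-Python sketch below. This is one standard technique, not necessarily the one the platform uses (Python's own math.hypot gives the same results), and it does not handle infinities or NaN.

```python
import math


def hypot_sketch(a: float, b: float) -> float:
    """sqrt(a^2 + b^2), scaled by the larger magnitude to avoid overflow."""
    a, b = abs(a), abs(b)
    big, small = max(a, b), min(a, b)
    if big == 0.0:
        return 0.0
    ratio = small / big  # at most 1.0, so ratio * ratio cannot overflow
    return big * math.sqrt(1.0 + ratio * ratio)


print(hypot_sketch(3, 4))          # 5.0
print(hypot_sketch(1e200, 1e200))  # finite, although 1e200 * 1e200 alone would overflow
```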

Field

Description

ifnull

Description

Returns expr2 if expr1 is null, or expr1 otherwise.

Parameters

arg0: expr1, the first column expression.

arg1: expr2, the second column expression.

Returns

Returns expr1 if it is not null, or expr2 otherwise.

Throws

ApplicationException.

Example

SELECT ifnull(NULL, array('2')); will return ["2"]

Field

Description

initcap

Description

Computes a new string column by converting the first letter of each word to uppercase.

Parameters

arg0: The input string.

Returns

Returns the converted string column.

Throws

ApplicationException.

Example

initcap("apple") will return "Apple"

Field

Description

input_file_name

Description

Returns the name of the file being read, or empty string if not available.

Parameters

-

Returns

Returns the name of the file being read, or empty string if not available.

Throws

ApplicationException.

Example

input_file_name() will return the name of the file being read.

Field

Description

instr

Description

Locates the position of the first occurrence of the given substring in the given string column. The position is 1-based.

Parameters

arg0: The string column to search in.

arg1: The substring whose position is to be determined.

Returns

Returns the 1-based position of the first occurrence of the substring, or 0 if the substring is not found.

Throws

ApplicationException.

Example

instr("apple","le") will return 4

Field

Description

int

Description

Casts the value expr to the target data type int.

Parameters

arg0:The column or expression.

Returns

Returns the int column or value.

Throws

ApplicationException.

Example

SELECT int(column); will cast column to int

Field

Description

isnan

Description

Returns true if the input is Not a Number.

Parameters

arg0: A column whose values need to be checked.

Returns

A boolean: true if the value is NaN, false otherwise.

Throws

ApplicationException.

Example

Assuming column1 holds the double value NaN (for example, cast('NaN' as double)), isnan(@{column.schema.column1}) will return true.

Field

Description

isnotnull

Description

Checks if the given expression is not null.

Parameters

arg0:The column or expression.

Returns

Returns true if expr is not null, or false otherwise.

Throws

ApplicationException.

Example

SELECT isnotnull(1); will return true.

Field

Description

isnull

Description

Returns true if the value is null, and false otherwise.

Parameters

arg0: A column whose values need to be checked.

Returns

A boolean: true if the value is null, false otherwise.

Throws

ApplicationException.

Example

Assuming column1 is 'abc', isnull(@{column.schema.column1}) will return false.

Field

Description

lag

Description

Returns the value of input at the offset-th row before the current row in the window. The default value of offset is 1, and the default value of default is null. If the value of input at the offset-th row is null, null is returned. If there is no such offset row (for example, when the offset is 1, the first row of the window has no previous row), default is returned.

Parameters

arg0: input, the expression to evaluate offset rows before the current row.

arg1: offset, an int expression giving the number of rows to jump back in the partition.

arg2: default, the expression to use when the offset row does not exist.

Returns

Returns the value of input at the offset-th row before the current row in the window.

Throws

ApplicationException.

Example

select lag(col, 1) OVER (order by col) will return, for each row, the value of col in the preceding row (null for the first row).

Field

Description

last_day

Description

Given a date column, returns the last day of the month which the given date belongs to.

Parameters

arg0: The date whose last day of the month is to be returned.

Returns

Returns the computed last day.

Throws

ApplicationException.

Example

last_day("2017-12-15") will return "2017-12-31"

Field

Description

last_value

Description

Returns the last value of expr for a group of rows. If isIgnoreNull is true, returns only non-null values.

Parameters

arg0:The column or expression.

arg1: isIgnoreNull, an optional boolean; when true, null values are skipped.

Returns

Returns the last value of expr.

Throws

ApplicationException.

Example

SELECT last_value(col) FROM VALUES (10), (5), (20) AS tab(col); will return 20

Field

Description

lcase

Description

Returns str with all characters changed to lowercase.

Parameters

arg0:The column or expression.

Returns

Returns str with all characters changed to lowercase.

Throws

ApplicationException

Example

SELECT lcase('SparkSql'); will return sparksql

Field

Description

lead

Description

Returns the value of input at the offset-th row after the current row in the window. The default value of offset is 1, and the default value of default is null. If the value of input at the offset-th row is null, null is returned. If there is no such offset row (for example, when the offset is 1, the last row of the window has no subsequent row), default is returned.

Parameters

arg0: input, the expression to evaluate offset rows after the current row.

arg1: offset, an int expression giving the number of rows to jump ahead in the partition.

arg2: default, the expression to use when the offset row does not exist. The default value is null.

Returns

Returns the value of input at the offset-th row after the current row in the window.

Throws

ApplicationException

Example

select lead(col, 1) OVER (order by col) will return, for each row, the value of col in the following row (null for the last row).
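Over a single window that is already sorted, lag and lead reduce to looking offset positions backward or forward. The plain-Python sketch below shows that behavior on a list standing in for one ordered partition; it assumes a positive offset and is an illustration, not how the engine evaluates window functions.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def lag(values: Sequence[T], offset: int = 1, default: Optional[T] = None) -> List[Optional[T]]:
    """Value offset rows before each row, or default when that row does not exist."""
    return [values[i - offset] if i - offset >= 0 else default for i in range(len(values))]


def lead(values: Sequence[T], offset: int = 1, default: Optional[T] = None) -> List[Optional[T]]:
    """Value offset rows after each row, or default when that row does not exist."""
    n = len(values)
    return [values[i + offset] if i + offset < n else default for i in range(n)]


ordered = [10, 20, 30]     # one partition, already sorted as by "OVER (order by col)"
print(lag(ordered))        # [None, 10, 20]
print(lead(ordered))       # [20, 30, None]
print(lag(ordered, 2, 0))  # [0, 0, 10]
```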

Field

Description

least

Description

Returns the least value from the list of values, skipping null values. This function takes at least two parameters.

Parameters

arg0: A column from the schema.

arg1: A column from the schema.

Returns

A column.

Throws

ApplicationException

Example

We have taken column1 as '258', column2 as '259'

least(@{column.schema.column1},@{column.schema.column2}) will return column1's values

Field

Description

length

Description

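Computes the length of a given string or binary column. For strings, the length is the number of characters (including trailing spaces); for binary data, it is the number of bytes.

Parameters

arg0: The string or binary column whose length is to be computed.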

Returns

Returns the computed length.

Throws

ApplicationException

Example

length("apple") will return 5

Field

Description

levenshtein

Description

Computes the Levenshtein distance of the two given string columns.

Parameters

arg0: The first string column; its Levenshtein distance to the second string column is computed.

arg1: The second string column.

Returns

Returns the computed Levenshtein distance.

Throws

ApplicationException

Example

levenshtein("kitten", "sitting") will return 3
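The Levenshtein distance is the minimum number of single-character insertions, deletions, and substitutions that turn one string into the other. The sketch below is the textbook dynamic-programming formulation in plain Python, shown to explain the result 3 for "kitten" and "sitting"; it is not the platform's implementation.

```python
def levenshtein(a: str, b: str) -> int:
    """Minimum number of single-character insertions, deletions and substitutions."""
    previous = list(range(len(b) + 1))  # distances from "" to each prefix of b
    for i, ca in enumerate(a, start=1):
        current = [i]  # distance from a[:i] to ""
        for j, cb in enumerate(b, start=1):
            current.append(min(
                previous[j] + 1,               # delete ca
                current[j - 1] + 1,            # insert cb
                previous[j - 1] + (ca != cb),  # substitute (free when equal)
            ))
        previous = current
    return previous[-1]


print(levenshtein("kitten", "sitting"))  # 3
```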

Field

Description

like

Description

str like pattern: returns true if str matches pattern, null if any argument is null, and false otherwise. In the pattern, _ matches any single character and % matches zero or more characters.

Parameters

arg0: str, a string expression.

arg1: pattern, the pattern string to match.

Returns

Returns true, false or null.

Throws

ApplicationException

Example

SELECT '%SystemDrive%UsersJohn' like '%SystemDrive%Users%'; will return true
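One way to understand LIKE is as a translation of the pattern into a regular expression that must match the whole string: % becomes "any run of characters" and _ becomes "exactly one character". The plain-Python sketch below does that translation. It assumes a backslash as the escape character and is an illustration, not the engine's matcher.

```python
import re
from typing import Optional


def sql_like(s: Optional[str], pattern: Optional[str], escape: str = "\\") -> Optional[bool]:
    """Evaluate `s LIKE pattern` by translating the pattern to a regular expression."""
    if s is None or pattern is None:
        return None  # null if any argument is null
    parts = []
    i = 0
    while i < len(pattern):
        c = pattern[i]
        if c == escape and i + 1 < len(pattern):
            parts.append(re.escape(pattern[i + 1]))  # escaped character matches literally
            i += 2
            continue
        if c == "%":
            parts.append(".*")  # zero or more characters
        elif c == "_":
            parts.append(".")   # exactly one character
        else:
            parts.append(re.escape(c))
        i += 1
    return re.fullmatch("".join(parts), s, flags=re.DOTALL) is not None


print(sql_like("%SystemDrive%UsersJohn", "%SystemDrive%Users%"))  # True
print(sql_like("abc", "a_c"))  # True
print(sql_like("abc", "a_"))   # False
```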

Field

Description

ln

Description

Computes the natural logarithm of the given value.

Parameters

arg0: The column whose natural logarithm is to be calculated.

Returns

Returns the computed natural logarithm.

Throws

ApplicationException

Example

ln(20) will return 2.995732273553991

Field

Description

locate

Description

Locates the position of the first occurrence of the given substring in a string column, starting from the given position. The given position and the return value are 1-based.

Parameters

arg0: The substring to search for.

arg1: The string column to search in.

arg2: The 1-based position from which to start searching.

Returns

Returns the 1-based position of the first occurrence of the substring, or 0 if it is not found.

Throws

ApplicationException

Example

locate("apple","An apple",1) will return 4
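The 1-based convention used by locate (and by instr) maps directly onto Python's 0-based str.find, which returns -1 when nothing is found. The plain-Python sketch below illustrates that mapping; treating a start position below 1 as "not found" is an assumption labeled in the code.

```python
def locate(substr: str, s: str, pos: int = 1) -> int:
    """1-based position of substr in s, searching from 1-based position pos; 0 if absent."""
    if pos < 1:
        return 0  # assumption: a start position before 1 finds nothing
    return s.find(substr, pos - 1) + 1  # str.find returns -1 when absent, giving 0


print(locate("apple", "An apple", 1))  # 4
print(locate("le", "apple"))           # 4, the same as instr("apple", "le")
print(locate("x", "apple"))            # 0
```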

Field

Description

log

Description

Computes the natural logarithm of the given value.

Parameters

arg0: The column whose natural logarithm is to be calculated.

Returns

Returns the computed natural logarithm.

Throws

ApplicationException

Example

log(20) will return 2.995732273553991

Field

Description

log10

Description

Computes the logarithm with base 10 of the given value.

Parameters

arg0: The column whose base-10 logarithm is to be calculated.

Returns

Returns the computed logarithm with base 10.

Throws

ApplicationException

Example

log10(10) will return 1

Field

Description

log1p

Description

Computes the natural logarithm of the given value plus one, that is, ln(1 + value).

Parameters

arg0: The column for which ln(1 + value) is to be calculated.

Returns

Returns the computed natural logarithm of (value + 1).

Throws

ApplicationException

Example

log1p(20) will return 3.044522437723423

Field

Description

log2

Description

Computes the logarithm with base 2 of the given value.

Parameters

arg0: The column whose base-2 logarithm is to be calculated.

Returns

Returns the computed logarithm with base 2.

Throws

ApplicationException

Example

log2(2) will return 1

Field

Description

lower

Description

Converts a string column to lower case.

Parameters

arg0: The string column to be converted to lower case.

Returns

Returns the converted string.

Throws

ApplicationException

Example

lower("APple") will return "apple"

Field

Description

lpad

Description

Left-pads the string column with the given pad string to the given length. If the string is longer than the given length, it is truncated to that length.

Parameters

arg0: The string column to be left-padded.

arg1: The total length of the result.

arg2: The string to use for left-padding.

Returns

Returns the left-padded string.

Throws

ApplicationException

Example

lpad("SQL Tutorial", 20, "ABC") will return "ABCABCABSQL Tutorial"
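The padding arithmetic behind lpad, and behind rpad described later in this reference, can be sketched in plain Python: repeat the pad string enough times, cut it to the number of missing characters, and attach it on the left or right. This is an illustration only; returning the (possibly truncated) input unchanged when the pad string is empty is an assumption.

```python
def lpad(s: str, length: int, pad: str) -> str:
    """Left-pad s with repetitions of pad to exactly `length` characters."""
    length = max(length, 0)
    if len(s) >= length or not pad:  # assumption: an empty pad adds nothing
        return s[:length]  # longer strings are truncated to `length`
    fill = length - len(s)
    return (pad * (fill // len(pad) + 1))[:fill] + s


def rpad(s: str, length: int, pad: str) -> str:
    """Right-pad s with repetitions of pad to exactly `length` characters."""
    length = max(length, 0)
    if len(s) >= length or not pad:
        return s[:length]
    fill = length - len(s)
    return s + (pad * (fill // len(pad) + 1))[:fill]


print(lpad("SQL Tutorial", 20, "ABC"))  # ABCABCABSQL Tutorial
print(rpad("SQL Tutorial", 20, "ABC"))  # SQL TutorialABCABCAB
print(lpad("hi", 1, "??"))              # h
```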

Field

Description

ltrim

Description

Removes the leading characters that appear in the trim string.

Parameters

arg0: The trim string, that is, the characters to trim. The default value is a single space.

arg1: A string expression.

Returns

Returns the trimmed string.

Throws

ApplicationException

Example

ltrim('Sp','SSparkSQLS') will return arkSQLS

Field

Description

map

Description

Creates a map with the given key/value pairs.

Parameters

arg0: The key and value columns, given as alternating arguments (key1, value1, key2, value2, ...).

Returns

Returns the map.

Throws

ApplicationException

Example

map(1.0, '2', 3.0, '4') will return {1.0:"2",3.0:"4"}

Field

Description

map_concat

Description

Returns the union of all the given maps.

Parameters

arg0: The map columns.

Returns

Returns the union of all the given maps.

Throws

ApplicationException

Example

map_concat(map(1, 'a', 2, 'b'), map(2, 'c', 3, 'd')) will return {1:"a",2:"c",3:"d"}

Field

Description

map_from_arrays

Description

Creates a map from a pair of arrays holding the keys and the values. Keys must not be null.

Parameters

arg0:Array of keys.

arg1:Array of values.

Returns

Returns the map.

Throws

ApplicationException

Example

map_from_arrays(array(1.0, 3.0), array('2', '4')) will return {1.0:"2",3.0:"4"}

Field

Description

map_from_entries

Description

Returns a map created from the given array of entries.

Parameters

arg0:Array of entries.

Returns

Returns the map.

Throws

ApplicationException

Example

map_from_entries(array(struct(1, 'a'), struct(2, 'b'))) will return {1:"a",2:"b"}

Field

Description

map_keys

Description

Returns an unordered array containing the keys of the map.

Parameters

arg0:Map column.

Returns

Returns the array.

Throws

ApplicationException

Example

map_keys(map(1, 'a', 2, 'b')) will return [1,2]

Field

Description

map_values

Description

Returns an unordered array containing the values of the map.

Parameters

arg0:Map column.

Returns

Returns the array.

Throws

ApplicationException

Example

map_values(map(1, 'a', 2, 'b')) will return ["a","b"]

Field

Description

md5

Description

Calculates an MD5 128-bit checksum for the string.

Parameters

arg0:A string column

Returns

The value is returned as a string of 32 hex digits, or NULL if the argument was NULL.

Throws

ApplicationException

Example

Assuming column1 is 'ABC', md5(@{column.schema.column1}) will return '902fbdd2b1df0c4f70b4a5d23525e932'.

Field

Description

minute

Description

Extracts the minutes as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the minutes are to be extracted.

Returns

Returns the extracted minutes as an integer.

Throws

ApplicationException

Example

minute("2017-12-15 11:02:03") will return 2

Field

Description

mod

Description

Calculates the remainder of dividing expr1 by expr2.

Parameters

arg0: expr1, the dividend.

arg1: expr2, the divisor.

Returns

Returns the remainder of expr1 / expr2.

Throws

ApplicationException

Example

SELECT MOD(2, 1.8); will return 0.2

Field

Description

monotonically_increasing_id

Description

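A column expression that generates monotonically increasing 64-bit integers. The generated IDs are guaranteed to be increasing and unique, but not consecutive.

Parameters

-

Returns

Monotonically increasing 64-bit integers.

Throws

ApplicationException

Example

monotonically_increasing_id() will return 0, 1, 2, ... for the rows of the first partition; rows in later partitions start from much larger values, so the overall sequence has gaps.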
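The gaps come from how the IDs are composed. The Apache Spark documentation states that the current implementation puts the partition ID in the upper 31 bits and the record number within each partition in the lower 33 bits. The plain-Python sketch below reproduces that layout; the function name and arguments are hypothetical, since the real function takes no arguments and reads both values from the running task.

```python
PARTITION_SHIFT = 33  # lower 33 bits hold the record number within a partition


def monotonically_increasing_id(partition_id: int, record_index: int) -> int:
    """ID of the record_index-th row (0-based) in partition partition_id (sketch)."""
    return (partition_id << PARTITION_SHIFT) + record_index


print([monotonically_increasing_id(0, i) for i in range(3)])  # [0, 1, 2]
print(monotonically_increasing_id(1, 0))                      # 8589934592
```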

Field

Description

month

Description

Extracts the month as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the month is to be extracted.

Returns

Returns the extracted month as an integer.

Throws

ApplicationException

Example

month("2017-12-15 11:02:03") will return 12

Field

Description

months_between

Description

Returns the number of months between timestamp1 and timestamp2. If timestamp1 is later than timestamp2, the result is positive. If timestamp1 and timestamp2 fall on the same day of the month, or both are the last day of their months, the time of day is ignored. Otherwise, the difference is calculated based on 31 days per month and rounded to 8 digits unless roundOff is false.

Parameters

arg0: timestamp1 column.

arg1: timestamp2 column.

arg2: roundOff, an optional boolean (default true).

Returns

Returns the months difference.

Throws

ApplicationException

Example

months_between('1997-02-28 10:30:00', '1996-10-30', false) will return 3.9495967741935485
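The rules above can be traced step by step for the example. The whole-month difference is (1997*12 + 2) - (1996*12 + 10) = 4. 28 February 1997 is a last day of month but 30 October 1996 is not, so the fractional part is applied. The day-and-time difference is (28 - 30) days + 10.5 hours = -1.5625 days, and dividing by 31 days gives 4 - 1.5625/31, approximately 3.9495967741935485. The plain-Python sketch below encodes these rules. It is an illustration of the description, not the engine's code, and rounding with Python's round() is an assumption.

```python
import calendar
from datetime import datetime

SECONDS_PER_DAY = 86400


def months_between(ts1: datetime, ts2: datetime, round_off: bool = True) -> float:
    """Months between ts1 and ts2, following the rules described above (sketch)."""
    def is_last_day(d: datetime) -> bool:
        return d.day == calendar.monthrange(d.year, d.month)[1]

    def seconds_in_day(d: datetime) -> int:
        return d.hour * 3600 + d.minute * 60 + d.second

    month_diff = (ts1.year * 12 + ts1.month) - (ts2.year * 12 + ts2.month)
    if ts1.day == ts2.day or (is_last_day(ts1) and is_last_day(ts2)):
        return float(month_diff)  # time of day is ignored
    diff_seconds = (ts1.day - ts2.day) * SECONDS_PER_DAY + seconds_in_day(ts1) - seconds_in_day(ts2)
    result = month_diff + diff_seconds / (31 * SECONDS_PER_DAY)  # 31 days per month
    return round(result, 8) if round_off else result


print(months_between(datetime(1997, 2, 28, 10, 30), datetime(1996, 10, 30), round_off=False))
print(months_between(datetime(1997, 2, 28, 10, 30), datetime(1996, 10, 30)))  # 3.94959677
```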

Field

Description

nanvl

Description

Returns expr1 if it's not NaN, or expr2 otherwise.

Parameters

arg0:expr1 column.

arg1:expr2 column.

Returns

Returns expr1 if it's not NaN, or expr2 otherwise.

Throws

ApplicationException

Example

nanvl(cast('NaN' as double), 123) will return 123.0

Field

Description

negative

Description

Returns the negated value of expr.

Parameters

arg0:The column or expression.

Returns

The negated value.

Throws

ApplicationException

Example

SELECT negative(1); will return -1

Field

Description

next_day

Description

Given a date column, returns the first date which is later than the value of the date column that is on the specified day of the week.

Parameters

arg0: The start date. The result is the first date after it that falls on the given day of the week.

arg1: The day of the week, for example "friday".

Returns

Returns the computed first date.

Throws

ApplicationException

Example

next_day("2017-12-15","friday") will return "2017-12-22"

Field

Description

not

Description

Performs a logical NOT on the given boolean column.

Parameters

arg0:Given boolean column.

Returns

Returns logical not of given column.

Throws

ApplicationException

Example

not(false) will return true.

Field

Description

now

Description

Returns the current timestamp at the start of query evaluation.

Parameters

arg0:Not applicable.

Returns

The current timestamp

Throws

ApplicationException

Example

SELECT now(); will return the current timestamp, for example 2020-06-26 15:09:37

Field

Description

nullif

Description

Returns null if expr1 equals expr2, or expr1 otherwise.

Parameters

arg0: expr1, the first column expression.

arg1: expr2, the second column expression.

Returns

Returns null if the two expressions are equal, or expr1 otherwise.

Throws

ApplicationException

Example

SELECT nullif(2, 2); will return NULL

Field

Description

nvl

Description

Returns expr2 if expr1 is null, or expr1 otherwise.

Parameters

arg0: expr1, the first column expression.

arg1: expr2, the second column expression.

Returns

Returns expr1 if it is not null, or expr2 otherwise.

Throws

ApplicationException

Example

SELECT nvl(NULL, array('2')); will return ["2"]

Field

Description

nvl2

Description

Returns expr2 if expr1 is not null, or expr3 otherwise.

Parameters

arg0: expr1, the first column expression.

arg1: expr2, the second column expression.

arg2: expr3, the third column expression.

Returns

Returns expr2 if expr1 is not null, or expr3 otherwise.

Throws

ApplicationException

Example

SELECT nvl2(NULL, 2, 1); will return 1

Field

Description

parse_url

Description

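Extracts a part from a URL.

Parameters

arg0: The URL.

arg1: The part to extract: HOST, PATH, QUERY, REF, PROTOCOL, FILE, AUTHORITY, or USERINFO.

arg2: The key (optional). When the part is QUERY, the value of the query parameter with this key is returned.

Returns

Returns the extracted part as a string.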

Throws

ApplicationException

Example

SELECT parse_url('http://spark.apache.org/path?query=1', 'HOST') will return spark.apache.org
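For readers who want to reason about which substring each part name selects, the plain-Python sketch below maps a subset of the part names onto Python's urllib.parse.urlsplit. It covers only HOST, PATH, QUERY (with or without a key), REF, and PROTOCOL; returning None for missing parts, looking up query keys without URL-decoding, and the lower-casing of host names by urlsplit are properties of this sketch, not guaranteed behavior of the platform.

```python
from typing import Optional
from urllib.parse import urlsplit


def parse_url(url: str, part: str, key: Optional[str] = None) -> Optional[str]:
    """Extract a URL part; covers only HOST, PATH, QUERY, REF and PROTOCOL (sketch)."""
    parts = urlsplit(url)
    if part == "QUERY" and key is not None:
        # Look up one query parameter without URL-decoding it.
        for pair in parts.query.split("&"):
            name, sep, value = pair.partition("=")
            if name == key and sep:
                return value
        return None
    value = {
        "HOST": parts.hostname,
        "PATH": parts.path,
        "QUERY": parts.query,
        "REF": parts.fragment,
        "PROTOCOL": parts.scheme,
    }.get(part)
    return value or None  # missing or empty parts become None (null)


url = "http://spark.apache.org/path?query=1"
print(parse_url(url, "HOST"))            # spark.apache.org
print(parse_url(url, "QUERY", "query"))  # 1
print(parse_url(url, "PATH"))            # /path
```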

Field

Description

percent_rank

Description

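Computes the percentage ranking of a value in a group of values, as (rank - 1) / (number of rows in the partition - 1). The result ranges from 0.0 to 1.0.

Parameters

arg0:Not applicable.

Returns

Returns the percentage ranking as a double.

Throws

ApplicationException

Example

select percent_rank() OVER (order by col) will return values from 0.0 to 1.0; for example, four distinct values of col receive 0.0, 0.333..., 0.666..., and 1.0.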
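percent_rank builds on rank: each row's rank is 1 plus the number of rows with a strictly smaller ordering value, so ties share a rank, and the rank is then rescaled to the range 0.0 to 1.0. The plain-Python sketch below computes both for a single partition. It is quadratic and meant only to make the formula concrete; returning 0.0 for a one-row partition avoids division by zero.

```python
from typing import List, Sequence


def rank(values: Sequence[float]) -> List[int]:
    """rank() over the values sorted ascending: 1 + count of strictly smaller values."""
    ordered = sorted(values)
    return [1 + sum(1 for other in ordered if other < v) for v in ordered]


def percent_rank(values: Sequence[float]) -> List[float]:
    """(rank - 1) / (rows - 1), with 0.0 for a single-row partition."""
    n = len(values)
    return [0.0 if n == 1 else (r - 1) / (n - 1) for r in rank(values)]


print(rank([30, 10, 20, 20]))          # [1, 2, 2, 4]
print(percent_rank([30, 10, 20, 20]))  # [0.0, 0.3333333333333333, 0.3333333333333333, 1.0]
```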

Field

Description

pi

Description

Returns pi.

Parameters

arg0:Not applicable.

Returns

Returns pi.

Throws

ApplicationException

Example

SELECT pi(); will return 3.141592653589793

Field

Description

pmod

Description

Computes the positive value of arg0 mod arg1.

Parameters

arg0: The dividend.

arg1: The divisor.

Returns

Returns the computed positive value of arg0 mod arg1.

Throws

ApplicationException

Example

pmod(19, 0.78) will return 0.2800007
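The difference between mod and pmod shows up with negative dividends: a plain remainder keeps the sign of the dividend, while pmod shifts a negative remainder back into the range of a positive divisor. The plain-Python sketch below follows the common definition r = a mod n, then (r + n) mod n when r is negative. This is our reading of the behavior, not the platform's source.

```python
import math


def pmod(dividend: float, divisor: float) -> float:
    """Positive modulus: shifts a negative remainder into the range of the divisor."""
    r = math.fmod(dividend, divisor)  # same sign as the dividend, like mod
    return math.fmod(r + divisor, divisor) if r < 0 else r


print(math.fmod(-7, 3))  # -1.0 (plain remainder)
print(pmod(-7, 3))       # 2.0
print(pmod(7, 3))        # 1.0
```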

Field

Description

posexplode

Description

Separates the elements of array expr into multiple rows with positions, or the elements of map expr into multiple rows and columns with positions.

Parameters

arg0: The array expression.

Returns

Returns multiple rows and columns.

Throws

ApplicationException

Example

SELECT posexplode(array(10,20)); will return the following rows (pos, col):

0 10

1 20

Field

Description

posexplode_outer

Description

Separates the elements of array expr into multiple rows with positions, or the elements of map expr into multiple rows and columns with positions. Unlike posexplode, if the array or map is null or empty, it produces a single row of nulls.

Parameters

arg0: The array expression.

Returns

Returns multiple rows and columns.

Throws

ApplicationException

Example

SELECT posexplode_outer(array(10,20)); will return the following rows (pos, col):

0 10

1 20

Field

Description

position

Description

Returns the position of the first occurrence of substr in str after position pos. The given pos and return value are 1-based.

Parameters

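arg0: substr, the substring to search for.

arg1: str, the string to search in.

arg2: pos, the optional 1-based position from which to start searching.

Returns

Returns the 1-based position, or 0 if substr is not found.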

Throws

ApplicationException

Example

SELECT position('bar', 'foobarbar'); will return 4

Field

Description

pow

Description

Computes the value of the first argument raised to the power of the second argument.

Parameters

arg0: The base.

arg1: The exponent.

Returns

Returns the computed value.

Throws

ApplicationException

Example

pow(20, 2) will return 400

Field

Description

pow_left_arg_double

Description

Computes the value of the first argument raised to the power of the second argument.

Parameters

arg0: The base.

arg1: The exponent.

Returns

Returns the computed value.

Throws

ApplicationException

Example

pow_left_arg_double(20, 2) will return 400

Field

Description

pow_right_arg_double

Description

Computes the value of the first argument raised to the power of the second argument.

Parameters

arg0: The base.

arg1: The exponent.

Returns

Returns the computed value.

Throws

ApplicationException

Example

pow_right_arg_double(20, 2) will return 400

Field

Description

quarter

Description

Extracts the quarter as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string column from which the quarter is to be extracted.

Returns

Returns the extracted quarter as an integer.

Throws

ApplicationException

Example

quarter("2017-12-22 01:12:00") will return 4

Field

Description

radians

Description

Converts an angle measured in degrees to an approximately equivalent angle measured in radians.

Parameters

arg0: The column holding the angle in degrees that is to be converted to radians.

Returns

Returns the converted angle measured in radians.

Throws

ApplicationException

Example

radians(20) will return 0.3490658503988659

Field

Description

rand

Description

Generates a random column with independent and identically distributed (i.i.d.) samples uniformly distributed in [0.0, 1.0).

Parameters

-

Returns

Returns the generated column.

Throws

ApplicationException

Example

rand() will return a new column with independent and identically distributed (i.i.d.) samples uniformly distributed in [0.0, 1.0).

Field

Description

randn

Description

Generates a column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

Parameters

-

Returns

Returns the generated column.

Throws

ApplicationException

Example

randn() will return a new column with independent and identically distributed (i.i.d.) samples from the standard normal distribution.

Field

Description

rank

Description

Computes the rank of a value in a group of values. The result is one plus the number of rows whose ordering value is lower than that of the current row in the partition. Tied values receive the same rank, which produces gaps in the sequence.

Parameters

arg0:Not applicable.

Returns

The calculated rank.

Throws

ApplicationException

Example

select rank() OVER (order by col) will return 1, 2, 3, 4, ... for distinct values of col; tied values share a rank, for example 1, 2, 2, 4.

Field

Description

regexp_extract

Description

Extracts a specific group matched by a Java regex from the specified string column.

Parameters

arg0: The string column from which the group is to be extracted.

arg1: The string specifying the regex.

arg2: The regex group ID.

Returns

Returns the extracted group, or an empty string if the regex does not match.

Throws

ApplicationException

Example

regexp_extract("foothebar","foo(.*?)(bar)", 2) will return "bar"

Field

Description

regexp_replace

Description

Replaces all substrings of the specified string value that match the regex with the given replacement.

Parameters

arg0: The string column in which substrings are to be replaced.

arg1: The regex pattern to match.

arg2: The replacement string.

Returns

Returns the string after replacing all substrings that match the regex with the replacement.

Throws

ApplicationException

Example

regexp_replace("foobar", "oo|ar", "") will return "fb"

Field

Description

repeat

Description

Repeats each value in the selected column n times.

Parameters

arg0: The column whose values are to be repeated.

arg1: An integer giving the number of times arg0 is to be repeated.

Returns

A repeated value.

Throws

ApplicationException

Example

We have taken column1 as 'str'

repeat(@{column.schema.column1},2) will return 'strstr'

Field

Description

replace

Description

Replaces all occurrences of search with replace.

Parameters

arg0: str, a string expression.

arg1: search, a string expression. If search is not found in str, str is returned unchanged.

arg2: replace, a string expression. If replace is not specified or is an empty string, the matched occurrences are removed from str.

Returns

Returns the replaced string.

Throws

ApplicationException

Example

SELECT replace('ABCabc', 'abc', 'DEF'); will return ABCDEF

Field

Description

reverse

Description

Reverses the string column and returns it as a new string column.

Parameters

arg0: The string column to be reversed.

Returns

Returns the reversed string column.

Throws

ApplicationException

Example

reverse("apple") will return "elppa"

Field

Description

rint

Description

Computes the double value that is closest in value to the argument and is equal to a mathematical integer.

Parameters

arg0: The column whose value is to be rounded to the nearest mathematical integer, returned as a double.

Returns

Returns the computed value.

Throws

ApplicationException

Example

rint(80.89) will return 81.0

Field

Description

round

Description

Computes the value of the column arg0 rounded to 0 decimal places.

Parameters

arg0: The column whose value is to be rounded to 0 decimal places.

Returns

Returns the computed value.

Throws

ApplicationException

Example

round(80.89) will return 81.0

Field

Description

row_number

Description

Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the window partition.

Parameters

arg0:Not applicable.

Returns

The row number.

Throws

ApplicationException

Example

select row_number() OVER (order by col) will return 1,2,3,4....

Field

Description

rpad

Description

Right-pads the string column with the given pad string to the given length. If the string is longer than the given length, it is truncated to that length.

Parameters

arg0: The string column to be right-padded.

arg1: The total length of the result.

arg2: The string to use for right-padding.

Returns

Returns the right-padded string.

Throws

ApplicationException

Example

rpad("SQL Tutorial", 20, "ABC") will return "SQL TutorialABCABCAB"

Field

Description

rtrim

Description

Trims the trailing spaces from the specified string value.

Parameters

arg0: The string column from which trailing spaces are to be trimmed.

Returns

Returns the trimmed string value.

Throws

ApplicationException

Example

rtrim("apple ") will return "apple"

Field

Description

schema_of_json

Description

Returns the schema of a JSON string in DDL format.

Parameters

arg0: The JSON string column.

Returns

Returns the schema of the JSON string.

Throws

ApplicationException

Example

schema_of_json('[{"col":0}]') will return array<struct<col:bigint>> (the exact type names can vary by Spark version)

Field

Description

second

Description

Extracts the seconds as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the seconds are to be extracted.

Returns

Returns the seconds as an integer.

Throws

ApplicationException

Example

second("2017-12-15 11:02:03") will return 3

Field

Description

sequence

Description

Generates an array of elements from start to stop (inclusive), incrementing by step. The type of the returned elements is the same as the type of the argument expressions. Supported types are byte, short, integer, long, date, and timestamp. The start and stop expressions must resolve to the same type. If start and stop resolve to the 'date' or 'timestamp' type, the step expression must resolve to the 'interval' type; otherwise, it must resolve to the same type as the start and stop expressions.

Parameters

arg0: start, an expression. The start of the range.

arg1: stop, an expression. The end of the range (inclusive).

arg2:step - an optional expression. The step of the range. By default step is 1 if start is less than or equal to stop, otherwise -1. For the temporal sequences it's 1 day and -1 day respectively. If start is greater than stop then the step must be negative, and vice versa.

Returns

Returns the sequence

Throws

ApplicationException

Example

sequence(to_date('2018-01-01'), to_date('2018-03-01'), interval 1 month) will return [2018-01-01,2018-02-01,2018-03-01]
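For integer arguments, the default-step and direction rules can be sketched with Python's range, adjusting the bound because the stop value is inclusive. This plain-Python illustration covers integers only (not dates or intervals), and it rejects a zero step, which is a simplification made for the sketch.

```python
from typing import List, Optional


def sequence(start: int, stop: int, step: Optional[int] = None) -> List[int]:
    """Integers from start to stop inclusive, incrementing by step (sketch)."""
    if step is None:
        step = 1 if start <= stop else -1  # default step, as described above
    if step == 0 or (stop - start) * step < 0:
        raise ValueError("step must be non-zero and point from start towards stop")
    return list(range(start, stop + (1 if step > 0 else -1), step))


print(sequence(1, 5))      # [1, 2, 3, 4, 5]
print(sequence(5, 1))      # [5, 4, 3, 2, 1]
print(sequence(1, 10, 3))  # [1, 4, 7, 10]
```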

Field

Description

sha1

Description

Calculates the SHA-1 digest of a string and returns the value as a hex string.

Parameters

arg0:A string column

Returns

A hex string.

Throws

ApplicationException

Example

We have taken column1 as 'ABC'

sha1(@{column.schema.column1}) will return '3c01bdbb26f358bab27f267924aa2c9a03fcfdb8'

Field

Description

sha2

Description

Calculates the SHA-2 family of hash functions of a string value and returns the value as a hex string. The numBits argument (arg1) controls the number of bits in the message digest.

Parameters

arg0: A string column.

arg1: numBits, one of 224, 256, 384, or 512 (0 is equivalent to 256).

Returns

A hex string

Throws

ApplicationException

Example

We have taken column1 as 'Sam'

sha2(@{column.schema.column1},256) will return '4ecde249d747d51d8..'
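The md5, sha1, and sha2 entries all return lower-case hex digests of the string's bytes, so their results can be cross-checked with Python's standard hashlib module. The sketch below assumes the string is hashed as UTF-8 bytes, and it returns None for an unsupported numBits value.

```python
import hashlib
from typing import Optional


def md5_hex(s: str) -> str:
    return hashlib.md5(s.encode("utf-8")).hexdigest()


def sha1_hex(s: str) -> str:
    return hashlib.sha1(s.encode("utf-8")).hexdigest()


def sha2_hex(s: str, num_bits: int) -> Optional[str]:
    algorithms = {0: hashlib.sha256, 224: hashlib.sha224, 256: hashlib.sha256,
                  384: hashlib.sha384, 512: hashlib.sha512}
    algorithm = algorithms.get(num_bits)
    return algorithm(s.encode("utf-8")).hexdigest() if algorithm else None


print(md5_hex("ABC"))             # 902fbdd2b1df0c4f70b4a5d23525e932
print(sha1_hex("ABC"))            # 3c01bdbb26f358bab27f267924aa2c9a03fcfdb8
print(len(sha2_hex("Sam", 256)))  # 64 hex digits for a 256-bit digest
```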

Field

Description

shiftLeft

Description

Bitwise left shift: shifts arg0 to the left by arg1 bit positions.

Parameters

arg0: A number column.

arg1: An integer column giving the number of bit positions to shift.

Returns

If the given value is a long value, this function will return a long value; otherwise, it will return an integer value.

Throws

ApplicationException

Example

We have taken column1 as '258'

shiftLeft(@{column.schema.column1},2) will return 1032

Field

Description

shiftRight

Description

Bitwise (arithmetic) right shift: shifts arg0 to the right by arg1 bit positions, preserving the sign bit.

Parameters

arg0: A number column.

arg1: An integer column giving the number of bit positions to shift.

Returns

If the given value is a long value, this function will return a long value; otherwise, it will return an integer value.

Throws

ApplicationException

Example

We have taken column1 as '258'

shiftRight(@{column.schema.column1},2) will return 64

Field

Description

shiftRightUnsigned

Description

Bitwise unsigned right shift: shifts arg0 to the right by arg1 bit positions, filling the vacated high-order bits with zeros.

Parameters

arg0: A number column.

arg1: An integer column giving the number of bit positions to shift.

Returns

If the given value is a long value, this function will return a long value; otherwise, it will return an integer value.

Throws

ApplicationException

Example

We have taken column1 as '258'

shiftRightUnsigned(@{column.schema.column1},2) will return 64
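For positive inputs such as 258, shiftRight and shiftRightUnsigned agree; they differ only for negative values. The plain-Python sketch below emulates the three shifts on signed 32-bit integers, following Java int semantics (including masking the shift distance with & 31). The 32-bit width is an assumption for integer columns; long columns would use 64 bits and & 63.

```python
MASK32 = 0xFFFFFFFF


def to_int32(x: int) -> int:
    """Wrap an arbitrary Python int to a signed 32-bit value."""
    x &= MASK32
    return x - (1 << 32) if x & 0x80000000 else x


def shift_left(a: int, b: int) -> int:
    return to_int32(a << (b & 31))


def shift_right(a: int, b: int) -> int:
    # Arithmetic shift: the sign bit is copied into the vacated high-order bits.
    return to_int32(a) >> (b & 31)


def shift_right_unsigned(a: int, b: int) -> int:
    # Logical shift: zeros fill the vacated high-order bits.
    return to_int32((a & MASK32) >> (b & 31))


print(shift_left(258, 2), shift_right(258, 2), shift_right_unsigned(258, 2))  # 1032 64 64
print(shift_right(-8, 1), shift_right_unsigned(-8, 1))  # -4 2147483644
```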

Field

Description

shuffle

Description

Returns a random permutation of the given array.

Parameters

arg0: array column.

Returns

Returns a random permutation of the given array.

Throws

ApplicationException

Example

shuffle(array(1, 20, null, 3)) will return a random permutation such as [20,null,3,1]

Field

Description

sign

Description

Returns -1.0, 0.0, or 1.0 when expr is negative, 0, or positive, respectively.

Parameters

arg0: The numeric column or expression.

Returns

Returns -1.0, 0.0, or 1.0.

Throws

ApplicationException

Example

SELECT sign(40); will return 1.0

Field

Description

signum

Description

Computes the signum of the given value.

Parameters

arg0: The column whose signum is to be calculated.

Returns

Returns the computed signum.

Throws

ApplicationException

Example

signum(20) will return 1.0

Field

Description

sin

Description

Computes the sine of the given value.

Parameters

arg0: The column whose sine is to be calculated.

Returns

Returns the computed sine.

Throws

ApplicationException

Example

sin(20) will return 0.9129452507276277

Field

Description

sinh

Description

Computes the hyperbolic sine of the given value.

Parameters

arg0: The column whose hyperbolic sine is to be calculated.

Returns

Returns the computed hyperbolic sine.

Throws

ApplicationException

Example

sinh(20) will return 2.4258259770489514E8

Field

Description

size

Description

Returns the size of an array or a map. The function returns -1 if its input is null and spark.sql.legacy.sizeOfNull is set to true. If spark.sql.legacy.sizeOfNull is set to false, the function returns null for null input. By default, the spark.sql.legacy.sizeOfNull parameter is set to true.

Parameters

arg0: array or map column.

Returns

Returns the size

Throws

ApplicationException

Example

size(array('b', 'd', 'c', 'a')) will return 4

Field

Description

slice

Description

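Subsets array x starting from index start (counting from the end if start is negative) with the specified length. Indices are 1-based.

Parameters

arg0: The array column.

arg1: start, the 1-based start index; a negative value counts from the end of the array.

arg2: length, the number of elements to return.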

Returns

Returns an array.

Throws

ApplicationException

Example

slice(array(1, 2, 3, 4), 2, 2) will return [2,3]
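The 1-based and negative start indices of slice translate to Python list slicing as shown in the sketch below. The error handling (start of 0 or negative length) and returning an empty list when start is out of range are assumptions of this plain-Python illustration, not documented platform behavior.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def slice_array(arr: Sequence[T], start: int, length: int) -> List[T]:
    """Take `length` elements from 1-based `start`; negative start counts from the end."""
    if start == 0:
        raise ValueError("start index must not be 0 (indices are 1-based)")
    if length < 0:
        raise ValueError("length must be non-negative")
    begin = start - 1 if start > 0 else len(arr) + start  # convert to a 0-based index
    if begin < 0 or begin >= len(arr):
        return []  # assumption: out-of-range start gives an empty array
    return list(arr[begin:begin + length])


print(slice_array([1, 2, 3, 4], 2, 2))   # [2, 3]
print(slice_array([1, 2, 3, 4], -2, 2))  # [3, 4]
```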

Field

Description

smallint

Description

Casts the value expr to the target data type smallint.

Parameters

arg0:The column or expression.

Returns

Returns the smallint column or value.

Throws

ApplicationException

Example

SELECT smallint(column); will cast column to smallint

Field

Description

sort_array

Description

Sorts the input array in ascending or descending order according to the natural ordering of the array elements. Null elements will be placed at the beginning of the returned array in ascending order or at the end of the returned array in descending order.

Parameters

arg0:array column.

arg1: A boolean flag for the sort order: true (the default) sorts in ascending order, false in descending order.

Returns

Returns the sorted array.

Throws

ApplicationException

Example

sort_array(array('b', 'd', null, 'c', 'a'), true) will return [null,"a","b","c","d"]
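The null placement rule of sort_array (nulls first when ascending, last when descending) can be expressed in a few lines of plain Python, shown below as an illustration of the described behavior rather than the engine's implementation.

```python
from typing import List, Optional, Sequence, TypeVar

T = TypeVar("T")


def sort_array(arr: Sequence[Optional[T]], asc: bool = True) -> List[Optional[T]]:
    """Sort non-null elements; nulls go first when ascending and last when descending."""
    non_null = sorted((x for x in arr if x is not None), reverse=not asc)
    nulls = [None] * (len(arr) - len(non_null))
    return nulls + non_null if asc else non_null + nulls


print(sort_array(["b", "d", None, "c", "a"]))         # [None, 'a', 'b', 'c', 'd']
print(sort_array(["b", "d", None, "c", "a"], False))  # ['d', 'c', 'b', 'a', None]
```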

Field

Description

soundex

Description

Returns the Soundex code of the string.

Parameters

arg0:String column.

Returns

Returns the Soundex code of the string.

Throws

ApplicationException

Example

soundex('Miller') will return M460
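For illustration, the classic American Soundex rules can be sketched in plain Python: keep the first letter, map the remaining consonants to digits, drop vowels, skip H and W, collapse adjacent equal digits, and pad the code to four characters. This is an approximation of what soundex computes, not the engine's own code; soundex_code is a hypothetical name, and its handling of unusual input (for example, a value that does not start with a letter) is an assumption.

```python
# Letter-to-digit table, A..Z. Vowels and Y map to "0" (dropped);
# H and W map to "7" (skipped without breaking a run of equal digits).
_CODES = dict(zip("ABCDEFGHIJKLMNOPQRSTUVWXYZ", "01230127022455012623017202"))


def soundex_code(value: str) -> str:
    if not value:
        return value
    first = value[0].upper()
    if first not in _CODES:
        return value  # input that does not start with A-Z is returned unchanged
    result = [first]
    last = _CODES[first]
    for ch in value[1:].upper():
        code = _CODES.get(ch)
        if code is None:
            last = "0"  # a non-letter separates runs of equal digits
            continue
        if code == "7":
            continue  # H and W are ignored
        if code != "0" and code != last:
            result.append(code)
            if len(result) == 4:
                break
        last = code
    return "".join(result).ljust(4, "0")


print(soundex_code("Miller"))  # M460
```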

Field

Description

spark_partition_id

Description

Returns the current partition id.

Parameters

arg0:Not applicable.

Returns

The partition id.

Throws

ApplicationException

Example

SELECT spark_partition_id(); will return the ID of the partition that holds the row, for example 0 (partition IDs start at 0).

Field

Description

split

Description

Splits str around occurrences that match regex.

Parameters

arg0: The string column (str).

arg1: The regular expression.

Returns

Returns an array of the split strings.

Throws

ApplicationException

Example

split('oneAtwoBthreeC', '[ABC]') will return ["one","two","three",""]

Field

Description

sqrt

Description

Computes the square root of the specified float value.

Parameters

arg0: The column for which the square root is to be calculated.

Returns

Returns the computed square root.

Throws

ApplicationException

Example

sqrt(20) will return 4.47213595499958

Field

Description

string

Description

Casts the value expr to the target data type string.

Parameters

arg0:The column or expression.

Returns

Returns the string column or value.

Throws

ApplicationException

Example

SELECT string(column); will cast column to string

Field

Description

struct

Description

Creates a struct with the given field values.

Parameters

arg0: The columns from which the struct is created.

Returns

Returns the struct column.

Throws

ApplicationException

Example

-

Field

Description

substr

Description

Returns the substring of a String value that starts at the given position and has the given length, or, for a Binary value, the slice of the byte array that starts at the given byte position and has the given length.

Parameters

arg0: The String column from which the substring is to be extracted.

arg1: The start position of the substring (1-based).

arg2: The length of the substring.

Returns

Returns the result substring.

Throws

ApplicationException

Example

substr("foo bar",4,6) will return " bar": position 4 (1-based) is the space, and only four characters remain.

Field

Description

substring

Description

Returns the substring of a String value that starts at the given position and has the given length, or, for a Binary value, the slice of the byte array that starts at the given byte position and has the given length.

Parameters

arg0: The String column from which the substring is to be extracted.

arg1: The start position of the substring (1-based).

arg2: The length of the substring.

Returns

Returns the result substring.

Throws

ApplicationException

Example

substring("foo bar",4,6) will return " bar": position 4 (1-based) is the space, and only four characters remain.

Field

Description

substring_index

Description

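Returns the substring of the given string before count occurrences of the given delimiter. If count is positive, everything to the left of the final delimiter (counting from the left) is returned. If count is negative, everything to the right of the final delimiter (counting from the right) is returned.

Parameters

arg0: The String column from which the substring is to be extracted.

arg1: The delimiter.

arg2: The number of occurrences of the delimiter (count).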

Returns

Returns the result substring.

Throws

ApplicationException

Example

substring_index("www.xyz.com",".",2) will return "www.xyz"
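The effect of the sign of count can be seen in a short plain-Python sketch. substring_index here is a hypothetical stand-in for the SQL function, and returning an empty string for a count of 0 or an empty delimiter is an assumption based on Spark's behavior.

```python
def substring_index(text, delim, count):
    """Return the part of `text` before the `count`-th occurrence of `delim`.

    A positive count counts occurrences from the left; a negative count
    counts from the right and returns everything after that occurrence.
    """
    if count == 0 or delim == "":
        return ""
    parts = text.split(delim)
    if count > 0:
        return delim.join(parts[:count])
    return delim.join(parts[count:])


print(substring_index("www.xyz.com", ".", 2))   # www.xyz
print(substring_index("www.xyz.com", ".", -2))  # xyz.com
```

If count exceeds the number of delimiters, the whole string is returned.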

Field

Description

tan

Description

Computes the tangent of the given value.

Parameters

arg0: The column for which the tangent is to be calculated.

Returns

Returns the computed tangent.

Throws

ApplicationException

Example

tan(20) will return 2.237160944224742

Field

Description

tanh

Description

Computes the hyperbolic tangent of the given value.

Parameters

arg0: The column for which the hyperbolic tangent is to be calculated.

Returns

Returns the computed hyperbolic tangent.

Throws

ApplicationException

Example

tanh(20) will return 1.0

Field

Description

timestamp

Description

Casts the value expr to the timestamp type.

Parameters

arg0: The string column or expression.

Returns

Returns the timestamp column or value.

Throws

ApplicationException

Example

SELECT timestamp('2020-06-10 02:12:45'); will return 2020-06-10 02:12:45 as timestamp type.

Field

Description

tinyint

Description

Casts the value expr to the target data type tinyint.

Parameters

arg0:The column or expression.

Returns

Returns the tinyint column or value.

Throws

ApplicationException

Example

SELECT tinyint(column); will cast column to tinyint

Field

Description

toDegrees

Description

Returns the angle measured in radians to an approximately equivalent angle measured in degrees.

Parameters

arg0: The column containing the angle in radians.

Returns

A double value.

Throws

ApplicationException

Example

For example, if column1 contains 3.14159,

toDegrees(@{column.schema.column1}) will return approximately 180 (179.99985), because 3.14159 is slightly less than π.

Field

Description

toRadians

Description

Returns the angle measured in degrees to an approximately equivalent angle measured in radians.

Parameters

arg0: The column containing the angle in degrees.

Returns

A double value.

Throws

ApplicationException

Example

For example, if column1 contains 180,

toRadians(@{column.schema.column1}) will return approximately 3.14159 (π, 3.141592653589793).
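The values behind these approximate results can be checked with the standard conversion formulas, degrees = radians * 180 / π and radians = degrees * π / 180. The helper names below are hypothetical; the sketch only shows why 3.14159 converts to slightly less than 180 degrees.

```python
import math


def to_degrees(radians: float) -> float:
    # degrees = radians * 180 / pi
    return radians * 180.0 / math.pi


def to_radians(degrees: float) -> float:
    # radians = degrees * pi / 180
    return degrees * math.pi / 180.0


print(round(to_degrees(3.14159), 5))  # 179.99985
print(to_radians(180))                # approximately 3.141592653589793
```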

Field

Description

to_date

Description

Parses the date_str expression with the fmt expression to a date. Returns null with invalid input. By default, it follows casting rules to a date if the fmt is omitted.

Parameters

arg0: The date string column (date_str).

arg1: The format string (fmt).

Returns

Returns the parsed date.

Throws

ApplicationException

Example

to_date('2016-12-31', 'yyyy-MM-dd') will return 2016-12-31

Field

Description

to_json

Description

Returns a JSON string for the given struct value.

Parameters

arg0: The struct column.

arg1: A map of additional options (optional).

Returns

Returns a JSON string for the given struct value.

Throws

ApplicationException

Example

to_json(named_struct('time', to_timestamp('2015-08-26', 'yyyy-MM-dd')), map('timestampFormat', 'dd/MM/yyyy')) will return {"time":"26/08/2015"}

Field

Description

to_timestamp

Description

Parses the timestamp expression with the fmt expression to a timestamp. Returns null with invalid input. By default, it follows casting rules to a timestamp if the fmt is omitted.

Parameters

arg0:The timestamp column.

arg1:The format string.

Returns

Returns the parsed timestamp.

Throws

ApplicationException

Example

to_timestamp('2016-12-31', 'yyyy-MM-dd') will return 2016-12-31 00:00:00

Field

Description

to_utc_timestamp

Description

Given a timestamp like '2017-07-14 02:40:00.0', interprets it as a time in the given time zone, and renders that time as a timestamp in UTC.

Parameters

arg0:The timestamp column.

arg1:The timezone column.

Returns

Returns the timestamp.

Throws

ApplicationException

Example

to_utc_timestamp('2016-08-31', 'Asia/Seoul') will return 2016-08-30 15:00:00

Field

Description

transform

Description

Transforms elements in an array using the function.

Parameters

arg0:The array expression.

arg1:The function to apply on each element of the array expression.

Returns

Returns the transformed array.

Throws

ApplicationException

Example

SELECT transform(array(1, 2, 3), x -> x + 1); will return [2,3,4]

Field

Description

translate

Description

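Translates the given string by replacing each character that appears in the matching string with the character at the same position in the replacement string.

Parameters

arg0: The string column in which the translation is to be done.

arg1: The characters to be replaced (matching string).

arg2: The replacement characters (replacement string).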

Returns

Returns the translated string.

Throws

ApplicationException

Example

translate("The foo bar","f","t") will return "The too bar"
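The character-by-character mapping can be sketched in plain Python. The sketch assumes Spark's translate rules: a character of the matching string without a counterpart in the replacement string is deleted, and a character repeated in the matching string keeps its first mapping. spark_translate is a hypothetical name.

```python
def spark_translate(text: str, matching: str, replace: str) -> str:
    """Replace characters of `matching` with the character at the same position in `replace`."""
    mapping = {}
    for i, ch in enumerate(matching):
        # Characters without a counterpart in `replace` map to "" (deleted);
        # if a character repeats in `matching`, its first mapping is kept.
        mapping.setdefault(ch, replace[i] if i < len(replace) else "")
    return "".join(mapping.get(ch, ch) for ch in text)


print(spark_translate("The foo bar", "f", "t"))  # The too bar
print(spark_translate("AaBbCc", "abc", "12"))    # A1B2C
```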

Field

Description

trim

Description

Removes the leading and trailing trimStr characters from str.

Parameters

arg0: The trimStr string (the characters to remove).

arg1: The str string column to be trimmed.

Returns

Returns the trimmed string.

Throws

ApplicationException

Example

trim('SL', 'SSparkSQLS') will return parkSQ
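Because trimStr is treated as a set of characters rather than a substring, Python's str.strip behaves the same way and gives a quick way to check expected results. trim_chars is a hypothetical helper; like trim, the matching is case-sensitive.

```python
def trim_chars(text: str, trim_str: str) -> str:
    """Strip every leading and trailing character that appears in trim_str."""
    # str.strip treats its argument as a set of characters, not a substring.
    return text.strip(trim_str)


print(trim_chars("SSparkSQLS", "SL"))  # parkSQ
print(trim_chars("SsparkSQLS", "SL"))  # sparkSQ (lowercase "s" is not in the set)
```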

Field

Description

trunc

Description

Computes the date truncated to the unit specified by the format.

Parameters

arg0: The date to be truncated.

arg1: The format for truncation.

Returns

Returns truncated date.

Throws

ApplicationException

Example

trunc("2017-12-15","YEAR") will return "2017-01-01"

Field

Description

ucase

Description

Returns str with all characters changed to uppercase.

Parameters

arg0:The column or expression.

Returns

Returns str with all characters changed to uppercase.

Throws

ApplicationException

Example

SELECT ucase('SparkSql'); will return SPARKSQL

Field

Description

unbase64

Description

Converts the argument from a base 64 string str to a binary.

Parameters

arg0:The base 64 String column.

Returns

Returns the decoded binary value.

Throws

ApplicationException

Example

unbase64('U3BhcmsgU1FM') will return Spark SQL

Field

Description

unhex

Description

Converts hexadecimal expr to binary.

Parameters

arg0:The hexadecimal column.

Returns

Returns the binary.

Throws

ApplicationException

Example

decode(unhex('537061726B2053514C'), 'UTF-8') will return Spark SQL

Field

Description

unix_timestamp

Description

Converts a time string with the given pattern (see [http://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html]) to a Unix timestamp in seconds. Returns null if the conversion fails.

Parameters

arg0: The time string to be converted.

arg1: The format of the time string.

Returns

Returns the converted Unix timestamp in seconds, or null if the conversion fails.

Throws

ApplicationException

Example

unix_timestamp("2017-12-15 11:56","yyyy-MM-dd HH:mm") will return 1513338960 when the session time zone is UTC (the result depends on the session time zone).
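The result of unix_timestamp depends on the session time zone. The sketch below computes the value for a UTC session in plain Python; note that Python's strptime directives (%Y-%m-%d %H:%M) differ from the Java-style pattern letters (yyyy-MM-dd HH:mm) used above. The helper name and the UTC assumption are illustrative only.

```python
from datetime import datetime, timezone
from typing import Optional


def unix_seconds(text: str, fmt: str) -> Optional[int]:
    """Parse `text` with a strptime pattern and return Unix seconds, or None."""
    try:
        parsed = datetime.strptime(text, fmt)
    except ValueError:
        return None  # mirrors "returns null if the conversion fails"
    # Assumption: the session time zone is UTC.
    return int(parsed.replace(tzinfo=timezone.utc).timestamp())


# Python's "%Y-%m-%d %H:%M" corresponds to the pattern "yyyy-MM-dd HH:mm".
print(unix_seconds("2017-12-15 11:56", "%Y-%m-%d %H:%M"))  # 1513338960
```

A minute-resolution input always yields a multiple of 60 seconds, which is a quick sanity check for examples like the one above.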

Field

Description

upper

Description

Converts a string column to upper case.

Parameters

arg0: The string column to be converted to upper case.

Returns

Returns the converted string column.

Throws

ApplicationException

Example

upper("aPPle") will return "APPLE"

Field

Description

uuid

Description

Returns a universally unique identifier (UUID) string. The value is returned as a canonical 36-character UUID string.

Parameters

arg0:Not applicable.

Returns

Returns the uuid.

Throws

ApplicationException

Example

SELECT uuid(); will return a random 36-character UUID string in the canonical 8-4-4-4-12 hexadecimal format; the value differs on every call.
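The canonical UUID layout (32 hexadecimal digits in groups of 8-4-4-4-12, separated by hyphens, 36 characters in total) can be seen with Python's uuid module. This is an analogy for the output format only, not Spark's generator.

```python
import uuid

value = str(uuid.uuid4())  # random; a new value on every call
groups = value.split("-")
print(value)
print([len(g) for g in groups])  # [8, 4, 4, 4, 12]
```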

Field

Description

weekday

Description

Extracts and returns the day of the week of the given date/timestamp (0 = Monday, 1 = Tuesday, ..., 6 = Sunday).

Parameters

arg0: The column or expression.

Returns

Returns the day of week of the date/timestamp.

Throws

ApplicationException

Example

SELECT weekday('2009-07-30'); will return 3 (Thursday).

Field

Description

weekofyear

Description

Extracts the week number as an integer from a given date/timestamp/string. A week starts on Monday, and week 1 is the first week with more than three days (ISO 8601).

Parameters

arg0: The date/timestamp/string from which the week is to be extracted.

Returns

Returns the week of the year as an integer.

Throws

ApplicationException

Example

weekofyear("2017-12-15 11:02:03") will return 50
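The numbering used by weekday and weekofyear matches Python's date.weekday() (0 = Monday) and ISO 8601 week numbers (date.isocalendar()), so the examples in these entries can be cross-checked in plain Python. The wrapper names are hypothetical.

```python
from datetime import date


def spark_weekday(d: date) -> int:
    # 0 = Monday, 1 = Tuesday, ..., 6 = Sunday (same numbering as date.weekday()).
    return d.weekday()


def spark_weekofyear(d: date) -> int:
    # ISO 8601 week number: weeks start on Monday, and week 1 is the first
    # week with more than three days in the new year.
    return d.isocalendar()[1]


print(spark_weekday(date(2009, 7, 30)))      # 3 (Thursday)
print(spark_weekofyear(date(2017, 12, 15)))  # 50
```

Early January dates can belong to the last week of the previous year: 2017-01-01 is in ISO week 52.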

Field

Description

xpath

Description

Extracts and returns a string array of values within the nodes of xml that match the XPath expression.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns the array of strings.

Throws

ApplicationException

Example

SELECT xpath('<a><b>b1</b><b>b2</b><b>b3</b><c>c1</c><c>c2</c></a>','a/b/text()'); will return ['b1','b2','b3']

Field

Description

xpath_boolean

Description

Returns true if the XPath expression evaluates to true, or if a matching node is found.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns true or false

Throws

ApplicationException

Example

SELECT xpath_boolean('<a><b>1</b></a>','a/b'); will return true

Field

Description

xpath_double

Description

Evaluates the given XPath expression and returns a double value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns a double value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_double('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0

Field

Description

xpath_float

Description

Evaluates the given XPath expression and returns a float value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns a float value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_float('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0

Field

Description

xpath_int

Description

Evaluates the given XPath expression and returns an integer value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns an integer value, or zero if no match is found or if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_int('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3

Field

Description

xpath_long

Description

Evaluates the given XPath expression and returns a long value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns a long value, or zero if no match is found or if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_long('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3

Field

Description

xpath_number

Description

Evaluates the given XPath expression and returns a double value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns a double value, the value zero if no match is found, or NaN if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_number('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3.0

Field

Description

xpath_short

Description

Evaluates the given XPath expression and returns a short integer value.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns a short integer value, or zero if no match is found or if a match is found but the value is non-numeric.

Throws

ApplicationException

Example

SELECT xpath_short('<a><b>1</b><b>2</b></a>', 'sum(a/b)'); will return 3

Field

Description

xpath_string

Description

Extracts and returns the text contents of the first xml node that matches the XPath expression.

Parameters

arg0:The string xml.

arg1:The xpath expression.

Returns

Returns the string content.

Throws

ApplicationException

Example

SELECT xpath_string('<a><b>b</b><c>cc</c></a>','a/c'); will return cc

Field

Description

year

Description

Extracts the year as an integer from a given date/timestamp/string.

Parameters

arg0: The date/timestamp/string from which the year is to be extracted.

Returns

Returns the extracted year as an integer.

Throws

ApplicationException

Example

year("2017-12-15 11:02:03") will return 2017

Field

Description

zip_with

Description

Merges the two given arrays, element-wise, into a single array using function. If one array is shorter, nulls are appended at the end to match the length of the longer array, before applying function.

Parameters

arg0:The first array.

arg1:The second array.

arg2: The function to apply while merging.

Returns

Returns the merged array.

Throws

ApplicationException

Example

SELECT zip_with(array(1, 2, 3), array('a', 'b', 'c'), (x, y) -> (y, x)); will return [{"y":"a","x":1},{"y":"b","x":2},{"y":"c","x":3}]
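The null padding described above can be sketched with itertools.zip_longest, which pads the shorter list with None (Python's stand-in for null). zip_with here is a hypothetical plain-Python helper, and the lambda plays the role of the merge function.

```python
from itertools import zip_longest


def zip_with(left, right, func):
    """Merge two lists element-wise; the shorter one is padded with None (null)."""
    return [func(x, y) for x, y in zip_longest(left, right, fillvalue=None)]


print(zip_with([1, 2, 3], ["a", "b", "c"], lambda x, y: (y, x)))
# [('a', 1), ('b', 2), ('c', 3)]
print(zip_with([1, 2, 3], [10, 20], lambda x, y: None if y is None else x + y))
# [11, 22, None]
```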


Field

Description

zulu Time Format

Description

Returns the UTC date of the input date column or value in the given output date format.

Parameters

Date: String date value or date column.

inputDateFormat: Date format of the input date.

outputDateFormat: Date format of the output date.

Returns

UTC date in output date format in string.

Throws

ApplicationException.

Example

zuluTimeFormat(columnName1,"yyyy-MM-dd HH:mm:ss","yyyy/MM/dd HHmmss") will return UTC date in format "yyyy/MM/dd HHmmss".

Note: If you pass a column name, the column must be of string type.

Expression Filter

This processor filters the incoming dataset using conditions such as equals, contains, not-null, ranges, matches, starts-with and ends-with. Filter criteria are written as Spark SQL expressions.

Configuring Expression Filter for Spark pipelines

To add an Expression Filter processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Expression

Provide an SQL expression, built from Spark SQL functions, that evaluates to true or false. For example, sqrt(ceil(TXN_ID))>2.0 returns either true or false. Rows for which the expression returns true remain in the stream; all other rows are filtered out.

Note: Enclose string literal/constant values in single quotes, e.g. 'John', 'Suite 130 Los Gatos, CA 95032, US'.

Validate

Validate the expressions applied on the column.

Note: Refer to the Expression Filter section under Data Preparation.

You can apply any transformation; the functions that can be applied are listed in the table on the right side of the user interface.
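The row-level behavior of the Expression field can be pictured with a plain-Python sketch of the example predicate sqrt(ceil(TXN_ID))>2.0. The rows are hypothetical; treating a null result as "not true", so that the row is dropped, follows standard SQL filter semantics and is assumed here for the processor.

```python
import math

# Hypothetical rows; only the TXN_ID column matters for this predicate.
rows = [{"TXN_ID": 1}, {"TXN_ID": 4}, {"TXN_ID": 4.2}, {"TXN_ID": 10}, {"TXN_ID": None}]


def predicate(row):
    """Plain-Python counterpart of the expression sqrt(ceil(TXN_ID))>2.0."""
    if row["TXN_ID"] is None:
        return None  # SQL functions return null for null input
    return math.sqrt(math.ceil(row["TXN_ID"])) > 2.0


# Only rows whose predicate is exactly True stay in the stream.
kept = [row for row in rows if predicate(row) is True]
print(kept)  # [{'TXN_ID': 4.2}, {'TXN_ID': 10}]
```

Note that TXN_ID = 4 is filtered out because sqrt(4) equals 2.0, which is not strictly greater than 2.0.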

Filter

Data filtering refines a dataset so that you get only the data you need, excluding data that is repetitive, irrelevant or sensitive. Different types of filters can be used to query results or other kinds of information.

The Filter processor passes on only the records that meet the filter criteria. For example, you can define the filter rule: grade contains “software” and name contains “Mike”. The system then fetches all the records that meet these criteria.

Configuring Filter processor for Spark pipelines

To add a Filter processor into your pipeline, drag the processor to the canvas and right click on it to configure.

 

Field

Description

Config Fields

Lets you create local variables.

Message Name

Message on which filter rule is to be applied.

Filter Rule

Lets you create a filter rule.

The rule filters the data based on the criteria provided.

For example, OrderMethodType is not null, Year less than 2050 and OrderMethodType matches Fax.

Add Rule

Lets you configure additional rules.

Add group

Additional groups can be added using the Add group link.

The rules within a group are combined with either the AND or the OR operator, i.e. Rule1 AND/OR Rule2.

For example, (grade contains software) AND (name contains Mike)

To combine rules with different operators, use multiple groups.

Delete

Lets you delete an added group or rule.

Negate

If Negate is true, the filter criteria are evaluated as the NOT of the given criteria.

If Negate is false, the filter criteria are evaluated as given. By default, it is set to false.

Add New Criteria

Lets you configure additional criteria.

Up and Down arrows

Lets you move groups and rules up and down.

Click the NEXT button.

If you do not wish to create a new template, click the Save button.
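How rules, groups and Negate combine can be sketched in plain Python. The operator table, the function names and the row data are hypothetical; the sketch evaluates a single group whose rules are joined with AND or OR, and Negate inverts the result, as described in the table above.

```python
# Hypothetical subset of rule operators; the UI offers more.
OPERATORS = {
    "contains": lambda actual, expected: actual is not None and expected in str(actual),
    "is not null": lambda actual, _expected: actual is not None,
    "less than": lambda actual, expected: actual is not None and actual < expected,
}


def group_matches(row, combiner, rules):
    """Evaluate one group; every rule is a (field, operator, value) tuple."""
    results = [OPERATORS[op](row.get(field), value) for field, op, value in rules]
    return all(results) if combiner == "AND" else any(results)


def passes_filter(row, combiner, rules, negate=False):
    matched = group_matches(row, combiner, rules)
    return not matched if negate else matched  # Negate inverts the whole criteria


rows = [
    {"grade": "software engineer", "name": "Mike"},
    {"grade": "hardware engineer", "name": "Mike"},
]
rules = [("grade", "contains", "software"), ("name", "contains", "Mike")]
print([r["grade"] for r in rows if passes_filter(r, "AND", rules)])
# ['software engineer']
print([r["grade"] for r in rows if passes_filter(r, "AND", rules, negate=True)])
# ['hardware engineer']
```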

Field Splitter

The Field Splitter splits string data based on a regular expression and passes the separated data to new fields. Use the Field Splitter to split complex string values into logical components.

For example, if a field contains an error code and error message separated by a comma, you can use the comma to separate the code and message into different fields.

Configuring Field Splitter processor for Spark pipelines

To add Field Splitter processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

Field

Description

Field To Split

Field on which splitting is to be applied.

Separator

Delimiter (regular expression) used to split the field.

New Split Fields

Names of the new columns that receive the split values.

Not Enough Split

Action to take when a value produces fewer splits than there are new split fields. It has two options: Continue and Send to Error.

Note: If Send to Error is selected, the Error Log Target field and its configuration fields become visible.

Error Log Target

Select the target to which data that fails to process in the pipeline is sent.

Queue Name

Name of the queue where data will be published.

Connection Name

Name of the connection to be used.

Too Many Splits

When a field has too many splits, the Field Splitter can include all the remaining data in the last listed split field, or it can write the additional splits to a specified list field.

It has two options: Put Remaining in the last field or Store Remaining in New Field.

If Store Remaining in New Field is selected, the options for storing the remaining splits become visible.

Original Field

Specifies whether to keep or remove the original field, i.e. the field to which splitting was applied. It has two options: Remove or Keep.
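The split options can be sketched in plain Python with re.split, using the error code and message example from this section. The option strings come from the table above, but the exact behavior shown (padding missing splits with null for Continue, raising an exception in place of routing to the Error Log Target, keeping the unsplit remainder in the last field, storing extra splits as a list) is an assumption, and split_field is a hypothetical name.

```python
import re

PUT_IN_LAST = "Put Remaining in the last field"
STORE_IN_NEW = "Store Remaining in New Field"


class SplitError(ValueError):
    """Stands in for routing the record to the Error Log Target."""


def split_field(record, field, separator, new_fields, not_enough="Continue",
                too_many=PUT_IN_LAST, remaining_field=None, keep_original=True):
    if len(new_fields) < 2:
        raise ValueError("this sketch expects at least two new split fields")
    if too_many == STORE_IN_NEW and remaining_field is None:
        raise ValueError("remaining_field is required for " + STORE_IN_NEW)
    value = record[field]
    extra = []
    if too_many == PUT_IN_LAST:
        # Stop after len(new_fields) - 1 splits so the unsplit tail,
        # separators included, lands in the last field.
        parts = re.split(separator, value, maxsplit=len(new_fields) - 1)
    else:
        parts = re.split(separator, value)
        parts, extra = parts[:len(new_fields)], parts[len(new_fields):]
    if len(parts) < len(new_fields):
        if not_enough == "Send to Error":
            raise SplitError(f"{field}: expected {len(new_fields)} splits, got {len(parts)}")
        parts += [None] * (len(new_fields) - len(parts))  # Continue: fill with nulls
    out = dict(record)
    if not keep_original:
        del out[field]
    out.update(zip(new_fields, parts))
    if too_many == STORE_IN_NEW:
        out[remaining_field] = extra
    return out


print(split_field({"log": "E42,Disk full"}, "log", ",", ["code", "message"],
                  keep_original=False))
# {'code': 'E42', 'message': 'Disk full'}
```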

Field Converter

Use this processor to edit the data types of the schema fields. The Field Converter processor detects the schema, after which the fields can be edited.

Configuring Field Converter processor for Spark pipelines

To add Field Converter processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below.

On the first page, you are asked to Add Configuration; this step is optional.

Once you click Next, the schema is detected and you can edit the Data Type as required.

Field Replacer

A field replacer lets you replace a field value with a hard-coded value (e.g. replace null with "some value"), replace a value that matches a pattern by providing a regex, or replace a field value with another field value or a combination of field values.

You can replace an input field value if the regex pattern matches.

There are two options to provide values:

1. The value is a combination (concatenation) of two columns.

For example, in the image below it is “JOBROLE” CONCAT “NAME”.

The columns are combined in the order in which you select them.

2. Enter a single value to replace with.

Configuring Field Replacer processor for Spark pipelines

To add a Field Replacer processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

Field

Description

Output Field

Name of the output column, which is created or replaced by input values.

Value

The replacement value for matching regex pattern.

Regex Pattern

Regex pattern to match against the input field.

Input Field

Input column on which Regex should be applied.

After configuring the component, detect the schema to verify the result.

The image below shows that values of the input field (NAME) that match the regex pattern (sagar|Nikhil) are replaced with uservalue in a new column (NEWNAME).

Field Flattener

Field Flattener flattens a nested record to produce a record with no nested fields. It can also flatten specific list or map fields.

By default, the names of the new flattened fields are separated by an underscore (_).

Let’s take an example of a nested JSON.

Specify the path of the input JSON in the record; all nested structures under that path are flattened until the path is flat.

For example:

Flatten Example 1: In the JSON below, Employee contains the nested fields Id, Name, Permanent, Address and Role, and Address in turn contains Street, City and Zipcode. Using the Field Flattener, you can flatten these nested fields, e.g. Street, City and Zipcode, as shown below:

{
  "Employee": {
    "Id": 123,
    "Name": "Steve Rogers",
    "Permanent": true,
    "Address": {
      "Street": "El Camino Real",
      "City": "San Jose",
      "Zipcode": 55014
    },
    "Role": "Developer"
  }
}

Configuring Field Flattener processor for Spark pipelines

To add Field Flattener processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

 

Field

Description

Flatten Path

Flattens list and map fields that contain additional nested list or map fields. Specify the path to the field, for example: Employee.
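A recursive plain-Python sketch shows how the Employee JSON example could be flattened with the default underscore separator. The naming scheme (parent names joined to child names with _) is an assumption about the processor's output, and lists are left as they are; flatten is a hypothetical helper.

```python
def flatten(record, separator="_", prefix=""):
    """Recursively flatten nested dicts, joining key names with `separator`."""
    flat = {}
    for key, value in record.items():
        name = f"{prefix}{separator}{key}" if prefix else key
        if isinstance(value, dict):
            flat.update(flatten(value, separator, name))
        else:
            flat[name] = value  # lists and scalars are kept as they are
    return flat


employee = {
    "Employee": {
        "Id": 123,
        "Name": "Steve Rogers",
        "Permanent": True,
        "Address": {"Street": "El Camino Real", "City": "San Jose", "Zipcode": 55014},
        "Role": "Developer",
    }
}
for name, value in flatten(employee).items():
    print(name, value)  # one line per flattened field, e.g. Employee_Address_City San Jose
```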

Functions

The Functions processor lets you apply Spark SQL functions to a dataset.

Functions processor supports:

• MVEL expressions

• lookup functions

• Date

• String

• Math

• Miscellaneous functions

Configuring Functions processor for Spark pipelines

To add Functions processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

Field

Description

Config Fields

Config fields are used to create local variables.

Add Config Fields

Additional config fields can be added by clicking the plus sign (+).

Transform/Add Fields

Select the schema on which the function is to be applied.

Additional fields can be added by clicking the plus sign (+).

On the right side, select the function to be applied. You can select an existing field or create a new field to hold the output of the applied function.

Click the NEXT button. The fields generated from the data source are displayed. You can provide a new output format for the date field. For example, if the input date format was yyyy-dd-yy, specify the new date format as yyyy/MM/dd.

Click Next after changing the Date format. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

HTTP

The HTTP processor enriches the incoming data with the responses of external REST calls. You can call different URLs in parallel and use their responses to enrich the resulting data, in either a new column or an existing column.

Field

Description

HTTP URL

The HTTP or HTTPS URL to which the request is sent.

Method Type

Based on the HTTP URL, choose one of the following request method types:

1. GET

Note: You must fill in the Request Param.

2. POST

Note: Some POST requests also require the request body value and the request param.

3. PUT

Note: The request body value must be specified along with the request param.

4. DELETE

Note: You must fill in the request body value and request param fields.

Note: In POST, PUT and DELETE, provide the request body value in the following format:

{‘value’.’key’}

Request Param

The request parameters can either be provided with actual values or be parameterized. Placeholders in parameterized inputs are resolved at runtime from the data source column values.

You can provide parameters in two ways:

e.g. param1=value1

or

param2=${column}
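Placeholder resolution for parameterized request params can be sketched with Python's string.Template, which uses the same ${name} syntax. This is an illustration of the idea only: resolve_params and the row are hypothetical, Template.substitute raises KeyError when a column is missing, and a real request would also URL-encode the values (for example with urllib.parse.urlencode).

```python
from string import Template


def resolve_params(param_template: str, row: dict) -> str:
    """Fill ${column} placeholders with values from the current row."""
    return Template(param_template).substitute({k: str(v) for k, v in row.items()})


row = {"column": 42}  # hypothetical data source row
print(resolve_params("param1=value1", row))     # param1=value1
print(resolve_params("param2=${column}", row))  # param2=42
```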

Enable SSL

This option can be selected as True if SSL is enabled on HTTP. It is set to False by default.

If selected as True, keystore/certificate file needs to be uploaded or the keystore file path should be provided in the respective configuration field.

Output Fields

Select the output field in which the response from the given HTTP URL is stored. The output field is specified per HTTP URL.

Header

Some URLs require a header. Provide the header details as key-value pairs; you can add more headers in the same way.

Auth Type

Different URLs require different authorization types. Select the type required by the URL.

Types of authorization:

1. Basic

Note: For the Basic authorization type, you must fill in the user name and password.

2. Token

Note: For the Token authorization type, you must specify the token ID and the token value.

3. Auth

Note: For the Auth authorization type, you must specify the header(s), the client ID, the secret key and the end-point Auth URL.

4. None

Note: None is the default and is used when no authorization is required.

Drop Record

If selected, a record is dropped and not shown in the output when the request to the URL for that record generates an exception or error.

Retry Count

The number of times the request to the URL is retried if it fails.

Read Timeout (sec)

The time, in seconds, to wait for the server to return a response while the URL is being accessed.

Connection Timeout (sec)

The time, in seconds, to wait for a connection to the server to be established.

Enable Request Rate

Control the rate at which HTTP calls are made on each partition.

Note: The HTTP calls are made in parallel on multiple partitions. Therefore, set the number of partitions using the Repartition processor before the HTTP processor.

Example: For a client, the API rate limit is 40 calls per second. Add a Repartition processor before the HTTP processor and set the partitioning value to 10. Then set the rate limit to 4 requests per second; it applies to each partition, so 10 parallel partitions making 4 requests per second each allow 40 requests per second in total.
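The arithmetic in the example, and one simple way to pace calls on a single partition, can be sketched in plain Python. The pacing scheme (at least 1/rate seconds between requests) is an assumption, not necessarily how the processor enforces the limit; the class and function names are hypothetical, and the clock is injectable so the behavior can be tested deterministically.

```python
import math
import time


def per_partition_rate(total_per_second: float, partitions: int) -> int:
    """Largest whole per-partition rate that keeps the total within the API limit."""
    return math.floor(total_per_second / partitions)


class PartitionRateLimiter:
    """Spaces requests on one partition at least 1/rate seconds apart."""

    def __init__(self, rate: float, clock=time.monotonic):
        self.interval = 1.0 / rate
        self.clock = clock
        self.next_allowed = float("-inf")

    def try_acquire(self) -> bool:
        now = self.clock()
        if now >= self.next_allowed:
            self.next_allowed = now + self.interval
            return True
        return False


print(per_partition_rate(40, 10))  # 4
```

Rounding down matters when the limit does not divide evenly: with 12 partitions, each gets 3 requests per second (36 in total) rather than exceeding 40.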

Note:

You can create and run multiple URLs by clicking the ADD FIELD option.

Hashing

Hashes fields with different hashing algorithms to hide sensitive information.

Hashing is a technique in which an algorithm (a hash function) is applied to data to create a digital “fingerprint” of fixed size. If the data changes by as little as one bit, a good hash function produces a different output (the hash value), so the recipient can tell that the data has been changed. Hashing can therefore be used to verify data integrity; authenticating data additionally requires a secret key, as in HMAC.

A cryptographic hash function cannot be “reverse-engineered”; that is, you can't use the hash value to discover the original data that was hashed. For this reason, such algorithms are referred to as one-way hashes. Because the output has a fixed size, different inputs can produce the same result (a collision), but a good cryptographic hash function makes collisions infeasible to find. Non-cryptographic options such as MURMUR3, ADLER_32 and CRC_32 do not provide these guarantees, and MD5 and SHA1 are no longer considered collision-resistant.

Configuring Hashing processor for Spark pipelines

To add Hashing processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

Field

Description

Output Field

The list of columns in which the hashed values of the selected columns are stored.

A new column name can be added to this field (the column is added to the dataset).

Input Field

The list of columns whose values are to be hashed.

Hashing Type

The type of hashing to apply.

The options are: MURMUR3_128, MURMUR3_32, MD5, SHA1, SHA256, SHA512, ADLER_32 AND CRC_32.

Add Field

Add multiple columns to hash simultaneously, each with its own hashing type.

After configuring all the fields, click Next; the schema will be detected and then you can verify and save the configuration.
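Most of the listed hashing types have counterparts in Python's hashlib and zlib modules, which makes it easy to check what a hashed column might contain. The sketch assumes values are hashed as UTF-8 bytes and rendered as lowercase hexadecimal, which may differ from the processor's output format; MURMUR3 variants are not in the standard library and are left unimplemented. hash_value and the row are hypothetical.

```python
import hashlib
import zlib

# Hashing Type option -> hashlib algorithm name.
_HASHLIB_NAMES = {"MD5": "md5", "SHA1": "sha1", "SHA256": "sha256", "SHA512": "sha512"}


def hash_value(value: str, hashing_type: str) -> str:
    """Return the hash of `value` as a lowercase hexadecimal string."""
    data = value.encode("utf-8")  # assumption: text is hashed as UTF-8 bytes
    if hashing_type in _HASHLIB_NAMES:
        return hashlib.new(_HASHLIB_NAMES[hashing_type], data).hexdigest()
    if hashing_type == "CRC_32":
        return format(zlib.crc32(data) & 0xFFFFFFFF, "08x")
    if hashing_type == "ADLER_32":
        return format(zlib.adler32(data) & 0xFFFFFFFF, "08x")
    # MURMUR3_32 / MURMUR3_128 are not in the Python standard library.
    raise NotImplementedError(hashing_type)


row = {"ssn": "123-45-6789"}  # hypothetical sensitive column
row["ssn_hash"] = hash_value(row["ssn"], "SHA256")
print(row["ssn_hash"])
```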

JDBC Container

JDBC Container lets you perform Read, Aggregation, Merge and Write operations over an Oracle connection.

A JDBC Container also allows you to implement a retention policy, which lets you cleanse/delete unwanted data using a retention period.

You can also apply a checksum in case of aggregation to avoid duplicate records.

Configuring JDBC-Container processor for Spark pipelines

To add a JDBC Container Processor into your pipeline, drag the processor on the canvas and right click on it to configure.

Field

Description

Connection Name

Provide an Oracle connection name for creating a connection.

Table Name

Existing table name of specified database.

Output Mode

Output mode specifies what data is written to a streaming sink when new data is available.

With Upsert, items with existing IDs are updated, and items that do not exist are created (inserted).

Enable Retention

When selected, each newly created item lives for the period specified by the retention policy. After the expiration time is reached, the item is deleted by the server.

Retention Policy

Number of days or months for which data is to be retained.

You can select a number and select either DAYS or MONTH as a unit.

Retention Column

Retention policy on a table will be applicable based on the field selected here.

Record Limit

Enable the limit to keep the maximum number of records in the container.

Maximum Number of records

Maximum number of records to be retained for each group, depending on the grouping field criteria.

Grouping Field

Field on which ‘GroupBy’ is applied to limit the maximum number of records retained for each group.

Write Data

Write the raw or aggregated data to the table.

Fields

Select the fields for aggregated data i.e., Function, Input Fields and Output fields.

Group By

Field of selected message on which group by is applied.

Data Duplication Handling

If Data Duplication Handling is enabled, already processed data will not be processed again on the basis of selected fields.

Fields

Data Duplication handling will be processed on the basis of selected fields.

Schema Results

Table Column Name

Name of the column populated from the selected Table.

Mapping Value

Map a corresponding value to the column.

Database Data Type

Data type of the Mapped Value.

Ignore All

Select the Ignore All check box to ignore all the Schema Results, or select the checkbox adjacent to a column to ignore only that column.

Use Ignore All or the selected fields while pushing data to the emitter.

This will add that field as the part of partition fields while creating the table.

Auto Fill

Auto Fill automatically populates and maps all incoming schema fields to the fetched table columns. The left side shows the table columns and the right side shows the incoming schema fields.

If no incoming schema field matches a table column, the first field is selected by default.

Download Mapping

It downloads the mappings of schema fields and table columns to a file.

Upload Mapping

Uploading the mapping file automatically populates the table columns and schema fields.

Configuration Table:

After configuration comes the Mapping page, where you map the incoming schema to the fetched table columns. The properties are described in the table above.

Once the configuration is finalized, click Done to save the file.
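The Data Duplication Handling option above skips data that was already processed, based on the selected fields, and the overview mentions a checksum for this purpose. How the processor computes and stores that checksum is not documented here, so the following is only a minimal sketch, assuming a SHA-256 digest over the selected field values and an in-memory set of previously seen checksums. The names `record_checksum` and `dedupe` are hypothetical.

```python
import hashlib
import json


def record_checksum(record, fields):
    """Compute a checksum over the selected fields of one record (a dict)."""
    # Serialize only the selected fields, in a fixed order, so equal values give equal checksums.
    payload = json.dumps([record.get(f) for f in fields], default=str)
    return hashlib.sha256(payload.encode("utf-8")).hexdigest()


def dedupe(records, fields, seen=None):
    """Return only records whose checksum has not been seen before.

    `seen` holds checksums from earlier batches and is updated in place, so data
    processed in a previous batch is not processed again.
    """
    if seen is None:
        seen = set()
    fresh = []
    for record in records:
        checksum = record_checksum(record, fields)
        if checksum not in seen:
            seen.add(checksum)
            fresh.append(record)
    return fresh


seen = set()
batch1 = [{"id": 1, "amount": 10}, {"id": 1, "amount": 10}, {"id": 2, "amount": 5}]
batch2 = [{"id": 2, "amount": 5}, {"id": 3, "amount": 7}]
print(dedupe(batch1, ["id", "amount"], seen))  # the two distinct records
print(dedupe(batch2, ["id", "amount"], seen))  # only the id 3 record is new
```

Because only the selected fields feed the checksum, two records that differ in an unselected field are still treated as duplicates.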

Join

A join processor can create a relation between two incoming messages (DataSets).

Configuring Join processor for Spark pipelines

To add a Join Processor into your pipeline, drag the processor on the canvas and right click on it to configure.

The first tab, Join Condition, lets you define the Join Type between two DataSets and the join condition applied to a column from each DataSet.

The Broadcast Table tab instructs Spark to broadcast the specified table when joining it with another table.

The available Join Types are:

Field

Description

Equi

The Equi Join performs a Join based on the equality of matching column values in the associated tables. In Gathr, an applied Equi Join is rendered in the Filter Condition. To join two columns of the same table, you can use an Equi Join.

Full Outer

The Full Outer join matches all corresponding rows and also includes the rows without a match from both DataSets. In other words, it contains all the data, merged where a match exists; columns without a match are set to null.

Left Outer

In a Left Outer join, all rows from the left dataset are contained in the joined dataset. Rows that have a match in the right dataset are enriched with the matching information, while rows without a match have that information set to null.

Right Outer

The Right Outer join is the mirror image of the Left Outer join. It keeps all rows from the right dataset and matches them to entries from the left dataset, even if some matches are missing; the missing left-side information is set to null.

Left Semi

With a Left Semi join, all rows from the left dataset that have a match in the right dataset are returned in the final result. However, unlike a Left Outer join, the result does not contain merged data from both datasets. Instead, it contains only the information (columns) from the left dataset.

Left Anti

The Left Anti join returns all rows from the left dataset that do not have a match in the right dataset.

Inner

The Inner join returns rows only if they have a match in both DataSets.
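The join types above differ only in which rows are kept and which columns are filled with null. The following plain-Python sketch illustrates those row-level semantics on lists of dicts joined on one key column; it is not how Gathr or Spark executes joins (Spark uses distributed strategies such as broadcast hash joins and sort-merge joins). The function name `join` and the `how` values are hypothetical.

```python
def join(left, right, key, how="inner"):
    """Join two lists of dict rows on column `key` using one of the join types above."""
    supported = {"inner", "left_outer", "right_outer", "full_outer", "left_semi", "left_anti"}
    if how not in supported:
        raise ValueError(f"unsupported join type: {how}")
    left_cols = {c for row in left for c in row}
    right_cols = {c for row in right for c in row}
    out = []
    matched_right = set()  # indexes of right rows that matched at least one left row
    for lrow in left:
        lkey = lrow.get(key)
        # As in SQL, a null key never matches anything.
        matches = [i for i, rrow in enumerate(right)
                   if lkey is not None and rrow.get(key) == lkey]
        if how == "left_semi":
            if matches:
                out.append(dict(lrow))  # left columns only
            continue
        if how == "left_anti":
            if not matches:
                out.append(dict(lrow))  # left rows with no match
            continue
        for i in matches:
            matched_right.add(i)
            out.append({**lrow, **right[i]})  # merged row
        if not matches and how in ("left_outer", "full_outer"):
            out.append({**dict.fromkeys(right_cols), **lrow})  # right side set to null
    if how in ("right_outer", "full_outer"):
        for i, rrow in enumerate(right):
            if i not in matched_right:
                out.append({**dict.fromkeys(left_cols), **rrow})  # left side set to null
    return out


left = [{"id": 1, "name": "a"}, {"id": 2, "name": "b"}]
right = [{"id": 2, "city": "x"}, {"id": 3, "city": "y"}]
print(join(left, right, "id", "left_anti"))  # [{'id': 1, 'name': 'a'}]
```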

The Filter Condition tab lets you apply filter conditions to the DataSets. You can combine two filter conditions with AND or OR operations.

The Filter Condition that can be applied are as follows:

• contains

• not contains

• begins_with

• ends_with

• equal

• not_begins_with

• not_ends_with

• is_null

• is_not_null

• in

• not_in

• matches

• custom

• not_equal
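To make the condition names above concrete, here is a minimal sketch that maps each one (except custom, which takes a free-form expression) to a Python predicate and combines conditions with AND or OR. The exact semantics Gathr applies are not stated in this guide, so several choices are assumptions: string tests are case-sensitive, null values fail every string test (positive or negated), `in`/`not_in` take a list, and `matches` is treated as a full regular-expression match. `OPERATORS` and `apply_filter` are hypothetical names.

```python
import re

# Hypothetical mapping of the filter condition names to predicates (see assumptions above).
OPERATORS = {
    "contains": lambda v, arg: v is not None and arg in v,
    "not contains": lambda v, arg: v is not None and arg not in v,
    "begins_with": lambda v, arg: v is not None and v.startswith(arg),
    "not_begins_with": lambda v, arg: v is not None and not v.startswith(arg),
    "ends_with": lambda v, arg: v is not None and v.endswith(arg),
    "not_ends_with": lambda v, arg: v is not None and not v.endswith(arg),
    "equal": lambda v, arg: v == arg,
    "not_equal": lambda v, arg: v is not None and v != arg,
    "is_null": lambda v, arg: v is None,
    "is_not_null": lambda v, arg: v is not None,
    "in": lambda v, arg: v in arg,
    "not_in": lambda v, arg: v is not None and v not in arg,
    "matches": lambda v, arg: v is not None and re.fullmatch(arg, v) is not None,
}


def apply_filter(rows, conditions, combine="AND"):
    """Keep rows satisfying (column, operator, argument) conditions joined by AND or OR."""
    if combine not in ("AND", "OR"):
        raise ValueError("combine must be 'AND' or 'OR'")
    check = all if combine == "AND" else any
    return [
        row for row in rows
        if check(OPERATORS[op](row.get(col), arg) for col, op, arg in conditions)
    ]


rows = [
    {"name": "alice", "city": "Delhi"},
    {"name": "bob", "city": None},
    {"name": "carol", "city": "Pune"},
]
# Rows whose name begins with "a" OR whose city is null: alice and bob.
print(apply_filter(rows, [("name", "begins_with", "a"), ("city", "is_null", None)], "OR"))
```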

The Join Projection tab lets you apply a query and expressions to the columns.

Field

Description

Add Column/Expression

Select a column or expression that you want to apply the query to.

Value

Selected Columns will be reflected here.

View Query

View the query in the box shown.

ADD ALL COLUMNS/REMOVE COLUMNS

Add or Remove the columns.

Jolt

Jolt is a JSON transformation library written in Java where the specification for the transformation is a JSON document.

The Jolt processor performs JSON-to-JSON transformations. It applies the transformation to the input JSON document and generates a new JSON document according to the Jolt specification.

Jolt processor transforms the structure of JSON data and does not manipulate the values.

Each transform has its own Domain Specific Language, which facilitates its job.

Refer to the following links for detailed information:

https://github.com/bazaarvoice/jolt

https://jolt-demo.appspot.com/#inception
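Jolt's most common operation, shift, copies values from input paths to output paths: the spec mirrors the input structure, and a string leaf names the output location. The following is a deliberately simplified Python sketch of that idea (literal keys and dot-separated output paths only); it is not the Jolt library, and it ignores Jolt's wildcards and references such as `*`, `&`, `@` and `$`. In a real Jolt file, the `spec` dict below would sit under `"spec"` inside an entry such as `{"operation": "shift", "spec": ...}`. The helper names are hypothetical.

```python
import json


def set_path(target, dotted_path, value):
    """Write `value` into the nested dict `target` at a dot-separated output path."""
    keys = dotted_path.split(".")
    for key in keys[:-1]:
        target = target.setdefault(key, {})  # create intermediate objects as needed
    target[keys[-1]] = value


def shift(data, spec, output=None):
    """Simplified Jolt-style shift: literal keys only, no wildcards or references."""
    if output is None:
        output = {}
    for key, rule in spec.items():
        if not isinstance(data, dict) or key not in data:
            continue  # input path absent: nothing to copy
        value = data[key]
        if isinstance(rule, str):
            set_path(output, rule, value)  # leaf rule: copy value to the output path
        elif isinstance(rule, dict):
            shift(value, rule, output)  # nested rule mirrors the nested input
    return output


input_doc = {"rating": {"primary": {"value": 3}, "quality": {"value": 4}}}
spec = {
    "rating": {
        "primary": {"value": "Rating"},
        "quality": {"value": "SecondaryRatings.quality.Value"},
    }
}
print(json.dumps(shift(input_doc, spec)))
# {"Rating": 3, "SecondaryRatings": {"quality": {"Value": 4}}}
```

As with the processor, only the structure changes; the values 3 and 4 are carried over untouched.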

Configuring Jolt processor for Spark pipelines

To add a JOLT Processor into your pipeline, drag the processor on the canvas and right click on it to configure.

Select the Jolt Specification JSON file which is to be uploaded. Click on UPLOAD FILE button. Browse to the location where JSON file is saved and double-click on it for uploading the JSON file.

The content of the specification JSON file is displayed below. This specification is applied to the incoming JSON document to generate the transformed output.

Additional properties can be added using ADD CONFIGURATION link.

Click on the NEXT button. Enter the notes in the space provided. Click Save for saving the configuration details.

JSON Parser

The JSON Parser processor parses a JSON string in a column and constructs the structured value or object described by the string. Below are the configuration details of the JSON Parser:


Field

Description

JSON Data Column

Name of the column containing the JSON data to be parsed.

Schema Type

Choose whether to infer the schema from the JSON data or to specify the schema. If you select the Specify Schema option, a JSON Schema field appears in which you provide the schema details.

Allow unquoted JSON field names

Check mark the option to allow parsing of unquoted JSON field names.

Allow single quoted JSON field names

Check mark the option to allow parsing of single quoted JSON field names.

ADD CONFIGURATION

The user can add further configuration properties by providing their values.
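The two Allow options exist because standard JSON requires field names in double quotes. The sketch below uses Python's strict `json` module (not Gathr's parser) to show which inputs a strict parser rejects and therefore which inputs those options are meant to accept. As an assumption, these options likely correspond to Spark's JSON reader options `allowUnquotedFieldNames` and `allowSingleQuotes`; this guide does not confirm that mapping.

```python
import json


def is_strict_json(text):
    """Return True if `text` parses under the strict JSON grammar."""
    try:
        json.loads(text)
        return True
    except json.JSONDecodeError:
        return False


samples = {
    "double-quoted names": '{"name": "Mike", "age": 30}',
    "unquoted names": '{name: "Mike", age: 30}',
    "single-quoted names": "{'name': 'Mike', 'age': 30}",
}
for label, text in samples.items():
    # Only the double-quoted sample is valid strict JSON.
    print(f"{label}: {is_strict_json(text)}")
```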

Limit

The Limit processor is used to fetch a specified number of rows from the dataset.

For example, if 1000 records are getting generated and you want to fetch only 50 rows, you can make use of this processor.

These 50 rows are passed to the next component.

This processor can only be used with Batch components.

Note: If you try to add Limit processor after Streaming Data Source, it will not get added and you will get a warning message.

Configuring Limit Processor for Spark Pipelines

To add Limit processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Specify the number of rows to be fetched in the “No of Elements” field.

Additional configuration details can be added using ADD CONFIGURATION link.

Click on the Next button. Enter the notes in the space provided. Click Save for saving the configuration details.

Masking

Field masking is done to hide sensitive data.

Configuring Masking processor for Spark pipelines

To add a Masking processor into your pipeline, drag the processor to the canvas and right click on it to configure it as explained below.

Configurable fields are as follows:

Field

Description

Output Field

Select the column in which the masked value of the selected column will be stored. You can search for an existing column or enter a new column name.

Masking Type

Masking has four options:

• All

• Alternate Character

• Head Characters

• Trail Characters

Masking Character

The default masking character is *.

Characters to be masked

This field appears when the masking type is Head Characters or Trailing Characters. Enter the number of characters to be masked.

Add Field

Add additional fields to be masked.
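The four masking types can be illustrated with a short Python sketch that masks a single string value. Two details are assumptions not stated in this guide: Alternate Character is taken to mask every second character starting with the first, and a count larger than the value length masks the whole value. The function name `mask` is hypothetical.

```python
def mask(value, mask_type, mask_char="*", count=0):
    """Mask a string value according to one of the masking types listed above."""
    if value is None:
        return None  # nothing to mask
    if mask_type == "All":
        return mask_char * len(value)
    if mask_type == "Alternate Character":
        # Assumption: every second character is masked, starting with the first.
        return "".join(mask_char if i % 2 == 0 else ch for i, ch in enumerate(value))
    n = max(0, min(count, len(value)))  # characters to mask, clamped to the value length
    if mask_type == "Head Characters":
        return mask_char * n + value[n:]
    if mask_type in ("Trail Characters", "Trailing Characters"):
        return value[:len(value) - n] + mask_char * n
    raise ValueError(f"unknown masking type: {mask_type}")


print(mask("4111222233334444", "Trailing Characters", count=4))  # 411122223333****
```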

The screenshot shown below has two fields: one with the Masking Type and the other with the Input Field to be masked.

Click next to get the schema detected.

As shown above, all the characters of the field fieldname are masked, and the head characters of the newcol field are masked.

Click Done to save the configuration.

NA

The NA processor is used to handle the null values of any dataset.

If the value specified is Number, all null values of selected input field will be replaced by the specified number.

If the value specified is String, all null values of selected input field will be replaced by the given string.

For example, if the input fields “Name” and “Age” of a dataset contain null values, you can specify “Mike” as the value for Name and “30” for Age to substitute the null values.

Note: It does not support Date and Timestamp fields.
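The Input Field / Value pairs behave like a per-column lookup of replacement values. A minimal Python sketch over rows represented as dicts is shown below; `fill_nulls` is a hypothetical name. In PySpark a comparable operation is `DataFrame.na.fill` with a dict such as `{"Name": "Mike", "Age": 30}`, though this guide does not say whether Gathr uses it internally.

```python
def fill_nulls(rows, replacements):
    """Return copies of `rows` with None replaced per column from `replacements`."""
    filled = []
    for row in rows:
        filled.append({
            # Replace only nulls, and only in columns that have a configured value.
            col: replacements[col] if val is None and col in replacements else val
            for col, val in row.items()
        })
    return filled


rows = [
    {"Name": None, "Age": None, "City": None},
    {"Name": "Ann", "Age": 41, "City": "Pune"},
]
print(fill_nulls(rows, {"Name": "Mike", "Age": 30}))
```

Columns without a configured value (City above) keep their nulls, and non-null values are never changed.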

Configuring NA processor for Spark pipelines

To add NA Processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Input Field

Select, from the drop-down list, the field whose null values are to be replaced.

Value

It will replace the null values of input field with this value.

ADD FIELD

Additional fields can be added using ADD FIELD link.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

PII Masking

PII or Personally Identifiable Information can be defined as any real details that could be used to identify or impersonate a particular person. Common examples of PII are dates of birth, IP addresses, credit card details, passport numbers or full names. gathr users can leverage the PII auto-detection and Masking functionality to mask sensitive data.

In gathr, PII in incoming batch and streaming data can be identified using regular expressions defined in the PII XML file, and then masked.


In gathr, you can enable the PII Masking functionality under the Schema Type tab in any data source.


Also, a PII Masking processor will be automatically added in the pipeline flow on the canvas. The processor will have details of the columns selected for PII Masking from the incoming data of the source file.


Note:

- The columns that have been detected as PII Masked will appear highlighted in blue color under the detected current schema tab.

- The option to enable or disable PII Masking is available under the gear icon of each PII Masking column.


- Supported file formats for PII Masking are CSV, Parquet, JSON, Avro, and XML.

To configure PII Masking Processor select the processor and join it with the pipeline.

The configuration to PII Masking Processor is explained in detail below.

In the example pipeline, the option for PII Masking has been enabled using an Expression Evaluator processor.


Now, as shown in the image below, columns of the current schema can be enabled for PII Masking.

Once the option ‘Enable PII Masking’ is selected by clicking the gear icon, a PII Masking processor will get auto-connected to the pipeline as shown below:


Configuring PII Masking processor:


Under the Select Output field, the columns that have been enabled for PII Masking in the schema will be available in the drop-down list.

Select the Output Field and provide a character in the Add Masking Character column to mask the data. The Mask Type options are described below:

Field

Description

All

Selects all the characters for masking.

Alternate Character

Selects alternate characters (every other character) of the data for masking.

Head Characters

Select characters from the beginning of the data in the selected column for masking.


The user needs to provide the number of characters to be masked from the beginning.

Trailing Characters

Select characters from the end (right-most part) of the data in the selected column for masking. The user needs to provide the number of characters to be masked from the end of the string.

Pivot

Pivot is an aggregation processor in which the distinct values of one column (the pivot column) are transposed to form individual columns. These columns help in data analysis and reporting.

Configuring Pivot processor for Spark pipelines:

To add Pivot processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Grouping Fields

All fields of the selected message are displayed here. Select the fields on which grouping is to be done.

Pivot Column

All columns of the selected message are listed here. Select the column to be used as the Pivot column.

Pivot Values

Enter the comma separated values which are to be transposed to different columns.

Type of aggregation

All aggregation functions will be listed here. Select the function which is to be applied to the fields.

Aggregation Column

All columns of selected message will be listed here. Select the column on which aggregation is to be applied.

Time Window

Time window options.

None: Does not apply the window operation on data.

Fixed Window: Window length and sliding interval remain the same.

Sliding Window: Enables to configure the window length and sliding interval.

Window Duration

Time window in seconds.

Slide Duration

Window slide duration in seconds.

Event Column

Message field of type timestamp.

ADD CONFIGURATION

Additional properties can be added using ADD CONFIGURATION link.

Click on the NEXT button. You can view the new schema based on your pivot configuration on Schema tab.

Click SAVE for getting the final output of pivot processor.

Example:

Let us say you have a dataset of five records with four fields: “OrderId”, “Product”, “Category” and “Amount”.

You wish to group on the basis of field “Product”, pivot on column “Category” and type of aggregation to be applied is “SUM” on column “Amount”.

The Pivot processor applies the aggregation operation (SUM) to the Amount column for each combination of Product and Category, and creates a pivot table.

Since you have applied the SUM operation on the Amount column, it calculates the total amount for each fruit and vegetable.

The sum for the fruit banana is 300, for the vegetable broccoli 400, and for the vegetable carrot 100.

If no amount is found for a product in a particular category, the corresponding cell shows a null value.
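The pivot in the example above can be sketched in plain Python. The five records below are made up so that the totals match the figures quoted above; they are not the original example data. `pivot_sum` is a hypothetical name; in PySpark a comparable expression would be `df.groupBy("Product").pivot("Category", ["Fruit", "Vegetable"]).sum("Amount")`, though this guide does not say how Gathr implements the processor.

```python
from collections import defaultdict


def pivot_sum(rows, group_field, pivot_field, pivot_values, agg_field):
    """Group by `group_field` and turn each pivot value into a column of SUM(agg_field)."""
    sums = defaultdict(dict)  # group -> {pivot value: running sum}
    for row in rows:
        cells = sums[row[group_field]]  # register the group even if no pivot value matches
        category = row[pivot_field]
        if category in pivot_values:  # values not listed under Pivot Values are ignored
            cells[category] = cells.get(category, 0) + row[agg_field]
    # Every listed pivot value becomes a column; missing combinations stay None (null).
    return [
        {group_field: group, **{value: cells.get(value) for value in pivot_values}}
        for group, cells in sums.items()
    ]


orders = [  # hypothetical sample records
    {"OrderId": 1, "Product": "Banana", "Category": "Fruit", "Amount": 100},
    {"OrderId": 2, "Product": "Banana", "Category": "Fruit", "Amount": 200},
    {"OrderId": 3, "Product": "Broccoli", "Category": "Vegetable", "Amount": 150},
    {"OrderId": 4, "Product": "Broccoli", "Category": "Vegetable", "Amount": 250},
    {"OrderId": 5, "Product": "Carrot", "Category": "Vegetable", "Amount": 100},
]
for row in pivot_sum(orders, "Product", "Category", ["Fruit", "Vegetable"], "Amount"):
    print(row)
```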

Processor Group

In the components panel, under the Processors tab, select the Processor Group component and configure it.

Note: To understand more about the Processor Group, see Processor Group.

Field

Description

Processor Group

All the processor groups are listed under this tab.

Input Channel(s)

Input channel which was selected in the processor group page.

Input Dataset

The data set(s) selected in the data pipeline page.

You will have an accordion view of the mapped input channel(s) with the incoming dataset(s).

The processor group input source name is mapped against the input dataset.

All the fields within the input channel listed in the table are mapped against the columns of the corresponding dataset. Here, you can select column(s) of the dataset or write an expression against the field(s) of the input channel.

Note: In the processor group configuration panel the input channel(s) that were selected in the processor group page are mapped/configured against the input dataset(s) chosen in the data pipeline page.

Click Next to detect schema and click Done.

The user can connect any processor/emitter.

Common scenarios where Processor Groups could be used

1. If the user has a single input data source – processor group(s) – Emitter(s).

2. If the user has multiple input data sources – processor group(s) – Emitter(s).

Python

The Python processor allows you to perform following operations:

• Write custom Python Spark code for defining transformations on Spark DataFrames.

• Write custom Python code for processing input records at runtime.

Gathr provides support for Python 3. Multiple version support enables a python processor to run on different python versions.

Configuring Python processor for Spark pipelines

To add a Python processor into your pipeline, drag the processor on the canvas and right click on it to configure. Select a Python 3 version using the Python Version property.

Field

Description

Input Source

Input type to the custom Python function that will be called by Gathr.


The input provided to the custom python function will be a DataFrame. The custom function is expected to return a DataFrame after any processing.

Input Type

There are two ways for configuring the Python processor:

Inline: This option enables you to write Python code in text editor.

If selected, you will view one additional field Python Code.


Upload: This option enables you to upload single or multiple Python scripts (.py files) and Python packages (.egg/.zip files).

You have to specify module name (should be part of uploaded files or package) and method name that will be called by python processor.


When you select Upload, UPLOAD FILE option appears on the screen, browse and select the files that need to be used in python processor.


If the selected option is Upload, one additional field, Import Module, also appears on the screen.

Python Code

Enables to write custom Python code directly on text editor.

Import Module

Specify the name of the module that contains the function to be called by the Python processor. The uploaded files are listed in the drop-down list.


The drop-down list shows only .py files. You can also type a module name if it does not appear in the drop-down list.

Function Name

Name of the python function that is defined in Inline Python code or uploaded script.

Python Version

Select a version from Python 3.

To use Python 3 in python processor, write code or upload script which is compatible with Python 3.

Add Configuration

Enables to add Additional properties.

Note: Code Snippets contains sample code for your reference.

To pass configuration parameters in Python processor.

You can provide configuration parameters to the Python processor as key-value pairs. These parameters are available as a dictionary, passed as the second argument to the function named in the Function Name field. So that function takes two arguments: (df, config_map)

Where first argument will be dataframe and second argument will be a dictionary that contains configuration parameters as key value pair.
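A minimal sketch of a function with this `(df, config_map)` shape is shown below. The function name `process`, the configuration key `min_amount` and the column `amount` are hypothetical, and reading configuration values as strings is an assumption. The function needs no imports because Spark's `DataFrame.filter` accepts a SQL expression string.

```python
def process(df, config_map):
    """Hypothetical entry point named in the Function Name field.

    df         -- the incoming Spark DataFrame
    config_map -- dict of the key/value pairs entered via Add Configuration
    """
    # Assumption: configuration values arrive as strings, so convert explicitly.
    min_amount = float(config_map.get("min_amount", "0"))
    # Keep only rows whose (hypothetical) amount column meets the threshold.
    return df.filter(f"amount >= {min_amount}")
```

The function returns a DataFrame, which matches the requirement above that the custom function return a DataFrame after processing.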

In the example below, the HDFS path of a trained model is provided as a configuration parameter with the key ‘model_path’, and the same key is used in predictionFunction.

If there is any error/exception in python code, an error/exception is generated.

Example:

A simple program is written in text area in Python processor. Here the variable model is used but not defined in program, hence an exception with message ‘global name model is not defined’ is displayed on the screen.

Click on the NEXT button after specifying the values for all the fields.

Enter the notes in the space provided.

Click on the SAVE button after entering all the details.

Rank

Use rank functions (rank, dense_rank, percent_rank, ntile, row_number) while configuring the Rank processor.

Rank Processor computes the rank of a value in a group of values. With rank(), the result is one plus the number of rows whose values come before the current row's value in the ordering of the partition; tied values share the same rank, which leaves gaps in the sequence.

dense_rank() - Computes the rank of a value in a group of values. The result is one plus the previously assigned rank value. Unlike the function rank, dense_rank will not produce gaps in the rankings.

percent_rank() - Computes the percentage ranking of a value in a group of values.

ntile(n) - Divides the rows for each window partition into n buckets ranging from 1 to at most n.

row_number() - Assigns a unique, sequential number to each row, starting with one, according to the ordering of rows within the window partition.
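The five rank functions can be compared side by side on one partition. The plain-Python sketch below assumes the input is a single partition already sorted in ascending order, and assumes that ntile gives earlier buckets the extra rows when the row count is not divisible by n (the common SQL convention). In Spark these are window functions evaluated over a window such as `Window.partitionBy(...).orderBy(...)`; the helper names here simply mirror the function names.

```python
def rank(ordered):
    """rank(): ties share a rank; the next distinct value skips ahead, leaving gaps."""
    ranks = []
    for i, v in enumerate(ordered):
        if i > 0 and v == ordered[i - 1]:
            ranks.append(ranks[-1])  # tie with the previous row
        else:
            ranks.append(i + 1)  # 1 + number of rows before this one
    return ranks


def dense_rank(ordered):
    """dense_rank(): ties share a rank; the next distinct value gets previous rank + 1."""
    ranks = []
    for i, v in enumerate(ordered):
        if i > 0 and v == ordered[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1 if ranks else 1)
    return ranks


def percent_rank(ordered):
    """percent_rank(): (rank - 1) / (rows - 1), or 0.0 for a single-row partition."""
    n = len(ordered)
    if n <= 1:
        return [0.0] * n
    return [(r - 1) / (n - 1) for r in rank(ordered)]


def ntile(ordered, n):
    """ntile(n): split rows into n buckets whose sizes differ by at most one."""
    base, extra = divmod(len(ordered), n)
    buckets = []
    for bucket in range(1, n + 1):
        size = base + (1 if bucket <= extra else 0)  # earlier buckets take the remainder
        buckets.extend([bucket] * size)
    return buckets


def row_number(ordered):
    """row_number(): 1, 2, 3, ... regardless of ties."""
    return list(range(1, len(ordered) + 1))


scores = [10, 20, 20, 30]  # one partition, already sorted ascending
print(rank(scores), dense_rank(scores), row_number(scores), ntile(scores, 3))
# [1, 2, 2, 4] [1, 2, 2, 3] [1, 2, 3, 4] [1, 1, 2, 3]
```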

Configuring Rank processor for Spark pipelines

To add a Rank processor into your pipeline, drag the processor on the pipeline canvas and right click on it to configure.

Field

Description

Rank Function Type

This drop down has five options (rank, dense_rank, percent_rank, ntile, row_number).

Input Fields

This is the list of columns that you want to calculate the rank for.

Output Fields

Column in which the computed rank value will be stored. You can enter a new column name (the column will be added to the dataset).

Group by

This radio button enables grouping of the input fields.

Grouping Fields

This is the list of columns that you want to be grouped.

You can select multiple columns from the input fields for grouping simultaneously.

Click on NEXT button. Enter the notes in the space provided. Click on the SAVE button for saving the configuration details.

Repartition

The Repartition processor changes how pipeline data is partitioned by dividing large datasets into multiple parts.

The Repartition processor is used when you want to increase or decrease the parallelism in an executor. The degree of parallelism maps to the number of tasks running in an executor. The processor creates more or fewer partitions to balance data across them, shuffling data over the network.

For example, you can use single partition if you wish to write data in a single file. Multiple partitions can be used for writing data in multiple files.

Configuring Repartition processor for Spark pipelines

To add a Repartition processor into your pipeline, drag the processor on the pipeline canvas and right click on it to configure.

Field

Description

Partition By

The user can select the repartition type: Number, Column or Expression.

Number

Upon selecting Number as an option to repartition, the user can enter value for the number of executors (threads) of a processor or channel.

Column

Partition Columns

Upon selecting Column as the repartition option, the user must select the columns/fields on which partitioning is to be done.

Partition Number

Enter the value for number of executors (threads) of a processor/channel.

Expression

Partition Expression

Upon selecting Expression as an option to repartition, the user must enter the expression value according to which repartition is to be done.

Partition Number

Enter value for the number of executors (threads) of a processor/channel.

Enter the number of partitions in the Parallelism field.
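The Number and Column options distribute rows differently. The Python sketch below illustrates the idea: by number, rows are spread evenly across partitions in round-robin order; by column, rows with the same column value are sent to the same partition through a hash. CRC32 is used here only as a deterministic stand-in; Spark's hash partitioning uses its own Murmur3-based hash, so actual partition assignments differ. The function names are hypothetical.

```python
import zlib


def partition_by_number(rows, num_partitions):
    """Spread rows evenly across partitions in round-robin order."""
    partitions = [[] for _ in range(num_partitions)]
    for i, row in enumerate(rows):
        partitions[i % num_partitions].append(row)
    return partitions


def partition_by_column(rows, column, num_partitions):
    """Send rows with the same column value to the same partition via a hash."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        key = repr(row.get(column)).encode("utf-8")
        # CRC32 is an illustrative, deterministic hash, not the one Spark uses.
        partitions[zlib.crc32(key) % num_partitions].append(row)
    return partitions


print([len(p) for p in partition_by_number(list(range(10)), 3)])  # [4, 3, 3]
```

With a single partition, both functions put every row in one list, which corresponds to the single-file output case mentioned above.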

Click on the NEXT button. Enter the notes in the space provided.

Click on the Done button after entering all the details.

Register as Table

This processor is used for fetching historical data from any streaming or batch source and that data gets registered as a table. This table can be referred further if user has to perform some queries on registered data sources.

User can fetch tables from ADLS, Cassandra, Couchbase, Redshift, HDFS, Hive, JDBC, Snowflake, File, Incoming Message, GCS, BigQuery and S3.

For example, the user has historical data on HDFS that contains information about different departments of an organization. The user can register that data using the Register as Table processor and then use the registered table in an SQL processor to fetch the number of employees in the organization.
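The register-then-query pattern in this example can be sketched with Python's built-in `sqlite3` module standing in for Spark: the data is loaded into a named table once, and later SQL refers to it by that name. The table name, columns and rows below are hypothetical. In Spark itself, the analogous steps would be `df.createOrReplaceTempView("employees")` followed by `spark.sql(...)`; this guide does not state how Gathr registers tables internally.

```python
import sqlite3

# Hypothetical historical records: one row per employee with their department.
employees = [("E1", "Sales"), ("E2", "Sales"), ("E3", "Finance"), ("E4", "Engineering")]

conn = sqlite3.connect(":memory:")
# "Register" the data as a named table so later SQL can refer to it by name.
conn.execute("CREATE TABLE employees (emp_id TEXT, department TEXT)")
conn.executemany("INSERT INTO employees VALUES (?, ?)", employees)

# What a downstream SQL processor might run against the registered table.
total = conn.execute("SELECT COUNT(*) FROM employees").fetchone()[0]
per_dept = conn.execute(
    "SELECT department, COUNT(*) FROM employees GROUP BY department ORDER BY department"
).fetchall()
print(total)     # 4
print(per_dept)  # [('Engineering', 1), ('Finance', 1), ('Sales', 2)]
```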

Configuring Register as Table processor for Spark pipelines

To add a Register as Table processor into your pipeline, drag the processor on the canvas and click on it to configure.

If the data source is selected as ADLS, then there will be the following fields:

Field

Description

Data Source

Select the ADLS source from where historical data is to be read.

Available options to fetch tables from are:

ADLS, Cassandra, Couchbase, Redshift, HDFS, Hive, JDBC, Snowflake, File, Incoming Message, GCS, BigQuery and S3.

If the option selected is incoming message, the output of sources connected before this processor will act as an incoming message for Register As Table Processor.

In case of File Data Source, file data will be registered as a table.

Internally, the file is uploaded to the default HDFS.

Table Name

Name with which the table is to be registered.

Connection Name

All available connections are listed here. Select a connection from where data is to be read.

Data Format

Historical data format. Available options are:

- CSV

- JSON

- Parquet

- ORC

- avro

- text

Delimiter

Select the delimiter of the historical data. For example, if your historical data of type CSV is separated by commas, select the delimiter Comma (,).

Container Name

ADLS container name from which the data should be read.

ADLS Path

Provide directory path for ADLS file system.

Cache Table

Enable caching of registered tables within application memory.

If this option is selected then the table will be read only once after it is registered.

This option is used to cache the registered table data and can be used for all available data sources. It is helpful in batch pipelines where the data is used at multiple places.

Max no of rows

Maximum number of rows to be included.

Is Header Included

If the first row of the file is header then mark this field else leave it unmarked.

Post Query

Provide post query. Example: where column=value order by column desc limit 2

You can register additional tables by clicking the +Register Table button. You can also add environment parameters by clicking the +ADD PARAM button.

If data source selected is HDFS, there will be following fields:

Field

Description

Data Source

Select the HDFS source from where historical data is to be read.

If the option selected is incoming message, output of sources connected before this processor will act as an incoming message for Register As Table Processor.

In case of File Data Source, file data will be registered as a table.

Internally, the file is uploaded to the default HDFS.

Table Name

Specify a name for the table which is to be registered.

Connection Name

All available connections are listed here. Select a connection from where data is to be read.

Data Format

Historical data format.

Delimiter

Select the delimiter of the historical data. For example, if your historical data of type CSV is separated by commas, select the delimiter Comma (,).

HDFS Path

HDFS path where data is stored.

Cache Table

If this option is selected then the table will be read only once after it is registered.

This option is used to cache the registered table data and can be used for all available data sources. It is helpful in batch pipelines where the data is used at multiple places.

Is Header Included

Select the checkbox if first row of data file is header else leave it unchecked.

If you select the Data Source as HIVE or JDBC, there will be two additional fields:

• Database Table Name

• Execute Query

 

Field

Description

Database Table Name

Table from where data will be fetched.

If option selected is “Database Table Name”, specify a name for the table.

Execute Query

If option selected is “Execute Query”, you can write your custom query.

Output of this query will be stored in current Spark session.

If the user selects Snowflake, the following additional fields appear:

Field

Description

Connection Name

The user will be required to provide the connection name for creating connection.

Warehouse Name

Provide the warehouse name against this column.

Schema Name

Provide the schema name against this column.

Note: The user can provide the database table name or provide a query.

If Data Source selected is S3, there will be one additional field:

• Bucket Name

Field

Description

Bucket Name

S3 bucket name from where data will be read.

If the option selected is incoming message, output of sources connected before this processor will act as an incoming message for Register As Table processor.

You need to specify the name with which table is to be registered after fetching the data.

If you select the Data Source as Cassandra, there will be two additional fields:

• KeySpace Name

• Cassandra Table Name

Field

Description

KeySpace Name

Cassandra Key Space Name

Cassandra Table Name

Table name inside the keyspace from where we read data.

If you select the Data Source as Couchbase, there will be an additional field:

• Bucket Name

Field

Description

Bucket Name

Couchbase Bucket Name.

If you select the Data Source as Redshift, there will be a few additional fields, depending on which of the following two options is selected:

• Database Table Name

Field

Description

Database Table Name

Name of the table from where data is to be fetched.

Max no. of Rows

Specify the maximum number of rows.

• Execute Query

Field

Description

Database Table Name

Name of the Redshift table from where data is to be fetched.

Execute Query

Write a custom query.

Output of this query is stored in existing Spark session.

Rename

Rename processor enables renaming the Fields. The renamed columns will be reflected in the Schema.

Configuring Rename processor for Spark pipelines

To add a Rename processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below.

Click on NEXT button. Enter the notes in the space provided. Click on the SAVE button for saving the configuration details.

Router

Router enables routing multiple data streams from a Router Processor to any emitter/processor according to the route selected.

Configuring Router processor for Spark pipelines

To add a Router processor into your pipeline, drag the processor to the canvas and right click on it to configure.

The properties shown in above image are explained below:

Field

Description

Name

Name of the Expression

Expression

Expression field tests each row against the given valid expression which ultimately evaluates to a Boolean value.

For example, if the expression is (id>2 and country = 'India'), only rows whose id is greater than 2 and whose country is India are output.

Output Column

Columns that need to be routed as per the given expression.
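A route pairs a Boolean expression with the output columns to pass on. The Python sketch below models each route as a predicate (standing in for the expression) plus a list of output columns. Whether Gathr sends a row to every route whose expression is true, or only to the first, is not stated here; this sketch assumes every matching route receives the row. The function and route names are hypothetical.

```python
def route(rows, routes):
    """Return {route name: rows} for each route's predicate and output columns."""
    out = {}
    for name, (predicate, columns) in routes.items():
        # Assumption: a row goes to every route whose expression evaluates to True.
        out[name] = [{c: row[c] for c in columns} for row in rows if predicate(row)]
    return out


rows = [
    {"id": 1, "country": "India", "amt": 5},
    {"id": 3, "country": "India", "amt": 7},
    {"id": 4, "country": "Japan", "amt": 9},
]
routes = {
    # Equivalent of the expression (id>2 and country = 'India').
    "india_high_ids": (lambda r: r["id"] > 2 and r["country"] == "India", ["id", "amt"]),
    "outside_india": (lambda r: r["country"] != "India", ["id", "country"]),
}
print(route(rows, routes))
```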

Sagemaker

Amazon SageMaker allows you to build, train, and deploy machine learning models quickly. Amazon SageMaker is a fully managed service that covers the entire machine learning workflow: label and prepare your data, choose an algorithm, train it, tune and optimize it for deployment, make predictions, and take action. Your models get to production faster with much less effort and lower cost. Once you have trained a model on SageMaker, you can deploy it as an endpoint, which can then be used in StreamAnalytix for prediction on incoming data.

Configuring Sagemaker processor for Spark pipelines

To add a SageMaker processor into the pipeline, drag the processor to the canvas and right click on it to configure as explained below:


Field

Description

Connection Name

Mention the SageMaker Connection Name.

Region

Specify the name of the region where the endpoint is served, for example, us-west-2.

End Point

It lists all the endpoints created in the region specified above.

Feature Column

All the columns that serve as input to the model deployed as an endpoint on SageMaker.

Note: All columns selected in the Feature Column section must be of type double.

Algorithm Name

Name of the model algorithm, which is getting served as endpoint over Sagemaker.

Score

Select the score.

Predicted Label

Select the predicted label.

Of the built-in algorithms supported by SageMaker, endpoints for the following algorithms can be used for scoring incoming data in Gathr:

1. Linear Learner Binary Classification

2. Linear Learner Regression

3. Kmeans

4. PCA

5. Tensorflow Classification

6. XGBoost

StreamAnalytix SageMaker supports custom models as well. For example, you can train a model using the scikit-learn API and then deploy that model as an endpoint on SageMaker for prediction. Once the endpoint is in the In-Service state, it can be used for prediction in Gathr.

The table below lists the algorithms and their corresponding output fields:


Algorithm

Output Fields

Linear Learner Binary Classification

Score, Predicted Label.

Linear Learner Regression

Score.

KMeans

ClusterLabel, DistanceToCluster.

PCA

Projection.

XGBoost

Prediction.

Tensorflow Classification

Score, OutputClass.

Custom

Prediction.

Select

The Select processor is used to retrieve specific columns from the dataset.

On the configuration settings page, specify the schema fields that you want to retrieve in the output.

For example, if you enter the fields OrderId, ShipVia and Freight, only these three fields flow to the next component connected in the pipeline.
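A minimal Python sketch of the projection the Select processor performs, using the three fields from the example. The `select_fields` helper and the sample order records (including the extra `CustomerId` field) are hypothetical and only illustrate that unselected fields are dropped.

```python
def select_fields(records, fields):
    """Keep only the requested fields, in the requested order."""
    return [{f: rec[f] for f in fields} for rec in records]


# Hypothetical sample records with one extra field that is not selected.
orders = [
    {"OrderId": 1, "CustomerId": "C-01", "ShipVia": 3, "Freight": 32.5},
    {"OrderId": 2, "CustomerId": "C-02", "ShipVia": 1, "Freight": 11.0},
]

selected = select_fields(orders, ["OrderId", "ShipVia", "Freight"])
print(selected)
```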

Click the NEXT button. Enter the notes in the space provided. Click the SAVE button to save the configuration details.

SQL

SQL Processor allows you to run SQL queries over streaming data and registered tables. It provides a common way for accessing different data sources.

For example, to continuously analyze a weather data stream and find the average temperature recorded every minute, you can use an SQL query. The output is a stream of new records showing the average temperature for each minute.
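The shape of such a per-minute average query can be sketched with Python's built-in SQLite module. This is an illustration only: the processor runs Spark SQL over registered tables, the table name `weather` and its columns are hypothetical, and in Spark SQL a function such as `date_trunc` or `window` would take the place of SQLite's `strftime`.

```python
import sqlite3

# In-memory stand-in for a registered table (hypothetical name and columns).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE weather (event_time TEXT, temperature REAL)")
conn.executemany(
    "INSERT INTO weather VALUES (?, ?)",
    [
        ("2024-01-01 10:00:05", 20.0),
        ("2024-01-01 10:00:40", 22.0),
        ("2024-01-01 10:01:10", 25.0),
        ("2024-01-01 10:01:50", 27.0),
    ],
)

# Truncate each timestamp to its minute and average the readings per minute.
query = """
SELECT strftime('%Y-%m-%d %H:%M', event_time) AS minute,
       AVG(temperature) AS avg_temperature
FROM weather
GROUP BY minute
ORDER BY minute
"""
rows = conn.execute(query).fetchall()
for minute, avg_temp in rows:
    print(minute, avg_temp)
```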

Configuring SQL processor for Spark pipelines

To add an SQL processor into your pipeline, drag the processor to the canvas and right click on it to configure as explained below:

Field

Description

Download entered queries JSON

Download the entered queries as a JSON file.

ADD CONFIGURATION

User can add configuration by clicking the ADD CONFIGURATION option.

Table Name

Displays all the registered tables. Select the table on which query is to be performed.

Read queries from S3

Select this checkbox to read the queries from S3. The user will be required to validate the S3 queries.

Note:

The user can download the sample S3 queries JSON and the S3 queries JSON.

S3 Connection

Select the S3 connection name.

Bucket Name

Specify the bucket name.

Path

Specify the path.

Note:

- After entering values for S3 Connection, Bucket Name and Path, click Fetch S3 Queries button.

- If the Read Queries from S3 checkbox is unchecked, the user can add queries in the Query section as described below.

Query Name

Provide a name for each query; the query result is registered as a dynamic table under this name.

Skip While Inspect

Select this option to skip the query during inspect.

If an SQL query contains CREATE/DROP/DELETE/UPDATE statements, it is recommended to skip it, because it will be executed over the actual tables/datasets.

Note: If a SELECT query is skipped, it will not return any dataset for further use in pipeline creation.

Query

Provides an auto-suggestion option that helps you write queries quickly.


Press CTRL + space in the query text box.

It displays options such as table names, schema names, schema fields, and query operations for writing the query.


Examples:


Click SELECT from the list and write the query.

SELECT * from Kafka_657752


Click DELETE from the list and write the query.

DELETE FROM Emp_table


Accordingly, select other options from the auto suggestion list and enter the query.

ADD QUERY

Click this button to add multiple queries that can be executed over different tables.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Scala Processor

Scala is a language that runs on the Java Virtual Machine and is compatible with Java. You can write Scala code in a manner similar to Java, with minimal prerequisites, and Scala code is typically more concise and faster to write.

The Scala processor is used for writing custom code in Scala language.

Configuring Scala processor for Spark pipelines

To add a Scala processor into your pipeline, drag the processor to the canvas and right click on it to configure.

 

Field

Description

Package Name

Package for the Scala implementation class.

Class Name

Class name for the Scala implementation.

Imports

Import statements for the Scala implementation.

Input Source

Input source for the Scala code.

Scala Code

Scala code that performs the operation on the JSON RDD.

Next, click the Jar Upload tab. A JAR file is generated when you build Scala code. You can upload third-party JARs here so that their APIs can be used in the Scala code by adding the corresponding import statements.

Click on the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Schema Flattener

Schema Flattener lets the user flatten incoming JSON according to metadata provided from S3, RDS, or an uploaded meta file.
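The idea of metadata-driven flattening can be sketched in Python as follows. The Meta JSON format Gathr expects is not described here, so the metadata below is a hypothetical mapping from output column to dotted JSON path, and the `error` column is an assumed stand-in for the optional Error Column that records paths missing from the input.

```python
import json


def get_path(obj, path):
    """Walk a dotted path such as 'customer.address.city'; raise KeyError if absent."""
    for key in path.split("."):
        if not isinstance(obj, dict) or key not in obj:
            raise KeyError(path)
        obj = obj[key]
    return obj


def flatten(record, meta, error_column="error"):
    """Build a flat row with one column per metadata entry (hypothetical meta format)."""
    row, missing = {}, []
    for column, path in meta.items():
        try:
            row[column] = get_path(record, path)
        except KeyError:
            row[column] = None  # missing values become null
            missing.append(path)
    row[error_column] = ", ".join(missing) if missing else None
    return row


meta = {"order_id": "id", "city": "customer.address.city", "zip": "customer.address.zip"}
incoming = json.loads('{"id": 7, "customer": {"address": {"city": "Pune"}}}')
print(flatten(incoming, meta))
# {'order_id': 7, 'city': 'Pune', 'zip': None, 'error': 'customer.address.zip'}
```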

Field

Description

Meta JSON Source

The Meta JSON files are fetched from the selected source. A pipeline restart is required after the source file is updated. Select one of the options below and provide the required configuration:

- S3

- RDS

- UPLOAD META FILE

Upon selecting S3 as Meta JSON Source, provide the following details:

S3 Connection

Select the connection name for creating a connection.

S3 Bucket Name

Provide the S3 bucket name.

S3 Path

Provide the S3 bucket path.

S3 Protocol

Select the S3 protocol from the drop down list.

The following protocols are supported:

- For HDP versions, the S3a protocol is supported.

- For CDH versions, the S3a protocol is supported.

- For Apache versions, the S3n protocol is supported.

- For GCP, the S3n and S3a protocols are supported.

- For EMR, the S3, S3n, and S3a protocols are supported.

- For AWS Databricks, the S3a protocol is supported.

Remove Name Space

Choose Yes or No from the drop-down to specify whether to remove namespaces.

Error Column

User can optionally add a new column that will contain error/null values.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

Upon selecting RDS as Meta JSON Source, provide the following details:

RDS Connection

Provide the RDS connection name for creating a connection.

Schema Name

Provide the schema name.

Table Name

Provide the table name.

Meta Data Name

Provide the name of the metadata column.

Remove Name Space

Choose Yes or No from the drop-down to specify whether to remove namespaces.

Error Column

User can optionally add a new column that will contain error/null values.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

Upon selecting UPLOAD META FILE as Meta JSON Source, provide the following details:

Upload File

Upload the meta file.

Remove Name Space

Choose Yes or No from the drop-down to specify whether to remove namespaces.

Error Column

User can optionally add a new column that will contain error/null values.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

Schema Transformer

Schema Transformer transforms the incoming schema into the format expected by the next processor. Use this processor in pipelines where you want to transform the incoming schema.

You can use the Schema Transformer processor to perform cast, rename, and projection operations on the incoming schema together.

As shown in the figure below, you can edit the Column Alias and set a new Data Type. The transformed schema will further flow into the pipeline and the new schema will be available on the next processor.

Use the checkbox to select the columns that are projected onto the next operator.
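The combined cast, rename, and projection can be sketched in Python as a list of per-column rules. The rule tuple `(source column, alias, cast function, projected)` and the sample record are hypothetical; they mirror the Column Alias, Data Type, and checkbox settings described above, not the processor's internal representation.

```python
def transform(record, spec):
    """Apply (source column, alias, cast function, projected) rules to one record."""
    out = {}
    for source, alias, cast, projected in spec:
        if projected:  # only checked columns flow to the next operator
            out[alias] = cast(record[source])
    return out


spec = [
    ("emp_id", "EmployeeId", int, True),    # rename + cast string -> int
    ("salary", "Salary", float, True),      # rename + cast string -> float
    ("notes", "Notes", str, False),         # unchecked: not projected
]
row = {"emp_id": "42", "salary": "1500.50", "notes": "n/a"}
print(transform(row, spec))  # {'EmployeeId': 42, 'Salary': 1500.5}
```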

Configuring Schema Transformer processor for Spark pipelines

To add a Schema Transformer processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Click on the NEXT button. Enter the notes in the space provided.

Click DONE for saving the configuration details.

Sequence Generator

The Sequence Generator processor generates a unique ID for each record it processes; in other words, it generates a sequence within the pipeline.

This processor will generate sequence in a field which can be used further in a pipeline.

Configuring Sequence processor for Spark pipelines

To add a Sequence processor into your pipeline, drag the processor to the canvas and right click on it to configure. There are three options for the type of sequence generated for the records.

Field

Description

Type

Sequence: Generates a numeric sequence of values.

Composite Key: The combination of the selected fields, in order, serves as a primary key.

UUID: Generates random 128-bit alphanumeric identifiers.

Output Field Name

Name of the output field.

If you select Sequence, the following properties are displayed:

Field

Description

Type

Sequence: Generates a numeric sequence of values.

Output Field Name

Name of the output field.

Start Value

First value to be generated. Default is 0.

Increment By

Increment the value by the value entered in this field. Default is 1.

Cycle

If enabled, sequence values cycle through the range from the start value to the end value. If disabled, tasks fail if the end value is reached while rows remain to be processed.

End Value

Maximum value to be generated (inclusive). After this value, values are generated again from the start value when Cycle is enabled.

Reset

If Reset is selected, the sequence is generated after the last generated sequence value. This field is not part of auto-inspection.
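The interaction of Start Value, Increment By, End Value, and Cycle can be sketched as a single-threaded Python generator. This is an illustration of the documented behavior only; it assumes a positive increment and does not model how the processor distributes the sequence across Spark partitions or how Reset persists state. The `number_rows` helper and the `seq` field name are hypothetical.

```python
def sequence(start=0, increment=1, end=None, cycle=False):
    """Yield start, start+increment, ...; assumes a positive increment.

    When the next value would exceed `end`, restart from `start` if cycle is
    enabled, otherwise fail because rows remain to be processed.
    """
    value = start
    while True:
        if end is not None and value > end:
            if not cycle:
                raise RuntimeError("end value reached while rows remain to process")
            value = start
        yield value
        value += increment


def number_rows(rows, field="seq", **options):
    """Attach the next sequence value to each row under `field`."""
    gen = sequence(**options)
    return [{**row, field: next(gen)} for row in rows]


rows = [{"name": n} for n in "abcde"]
print([r["seq"] for r in number_rows(rows, start=1, end=3, cycle=True)])  # [1, 2, 3, 1, 2]
```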

If you select Composite Key, the following properties are displayed:

Field

Description

Type

Composite Key: The combination of the selected fields, in order, serves as a primary key.

Output Field Name

Name of the output field.

Select Fields

Keys are created in the order in which the fields are selected.

Field Separator

Separator for the field values.
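A short Python sketch of the Composite Key and UUID types. The `composite_key` helper and the sample record are hypothetical; the sketch shows that the selection order of the fields determines the key and that the separator joins the values. Using a version-4 (random) UUID is an assumption about the UUID type, chosen because it is the standard random 128-bit UUID.

```python
import uuid


def composite_key(record, fields, separator="_"):
    """Concatenate the selected field values in selection order."""
    return separator.join(str(record[f]) for f in fields)


record = {"country": "IN", "state": "MH", "store_id": 17}
print(composite_key(record, ["country", "state", "store_id"], "-"))  # IN-MH-17
print(composite_key(record, ["store_id", "country"], "|"))           # 17|IN

# UUID type (assumed version 4): a random 128-bit identifier per record.
print(uuid.uuid4())
```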

Snowflake

Snowflake is a cloud-based data warehouse system. The user can use Snowflake as a processor in Gathr for ELT pipelines.

Field

Description

Connection Name

Specify the connection name for creating the connection.

Warehouse Name

Specify the warehouse name.

Schema Name

Specify the schema name.

Query Type

Select the type of query you want to write: Static query or Dynamic query.

A static query contains a normal SQL query, whereas in a dynamic query the user can reference existing dataset columns using @, as shown in the image above.

Query

Specify the static or dynamic query, depending on the selected Query Type.

Note: A static query runs once, whereas a dynamic query is updated at run time.

Sort

Sorting is arranging the data in ascending or descending order.

For example, you can use this processor to sort students by height in ascending order.

Configuring Sort processor for Spark pipelines

To add a Sort processor into your pipeline, drag the processor to the canvas and right click on it to configure.

 

Field

Description

Sort Key

Select a field on which sorting is to be applied.

Sort Order

The sort order can be either Ascending or Descending.

Click the NEXT button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Note: In batch pipelines, the Sort processor can be used without an Aggregation processor, whereas in streaming pipelines it can be used only after an Aggregation processor.
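The Sort Key and Sort Order settings map onto an ordinary keyed sort, as this Python sketch of the student-height example shows. The student records and the `sort_records` helper are hypothetical and illustrate batch behavior only; in Spark Structured Streaming, sorting is supported only after an aggregation and in Complete output mode, which is the reason for the note above.

```python
students = [
    {"name": "Asha", "height_cm": 152},
    {"name": "Ravi", "height_cm": 165},
    {"name": "Meera", "height_cm": 158},
]


def sort_records(records, sort_key, order="Ascending"):
    """Sort records on one field; 'Descending' reverses the order."""
    return sorted(records, key=lambda r: r[sort_key], reverse=(order == "Descending"))


print([s["name"] for s in sort_records(students, "height_cm")])                # ['Asha', 'Meera', 'Ravi']
print([s["name"] for s in sort_records(students, "height_cm", "Descending")])  # ['Ravi', 'Meera', 'Asha']
```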

Stored Procedure Processor

The Stored Procedure processor is used to run a set of SQL queries. The user can invoke stored procedures and functions defined in the database from within Gathr. The input data is passed as arguments to the functions and stored procedures, and the resulting output can be used to enrich the incoming data.

Instead of sending multiple SQL statements to the database server, an application can send a set of queries in what is called a stored procedure.

A function is a set of SQL statements that perform some operation and return a result.

Field

Description

Connection Name

Choose the database by selecting a connection from the Connection Name drop-down. Oracle and MSQL are supported.

Operation

Two types of operations are available: Stored Procedure and Functions. If the user selects Stored Procedure, they can choose from the stored procedures available in the database; if the user selects Functions, they can choose from the available functions (as shown in the image below).

The user can map the operation inputs to the source and add further configuration.

Ignore in Inspect

If this option is selected, records are not updated during inspect.

Stored Procedures

Stored Procedure/Function: Select a procedure or function from the drop-down.

TextPatternToField

Configuration details of TextPatternToField processor are provided below:

Field

Description

Input String with Placeholder

Select the text pattern input string source.

The available options are:

Input String: Provide the input string.

Upload File: Upload the local file to provide the input string source.

Pattern

Enter the text pattern of the placeholders.

Output Column name

Provide the name of the output column.

ADD CONFIGURATION

User can add further configurations by providing values after clicking the ADD CONFIGURATION option.

Environment Params

User can add further environment parameters as per requirement.



TopNRecords

Top-N queries are queries that limit the result to a specific number of rows. These are often queries for the most recent or the “best” entries of a result set. For efficient execution, the ranking must be done with a pipelined order by. You can get Top-N records based on a sorting criterion.

The TopNRecords processor works with batch pipelines only.

Configuring TopNRecords processor for Spark pipelines

To add a TopNRecords processor into your pipeline, drag the processor to the canvas and right click on it to configure.

Field

Description

Top N Rows

Number of top rows to be processed.

Sorting Fields

Select a field on which sorting is to be applied.

Sort Order

The sort order can be either Ascending or Descending.

Click NEXT to detect the schema, and then save the configuration.
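A Top-N selection is equivalent to sorting on the Sorting Fields and keeping the first N rows. This Python sketch uses `heapq.nlargest`/`heapq.nsmallest`, which avoid a full sort; the `top_n` helper and the score records are hypothetical and illustrate the result, not how the processor executes in Spark.

```python
import heapq


def top_n(records, n, sort_field, order="Descending"):
    """Return the first n records under the given sort order."""
    if order == "Descending":
        return heapq.nlargest(n, records, key=lambda r: r[sort_field])
    return heapq.nsmallest(n, records, key=lambda r: r[sort_field])


scores = [
    {"player": "a", "score": 40},
    {"player": "b", "score": 95},
    {"player": "c", "score": 70},
    {"player": "d", "score": 88},
]
print([r["player"] for r in top_n(scores, 2, "score")])               # ['b', 'd']
print([r["player"] for r in top_n(scores, 2, "score", "Ascending")])  # ['a', 'c']
```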

Union

Union processor is not configurable.

The Union operation is used when you want to combine elements of two datasets.

The following example demonstrates how union works.

Input Set 1

{name:xyz,rating:7}

{name:abc,rating:9}

{name:klm,rating:5}

Input Set 2

{name:xyz,rating:7}

{name:abc,rating:9}

{name:abc,rating:9}

{name:klm,rating:6}

Output Set after applying Union Operation

{name:xyz,rating:7}

{name:abc,rating:9}

{name:klm,rating:5}

{name:xyz,rating:7}

{name:abc,rating:9}

{name:abc,rating:9}

{name:klm,rating:6}
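As the example shows, the output keeps every row from both inputs, including duplicates, which corresponds to SQL UNION ALL. This matches Spark's `Dataset.union`, which resolves columns by position and does not deduplicate; removing duplicates would need a separate `distinct` step. A Python sketch of the same example:

```python
set_1 = [
    {"name": "xyz", "rating": 7},
    {"name": "abc", "rating": 9},
    {"name": "klm", "rating": 5},
]
set_2 = [
    {"name": "xyz", "rating": 7},
    {"name": "abc", "rating": 9},
    {"name": "abc", "rating": 9},
    {"name": "klm", "rating": 6},
]

# Union keeps all rows from both inputs, duplicates included (UNION ALL semantics).
output = set_1 + set_2
for row in output:
    print(row)
```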

Watermark

The Watermark processor provides the Spark watermarking feature for late-data handling and stream-stream joins. The user can specify the event time column and the delay for the incoming data to enable watermarking on the dataset.

Configuring Watermark processor for Spark pipelines

Field

Description

Event Time Column

Select the Event Time Column. The event time column is the timestamp field in the incoming data on the basis of which the user can enable watermarking.

Delay

Enter values for the delay fields.

The user can add further configuration by clicking the ADD CONFIGURATION button. Click Next to proceed further.
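Conceptually, a Spark watermark is the maximum event time seen so far minus the configured delay; data older than the watermark is considered too late. This Python sketch illustrates that rule under a simplification: it updates the watermark after every event, whereas Spark (`withWatermark(eventTime, delayThreshold)`) advances it per micro-batch and only guarantees that data within the delay is not dropped. The event records and the `apply_watermark` helper are hypothetical.

```python
from datetime import datetime, timedelta


def apply_watermark(events, delay):
    """Split events into accepted and too-late, using max event time minus delay."""
    max_event_time = None
    accepted, dropped = [], []
    for event in events:
        ts = event["event_time"]
        if max_event_time is not None and ts < max_event_time - delay:
            dropped.append(event)  # older than the current watermark
            continue
        accepted.append(event)
        max_event_time = ts if max_event_time is None else max(max_event_time, ts)
    return accepted, dropped


base = datetime(2024, 1, 1, 10, 0)
events = [
    {"id": 1, "event_time": base},
    {"id": 2, "event_time": base + timedelta(minutes=12)},
    {"id": 3, "event_time": base + timedelta(minutes=5)},  # late, but within the 10-minute delay
    {"id": 4, "event_time": base + timedelta(minutes=1)},  # older than 10:12 - 10 min = 10:02
]
accepted, dropped = apply_watermark(events, timedelta(minutes=10))
print([e["id"] for e in accepted], [e["id"] for e in dropped])  # [1, 2, 3] [4]
```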

XML Parser

To add an XML Parser processor into your pipeline, drag the processor to the canvas and click on it to configure.

Field

Description

Evaluation

Select an option for XML parsing and validation. Choose one of the following options:

- XPATH

- XSD Validation

- XSD Validation And Drop Invalid XML Tags.

XPATH: If the user selects the XPATH option, fill in the following fields:

XPATH

Provide a value for XPATH.

XML Data Column

Provide a value for XML Data Column.

Include Input XML Column

The user can opt to include the input XML data in extra output column. Select one of the options available in the drop-down list as per user’s requirement:

- Always

- Only with Invalid XML

- Only with Valid XML

- Never

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.
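Evaluating an XPath against an XML data column can be sketched with Python's `xml.etree.ElementTree`. Note that ElementTree supports only a subset of XPath, so the processor's XPath support may differ; the sample XML, the path, and the `evaluate_xpath` helper are hypothetical. Returning `None` for unparseable input mirrors the valid/invalid distinction used by the Include Input XML Column option.

```python
import xml.etree.ElementTree as ET

# Hypothetical value of an XML data column.
xml_column_value = """
<order id="7">
  <items>
    <item><name>pen</name><qty>2</qty></item>
    <item><name>ink</name><qty>1</qty></item>
  </items>
</order>
"""


def evaluate_xpath(xml_text, xpath):
    """Return the text of all matching elements, or None if the XML is invalid."""
    try:
        root = ET.fromstring(xml_text.strip())
    except ET.ParseError:
        return None  # invalid XML
    return [el.text for el in root.findall(xpath)]


print(evaluate_xpath(xml_column_value, "./items/item/name"))  # ['pen', 'ink']
print(evaluate_xpath("<order><items>", "./items"))            # None
```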

XSD Validation: If the user selects the XSD Validation option, fill in the following fields:

XSD Source

Provide the XSD source file by selecting either HDFS or Upload XSD from the drop-down list.

If the user selects the HDFS option, fill in the following fields:

Connection Name

Provide the connection name for creating the connection. The user can select the default connection.

HDFS Path

Provide the HDFS file path.

Note: The HDFS path should include XSD file name.

If the XSD file has an import statement, the imported file must be at a parallel path with the extension .xsd.

Error Column

User can optionally add a new column that will contain error/null values.

Input XML Data Column

Select the Input XML Data Column from the drop-down list.

Output XML Data Column

Select the Output XML Data Column from the drop-down list.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

If the user selects the Upload XSD option, upload the XSD file and provide the Error Column, Input XML Data Column and Output XML Data Column details. The user can also add further configurations by clicking the ADD CONFIGURATION option.

XSD Validation And Drop Invalid XML Tags: If the user selects the XSD Validation And Drop Invalid XML Tags option, fill in the following fields:

XSD Source

Provide the XSD source file by selecting either HDFS or Upload XSD from the drop-down list.

If the user selects the HDFS option, fill in the following fields:

Connection Name

Provide the connection name for creating the connection. The user can select the default connection.

HDFS Path

Provide the HDFS file path.

Note: The HDFS path should include XSD file name.

If the XSD file has an import statement, the imported file must be at a parallel path with the extension .xsd.

Error Column

User can optionally add a new column that will contain error/null values.

Input XML Data Column

Select the Input XML Data Column from the drop-down list.

Output XML Data Column

Select the Output XML Data Column from the drop-down list.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

If the user selects the Upload XSD option, upload the XSD file and provide the Error Column, Input XML Data Column and Output XML Data Column details. The user can also add further configurations by clicking the ADD CONFIGURATION option.

XSLT

The XSLT processor transforms incoming XML into another XML document by using an XSL file from HDFS, S3, or an uploaded file.

To add an XSLT processor into your pipeline, drag the processor to the canvas and click on it to configure.


Field

Description

XSL Connection Type

Select the connection type from the drop-down. The XSL is fetched from the selected type: HDFS, S3, or Upload XSL File.

Note: Pipeline restart will be required if XSL source is updated.

If the user selects HDFS as XSL Connection Type, fill in the following fields:

HDFS Connection

Provide the connection name for creating the connection. The user can select the default connection.

HDFS Path

Provide the absolute HDFS path for XSL.

Input Field

Select the input field from the drop-down list or add the input field.

Output Field

Select the output field from the drop-down list or add the required output field.

Error Column

User can optionally add a new column that will contain error/null values.

Splitter

Provide the splitter value used to split the XSLT output into new records.

If the user selects S3 as XSL Connection Type, fill in the following fields:

S3 Connection

Provide the S3 connection name for creating the connection.

S3 Bucket Name

Provide the S3 bucket name.

S3 Path

Provide the absolute S3 path for XSL.

S3 Protocol

Select the S3 protocol from the drop down list.

The following protocols are supported:

- For HDP versions, the S3a protocol is supported.

- For CDH versions, the S3a protocol is supported.

- For Apache versions, the S3n protocol is supported.

- For GCP, the S3n and S3a protocols are supported.

- For EMR, the S3, S3n, and S3a protocols are supported.

- For AWS Databricks, the S3a protocol is supported.

Input Field

Select the input field from the drop-down list or add the input field.

Output Field

Select the output field from the drop-down list or add the required output field.

Error Column

User can optionally add a new column that will contain error/null values.

Splitter

Provide the splitter value used to split the XSLT output into new records.

If the user selects Upload XSL File as XSL Connection Type, fill in the following fields:

Upload File

Upload the XSL file.

Input Field

Select the input field from the drop-down list or add the input field.

Output Field

Select the output field from the drop-down list or add the required output field.

Error Column

User can optionally add a new column that will contain error/null values.

Splitter

Provide the splitter value used to split the XSLT output into new records.