Emitters

Emitters define the destination stage of a pipeline which could be a NoSQL store, Indexer, relational database, or third party BI tool.


Post Action Tab

Under the Post Action tab, you can configure actions to be performed during the pipeline run.

You can provide SQL and stored procedure queries that run to update, delete, or insert data in the database during the pipeline run.

Note: All batch pipelines will have the Post Action tab at the emitter.

The following actions can be performed on the emitter:

Field

Description

SQL

The user can execute SQL from the actions option by selecting the connection on which to execute the SQL and providing the query to be executed.

Shell Script

The user can invoke a Shell Script as an Action.

There are two ways to provide the input for invoking the shell script:

- Write the shell script in the inline editor.

- Upload a shell script file to execute it.

Stored Procedure

With this option the user can execute a stored procedure or function stored in a database.

To do so, the user has to select an appropriate JDBC connection and then select the relevant stored procedure from the drop-down menu. An illustrative sketch is shown after the notes below.

Note: 

- If for some reason the configured action fails to execute, the user can check the ‘Ignore Action Error’ option so that the pipeline runs without getting impacted.

- By checking the ‘Ignore while execution’ option, the configuration will remain intact in the pipeline, but the configured action will not get executed.

- The user can also configure multiple actions by clicking the Add Action button.
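As a point of reference for the Stored Procedure action above, the sketch below shows the kind of JDBC call such an action performs. It is illustrative only; the JDBC URL, credentials, and the update_daily_totals procedure are placeholders, not values from Gathr.

import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;

public class PostActionExample {
    public static void main(String[] args) throws Exception {
        // Placeholder JDBC URL and credentials; in Gathr these come from the selected connection.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:postgresql://localhost:5432/sales", "user", "password")) {
            // Call a hypothetical stored procedure once the pipeline run completes.
            try (CallableStatement stmt = conn.prepareCall("{call update_daily_totals(?)}")) {
                stmt.setString(1, "2024-01-01");
                stmt.execute();
            }
        }
    }
}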

ADLS

Add an ADLS emitter to a batch or streaming pipeline. Click the component to configure it.

Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click Next to configure.

Provide the below fields to configure the ADLS emitter:

Field

Description

Save As Dataset

Check mark the checkbox to save the schema as dataset.

Scope

Select the scope of dataset as Project or Workspace.

Dataset Name

Provide the dataset name.

Access Option

Option to access the data lake storage either using a DBFS mount point or by directly accessing the container and folder path.

Connection Name

Connections are the Service identifiers.

Select the connection name from the available list of connections, from where you would like to read the data.

Container

Provide the ADLS container name in which the transformed data should be emitted.

Path

Provide the directory path for ADLS file system.

Output Type

Select the output format in which the results will be processed. The available output type options are: Avro, Delimited, JSON, Parquet, ORC and XML.

Based on the selected output type, the supported compression algorithms become available.

Delimiter

This option is available upon selecting the output type as Delimited to select the message field separator type.

Output Fields

Select fields that need to be a part of the output data.

Partitioning Required

If checked, data will be partitioned.

Partition Columns

Select fields on which data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.


ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.


Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.


Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.


Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.


This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Compression Type

Supported algorithm used to compress the data.

Based on the above selected Output Type, the supported compression algorithms will be available under Compression Type drop-down list.

The list of supported Compression Type algorithms for each Output Type is mentioned below:

Avro: None, Deflate, BZIP2, SNAPPY, XZ

Delimited: None, Deflate, GZIP, BZIP2, SNAPPY, LZ4

JSON: None, Deflate, GZIP, BZIP2, SNAPPY, LZ4

Parquet: None, GZIP, LZO, SNAPPY, LZ4

ORC: None, LZO, SNAPPY, ZLIB

ADD CONFIGURATIONS

User can add further configurations (Optional).

Note:

Add various Spark configurations as per requirement by clicking the ADD CONFIGURATION button. For example, to perform imputation, add a nullValue/emptyValue configuration; the entered value will replace null/empty values across the data. (Optional)

Example: nullValue = 123 will replace all null values with 123 in the output.

Environment Params

User can add further environment parameters. (Optional)
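Conceptually, the ADLS emitter settings above map onto a Spark DataFrameWriter call. The snippet below is a hedged sketch only, assuming a Dataset<Row> named output; the column names, container, account, and path are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class AdlsEmitterSketch {
    // Writes the selected output fields as Parquet, partitioned and appended,
    // mirroring the Output Type, Partition Columns and Save Mode options above.
    public static void write(Dataset<Row> output) {
        output.select("id", "name", "country")          // Output Fields (placeholders)
              .write()
              .format("parquet")                         // Output Type
              .option("compression", "snappy")           // Compression Type
              .partitionBy("country")                    // Partition Columns
              .mode(SaveMode.Append)                     // Save Mode
              .save("abfss://container@account.dfs.core.windows.net/path"); // placeholder ADLS path
    }
}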

Advanced Kafka

The Advanced Kafka emitter stores data to an Advanced Kafka cluster. Supported data formats are JSON and DELIMITED.

Configuring Advanced Kafka Emitter for Spark Pipelines

To add an Advanced Kafka emitter into your pipeline, drag the Advanced Kafka emitter to the canvas, connect it to a Data Source or processor, and right click on it to configure.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.


Field

Description

Connection Name

All Kafka connections will be listed here. Select a connection for connecting to Advanced Kafka.

Topic Name

Advanced Kafka topic name where you want to emit data.

Partitions

Number of partitions to be created for a topic. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log.

Replication Factor

For a topic with replication factor N, Advanced Kafka will tolerate up to N-1 failures without losing any messages committed to the log.

Output Format

Data Type format of the output message.

Output Fields

Fields in the message that needs to be a part of the output data.

Advanced Kafka Partitioner

Round Robin (Default): Advanced Kafka uses the default partition mechanism to distribute the data.

Key Based: Advanced Kafka partitioning is done on the basis of keys.

Custom: Enables writing a custom partition class by implementing the com.Gathr.framework.api.spark.partitioner.SAXKafkaPartitioner interface. The partition method contains the logic to calculate the destination partition and returns the target partition number.

Enable TTL

Select the checkbox to enable TTL (Time to Live) so that records persist for that time duration.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based checkpoint

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.
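For reference, the Output Mode, Processing Time trigger, and checkpoint settings described above roughly correspond to a Spark structured-streaming write such as the sketch below. It is illustrative only; the broker address, topic name, and checkpoint path are placeholders.

import java.util.concurrent.TimeoutException;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.Trigger;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

public class KafkaEmitterSketch {
    public static StreamingQuery write(Dataset<Row> stream) throws TimeoutException {
        return stream
            .select(to_json(struct(col("*"))).alias("value"))            // JSON output format
            .writeStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", "broker:9092")            // placeholder broker
            .option("topic", "output-topic")                             // placeholder topic
            .option("checkpointLocation", "/user/hadoop/checkpointingDir") // Checkpoint Directory
            .outputMode("append")                                        // Output Mode
            .trigger(Trigger.ProcessingTime("30 seconds"))               // Processing Time trigger
            .start();
    }
}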

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Advanced Redshift

Advanced Redshift works with Batch Datasets, which means it will only work with a Batch Data Source. It uses an S3 temp directory to stage data before loading it into the Redshift database table.

Configuring Advanced Redshift Emitter for Spark Pipelines

To add an Advanced Redshift emitter into your pipeline, drag the emitter on the canvas and connect it to a Data Source or processor. Right click on the emitter to configure it as explained below:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All Redshift connections will be listed here. Select a connection for connecting to Redshift

S3 Connection Name

All S3 connections will be listed here. Select a connection for the S3 temp directory.

S3 Temp Directory

A writable location in Amazon S3, used to stage data while it is being loaded into Redshift when writing. Example: s3n://anirkhe/NYC, where anirkhe is the S3 bucket name.

Message Name

Message used in the pipeline

Table Name

Existing database table name whose schema is to be fetched.

Connection Retries

Number of retries for component connection

Delay Between Connection Retries

Defines the retry delay intervals for component connection in millis.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

ADD CONFIGURATION

Enables to configure additional custom properties.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

AWS IOT

The AWS IOT emitter allows you to push data to AWS IOT Topics via the MQTT client.

Configuring AWS IOT emitter for Spark pipelines

To add an AWS IOT emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All connections will be listed here. Select a connection for connecting to AWS IOT.

IOT Topic Name

Name of the IOT Topic where data will be published.

Message Timeout

The timeout duration for messages.

Output Format

Datatype format of the output.

Output Fields

Fields in the message that needs to be a part of the output data.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

Add Configuration

Enables to configure additional properties.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Azure Blob

The Azure Blob emitter writes data in different formats (JSON, CSV, ORC, Parquet, and more) to blob containers by specifying the directory path.

Configuring Azure Blob emitter for Spark pipelines

To add an Azure Blob emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

The configuration settings are as follows:

Field

Description

Connection Name

All connections will be listed here. Select a connection for connecting to Azure Blob.

Container

Azure Blob Container Name.

Path

Sub-directories of the container mentioned above to which data is to be written.

Output Type

Output format in which result will be processed.

Delimiter

Message Field separator.

Output Fields

Select the fields that needs to be included in the output data.

Partitioning Required

If checked, data will be partitioned.

Save Mode

Save mode specifies how to handle the existing data.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

Add Configuration

Enables to configure additional properties.

Note:

Add various Spark configurations as per requirement by clicking the ADD CONFIGURATION button. For example, to perform imputation, add a nullValue/emptyValue configuration; the entered value will replace null/empty values across the data. (Optional)

Example: nullValue = 123 will replace all null values with 123 in the output.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Batch Emitter

Batch emitter allows you to write your own custom code for processing batch data as per the logic written in the custom emitter.

In case you want to use any other emitter which is not provided by Gathr, you can make use of this emitter.

For example, if you want to store data to HDFS, you can write your own custom code and store the data.

Configuring Custom Emitter for Spark Pipelines

To add a custom emitter into your pipeline, drag the custom emitter to the canvas, connect it to a Data Source or processor, and right click on it to configure.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Emitter Plugin

Provide fully qualified class name which implements Gathr CustomSSEmitter interface.


You can download sample project from Gathr UI (Data Pipeline page) and refer SampleCustomEmitter class.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

ADD CONFIGURATION

Enables to configure additional properties.

Here is a small snippet of sample code that is used for writing data to HDFS.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.execution.streaming.Sink;
import org.apache.spark.sql.sources.DataSourceRegister;
import org.apache.spark.sql.sources.StreamSinkProvider;
import org.apache.spark.sql.streaming.OutputMode;
import scala.collection.Seq;
import scala.collection.immutable.Map;

// Each public class would normally go in its own source file.
public class CustomSinkProvider implements StreamSinkProvider, DataSourceRegister {

    // Called by Spark to create the sink for a streaming query.
    public Sink createSink(SQLContext sqlContext, Map<String, String> options, Seq<String> partitionCols, OutputMode outputMode) {
        return new MySink("mysink1");
    }

    // Short name used to register this sink as a data source.
    public String shortName() {
        return "mysink";
    }
}

public class MySink implements Sink {

    String message;

    public MySink(String msg) {
        message = msg;
    }

    // Writes every micro-batch to HDFS as JSON.
    public void addBatch(long batchId, Dataset<Row> dataset) {
        dataset.write().format("json").option("path", "localhost:/user/sax/blankTest/custom/data").save();
    }
}

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Big Query

The configuration of Big Query Emitter is mentioned below:

Field

Description

Save as Dataset

Select checkbox to save schema as dataset. Mention the dataset name.

Connection Name

Mention the connection name for creating connection.

Override Credentials

Check this option to override credentials for user specific actions.

Note: Upload the GCS service account key.

Message Name

Select the name for the message configuration which will act as metadata for actual data.

BigQuery Dataset Name

Select the existing dataset name in BigQuery.

Big Query Table Name

Select the existing table name of specified BigQuery dataset.

TimeStamp Column

Enter value for timestamp column that is used for tracking load time in BigQuery streaming pipeline.

Save Mode

Select save mode:

Append, Overwrite or Ignore. It specifies how to handle existing data, if present.

GCS Bucket

Provide the intermediate GCS location for BigQuery table.

Note:

The user can add further configurations by clicking the ADD CONFIGURATION button.

Schema Results:

- Under this section, the user can view the incoming data and choose which column to save it to. The user can view: Table Column Name, Mapping Value, Database Datatype, Mode and the Ignore All checkbox.

- The user can download or upload Mapping.
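For orientation, a BigQuery write of this kind typically resolves to the Spark BigQuery connector, roughly as sketched below. This is a hedged example, assuming the connector is on the classpath; the dataset, table, and GCS bucket names are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class BigQueryEmitterSketch {
    public static void write(Dataset<Row> output) {
        output.write()
              .format("bigquery")
              .option("table", "my_dataset.my_table")        // BigQuery Dataset/Table (placeholders)
              .option("temporaryGcsBucket", "my-gcs-bucket")  // GCS Bucket used as intermediate storage
              .mode(SaveMode.Append)                          // Save Mode
              .save();
    }
}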

Container

The Container Emitter is used to sink the data in Couchbase.

To add a Container Emitter into your pipeline, drag the Emitter to the canvas and right click on it to configure.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Load From Template

Select the template from the list.

Only those templates will be visible in the drop down list that are created globally or within that workspace.

Sync With Source Template

This option is reflected when a saved template is edited. If any change is made in the source template, corresponding changes will be reflected in the Container’s configuration.

Connection Name

Name of the connection.

Bucket Name

It can also be called ‘Couchbase Bucket name’. Select the Couchbase bucket that gets generated with the Couchbase connection.

Max Buckets per cluster is 10, by default.

Memory Quota

Memory Quota in megabytes per server node. Memory quota cannot be less than 100 MB.

Replicate View Indexes

By checking the Replicate view indexes checkbox, you ensure that views indexes, as well as data, are replicated.

Flush

When flushed, all items in the bucket are removed as soon as possible through REST calls.

Document Id

Unique document key for Couchbase to persist the document in Couchbase. If document id is not unique then document with the same id gets overridden every time.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Retention Enable

When selected, each newly created item lives for the number of seconds specified by retention policy. After the expiration time is reached item will be deleted by Couchbase server.

Retention Policy

TTL for the document to be persisted in the Couchbase.

Output Fields

Select the fields that needs to be included in the output data.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

Click on the NEXT button. An option to Save as Template will be available. Add notes in the space provided and click on Save as Template

Choose the scope of the Template (Global or Workspace) and Click Done for saving the Template and configuration details.

Cosmos

The Cosmos emitter lets you emit data into different containers of the selected Cosmos database.

For a Streaming Cosmos channel and a Batch Cosmos channel, the Cosmos emitter has different properties, explained below:

Configuring Cosmos for Spark Pipelines

To add a Cosmos Emitter into your pipeline, drag the Emitter to the canvas, connect it to a Data Source or processor, and click on it to configure.

Note: If the data source in pipeline is streaming Cosmos then the emitter will show four additional properties: Checkpoint Storage Location, Checkpoint Connections, Checkpoint Directory, and Time-Based checkpoint.

Field

Description

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Key

Provide the Azure Cosmos DB key. Click TEST Connection to test the execution of the connection.

Database

Select the Cosmos Database from the drop-down list.

Container

Select the Cosmos container from the drop-down list.

Write Strategy

Select the Cosmos DB Write Strategy from the drop down list.

Item Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data using Upsert.

Item Append: For persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

ItemDelete: Option to delete the data.

Output Fields

Fields in the message that needs to be a part of the output data.

Upsert

If set to True, the item with existing ids gets updated and if it does not exist, it gets created.

Note: If you choose to provide a write strategy, then the Upsert option will not be available.

Also, the Upsert option is available for Spark 2.4.

Connection Retries

Number of retries for component connection.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in millis.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Output Mode

Output mode to be used while writing the data to Streaming sink.


Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Writing Batch Size

Define the writing batch size for writing to cosmos.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Additional properties can be added.
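For batch pipelines, the configuration above broadly corresponds to a write through the Azure Cosmos DB Spark 3 connector, sketched below on that assumption; the endpoint, key, database, container, and write strategy values are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

public class CosmosEmitterSketch {
    public static void write(Dataset<Row> output) {
        output.write()
              .format("cosmos.oltp")
              .option("spark.cosmos.accountEndpoint", "https://myaccount.documents.azure.com:443/") // placeholder endpoint
              .option("spark.cosmos.accountKey", "<cosmos-key>")       // placeholder Key
              .option("spark.cosmos.database", "mydb")                 // Database
              .option("spark.cosmos.container", "mycontainer")         // Container
              .option("spark.cosmos.write.strategy", "ItemOverwrite")  // Write Strategy (upsert semantics)
              .mode("append")
              .save();
    }
}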

Delta

The Delta Lake emitter lets you emit data to HDFS, S3, DBFS, or ADLS in Delta Lake.

All data in Delta Lake is stored in Apache Parquet format enabling Delta Lake to leverage the efficient compression and encoding schemes that are native to Parquet.

Delta Lake can handle petabyte-scale tables with billions of partitions and files at ease.

Configuring Delta for Spark Pipelines

To add a Delta emitter into your pipeline, drag the emitter to the canvas, connect it to a Data Source or processor, and right click on it to configure:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Emitter Type

Data Lake to which the data is emitted. For emitting the delta file the available options in the drop down list are: HDFS, S3, DBFS and ADLS.

Connection Name

Connection Name for creating connection.

Provide below fields if the user selects HDFS emitter type:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Username

Once the Override Credentials option is checked, provide the user name through which the Hadoop service is running.

HDFS File Path

Provide the file path of HDFS file system.

Output Fields

Select the fields that needs to be included in the output data.

Partitioning Required

Check mark the check box if the data is to be partitioned.

Note: If Streaming data source is used in the pipeline along with Aggregation without watermark then it is recommended not to use Append as output mode.

Partitioned Column

If Partitioning Required field is checked, then select the fields on which data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Provide below fields if the user selects S3 emitter type:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

AWS Key Id

Provide the S3 account access key.

Secret Access Key

Provide the S3 account secret key.

Note: Once the AWS Key Id and Secret Access Key is provided, user has an option to test the connection.

S3 Protocol

Select the S3 protocol from the drop down list. Below protocols are supported for various versions when user selects S3 connection type:

- For HDP versions, S3a protocol is supported.

- For CDH versions, S3a protocol is supported.

- For Apache versions, S3n protocol is supported.

- For GCP, the S3n and S3a protocols are supported.

- For Azure, the S3n protocol is supported. Read/Write to Mumbai and Ohio regions is not supported.

- For EMR, the S3, S3n, and S3a protocols are supported.

- For AWS Databricks, s3a protocol is supported.

Bucket Name

Provide the S3 bucket name.

Path

Provide the sub-directories of the bucket name on which the data is to be written.

Output Fields

Select the fields that needs to be included in the output data.

Partitioning Required

Check mark the check box if the data is to be partitioned.

Note: If Streaming data source is used in the pipeline along with Aggregation without watermark then it is recommended not to use Append as output mode.

Partitioned Column

If Partitioning Required field is checked, then select the fields on which data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Provide below fields if the user selects DBFS emitter type:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Directory Path

Provide the DBFS parent path for check-pointing.

DBFS File Path

Provide the DBFS file path.

Output Fields

Select the fields that needs to be included in the output data.

Partitioning Required

Check mark the check box if the data is to be partitioned.

Note: If Streaming data source is used in the pipeline along with Aggregation without watermark then it is recommended not to use Append as output mode.

Partitioned Column

If Partitioning Required field is checked, then select the fields on which data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Provide below fields if the user selects ADLS emitter type:

Connection Name

Select the connection name from the available list of connections, from where you would like to read the data.

Container Name

Provide container name for azure delta lake storage.

ADLS File Path

Provide the directory path for azure delta lake storage file system.

Output Fields

Select the fields that needs to be included in the output data.

Partitioning Required

Check mark the check box if the data is to be partitioned.

Note: If Streaming data source is used in the pipeline along with Aggregation without watermark then it is recommended not to use Append as output mode.

Partitioned Column

If Partitioning Required field is checked, then select the fields on which data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

ADD CONFIGURATION

To add additional custom properties in key-value pairs.

ADD PARAM

User can add further environment parameters. (Optional)
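For reference, the Delta emitter settings broadly map to a Delta-format Spark write like the sketch below. It is illustrative only; the target path and column names are placeholders, and the Delta Lake library is assumed to be available.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class DeltaEmitterSketch {
    public static void write(Dataset<Row> output) {
        output.select("id", "event_time", "country")   // Output Fields (placeholders)
              .write()
              .format("delta")
              .partitionBy("country")                  // Partitioned Column
              .mode(SaveMode.Overwrite)                // Save Mode
              .save("s3a://my-bucket/delta/events");   // placeholder path (HDFS, S3, DBFS or ADLS)
    }
}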

Dummy

Dummy emitters are required in cases where the pipeline has a processor that does not require an emitter. For example, in the case of a custom processor that indexes data on Elasticsearch, an emitter may not be required. However, Gathr mandates that a pipeline has an emitter. In such scenarios, you can use a Dummy emitter so that you can test the processors without emitting the data through an actual emitter.

In the configuration window of a Dummy Emitter enter the value. The user can ADD CONFIGURATION and click Next.

EventBridge

Gathr user has an option to write data into Amazon EventBridge. This emitter supports both batch as well as streaming data sources. Configuration details are provided below:

Field

Description

Connection Name

Connection name for creating connection.

Select option to specify Source

A string that identifies the source of the event.

Note:

- Static Value: You will be required to provide a value for source.

- Select a Field: Select a field that will be used to evaluate source values.

Select option to specify Detail Type

Use Static Value option to assign a constant string. Use Select a Field option to assign value from a field.


Note:

- Static Value: You will be required to provide a value for detail type.

- Select a Field: Select a field that will be used to evaluate detail type values.

Specify Event Bus

An event bus is a pipeline that receives events.


Available options are:


- Static Value: You will be required to provide the input value for event bus.

- Select Field: You will be required to select a field which will be used to evaluate event bus values.

Specify Resource

Use Static Value option to assign a constant string or use Select a Field option to assign value from a field.

Static Value

You will be required to provide the input string value for resource.

Select a Field

You will be required to select a field which will be used to evaluate Resource values. The selected field must be of string JSON array type.

For example: ["resource1", "resource2"]

Specify Time

Use None option to leave it blank or use Current Time option to assign a current GMT time or use Select a Field option to assign value from a field.


Note: Selected field must be of epoch time. For Example: 1672916671

Detail

Fields in the message that needs to be a part of the output data.

Convert Detail fields to JSON

Select String fields in the message that needs to be parsed as JSON objects.

Skip Field Name as Key in Detail

Upon checking this option, the field name will be excluded in Detail as key. For an example: If the field name is ‘event’ then, Detail will be {payload}.

If this option remains unchecked then, Detail will be {event : {payload}}.

Throttle Control

Maximum Parallel Events

The maximum number of events that can be published by the application in parallel.

Maximum entries in a PutEvent Request

Maximum number of events that can be published in a single putevent request.

Write Modes (for streaming pipeline only)

Output Mode

The output mode specifies how to write the data. (In case of streaming source)

Available options are:

- Append

- Complete

Checkpoint Storage Location

DBFS checkpoint storage is not supported when the pipeline is configured on EMR.

Checkpoint Connections

Select the connection that are listed corresponding to the selected storage location.

Override Credentials

Override credential for user specific actions.

Upon checking this option you will need to provide username through which the Hadoop service is running.

You can test the connection by clicking at the TEST CONNECTION button.

Checkpoint Directory

It is the path where the Spark application stores the checkpointing data. For HDFS and EFS (AWS environment), enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself. For S3, enter an absolute path like S3://BucketName/checkpointingDir

Time-based Check Point

Check the option to enable the time-based checkpoint on each pipeline run i.e., in each pipeline run the above provided checkpoint location will be appended with current time in milliseconds.

Enable Trigger

Check this option to define how frequently a streaming query should be executed. Upon checking this option, provide Trigger Type as explained below:

Trigger Type

Select one of the options available from the drop-down:

- One-Time Micro-Batch

- Fixed Interval Micro-Batches

Upon selecting Fixed Interval Micro-Batches, provide the below details:

Processing Time

Provide the value for processing time in Hour(s)/Minute(s)/Seconds.

Retry on failure

Enable Retry Mechanism

Check the option to retry the job after any interruption while writing data. The job will resume from the point where it was interrupted.

Retry Count

Specify the number of times to retry jobs in case of any interruption.

Retry Delay

Specify the retry delay in milliseconds. This is the time that application should wait before retrying.

Stop on Failure

Upon checking this option, the application will fail as soon as it encounters the first failed event (after completing retries if any).

If this option remains unchecked, then the application will continue to publish further events in input dataset.

ADD CONFIGURATION

User can add further configurations (Optional).

Environment Params

Click the + ADD PARAM button to add further parameters as key-value pair.
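For orientation, publishing to EventBridge with these settings roughly corresponds to a PutEvents call such as the sketch below, written against the AWS SDK for Java v2 as an assumption; the source, detail type, event bus, resources, and detail payload are placeholders.

import software.amazon.awssdk.services.eventbridge.EventBridgeClient;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequest;
import software.amazon.awssdk.services.eventbridge.model.PutEventsRequestEntry;

public class EventBridgeEmitterSketch {
    public static void publish() {
        // Placeholder values; in Gathr these come from the emitter configuration.
        PutEventsRequestEntry entry = PutEventsRequestEntry.builder()
                .source("my.application")                    // Source
                .detailType("order.created")                 // Detail Type
                .eventBusName("default")                     // Event Bus
                .resources("resource1", "resource2")         // Resource
                .detail("{\"event\":{\"orderId\":42}}")      // Detail payload as JSON
                .build();

        try (EventBridgeClient client = EventBridgeClient.create()) {
            client.putEvents(PutEventsRequest.builder().entries(entry).build());
        }
    }
}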

File Writer

A local file emitter can save data to the local file system. The local file system is the file system where Gathr is deployed.

Configure File Writer Emitter for Spark Pipelines

To add a File Writer emitter to your pipeline, drag it to the canvas, connect it to a Data Source or processor, and right click on it to configure.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

File Path

Path where the data file will be read/saved. This is where you can use the Scope Variable using @. To know more about the same, read about Scope Variable.

Output Fields

Fields to be included in the Output File.

Output Format

Format in which the output file will be saved.

Delimiter

Separator to separate different fields.

Checkpoint Storage Location

Select the check pointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e. in each pipeline run the above provided checkpoint location will be appended with the current time in millis.

Output Mode

Mode in which File writer will run. Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

ADD CONFIGURATION

Enables to configure additional properties.

HTTP

HTTP Emitter allows you to emit data into different APIs.

For example: Consider that you have employee information available in JSON, TEXT or CSV format and you would like to update the information for an employee and also send it to the HTTP emitter. This can be done using the PUT or POST method.

Configuration

The configuration settings of the HTTP target are as follows:

Field

Description

URI

HTTP or HTTPS URI to send request to a resource.

The URL defined can either be provided with actual values or can be parameterized. The placeholders in case of parameterized inputs will be resolved during runtime as per the value of column name existing in the URL.

Example: http://localhost:1234/${column-name-1}/${column-name-2}

Method Type

You can choose a request method type based on the HTTP URL.

The following method types are supported:

GET: Use this method to retrieve information from the given server using a URI. The GET requests will only retrieve data without having any other effect on the data.

POST: Use this method to send data to the server using HTML forms.

PUT: Use this method to replace all current representations of the target resource with the uploaded content.

DELETE: Use this method to remove all current representations of the target resource given by a URI.

Note: In POST, PUT and DELETE, the user provides the request body in the below format:

{'key':'value'}

Enable SSL

This option can be selected as True if SSL is enabled on Http. It is set to False by default.

If selected as true, keystore/certificate file needs to be uploaded or the keystore file path should be provided in the respective configuration fields.

Keystore select option

Option to choose between a certificate file or a keystore file that you will need to upload for authenticating SSL connectivity.

Header

Header’s parameter name and value.

Accept Type

The content type that the URI accepts should be selected out of JSON, CSV or Text.

Auth Type

Used to specify the authorization type associated with the URL. Includes None, Basic, Token and OAuth2 as authorization types. All the types are explained below in detail.

None: This option specifies that the URL can be accessed without any authentication.

Basic: This option specifies that accessing the URL requires Basic Authorization. Provide the user name and password for accessing the URL.

Token Based: Token-based authentication is a security technique that authenticates users who attempt to log in to a server, a network, or another secure system, using a security token provided by the server.

OAuth2: An authentication technique in which the application gets a token that authorizes access to the user's account.

If Auth Type is selected as Basic, proceed by providing inputs for below parameters:

Username

Enter the user name for accessing the URI.

Password

Enter the password for accessing the URI.

If Auth Type is selected as Token, proceed by providing inputs for below parameters:

Token ID

Key with which token is referred in request.

Token

Token which is provided in order to access the URL.

If Auth Type is selected as OAuth2, proceed by providing inputs for below parameters:

Client ID

Client ID identifier, given to client during application registration process.

Secret Key

The secret key provided to the client during application registration process.

Auth URL

The endpoint for authorization server, which retrieves authorization code.

Header

Used to specify the headers associated with Auth Url, through which authorization code is generated.

Message Name

The name of the message configuration which will act as a metadata for the actual data.

Output Fields

Fields that need to be a part of the output data should be selected.

Note: Make sure that the column(s) existing in the URL should be selected in the output fields.

For creating a customized payload, please use the Custom Payload option.

Custom Payload

Option to provide the customized payload.

Payload

Provide the payload in the format that supports the Accept Type field.

Example: If Accept Type field is selected as JSON, then provide the payload as per the sample given below:

{“emp_id” : ${id}, “emp_name” : $name}

will be evaluated to

{“emp_id” : 1, “emp_name” : John}

Enable Request Rate

Control the rate at which HTTP calls are made on each partition.

Note: The HTTP calls are made in parallel on multiple partitions. Therefore, set the number of partitions using the repartition processor before using the HTTP emitter.

Example: For a client, the API hit rate limit is set to 40 calls/sec. Add a repartition processor before HTTP emitter and provide the partitioning value as 10. Now, set the rate limit to 4 requests per second, which will apply to each partition. This means 10 parallel partitions will be working with 4 requests per second on each partition allowing 40 requests per second on the HTTP emitter.
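For reference, the request the HTTP emitter issues resembles the sketch below, shown here with Java's built-in HttpClient purely for illustration; the URI, token, and payload are placeholders.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class HttpEmitterSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URI resolved from the parameterized URL, e.g. http://localhost:1234/${column-name-1}
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://localhost:1234/employees/101"))
                .header("Content-Type", "application/json")       // Accept Type
                .header("Authorization", "Bearer <token>")         // Token-based auth (placeholder)
                .POST(HttpRequest.BodyPublishers.ofString("{\"emp_id\": 1, \"emp_name\": \"John\"}"))
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.statusCode());
    }
}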


JDBC

JDBC Emitter allows you to push data to relational databases like MySQL, PostgreSQL, Oracle DB and MS-SQL.

The JDBC emitter also enables you to write data to a DB2 database for both batch and stream pipelines.

It is enriched with lookup functionality for the DB2 database so that you can enrich fields with external data read from DB2. Select a DB2 connection while configuring JDBC emitter.

Configuring JDBC Emitter for Spark pipelines

To add a JDBC emitter into your pipeline, drag the JDBC emitter on the canvas and connect it to a Data Source or processor. The configuration settings of the JDBC emitter are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

 

Field

Description

Save as Dataset

Save the JDBC emitter configuration as a Dataset.

Dataset Name

Provide a unique name to the Dataset.

Connection Name

All JDBC connections will be listed here. Select a connection for connecting to JDBC.

Message Name

Name of the message configuration that acts as metadata.

Schema Name

Existing database Schema Names whose tables are fetched. (for MSSQL, DB2 and POSTGRES)

Table Name

Existing tablename of the specified database.

Is Batch Enable

Enable parameter to batch multiple messages and improve write performances.

Batch Size

Batch Size, which determines how many rows to insert per round trip. This can help the performance on JDBC drivers. This option applies only to writing. It defaults to 1000.

Connection Retries

Number of retries for component connection.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in millis.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Check Point Directory

It is the HDFS Path where the Spark application stores the checkpoint data.

Output Mode

Output Mode specifies what data will be written to a streaming sink when there is new data available.


Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure custom properties.

Provide spark-mssql-connector configuration for JDBC Emitter.

Provide the below configurations:


sqlConnectorName - com.microsoft.sqlserver.jdbc.spark


tableLock – false


schemaCheckEnabled - false


Note: If the above configuration is not provided then the emitter will use plain JDBC for writing data.

Schema Results: Map the values coming in pipeline with any table columns name.

Table Column Name

The columns from the selected table are populated here.

Mapping Value

Enter a mapping value to the corresponding column.

Database Data Type

The data type of the value, i.e., String, Int, Text and so on..

Ignore All

Use Ignore All or selected fields while pushing data to emitter.

Note: With the upgraded Spark3; complex data types like array, map, set, having empty values will be stored/read as null. Both inspection and execution of JDBC/CSV will contain NULL value in case of missing/empty values. In case of String, user can manually use expression evaluator to change null value into empty string same as Spark2 behavior.

Sample Gathr Expression Evaluator:

(_replaceMissingValues(<Col Name>, '', 'replaceNullAndEmpty', string))
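For reference, a JDBC write with these settings roughly corresponds to the Spark call sketched below. It is illustrative only; the JDBC URL, table name, and credentials are placeholders.

import java.util.Properties;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

public class JdbcEmitterSketch {
    public static void write(Dataset<Row> output) {
        Properties props = new Properties();
        props.setProperty("user", "dbuser");          // placeholder credentials
        props.setProperty("password", "dbpassword");
        props.setProperty("batchsize", "1000");       // Batch Size (rows per round trip)

        output.write()
              .mode(SaveMode.Append)                  // Save Mode
              .jdbc("jdbc:postgresql://localhost:5432/sales", "public.orders", props); // placeholder URL and table
    }
}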

Click on the NEXT button. Enter the notes in the space provided. Click DONE for saving the configuration.

Kafka

Kafka emitter stores data to a Kafka cluster. Supported data formats are JSON and DELIMITED.

Configuring Kafka Emitter for Spark Pipelines

To add a Kafka emitter into your pipeline, drag the Kafka emitter on the canvas and connect it to a Data Source or Processor. The configuration settings of the Kafka emitter are mentioned below.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All Kafka connections will be listed here. Select a connection for connecting to Kafka.

Topic Name Type

Topic Name Type could be either Static or Dynamic.

Static: When you select Static, the “Topic Name” field is presented as a text box. Provide a unique topic name to emit data.

Dynamic: With Dynamic, the topic name will be populated with a drop-down of the field names of the incoming dataset. On selecting any of the field names, topics will be created based on the value of the selected field name (which means at runtime topics are created dynamically using the selected field name values).

Topic Name

Kafka topic name where you want to emit data.


In case of Dynamic Topic Name Type, at runtime the Kafka Emitter will take the value of the selected field as a topic name for each incoming records.

Partitions

Number of partitions to be created for a topic. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log.

Replication Factor

For a topic with replication factor N, Kafka will tolerate up to N-1 failures without losing any messages committed to the log.

Producer Type

Specifies whether the messages are sent asynchronously or synchronously in a background thread. Valid values are async for asynchronous send and sync for synchronous send

Output Format

Data type format of the output message. Three types of output format are available:

JSON

Delimited

AVRO

Avro Schema Source

Select the Source of Avro Schema:

Input Text Schema: Manually enter the text schema in the field, Input Schema.


Schema Registry: It allows you to enter a schema registry subject name for Avro schema, in the field, Subject Name.


Auto Generate Schema: Schema generated is automatic based on the previous incoming dataset.

Output Fields

Message fields which will be a part of output data.

Message Key

The type of key you want to store in Kafka along with the data. It can be stored in the following four types:

Field Value: The values of the fields present in dataset concatenated by "#".

Field Value Hash: The hash value of the values of the fields present in dataset concatenated by "#"

Static: The static value to be put as key in kafka along with data.

UUID: UUID to be put as a key along with data in kafka.

Kafka Partitioner

Round Robin (Default): Kafka uses the default partition mechanism to distribute the data.


Key Based: kafka partitioning is done on the basis of keys.


Custom: Enables to write custom partition class by implementing the com.Gathr.framework.api.spark.partitioner.SAXKafkaPartitioner interface.

The partition method contains logic to calculate the destination partition and returns the target partition number.

Enable TTL

Select the checkbox to enable TTL(Time to Live) for records to persist for that time duration.

Checkpoint Directory

Location where the checkpoint data is stored.

Output Mode

Output mode to be used while writing the data to Streaming emitter.

Select the output mode from the given three options:

Append:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.
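For reference, the “Field Value” message key option described above (selected fields concatenated by “#”) can be pictured as the Spark transformation sketched below, which prepares the key and value columns a Kafka write expects; the field names are placeholders.

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.concat_ws;
import static org.apache.spark.sql.functions.struct;
import static org.apache.spark.sql.functions.to_json;

public class KafkaKeySketch {
    // Builds a "Field Value" style key (selected fields joined by "#") and a JSON value column,
    // the two columns expected when a dataset is written to Kafka.
    public static Dataset<Row> prepare(Dataset<Row> input) {
        return input.select(
                concat_ws("#", col("customer_id"), col("order_id")).alias("key"),  // placeholder fields
                to_json(struct(col("*"))).alias("value"));
    }
}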

Kinesis

Kinesis emitter emits data to Amazon Kinesis stream. Supported data type formats of the output are Json and Delimited.

Configure Kinesis Emitter for Spark Pipelines

To add a Kinesis emitter into your pipeline, drag the emitter on to the canvas, connect it to a Data Source or a Processor, and right-click on it to configure it:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

Select a Kinesis connection for connecting to Kinesis.

Stream Name

Name of the Kinesis Stream.

Shard Count

Number of shards required to create the stream, in case stream is not present.

Region

Name of the region. For example, us-west-2

Partition Key

Used by Amazon Kinesis to distribute data across shards.

Output Format

Datatype format of the output.

Output Fields

Fields in the message that needs to be a part of the output data.

Checkpoint Directory

Location where the checkpoint data is stored.

Output Mode

Mode to be used while writing the data to Streaming sink.


Select the output mode from the given three options:


Append:

Output Mode in which only the new rows in the streaming data will be written to the sink.


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure additional properties.

Click on the Next button. Enter the notes in the space provided. Click Save for saving the configuration details.
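The role of the partition key described above can be illustrated with a plain boto3 call: Kinesis hashes the partition key to decide which shard receives the record. The stream name, region, and payload below are hypothetical, and this is not the emitter's internal code.

import json
import boto3

kinesis = boto3.client("kinesis", region_name="us-west-2")
kinesis.put_record(
    StreamName="orders-stream",
    Data=json.dumps({"order_id": 42, "amount": 9.99}).encode("utf-8"),  # JSON output format
    PartitionKey="42",   # records sharing a partition key land on the same shard
)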

Kudu

Apache Kudu is a column-oriented data store of the Apache Hadoop ecosystem. It enables fast analytics on fast (rapidly changing) data. The emitter is engineered to take advantage of hardware and in-memory processing, and it lowers query latency significantly compared to similar tools.

Configuring KUDU Emitter for Spark Pipelines

To add a KUDU emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor. The configuration settings are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

Connection URL for creating Kudu connection.

Table Administration

If checked, the table will be created.

Primary Keys

This option is to select fields which will be primary keys of the table.

Partition List

This option is to select fields on which table will be partitioned.

Buckets

Buckets used for partitioning.

Replication

Replication factor used to make additional copies of data. The value should be either 1, 3, 5 or 7, only.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Fields

This option is to select fields whose value you want to persist in Table.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Save Mode

Save mode specifies how to handle the existing data.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables additional configuration properties for Kudu.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE for saving the configuration.

Mongo

To add Mongo emitter into your pipeline, drag the emitter on the canvas, connect it to a Data Source or processor, and click on it to configure.

Configuring Mongo Emitter

Field

Description

Connection Name

Select a connection name out of the list of saved connections from the drop-down.

Database Name

Select the database name to write data.

Collection Name

Select the name of the database collection that needs to be scanned.

Output Fields

Select the fields from the drop-down list that need to be included in the output data.

Extended BSON Types

This option is checked by default to enable the extended BSON types while writing the data to Mongo DB emitter.

Replace Document

This option is checked by default to replace the document when saving datasets that contain an _id field.

If unchecked, it will only update the fields in the document that match the fields in the dataset.

Local Threshold

Provide the threshold value (in milliseconds) for choosing a server from multiple Mongo DB servers.

Max Batch Size

The maximum batch size for bulk operations when saving data. The default value provided is 512.

Write Concern W

The w option requests an acknowledgment that the write operation has propagated to a specified number of mongod instances or to mongod instances with specified tags.

Write Concern Timeout

Specify a wtimeout value (in milliseconds) so the query can time out if the write concern cannot be enforced. Applicable for w values greater than 1.

Shard Key

Provide value for Shard Key. MongoDB partitions data in the collection using ranges of shard key values. The field should be indexed and contain unique values.

Force Insert

Check this option to enable Force Insert, which saves inserts even if the dataset contains _id fields.

Ordered

This option is checked by default to allow setting the bulk operations ordered property.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

ADD CONFIGURATION

Further configurations can be added.
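To make the Replace Document, Local Threshold, and Write Concern settings above more concrete, the pymongo sketch below applies the same semantics directly: an acknowledgment from two members with a 5-second wtimeout, and an upsert-style replace for documents that carry an _id. Host, database, and collection names are hypothetical, and this is not the emitter's internal code.

from pymongo import MongoClient, WriteConcern

client = MongoClient("mongodb://localhost:27017", localThresholdMS=15)   # Local Threshold
coll = client["sales"].get_collection(
    "orders",
    write_concern=WriteConcern(w=2, wtimeout=5000),   # Write Concern W and Write Concern Timeout
)

doc = {"_id": 101, "status": "shipped", "amount": 9.99}
# replace_one with upsert mirrors the Replace Document behavior for datasets with an _id field
coll.replace_one({"_id": doc["_id"]}, doc, upsert=True)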

MQTT

MQTT emitter emits data to an MQTT queue or topic. Supported output formats are JSON and Delimited.

MQTT supports wireless networks with varying levels of latency.

Configuring Mqtt Emitter for Spark Pipelines

To add Mqtt emitter into your pipeline, drag the emitter on to the canvas, connect it to a Data Source or processor, and right click on it to configure it.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All MQTT connections will be listed here. Select a connection for connecting to MQTT.

Queue Name/Topic Name

Queue or topic name to which messages will be published.

Output Format

Datatype format of the output.

JSON and Delimited.

If Delimited is selected, provide a delimiter as well.

Output Fields

Fields in the message that needs to be a part of the output data.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output mode to be used while writing the data to Streaming sink.


Append:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure additional properties.

Click on the Add Notes tab. Enter the notes in the space provided.

Click SAVE for saving the configuration.
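As a point of reference for the queue/topic and output format options above, the short paho-mqtt sketch below publishes one delimited record to a topic. The broker address and topic name are hypothetical, and this is not the emitter's internal code.

import paho.mqtt.client as mqtt

client = mqtt.Client()
client.connect("broker.example.com", 1883)

payload = "|".join(["42", "shipped", "9.99"])   # Delimited output format with '|' as the delimiter
client.publish("orders/out", payload, qos=1)
client.disconnect()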

OpenJMS

OpenJMS is used to send and receive messages from one application to another. OpenJms emitter is used to write data to JMS queues or topics. All applications that have subscribed to those topics/queues will be able to read that data.

Configuring OpenJms Emitter for Spark Pipelines

To add an OpenJms emitter into your pipeline, drag the emitter on to the canvas, connect it to a Data Source or processor, and right click on it to configure it.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.


Field

Description

Connection Name

All OpenJms connections will be listed here. Select a connection for connecting to the OpenJMS.

Queue Name

Queue name on which messages are published.

Output Format

Select the data format in which OpenJMS is configured to write the data.

Output Fields

Select the fields which should be a part of the output data.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output Mode to be used while writing the data to data sink.

Select the output mode from the given three options:


Append:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query will be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure additional properties.

Click on the Next button. Enter the notes in the space provided.

Click SAVE for saving the configuration details.

Pubsub

The Pub/Sub emitter stores data to an existing Topic in Google Pub/Sub. Data format supported are JSON and DELIMITED.

Field

Description

Connection Name

Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for Pub/Sub earlier.

Override Credentials

Unchecked by default, check mark the checkbox to override credentials for user specific actions.

Service Account Key Files

Upload the Service Account File.

User has an option to test the connection.

Topic Name

A topic is a category or feed name to which messages will be published.

Retry Timeout

It defines how long the logic should keep trying the remote call until it gives up completely. The higher the total timeout, the more retries can be attempted. The value should be entered in seconds.

Output Format

Data type format of the output.

Output Fields

Fields in the message that needs to be a part of the output data.

Add Custom Message Attributes

Add key value pair to add any custom message attributes.
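The topic, output format, and custom message attribute options above correspond to a simple publish call in the google-cloud-pubsub client, sketched below with a hypothetical project, topic, and attribute set; it is not the emitter's internal code.

import json
from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path("my-gcp-project", "orders-topic")

data = json.dumps({"order_id": 42, "amount": 9.99}).encode("utf-8")   # JSON output format
# keyword arguments become custom message attributes on the published message
future = publisher.publish(topic_path, data, source="gathr-pipeline", region="us-west1")
print(future.result())   # blocks until Pub/Sub acknowledges and returns the message ID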


RabbitMQ

The RabbitMQ emitter is used when you want to write data to a RabbitMQ cluster.

Data formats supported are JSON and DELIMITED (CSV, TSV, PSV, etc).

Configuring RabbitMQ Emitter for Spark Pipelines

To add a RabbitMQ emitter into your pipeline, drag the RabbitMQ emitter on the canvas and connect it to a Data Source or processor. Right click on the emitter to configure it as explained below:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

 

Field

Description

Connection Name

All RabbitMQ connections will be listed here. Select a connection for connecting to the RabbitMQ server.

Exchange Name

RabbitMQ Exchange name.

Exchange Type

Specifies how messages are routed through Exchange.


Direct: A message goes to the queue(s) whose binding key exactly matches the routing key of the message.


Fanout: A fanout exchange copies and routes a received message to all queues that are bound to it, regardless of routing keys or pattern matching as with direct and topic exchanges. Any keys provided will simply be ignored.


Topic: Topic exchanges route messages to queues based on wildcard matches between the routing key and something called the routing pattern specified by the queue binding. Messages are routed to one or many queues based on a matching between a message routing key and this pattern.

Exchange Durable

Specifies whether exchange will be deleted or will remain active on server restart.


TRUE: Exchange will not be deleted if you restart RabbitMQ server.


FALSE: Exchange will be deleted if you restart RabbitMQ server.

Routing Key

Select RabbitMQ Routing Key where data will be published.

Queue Name

RabbitMQ Queue Name where data will be published.

Queue Durable

Specifies whether queue will remain active or deleted on server restart.

TRUE: Queue will not be deleted if you restart RabbitMQ.

FALSE: Queue will be deleted if you restart RabbitMQ.

Output Format

Select the data format in which RabbitMQ should write the data.

Output Fields

Select the fields which should be a part of the output data.

Enable Message TTL

RMQ allows you to set TTL (time to live) for messages.

Message TTL

Time to live in seconds, after which the message will be discarded and sent to the specified TTL Exchange.

TTL Exchange

Name of the Exchange, on which message will be sent once time to live expires.

TTL Queue

Name of the Queue on which message will be sent once time to live expires.

TTL Routing Key

Routing key used to bind TTL queue with TTL exchange.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Select the output mode from the given three options:


Append Mode:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Trigger time interval in minutes or seconds.

Add Configuration

Enables to configure additional RabbitMQ properties.

Click on the NEXT button. Enter the notes in the space provided. Click SAVE for saving the configuration details.
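The exchange type, durability, and routing key concepts above can be seen in a minimal pika sketch: a durable direct exchange, a durable queue bound with a routing key, and a publish that only reaches queues bound with that key. The host, exchange, and queue names are hypothetical, and this is not the emitter's internal code.

import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()

# Exchange Durable = TRUE: the exchange survives a broker restart.
channel.exchange_declare(exchange="orders.direct", exchange_type="direct", durable=True)
channel.queue_declare(queue="orders.shipped", durable=True)
channel.queue_bind(queue="orders.shipped", exchange="orders.direct", routing_key="shipped")

# A direct exchange delivers this only to queues bound with the routing key "shipped".
channel.basic_publish(exchange="orders.direct", routing_key="shipped", body='{"order_id": 42}')
connection.close()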

RDS

RDS emitter allows you to write to a secured or unsecured RDS DB engine. RDS is a relational database service on the cloud.

Configuring RDS Emitter for Spark Pipelines

To add an RDS emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint. 

The configuration settings are as follows:

Field

Description

Connection Name

All RDS connections will be listed here. Select a connection for connecting to RDS.

Message Name

The name for the message configuration which will act as metadata for the actual data.

Schema Name

Existing database Schema Names whose tables are fetched. (for MSSQL, DB2 and POSTGRES)

Table Name

Existing tablename of the specified database.

Is Batch Enable

Enable this parameter to process multiple messages in a batch and improve write performance.

Batch Size

Batch Size determines how many rows to insert per round trip. This helps the performance on JDBC drivers. This option applies only to writing. It defaults to 1000.

Connection Retries

Number of retries for component connection

Delay Between Connection Retries

Defines the retry delay intervals for component connection in millis.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output Mode to be used while writing the data to data sink.

Select the output mode from the given three options:


Append:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query will be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure additional properties.
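The Batch Size option above maps to the standard Spark JDBC batchsize write option, shown in the hedged sketch below. The JDBC URL, table, and credentials are hypothetical, the driver jar must be available to Spark, and this is not the emitter's internal code.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("rds-emitter-sketch").getOrCreate()
df = spark.createDataFrame([(101, "shipped")], ["order_id", "status"])

(df.write
   .format("jdbc")
   .option("url", "jdbc:postgresql://my-rds-host:5432/sales")
   .option("dbtable", "public.orders")
   .option("user", "pipeline_user")
   .option("password", "********")
   .option("batchsize", 1000)     # rows inserted per round trip (defaults to 1000)
   .mode("append")
   .save())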

Redshift

Redshift emitter works for both Streaming and Batch Datasets. It allows data to be pushed into the Redshift tables.

Configuring Redshift Emitter for Spark Pipelines

To add a Redshift emitter into your pipeline, drag the emitter on the canvas and connect it to a Data Source or processor. Right click on the emitter to configure it as explained below:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint. 

Field

Description

Connection Name

All Redshift connections will be listed here. Select a connection for connecting to Redshift

Message Name

Message used in the pipeline

Schema Name

Schema name should be selected while writing data into Redshift.

Table Name

Existing database tablename whose schema is to be fetched.

Connection Retries

Number of retries for component connection.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in millis.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output mode to be used while writing the data to Streaming emitter.

Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Processing Time is the trigger time interval in minutes or seconds. This property will appear only when Enable Trigger checkbox is selected.

ADD CONFIGURATION

Enables to configure additional custom properties.

S3

Amazon S3 stores data as objects within resources called Buckets. S3 emitter stores objects on Amazon S3 bucket.

Configuring S3 Emitter for Spark Pipelines

To add an S3 emitter into your pipeline, drag the emitter on the canvas and connect it to a Data Source or processor. Right click on the emitter to configure it as explained below:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All S3 connections will be listed here. Select a connection for connecting to S3.

S3 protocol

S3 protocol to be used while writing on S3.

End Point

S3 endpoint details should be provided if the source is Dell EMC S3.

Bucket Name

Buckets are storage units used to store objects, which consist of data and metadata that describes the data.

Path

File or directory path where the data is to be stored.

Output Type

Output format in which result will be processed.

Delimiter

Message field separator.

Output Field

Fields of the output message.

Partitioning Required

Whether or not to partition the data on S3.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append:

Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates

Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Note: It is recommended to use the s3a protocol along with the path.

In case of an AWS Databricks cluster, while creating a new cluster (within the Cluster List View), an S3 role must be selected under IAM role.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

ADD CONFIGURATION

Enables to configure additional custom properties.

Note:

Add various Spark configurations as per requirement. For example, imputation can be performed by clicking the ADD CONFIGURATION button; imputation replaces nullValue/emptyValue with the entered value across the data. (Optional)


Example: nullValue = 123 will replace all null values with 123 in the output.
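A compact Spark sketch of the output type, partitioning, and save mode options described above is given below, writing a batch DataFrame over the s3a protocol. The bucket, path, and column names are hypothetical, and this is not the emitter's internal code.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("s3-emitter-sketch").getOrCreate()
df = spark.createDataFrame([(101, "US", 9.99)], ["order_id", "country", "amount"])

(df.write
   .format("parquet")              # Output Type
   .partitionBy("country")         # Partitioning Required with a partition column
   .mode("overwrite")              # Save Mode: ErrorIfExists / Append / Overwrite / Ignore
   .save("s3a://my-bucket/exports/orders"))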

Snowflake

The user can use the Snowflake cloud-based data warehouse system as an emitter in ETL pipelines. The user is required to configure the fields as shown below:

Field

Description

Connection Name

Connection name is to be selected out of the list of saved connections.

Override Credentials

Unchecked by default, check the checkbox to override credentials for user specific actions.

Username

Once the Override Credentials option is checked, provide the username to be used for the connection.

Password

Provide the password for the override credentials.

Warehouse Name

Required warehouse name is to be selected from the list of Warehouses for the selected Snowflake connection.

Schema Name

Required schema name is to be selected from the Snowflake database schema list which appears as per the selected Snowflake connection.

Table settings options: Use Existing Table or Create New Table are available.

If Use Existing Table is selected, then provide the below fields:

Table Name

Target tables contained in the schema that you selected before will be listed in a drop-down. Select an existing table into which the source data is to be emitted.

Next, the schema results will be displayed for mapping.

Schema Mapping when an Existing Table is used

There are two ways in which schema mapping can be provided for an existing table, Fetch From Target and Upload Schema File.

Fetch From Target

With fetch from target you can directly fetch the schema to be applied from the Snowflake target.


Table column names will get populated and the Auto Map option can be used to fill-in the mapping values.


To do bulk changes in the mapping values, you can use Download Mapping option to download the schema file. Update mapping values in the downloaded file. Then, use Upload Mapping option to provide the mapping values.


The columns that are selected to ignore will not be considered for mapping.

Upload Schema File

The upload schema file option is preferable when the application is supposed to be submitted on a registered cluster. Here, you can upload the sample schema to be applied on the Snowflake target table.


Use Upload Schema option to provide a sample schema file if you have one.


Then, the Auto Map option can be used to fill-in the mapping values.


Else, use Download Sample Schema option to download a sample schema file. Update mapping values in the downloaded file and then upload it to provide the schema mapping.


The columns that are selected to ignore will not be considered for mapping.

Create New Table

If Create New Table is selected, configure the below fields:

Table Name

A table name should be provided that will get created in the schema that you selected before. The source data will be emitted to this table.

Next, the Columns section will be displayed for mapping.

Use Upload Schema option to provide a sample schema file if you have one.


Then, the Auto Map option can be used to fill-in the mapping values.


Else, use Download Sample Schema option to download a sample schema file. Update mapping values in the downloaded file and then upload it to provide the schema mapping.


The columns that are selected to ignore will not be considered for mapping.


Each column can then be defined with its data type, and optionally whether the column:


- Requires a value (NOT NULL).


- Has a default value.


- Has any referential integrity constraints (primary key, or unique key).


Additionally, you can also provide the constraints for reference table and reference columns using the Add Foreign Key option.

Continue to configure the new table options as follows:

Comments

Optional comments can be added for the table.

Cluster By

Specify column or column expressions that should be referenced as the clustering key in the table.

To know more about data clustering in Snowflake, refer to the topic below:


Snowflake Table Structures

Data Retention Time

Specify the number of days for which the table data needs to be retained.

Change Tracking

If set to true, an individual table stream tracks the changes made to rows in a source table.

Copy Grants

Check-marking the box for grants, copies permissions from the table being replaced with CREATE OR REPLACE (if it already exists), and not from the source table(s) being queried in the SELECT statement.

Table Options

Create table method should be selected out of the following:

Create table if not exists

A new table will get created as per the configurations provided if it does not exist in the target.

Truncate & create table

If the table name provided already exists in Snowflake database, it will be truncated and a new table will then be created with the source data.

Truncate table

If the table name provided already exists in the Snowflake database, the existing table will be truncated and then loaded with the source data.

Warehouse Settings Cluster and Scaling

Configure Warehouse details to specify the compute resources. The available options are Use Existing Warehouse or Create New Warehouse.

If Use Existing Warehouse is selected, configure the below fields:

Warehouse Name

Warehouse list for the selected warehouse connection will be displayed here. The user can select the warehouse from the list.

If Create New Warehouse is selected, configure the below fields:

Warehouse Name

Provide a warehouse name for the new warehouse to be created.

Warehouse Configuration

Warehouse Name

New Warehouse name should be provided.

Warehouse Size

Preferred warehouse size should be selected.

Maximum Cluster Count

The maximum number of clusters required for the warehouse should be specified.

Scaling Policy

The scaling policy should be selected out of Standard or Economy.

Auto Suspend

Auto suspend value for the warehouse should be provided (in seconds).

Auto Resume

Specifies whether to automatically resume a warehouse when a SQL statement (e.g. query) is submitted to it.

Comments

Optional comments can be added for the warehouse.

Advanced Settings

Advanced settings will vary in case of batch and streaming data sources in the pipeline.


Advanced settings options with a Batch Data Source in pipeline:

Save Mode

Specifies how to handle the existing data in the target. The options are Append, Truncate, and Overwrite.

Append

Contents of the schema will be appended to existing data/table.

Use External Storage

If the user is doing an external data transfer, and has to store the temporary data in S3, select the external storage option.

External Storage

The temporary data will get stored in S3.

Connection Name

Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for Amazon S3 earlier.

Path

Optional. S3 file or directory path is to be given where the temporary data will get stored.

Purge

Optional. The purge option can be used to delete the intermediate data from S3.

Add configuration

Additional properties can be added using Add Configuration link.

Advanced settings options with a Streaming Data Source in pipeline:

Streaming Stage Name

Stage name to be used for creating stage in Snowflake.

Output Mode

The output mode to be used while writing the data to the Streaming sink.

Append

Output Mode in which only the new rows in the streaming data will be written to the target.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Processing Time is the trigger time interval in minutes or seconds. This property will appear only when Enable Trigger checkbox is selected.

ADD CONFIGURATION

Additional properties can be added using Add Configuration link.
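For orientation, a batch write through the Spark Snowflake connector is sketched below with the warehouse, schema, table, and Append save mode settings discussed above. The connection values are hypothetical, the connector jar must be available to Spark, and the emitter's own implementation may differ.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("snowflake-emitter-sketch").getOrCreate()
df = spark.createDataFrame([(101, "shipped", 9.99)], ["ORDER_ID", "STATUS", "AMOUNT"])

sf_options = {
    "sfURL": "myaccount.snowflakecomputing.com",
    "sfUser": "pipeline_user",
    "sfPassword": "********",
    "sfDatabase": "SALES",
    "sfSchema": "PUBLIC",        # Schema Name
    "sfWarehouse": "LOAD_WH",    # Warehouse Name
}

(df.write
   .format("net.snowflake.spark.snowflake")
   .options(**sf_options)
   .option("dbtable", "ORDERS")  # Table Name
   .mode("append")               # Append save mode
   .save())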




SQS

SQS emitter allows you to emit data in a streaming manner into SQS queues.

Configuring SQS Emitter for Spark Pipelines

To add an SQS emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor. The configuration settings are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All SQS connections will be listed here. Select a connection for connecting to SQS.

Queue Type

Select either type of Queue for incoming messages, Standard or SQS FIFO.

Queue Name

Name of the Queue where data will be published.

Content-Based Deduplication

Enable content-based deduplication for the queue (each of your messages has a unique body). The producer can then omit the message deduplication ID.

Message Group ID For FIFO Queue

The tag that specifies that a message belongs to a specific message group. Messages that belong to the same message group are guaranteed to be processed in FIFO manner.

Message Deduplication ID for FIFO Queue

The token used for deduplication of messages within the deduplication interval.

Visibility Timeout (in seconds)

The length of time (in seconds) for which a message received from the queue will be invisible to other receiving components.

Message Retention Period (in seconds)

The amount of time for which Amazon SQS will retain a message if it does not get deleted.

Maximum Message Size (in bytes)

Maximum message size (in bytes) accepted by Amazon SQS.

Receive Message Wait Time (In seconds)

The maximum amount of time that a long polling receive call will wait for a message to become available before returning an empty response.

Delivery Delay (in seconds)

The amount of time to delay the first delivery of all messages added to this queue.

Output Format

Select the data format in which SQS should write the data.

Output Fields

Select the fields which should be a part of the output data.

Checkpoint Storage Location

Select the check-pointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Select the output mode from the given three options:


Append Mode:

Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode:

Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.


Update Mode:

Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Trigger time interval in minutes or seconds.

Add Configuration

Enables configuring additional SQS properties.
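The FIFO-specific options above (message group ID, deduplication ID, content-based deduplication) are illustrated by the small boto3 sketch below; the queue URL and payload are hypothetical, and this is not the emitter's internal code.

import json
import boto3

sqs = boto3.client("sqs", region_name="us-west-2")
sqs.send_message(
    QueueUrl="https://sqs.us-west-2.amazonaws.com/123456789012/orders.fifo",
    MessageBody=json.dumps({"order_id": 42, "status": "shipped"}),
    MessageGroupId="orders",               # messages in one group are delivered in FIFO order
    MessageDeduplicationId="order-42-v1",  # may be omitted when content-based deduplication is on
)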

Streaming Emitter

Streaming Emitter is based on the foreach action provided by Spark Structured Streaming. It lets you supply a custom implementation for processing streaming data, which is executed by the emitter.

Configuring CustomStreamingEmitter for Spark Pipelines

To add a CustomStreamingEmitter into your pipeline, drag the CustomStreamingEmitter to the canvas, connect it to a Data Source or processor, and right click on it to configure:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Implementation Class

Foreach implementation class to which control will be passed to process incoming data flow.

Output Mode

Output mode to be used while writing the data to Streaming sink.


Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Additional properties can be added.

Click on the Next button. Enter the notes in the space provided.

Click on the Done button for saving the configuration.
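The emitter expects a Foreach implementation class, as noted above. To show the underlying idea, the standalone PySpark sketch below uses foreachBatch, which similarly hands every micro-batch of a streaming query to user code together with the output mode, checkpoint, and trigger settings; it is an illustration only, not the class the emitter loads.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("custom-streaming-emitter-sketch").getOrCreate()
stream = spark.readStream.format("rate").load()   # stand-in streaming source

def process_batch(batch_df, batch_id):
    # custom handling of each micro-batch, e.g. writing to an external system
    print(f"batch {batch_id}: {batch_df.count()} rows")

query = (stream.writeStream
               .foreachBatch(process_batch)
               .outputMode("append")
               .option("checkpointLocation", "/tmp/custom-emitter-checkpoint")
               .trigger(processingTime="10 seconds")
               .start())
query.awaitTermination()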

Streaming

Streaming emitter enables you to visualize the data running in the pipeline at the built-in real-time dashboard.

For example, you may use Streaming emitter to view real-time price fluctuation of stocks.

 

Field

Description

Stream Id

Exchange name on which streaming messages will be sent.

Output Mode

Output mode to be used while writing the data to streaming sink. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

Trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables to configure additional custom properties.

Click on the Next button. Enter the notes in the space provided. Click Done for saving the configuration details.

Advance HDFS Emitter

Advance HDFS emitter allows you to add a rotation policy to the emitter.

Configuring Advance HDFS Emitter for Spark Pipelines

To add an Advance HDFS emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor. The configuration settings are as follows:

Field

Description

Connection Name

All advance HDFS connections will be listed here. Select a connection for connecting to HDFS.

Override Credentials

Check the checkbox for user specific actions.

Username

The name of user through which the Hadoop service is running.


Note: Click TEST CONNECTION BUTTON to test the connection.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:


Append: Output Mode in which only the new rows in the streaming data will be written to the sink


Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.


Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Trigger Type

Available options:


- One-Time Micro Batch


- Fixed-Interval Micro Batches (Provide the Processing Time)

ADD CONFIGURATION

User can add further configurations (Optional).


Note:

Add various Spark configurations as per requirement. For example, imputation can be performed by clicking the ADD CONFIGURATION button; imputation replaces nullValue/emptyValue with the entered value across the data. (Optional)


Example: nullValue = 123 will replace all null values with 123 in the output.

ENVIRONMENT PARAMS

Click the + ADD PARAM button to add further parameters as key-value pair.

HDFS PATH

Path Type

Select path type from the drop-down. Available options are:


- Static


- Dynamic

HDFS Path

Directory path from HDFS from where the data is to be read.

Output Fields

Message field that will be persisted.

Partitioning Required

Check the check-box to partition the table.

Partition Column

Option to select fields on which the table will be partitioned.

Include Column Name in Partition

Option to include the column name in partition path.

Output Type

Output format in which result will be processed.


Delimited: In a delimited file, such as a comma-separated values (CSV) file, the data items are separated by a delimiter such as a comma.


JSON is an open-standard file format that uses human-readable text to transmit data objects consisting of attribute–value pairs and array data types (or any other serializable value).


ORC: ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than other file formats.


AVRO: Avro stores the data definition in JSON format making it easy to read and interpret.


Parquet: Parquet stores nested data structures in a flat columnar format.

Rotation Policy

Select a rotation policy from: None, Size based, Time based, Time and size based, or Record based.




None- there will be no rotation policy applied.


Size based - the data will be written until the mentioned size in bytes has been reached.


Time based - the data will be written in same file for the mentioned amount of time in seconds.


Time and Size based - data will be written in same file until one criteria from mentioned size and time is achieved.


Record Based: The data will be written in same file until the mentioned number of records has been written.

Delimiter

Select the message field separator.

Check Point Directory

It is the HDFS Path where the Spark application stores the checkpoint data.

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Block Size

Size of each block (in bytes) allocated in HDFS

Replication

Replication factor used to make additional copies of the data.

Compression Type

Algorithm used to compress the data.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.


ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.


Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.


Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the data.


Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the data and to not change the existing data.


This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Enable Triggers

Trigger defines how frequently a streaming query should be executed.

Path Type

Path Type could be either Static or Dynamic.

Static: When you select Static, the HDFS Path field is presented as a text box. Provide a valid HDFS path to emit data.

Dynamic: With Dynamic, the HDFS Path field is populated with a drop-down of the field names of the incoming dataset.

The value of the selected field should be a valid HDFS path; that value is the location where the records will be emitted.

HDFS Path

Enter the Directory path/Field name.

Output Fields

Fields in the message that needs to be a part of the output message.

Partitioning Required

Check this option if the table is to be partitioned. You will then see the Partition List input box.

Partition List

This option is to select fields on which table will be partitioned.

Include Column name in Partition

Includes the column name in partition path.


Example- {Column Name}={Column Value}

Output Type

Output format in which the results will be processed.

Rotation Policy

Select a rotation policy from: None, Size based, Time based, Time and size based, or Record based.


None- there will be no rotation policy applied.


Size based - the data will be written until the mentioned size in bytes has been reached.


Time based - the data will be written in same file for the mentioned amount of time in seconds.


Time and Size based - data will be written in same file until one criteria from mentioned size and time is achieved.


Record Based: The data will be written in same file until the mentioned number of records has been written.

Raw Data Size

If rotation policy is Size Based or Time and Size based - Enter the raw data size in bytes after which the file will be rotated.

File Rotation Time

If rotation policy is Time Based or Time and Size based - Enter the time in milliseconds after which the file will be rotated.

Record count

Records count after which the data will be written in the new file. This field is generated when you select your Rotation Policy as Record Based.

Delimiter

Message field separator.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.

For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.


For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Block Size

Size of each block (in bytes) allocated in HDFS.

Replication

Replication factor used to make additional copies of the data.

Compression Type

Algorithm used to compress data. Types of Compression algorithms that you can apply are:

• NONE

• DEFLATE

• GZIP

• BZIP2

• SNAPPY
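Leaving rotation policies aside (those are handled by the emitter itself), the output type, partitioning, compression, and save mode options above line up with a plain Spark batch write to HDFS, sketched below with hypothetical paths and column names.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("advance-hdfs-emitter-sketch").getOrCreate()
df = spark.createDataFrame([(101, "US", 9.99)], ["order_id", "country", "amount"])

(df.write
   .format("parquet")                 # Output Type: Delimited / JSON / ORC / AVRO / Parquet
   .option("compression", "snappy")   # Compression Type
   .partitionBy("country")            # Partitioning Required with a partition column
   .mode("append")                    # Save Mode
   .save("hdfs:///user/hadoop/orders"))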

Cassandra

Cassandra emitter allows you to store data in a Cassandra table.

Configuring Cassandra emitter for Spark pipelines

To add a Cassandra emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

The configuration settings are as follows:

Field

Description

Connection Name

All Cassandra connections will be listed here. Select a connection for connecting to Cassandra.

KeySpace

Cassandra keyspace name. If the keyspace name does not exist in Cassandra, a new keyspace will be created.

Output Fields

Output messages fields.

Key Columns

A single or compound primary key consists of the partition key and one or more additional columns that determine clustering.

Table Name Expression

Cassandra table name. If the table name does not exist on the keyspace then it will create a new table.

Note: The user can create tables dynamically based on field name provided in table name expression.

Consistency Level

Consistency level refers to how up-to-date and synchronized a row of Cassandra data is on all its replicas.


Consistency levels are as follows:

ONE: Only a single replica must respond.

TWO: Two replicas must respond.

THREE: Three replicas must respond.

QUORUM: A majority (n/2 + 1) of the replicas must respond.

ALL: All of the replicas must respond.

LOCAL_QUORUM: A majority of the replicas in the local data center (whichever data center the coordinator is in) must respond.

EACH_QUORUM: A majority of the replicas in each data center must respond.

LOCAL_ONE: Only a single replica must respond. In a multi-data center cluster, this also guarantees that read requests are not sent to replicas in a remote data center.

Replication Strategy

A replication strategy specifies the implementation class for determining the nodes where replicas are placed. Possible strategies are SimpleStrategy and NetworkTopologyStrategy.

Replication Factor

Replication factor used to make additional copies of data.

Enable TTL

Select the checkbox to enable TTL (Time to Live) for records to persist for that time duration.

TTL Value

It will appear only when Enable TTL checkbox is selected.

Provide TTL value in seconds.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Batch Size

Number of records to be picked for inserting into Cassandra.

Output Mode

Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds.

Add Configuration

Enables to configure additional Cassandra properties.

Note: Append output mode should only be used if an Aggregation Processor with watermarking is used in the data pipeline.
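The consistency level and TTL settings above can be exercised directly with the DataStax Python driver, as in the hedged sketch below; the contact point, keyspace, and table are hypothetical, and this is not the emitter's internal code.

from cassandra import ConsistencyLevel
from cassandra.cluster import Cluster
from cassandra.query import SimpleStatement

cluster = Cluster(["127.0.0.1"])
session = cluster.connect("sales")   # keyspace

insert = SimpleStatement(
    "INSERT INTO orders (order_id, status) VALUES (%s, %s) USING TTL 3600",  # TTL value in seconds
    consistency_level=ConsistencyLevel.LOCAL_QUORUM,  # a majority of local-DC replicas must respond
)
session.execute(insert, (101, "shipped"))
cluster.shutdown()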

GCS

gathr provides a GCS emitter. The configuration for the GCS emitter is mentioned below:

Field

Description

Save as Dataset

Select the checkbox to save the schema as dataset. Mention the dataset name.

Connection Name

Choose the connection name from the drop down to establish the connection.

Override Credentials

Check the checkbox for user specific actions.

Service Account Key File

Upload GCP Service Account Key File to create connection. You can test the connection by clicking at the TEST CONNECTION button.

Bucket Name

Mention the bucket name.

Path

Mention the sub-directories of the bucket name mentioned above to which the data is to be written.

Output Type

Select the output format in which the results will be processed.

Delimiter

Select the message field separator.

Output Fields

Select the fields in the message that needs to be a part of the output data.

Partitioning Required

To partition the data, checkmark the box.

Partition Columns

Option to select fields on which the data will be partitioned.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.


ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.


Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.


Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.


Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.


This is similar to a CREATE TABLE IF NOT EXISTS in SQL

Check point Storage Location

Select the check pointing storage location. The available options are S3, HDFS, EFS.

Check point Connections

Select the connection from the drop-down list. Connections are listed corresponding to the selected storage location.

Override Credentials

Check the checkbox for user specific actions.

Username

The name of user through which the Hadoop service is running.


Note: Click TEST CONNECTION BUTTON to test the connection.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.


For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., in each run the checkpoint location provided above will be appended with the current time in milliseconds.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Trigger Type

Available options in drop-down are:


One Time Micro Batch


Fixed Interval Micro Batch

ADD CONFIGURATION

User can add further configurations (Optional).


Note:


Add various Spark configurations as per requirement. For example: Perform imputation by clicking the ADD CONFIGURATION button. Note: For imputation replace nullValue/emptyValue with the entered value across the data. (Optional)


Example: with nullValue = 123, the output will replace all null values with 123. A hedged Spark-writer equivalent is sketched below.
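
As an illustration, the equivalent options on a plain Spark delimited (CSV) writer would look like the sketch below; the replacement values and path are placeholders and df denotes the incoming DataFrame:

# Replace null and empty values in delimited (CSV) output.
(df.write
   .format("csv")
   .option("nullValue", "123")    # nulls are written as 123
   .option("emptyValue", "NA")    # empty strings are written as NA
   .mode("overwrite")
   .save("gs://my-bucket/output/path"))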

ENVIRONMENT PARAMS

Click the + ADD PARAM button to add further parameters as key-value pair.

Note: The user can add further configurations by clicking the ADD CONFIGURATION button.

HBase

HBase emitter stores streaming data into HBase. It provides quick random access to huge amounts of structured data.

Configuring HBase Emitter for Spark Pipelines

To add an HBase emitter to your pipeline, drag it to the canvas, connect it to a Data Source or processor, and right-click on it to configure it.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All HBase connections will be listed here. Select a connection for connecting to HBase.

Batch Size

Specify the batch size if you want to index records in batches.

Table Name Expression

JavaScript expression used to evaluate the table name.

The keyspace will be formed as ns_ + {tenantId}. For example, ns_1.

Compression

Provides the facility to compress the message before storing it. The algorithm used is Snappy.

When set to true, enables compression on the data.

Region Splitting Definition

This functionality defines how the HBase tables should be pre-split. The default value is ‘No pre-split’. The supported options are:

Default (No Pre-Split): Only one region will be created initially.

Based on Region Boundaries: Regions are created based on given key boundaries. For example, if your key is a hexadecimal key and you provide a value ‘4, 8, d’, it will create four regions as follows:


1st region for keys less than 4.


2nd region for keys greater than 4 and less than 8.


3rd region for keys greater than 8 and less than d.


4th region for keys greater than d.

Encoding

Data encoding type: either UTF-8 (base encoding) or BASE64 (64-bit encoding).

Row Key Generator Type

Enables generating a custom row key.


Following type of key generators are available:


UUID: Universally unique identifier.


Key Based: In this case, the key is generated by appending the values of the selected fields.

An additional field – “Key Fields” will be displayed where you can select the keys you want to combine. The keys will be appended in the same order as selected on the user interface.


Custom: Write your custom logic to create the row key. For example, if you want to use a UUID key but want to prefix it with HSBC, then you can write the logic in a Java class.


If you select this option, an additional field, “Class Name”, will be displayed on the UI where you need to mention the fully qualified class name of your Java class. You can download the sample project from the “Data Pipeline” landing page and refer to the Java class “com.yourcompany.custom.keygen.SampleKeyGenerator” to write the custom code.

Column Family

Specify the name of the column family that will be used while saving your data in an HBase table.

Emitter Output Fields

Select the emitter output fields.

Output Fields

Fields in the message that need to be a part of the output message.

Replication

Enables copying your data on the underlying Hadoop file system. For example, if you specify “2” as Replication, two copies will be created on HDFS.

Ignore Missing Values

Ignore or persist empty or null values of message fields in emitter.

When set to true, ignores null values of message fields.

Connection Retries

The number of retries for component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in milliseconds.

Enable TTL

Specifies the lifetime of a record. When selected, the record will persist for the duration specified in the TTL Value field.

TTL Type

Specify the TTL type as Static or Field Value.

TTL Value

Provide the TTL value in seconds in case of the Static TTL type, or an integer field in case of Field Value.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., on each run the checkpoint location provided above will be appended with the current time in milliseconds.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Select the output mode from the given three options:

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.
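
These output modes correspond to Spark Structured Streaming's outputMode setting. A hedged PySpark sketch follows; the aggregation and console sink are placeholders and df denotes a streaming DataFrame:

# Placeholder aggregation; update mode writes only the rows changed
# in each micro-batch.
agg = df.groupBy("word").count()

query = (agg.writeStream
            .outputMode("update")                 # "append" | "complete" | "update"
            .format("console")
            .trigger(processingTime="1 minute")
            .start())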

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

ADD CONFIGURATION

Enables configuring additional properties.

Note: Index_field and store_field are supported via Add Configuration.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Hive

Hive emitter allows you to store streaming/batch data into HDFS. Hive queries can be implemented to retrieve the stored data.

To configure a Hive emitter, provide the database name and table name along with the list of schema fields to be stored. These data rows get stored in the Hive table, in a specified format, inside the provided database.

You must have the necessary permissions for creating table partitions and then writing to partition tables.

Configuring Hive Emitter for Spark Pipelines

To add a Hive emitter into your pipeline, drag it to the canvas, connect it to a Data Source or processor, and right click on it to configure.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Save as Dataset

Select the checkbox to save the schema as Dataset.

Connection Name

All Hive connections will be listed here. Select a connection for connecting to Hive.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., on each run the checkpoint location provided above will be appended with the current time in milliseconds.

Database Name

HIVE database name.

Table Name

HIVE table name.

Output Fields

Fields in the schema that need to be a part of the output data.

Lower Case

Converts all the selected partition columns into lowercase while writing data into Hive.

Format

TEXT: Stores information as plain text. The space (‘ ’) delimiter is not supported in TEXT format.

ORC: ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than the other file formats.

AVRO: AVRO stores the data definition in JSON format making it easy to read and interpret.

Parquet: Parquet stores nested data structures in a flat columnar format.

Note: In HDP 3.1.0, for the ORC format the table is created as a Managed Table, while for other formats such as Avro, Parquet, and Text, an external table is created.
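
For reference, a hedged PySpark sketch of writing to a Hive table in one of these formats with a partition column; the database, table, and column names are placeholders, df denotes the incoming DataFrame, and Hive support is assumed to be enabled on the Spark session:

# Write to a Hive table in ORC format, partitioned by a date column.
(df.write
   .format("orc")                    # TEXT | ORC | AVRO | Parquet
   .mode("append")
   .partitionBy("ingest_date")       # placeholder partition column
   .saveAsTable("my_database.my_table"))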

Delimiter

Message field separator.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Replication

Enables copying your data on the underlying Hadoop file system. For example, if you specify “2” as Replication, two copies will be created on HDFS.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when the Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

Add Configuration

Enables configuring custom properties.

Schema Results

Column Name

Name of the column populated from the selected Table.

Mapping Value

Map a corresponding value to the column.

Data Type

Data type of the Mapped Value.

Ignore All

Select the Ignore All check box to ignore all the Schema Results or select a checkbox adjacent to the column to ignore that column from the Schema Results.


Use Ignore All or selected fields while pushing data to emitter.



Add partition Column

This will add that field as the part of partition fields while creating the table.

Auto Fill

Auto Fill automatically populates and maps all incoming schema fields with the fetched table columns. The left side shows the table columns and the right side shows the incoming schema fields.

If a field with the same name as the table column is not found in the incoming schema, the first field will be selected by default.

Download Mapping

It downloads the mappings of schema fields and table columns in a file.

Upload Mapping

Uploading the mapping file automatically populates the table columns and schema fields.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button after entering all the details.

NativeHDFS

HDFS emitter stores data in Hadoop Distributed File System.

To configure a Native HDFS emitter, provide the HDFS directory path along with the list of fields of schema to be written. These field values get stored in HDFS file(s), in a specified format, inside the provided HDFS directory.

Configuring Native HDFS for Spark Pipelines

To add Native HDFS emitter into your pipeline, drag the emitter on to the canvas, connect it to a Data Source or processor, and right click on it to configure it. You can also save it as a Dataset.

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Save As Dataset

When you select this checkbox, you will be able to save the data of this emitter as a dataset. After selecting this, provide a name for the Dataset.

Connection Name

All Native HDFS connections will be listed here. Select a connection for connecting to HDFS.

Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

HDFS Path

Directory path on HDFS where data has to be written.

Output Fields

Fields which will be part of output data.

Partitioning Required

If checked, table will be partitioned.

Partition List

Select the fields on which table is to be partitioned.

Output Type

Output format in which result will be processed.

Delimited: In a delimited format such as a comma-separated values (CSV) file, the data items are separated using a delimiter such as a comma.

JSON: An open-standard file format that uses human-readable text to transmit data objects consisting of attribute–value pairs and array data types (or any other serializable value).

ORC: ORC stands for Optimized Row Columnar, which means it can store data in a more optimized way than the other file formats.

AVRO: Avro stores the data definition in JSON format making it easy to read and interpret.

Parquet: Parquet stores nested data structures in a flat columnar format.

XML: Extensible Markup Language (XML) is a markup language that defines a set of rules for encoding documents in a format that is both human-readable and machine-readable.

In case of multi-level JSON, multiple array levels are supported, as shown in the sample below.


Output

<root>

<row>

<arrayArrayLevel1>[1,2]</arrayArrayLevel1>

<arrayArrayLevel1>[3,4]</arrayArrayLevel1>

<arrayArrayLevel1>[6,7]</arrayArrayLevel1>

<arrayJsonLevel1>

<array1StringLevel2>jj</array1StringLevel2>

</arrayJsonLevel1>

<arrayJsonLevel1>

<array2StringLevel2>jj22</array2StringLevel2>

</arrayJsonLevel1>

<arrayStringLevel1>a</arrayStringLevel1>

<arrayStringLevel1>b</arrayStringLevel1>

<arrayStringLevel1>c</arrayStringLevel1>

<doubleLevel1>10.0</doubleLevel1>

<intLevel1>1</intLevel1>

<jsonLevel1>

<jsonLevel2>

<stringLevel3>bye</stringLevel3>

</jsonLevel2>

<stringLevel2>hello</stringLevel2>

</jsonLevel1>

<stringLevel1>hi1</stringLevel1>

</row>

</root>


Sample nested JSON

 

{"stringLevel1":"hi1","intLevel1":1,"doubleLevel1":10.0,"jsonLevel1":{"stringLevel2":"hello","jsonLevel2":{"stringLevel3":"bye"}},"arrayStringLevel1":["a","b","c"],"arrayJsonLevel1":[{"array1StringLevel2":"jj"},{"array2StringLevel2":"jj22"}],"arrayArrayLevel1":[[1,2],[3,4],[6,7]]}


NOTE: Every selected output type produces corresponding fields. For example, in case of Delimited, a Delimiter field is populated; select the delimiter accordingly.


Delimiter

Message Field separator.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., on each run the checkpoint location provided above will be appended with the current time in milliseconds.

Row Tag

Row Tag for Output XML.

Root Tag

Root Tag for Output XML.
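
For XML output, the row and root tags map onto the corresponding writer options. The sketch below is a hedged example that assumes the spark-xml package is available; the path is a placeholder and df denotes the incoming DataFrame:

# Emit XML with <root> as the root element and <row> per record,
# matching the sample output shown above.
(df.write
   .format("com.databricks.spark.xml")
   .option("rootTag", "root")
   .option("rowTag", "row")
   .mode("overwrite")
   .save("hdfs:///user/hadoop/output/xml"))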

Block Size

Size of each block (in Bytes) allocated in HDFS.

Replication

Enables making additional copies of the data.

Compression Type

Algorithm used to compress the data.
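
These settings roughly correspond to the HDFS client properties and the writer compression option shown in the hedged sketch below; the values and paths are examples only, spark denotes an active SparkSession, and df the incoming DataFrame:

# HDFS block size and replication for files written by this job,
# plus output compression on the writer.
hconf = spark.sparkContext._jsc.hadoopConfiguration()
hconf.set("dfs.blocksize", str(128 * 1024 * 1024))   # 128 MB blocks
hconf.set("dfs.replication", "2")                    # two copies on HDFS

(df.write
   .format("parquet")
   .option("compression", "snappy")
   .save("hdfs:///user/hadoop/output/parquet"))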

ADD CONFIGURATION

Enables configuring additional properties.


Limitation: This emitter works only for batch pipelines. Partitioning does not work when the output format is selected as XML.

Click on the NEXT button. Enter the notes in the space provided. Click SAVE for saving the configuration details.

Elasticsearch

Elasticsearch emitter allows you to store data in Elasticsearch indexes.

Configuring Elasticsearch Emitter for Spark Pipelines

To add an Elasticsearch emitter into your pipeline, drag the emitter to the canvas and connect it to a Data Source or processor. The configuration settings are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All Elasticsearch connections will be listed here. Select a connection for connecting to Elasticsearch.

Output Message

Output message which is to be indexed.

Index Nested JSON

Select the checkbox if nested JSON fields are to be indexed.

If the checkbox is not selected, three additional fields are populated: Index Number of Shards, Index Replication Factor and Output Fields.

If selected, these three fields will be hidden and the following note will be displayed:

“Index will be created with ElasticSearch default 5 shards and 1 replication factor.”

Note: If this checkbox is selected, you can use curly brackets to reference schema fields in the Index name.

Index Number of Shards

Number of shards to be created in Index Store.

Index Replication Factor

Number of additional copies of data.

Index Name

Index name can also be created using the document/data field like emp_{document_field}.


Here{document_field} will be replaced during runtime by the value of document field of that particular record.


Index name should be in lower case and follow naming conventions of Elasticsearch.


Specify the index name where data is to be indexed.

Dynamic index creation works only when the below conditions are fulfilled (see the sketch after these conditions):

1. The Index Nested JSON checkbox should be selected on the ES emitter.

2. action.auto_create_index: true should be set in the Elasticsearch cluster.

3. The field data should always be in lower case, otherwise the pipeline will fail.
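
As an illustration of dynamic index names, the hedged sketch below uses the elasticsearch-hadoop Spark connector, which resolves {field} patterns per record; the host, index pattern, and field name are placeholders and df denotes the incoming DataFrame:

# Each record is indexed into emp_<value of its 'dept' field>,
# e.g. a record with dept = "sales" goes to index emp_sales.
(df.write
   .format("org.elasticsearch.spark.sql")
   .option("es.nodes", "es-host:9200")
   .option("es.resource", "emp_{dept}")
   .mode("append")
   .save())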

Index Type

Index Type could be either Static or Dynamic.

Example of Dynamic Index Type:

Index type can also be created using the document/data field like emp_{document_field}.

Here {document_field} will be replaced during runtime by the value of document field of that record.

Note:

1. The default or blank index type will be index name + “_type”.

2. The index type should follow the naming conventions of Elasticsearch.

3. In case of a static index name, the index type should also be static.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.

For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-based Check Point

Select the checkbox to enable a time-based checkpoint on each pipeline run, i.e., on each run the checkpoint location provided above will be appended with the current time in milliseconds.

ID Field Name

Specify a name for the generated ID field.

ID Generator Type

Enables generating the ID field.


Following type of ID generators are available:

UUID: Universally unique identifier.

Field Values Based: In this case, the ID is generated by appending the values of the selected fields.

If you select this option then an additional field – “Key Fields” will be displayed, where you need to select the fields you want to combine. The fields will be appended in the same order as selected on the user interface.

Custom: In this case, you can write your custom logic to create the ID field. For example, if you wish to use a UUID key but want to prefix it with “HSBC”, then you can write the logic in a Java class.

If you select this option then an additional field - “Class Name” will be displayed on user interface where you need to mention the fully qualified class name of your Java class.


You can download the sample project from the “Data Pipeline” landing page and refer to the Java class com.yourcompany.custom.keygen.SampleKeyGenerator to write the custom code.
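
A hedged sketch of the field-values-based approach with the elasticsearch-hadoop connector: a document ID is built by concatenating selected fields and passed via es.mapping.id. The field names, host, and index are placeholders and df denotes the incoming DataFrame:

from pyspark.sql import functions as F

# Build an ID from selected fields and use it as the document ID.
df_with_id = df.withColumn("doc_id", F.concat_ws("_", "customer_id", "order_id"))

(df_with_id.write
   .format("org.elasticsearch.spark.sql")
   .option("es.nodes", "es-host:9200")
   .option("es.mapping.id", "doc_id")
   .option("es.resource", "orders")
   .mode("append")
   .save())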

Emitter Output Fields

Output fields of the emitter.

Connection Retries

Number of retries for component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Processing Time

It will appear only when the Enable Trigger checkbox is selected.

Processing Time is the trigger time interval in minutes or seconds.

ADD CONFIGURATION

Enables additional configuration properties of Elasticsearch.

Click on the Next button. Enter the notes in the space provided.

Click on the DONE button for saving the configuration.

Salesforce

Salesforce as a target is supported by Gathr.

Configuring Salesforce Emitter for ETL Pipelines

To add a Salesforce emitter into your pipeline, drag the emitter on the canvas and connect it to a Data Source or processor. Right click on the emitter to configure it as explained below:

Configuration

Field

Description

Connection Name

Connections are the service identifiers.

A connection name can be selected from the list if you have created and saved connection details for Salesforce earlier.

Table Name

Existing table name of the specific Salesforce account should be selected.

Save Mode

Save Mode is used to specify the expected behavior of saving data to the target relational database table.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Update: When persisting data, if data/table already exists, existing data is expected to be updated by the contents of the Data with additional option as specified in the Update Type option below.

If the Save Mode field is set to Update, an additional field will be displayed as given below:

External ID Field

A field for upsert operation should be selected.

Additionally, there is an option to create a new custom Salesforce object.

Create Table

Check mark this option if you want to create a custom object in Salesforce.

If Create Table field is selected, additional fields will be displayed as given below:

Replace Table

Option to delete and create a new custom Salesforce object.

Table Name

The table name provided here will be used in tabs, page layouts and reports.

Record Name Field

The record name field appears in the page layouts, key lists, related lists, lookups and search results.

For example: The record name for Account is “Account Name” and for Case it is “Case Number”.

Data Type

Data type should be selected for the record name field.

Note: Table Label and Record Name must not be the same.

Add Column

Option to add columns by providing Field Name and selecting Data Type and Default Value.

Save Mode

Save Mode is used to specify the expected behavior of saving data to the target relational database table.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Update: When persisting data, if data/table already exists, existing data is expected to be updated by the contents of the Data with additional option as specified in the Update Type option below.

If the Save Mode field is set to Update, an additional field will be displayed as given below:

External ID Field

A field for upsert operation should be selected.

ADD CONFIGURATION

Enables configuring additional custom properties.

Note:

The fields that are read-only for any Salesforce emitter object will not appear in schema results.

Click on the Next button. Enter the notes in the space provided.

Click on the Done button for saving the configuration.

Note: The Salesforce emitter will support batch sources only.

Solr

Solr emitter allows you to store data in Solr indexes. Indexing is done to increase the speed and performance of search queries.   

Configuring Solr Emitter for Spark Pipelines

To add a Solr emitter into your pipeline, drag it on the canvas and connect it to a Data Source or processor. The configuration settings of the Solr emitter are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All Solr connections are listed here. Select a connection for connecting to Solr.

Batch Size

Specify the batch size if you want to index records in batches.

KeySpace

Define a new or existing keyspace or its replication strategy.

Across Field Search Enabled

Specifies if full text search is to be enabled across all fields.

Index Number of Shards

Specifies number of shards to be created in index store.

Index Replication Factor

Specifies number of additional copies of data to be kept across nodes. Should be less than n-1, where n is the number of nodes in the cluster.

Index Expression

The MVEL expression is used to evaluate the index name. This can help you leverage field-based partitioning.

For example, consider the expression below:

@{'ns_1_myindex' + Math.round(<MessageName>.timestamp / (3600*1000))}

A new index will be created for each one-hour time range, and data will be dynamically indexed based on the field whose alias name is 'timestamp'. For instance, a timestamp of 1,700,003,600,000 milliseconds falls into hour bucket 472,223, so that record is indexed into 'ns_1_myindex472223'.

Routing Required

This specifies if custom dynamic routing is to be enabled. If enabled, a JSON routing policy needs to be defined.

ID Generator Type

Enables generating the ID field.

Following types of ID generators are available:

Key Based:

Key Fields: Select message field to be used as key.

Select: Select all/id/sequence_number/File_id.

Note: Add the key 'incremental_fields' with comma-separated column names as values. This will work with a key-based UUID.

UUID: Universally unique identifier.

Custom: In this case, you can write your custom logic to create the ID field. For example, if you wish to use a UUID key but want to prefix it with “HSBC”, then you can write the logic in a Java class.

If you select this option then an additional field - “Class Name” will be displayed on user interface where you need to mention the fully qualified class name of your Java class.

You can download the sample project from the “Data Pipeline” landing page and refer to the Java class com.yourcompany.custom.keygen.SampleKeyGenerator to write the custom code.

Enable TTL

Select TTL that limits the lifetime of the data.

TTL Type: Provide TTL type as either Static or Field Value.

TTL Value: Provide the TTL value in seconds in case of the Static TTL type, or an integer field in case of Field Value.

Output Fields

Fields of the output message.

Ignore Missing Values

Ignore or persist empty or null values of message fields in sink.

Connection Retries

Number of retries for component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries.

If Routing Required = true, then:

Routing Policy: A JSON defining the custom routing policy. Example: {"1":{"company":{"Google":20.0,"Apple":80.0}}}

Here, 1 is the timestamp after which the custom routing policy will be active, 'company' is the field name, and the value 'Google' takes 20% of the shards while 'Apple' takes 80%.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in milliseconds.


Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Time-Based Checkpoint

Select the checkbox to enable a time-based checkpoint on each pipeline run.

Output Mode

Output mode to be used while writing the data to Streaming sink.

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Add Configuration

The user can add further configuration.

Note: Index_field and store_field are supported via Add Configuration.


Vertica

The Vertica emitter supports Oracle, Postgres, MySQL, MSSQL, and DB2 connections.

You can configure and connect the above-mentioned DB engines with JDBC. This allows you to emit data from your data pipeline into DB2 and the other supported databases in batches after configuring the JDBC channel.

Note: This is a batch component.

To use DB2, first create a working DB2 connection.

Configuring Vertica Emitter for Spark Pipelines

To add a Vertica emitter into your pipeline, drag it on the canvas and connect it to a Data Source or processor. The configuration settings of the Vertica emitter are as follows:

Note: If the data source in pipeline has a streaming component, then the emitter will show four additional properties, Checkpoint Storage Location; Checkpoint Connections; Checkpoint Directory; and Time-Based checkpoint.

Field

Description

Connection Name

All Vertica connections are listed here. Select a connection for connecting to Vertica.

Message Name

The name of the message configuration which will act as metadata for the actual data.

Table Name

Existing table name of the specified database.

Is Batch Enable

Enable this parameter to batch multiple messages and improve write performance.

Batch Size

The batch size determines how many rows to insert per round trip. This can improve performance with JDBC drivers. This option applies only to writing and defaults to 1000 (see the sketch below).
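
In plain Spark JDBC terms this maps onto the batchsize write option, as in the hedged sketch below; the connection URL, table, and credentials are placeholders and df denotes the incoming DataFrame:

# JDBC write with an explicit batch size (Spark's default is 1000 rows).
(df.write
   .format("jdbc")
   .option("url", "jdbc:vertica://vertica-host:5433/mydb")
   .option("dbtable", "public.my_table")
   .option("user", "dbadmin")
   .option("password", "*****")
   .option("batchsize", "5000")
   .mode("append")
   .save())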

Connection Retries

Number of retries for component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries.


Save Mode

Save Mode is used to specify the expected behavior of saving data to a data sink.

ErrorifExist: When persisting data, if the data already exists, an exception is expected to be thrown.

Append: When persisting data, if data/table already exists, contents of the Schema are expected to be appended to existing data.

Overwrite: When persisting data, if data/table already exists, existing data is expected to be overwritten by the contents of the Data.

Ignore: When persisting data, if data/table already exists, the save operation is expected to not save the contents of the Data and to not change the existing data.

This is similar to a CREATE TABLE IF NOT EXISTS in SQL.

Ignore Missing Values

Ignore or persist empty or null values of message fields in sink.

Delay Between Connection Retries

Defines the retry delay intervals for component connection in milliseconds.

Enable TTL

When selected, data will be discarded according to the specified TTL.

Checkpoint Storage Location

Select the checkpointing storage location. Available options are HDFS, S3, and EFS.

Checkpoint Connections

Select the connection. Connections are listed corresponding to the selected storage location.

Checkpoint Directory

It is the path where Spark Application stores the checkpointing data.


For HDFS and EFS, enter a relative path like /user/hadoop/checkpointingDir; the system will add a suitable prefix by itself.

For S3, enter an absolute path like: S3://BucketName/checkpointingDir

Output Mode

Output mode to be used while writing the data to Streaming sink.

Append: Output Mode in which only the new rows in the streaming data will be written to the sink.

Complete Mode: Output Mode in which all the rows in the streaming data will be written to the sink every time there are some updates.

Update Mode: Output Mode in which only the rows that were updated in the streaming data will be written to the sink every time there are some updates.

Enable Trigger

Trigger defines how frequently a streaming query should be executed.

Schema Results

Table Column Name

Name of the column populated from the selected Table.

Mapping Value

Map a corresponding value to the column.

Database Data Type

Data type of the Mapped Value.

Ignore All

Select the Ignore All check box to ignore all the Schema Results or select a checkbox adjacent to the column to ignore that column from the Schema Results.


Use Ignore All or selected fields while pushing data to emitter.



Auto Fill

Auto Fill automatically populates and maps all incoming schema fields with the fetched table columns. The left side shows the table columns and the right side shows the incoming schema fields.

If a field with the same name as the table column is not found in the incoming schema, the first field will be selected by default.

Download Mapping

It downloads the mappings of schema fields and table columns in a file.

Upload Mapping

Uploading the mapping file automatically populates the table columns and schema fields.