Emitters
Emitters define the destination stage of a pipeline, which could be a NoSQL store, indexer, relational database, or third-party BI tool.
Note: Append output mode should only be used if an Aggregation Processor with watermarking is used in the data pipeline.
Advance Kafka Emitter stores data to an Advance Kafka cluster. Supported data formats are JSON and DELIMITED.
Configuring Advance Kafka Emitter for Spark Pipelines
To add an Advance Kafka Emitter into your pipeline, drag the Advance Kafka to the canvas, connect it to a channel or processor, and right click on it to configure.
|
Field |
Description |
|---|---|
|
Connection Name |
All Kafka connections will be listed here. Select a connection for connecting to Advance Kafka. |
|
Topic Name |
Advance Kafka topic name where you want to emit data. |
|
Partitions |
Number of partitions to be created for a topic. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log. |
|
Replication Factor |
For a topic with replication factor N, Advance Kafka will tolerate up to N-1 failures without losing any messages committed to the log. |
|
Output Format |
Data Type format of the output message. |
|
Advance Kafka Partitioner |
Round Robin (Default): Advance Kafka uses the default partition mechanism to distribute the data. Key Based: Advance Kafka partitioning is done on the basis of keys. Custom: Lets you write a custom partition class by implementing the com.streamanalytix.framework.api.spark.partitioner.SAXAdvance Kafka Partitioner interface. The partition method contains the logic to calculate the destination partition and returns the target partition number. |
|
Enable TTL |
Select the checkbox to enable TTL (Time to Live). Records persist only for the configured duration. |
|
Output Mode |
Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options: Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
CheckPoint directory |
Kafka offsets will be stored in the given checkpoint location. |
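The Key Based and Custom partitioner options both come down to mapping a record key to a partition number. The following is a minimal sketch of that calculation, assuming a simple hash-modulo scheme. The class and method names are hypothetical; the sketch does not reproduce the StreamAnalytix partitioner interface or the hash function that Kafka itself uses.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical helper for illustration; not part of StreamAnalytix or Kafka.
final class KeyPartitionSketch {

    // Maps a record key to a partition number in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        // Assumption: a plain hash of the UTF-8 bytes of the key.
        int hash = Arrays.hashCode(key.getBytes(StandardCharsets.UTF_8));
        // floorMod keeps the result non-negative even when the hash is negative.
        return Math.floorMod(hash, numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;
        for (String key : new String[] {"device-1", "device-2", "device-1"}) {
            System.out.println(key + " -> partition " + partitionFor(key, partitions));
        }
    }
}
```

Because the mapping depends only on the key, records with the same key always land in the same partition, which preserves their relative order within that partition.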
Cassandra emitter allows you to store data in a Cassandra table.
Configuring Cassandra emitter for Spark pipelines
To add a Cassandra emitter into your pipeline, drag the emitter to the canvas and connect it to a channel or processor. The configuration settings are as follows:
|
Field |
Description |
|---|---|
|
Connection Name |
All Cassandra connections will be listed here. Select a connection for connecting to Cassandra. |
|
keySpace |
Cassandra keyspace name. If the keyspace does not exist in Cassandra, a new keyspace is created. |
|
Replication Strategy |
A replication strategy specifies the implementation class for determining the nodes where replicas are placed. Possible strategies are SimpleStrategy and NetworkTopologyStrategy. |
|
Replication Factor |
Replication factor used to make additional copies of data |
|
Table Name Expression |
Cassandra table name. If the table does not exist in the keyspace, a new table is created. |
|
Consistency Level |
Consistency level refers to how up-to-date and synchronized a row of Cassandra data is on all its replicas.
Consistency levels are as follows:
ONE: Only a single replica must respond.
TWO: Two replicas must respond.
THREE: Three replicas must respond.
QUORUM: A majority (n/2 + 1) of the replicas must respond.
ALL: All of the replicas must respond.
LOCAL_QUORUM: A majority of the replicas in the local datacenter (whichever datacenter the coordinator is in) must respond.
EACH_QUORUM: A majority of the replicas in each datacenter must respond.
LOCAL_ONE: Only a single replica must respond. In a multi-datacenter cluster, this also guarantees that read requests are not sent to replicas in a remote datacenter. |
|
Output Fields |
Output message fields. |
|
key Columns |
A single/compound primary key consists of the partition key and one or more additional columns that determine clustering. |
|
Output Mode |
Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options: Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Save Mode |
Save operation specifies how to handle existing data if present.
ErrorifExist: When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
Append: When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
Overwrite: Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL. |
|
Enable TTL |
Select the checkbox to enable TTL (Time to Live) for records to persist for that time duration. |
|
TTL Value |
It will appear only when Enable TTL checkbox is selected. Provide TTL value in seconds. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
CheckPoint directory |
Location where checkpoint data is stored. |
|
Add Configuration |
Lets you configure additional Cassandra properties. |
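The QUORUM rule in the Consistency Level description (n/2 + 1, using integer division) can be made concrete with a short calculation. This is an illustrative sketch only; `QuorumSketch` and its methods are hypothetical names and are not part of Cassandra or StreamAnalytix.

```java
// Hypothetical helper for illustration; not a Cassandra or StreamAnalytix API.
final class QuorumSketch {

    // Replicas that must respond for QUORUM: floor(n / 2) + 1.
    static int quorum(int replicationFactor) {
        if (replicationFactor < 1) {
            throw new IllegalArgumentException("replicationFactor must be at least 1");
        }
        return replicationFactor / 2 + 1;
    }

    // Replicas that may be unavailable while a QUORUM request can still succeed.
    static int tolerableFailures(int replicationFactor) {
        return replicationFactor - quorum(replicationFactor);
    }

    public static void main(String[] args) {
        for (int rf = 1; rf <= 5; rf++) {
            System.out.println("RF=" + rf
                    + " quorum=" + quorum(rf)
                    + " tolerates=" + tolerableFailures(rf));
        }
    }
}
```

With a replication factor of 3, for example, two replicas must respond and one replica can be down; raising the factor to 4 raises the quorum to three without tolerating any more failures.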
CustomForeach is an action provided by Spark Structured Streaming. It provides a custom implementation for processing streaming data which is executed using the Foreach emitter.
Configuring CustomForeach Emitter for Spark Pipelines
To add a CustomForeach into your pipeline, drag the CustomForeach to the canvas, connect it to a channel or processor, and right click on it to configure.
|
Field |
Description |
|---|---|
|
Implementation Class |
Foreach implementation class to which control will be passed in order to process incoming data flow. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.
Complete: The whole Result Table will be delivered to the emitter after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Additional properties can be added using ADD CONFIGURATION link. |
Click on the Next button. Enter the notes in the space provided.
Click on the SAVE button for saving the configuration.
The Container Emitter is used to sink the data in Couchbase.
Configuring a Container Emitter
To add a Container Emitter into your pipeline, drag the Emitter to the canvas and right click on it to configure.
|
Field |
Description |
|---|---|
|
Load From Template |
Select the template from the list. Only those templates will be visible in the drop down list that are created globally or within that workspace. |
|
Sync With Source Template |
This tab appears when a saved template is edited. Any change made to the source template is reflected in the Container’s configuration. |
|
Connection Name |
Name of the connection |
|
Bucket Name |
Couchbase Bucket name. Select the Couchbase bucket that gets generated with the Couchbase connection. Max Buckets per cluster is 10, by default. |
|
Document Id |
Unique document key that Couchbase uses to persist the document. If the document id is not unique, the existing document with the same id is overwritten every time. |
|
Check Point Directory |
It is the HDFS path where the Spark Application stores the checkpoint data. |
|
Retention Enable |
When selected, each newly created item lives for the number of seconds specified by the retention policy. After the expiration time is reached, the item is deleted by the Couchbase server. |
|
Retention Policy |
TTL for the document to be persisted in the Couchbase. |
|
Output Fields |
Fields in the message that need to be a part of the output data. |
|
Output Mode |
Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options: Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
Click on the NEXT button. An option to Save as Template will be available. Add notes in the space provided and click on Save as Template.
Choose the scope of the Template (Global or Workspace) and Click SAVE for saving the Template and configuration details.
Custom emitter lets you write your own custom code for processing streaming and batch data as per the logic written in the custom emitter.
If you want to use an emitter that is not provided by StreamAnalytix, you can use this emitter.
For example, if you want to store data to HDFS, you can write your own custom code and store the data.
Configuring Custom Emitter for Spark Pipelines
To add a custom emitter into your pipeline, drag the custom emitter to the canvas, connect it to a channel or processor, and right click on it to configure.
|
Field |
Description |
|---|---|
|
Component Mode |
Determines whether the emitter will act as a streaming or a batch emitter. Streaming: If the channel used in the pipeline is of type Streaming, select the component mode as Streaming and the emitter will act as a Streaming emitter. Batch: If the channel used in the pipeline is of type Batch, select the component type as Batch and the emitter will act as a Batch emitter.
|
|
If Component Mode selected is Streaming, you will view the fields listed below: |
|
|
Use Spark API |
Select this checkbox if you wish to use Spark's built-in APIs, for example “memory” and “Kafka”. If selected, you will view one additional field, Sink Provider. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
checkpointLocation |
Location where the checkpoint data is stored. |
|
Sink Provider |
Lets you use either a Spark native implementation or a custom emitter for storing data. For a Spark native implementation, specify the short name of the emitter where you wish to store data. For example, if you specify Kafka as Sink Provider, data will be stored in Kafka. If you wish to use your own custom emitter, provide the fully qualified name of the class that implements the StreamSinkProvider and DataSourceRegister interfaces. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
If Component Mode selected is Batch, you will view only one field “Emitter Plugin”. |
|
|
Emitter Plugin |
Provide the fully qualified name of the class that implements the StreamAnalytix CustomSSEmitter interface.
You can download the sample project from the StreamAnalytix UI (Data Pipeline page) and refer to the SampleCustomEmitter class. |
|
ADD CONFIGURATION |
Lets you configure additional properties. |
Here is a small snippet of sample code that is used for writing data to HDFS.
|
public class CustomSinkProvider implements StreamSinkProvider, DataSourceRegister {
  public Sink createSink(SQLContext sqlContext, Map<String, String> options, Seq<String> partitionCols, OutputMode outputMode) {
    // create and return the Sink implementation, for example MySink
  }
  public String shortName() {
    // return the short name to be specified as Sink Provider
  }
}
public class MySink implements Sink {
  String message;
  public MySink(String msg) {
    this.message = msg;
  }
  public void addBatch(long batchId, Dataset<Row> dataset) {
    // write the rows of this batch to HDFS
  }
} |
Click on the Next button. Enter the notes in the space provided.
Click on the SAVE button for saving the configuration.
ElasticSearch emitter allows you to store data in ElasticSearch indexes.
Configuring ElasticSearch Emitter for Spark Pipelines
To add an ElasticSearch emitter into your pipeline, drag the emitter to the canvas and connect it to a channel or processor. The configuration settings are as follows:
|
Field |
Description |
|---|---|
|
Connection Name |
All ElasticSearch connections will be listed here. Select a connection for connecting to ElasticSearch. |
|
Output Message |
Output message which is to be indexed. |
|
Index Nested JSON |
Select the checkbox if nested JSON fields are to be indexed. If unchecked, three additional fields are displayed: Index Number of Shards, Index Replication Factor and Output Fields. If selected, these three fields are hidden and the following note is displayed: “Index will be created with Elastic Search default 5 shards and 1 replication factor.” Note: If this checkbox is selected, you can use curly brackets to reference schema fields in the Index Name (shown below in the diagram). |
|
Index Number of Shards |
Number of shards to be created in Index Store. |
|
Index Replication Factor |
Number of additional copies of data. |
|
Index Name |
Specify the index name where data is to be indexed. Dynamic index creation works only when the following conditions are fulfilled: 1. The Index Nested JSON checkbox is selected on the ES emitter. 2. action.auto_create_index: true is set in the Elasticsearch cluster. 3. The field data is always in lower case; otherwise the pipeline fails. |
|
Check Point Directory |
HDFS Path where the Spark application stores the checkpoint data. |
|
ID Field Name |
Specify a name for the generated ID field. |
|
ID Generator Type |
Lets you generate the ID field.
The following types of ID generators are available:
UUID: Universally unique identifier.
Field Values based: In this case, ID is generated by appending the values of selected fields. If you select this option then an additional field – “Key Fields” will be displayed, where you need to select the fields you want to combine. The fields will be appended in the same order as selected on the user interface.
Custom: In this case, you can write your custom logic to create the ID field. For example, if you wish to use a UUID key but want to prefix it with “HSBC”, then you can write the logic in a Java class. If you select this option then an additional field - “Class Name” will be displayed on the user interface where you need to mention the fully qualified class name of your Java class.
You can download the sample project from the “Data Pipeline” landing page and refer to the Java class com.yourcompany.custom.keygen.SampleKeyGenerator to write the custom code. |
|
Output Fields |
Output message fields. |
|
Connection Retries |
Number of retries for component connection. Possible values are -1, 0 or positive number. -1 denotes infinite retries. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink. Append Mode: Only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Lets you configure additional ElasticSearch properties. |
Click on the Next button. Enter the notes in the space provided.
Click on the SAVE button for saving the configuration.
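The Field Values based and Custom ID generator types can be sketched as plain string operations. This is a minimal illustration that assumes nothing about the StreamAnalytix key generator interface; the SampleKeyGenerator class in the sample project defines the real contract. `IdGeneratorSketch`, `fromFields`, and `prefixedUuid` are hypothetical names.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

// Hypothetical helper for illustration; not the StreamAnalytix key generator API.
final class IdGeneratorSketch {

    // Field-values style: append the values of the selected fields in the order given.
    static String fromFields(Map<String, Object> record, List<String> keyFields) {
        StringBuilder id = new StringBuilder();
        for (String field : keyFields) {
            id.append(String.valueOf(record.get(field)));
        }
        return id.toString();
    }

    // Custom style: a random UUID with a fixed prefix, as in the "HSBC" example.
    static String prefixedUuid(String prefix) {
        return prefix + UUID.randomUUID();
    }

    public static void main(String[] args) {
        Map<String, Object> record = new LinkedHashMap<>();
        record.put("region", "eu");
        record.put("orderId", 42);
        System.out.println(fromFields(record, Arrays.asList("region", "orderId")));
        System.out.println(prefixedUuid("HSBC"));
    }
}
```

Note that a field-values ID is only as unique as the combination of the selected fields, whereas a UUID-based ID is unique per record regardless of content.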
HBase emitter stores streaming data into HBase. It provides quick random access to huge amounts of structured data.
Configuring HBase Emitter for Spark Pipelines
To add an HBase emitter to your pipeline, drag it to the canvas, connect it to a channel or processor, and right click on it to configure.
|
Field |
Description |
|---|---|
|
Connection Name |
All HBase connections will be listed here. Select a connection for connecting to HBase. |
|
Table Name Expression |
JavaScript expression used to evaluate the table name. The namespace will be formed as ns_+{tenantId}. For example, ns_1. |
|
Compression |
Provides the facility to compress the message before storing it. The algorithm used is Snappy. When set to true, compression is enabled on the data. |
|
Region Splitting Definition |
This functionality defines how the HBase tables should be pre-split. The default value is ‘No pre-split’. The supported options are:
Default (No Pre-Split): Only one region is created initially.
Based on Region Boundaries: Regions are created based on the given key boundaries. For example, if your key is a hexadecimal key and you provide the value ‘4, 8, d’, four regions are created as follows:
1st region for keys less than 4
2nd region for keys from 4 up to, but not including, 8
3rd region for keys from 8 up to, but not including, d
4th region for keys from d onwards |
|
Encoding |
Data encoding type: either UTF-8 (base encoding) or BASE 64 (Base64 encoding). |
|
Row Key Generator Type |
Lets you generate a custom row key.
The following types of key generators are available:
UUID: Universally unique identifier.
Field Values Based: In this case, key is generated by appending the values of selected fields. If you select this option then an additional field – “Key Fields” will be displayed where you need to select the keys you want to combine. The keys will be appended in the same order as selected on the user interface.
Custom: In this case, you can write your custom logic to create the row key. For example, if you want to use a UUID key but want to prefix it with HSBC, then you can write the logic in a Java class.
If you select this option then an additional field - “Class Name” will be displayed on the UI where you need to mention the fully qualified class name of your Java class. You can download the sample project from the “Data Pipeline” landing page and refer to the Java class “com.yourcompany.custom.keygen.SampleKeyGenerator” to write the custom code. |
|
Column Family |
Specify the name of the column family that will be used while saving your data in an HBase table. |
|
Output Fields |
Fields in the message that need to be a part of the output message. |
|
Replication |
Lets you replicate your data on the underlying Hadoop file system. For example, if you specify “2” as Replication, then two copies will be created on HDFS. |
|
Ignore Missing Values |
Ignore or persist empty or null values of message fields in the emitter. When set to true, null values of message fields are ignored. |
|
Connection Retries |
The number of retries for component connection. Possible values are -1, 0 or positive number. -1 denotes infinite retries |
|
Delay Between Connection Retries |
Defines the delay between retries of the component connection, in milliseconds. |
|
Enable TTL |
Specifies the lifetime of a record. When selected, the record persists for the duration that you specify in the TTL Value text box. |
|
TTL Value |
Provide TTL value in seconds. |
|
Checkpoint Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink. Select the output mode from the given three options: Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Lets you configure additional properties. |
Click on the Next button. Enter the notes in the space provided.
Click on the SAVE button for saving the configuration.
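To make the Based on Region Boundaries option of Region Splitting Definition concrete, the sketch below works out which pre-split region a row key falls into for the boundaries 4, 8, d. It assumes the usual HBase convention that a boundary key belongs to the region that starts at it, and it compares keys as plain strings rather than as bytes. `RegionSplitSketch` and `regionIndex` are hypothetical names, not an HBase or StreamAnalytix API.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper for illustration; not an HBase or StreamAnalytix API.
final class RegionSplitSketch {

    // Returns the zero-based region index for a row key, given sorted split boundaries.
    // N boundaries produce N + 1 regions.
    static int regionIndex(String rowKey, List<String> boundaries) {
        int index = 0;
        for (String boundary : boundaries) {
            if (rowKey.compareTo(boundary) >= 0) {
                index++; // the key is at or past this boundary, so move to the next region
            } else {
                break;
            }
        }
        return index;
    }

    public static void main(String[] args) {
        List<String> boundaries = Arrays.asList("4", "8", "d");
        for (String key : new String[] {"3f", "4a", "8", "a0", "d1"}) {
            System.out.println(key + " -> region " + (regionIndex(key, boundaries) + 1));
        }
    }
}
```

Choosing boundaries that divide the expected key range evenly spreads writes across regions from the start, instead of sending all traffic to a single initial region.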
Hive emitter allows you to store streaming/batch data into HDFS. Hive queries can be implemented to retrieve the stored data.
To configure a Hive emitter, provide the database name and table name, along with the list of schema fields to be stored. The data rows are stored in the Hive table, in the specified format, inside the provided database.
You must have the necessary permissions for creating table partitions and then writing to partition tables.
Configuring Hive Emitter for Spark Pipelines
To add a Hive emitter into your pipeline, drag it to the canvas, connect it to a channel or processor, and right click on it to configure.
|
Field |
Description |
|---|---|
|
Connection Name |
All Hive connections will be listed here. Select a connection for connecting to Hive. |
|
Check Point Directory |
HDFS path where Spark application stores checkpoint data. |
|
Database Name |
HIVE database name. |
|
Table Name |
HIVE table name. |
|
Output Fields |
Fields in the schema that need to be a part of the output data. |
|
Partitioning Required |
Check this option if HIVE table is to be partitioned. You will view Partition List input box. |
|
Format |
PARQUET: Columnar storage format that includes file compression. TEXT: Stores information as plain text.
Note: The space (‘ ’) delimiter is not supported in TEXT format. |
|
Delimiter |
Message field separator. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Append Mode: Only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. |
|
Save Mode |
Save operation specifies how to handle existing data if present.
ErrorifExist: When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
Append: When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
Overwrite: Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL. |
|
Replication |
Lets you replicate your data on the underlying Hadoop file system. For example, if you specify “2” as Replication, then two copies will be created on HDFS. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
Add Configuration |
Lets you configure custom properties. |
Click on the Next button. Enter the notes in the space provided.
Click on the SAVE button after entering all the details.
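The four Save Mode options differ only in what happens when the target already holds data. The sketch below simulates them against an in-memory list standing in for a table. It is an illustration of the described semantics under that assumption, not the Spark or StreamAnalytix implementation; `SaveModeSketch`, `Mode`, and `save` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical in-memory model for illustration; not the Spark implementation.
final class SaveModeSketch {

    enum Mode { ERROR_IF_EXISTS, APPEND, OVERWRITE, IGNORE }

    // Applies a save to an in-memory "table". A null table means it does not exist yet.
    static List<String> save(List<String> existing, List<String> incoming, Mode mode) {
        if (existing == null) {
            // Nothing is there yet, so every mode simply writes the new rows.
            return new ArrayList<>(incoming);
        }
        switch (mode) {
            case ERROR_IF_EXISTS:
                throw new IllegalStateException("data already exists");
            case APPEND:
                List<String> merged = new ArrayList<>(existing);
                merged.addAll(incoming);
                return merged;
            case OVERWRITE:
                return new ArrayList<>(incoming);
            case IGNORE:
                return existing; // leave the existing data untouched
            default:
                throw new IllegalArgumentException("unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        List<String> table = Arrays.asList("row1");
        List<String> batch = Arrays.asList("row2");
        System.out.println("APPEND    -> " + save(table, batch, Mode.APPEND));
        System.out.println("OVERWRITE -> " + save(table, batch, Mode.OVERWRITE));
        System.out.println("IGNORE    -> " + save(table, batch, Mode.IGNORE));
    }
}
```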
JDBC Emitter allows you to push data to relational databases like MySQL, PostgreSQL, Oracle DB and MS-SQL.
Configuring JDBC Emitter for Spark pipelines
To add a JDBC emitter into your pipeline, drag the JDBC emitter onto the canvas and connect it to a channel or processor. The configuration settings of the JDBC emitter are as follows:
|
Field |
Description |
|---|---|
|
Connection Name |
All JDBC connections will be listed here. Select a connection for connecting to JDBC. |
|
Message Name |
Message used in the pipeline |
|
Is batch Enable |
Parameter used to batch multiple messages. |
|
batch Size |
Number of messages to be batched together. |
|
Schema Name |
Existing database schema names whose tables are fetched (for MSSQL and POSTGRES). |
|
Table Name |
Existing database table name whose schema is to be fetched. |
|
Connection Retries |
Number of retries for component connection |
|
Delay Between Connection Retries |
Defines the delay between retries of the component connection, in milliseconds. |
|
CheckPoint Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Output mode to be used while writing the data to Streaming emitter.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the sink. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Lets you configure custom properties. |
Click on the NEXT button. Enter the notes in the space provided. Click SAVE for saving the configuration.
Kafka emitter stores data to a Kafka cluster. Supported data formats are JSON and DELIMITED.
Configuring Kafka Emitter for Spark Pipelines
To add a Kafka emitter into your pipeline, drag the Kafka emitter onto the canvas and connect it to a channel or processor. The configuration settings of the Kafka emitter are as follows:
|
Field |
Description |
|---|---|
|
Connection Name |
All Kafka connections will be listed here. Select a connection for connecting to Kafka. |
|
Topic Name |
Kafka topic name where you want to emit data. |
|
Partitions |
Number of partitions to be created for a topic. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log. |
|
Replication Factor |
For a topic with replication factor N, Kafka will tolerate up to N-1 failures without losing any messages committed to the log. |
|
Producer Type |
Specifies whether messages are sent asynchronously in a background thread or synchronously. Valid values are async for asynchronous send and sync for synchronous send. |
|
Output Format |
Data type format of the output message. |
|
Output Fields |
Message fields which will be a part of output data. |
|
Kafka Partitioner |
Round Robin (Default): Kafka uses the default partition mechanism to distribute the data. Key Based: Kafka partitioning is done on the basis of keys. Custom: Lets you write a custom partition class by implementing the com.streamanalytix.framework.api.spark.partitioner.SAXKafkaPartitioner interface. The partition method contains the logic to calculate the destination partition and returns the target partition number. |
|
Enable TTL |
Select the checkbox to enable TTL (Time to Live). Records persist only for the configured duration. |
|
Checkpoint Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Output mode to be used while writing the data to Streaming emitter. Select the output mode from the given three options: Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries. Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
Kinesis emitter emits data to an Amazon Kinesis stream. Supported data type formats of the output are JSON and Delimited.
Configuring Kinesis Emitter for Spark Pipelines
To add a Kinesis emitter into your pipeline, drag the emitter onto the canvas, connect it to a channel or processor, and right click on it to configure it.
|
Field |
Description |
|---|---|
|
Connection Name |
All Kinesis connections will be listed here. Select a Kinesis connection for connecting to Kinesis. |
|
Stream Name |
Name of the Kinesis Stream. |
|
Shard Count |
Number of shards with which to create the stream, if the stream does not already exist. |
|
Region |
Name of the region. For example, us-west-2 |
|
Partition Key |
Used by Amazon Kinesis to distribute data across shards. |
|
Output Format |
Datatype format of the output. |
|
Output Fields |
Fields in the message that need to be part of the output data. |
|
Checkpoint Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Enables you to configure additional properties. |
Click on the Next button. Enter the notes in the space provided. Click Save for saving the configuration details.
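How the Partition Key distributes records across shards can be sketched as follows. Amazon Kinesis maps a partition key to a 128-bit integer with MD5 and assigns the record to the shard whose hash key range contains that value. The sketch assumes the shards split the hash space into equal, contiguous ranges, and the function name `shard_for_key` is hypothetical.

```python
import hashlib

HASH_SPACE = 2 ** 128  # MD5 yields a 128-bit integer


def shard_for_key(partition_key, shard_count):
    """Return the index of the shard whose hash range contains the key.

    Assumes equal, contiguous hash ranges per shard, which is a
    simplification made for this sketch.
    """
    digest = hashlib.md5(partition_key.encode("utf-8")).digest()
    hash_value = int.from_bytes(digest, "big")
    return hash_value * shard_count // HASH_SPACE


for key in ("device-1", "device-2", "device-1"):
    print(key, "->", shard_for_key(key, 4))
```

Records that share a partition key always map to the same shard, so a low-cardinality key can concentrate traffic on a few shards.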
The Mqtt emitter emits data to an Mqtt queue or topic. Supported output formats are JSON and Delimited.
Mqtt supports wireless networks with varying levels of latency.
Configuring Mqtt Emitter for Spark Pipelines
To add Mqtt emitter into your pipeline, drag the emitter on to the canvas, connect it to a channel or processor, and right click on it to configure it.
|
Field |
Description |
|---|---|
|
Connection Name |
All Mqtt connections will be listed here. Select a connection for connecting to Mqtt. |
|
Queue Name/Topic Name |
Queue or topic name to which messages will be published. |
|
Output Format |
Data format of the output: JSON or Delimited. If Delimited is selected, provide a delimiter as well. |
|
Output Fields |
Fields in the message that need to be part of the output data. |
|
Check Point Directory |
It is the HDFS path where the checkpoint data is saved. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the sink. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
Trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Enables you to configure additional properties. |
Click on the Add Notes tab. Enter the notes in the space provided.
Click SAVE for saving the configuration.
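The three output modes can be compared with a small simulation of a running count per key. This is a simplified sketch of the semantics described in the Output Mode rows, not the Spark engine itself; the function names are hypothetical, and Append is shown for a query without aggregation, where rows never change after they are added.

```python
from collections import Counter


def run_triggers(batches, mode):
    """Simulate what the sink receives after each trigger for a running count.

    mode is "complete" or "update"; batches is a list of lists of keys.
    """
    result_table = Counter()
    emitted = []
    for batch in batches:
        result_table.update(batch)
        if mode == "complete":
            # The whole Result Table is delivered after every trigger.
            emitted.append(dict(result_table))
        elif mode == "update":
            # Only rows whose value changed in this trigger are delivered.
            emitted.append({k: result_table[k] for k in sorted(set(batch))})
        else:
            raise ValueError(f"unsupported mode: {mode}")
    return emitted


def run_append(batches):
    """Append mode without aggregation: each new row is emitted exactly once."""
    return [list(batch) for batch in batches]


batches = [["a", "b"], ["a"]]
print(run_triggers(batches, "complete"))
print(run_triggers(batches, "update"))
print(run_append(batches))
```

After the second trigger, Complete re-sends the count for `b` even though it did not change, while Update sends only the new count for `a`.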
The HDFS emitter stores data in the Hadoop Distributed File System (HDFS).
To configure a Native HDFS emitter, provide the HDFS directory path along with the list of schema fields to be written. These field values are stored in HDFS file(s), in the specified format, inside the provided HDFS directory.
Configuring Native HDFS for Spark Pipelines
To add Native HDFS emitter into your pipeline, drag the emitter on to the canvas, connect it to a channel or processor, and right click on it to configure it.
|
Field |
Description |
|---|---|
|
Connection Name |
All Native HDFS connections will be listed here. Select a connection for connecting to HDFS. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Append Mode: In this mode, the new rows added to the Result Table since the last trigger will be delivered to the sink. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once. |
|
Save Mode |
Save operation specifies how to handle existing data if present.
ErrorifExist: When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown.
Append: When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data.
Overwrite: Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame.
Ignore: Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a CREATE TABLE IF NOT EXISTS in SQL. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
Trigger time interval in minutes. |
|
HDFS Path |
Directory path on HDFS where data has to be written. |
|
Output Fields |
Fields which will be part of output data. |
|
Partitioning Required |
If checked, the table will be partitioned. |
|
Partition List |
Select the fields on which the table is to be partitioned. |
|
Output Type |
Output format of the data. Supported formats are CSV and JSON. |
|
Delimiter |
Character used to separate two fields. |
|
Check Point Directory |
HDFS path where Spark application stores the checkpoint data. |
|
Block Size |
Size of each block (in bytes) allocated in HDFS. |
|
Replication |
Enables you to make additional copies of the data. |
|
Compression Type |
Algorithm used to compress the data. |
|
ADD CONFIGURATION |
Enables you to configure additional properties. |
Click on the NEXT button. Enter the notes in the space provided. Click SAVE for saving the configuration details.
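The four Save Mode options above differ only in what happens when data already exists at the target. A minimal sketch, using an in-memory dictionary in place of HDFS, makes the behavior concrete. The `save` function, the lowercase mode strings, and the path `/data/out` are hypothetical names used only for this illustration.

```python
def save(store, path, rows, mode):
    """Apply the save-mode semantics to an in-memory 'store' dictionary."""
    exists = path in store
    if mode == "errorifexists":
        if exists:
            raise FileExistsError(f"data already exists at {path}")
        store[path] = list(rows)
    elif mode == "append":
        # New rows are added after whatever is already there.
        store.setdefault(path, []).extend(rows)
    elif mode == "overwrite":
        # Existing data is replaced by the new rows.
        store[path] = list(rows)
    elif mode == "ignore":
        # Existing data wins; the new rows are silently dropped.
        if not exists:
            store[path] = list(rows)
    else:
        raise ValueError(f"unknown save mode: {mode}")
    return store


store = {"/data/out": [1, 2]}
print(save(dict(store), "/data/out", [3], "append"))
print(save(dict(store), "/data/out", [3], "overwrite"))
print(save(dict(store), "/data/out", [3], "ignore"))
```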
JMS is mainly used to send and receive messages from one application to another. The OpenJms emitter is used when you want to write data to a JMS queue or topic. All applications that have subscribed to those topics or queues will be able to read that data.
Configuring OpenJms Emitter for Spark Pipelines
To add OpenJms emitter into your pipeline, drag the emitter on to the canvas, connect it to a channel or processor, and right click on it to configure it.
|
Field |
Description |
|---|---|
|
Connection Name |
All OpenJms connections will be listed here. Select a connection for connecting to OpenJMS. |
|
Queue Name |
Queue or topic name to which messages will be published. |
|
Output Format |
Select the data format in which OpenJMS should write the data. |
|
Output Fields |
Select the fields which should be a part of the output data. |
|
Output Mode |
Mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Checkpoint Directory |
Location where the checkpoint data is stored. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
It will appear only when Enable Trigger checkbox is selected. Processing Time is the trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Enables you to configure additional properties. |
Click on the Next button. Enter the notes in the space provided.
Click SAVE for saving the configuration details.
The RabbitMQ emitter is used when you want to write data to RabbitMQ cluster.
Data formats supported are JSON and DELIMITED.
Configuring RabbitMQ Emitter for Spark Pipelines
To add a RabbitMQ emitter into your pipeline, drag the RabbitMQ emitter on the canvas and connect it to a channel or processor. Right click on the emitter to configure it as explained below:
|
Field |
Description |
|---|---|
|
Connection Name |
All RabbitMQ connections will be listed here. Select a connection for connecting to the RabbitMQ server. |
|
Exchange Name |
RabbitMQ exchange name. |
|
Exchange Type |
Specifies how messages are routed through it.
Direct: Delivers message to queues based on a message routing key.
Fanout: Routes message to all of the queues that are bound to it.
Topic: Does a wildcard match between the routing key and the routing pattern specified in the binding. |
|
Exchange Durable |
Specifies whether exchange will be deleted or remain active on server restart.
TRUE: Exchange will not be deleted if you restart RabbitMQ server.
FALSE: Exchange will be deleted if you restart RabbitMQ server. |
|
Routing Key |
RabbitMQ routing key with which data will be published. |
|
Queue Name |
RabbitMQ queue name where data will be published. |
|
Queue Durable |
Specifies whether queue will remain active or deleted on server restart. TRUE: Queue will not be deleted if you restart RabbitMQ. FALSE: Queue will be deleted if you restart RabbitMQ. |
|
Output Format |
Select the data format in which RabbitMQ should write the data. |
|
Output Fields |
Select the fields which should be a part of the output data. |
|
Enable Message TTL |
When selected, messages are moved to the specified TTL exchange once their time to live expires. |
|
Message TTL |
Time to live, in seconds, after which a message is moved to the specified TTL exchange. |
|
TTL Exchange |
Exchange to which a message will be sent once its time to live expires. |
|
TTL Queue |
Queue to which a message will be sent once its time to live expires. |
|
TTL Routing Key |
Routing key used to bind TTL queue with TTL exchange. |
|
Check Point Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the emitter. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the sink. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
Trigger time interval in minutes or seconds. |
|
Add Configuration |
Enables you to configure additional RabbitMQ properties. |
Click on the NEXT button. Enter the notes in the space provided. Click SAVE for saving the configuration details.
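The routing behavior of the three Exchange Type options can be sketched in a few lines of Python. This is an illustrative model of AMQP-style routing rather than RabbitMQ's implementation; the function names and the sample bindings are hypothetical. In topic patterns, `*` matches exactly one dot-separated word and `#` matches zero or more words.

```python
def topic_matches(pattern, routing_key):
    """Return True if a topic binding pattern matches the routing key."""
    p_words = pattern.split(".")
    k_words = routing_key.split(".")

    def match(pi, ki):
        if pi == len(p_words):
            return ki == len(k_words)
        if p_words[pi] == "#":
            # '#' may absorb zero or more words.
            return any(match(pi + 1, j) for j in range(ki, len(k_words) + 1))
        if ki == len(k_words):
            return False
        if p_words[pi] == "*" or p_words[pi] == k_words[ki]:
            return match(pi + 1, ki + 1)
        return False

    return match(0, 0)


def route(exchange_type, bindings, routing_key):
    """Return the queues that receive a message; bindings is a list of (queue, key)."""
    if exchange_type == "fanout":
        return [q for q, _ in bindings]
    if exchange_type == "direct":
        return [q for q, key in bindings if key == routing_key]
    if exchange_type == "topic":
        return [q for q, key in bindings if topic_matches(key, routing_key)]
    raise ValueError(f"unknown exchange type: {exchange_type}")


bindings = [("orders", "order.*"), ("audit", "#"), ("eu", "order.eu")]
for kind in ("direct", "fanout", "topic"):
    print(kind, route(kind, bindings, "order.eu"))
```

With the routing key `order.eu`, the direct exchange delivers only to the queue bound with that exact key, the fanout exchange delivers to every bound queue, and the topic exchange delivers to every queue whose pattern matches.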
The Streaming emitter enables you to visualize the data running in the pipeline in the StreamAnalytix built-in real-time dashboards. For example, you may use the Streaming emitter to view real-time price fluctuations of stocks.
|
Field |
Description |
|---|---|
|
Stream Id |
Exchange name on which streaming messages will be sent. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink.
Select the output mode from the given three options:
Append Mode: This is the default mode, where only the new rows added to the Result Table since the last trigger will be delivered to the sink. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
Complete: The whole Result Table will be delivered to the sink after every trigger. This is supported for aggregation queries.
Update: Only the rows in the Result Table that were updated since the last trigger will be delivered to the emitter. |
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |
|
Processing Time |
Trigger time interval in minutes or seconds. |
|
ADD CONFIGURATION |
Enables you to configure additional custom properties. |
Click on the Next button. Enter the notes in the space provided. Click Save for saving the configuration details.
Solr emitter allows you to store data in Solr indexes. Indexing is done to increase the speed and performance of search queries.
Configuring Solr Emitter for Spark Pipelines
To add a Solr emitter into your pipeline, drag it on the canvas and connect it to a channel or processor. The configuration settings of the Solr emitter are as follows:
|
Field |
Description |
|---|---|
|
Connection Name |
All Solr connections are listed here. Select a connection for connecting to Solr. |
|
Across Field Search Enabled |
Specifies if full text search is to be enabled across all fields. |
|
Index Number of Shards |
Specifies number of shards to be created in index store. |
|
Index Replication Factor |
Specifies number of additional copies of data to be kept across nodes. Should be less than n-1, where n is the number of nodes in the cluster. |
|
Index Expression |
The MVEL Expression is used to evaluate the index name. This can help you leverage field based partitioning.
For example, consider the expression below:
@{'ns_1_myindex' + Math.round(<MessageName>.timestamp / (3600*1000))}
Here a new index will be created with one-hour time range and data will be dynamically indexed based on field whose field alias name is 'timestamp'. |
|
Routing Required |
This specifies if custom dynamic routing is to be enabled. If enabled, a routing policy json needs to be defined as shown in the below figure. |
|
ID Generator Type |
Enables you to generate the ID field.
The following types of ID generators are available:
UUID: Universally unique identifier.
Field Values based: In this case, ID is generated by appending the values of selected fields. If you select this option then an additional field – “Key Fields” will be displayed, where you need to select the fields you want to combine. The fields will be appended in the same order as selected on the user interface.
Custom: In this case, you can write your custom logic to create the ID field. For example, if you wish to use an UUID key but want to prefix it with “HSBC”, then you can write the logic in a java class. If you select this option then an additional field - “Class Name” will be displayed on user interface where you need to mention the fully qualified class name of your Java class.
You can download the sample project from the “Data Pipeline” landing page and refer to the Java class com.yourcompany.custom.keygen.SampleKeyGenerator to write the custom code.
|
|
Output Fields |
Fields of the output message. |
|
Ignore Missing Values |
Ignore or persist empty or null values of message fields in emitter. |
|
Connection Retries |
Number of retries for component connection. Possible values are -1, 0, or a positive number. -1 denotes infinite retries. If Routing Required = true, then:
Routing Policy - A JSON defining the custom routing policy. Example: {"1":{"company":{"Google":20.0,"Apple":80.0}}} where 1 is the timestamp after which the custom routing policy will be active, 'company' is the field name, the value 'Google' takes 20% of the shards, and the value 'Apple' takes 80% of the shards. |
|
Delay Between Connection Retries |
Defines the retry delay intervals for component connection in milliseconds. |
|
Enable TTL |
When selected, messages will be discarded to the specified TTL exchange. |
|
Checkpoint Directory |
Location where the checkpoint data is stored. |
|
Output Mode |
Output mode to be used while writing the data to Streaming sink. Append Mode: In this mode, the new rows added to the Result Table since the last trigger will be delivered to the sink. This is supported for only those queries where rows added to the Result Table are never going to change. Hence, this mode guarantees that each row will be output only once.
|
|
Enable Trigger |
Trigger defines how frequently a streaming query should be executed. |