A data source is the location from which large volumes of batch or streaming data originate. This source data is transformed using processors and ML algorithms and is ultimately emitted to a wide range of data warehouses. Gathr provides a range of data sources for its ETL and analytics functions.
Typically, you start creating a pipeline by selecting a Data Source for reading data. A Data Source can also help you infer the schema, either directly from the selected source or by uploading a sample data file.
Gathr Data Sources are built-in drag-and-drop operators. The incoming data can originate from many kinds of systems, such as message queues, transactional databases, log files and many more.
Gathr runs on Spark as its computation engine.
Within Spark, Data Sources exhibit two types of behavior, as illustrated in the sketch after this list:
• Streaming Data Sources
• Batch Data Sources
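For readers familiar with Spark, this distinction maps onto Spark's batch and structured streaming read APIs. The following is a minimal Java sketch of that distinction only; Gathr wires this up through its drag-and-drop operators, and the file path and source format below are illustrative placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class SourceModesSketch {
    public static void main(String[] args) throws Exception {
        SparkSession spark = SparkSession.builder()
                .appName("source-modes-sketch")
                .master("local[*]")
                .getOrCreate();

        // Batch Data Source: bounded input, read once per pipeline run.
        Dataset<Row> batch = spark.read()
                .option("header", "true")
                .csv("/tmp/sample/orders.csv"); // placeholder path
        batch.show(5);

        // Streaming Data Source: unbounded input, processed continuously.
        Dataset<Row> stream = spark.readStream()
                .format("rate") // built-in test source emitting rows over time
                .load();
        stream.writeStream().format("console").start().awaitTermination(5000);

        spark.stop();
    }
}
```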
The user may want to perform certain actions before the execution of source components. A Pre-Action tab is available at the source.
The following actions can be performed on the Source:
Field | Description |
---|---|
SQL | The user can execute SQL by selecting the connection on which the SQL should run and providing the query to execute. |
Shell Script | The user can invoke a shell script as an action. The script can be provided in two ways: by writing the shell script in the inline editor, or by uploading a shell script file to execute. |
Stored Procedure | With this option, the user can execute a stored procedure or function stored in a database. To do so, select an appropriate JDBC connection, and then select the relevant stored procedure from the drop-down menu. |
Note:
- If the configured action fails to execute, the user can check the ‘Ignore Action Error’ option so that the pipeline run is not impacted.
- By check marking the ‘Ignore while execution’ option, the configuration will remain intact in the pipeline, but the configured action will not get executed.
- The user can also configure multiple actions by clicking at the Add Action button.
The Auto Schema feature helps define the schema while loading the data: first load data from a source, then infer the schema, modify data types, and determine the schema on the go.
This feature helps design the pipeline interactively. Data can be loaded in the form of CSV/TEXT/JSON/XML/Fixed Length/Binary/Avro/ORC and Parquet files, or you can fetch data from sources such as Kafka, JDBC and Hive. Auto Schema enables the creation of the schema within the pre-built operators and identifies columns in CSV/JSON/TEXT/XML/Fixed Length/Binary (Kinesis supports the Avro Binary format) and Parquet.
Gathr starts the inference by ingesting data from the data source. As each component processes data, the schema is updated (as per the configuration and the logic applied). During the process, Gathr examines each field and attempts to assign a data type to that field based on the values in the data.
All source components have Auto Schema enabled. The common Auto Schema configuration for every Data Source is described below.
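In plain Spark terms, this is comparable to schema inference on read. The sketch below shows the idea only, assuming a CSV sample with a placeholder path; the inferred types are printed for review before they are overridden.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class AutoSchemaSketch {
    static Dataset<Row> inferCsvSchema(SparkSession spark, String samplePath) {
        // Sample the file and let Spark assign a data type to each field,
        // much like Auto Schema examines values before finalizing the schema.
        Dataset<Row> df = spark.read()
                .option("header", "true")
                .option("inferSchema", "true")
                .csv(samplePath);
        df.printSchema(); // the detected types can then be edited in the Detect Schema tab
        return df;
    }
}
```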
To add a Data Source to your pipeline, drag the Data Source to the canvas and right click on it to configure.
There are four tabs for configuring a component.
1. Schema Type
2. Configuration
3. Incremental Read
4. Add Notes
Note: An additional tab (DETECT SCHEMA) is displayed. This tab presents your data in the form of a schema.
Schema Type allows you to create a schema and the fields. On the Schema Type tab, select either of the below mentioned options:
• Fetch from Source
• Upload Data File
• Use Existing Dataset
Whether the data is fetched from the source or uploaded, the following configuration properties remain the same:
Field | Description |
---|---|
Type Of Data | Input format of Data. |
Is Header Included in Source | Option to enable header in source data. |
Enable PII Masking | Option to enable PII (Personally Identifiable Information) masking. To know more, see PII Masking processor. |
Max no. of Rows | Maximum number of sample rows to pull from the streaming source. |
Trigger time for Sample | Minimum wait time before the system fires a query to fetch data. Example: If it is 15 secs, then the system will first wait for 15 secs, fetch all the rows available, and create a dataset out of them. |
Sampling Method | Dictates how sample records are extracted from the complete data fetched from the source (see the sketch after this table). The options are: - Top N: Extract the top n records using limit() on the dataset. - Random Sample: Extract random records by applying a sample transformation and then limiting to the maximum number of rows in the dataset. |
Note: In case of ClickStream Data Source, you can also choose from an existing schema.
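In Spark dataset terms, the two sampling methods described above roughly correspond to the calls in the sketch below; the method names and the sampling fraction are illustrative, not the component's internal implementation.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class SamplingMethodSketch {
    // Top N: take the first maxRows records using limit().
    static Dataset<Row> topN(Dataset<Row> source, int maxRows) {
        return source.limit(maxRows);
    }

    // Random Sample: apply a sample transformation, then cap at maxRows.
    static Dataset<Row> randomSample(Dataset<Row> source, int maxRows) {
        return source.sample(false, 0.1).limit(maxRows); // 0.1 fraction is illustrative
    }
}
```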
Fetch from Source reads data directly from the selected data source.
Depending on the type of data, determine the Data Source you want to use, then identify the type of data (or data format) and its corresponding delimiters.
Gathr supports extracting schema from the following data formats: Avro, CSV, JSON, TEXT, XML, Fixed Length, ORC and Parquet.
Once you choose a data type, you can edit the schema and then configure the component as per your requirement.
Note: Whenever Fetch from Source is chosen, the sequence of tabs will be as follows:
Fetch from Source > Configuration > Schema > Add Notes.
When the data being fetched from the source is in CSV format, the data columns are parsed using one of the following delimiters (a Spark-level sketch follows the list):
• Tab
• , Comma
• : Colon
• ; Semi Colon
• | Pipe
The default delimiter is comma (,).
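Conceptually, choosing a delimiter corresponds to the separator option of a Spark CSV read, roughly as sketched below; the path and the pipe separator are placeholders.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class CsvDelimiterSketch {
    static Dataset<Row> readDelimited(SparkSession spark, String path) {
        return spark.read()
                .option("header", "true")
                .option("sep", "|") // comma is the default; override for tab, colon, semicolon or pipe
                .csv(path);
    }
}
```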
Once the data (CSV/TEXT/JSON/Parquet) is fetched from the source, the next tab is Configuration, where the Data Source is configured and the schema is generated accordingly. Then you can add notes and save the Data Source’s configuration.
Select JSON as your Type of Data. The data that is being fetched from the source is in JSON format. The source will read the data in the format it is available.
The System will fetch the incoming XML data from the source.
Select XML as your Type of Data and provide the XML XPath value.
XML XPath - The path of the XML tag to treat as a row. For example, for the XML <books><book>...</book></books>, the appropriate XML XPath value for book would be /books/book.
The Default value of XML XPath is '/' which will parse the whole XML data.
The system will parse the incoming fixed-length data. Select 'Fixed Length' as your Type of Data and provide the Field Length value.
The Field length value is a comma separated length of each field.
For Example:
If there are 3 fields f1, f2, f3 and their maximum lengths are 4, 10 and 6 respectively, then the field length value for this data would be 4,10,6 (see the sketch below).
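The following sketch illustrates how a fixed-length record is split using the comma-separated field lengths; the example lengths 4, 10 and 6 follow the case above.

```java
class FixedLengthSketch {
    // Splits one record using the configured field lengths, e.g. {4, 10, 6}.
    // The record is expected to be at least as long as the sum of the lengths.
    static String[] split(String record, int[] lengths) {
        String[] fields = new String[lengths.length];
        int pos = 0;
        for (int i = 0; i < lengths.length; i++) {
            fields[i] = record.substring(pos, pos + lengths[i]);
            pos += lengths[i];
        }
        return fields;
    }
}
```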
Select Parquet as your Type of data. The file that is being fetched from the source is in parquet format. The source will read the data in the format it is available.
The Text message parser is used to read data in any format from the source, including data that cannot be handled when the CSV, JSON, and other parsers are selected in configuration. The data is read as text, as a single-column dataset.
When you select the TEXT as “Type of Data” in configuration, data from the source is read by Gathr in a single column dataset.
To parse the data into fields, append a Custom Processor to the Data Source, where custom logic for data parsing can be added.
Upload your data using a CSV, TEXT, JSON, XML, Fixed Length, Avro, Parquet, or ORC file.
Once you choose a data type, you can edit the schema and then configure the component as per your requirement:
Note: Whenever you choose Upload Data File, the sequence of tabs will be as follows:
Upload Data File > Detect Schema > Configuration > Add Notes.
You can choose the data type as CSV with one of the following delimiters:
• Comma
• Tab
• Colon
• Semi Colon
• Pipe
• CTRL+A
You need to specify the format of the data source. For this, a few sampling parameters need to be provided, such as Max no. of Rows, Trigger time for Sample, and Sampling Method.
Upon selecting the CSV option as the Type of Data, follow the instructions below for specific scenarios.
The user can provide a custom delimiter while selecting or uploading a .csv file. In a few cases, special ASCII characters must be provided explicitly for the system to detect them, for example when a backslash, a double quote, or the delimiter itself is part of a value, as explained below:
1. If a Delimiter is part of any Value, then the respective value needs to be quoted with double quotes.
For example: Hash (#) is the delimiter and the provided value is @$"#"$^&#*( which contains # within the value. In this scenario, enclose the value in double quotes so that it appears as "@$"#"$^&#*(".
In this case the complete row will appear as- "@$"#"$^&#*("#"2017-07-23"#"Kayle Duffill"#"Furniture"#"Female"#"Cash"#"+1 (717) 934-4029"
2. If a double quote is part of the value, then each double quote in the value needs to be escaped by placing another double quote before it, as in the example below:
In "@$"#"$^&#*(" there are two double quotes present in the value, so both should be preceded by another quote, which means the final value of the column will be "@$""#""$^&#*(",
Note: The quotes surrounding the complete column value, which you add to escape the delimiter (#), should not be doubled; only the double quotes within the value should be doubled.
3. If the user wants to use a backslash (\) as the delimiter, the delimiter value provided must be a double backslash (\\) for the system to read it as a single backslash. Likewise, if a double backslash (\\) is to appear in a value, the user must provide four backslashes (\\\\) for the system to read it as a double backslash.
Example of a sample test data:
"name"\"age"\"phone"
"virat"\"23"\"987653451"
"yuvraj"\"37\""
"sachin"\"23"\"98765345245431"
"saurav"\"40"\"9876523sdf"
"dhoni"\"40"\"987653451"
Channel used: Blob Batch.
Upload the file, select CSV as the file type, and enter \\ as the delimiter.
Note: In the subsequent step you can validate the schema of the incoming source. For example, after a CSV/TEXT/JSON/XML/Fixed Length file is uploaded, the next tab is Detect Schema, where the automatically detected schema can be edited and overridden.
Next is the Configuration tab, where the Data Sources are configured as per the schema. Then you can add notes, and save the Data Source’s configuration.
Following fields in the schema are editable:
Note: Nested JSON cannot be edited.
• Schema Name: Name of the Schema can be changed.
• Column Alias: Column alias can be renamed.
• Date and Timestamp: Formats of dates and timestamps can be changed. Note: Gathr has migrated from spark 2 to spark 3.0. For further details on upgrading Gathr to Spark 3.0, refer to the link shared below:
https://spark.apache.org/docs/3.0.0-preview2/sql-migration-guide.html
• Data Type: Field type can be any one of the following and they are editable:
• String
• Integer
• Long
• Short
• Double
• Float
• Byte
• Boolean
• Binary
• Vector
When you select JSON as your type of data, the JSON is uploaded in the Detect Schema tab, where you can edit the Key-Value pair.
After the schema is finalized, the next tab is configuration of Data Source (Below mentioned are the configurations of every Data Source).
Note: In case of Hive and JDBC, the configuration tab appears before the Detect schema tab.
The Text files can be uploaded to a source when the data available requires customized parsing. Text message parser is used to read data in any format from the source, which is not allowed when CSV, JSON, etc. parsers are selected in configuration.
To parse the data into fields append the Data Source with a Custom Processor where custom logic for data parsing can be added.
The system will parse the incoming XML data from the uploaded file. Select XML as your Type of Data and provide the XML XPath of the tag to treat as a row.
For example, for the XML <books><book>...</book></books>, the XML XPath value would be /books/book.
You can also provide / as the XML XPath; it is the default value.
'/' will parse the whole XML.
The system will parse the incoming fixed-length data. If the field has data in continuation without delimiters, you can separate the data using the Fixed Length data type.
Provide the length of each field (a numeric value), separated by commas, in the Field Length.
When Parquet is selected as the type of data, the file is uploaded in the schema and the fields are editable.
Note: Parquet is only available in HDFS and Native DFS Receiver.
Auto schema infers headers by comparing the first row of the file with other rows in the data set. If the first line contains only strings, and the other lines do not, auto schema assumes that the first row is a header row.
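A simplified version of that header heuristic is sketched below; it only compares the first row against one later row, whereas the actual inference may look at more of the sample.

```java
class HeaderHeuristicSketch {
    // The first row looks like a header when all its cells are non-numeric
    // strings while a later data row contains at least one numeric value.
    static boolean looksLikeHeader(String[] firstRow, String[] laterRow) {
        return allNonNumeric(firstRow) && !allNonNumeric(laterRow);
    }

    private static boolean allNonNumeric(String[] row) {
        for (String cell : row) {
            try {
                Double.parseDouble(cell);
                return false; // found a numeric cell
            } catch (NumberFormatException ignored) {
                // non-numeric cell, keep checking
            }
        }
        return true;
    }
}
```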
After the schema is finalized, the next tab is configuration. Once you configure the Data Source, you can add notes and save the Data Source’s configuration.
Along with fetching data directly from a source or uploading a data file, you can also use an existing dataset in the RabbitMQ and DFS data sources.
To read more about it, refer to “How to use an Existing Dataset” in the Dataset section.
Add an ADLS batch or streaming data source to create a pipeline. Click the component to configure it.
Under the Schema Type tab, select Fetch From Source, Upload Data File or Use Existing Dataset option. Edit the schema if required and click next to configure.
Note: Under the Fetch From Source tab, if the Type of Data is selected as Avro, then in the Provide Schema field, the below options are available:
- Infer from Data
- Inline Avro Schema
- Upload Avro Schema
These Provide Schema options are also available for the Upload Data File option under the Schema Type tab.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Container | Provide the container name in Azure Blob storage. |
ADLS Directory Path | Provide directory path for ADLS file system. Note: User has an option to configure ADLS source with supported compressed data files. |
ADD CONFIGURATIONS | User can add further configurations (Optional). |
Environment Params | User can add further environment parameters. (Optional) |
Click Next for Incremental Read option.
Note: The incremental Read option is available only for ADLS Batch.
Field | Description |
---|---|
Enable Incremental Read | Unchecked by default, check mark this option to enable incremental read support. |
Read By | Option to read data incrementally either by choosing the File Modification Time option or Column Partition option. |
Upon selecting the File Modification Time option provide the below detail: | |
Offset | Specifies the last modified time of the file. Note: The offset time must be lesser than the latest file modification time. Records with timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run the datetime configuration will set to the most recent timestamp value from the last fetched records. The given value should be in UTC with ISO Date format as yyyy-MM-dd'T'HH:mm:ss.SSSZZZ. Ex: 2021-12-24T13:20:54.825+0000. |
Upon selecting the Column Partition option provide the below details: | |
Read Control Type | Options to control data fetch: None: All records in the reference column with values greater than the start value will be read. Limit by Value: All records in the reference column with values greater than the start value but less than/equal to the max value that you set will be read. Limit by Incremental Size: All records in the reference column with values greater than the start value, within the specified incremental size that you set, will be selected. Note: Upon selecting the Inclusive Start Offset checkbox, the start value will be included with the selected size. |
Inclusive Start Offset | Check the checkbox for enabling the Inclusive Start Offset option to include the start value for incrementally reading the schema. Note: In case if the Limit by Value option is selected as Read Control Type, then provide the Max value along with Start value of the selected column ID. Upon selecting the Inclusive Start Offset option the schema from Start value to the Max value will be incrementally read. |
Add an Advanced Mongo data source into your pipeline. Drag the data source to the canvas and click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click next to Configure the Advanced Mongo source.
Configuring Advanced Mongo
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Database Name | Select the Mongo DB database source from which the data is to be fetched. |
Collection Name | Name of the database collection that needs to be scanned should be selected. |
Query | Filtering criteria option to choose between All Data or Match Query. If Match Query option is selected, provide the below detail: |
Filter/Query | Fetch filtered schema or read data from source as per the filter condition or query provided. Note: In Match Query, use only single quotes (‘’) where required. |
Records Per Partition | Number of records to be read per partition. |
Schema Updated Alert | Check the checkbox to receive alerts for any schema changes when the data is fetched from the source. |
Add Configuration | Option to add further configuration. |
Incremental Read in Advanced Mongo
Field | Description |
---|---|
Read Type | Option to fetch data from the source. Full Load, Incremental and CDC options are available to fetch data as explained below: |
Full Load | Reads all the records as per the configured collection from the database during the pipeline execution. |
Incremental | Reads records as per the specified offset(s) or start value from the database during pipeline execution. Provide the below details: |
Column | Select the column for incremental read. The listed columns can be integer, long, date, timestamp, decimal, etc. Note: The selected column should have sequential, sorted (in increasing order) and unique values. |
Column Includes Date | Check-mark the check-box if the column to be read is of date/timestamp type. |
Start Value | An offset value needs to be set for the incremental read that will be done on the selected column. Only those column records with values greater than the offset value will be read. |
Read Control Type | Provides three options to control data fetched - None, Limit by Count, and Limit by Value. None: All the records in the reference column with values greater than the Start Value will be read. Limit by Count: Only the mentioned number of records in the reference column with their values greater than the Start Value will be read. Provide No. of Records. Limit by Value: All the records in the reference column with values greater than the Start Value but less than/equal to the Column Value that you set will be read (Column Value is inclusive). Set the Column Value. For None and Limit by Count, it is recommended that the table should have data in sequential and sorted (increasing) order. |
CDC | Reads the records from the configured namespace or the Oplog namespace as per the specified CDC configuration during the pipeline execution. |
Oplog Database | Select the Oplog Database from where the data should be read. |
Oplog Collection | Select the Oplog Collection from where the data should be read. |
Load From Original Collection | This option is checked by default during first time of the configuration. If the check box is unchecked, then it reads the records from Oplog collection as per the specified CDC configuration during the pipeline execution. It will get automatically disabled after pipeline is successfully executed. If this option is unchecked, then provide the below field: |
Offset | Records with timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run the datetime configuration will set to the most recent timestamp value from the last fetched records. The given value should be in UTC with ISO Date format as yyyy-MM-dd'T'HH:mm:ss.SSSZZZ. Ex: 2021-12-24T13:20:54.825+0000. |
Configure Pre-Action in Source
Configuring an Attunity Data Source
To add an Attunity Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File. Edit the schema if required and click next to Configure Attunity.
Configuring Attunity
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Capture | Data: flag to capture data. Metadata: flag to capture metadata. You can select both options. |
Define Offset | This configuration is similar to what is used for a Kafka offset. • Latest: The starting point of the query is the latest offset. • Earliest: The starting point of the query is the earliest/first offset. |
Connection Retries | The number of retries for the component connection. Possible values are -1, 0 or any positive number. If the value is -1, there will be infinite retries for the connection. |
Delay Between Connection Retries | Retry delay interval for component connection. (In milliseconds.) |
Add Configuration | To add additional custom Kafka properties in key-value pairs. |
Click on the add notes tab. Enter the notes in the space provided.
Configuring the Data Topics and Metadata Topic tab
Choose the topic names; their fields are populated and are editable. You can choose as many topics as needed.
Choose the metadata topic; its fields are populated and are editable. You can choose the metadata of only one topic.
Field | Description |
---|---|
Topic Name | Topic name from where consumer will read the messages. |
ZK ID | Zookeeper path to store the offset value at per-consumer basis. An offset is the position of the consumer in the log. |
Click Done to save the configuration.
AWS IoT and Gathr allow you to collect telemetry data from multiple devices and process it.
Note: Every action performed in Gathr is reflected in the AWS IoT wizard and vice versa.
Configuring an AWS IoT
To add an IoT channel into your pipeline, drag the channel to the canvas and right click on it to configure.
Schema Type
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Fetch from source takes you to Configuration of AWSIoT.
Upload Data File will take you to Detect Schema page.
Click on the add notes tab. Enter the notes in the space provided.
An Azure Blob Batch channel reads different formats of data in batch (JSON, CSV, ORC, Parquet) from a container. It can emit data to any emitter.
Configuring an Azure Blob Data Source
To add an Azure Blob Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Container | Container name in Azure Blob. |
Path | End the path with * in case of a directory, for example: outdir.*. For an absolute path: outdir/filename |
Add Configuration | To add additional custom properties in key-value pairs. |
Configure Pre-Action in Source
An Azure Blob Stream channel reads different formats of streaming data (JSON, CSV, ORC, Parquet) from a container and emits data into different containers.
Configuring an Azure Blob Data Source
To add an Azure Blob Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Container | Container name in Azure Blob. |
Path | End the path with * in case of a directory, for example: outdir/*. To expect a specific format in the directory path: outdir/*.csv |
Add Configuration | To add additional custom properties in key-value pairs. |
The Cosmos channel reads from a selected container of a database in batches. You can use a custom query in the case of the batch channel.
Configuring a Batch Cosmos Data Source
To add a Batch Cosmos Data Source into your pipeline, drag the Data Source to the canvas and click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
Key | Provide the Azure Cosmos DB key. Click TEST Connection to test the execution of the connection. |
Database | Select the Cosmos Database from the drop-down list. |
Container | Select the Cosmos container from the drop-down list. |
Enable Change Feed | Change feed enables reading the latest or changed records. It takes effect only during the pipeline flow. |
Change Feed From Beginning | If set to True, data will be read from beginning. |
CosmosDB Checkpoint Directory | It is the file path where Cosmos stores the checkpoint data for Change feed. |
ADD CONFIGURATION | To add additional custom properties in key-value pairs. |
Configure Pre-Action in Source
Click Next for Incremental Read option
Field | Description |
---|---|
Enable Incremental Read | Check this checkbox to enable the incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that has integer, long, date, timestamp and decimal type of values. |
Start Value | Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read. |
Read Control Type | Provides 3 options to control how data will be fetched: None, Limit by Count and Limit by Value. None: All the records with the value of the reference column greater than the offset will be read. Limit By Count: Only the mentioned number of records with the value of the reference column greater than the offset will be read. Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read. For None and Limit by Count, it is recommended that the table has data in sequential and sorted (increasing) order. (A conceptual sketch of these read control types follows this table.) |
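Conceptually, the read control types above translate into simple filters on the reference column, as sketched below in Spark terms; the column name and values are placeholders, and the actual component applies this logic internally.

```java
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.lit;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

class IncrementalReadSketch {
    // None: every record with the reference column greater than the offset.
    static Dataset<Row> none(Dataset<Row> ds, String refCol, long offset) {
        return ds.filter(col(refCol).gt(lit(offset)));
    }

    // Limit by Count: only the first n records above the offset.
    static Dataset<Row> limitByCount(Dataset<Row> ds, String refCol, long offset, int n) {
        return ds.filter(col(refCol).gt(lit(offset))).orderBy(col(refCol)).limit(n);
    }

    // Limit by Value: records above the offset, up to and including the column value.
    static Dataset<Row> limitByValue(Dataset<Row> ds, String refCol, long offset, long columnValue) {
        return ds.filter(col(refCol).gt(lit(offset)).and(col(refCol).leq(lit(columnValue))));
    }
}
```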
On the Delta Lake channel, you can read data from a Delta Lake table on S3, HDFS, GCS, ADLS or DBFS. Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads. All data in Delta Lake is stored in the Apache Parquet format. Delta Lake provides the ability to specify your schema and enforce it, along with timestamps.
Configuring Delta Data Source
To add a Delta Data Source into your pipeline, drag the Data Source to the canvas and click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source. Below are the configuration details of the Delta Source (Batch and Streaming):
Field | Description |
---|---|
Source | Select source for reading the delta file from the available options in the drop down list: HDFS, S3, GCS, DBFS and ADLS. |
Provide below fields if the user selects HDFS source for reading the data: | |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
Username | Once the Override Credentials option is checked, provide the user name through which the Hadoop service is running. |
HDFS File Path | Provide the file path of HDFS file system. |
Time Travel Option | Select one of the time travel options: - None: Option not to use Time Travel. - Version: Specify the version of the delta file in order to fetch the older snapshot of the table with the given version number. - Timestamp: Specifies the last modified time of the file. All the files that have their last modified time greater than the provided value will be read. (See the sketch after this table.) Note: The Time Travel option is not available for the Streaming Delta source. |
Provide below fields if the user selects S3 source for reading the data: | |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
AWS Key Id | Provide the S3 account access key. |
Secret Access Key | Provide the S3 account secret key. Note: Once the AWS Key Id and Secret Access Key is provided, user has an option to test the connection. |
S3 Protocol | Select the S3 protocol from the drop down list. Below protocols are supported for various versions when user selects S3 connection type: - For HDP versions, S3a protocol is supported. - For CDH versions, S3a protocol is supported. - For Apache versions, S3n protocol is supported. - For GCP, S3n and S3a protocol is supported. - For Azure S3n protocol is supported. Read/Write to Mumbai and Ohio regions is not supported. - For EMR S3, S3n, and S3a protocol is supported. - For AWS Databricks, s3a protocol is supported. |
Bucket Name | Provide the S3 bucket name. |
Path | Provide the sub-directories of the bucket name on which the data is to be written. |
Time Travel Option | Select one of the time travel options: - None: Option not to use Time Travel. - Version: Specify the version of the delta file in order to fetch the older snapshot of the table with the given version number. - Timestamp: Specifies the last modified time of the file. All the files that have their last modified time greater than the provided value will be read. Note: The Time Travel option is not available for the Streaming Delta source. |
Provide below fields if the user selects GCS source for reading the data: | |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
Service Account Key | Upload the Service Account File. The user has an option to test the connection. |
Bucket Name | Provide the GCS bucket name. |
Path | Provide the sub-directories of the bucket name on which the data is to be written. |
Time Travel Option | Select one of the time travel options: - None: Option not to use Time Travel. - Version: Specify the version of the delta file in order to fetch the older snapshot of the table with the given version number. - Timestamp: Specifies the last modified time of the file. All the files that have their last modified time greater than the provided value will be read. Note: The Time Travel option is not available for the Streaming Delta source. |
Provide below fields if the user selects DBFS source for reading the data: | |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
Directory Path | Provide the DBFS parent path for check-pointing. |
DBFS File Path | Provide the DBFS file path. |
Provide below fields if the user selects ADLS source for reading the data: | |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Container Name | Provide container name for azure delta lake storage. |
ADLS File Path | Provide the directory path for azure delta lake storage file system. |
Time Travel Option | Select one of the time travel options: - None: Option not to use Time Travel. - Version: Specify the version of the delta file in order to fetch the older snapshot of the table with the given version number. - Timestamp: Specifies the last modified time of the file. All the files that have their last modified time greater than the provided value will be read. Note: The Time Travel option is not available for the Streaming Delta source. |
ADD CONFIGURATION | To add additional custom properties in key-value pairs. |
Environment Params | User can add further environment parameters. (Optional) |
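For reference, the Version and Timestamp time travel options correspond to standard Delta Lake read options in Spark, roughly as sketched below. The version number, timestamp and path are illustrative placeholders; Gathr exposes these choices through the Time Travel Option field rather than hand-written code.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class DeltaTimeTravelSketch {
    // Version: read an older snapshot of the table by version number.
    static Dataset<Row> readVersion(SparkSession spark, String deltaPath) {
        return spark.read()
                .format("delta")
                .option("versionAsOf", 3L) // illustrative version number
                .load(deltaPath);
    }

    // Timestamp: read the snapshot that was current at the given time.
    static Dataset<Row> readAsOfTimestamp(SparkSession spark, String deltaPath) {
        return spark.read()
                .format("delta")
                .option("timestampAsOf", "2021-12-24 13:20:54") // illustrative timestamp
                .load(deltaPath);
    }
}
```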
Configure Pre-Action in Source
The configuration for BigQuery is mentioned below (a Spark-level sketch of the table-load options follows the configuration tables):
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. Mention the connection name for creating connection. |
Override Credentials | Unchecked by default, check the checkbox to override credentials for user specific actions. Note: Upload the GCS service account key. |
Load From Big Query Table/ Load From Query Results | Choose one of the options. |
Upon selecting Load From Big Query option, update the below fields: | |
Dataset Name | Upon choosing, ‘Load From Big Query Table’, the user is required to provide the dataset name. |
Table Name | Upon choosing, ‘Load From Big Query Table’, provide the table name. |
Project ID of Dataset | Upon choosing, ‘Load From Big Query Table’, provide the Google Cloud project ID. If not specified, the project from the service account key of connection will be used. |
Columns to Fetch | Upon choosing ‘Load From Big Query Table’, enter a comma-separated list of columns to select. |
Where Condition | Upon choosing, ‘Load From Big Query Table’, enter the where condition. |
Partition Filter Condition | Upon choosing, ‘Load From Big Query Table’, enter the partition filter condition. |
Load From Query Results | Upon selecting this option, the user will be required to mention the ‘Location of Datasets used in the query’. |
Maximum Parallelism | Mention the maximum number of partitions to split the data into. |
Add Configuration | The user can add further configurations. |
Schema Results | Under schema results, select the Big Query Dataset name and Big Query Table Name. |
Details | Under Details, the user will be able to view: Table Expiration, Number of Rows, Last Modified, Data Location, Table ID, Table Size, Created. |
Table Schema | Table schema details can be viewed here. |
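For orientation, the ‘Load From Big Query Table’ settings above map onto spark-bigquery connector options roughly as below; the project, dataset, table, filter and column values are placeholders, not the component's internal implementation.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class BigQueryReadSketch {
    static Dataset<Row> loadTable(SparkSession spark) {
        return spark.read()
                .format("bigquery")
                .option("table", "my_project.my_dataset.my_table") // Project ID, Dataset Name, Table Name
                .option("filter", "order_date >= '2021-01-01'")    // Where Condition, pushed down to BigQuery
                .load()
                .select("id", "order_date");                       // Columns to Fetch
    }
}
```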
Next, in the Detect Schema window, the user can set the schema as dataset by clicking on the Save As Dataset checkbox. Click Next. The user can set the Incremental Read option.
Note: The user can enable the Incremental Read option if there is a date or integer column in the input data.
Field | Description |
---|---|
Enable Incremental Read | Check this checkbox to enable the incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that has integer, long, date, timestamp and decimal type of values. |
Start Value | Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read. |
Read Control Type | Provides 3 options to control how data will be fetched: None, Limit by Count and Limit by Value. None: All the records with the value of the reference column greater than the offset will be read. Limit By Count: Only the mentioned number of records with the value of the reference column greater than the offset will be read. Limit By Value: All the records with the value of the reference column greater than the offset and less than the Column Value field will be read. For None and Limit by Count, it is recommended that the table has data in sequential and sorted (increasing) order. |
Configure Pre-Action in Source
Field | Description |
---|---|
Type | The type of thing you will register. |
Name | Name of the thing. |
Attributes | Attributes of associated thing type. |
The Cassandra Data Source reads data from a Cassandra cluster using the specified keyspace name and table name (a Spark-level sketch follows the configuration table below).
Configuring Cassandra Data Source
To add a Cassandra Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source.
Note: The Cassandra keyspace name and table name should exist in the Cassandra cluster.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Keyspace | Cassandra keyspace name. |
Table Name | Table name inside the keyspace from where we read data. |
Add configuration | To add additional custom properties in key-value pairs. |
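In plain Spark terms, the keyspace and table configuration resembles a Spark Cassandra Connector read, roughly as sketched below; the keyspace and table values come from the fields above, and the connection details are assumed to already be set on the Spark session.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

class CassandraReadSketch {
    static Dataset<Row> readTable(SparkSession spark, String keyspace, String table) {
        // Reads the given table from the configured Cassandra cluster.
        return spark.read()
                .format("org.apache.spark.sql.cassandra")
                .option("keyspace", keyspace)
                .option("table", table)
                .load();
    }
}
```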
Configure Pre-Action in Source
The CDC Data Source processes Change Data Capture (CDC) information provided by Oracle LogMiner redo logs from Oracle 11g or 12c.
CDC Data Source processes data based on the commit number, in ascending order. To read the redo logs, CDC Data Source requires the LogMiner dictionary.
Follow the Oracle CDC Client Prerequisites, before configuring the CDC Data Source.
Oracle CDC Client Prerequisites
Before using the Oracle CDC Client Data Source, complete the following tasks:
1. Enable LogMiner.
2. Enable supplemental logging for the database or tables.
3. Create a user account with the required roles and privileges.
4. To use the dictionary in redo logs, extract the Log Miner dictionary.
5. Install the Oracle JDBC driver.
LogMiner provides redo logs that summarize database activity. The Data Source uses these logs to generate records.
LogMiner requires an open database in ARCHIVELOG mode with archiving enabled. To determine the status of the database and enable LogMiner, use the following steps:
1. Log into the database as a user with DBA privileges.
2. Check the database logging mode:
select log_mode from v$database; |
If the command returns ARCHIVELOG, you can skip to Task 2.
If the command returns NOARCHIVELOG, continue with the following steps:
3. Shut down the database.
shutdown immediate; |
4. Start up and mount the database:
startup mount; |
5. Enable archiving and open the database:
alter database archivelog; alter database open; |
Task 2. Enable Supplemental Logging
To retrieve data from redo logs, LogMiner requires supplemental logging for the database or tables.
1.To verify if supplemental logging is enabled for the database, run the following command:
SELECT supplemental_log_data_min, supplemental_log_data_pk, supplemental_log_data_all FROM v$database; |
For 12c multi-tenant databases, best practice is to enable logging for the container for the tables, rather than the entire database. You can use the following command first to apply the changes to just the container:
ALTER SESSION SET CONTAINER=<pdb>; |
You can enable identification key or full supplemental logging to retrieve data from redo logs. You do not need to enable both:
To enable identification key logging
You can enable identification key logging for individual tables or all tables in the database:
• For individual tables
Use the following commands to enable minimal supplemental logging for the database, and then enable identification key logging for each table that you want to use:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS; |
• For all tables
Use the following command to enable identification key logging for the entire database:
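ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (PRIMARY KEY) COLUMNS; |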
To enable full supplemental logging
You can enable full supplemental logging for individual tables or all tables in the database:
• For individual tables
Use the following commands to enable minimal supplemental logging for the database, and then enable full supplemental logging for each table that you want to use:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA; ALTER TABLE <schema name>.<table name> ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; |
• For all tables
Use the following command to enable full supplemental logging for the entire database:
ALTER DATABASE ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS; |
To submit the changes
ALTER SYSTEM SWITCH LOGFILE; |
Create a user account to use with the Oracle CDC Client Data Source. You need the account to access the database through JDBC.
Create accounts differently based on the Oracle version that you use:
Oracle 12c multi-tenant databases
For multi-tenant Oracle 12c databases, create a common user account. Common user accounts are created in cdb$root and must use the convention: c##<name>.
1.Log into the database as a user with DBA privileges.
2.Create the common user account:
ALTER SESSION SET CONTAINER=cdb$root; CREATE USER <user name> IDENTIFIED BY <password> CONTAINER=all; GRANT create session, alter session, set container, select any dictionary, logmining, execute_catalog_role TO <username> CONTAINER=all; ALTER SESSION SET CONTAINER=<pdb>; GRANT select on <db>.<table> TO <user name>; |
Repeat the final command for each table that you want to use.
When you configure the origin, use this user account for the JDBC credentials. Use the entire user name, including the "c##", as the JDBC user name.
Oracle 12c standard databases
For standard Oracle 12c databases, create a user account with the necessary privileges:
1.Log into the database as a user with DBA privileges.
2.Create the user account:
CREATE USER <user name> IDENTIFIED BY <password>; GRANT create session, alter session, select any dictionary, logmining, execute_catalog_role TO <user name>; GRANT select on <db>.<table> TO <user name>; |
Repeat the last command for each table that you want to use.
When you configure the Data Source, use this user account for the JDBC credentials.
Oracle 11g databases
For Oracle 11g databases, create a user account with the necessary privileges:
1.Log into the database as a user with DBA privileges.
2.Create the user account:
CREATE USER <user name> IDENTIFIED BY <password>; GRANT create session, alter session, select any dictionary, logmining, execute_catalog_role TO <user name>; GRANT select on <db>.<table> TO <user name>; |
GRANT select on v$logmnr_parameters to <user name>; GRANT select on v$archived_log to <user name>; GRANT select on <db>.<table> TO <user name>; |
Repeat the final command for each table that you want to use.
When you configure the origin, use this user account for the JDBC credentials.
Task 4. Extract a Log Miner Dictionary (Redo Logs)
When using redo logs as the dictionary source, you must extract the Log Miner dictionary to the redo logs before you start the pipeline. Repeat this step periodically to ensure that the redo logs that contain the dictionary are still available.
Oracle recommends that you extract the dictionary only at off-peak hours since the extraction can consume database resources.
To extract the dictionary for Oracle 11g or 12c databases, run the following command:
EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); |
To extract the dictionary for Oracle 12c multi-tenant databases, run the following commands:
ALTER SESSION SET CONTAINER=cdb$root; EXECUTE DBMS_LOGMNR_D.BUILD(OPTIONS=> DBMS_LOGMNR_D.STORE_IN_REDO_LOGS); |
The Oracle CDC Client origin connects to Oracle through JDBC. You cannot access the database until you install the required driver.
Note: The JDBC jar is mandatory for Gathr installation and is stored in the Tomcat lib and the Gathr third-party folder.
The CDC Data Source processes Change Data Capture (CDC) information provided by Oracle LogMiner redo logs from Oracle 11g or 12c.
CDC Data Source processes data based on the commit number, in ascending order. To read the redo logs, CDC Data Source requires the LogMiner dictionary.
The Data Source can create records for the INSERT, UPDATE and DELETE operations for one or more tables in a database
To add a CDC Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Source Tables:
To configure your schema, you can use either a configured pluggable database or a non-pluggable database.
Field | Description |
---|---|
Use Pluggable Database | Select the option for 12c multi-tenant databases. |
Pluggable Database Name | Name of the pluggable database that contains the schema you want to use. Use only when the schema was created in a pluggable database. |
Container Database Connection | Use the user account created for the Data Source. Common user accounts for Oracle. |
Schema Name | Provide the schema name. |
Tables | Tables on which data will be processed. |
Field | Description |
---|---|
Operations | Operations for creating records. • INSERT • DELETE • UPDATE |
Database Time Zone | Time zone of the database; specify it when the database operates in a different time zone from the environment where the pipeline runs. |
Maximum Transaction Time | Time in seconds to wait for changes for a transaction. Enter the longest period that you expect a transaction to require. Default is 60 seconds. |
LogMiner Session Window | Time in seconds to keep a LogMiner session open. Set to larger than the maximum transaction length. Reduce when not using local buffering to reduce LogMiner resource use. Default is 7200 seconds. |
Local Buffering | TRUE:- Using local buffers, the Data Source requests the transactions for the relevant tables and period. The Data Source buffers the resulting LogMiner redo SQL statements until it verifies a commit for a transaction. After the commit, it parses and processes the committed data. The Data Source can buffer the redo SQL statements completely in memory. FALSE:- When using Oracle LogMiner buffers, the Data Source requests data from Oracle LogMiner for a particular period. LogMiner then buffers all transactions for that period for all the tables in the database, rather than only the tables needed by the origin. |
Max Batch Size (records) | Maximum number of records processed at one time. Keep values up to the Data Collector maximum batch size. Default is 1000. |
LogMiner Fetch Size | Minimum number of records to fetch before passing a batch to the pipeline. Keep this value low to allow the origin to pass records to the pipeline as soon as possible, rather than waiting for a larger number of records to become available. Lower values can increase throughput when writes to the destination system are slow. Default is 1. |
Query Timeout | Time to wait before timing out a LogMiner query. Default is 300 seconds. |
Start From | Start From is the point in the LogMiner redo logs where you want to start processing. When you start the pipeline, the Oracle CDC Client starts processing from the specified initial change and continues until you stop the pipeline. It has the following sub-categories: 1. From the latest change: The origin processes all changes that happen after you start the pipeline. 2. From a specified date-time: The origin processes all changes that occurred at the specified date-time and later. Use the format: DD-MM-YYYY HH24:mm:ss. 3. From a specified system change number (SCN): The origin processes all changes that occurred at the specified SCN and later. When using a specified SCN, the origin starts processing with the timestamps associated with the SCN. If the SCN cannot be found in the redo logs, the origin continues reading from the next higher SCN that is available in the redo logs. |
The Container Data Source is used to read data from Couchbase. This Data Source is used as a caching container which reads both aggregated data and raw data from the bucket.
Configuring a Container Data Source
To add a Container Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File and Fetch From Source.
Field | Description |
---|---|
Load From Template | Select the template from the list. Only those templates will be visible in the drop down list that are created globally or within that workspace. |
Sync With Source Template | When a template is saved and is edited, this tab is reflected. If any change is made in source template, corresponding changes will reflect in Container’s configuration. |
Connection Name | Name of the connection |
Bucket Name | Couchbase Bucket name. Select the Couchbase bucket that gets generated with the Couchbase connection. You can also create a bucket and fetch details in the fields. |
Bucket password | Couchbase Bucket password |
Memory Quota | Memory Quota in megabytes per server node. Memory Quota cannot be less than 100 MB. |
Replicate view indexes | By checking the Replicate view indexes checkbox, you ensure that view indexes, as well as data are replicated |
Flush | When flushed, all items in the bucket are removed as soon as possible. Note: This does not immediately show in item count statistics with memcached buckets. |
Record Limit | Enable the limit to fetch the number of records from Couchbase. If you select it as Yes, the following tabs are populated, else, you can move to Fetch Data. |
Max no. of Records | Maximum number of records to be fetched from the Couchbase bucket |
Overflow Condition | Fetch the records based on order for the selected column. (Fetch minimum records for ascending order and fetch maximum records for descending order) Note: With aggregation, select the grouping field in the Overflow condition. |
Fetch Data | Raw Data: Fetch the raw data from the Couchbase bucket. Aggregated Data: Fetch the aggregated data from the Couchbase. A new field is populated, where you can group the fields from the schema. |
Grouping Fields | Field of selected message on which group by is applied. |
Fields | Apply functions on the input and output fields. |
Choose the scope of the Template (Global or Workspace) and Click Done for saving the Template and configuration details.
Configure Pre-Action in Source
Custom Data Source allows you to read data from any data source.
You can write your own custom code to ingest data from any data source and build it as a custom Data Source. You can use it in your pipelines or share it with other workspace users.
Create a jar file of your custom code and upload it in a pipeline or as a registered component utility.
To write a custom code for your custom Data Source, follow these steps:
1. Download the Sample Project (available on the home page of Data Pipeline).
Import the downloaded Sample Project as a Maven project in Eclipse. Ensure that Apache Maven is installed on your machine and that the PATH for it is set.
2. Implement your custom code and build the project. To create a jar file of your code, use the following command: mvn clean install -DskipTests.
For a Custom Data Source, add your custom logic in the implemented methods of the classes as mentioned below:
High-level abstraction
If you want a high-level abstraction using only Java code, then extend BaseSource as shown in the SampleCustomDataSource class (sketched after the method list below):
com.yourcompany.component.ss.datasource.SampleCustomDataSource, which extends BaseSource
Methods to implement:
• public void init(Map<String, Object> conf)
• public List<String> receive()
• public void cleanup()
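A minimal sketch of such a high-level custom Data Source is shown below, assuming the BaseSource contract described above. The package name follows the sample project layout, and the configuration key and generated records are purely illustrative.

```java
package com.yourcompany.component.ss.datasource;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// BaseSource comes from the Gathr sample project; import it from there.
public class SampleCustomDataSource extends BaseSource {

    private int batchSize;

    @Override
    public void init(Map<String, Object> conf) {
        // Read component configuration supplied from the pipeline designer.
        this.batchSize = Integer.parseInt(String.valueOf(conf.getOrDefault("batchSize", "10")));
    }

    @Override
    public List<String> receive() {
        // Return one batch of records as strings; downstream parsers handle the format.
        List<String> records = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            records.add("{\"id\": " + i + "}");
        }
        return records;
    }

    @Override
    public void cleanup() {
        // Release any connections or resources held by the source.
    }
}
```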
Low-level abstraction
If you want a low-level implementation using the Spark API, then extend AbstractDataSource as shown in the SampleSparkSourceDataSource class.
com.yourcompany.component.ss.datasource.SampleSparkSourceDataSource, which extends AbstractDataSource
Methods to implement:
• public void init(Map<String, Object> conf)
• public Dataset<Row> getDataset(SparkSession spark)
• public void cleanup()
Configuring Custom Data Source
While uploading data you can also upload a Dataset in Custom Data Source.
Field | Description |
---|---|
Data Source Plugin | Fully qualified name of a custom code class. |
Upload the custom code jar using the Upload Jar button from the pipeline designer page. You can use this Custom Data Source in any pipeline.
Click Done to save the configuration.
Data Generator Data Source generates test data for testing your pipelines. Once the pipeline is tested with fields and their random data, you can replace the Data Generator Data Source.
Configuring Data Generator Data Source
To add a Data Generator Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can only Upload Data File to this Data Source.
Configuration tab:
Upload Data:
You can upload a file in either JSON or CSV format to generate test data.
Every row from the file is treated as a single message.
Check the Repeat Data option if you wish to continuously generate data from the uploaded file.
Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.
This component is supported in Gathr on-premise. Users have an option to utilize any existing dataset as a channel in the data pipeline.
On the Schema Type tab, update configuration for required parameters with reference to below table:
Field | Description |
---|---|
Select Dataset Type | Option to select a specific type of dataset (for example: HDFS, Hive, JDBC or S3) or all types of dataset to filter and narrow down the selection of required dataset in the subsequent step. |
Select Dataset & Version | Select any of the existing dataset and its version that you want to use as a channel in the pipeline. |
Max No of Rows | Maximum number of sample rows to pull from the Dataset Source. 100 by default. |
Trigger time for Sample | Minimum wait time before system fires a query to fetch data. Example: If it is 15 Secs, then system will first wait for 15 secs and will fetch all the rows available and create dataset out of it. 3 Seconds by default. |
Sampling Method | Dictates how to extract sample records from the complete data fetched from the source. Following are the ways: -Top N: Extract top n records using limit() on dataset. -Random Sample: Extract Random Sample records applying sample transformation and then limit to max number of rows in dataset. |
Once the Schema and Rules for the existing dataset are validated, click Next and go to the Incremental Read tab.
Incremental Read
Field | Description |
---|---|
Enable Incremental Read | Check this checkbox to enable incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that has integer, long, date, timestamp, decimal types of values. |
Start Value | Mention a value of the reference column, only the records whose value of the reference column is greater than this value will be read. |
Read Control Type | Provides three options to control data to be fetched -None, Limit By Count, and Limit By Value. None: All the records with value of reference column greater than offset will be read. Limit By Count: Mentioned no. of records will be read with the value of reference column greater than offset will be read. Limit By Value: All the records with value of reference column greater than offset and less than Column Value field will be read. For None and Limit by count it is recommended that table should have data in sequential and sorted (increasing) order. |
Click NEXT to detect the schema from the specified file path. Click Done to save the configuration.
Configure Pre-Action in Source.
To add a DeltaSQL source into your pipeline, drag the data source to the canvas and click on it to configure.
Under the Schema Type tab, you can Upload Data File, Fetch From Source, or Use Existing Dataset. Below are the configuration details of DeltaSQL:
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. Note: DeltaSQL connection is required while pipeline creation. The actual pipeline execution will happen through metastore. |
Override Credentials | Unchecked by default, check the checkbox to override credentials for user specific actions. |
Username | Once the Override Credentials option is checked, provide the user name through which the Delta SQL service is running. |
Password | Provide the password for Delta SQL override credentials. |
Query | Provide the JDBC compatible SQL query to be executed in the component. |
Inspect Query | Provide the SQL query to be executed in the component with a set limit in records count for inspect and schema detection. Note: - For inspect query field you need to provide simple query with table name like "select * from tablename" - "As of" clause is not supported for inspect query but can be used in query field. - Also for Inspect query it should be JDBC compatible |
ADD CONFIGURATION | To add additional custom properties in key-value pairs. |
Environment Params | User can add further environment parameters. (Optional) |
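As an illustration of the Query and Inspect Query fields, the sketch below assumes a hypothetical Delta table sales_db.transactions; the time-travel ("version as of") clause follows Delta Lake SQL syntax and, as noted above, is valid only in the Query field, not in the Inspect Query field.

```sql
-- Query field (Delta time travel is allowed here):
SELECT * FROM sales_db.transactions VERSION AS OF 12;

-- Inspect Query field (keep it simple and JDBC compatible, no "as of" clause):
SELECT * FROM sales_db.transactions;
```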
Incremental Read
Field | Description |
---|---|
Enable Incremental Read | Select a radio button to choose the incremental read mode. The available options are: - None - Normal - Version |
Upon selecting the None option, incremental read support is disabled. | |
Upon selecting Normal, provide inputs for the below fields: | |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal types of values. |
Start Value | Specify a value for the reference column; only records whose reference column value is greater than this value will be read. |
Read Control Type | Provides three options to control the data to be fetched: None, Limit By Count, and Limit By Value. None: all records whose reference column value is greater than the offset are read. Limit By Count: only the specified number of records whose reference column value is greater than the offset are read; provide the count in the No. of Records field (if an integer column is selected in the Column to Check field, provide an integer value greater than 0). Limit By Value: all records whose reference column value is greater than the offset and less than or equal to the value set in the Column Value field are read; the Column Value field appears only when Read Control Type is set to Limit By Value. For None and Limit By Count it is recommended that the table data be sequential and sorted in increasing order. |
Upon selecting Version, provide the inputs for the below fields: | |
Table Name | Option to provide delta table name for querying the records. |
Version | Option to provide version for querying the records. The records will be read subsequent to the selected version. |
ADD CONFIGURATION | To add additional custom properties in key-value pairs. |
Environment Params | User can add further environment parameters. (Optional) |
A Dummy channel is required when a pipeline has a processor and an emitter but does not need a real channel.
For example, you may only want to check the business logic or the processing of data and do not need a data source to generate data for the pipeline. Since StreamAnalytix requires an emitter in every pipeline, you can use a Dummy Channel to test the processors without generating data through an actual channel.
StreamAnalytix provides batch and streaming GCS (Google Cloud Storage) channels. The configuration for GCS data source is specified below:
Field | Description |
---|---|
Connection Name | Select the GCP connection name for establishing the connection. |
Bucket Name | Provide the Google Cloud Storage bucket name. |
Path | Provide the end path; use * in case of a directory. For example: outdir.* |
Note:
- The user can add configuration by clicking the ADD CONFIGURATION button.
- Next, in the Detect Schema window, the user can save the schema as a dataset by clicking the Save As Dataset checkbox.
- The Incremental Read option is available in the GCS batch data source, not in the GCS streaming channel.
Configure Pre-Action in Source
Google Spreadsheet data source allows you to read data from worksheets of the desired spreadsheets.
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for Google Spreadsheet earlier. Or create one as explained in the topic - Google Spreadsheet Connection. |
Spreadsheet Location | Option to browse and select a spreadsheet or a directory containing spreadsheet(s). |
Path | Provide the end path; use * in case of a directory. For example: outdir.* |
ADD CONFIGURATION | Click this button to add additional properties using this option as key-value pairs. |
ADD PARAMS | Click this button to add environment parameters. |
This component is supported in Gathr on-premise. A Local File data source allows you to read data from the local file system, that is, the file system where Gathr is deployed.
Note: The file reader is for batch processing.
Batch data can be fetched from the source, or you can upload files.
The Schema tab allows you to create a schema and its fields. On the Detect Schema tab, select a Data Source or upload data.
Field | Description |
---|---|
File Path | The local file path from where the data will be read. |
Click NEXT to detect schema from the File Path file. Click Done to save the configuration.
Configure Pre-Action in Source
To add an HDFS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
The Schema Type tab allows you to create a schema and its fields. On the Detect Schema tab, select a Data Source or upload data.
For an HDFS Data Source, if data is fetched from the source and the type of data is CSV, the schema has an added tab, Is Header Included in source.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
HDFS file Path | HDFS path from where data is read. For Parquet, provide the .parquet file path. |
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
Configure Pre-Action in Source
To use a Hive Data Source, select the connection and specify a warehouse directory path.
Note: This is a batch component.
To add a Hive Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Configuring Hive Data Source
Field | Description |
---|---|
Message Type | Single: If only one type of message will arrive on the Data Source. Multi: If more than one type of message will arrive on the Data Source. |
Message Name | Select the message you want to apply the configuration on. |
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Query | Write a custom query for Hive. |
Refresh Table Metadata | Spark Hive caches the Parquet table metadata and partition information to improve performance. This option lets you refresh the table cache so that the latest information is available during inspect. It helps most when there are multiple update and fetch events in an inspect session. The Refresh Table option also repairs and syncs partition values into the Hive metastore, so the latest values are processed while fetching data during inspect or run. |
Table Names | User can specify single or multiple table names to be refreshed. |
After the query runs, Describe Table and the corresponding Table Metadata, Partition Information, and Serialize/Deserialize information are populated.
Make sure that the query you run matches the schema created through Upload Data or Fetch From Source. An illustrative query follows.
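For example, a Hive query for a hypothetical partitioned table could look like the sketch below. The database, table, and column names are assumptions; the REFRESH TABLE statement is included only to illustrate the Spark SQL behaviour that the Refresh Table Metadata option triggers for the tables listed in Table Names.

```sql
-- Illustrative only; names are assumed.
REFRESH TABLE sales_db.transactions;

SELECT txn_id, customer_id, amount, txn_date
FROM sales_db.transactions
WHERE txn_date >= '2024-01-01';
```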
Click Done to save the configuration.
Configure Pre-Action in Source
Through the HTTP Data Source, you can consume different URIs and use that data as datasets in pipeline creation.
For example, a URL may return a list of employees in JSON, TEXT or CSV format. You can consume this URL through the HTTP channel and use it as a dataset for performing operations.
Configuring HTTP Data Source
To add an HTTP Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Field | Description |
---|---|
URI | HTTP or HTTPS URL to fetch data. |
Request Method | HTTP request method- GET, POST. |
Request Body | Request body for method. |
Header | Header’s parameter name and value. |
Accept Type | Content type that URI accepts. For example: JSON, CSV and Text. |
Auth Type | Used to specify the authorization type associated with the URL. Includes None, Basic, Token and Oauth2 as authorization types. All the types are explained below in detail. |
Path to message | JSON path expression to be evaluated to get the input data as per message. Works only for Fetch From Source with JSON data. |
1. None
This option specifies that the URL can be accessed without any authentication.
2. Basic
This option specifies that accessing the URL requires Basic Authorization. Provide the user name and password for accessing the URL.
User name | Enter the user name for accessing the URL. |
Password | Enter the password for accessing the URL. |
3. Token Based
Token-based authentication is a security technique that authenticates users who attempt to log in to a server, a network, or another secure system using a security token provided by the server.
Token ID | Key with which token is referred in request. |
Token | Token which is provided in order to access the URL. |
4. OAuth2
OAuth2 is an authentication technique in which the application gets a token that authorizes access to the user's account.
AuthURL | The endpoint for the authorization server, which retrieves the authorization code. |
ClientId | The client ID identifier, given to the client during the application registration process. |
Secret Key | The secret key provided to the client during application registration process. |
Header | Used to specify the headers associated with AuthUrl, through which authorization code is generated. |
Configure Pre-Action in Source
Through the HTTPV2 Data Source, you can consume different URIs and use that data as datasets when creating ingestion applications.
For example: Consider that there is a URL which returns the list of employees in JSON, TEXT, XML or CSV format. You can consume this URL through HTTPV2 Channel and use it as a dataset for performing operations.
Field | Description |
---|---|
HTTP URL | HTTP or HTTPS URI to send request to a resource. |
Method Type | HTTP request method for the URI to be selected out of GET or POST. |
Request Body | Request body to be provided for the POST Request Method selected above. |
Design Time Attributes | This value can be different from what is given in the request body field and will be utilized only during application design time. |
Header | Header’s parameter name and value. |
Auth Type | Used to specify the authorization type associated with the URL. Includes None, Basic, Token and OAuth2 as authorization types. All the types are explained below in detail. |
None | This option specifies that the URL can be accessed without any authentication. |
Basic | This option specifies that accessing the URL requires Basic Authorization. Provide the user name and password for accessing the URL. |
Token Based | Token-based authentication is a security technique that authenticates users who attempt to log in to a server, a network, or another secure system using a security token provided by the server. |
OAuth2 | An authentication technique in which the application gets a token that authorizes access to the user's account. |
If Auth Type is selected as Basic, proceed by providing inputs for below parameters: | |
Username | Enter the user name for accessing the URI. |
Password | Enter the password for accessing the URL. |
If Auth Type is selected as Token, proceed by providing inputs for below parameters: | |
Token ID | Key with which token is referred in request. |
Token | Token which is provided in order to access the URL. |
If Auth Type is selected as OAuth2, proceed by providing inputs for below parameters: | |
Header | Used to specify the headers associated with Auth Url, through which authorization code is generated. |
Client ID | Client ID identifier, given to client during application registration process. |
Secret Key | The secret key provided to the client during application registration process. |
Auth URL | The endpoint for authorization server, which retrieves authorization code. |
OAuth2 (Client Credential) | The grant_type is client credential in case of OAuth2 (Client Credential). You can also pass parameters to authentication URL using this authentication method. |
Auth Headers | Header’s parameter name and value can be provided. |
Auth Params | Auth parameter name and value can be provided. |
ClientId | The client identifier that is given to the client during the application registration process should be provided. |
Secret Key | The secret key that is given to the client during the application registration process should be provided. |
Auth Url | The endpoint for authorization server, that retrieves the authorization code. |
Use Token | Use generated token in the URL parameter or header of the request. |
Custom Token | When user wants to first get a token from a custom API, that can be used in main data API to fetch the data as an authentication key. Make sure that the response content type of the custom token request is JSON. |
HTTP URL | HTTP or HTTPS URI to send request to a resource. |
Method Type | HTTP request method for the URI to be selected out of GET or POST. |
Request Body | Request body to be provided for the Request Method selected above. |
Header | Header’s parameter name and value. |
Path to Token | JSON path expression that points to tokens. |
Auth Type | Used to specify the authorization type associated with the URL. Includes None, Basic and Token. |
None | This option specifies that the URL can be accessed without any authentication. |
Basic | This option specifies that accessing the URL requires Basic Authorization. Provide the user name and password for accessing the URL. |
Token | Token-based authentication is a security technique that authenticates users who attempt to log in to a server, a network, or another secure system using a security token provided by the server. |
Enable SSL | Select True if SSL is enabled on HTTP; it is set to False by default. If set to True, a keystore/certificate file needs to be uploaded, and the Keystore Password/Certificate Alias should be provided in the respective configuration fields. Another field, Path to Data, appears only when, on the Schema Type page with the Fetch From Source option, JSON is selected as the type of data. |
Path to Data | Path to data is a JSON path expression that points to arrays. |
HTTPV2 Pagination
Field | Description |
---|---|
Enable Pagination | Enable this option to make paginated calls to fetch the desired data from the endpoint URI. |
Paginate Using | Choose the request component that will change for each pagination Get/Post call. |
URL | When the content of the URL will get updated. |
Request Body | When the request body content will get updated. This option is only available for Post request methods. |
Header | When the header content will get updated. |
Persist across run | Enable the checkbox to retain the last offset value as the initial value for the subsequent run. |
Treat as Expression | During configuration this option can be enabled at the applicable fields to consider the entered value as an expression. Else, the value will be considered as a constant string. |
Paginate using URL | |
Option | Choose the relevant method to provide the variable for URL-based pagination. |
Query Parameter | When the query parameters are changing in URL. |
URL Offset | When the URL will get generated from the previous response. |
Custom Expression | To provide custom logic to create URL. |
Query Parameter | |
Parameter Name | The query parameter name should be provided that will be used in the URL. Example: page -> This will make the URL as http://www..com?page=<query_parameter_value> |
Initial Value | The initial value of the query parameter should be provided. Example: 1 -> This will make the URL as http://www..com?page=1 |
Next Value | The next value expression of the query parameter should be provided. Example: $page+1 -> This will make the URL for next call as http://www..com?page=2 |
URL Offset | |
Initial Page URL | Initial URL expression should be provided. Example: “http://www..com” |
Next Page URL | The next page URL expression should be provided. The previous response will have a URL value, that will be used as the next URL. Example: When response is Json, and URL is present at path “data.url”, then the next page URL expression is from_json_object($last_response_last_record,’$.data.url') |
Custom Expression | |
URI | Custom URI expression should be provided with help of offset variables. Example: concat($url,‘page=’,offset) where offset is the variable created in offset name section. |
Offset Name | The offset name should be provided that can be used in the Custom Expression and Pagination End Condition expression. Example: offset -> This can be used in the URI expression as concat($url,‘page=’,offset) |
Initial Offset Value | The initial offset value should be provided. Example: 1 -> This will make the initial value for the URL expression as concat($url,‘page=’,<1>) |
Next Offset Value | The next offset value expression should be provided. Example: $offset+1 -> This will make the next value for the URL expression as concat($url,‘page=’,<2>) |
Paginate using Request Body | |
Option | Choose the relevant method to provide the variable for body-based pagination. |
JSON Body | When the body is in JSON format. |
Custom Expression | To provide custom logic to create a body for post-call requests. |
JSON Body | |
Key | The path to the JSON object that will update for each pagination request should be provided. Example 1: $body.path_to_variable_component Example 2: $body.a.b |
Initial Value | The initial value of the key in the JSON body should be provided. Example: 10 -> This value 10 will be replaced for the first call at key path: $body.path_to_variable_component |
Next Value | The next value expression of the key in the JSON body should be provided. Example: $body.path_to_variable_component+10 |
Custom Expression | |
Body Expression | Provide custom expression to create body for each pagination request. Example: concat(’{“from”:’,$offset,’,“size”:100,“query”:{“match_all”:{}}}’) |
Offset Name | The offset name should be provided that can be used in the Custom Expression and Pagination End Condition expression. Example: offset -> This can be used in the body expression as concat(’{“from”:’,$offset,’,“size”:100,“query”:{“match_all”:{}}}’) |
Initial Offset Value | The initial offset value should be provided. Example: 10 -> This can be used in the body expression as concat(’{“from”:’,10,’,“size”:100,“query”:{“match_all”:{}}}’) |
Next Offset Value | The next offset value expression should be provided. Example: $offset+1 -> This will be used as the offset in the body expression for the next pagination request. |
Paginate using Header | |
Header Expression | All the headers with their keys and values (custom expression/constant) should be provided. |
Key | Header key should be provided. |
Value | An expression value or a constant value should be provided for each header key. Example: $offset -> Offset value will be derived from the Offset variable fields. |
Offset Name | The offset name should be provided that can be used in the Header value expression and Pagination End Condition expression. Example: offset -> This can be used in the header value as $offset |
Initial Offset Value | The initial offset value should be provided. Example: 10 -> This can be used in the header value as 10 for a key. |
Next Offset Value | The next offset value expression should be provided. Example: $offset+10 -> This can be used in the header value as increments of 10 for each pagination request. |
Pagination End Condition | An expression to stop the pagination should be provided. Example 1: $offset>12 Example 2: $last_response_record_count=0 Example 3: get_json_object($last_response_last_record,'$.data.id')>3 Example 4: get_json_object($last_response,'$.metadata.url') is null. An expression sketch follows this table. |
Enable Request Rate | Control the rate at which pagination requests are made. |
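The pagination fields above rely on Spark-SQL-style expression functions such as concat and get_json_object. The snippet below is only a sketch of how those functions behave; it assumes a Spark SQL session, and the JSON literals are made up, not real API responses.

```sql
-- Illustrative only; the literals stand in for $offset and $last_response values.
SELECT
  concat('{"from":', CAST(10 AS STRING), ',"size":100,"query":{"match_all":{}}}')
    AS next_request_body,
  get_json_object('{"data":{"url":"http://example.com?page=2"}}', '$.data.url')
    AS next_page_url;
```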
HTTPV2 Incremental Configuration
Field | Description |
---|---|
Enable Incremental Read | Enable this option to read data incrementally. |
Increment Using | Choose the request component that will change for each incremental run. |
URL | When the content of the URL will get updated. |
Request Body | When the request body content will get updated. |
Header | When the header content will get updated. |
Each incremental option is further explained in detail. | |
Common option | |
Treat as Expression | During configuration this option can be enabled at the applicable fields to consider the entered value as an expression. Else, the value will be considered as a constant string. |
Increment using URL | |
Option | Choose the relevant method to provide the variable for URL-based incremental read. |
Query Parameter | When the query parameters are changing in URL. |
URL Offset | When the URL will get generated from the previous response. |
Custom Expression | To provide custom logic to create URL. |
Query Parameter | |
Parameter Name | The query parameter name should be provided that will be used in the URL. Example: page -> This will make the URL as http://www..com?page=<query_parameter_value> |
Initial Value | The initial value of the query parameter should be provided. Example: 1 -> This will make the URL as http://www..com?page=1 |
Next Value | The next value expression of the query parameter should be provided. Example: $page+1 -> This will make the URL for the next call as http://www..com?page=2 |
URL Offset | |
Initial Page URL | Initial URL expression should be provided. Example: "http://www..com" |
Next Page URL | The next page URL expression should be provided. The previous response will have a URL value that will be used as the next URL. Example: when the response is JSON and the URL is present at path "data.url", the next page URL expression is from_json_object($last_response_last_record,'$.data.url') |
Custom Expression | |
URI | Custom URI expression should be provided with help of offset variables. Example: concat($url,'page=',offset) where offset is the variable created in the offset name section. |
Offset Name | The offset name should be provided that can be used in the Custom Expression. Example: offset -> This can be used in the URI expression as concat($url,'page=',offset) |
Initial Offset Value | The initial offset value should be provided. Example: 1 -> This will make the initial value for the URL expression as concat($url,'page=',<1>) |
Next Offset Value | The next offset value expression should be provided. Example: $offset+1 -> This will make the next value for the URL expression as concat($url,'page=',<2>) |
Increment using Request Body | |
Option | Choose the relevant method to provide the variable for body-based incremental read. |
JSON Body | When the body is in JSON format. |
Custom Expression | To provide custom logic to create a body for post-call requests. |
JSON Body | |
Key | The path to the JSON object that will update for each incremental run should be provided. Example 1: $body.path_to_variable_component Example 2: $body.a.b |
Initial Value | The initial value of the key in the JSON body should be provided. Example: 10 -> This value 10 will be replaced for the first call at key path: $body.path_to_variable_component |
Next Value | The next value expression of the key in the JSON body should be provided. Example: $body.path_to_variable_component+10 |
Custom Expression | |
Body Expression | Provide custom expression to create body for each incremental run request. Example: concat('{"from":',$offset,',"size":100,"query":{"match_all":{}}}') |
Offset Name | The offset name should be provided that can be used in the Custom Expression. Example: offset -> This can be used in the body expression as concat('{"from":',$offset,',"size":100,"query":{"match_all":{}}}') |
Initial Offset Value | The initial offset value should be provided. Example: 10 -> This can be used in the body expression as concat('{"from":',10,',"size":100,"query":{"match_all":{}}}') |
Next Offset Value | The next offset value expression should be provided. Example: $offset+1 -> This will be used as the offset in the body expression for the next incremental run. |
Increment using Header | |
Header Expression | All the headers with their keys and values (custom expression/constant) should be provided. |
Key | Header key should be provided. |
Value | An expression value or a constant value should be provided for each header key. Example: $offset -> Offset value will be derived from the Offset variable fields. |
Offset Name | The offset name should be provided that can be used in the Header value expression. Example: offset -> This can be used in the header value as $offset |
Initial Offset Value | The initial offset value should be provided. Example: 10 -> This can be used in the header value as 10 for a key. |
Next Offset Value | The next offset value expression should be provided. Example: $offset+10 -> This can be used in the header value as increments of 10 for each incremental read request. |
This component is supported in Gathr on-premise. Under the Schema Type tab, select Fetch From Source or Upload Data File.
When fetch from source is chosen, the schema tab comes after the configuration, and when you upload data, the schema tab comes before configuration.
Configuring Impala Data Source
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Query | Write a custom query for Impala. |
Fetch Size | The fetch size determines the number of rows to be fetched per round trip. |
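For instance, a custom Impala query could look like the sketch below (the table and column names are assumptions); the Fetch Size then controls how many of the resulting rows are pulled per round trip.

```sql
-- Illustrative only.
SELECT log_id, status_code, request_time
FROM web_logs
WHERE status_code >= 500;
```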
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
The JDBC Custom channel supports ClickHouse, ES, and other databases that have a JDBC connection. The JDBC channel supports Oracle, Postgres, MySQL, MSSQL, and DB2 connections. You can configure and test the connection with JDBC. This connection allows you to extract data from relational DB sources into the pipeline in batches after configuring the JDBC channel.
Prerequisite: Upload the appropriate driver JAR for the RDBMS used in the JDBC Data Source, using the upload jar option.
For using DB2, create a successful DB2 Connection.
Configuring JDBC Data Source
To add a JDBC into your pipeline, drag the Data Source to the canvas and click on it to configure.
Under the Schema Type tab, select Fetch From Source, Upload Data File, or Use Existing Dataset.
Note: This is a Batch component.
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for JDBC Custom earlier. Use the Test Connection option to ensure that the connection with the JDBC Custom channel is established successfully. A success message states that the connection is available. In case of any error in test connection, edit the connection to resolve the issue before proceeding further. |
Schema Name | Source schema name for which the list of tables will be viewed. This field will only be visible for databases that require a schema to be selected. |
Table Name | Source table name to be selected for which you want to view the metadata. |
Query | Hive compatible SQL query to be executed in the component. In case a connection with the Custom JDBC option is selected, this query will serve as a sub-query for the inspect query specified in the custom JDBC connection configuration. |
Design Time Query | Query used to fetch limited records during application design. Used only during schema detection and inspection. This field should be ignored in case of a custom JDBC connection. Illustrative queries follow this table. |
Enable Query Partitioning | This enables parallel reading of data from the table. It is disabled by default. Tables will be partitioned if this check-box is enabled. If query partitioning is enabled, additional fields will be displayed as described below. |
No. of Partitions | Specifies the number of parallel threads to be invoked to partition the table while reading the data. |
Type-in Partition Column | Select this option if the Partition Column list shown is empty or you do not see the required column in the list. |
Partition on Column | This column will be used to partition the data. This has to be a numeric column, on which Spark will perform partitioning to read data in parallel. |
Data Type | In case you have typed-in the partitioning column, you need to specify the data type of that column here. |
Lower Bound | Value of the lower bound for the partitioning column. This value will be used to decide the partition boundaries. The entire dataset will be distributed into multiple chunks depending on the values. |
Upper Bound | Value of the upper bound for the partitioning column. This value will be used to decide the partition boundaries. The entire dataset will be distributed into multiple chunks depending on the values. |
Fetch Size | The fetch size determines the number of rows to be fetched per round trip. The default value is 1000. |
Add Configuration | Additional properties can be added using the Add Configuration link. |
Schema | Check the populated schema details. |
Advanced Configuration | Optionally, you can enable incremental read. |
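To make the difference between the Query and Design Time Query fields concrete, here is a hedged sketch that assumes a hypothetical employees table: the design-time query limits the rows so that schema detection and inspection stay fast, while the main query reads the full data at runtime.

```sql
-- Query field (full read at runtime):
SELECT emp_id, emp_name, department, updated_at
FROM employees;

-- Design Time Query field (limited read for schema detection and inspect):
SELECT emp_id, emp_name, department, updated_at
FROM employees
LIMIT 100;
```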
Metadata
Select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Table | Select table of which you want to view Metadata. |
Column Name | Name of the column generated from the table. |
Column Type | Type of the column, for example: Text, Int |
Nullable | If the value of the column could be Nullable or not. |
Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.
Incremental Read
Enter the schema and select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Enable Incremental Read | Check this check-box to enable incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal types of values. |
Type-in Details | Check this option if the column list shown is empty or you do not see the required column in the list. |
Start Value | Specify a value for the reference column; only records whose reference column value is greater than this value will be read. |
Override Query | Select this option to override the Incremental Read query. This allows you to provide an optimal query. |
Incremental Read Query | This section shows the query used to fetch the incremental data. If Override Query is selected, you can change this query and provide an optimal query. The <Offset> keyword is a placeholder which will be replaced with an actual value at runtime. An illustrative query follows this table. |
Read Control Type Note: Not available in case of Custom JDBC source. | Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value. None: all records whose reference column value is greater than the offset are read. Limit By Count: only the specified number of records whose reference column value is greater than the offset are read. Maximum Value: all records whose reference column value is greater than the offset and less than the value set in the Column Value field are read. For None and Limit By Count it is recommended that the table data be sequential and sorted in increasing order. |
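A minimal sketch of an overridden incremental read query, assuming the same hypothetical employees table with updated_at chosen as the Column to Check; the <Offset> placeholder is the one described above and is replaced by Gathr with the last read value at runtime.

```sql
-- <Offset> is a placeholder substituted at runtime with the stored offset value.
SELECT emp_id, emp_name, department, updated_at
FROM employees
WHERE updated_at > '<Offset>'
ORDER BY updated_at;
```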
Click Done to save the configuration.
Configure Pre-Action in Source
JIRA data source allows you to retrieve information from issues based on a SQL query or from projects based on the specified configurations.
Configure the JIRA data source by providing the details for each field as per the table below:
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for JIRA earlier. |
Search Type | The options available in Search Type are: Query: It can be selected to read the desired data from JIRA issues by providing SQL query. Parameters: It can be selected to read data from specific projects, issue types and fields. |
If the preferred option is Query, an additional field for providing query will be displayed. | |
Query | Query to be executed in JIRA. Example: select * from jiraissue, project where project.id = jiraissue.project and project.pkey = 'KEY' |
If the preferred option is Parameter, then proceed by updating the following fields. | |
Project Name | Project name(s) should be selected to read data from the available user projects. Select IN to include or NOT IN to exclude the selected project names. |
Issue Type | Issue Type(s) should be selected to read data from the available Issue Types. Select IN to include or NOT IN to exclude the selected Issue Types. |
Expand String | The expand parameter for the Work Item attributes. At present, only fields are supported. |
Fields | Option to read data from the selected fields. The fields displayed in this parameter are specific to the projects selected above. Note: At least one field selected here should be a timestamp to define the incremental or range-based configuration in the subsequent step. |
Advanced Configuration
In JIRA data source, the option for incremental read can be enabled or disabled. Based on the selection, configure each field as per the table below:
Field | Description |
---|---|
Incremental Read | The options available in Incremental Read are: Enable: It can be selected to read the desired data incrementally from the specified column as per the input start date. Disable: It can be selected to read the data within a specific range by providing the start date and an end date. |
Column | The timestamp field should be selected on which the condition to fetch data will be applied as per the incremental read type. |
Start Date | Records with timestamp value greater than or equal to the specified start date will be fetched at the runtime. |
End Date | Records with timestamp value less than or equal to the specified end date will be fetched at the runtime. Note: This option is only available with incremental read disabled. |
Under the Schema Type tab, select Fetch From Source or Upload Data File.
When fetch from source is chosen, the schema tab comes after the configuration, and when you upload data, the schema tab comes before configuration.
Configuring Kafka Data Source
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Topic Type | Select one of the below options to fetch the records from Kafka topic(s): • Topic Name: used to subscribe to a single topic. • Topic List: used to subscribe to a comma-separated list of topics. • Pattern: used to subscribe to topics matching a Java regex. • With Partitions: used to specify particular topic partitions to consume, i.e. a JSON string such as {"topicA":[0,1],"topicB":[2,4]}. The schema must be the same in case of Topic List/Pattern/With Partitions. |
Topic Name | Topic in Kafka from where messages will be read. |
Topic List/ Pattern/ With partitions | A topic is a category or feed name to which messages will be published. |
Partitions | Number of partitions. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log. |
Replication Factor | Number of replications. Replication provides stronger durability and higher availability. For example, a topic with replication factor N can tolerate up to N-1 server failures without losing any messages committed to the log. |
Define Offset | The following configurations are used for the Kafka offset. • Latest: the starting point of the query is the latest offset. • Earliest: the starting point of the query is the earliest (first) offset. • Custom: a JSON string specifying a starting and ending offset for each partition. • startingOffsets: a JSON string specifying a starting offset for each partition, i.e. {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} • endingOffsets: a JSON string specifying an ending offset for each partition. This is an optional property with default value "latest", i.e. {"topicA":{"0":23,"1":-1},"topicB":{"0":-1}} |
Connection Retries | The number of retries for component connection. Possible values are -1, 0 or any positive number. If the value is -1, connection retries are infinite. |
Max Offset Per Trigger | Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topic partitions of different volume. |
Fail on Data Loss | Provides the option to fail the query in case of data loss (for example, topics are deleted or offsets are out of range). This may be a false alarm; you can disable it when it does not work as expected. Batch queries will always fail if any data cannot be read from the provided offsets due to data loss. |
Delay Between Connection Retries | Retry delay interval for component connection (in milliseconds). |
ADD CONFIGURATION | To add additional custom Kafka properties in key-value pairs. |
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
Kinesis Data Source allows you to fetch data from Amazon Kinesis stream.
Configuring Kinesis Data Source
Under the Schema Type tab, select Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. Provide AWS KeyID and Secret Access Key. |
Stream Name | Name of the Kinesis stream. |
EndPoint | End point is a URL that is the entry point of Kinesis services. |
Region | Name of the region. For example, us-west-2 |
Initial Position | In the absence of a Kinesis checkpoint, this is the worker's initial starting position in the stream. • The values are either the beginning of the stream (per the Kinesis limit of 24 hours) or the tip of the stream. • TRIM_HORIZON: to read data from the beginning of the stream, use TRIM_HORIZON. • LATEST: to read the latest (most recent) records, use LATEST. |
Connection Retries | The number of retries for component connection. Possible values are -1, 0 or any positive number. If the value is -1, connection retries are infinite. |
Delay between Connection Retries | Retry delay interval for component connection. (In milliseconds.) |
Maximum Delay Between Connection Retries | Maximum cool-off period between two retries. |
ADD CONFIGURATION | User can add further configurations by providing values after clicking the ADD CONFIGURATION option. |
Environment Params | User can add further environment parameters. (Optional) |
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
Apache Kudu is a column-oriented data store of the Apache Hadoop ecosystem. It enables fast analytics on fast (rapidly changing) data. The channel is engineered to take advantage of hardware and in-memory processing, and it lowers query latency significantly compared to similar tools.
Configuring KUDU Data Source
To add a KUDU Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Table Name | Name of the table. |
Add Configuration | To add additional properties in key-value pairs. |
Metadata
Enter the schema and select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Table | Select table of which you want to view Metadata. |
Column Name | Name of the column generated from the table. |
Column Type | Type of the column, for example: Text, Int |
Nullable | If the value of the column could be Nullable or not. |
Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.
Incremental Read
Enter the schema and select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Enable Incremental Read | Check this check-box to enable incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal types of values. |
Start Value | Specify a value for the reference column; only records whose reference column value is greater than this value will be read. |
Read Control Type | Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value. None: all records whose reference column value is greater than the offset are read. Limit By Count: only the specified number of records whose reference column value is greater than the offset are read. Maximum Value: all records whose reference column value is greater than the offset and less than the value set in the Column Value field are read. For None and Limit By Count it is recommended that the table data be sequential and sorted in increasing order. |
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
Configure Pre-Action in Source
Mqtt Data Source reads data from Mqtt queue or topic.
Configuring Mqtt Data Source
To add a MQTT Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
QueueName/TopicName | Queue/topic name from which messages will be read. |
Add Configuration | To add additional properties in key-value pairs. |
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
This Data Source enables you to read data from HDFS. This is a streaming component.
For a NativeDFS Receiver Data Source, if data is fetched from the source, and the type of data is CSV, the schema has an added tab that reads, Is Header Included in source.
This is to signify if the data that is fetched from source has a header or not.
If Upload Data File is chosen, then there is an added tab, which is Is Header included in Source.
All the properties are same as HDFS.
This component is supported in Gathr on-premise. A Native File Reader Data Source allows you to read data from local file system. Local file System is the File System where Gathr is deployed.
Note: Native File reader is for Streaming processing.
Streaming data can be fetched from source or you can upload the files.
Detect Schema tab allows you to create a schema and the fields. On the Detect Schema tab, select a Data Source or Upload Data
Field | Description |
---|---|
File Path | The local file path from where the data will be read. |
Click NEXT to detect schema from the File Path file. Click Done to save the configuration.
OpenJMS is a messaging standard that allows application components to create, send, receive and read messages.
OpenJMS Data Source reads data from OpenJMS queue or topic.
The JMS (Java Message Service) supports two messaging models:
1. Point to Point: The Point-to-Point message producers are called senders and the consumers are called receivers. They exchange messages by means of a destination called a queue: senders produce messages to a queue; receivers consume messages from a queue. What distinguishes the point-to-point messaging is that only one consumer can consume a message.
2. Publish and Subscribe: Message producers (publishers) send messages to a topic, and every consumer (subscriber) registered to that topic receives a copy of each message. Unlike point-to-point messaging, a single message can be consumed by multiple subscribers.
Configuring OpenJMS Data Source
To add an OpenJMS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Queue Name | Name of the OpenJMS queue or topic from which messages will be read. |
Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.
The configuration for Pubsub data source is mentioned below:
Field | Description |
---|---|
Connection Name | Select the connection name for creating connection. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. |
Service Account Key Files | Upload the Service Account File. User has an option to test the connection. |
Topic Name | Mention the topic name where the data will be published. |
Auto-create Subscription Name | Select the checkbox to auto-create a subscription name; otherwise, enter a subscription name. |
Max Size | Enter value for number of messages to be received in one request. |
Note:
- The user can add further configuration by clicking the ADD CONFIGURATION button.
- The 'Fetch From Source' option works only if the topic and subscription name exist and some data has already been published to them.
- Multiple emitters to a Pub/Sub source are supported only when auto-create subscription name is enabled.
RabbitMQ Data Source reads messages from the RabbitMQ cluster using its exchanges and queues.
Configuring RabbitMQ Data Source
To add a RabbitMQ into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Exchange Name | RMQ producers can only send messages to an exchange. The exchange pushes messages to queues from where consumers can read the message. |
Exchange Type | Defines the functionality of the exchange i.e., how messages are routed through it. Exchange types: DIRECT, TOPIC and FANOUT. |
Exchange Durable | Durable exchanges remain active when a server restarts. Non-durable exchanges (transient exchanges) are purged when a server restarts. |
Exchange Auto Delete | If you set this to True, the exchange is deleted when all queues finish using it. |
Routing Key | Identifies a key for redirecting messages to a specific topic. |
Queue Name | Name of the queue where data will be published. |
Queue Durable | Durable queues remain active when a server restarts. Non- durable queues (transient queues) are purged if/when a server restarts. |
Queue Auto Delete | If set, the queue is deleted when all the consumers finish using it. |
Add Configuration | To add additional custom RabbitMQ properties in key-value pair. |
Click on the add notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
RDS (Relational Database Service) is a managed relational database service on the cloud. The RDS channel can read in batch from RDS databases (PostgreSQL, MySQL, Oracle, MSSQL), with or without SSL. The properties of RDS are similar to those of a JDBC connector, with one addition: SSL security.
SSL security can be enabled on RDS databases.
The system is able to connect to, read from, and write to an SSL-secured RDS instance.
If security is enabled, it is configured in the Connection and automatically propagated to the channel.
Please note: SSL support is not available for Oracle.
Configuring RDS Data Source
To add a RDS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File:
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections for data ingestion. |
Query | Hive compatible SQL query to be executed in the component. |
Enable Query Partitioning | Tables will be partitioned and loaded as RDDs if this check-box is enabled. This enables parallel reading of data from the table. |
Number of Partitions | Specifies the number of parallel threads to be invoked to read from JDBC in Spark. |
Partition on Column | Partitioning column can be any column of type Integer, on which spark will perform partitioning to read data in parallel. |
Lower Bound/Upper Bound | Value of the lower bound for partitioning column/Value of the upper bound for partitioning column. |
Metadata
Enter the schema and select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Schema | Schema name for which the list of tables will be viewed. |
Table | Select table of which you want to view Metadata. |
Once the Metadata is selected, Click Next and go to the Incremental Read tab.
Incremental Read
Enter the schema and select table. You can view the Metadata of the tables.
Field | Description |
---|---|
Enable Incremental Read | Check this checkbox to enable incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal types of values. |
Start Value | Specify a value for the reference column; only records whose reference column value is greater than this value will be read. |
Read Control Type | Provides three options to control the data to be fetched: None, Limit By Count, and Maximum Value. None: all records whose reference column value is greater than the offset are read. Limit By Count: only the specified number of records whose reference column value is greater than the offset are read. Maximum Value: all records whose reference column value is greater than the offset and less than the value set in the Column Value field are read. For None and Limit By Count it is recommended that the table data be sequential and sorted in increasing order. |
Amazon Redshift is a fully managed, petabyte-scale data warehouse service in the cloud.
Note: This is a batch component.
Configuring RedShift Data Source
To add a Redshift Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File
Field | Description |
---|---|
Message Type | Single: If only one type of message will arrive on the Data Source. Multi: If more than one type of message will arrive on the Data Source. |
Message Name | Select the message you want to apply configuration on. |
Field | Description |
---|---|
Connection Name | Connections are the service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Query | Write a valid query for RedShift. |
Enable Query Partitioning | Enable this option to read data from RedShift in parallel for the query. |
No. of Partitions | Specifies the number of parallel threads to be invoked to read from RedShift in Spark. |
Partition on Columns | The partitioning column must be of integer type; Spark performs partitioning on it to read data in parallel. An illustrative example follows this table. |
Lower Bound | Value of the lower bound for partitioning column |
Upper Bound | Value of the upper bound for partitioning column |
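As a sketch of a partitioned RedShift read (the schema, table, column names, and bounds below are assumptions), the query reads the filtered rows while the partitioning settings tell Spark how to split the read into parallel ranges over the chosen integer column.

```sql
-- Query field:
SELECT order_id, customer_id, amount, order_status
FROM public.orders
WHERE order_status = 'SHIPPED';
-- Partition on Columns: order_id (integer)
-- Lower Bound: 1, Upper Bound: 1000000, No. of Partitions: 8
```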
Click Done to save the configuration.
Configure Pre-Action in Source
S3 data source reads objects from Amazon S3 bucket. Amazon S3 stores data as objects within resources called Buckets.
For an S3 data source, if data is fetched from the source, and the type of data is CSV, the schema has an added tab, Header Included in source.
This is to signify if the data that is fetched from source has a header or not.
If Upload Data File is chosen, then there is an added tab, Is Header Included in the source. This signifies whether the uploaded data includes a header or not.
Configuring S3 Data Source
To add the S3 data source into your pipeline, drag the source to the canvas and click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
S3 Protocol | Protocols available are S3, S3n, S3a |
End Point | S3 endpoint details should be provided if the source is Dell EMC S3. |
Bucket Name | Buckets are storage units used to store objects, which consists of data and meta-data that describes the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. Provide AWS KeyID and Secret Access Key. |
Path | File or directory path from where data is to be read. The path to be added must be a directory and not absolute. |
Add Configuration | To add additional custom S3 properties as key-value pairs. The user can add further configurations in the following ways: - Use key avroSchema to provide an Avro schema and paste its content (as the value) in JSON format to map the schema. - Use key avroSchemaFilePath and provide the S3 absolute path of the AVSC schema file as the value. To load the schema file from S3, the IAM role attached to the instance profile will be used. |
Click on the add notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
S3 Batch Channel can read data from S3 Buckets in incremental manner. Amazon S3 stores data as objects within resources called Buckets.
With an S3 Batch channel you can read data from the specified S3 bucket in formats like JSON, CSV, Text, Parquet, and ORC. Only the files modified after the specified time are read.
For an S3 Data Source, if data is fetched from the source and the type of data is CSV, the schema has an added tab, Is Header Included in source.
This is to signify if the data that is fetched from source has a header or not.
If Upload Data File is chosen, then there is an added tab, Is Header Included in Source. This signifies whether the uploaded data includes a header or not.
Configuring S3 Batch Data Source
To add a S3 Batch Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch From Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Unchecked by default, check mark the checkbox to override credentials for user specific actions. Provide AWS KeyID and Secret Access Key. |
S3 Protocol | Available protocols of S3 are S3, S3n, S3a. S3a protocol is supported by Databricks. S3 and S3n are supported by EMR. S3a is supported by Hadoop Version 3.x. S3 and S3n protocol is supported by Hadoop Version below 3.x. |
End Point | S3 endpoint details should be provided if the source is Dell EMC S3. |
Bucket Name | Buckets are storage units used to store objects, which consists of data and meta-data that describes the data. |
Path | File or directory path from where data is to be read. |
Enable Incremental Read | Note: Incremental Read works during Pipeline run. |
Offset | Specifies last modified time of file - all the files whose last modified time is greater that this value will be read. |
Add Configuration | To add additional custom S3 properties as key-value pairs. The user can add further configurations in the following ways: - Use key avroSchema to provide an Avro schema and paste its content (as the value) in JSON format to map the schema. - Use key avroSchemaFilePath and provide the S3 absolute path of the AVSC schema file as the value. To load the schema file from S3, the IAM role attached to the instance profile will be used. |
Click Next for Incremental Read option.
Note: The Incremental Read option is available only for the S3 Batch data source.
Field | Description |
---|---|
Enable Incremental Read | Unchecked by default, check mark this option to enable incremental read support. |
Read By | Option to read data incrementally either by choosing the File Modification Time option or Column Partition option. |
Upon selecting the File Modification Time option provide the below detail: | |
Offset | Records with timestamp value greater than the specified datetime (in UTC) will be fetched. After each pipeline run, the datetime configuration will be set to the most recent timestamp value from the last fetched records. The given value should be in UTC with the ISO date format yyyy-MM-dd'T'HH:mm:ss.SSSZZZ. Ex: 2021-12-24T13:20:54.825+0000 (see the formatting example after this table). |
Upon selecting the Column Partition option provide the below details: | |
Column | Select the column for incremental read. The listed columns can be integer, long, date, timestamp, decimal, etc. Note: The selected column should have sequential, sorted (in increasing order) and unique values. |
Start Value | Mention the value of reference column. Only the records whose value of the reference column is greater than this value will be read. |
Read Control Type | Provides three options to control the data to be fetched - None, Limit By Count, and Limit By Value. None: All the records with value of the reference column greater than the offset will be read. Limit By Count: Only the mentioned number of records with value of the reference column greater than the offset will be read. Limit By Value: All the records with value of the reference column greater than the offset and less than the Column Value field will be read. For None and Limit By Count it is recommended that the table has data in sequential and sorted (increasing) order. |
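Since the offset must be a UTC timestamp in the yyyy-MM-dd'T'HH:mm:ss.SSSZZZ pattern, the small Python sketch below shows one way to produce such a value. It is purely illustrative and not part of the product.

```python
from datetime import datetime, timezone

# Build an offset string in the expected pattern, e.g. 2021-12-24T13:20:54.825+0000.
dt = datetime(2021, 12, 24, 13, 20, 54, 825000, tzinfo=timezone.utc)
offset = (
    dt.strftime("%Y-%m-%dT%H:%M:%S.")
    + f"{dt.microsecond // 1000:03d}"   # milliseconds, zero-padded to 3 digits
    + dt.strftime("%z")                 # UTC offset, e.g. +0000
)
print(offset)  # 2021-12-24T13:20:54.825+0000
```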
Click on the add notes tab. Enter the notes in the space provided.
Click Done to save the configuration.
Configure Pre-Action in Source
Parse Salesforce data from the source or import a file in any supported data format except Parquet. The Salesforce channel allows you to read data from a Salesforce account.
Salesforce is a top-notch CRM application built on the Force.com platform. It can manage all the customer interactions of an organization through different media, like phone calls, site email inquiries, communities, as well as social media. Reading is done through the Salesforce object specified by a Salesforce Object Query Language (SOQL) query.
The first step is to create a Salesforce connection, for which you require the following prerequisites:
• A valid Salesforce account.
• Username of the Salesforce account.
• Password of the Salesforce account.
• Security token of the Salesforce account.
Configuring Salesforce Data Source
To add a Salesforce Data Source into your pipeline, select the Data Source and click on it to configure.
Under the Schema Type tab, select Fetch from Source or Upload Data File.
Now, select a Salesforce connection and write a query to fetch any Salesforce object. Then provide an API version. Below are the configuration details:
Field | Description |
---|---|
Fetch From Source/Upload Data File | Option to provide source schema details by either fetching from source or by uploading data file. |
Connection Name | Connections are the service identifiers. A connection name can be selected from the list if you have created and saved connection details for Salesforce earlier. |
Table Name | Select the source table for which you want to view the metadata. |
More Configurations | |
Query | Salesforce Object Query Language (SOQL) to search your organization’s Salesforce data for specific information. SOQL is similar to the SELECT statement in the widely used Structured Query Language (SQL) but is designed specifically for Salesforce data. It is mandatory for reading objects like Opportunity (an example query is shown after this table). Note: SOQL does not support the ‘*’ identifier. |
Infer Schema | (Optional) Infer schema from the query results. This finds the data type of the fields specified in SOQL. Sample rows are taken to find the data types. This works only if there are 5 or more records. |
Date Format | A string that indicates the java.text.SimpleDateFormat pattern to use when reading timestamps. This applies to the Timestamp type. By default, it is null, which means timestamps are parsed with java.sql.Timestamp.valueOf(). |
Bulk | (Optional) Flag to enable bulk query. This is the preferred method when loading large sets of data. Bulk API is based on REST principles and is optimized for loading large sets of data. You can use it to query many records asynchronously by submitting batches. Salesforce will process batches in the background. Default value is false. |
Salesforce Object | (Conditional) Salesforce Objects are database tables which permit you to store data specific to your organization. This is a mandatory parameter when Bulk is true, and it should be the same object as specified in the SOQL query. |
Pk Chunking | (Optional) Flag to enable automatic primary key chunking for a bulk query job. This splits bulk queries into separate batches of the size defined by the Chunk Size option. By default, it is false. The Pk Chunking feature can automatically make large queries manageable when using the Bulk API. Pk stands for Primary Key (the object’s record ID), which is always indexed. This feature is supported for all custom objects, many standard objects, and their sharing tables. |
Chunk Size | The number of records to include in each batch. Default value is 100,000. This option can only be used when Pk Chunking is true. Maximum size is 250,000. |
Version | Salesforce API version to be selected from the list. |
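As an illustration of the Query field, a SOQL statement of the expected shape is sketched below; the object and field names are common Salesforce examples, not requirements of Gathr.

```python
# Example SOQL for the Query field. SOQL has no '*' wildcard, so columns are listed
# explicitly. Opportunity and its Id, Name, Amount and StageName fields are standard
# Salesforce examples; adjust to the objects in your org.
soql = (
    "SELECT Id, Name, Amount, StageName "
    "FROM Opportunity "
    "WHERE Amount > 10000"
)
print(soql)
```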
Configure Pre-Action in Source
The SFTP channel allows the user to read data from a network file system.
Configuring SFTP Data Source
Field | Description |
---|---|
Connection Name | Select the SFTP connection name. |
File Path | Mention the file path of the SFTP file system. Note: In case of AWS, while providing a directory the user is required to provide the wildcard character ( * ) along with the directory. For example: /home/centos/foldername/* |
Incremental Read | Check mark to read latest file in case of folder. |
Parallelism | Number of parallel threads to launch for downloading files from SFTP. |
Configure Pre-Action in Source
The Snowflake cloud-based data warehouse system can be used as a source/channel in Gathr for configuring ETL pipelines.
Configure the Snowflake source by filling in the below mentioned details:
Field | Description |
---|---|
Connection Name | The user is required to select the Snowflake connection. |
Override Credentials | Unchecked by default, check the checkbox to override credentials for user specific actions. |
Username | Once the Override Credentials option is checked, provide the username to be used for the connection. |
Password | Provide the password for the override credentials. |
Create Warehouse | The user can create a new warehouse by selecting the check-box. The user will be required to upload the file to detect schema. Note: Upon selecting the create warehouse option, the user is required to mention the warehouse configuration details. |
Warehouse Name | The user is required to mention the warehouse name. The user can select a warehouse from the existing warehouse list displayed here. |
Schema Name | Select the schema from the Snowflake schema list of the database of the selected connection. Note: Upon selecting a schema, the metadata of the table will be displayed. |
Query | The user is required to write the Snowflake SQL query. |
Pre-Action Query | A semicolon separated list of SQL commands that are executed before reading data. |
Post Action Query | A semicolon separated list of SQL commands that are executed after reading data. An example of both the pre-action and post-action query fields is sketched after the warehouse configuration table below. |
Field | Description |
---|---|
Warehouse Size | User will be required to select a warehouse size. |
Maximum Cluster Count | The user will be required to specify the maximum number of clusters for the warehouse. |
Scaling Policy | The user is required to mention the scaling policy. |
Auto Suspend | The user is required to mention the number of seconds of inactivity after which the warehouse is automatically suspended. |
Auto Resume | Specifies whether to automatically resume a warehouse when a SQL statement (e.g. query) is submitted to it. |
Comments | If needed, the user can mention specific comments. |
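To illustrate the Pre-Action Query and Post Action Query fields, the sketch below shows hypothetical semicolon-separated Snowflake SQL command lists; the warehouse, schema, and table names are placeholders and not part of Gathr.

```python
# Hypothetical semicolon-separated command lists for the Pre-Action Query and
# Post Action Query fields. Warehouse, schema and table names are placeholders.
pre_action_query = (
    "ALTER WAREHOUSE demo_wh RESUME IF SUSPENDED; "
    "USE SCHEMA analytics.public;"
)
post_action_query = (
    "INSERT INTO analytics.public.read_audit (read_at) SELECT CURRENT_TIMESTAMP(); "
    "ALTER WAREHOUSE demo_wh SUSPEND;"
)
print(pre_action_query)
print(post_action_query)
```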
Configure Pre-Action in Source
Socket Data Source allows you to consume data from a TCP source in a pipeline. Configure the Schema Type and choose a Socket connection to start streaming data into a pipeline. A minimal sketch of the underlying socket read appears at the end of this section.
Configuring Socket Data Source
To add a Socket Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, select Fetch from Source or Upload Data File.
Field | Description |
---|---|
Connection Name | Connections are the Service identifiers. Select the connection name from the available list of connections, from where you would like to read the data. |
Add Configuration | To add additional custom properties in key-value pairs. |
Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.
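Conceptually, a Socket Data Source behaves like the socket source in Spark Structured Streaming. The minimal PySpark sketch below (host and port are placeholders) shows roughly what such a read looks like; it is a sketch of the underlying idea, not Gathr's implementation.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("socket-source-sketch").getOrCreate()

# Read a TCP text stream; each received line becomes a row in the 'value' column.
# Host and port are placeholders for the details held by the Socket connection.
lines = (
    spark.readStream
    .format("socket")
    .option("host", "localhost")
    .option("port", 9999)
    .load()
)

query = lines.writeStream.format("console").start()
query.awaitTermination()
```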
This component is supported in Gathr on-premise. SQS Channel allows you to read data from different SQS queues. The queue types supported are Standard and FIFO. Along with maximum throughput, you can achieve scalability by using buffered requests independently. The queue settings map to standard Amazon SQS attributes, as sketched after the configuration table.
Configuring SQS Data Source
To add a SQS Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema tab, you can select the Max No. of Rows and Sampling Method as Top-N or Random Sample.
Field | Description |
---|---|
Connection Name | Select the SQS connection name to be used for creating the connection. |
Queue Type | Select type of the queue: Standard or FIFO. |
Queue Name | Name of the queue where data will be published. |
Visibility Timeout | The length of the time (in seconds) that a message received from a queue will be invisible to other receiving components. |
Message Retention Period (in seconds) | The amount of time Amazon SQS will retain a message if it does not get deleted. |
Maximum Message Size (in bytes) | Maximum message size (in bytes) accepted by Amazon SQS. |
Receive Message Wait Time (in seconds) | The maximum amount of time that a long polling receive call will wait for a message to become available before returning an empty response. |
Delivery Delay (in seconds) | The amount of time to delay the first delivery of all messages added to this queue. |
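The queue settings above correspond to standard Amazon SQS queue attributes. The hedged boto3 sketch below (queue name, region, and values are examples only) shows how the same settings are expressed directly against SQS.

```python
import boto3

sqs = boto3.client("sqs", region_name="us-east-1")

# The attribute keys below are the standard Amazon SQS counterparts of the fields
# described above. Queue name, region and values are examples only.
response = sqs.create_queue(
    QueueName="example-queue",
    Attributes={
        "VisibilityTimeout": "30",              # seconds a received message stays hidden
        "MessageRetentionPeriod": "345600",     # seconds a message is retained (4 days)
        "MaximumMessageSize": "262144",         # bytes (256 KB)
        "ReceiveMessageWaitTimeSeconds": "20",  # long-polling wait time
        "DelaySeconds": "0",                    # delivery delay for new messages
    },
)
print(response["QueueUrl"])
```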
Stream Cosmos Channel can read data in a streaming manner from CosmosDB (SQL API). It reads updated documents by specifying the change feed checkpoint directory and using its options.
Note: Both BatchCosmos and StreamCosmos work with Local Session.
Configuring a Stream Cosmos Data Source
To add a Stream Cosmos Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File or Fetch From Source.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Database | Cosmos Database. |
Container | Container name in Cosmos. |
Change Feed from Beginning | If set to True, data will be read from beginning. |
CosmosDB Checkpoint Directory | It is the file path where Cosmos stores the checkpoint data for Change feed. |
Add Configuration | To add additional custom properties in key-value pairs. |
On the Delta Lake Channel, you can read data from a Delta Lake table on S3, HDFS, or DBFS. A minimal streaming read sketch follows the configuration table below.
Configuring a Streaming Delta Data Source
Field | Description |
---|---|
Source | Mention the source name. |
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Override Credentials | Check mark the checkbox. Provide the username through which the Hadoop service is running. |
HDFS File Path | Provide file path of the HDFS file system. |
S3 Protocol | S3 protocol to be used. |
Bucket Name | Mention Bucket Name. |
Path | Sub-directories of the bucket name mentioned above from which the data is to be read. |
Add configuration | To add additional custom properties in key-value pairs. |
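For reference, a streaming read of a Delta table in Spark looks roughly like the sketch below. It assumes the Delta Lake package is available and uses placeholder paths; it is not a description of Gathr's internals.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("delta-stream-sketch").getOrCreate()

# Stream rows as new data is committed to the Delta table.
# The S3 path is a placeholder; HDFS or DBFS paths work the same way, and the
# delta-spark (Delta Lake) package is assumed to be available on the cluster.
stream_df = (
    spark.readStream
    .format("delta")
    .load("s3a://example-bucket/delta/events")
)

query = stream_df.writeStream.format("console").start()
query.awaitTermination()
```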
This component is supported in Gathr on-premise. The Vertica Channel supports Oracle, Postgres, MySQL, MSSQL, and DB2 connections.
You can configure and connect the above-mentioned DB engines with JDBC. It allows you to extract data from DB2 and other sources into your data pipeline in batches after configuring the JDBC channel.
Note: This is a batch component.
Prerequisite: Upload appropriate driver jar as per the RDBMS used in JDBC Data Source. Use the upload jar option.
For using DB2, create a successful DB2 Connection.
Configuring a Vertica Data Source
To add a Vertica Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Field | Description |
---|---|
Connection Name | Select the connection name to be used for creating the connection. |
Query | Hive compatible SQL query to be executed in the component. |
Metadata
Enter the schema and select table. You can view the Metadata of the tables:
Field | Description |
---|---|
Table | Select table of which you want to view Metadata. |
Column Name | Name of the column generated from the table. |
Column Type | Type of the column, for example: Text, Int |
Nullable | Whether the value of the column can be null or not. |
Once the Metadata is selected, Click Next and detect schema to generate the output with Sample Values. The next tab is Incremental Read.
Incremental Read
Field | Description |
---|---|
Enable Incremental Read | Check this check-box to enable incremental read support. |
Column to Check | Select a column on which incremental read will work. Displays the list of columns that have integer, long, date, timestamp, or decimal types of values. |
Start Value | Mention a value of the reference column, only the records whose value of the reference column is greater than this value will be read. |
Read Control Type | Provides three options to control the data to be fetched - None, Limit By Count, and Maximum Value. None: All the records with value of the reference column greater than the offset will be read. Limit By Count: Only the mentioned number of records with value of the reference column greater than the offset will be read. Maximum Value: All the records with value of the reference column greater than the offset and less than the Column Value field will be read. For None and Limit By Count it is recommended that the table has data in sequential and sorted (increasing) order. A sketch of this incremental filtering is shown after the table. |
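Conceptually, incremental read over JDBC reduces to filtering on the reference column, as in the hedged PySpark sketch below. The connection details, driver, table, and column names are placeholders, and this is an illustration of the idea rather than Gathr's implementation.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("jdbc-incremental-sketch").getOrCreate()

# Read only the records whose reference column exceeds the last stored offset.
# JDBC URL, credentials, driver class and the orders/order_id names are placeholders.
last_offset = 1000
incremental_df = (
    spark.read
    .format("jdbc")
    .option("url", "jdbc:postgresql://db-host:5432/sales")
    .option("driver", "org.postgresql.Driver")
    .option("user", "reader")
    .option("password", "secret")
    .option("query", f"SELECT * FROM orders WHERE order_id > {last_offset}")
    .load()
)
incremental_df.show()
```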
Click Done to save the configuration.
Configure Pre-Action in Source
This component is supported in Gathr on-premise. A TIBCO Enterprise Message Service (EMS) server provides messaging services for applications that communicate by monitoring queues.
Configuring Tibco Data Source
To add a Tibco Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema Type tab, you can Upload Data File or Fetch From Source.
Field | Description |
---|---|
Connection Name | Select the connection name from the available list of connections, from where you would like to read the data. |
Messaging Model | Option to select the messaging model as either Topic or Queue. |
Queue Name | The queue name should be specified. |
Durable | Durable name should be specified. |
Number of Consumers | Number of consumers should be specified. |
Batch Size | Maximum number of records processed at one time. Keep values up to the Data Collector maximum batch size. |
Connection Timeout | Connection timeout in milliseconds should be specified. 2000 by default. |
Connection Retries | Maximum number of connection retries allowed should be specified. 3 by default. |
Click on the add notes tab. Enter the notes in the space provided. Click Done to save the configuration.
VSAM Channel reads data stored in COBOL EBCDIC and Text Format.
Configuring a COBOL Data Source
To add a VSAM Data Source into your pipeline, drag the Data Source to the canvas and right click on it to configure.
Under the Schema tab, you can select the Max No. of Rows and Sampling Method as Top-N or Random Sample.
Field | Description |
---|---|
HDFS Connection | HDFS connection name to be used for creating the connection. |
Data Type | Two types of data type are supported: EBCDIC: Extended Binary Coded Decimal Interchange Code is an 8-bit character encoding used on IBM mainframes and AS/400s. TEXT: Text files are also accepted as data type. |
Generate Record ID | Every generated record will have a primary record id corresponding to every entry. |
HDFS Copy Book Path | CopyBook is the schema file for COBOL. This is where you can use the Scope Variable using @. To know more about the same, read about Scope Variable. |
HDFS Data Path | The file path in HDFS from where the data will be read. |
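As a rough analogue, reading EBCDIC data with a copybook in Spark is commonly done with a connector such as the open-source Cobrix library. The sketch below is illustrative only: the paths are placeholders, the spark-cobol package is assumed to be available, and this is not a description of how the VSAM channel is implemented.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("cobol-source-sketch").getOrCreate()

# Illustrative read of EBCDIC mainframe data with a copybook using the Cobrix
# connector ("cobol" format). The copybook and data paths are placeholders,
# and the za.co.absa.cobrix spark-cobol package is assumed to be on the classpath.
df = (
    spark.read
    .format("cobol")
    .option("copybook", "hdfs:///schemas/customer.cpy")
    .load("hdfs:///data/customer.ebcdic")
)
df.printSchema()
```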