Kafka Data Source
Under the Schema Type tab, select Fetch From Source or Upload Data File.
If you select Fetch From Source, the Schema tab appears after the Configuration tab; if you upload a data file, the Schema tab appears before it.
The Kafka channel can read data in the following formats: JSON, CSV, TEXT, XML, Fixed Length, Binary, and AVRO.
Configuring Kafka Data Source
Connection Name
Connections are service identifiers. From the list of available connections, select the connection from which you want to read data.
Batch
Check the option to enable batch processing.
Topic Type
Select one of the following options to fetch records from the Kafka topic(s); a Spark-equivalent sketch follows this list:
Topic Name: The topic name is used to subscribe to a single topic.
Topic List: The topic list is used to subscribe to a comma-separated list of topics.
Pattern: The pattern is used to subscribe to topics using Java regex.
With Partitions: Used to consume specific partitions of specific topic(s), supplied as a JSON string, e.g.,
{"topicA":[0,1],"topicB":[2,4]}
The schema must be the same across topics when using Topic List, Pattern, or With Partitions.
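Gathr's Kafka channel is built on Spark's Kafka source (the structured streaming queries referenced below). As a rough, non-authoritative sketch, the four topic types map to Spark's mutually exclusive subscription options; the broker address and topic names are placeholders, and the spark-sql-kafka connector is assumed to be on the classpath:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("kafka-topic-types").getOrCreate()

# Exactly one subscription option may be set per reader:
#   Topic Name / Topic List -> "subscribe"        ("topicA" or "topicA,topicB")
#   Pattern                 -> "subscribePattern" (Java regex such as "topic.*")
#   With Partitions         -> "assign"           ('{"topicA":[0,1],"topicB":[2,4]}')
df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker
      .option("subscribe", "topicA")
      .load())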
Topic Name
The Kafka topic from which messages will be read.
Topic List/Pattern/With Partitions
A topic is a category or feed name to which messages will be published.
Partitions
Number of partitions. Each partition is an ordered, immutable sequence of messages that is continually appended to a commit log.
Replication Factor
Number of replicas. Replication provides stronger durability and higher availability. For example, a topic with replication factor N can tolerate up to N-1 server failures without losing any messages committed to the log.
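Both settings are standard Kafka topic-creation parameters. A minimal sketch using the kafka-python admin client (an assumption; the broker address is a placeholder) illustrates how they apply:

from kafka.admin import KafkaAdminClient, NewTopic

# A topic with 3 partitions, each replicated to 2 brokers, tolerates
# one broker failure (N-1) without losing committed messages.
admin = KafkaAdminClient(bootstrap_servers="broker1:9092")  # placeholder broker
admin.create_topics([NewTopic(name="topicA", num_partitions=3, replication_factor=2)])
admin.close()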
Record Has Header?
Check the option to read record headers along with data from the Kafka topic.
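In Spark's Kafka source this corresponds to the includeHeaders option (Spark 3.0 and later), which can be appended to the reader sketch above; the mapping to Gathr's internals is an assumption:

.option("includeHeaders", "true")  # adds a "headers" column of key/value pairs per record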
Replace Nulls with Blanks
Check the option to replace all null values with blanks.
Specify Consumer Group
Specify the consumer group ID type. The default value is Auto, meaning the ID is auto-generated by the Kafka client. The other available options, illustrated in the sketch after this list, are:
Group Id: In the Consumer Group ID field, specify the group ID used for reading data. Use this option cautiously: concurrently running queries (batch or streaming) or sources with the same group ID are likely to interfere with each other, causing each query to read only part of the data. When this is set, the ‘groupIdPrefix’ option is ignored.
Group Id Prefix: Specify the prefix for consumer group identifiers (group.id) generated by structured streaming queries. If ‘kafka.group.id’ is set, this option is ignored.
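Under the same assumption that the channel maps to Spark's Kafka source, either option can be appended to the reader sketch above; the group name and prefix are placeholders:

# Group Id: fixed group.id; the 'groupIdPrefix' option is ignored when this is set.
.option("kafka.group.id", "my-fixed-group")
# Group Id Prefix: prefix for auto-generated consumer group IDs; ignored
# when kafka.group.id is set.
.option("groupIdPrefix", "gathr-pipeline-")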
Define Offset
The following configurations determine the Kafka offsets; a sketch of the equivalent Spark options follows this list:
Latest: The query starts from the latest offset only.
Earliest: The query starts from the earliest (first) offset.
Custom: JSON strings specifying the starting and ending offsets for each partition:
startingOffsets: A JSON string specifying a starting offset for each partition, e.g.,
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
In the JSON, -2 denotes the earliest offset and -1 the latest.
endingOffsets: A JSON string specifying an ending offset for each partition. This optional property defaults to “latest”, e.g.,
{"topicA":{"0":23,"1":-1},"topicB":{"0":-1}}
Here -1 denotes the latest offset; -2 is not allowed for ending offsets.
Connection Retries
The number of retries for the component connection. Possible values are -1, 0, or any positive number. A value of -1 means the connection is retried indefinitely.
Max Offset Per Trigger
Rate limit on the maximum number of offsets processed per trigger interval. The specified total number of offsets will be proportionally split across topic partitions of different volumes.
Fail on Connection or Data Loss
Select ‘True’ to stop the pipeline with an error after the connection retry attempts are exhausted, upon either losing the connection to Kafka or encountering data loss (e.g., due to deleted topics or out-of-range offsets). Select ‘False’ to let the pipeline run indefinitely until any underlying issues are resolved. Note: Batch queries will fail if they cannot read any data from the provided offsets due to lost data.
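These two fields appear to correspond to Spark's maxOffsetsPerTrigger and failOnDataLoss options; a streaming sketch under that assumption, reusing the spark session from the first sketch:

df = (spark.readStream.format("kafka")
      .option("kafka.bootstrap.servers", "broker1:9092")  # placeholder broker
      .option("subscribe", "topicA")
      .option("maxOffsetsPerTrigger", "10000")  # cap on offsets consumed per trigger interval
      .option("failOnDataLoss", "false")        # keep running when data may have been lost
      .load())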
Delay Between Connection Retries
Retry delay interval for component connection (in milliseconds).
ADD CONFIGURATION: Use this option to add custom Kafka properties as key-value pairs; see the sketch below.
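In Spark's Kafka source, properties prefixed with kafka. are passed straight through to the underlying consumer, so custom key-value pairs added here might look like the following lines appended to any reader sketch above; both values are placeholders:

.option("kafka.security.protocol", "SASL_SSL")   # example consumer property
.option("kafka.session.timeout.ms", "30000")     # example consumer property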
Click on the Add Notes tab. Enter the notes in the space provided.
Click Done to save the configuration.