API Docs - v2.2.4
Tested Siddhi Core version: 5.1.21
It may also work with other Siddhi Core minor versions.
Sink Mapper
avro (Sink Mapper)
This extension is a Siddhi event to Avro message output mapper. Transports that publish messages to an Avro sink can use this extension to convert Siddhi events to Avro messages.
You can either specify the Avro schema or provide the schema registry URL and the schema reference ID as parameters in the stream definition.
If no Avro schema is specified, a flat Avro schema of the 'record' type is generated with the stream attributes as schema fields.
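The generated flat schema simply maps each stream attribute to a field of the `record`-type schema. A minimal sketch of that generation, assuming a direct mapping from Siddhi attribute types to Avro primitive types (the actual extension may map some types differently):

```python
import json

# Assumed Siddhi-type -> Avro-primitive mapping; illustrative only.
SIDDHI_TO_AVRO = {
    "string": "string", "int": "int", "long": "long",
    "float": "float", "double": "double", "bool": "boolean",
}

def flat_record_schema(stream_name, attributes):
    """Build a flat 'record'-type Avro schema from stream attributes."""
    return json.dumps({
        "type": "record",
        "name": stream_name,
        "fields": [
            {"name": name, "type": SIDDHI_TO_AVRO[typ]}
            for name, typ in attributes
        ],
    })

# For: define stream StockStream (symbol string, price float, volume long);
schema = flat_record_schema(
    "StockStream",
    [("symbol", "string"), ("price", "float"), ("volume", "long")],
)
```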
@sink(..., @map(type="avro", schema.def="<STRING>", schema.registry="<STRING>", schema.id="<STRING>", use.avro.serializer="<BOOL>"))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
schema.def | This specifies the Avro schema used to convert Siddhi events to Avro messages. | | STRING | No | No |
schema.registry | This specifies the URL of the schema registry. | | STRING | No | No |
schema.id | This specifies the ID of the Avro schema. This is the global ID that the schema registry returns when the schema is posted to it; the specified ID is used to retrieve the schema from the registry. | | STRING | No | No |
use.avro.serializer | Set this parameter to true when io.confluent.kafka.serializers.KafkaAvroSerializer is used as the value serializer when creating the Kafka producer. When set to false, org.apache.kafka.common.serialization.ByteArraySerializer is used. | false | BOOL | Yes | No |
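The use.avro.serializer choice matters because io.confluent.kafka.serializers.KafkaAvroSerializer does not emit a bare Avro payload: it prepends the Confluent wire-format header, a zero magic byte followed by the 4-byte big-endian schema ID, which consumers use to look the schema up in the registry. A sketch of that framing (the payload bytes are made up):

```python
import struct

MAGIC_BYTE = 0  # Confluent wire-format magic byte

def frame_confluent(schema_id: int, avro_payload: bytes) -> bytes:
    """Prepend the Confluent wire-format header to an Avro-encoded payload."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + avro_payload

def unframe_confluent(message: bytes):
    """Split a framed message back into (schema_id, avro_payload)."""
    magic, schema_id = struct.unpack(">bI", message[:5])
    if magic != MAGIC_BYTE:
        raise ValueError("not a Confluent-framed message")
    return schema_id, message[5:]

# Hypothetical Avro-encoded payload framed with schema ID 22.
framed = frame_confluent(22, b"\x06IBM")
```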
Examples

EXAMPLE 1
@sink(type='inMemory', topic='stock', @map(type='avro',schema.def = """{"type":"record","name":"stock","namespace":"stock.example","fields":[{"name":"symbol","type":"string"},{"name":"price","type":"float"},{"name":"volume","type":"long"}]}"""))
define stream StockStream (symbol string, price float, volume long);
The above configuration performs a default Avro mapping that generates an Avro message as an output ByteBuffer.
EXAMPLE 2
@sink(type='inMemory', topic='stock', @map(type='avro', schema.registry = 'http://localhost:8081', schema.id = '22', @payload("""{"Symbol":{{symbol}},"Price":{{price}},"Volume":{{volume}}}""")))
define stream StockStream (symbol string, price float, volume long);
The above configuration performs a custom Avro mapping that generates an Avro message as an output ByteBuffer. The Avro schema is retrieved from the given schema registry (localhost:8081) using the schema ID provided.
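Schema retrieval by global ID follows the Confluent Schema Registry REST API: a GET to /schemas/ids/&lt;id&gt; returns a JSON object whose schema field holds the Avro schema as a JSON-escaped string. A sketch of unwrapping such a response (the response body below is illustrative, matching the stock schema from Example 1):

```python
import json

# Illustrative response body from GET http://localhost:8081/schemas/ids/22;
# the registry wraps the Avro schema in a JSON-escaped string.
registry_response = '{"schema": "{\\"type\\":\\"record\\",\\"name\\":\\"stock\\",\\"namespace\\":\\"stock.example\\",\\"fields\\":[{\\"name\\":\\"symbol\\",\\"type\\":\\"string\\"},{\\"name\\":\\"price\\",\\"type\\":\\"float\\"},{\\"name\\":\\"volume\\",\\"type\\":\\"long\\"}]}"}'

def schema_from_registry_response(body: str) -> dict:
    """Unwrap the registry envelope and parse the embedded Avro schema."""
    return json.loads(json.loads(body)["schema"])

schema = schema_from_registry_response(registry_response)
```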
Source Mapper
avro (Source Mapper)
This extension is an Avro to Event input mapper. Transports that accept Avro messages can utilize this extension to convert the incoming Avro messages to Siddhi events.
The Avro schema to be used for creating Avro messages can be specified as a parameter in the stream definition.
If no Avro schema is specified, a flat Avro schema of the 'record' type is generated with the stream attributes as schema fields.
The generated/specified Avro schema is used to convert Avro messages to Siddhi events.
@source(..., @map(type="avro", schema.def="<STRING>", schema.registry="<STRING>", schema.id="<STRING>", fail.on.missing.attribute="<BOOL>", use.avro.deserializer="<BOOL>", basic.auth.username="<STRING>", basic.auth.password="<STRING>", ssl.keystore.path="<STRING>", ssl.keystore.password="<STRING>", ssl.truststore.path="<STRING>", ssl.truststore.password="<STRING>"))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
schema.def | This specifies the schema of the Avro message. The full schema used to create the Avro message needs to be specified as a quoted JSON string. | | STRING | No | No |
schema.registry | This specifies the URL of the schema registry. | | STRING | No | No |
schema.id | This specifies the ID of the Avro schema. This is the global ID that the schema registry returns when the schema is posted to it; the schema is retrieved from the registry via this ID. | | STRING | No | No |
fail.on.missing.attribute | If this parameter is set to 'true', a message for which the attribute mapping fails or returns a null value is dropped by the system. | true | BOOL | Yes | No |
use.avro.deserializer | Set this parameter to true when io.confluent.kafka.serializers.KafkaAvroDeserializer is used as the value deserializer when creating the Kafka consumer configs. When set to false, org.apache.kafka.common.serialization.ByteArrayDeserializer is used. | false | BOOL | Yes | No |
basic.auth.username | This specifies the username for authentication if the schema registry is secured via basic authentication. | EMPTY_STRING | STRING | Yes | No |
basic.auth.password | This specifies the password for authentication if the schema registry is secured via basic authentication. | EMPTY_STRING | STRING | Yes | No |
ssl.keystore.path | This specifies the SSL keystore path. | EMPTY_STRING | STRING | Yes | No |
ssl.keystore.password | This specifies the SSL keystore password. | EMPTY_STRING | STRING | Yes | No |
ssl.truststore.path | This specifies the SSL truststore path. | EMPTY_STRING | STRING | Yes | No |
ssl.truststore.password | This specifies the SSL truststore password. | EMPTY_STRING | STRING | Yes | No |
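When the registry is secured, the basic.auth.username and basic.auth.password values are sent as a standard HTTP Basic Authorization header on registry requests. A minimal sketch of how such a header value is formed (the credentials are made up):

```python
import base64

def basic_auth_header(username: str, password: str) -> str:
    """Build an HTTP Basic Authorization header value from credentials."""
    token = base64.b64encode(f"{username}:{password}".encode("utf-8")).decode("ascii")
    return f"Basic {token}"

# Hypothetical credentials, standing in for basic.auth.username / basic.auth.password.
header = basic_auth_header("registry-user", "registry-secret")
```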
Examples

EXAMPLE 1
@source(type='inMemory', topic='user', @map(type='avro', schema.def = """{"type":"record","name":"userInfo","namespace":"user.example","fields":[{"name":"name","type":"string"}, {"name":"age","type":"int"}]}"""))
define stream UserStream (name string, age int);
The above Siddhi query performs a default Avro input mapping. The input Avro message that contains user information is converted to a Siddhi event.
The expected input is a byte array or ByteBuffer.
EXAMPLE 2
@source(type='inMemory', topic='user', @map(type='avro', schema.def = """{"type":"record","name":"userInfo","namespace":"avro.userInfo","fields":[{"name":"username","type":"string"}, {"name":"age","type":"int"}]}""", @attributes(name="username", age="age")))
define stream UserStream (name string, age int);
The above Siddhi query performs a custom Avro input mapping. The input Avro message that contains user information is converted to a Siddhi event.
The expected input is a byte array or ByteBuffer.
EXAMPLE 3
@source(type='inMemory', topic='user', @map(type='avro',schema.registry='http://192.168.2.5:9090', schema.id='1',@attributes(name="username",age="age")))
define stream UserStream (name string, age int);
The above Siddhi query performs a custom Avro input mapping. The input Avro message that contains user information is converted to a Siddhi event via the schema retrieved from the given schema registry (http://192.168.2.5:9090) using the schema ID provided.
The expected input is a byte array or ByteBuffer.