Skip to content

API Docs - v1.0.0

Sinkmapper

avro (Sink Mapper)

This extension is a Siddhi Event to Avro Message output mapper.Transports that publish messages to Avro sink can utilize this extension to convert siddhi events to Avro messages.
 Users can either specify the avro schema or give the schema registry URL and schema reference id as a parameter in stream definition.
In case no specification of avro schema a flat avro schema of type record is generated using the stream attributes as schema fields.

Syntax

@sink(..., @map(type="avro", schema.def="<STRING>", schema.registry="<STRING>", schema.id="<STRING>")

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
schema.def This specifies the desired avro schema to be used to convert siddhi events to avro message.
The schema should be specified as a quoted json string.
STRING No No
schema.registry Used to specify the URL of the schema registry. STRING No No
schema.id Used to specify the id of the avro schema. This id is the global id returned from the schema registry when posting the schema to the registry. The specified id is used to retrive the schema from the schema registry. STRING No No

Examples EXAMPLE 1

@sink(type='inMemory', topic='stock', @map(type='avro',schema.def = """{"type":"record","name":"stock","namespace":"stock.example","fields":[{"name":"symbol","type":"string"},{"name":"price":"type":"float"},{"name":"volume","type":"long"}]}"""))
define stream stockStream (symbol string, price float, volume long);

The above configuration performs a default Avro mapping that generates an Avro message as output byte array.

EXAMPLE 2

@sink(type='inMemory', topic='stock', @map(type='avro',schema.registry = 'http://localhost:8081', schema.id ='22',@payload("""{"Symbol":{{symbol}},"Price":{{price}},"Volume":{{volume}}}"""
)))
define stream stockStream (symbol string, price float, volume long);

The above configuration performs a custom Avro mapping that generates an Avro message as output byte array. The avro schema is retrieved from the given schema registry(localhost:8081) using the provided schema id.

Sourcemapper

avro (Source Mapper)

Avro to Event input mapper. Transports which accepts Avro messages can utilize this extension to convert the incoming Avro message to Siddhi event.
Users can specify the avro schema used to create avro message as a parameter in stream definition.
In case no specification of avro schema a flat avro schema of type record is generated using the stream attributes as schema fields.
The generated/specified avro schema is used to convert the avro message into siddhi event.

Syntax

@source(..., @map(type="avro", schema.def="<STRING>", schema.registry="<STRING>", schema.id="<STRING>", fail.on.missing.attribute="<BOOL>")

QUERY PARAMETERS

Name Description Default Value Possible Data Types Optional Dynamic
schema.def Used to specify the schema of the Avro message. The full schema used to create the avro message should be specified as quoted json string. STRING No No
schema.registry Used to specify the URL of the schema registry. STRING No No
schema.id Used to specify the id of the avro schema. This id is the global id returned from the schema registry when posting the schema to the registry. The specified id is used to retrive the schema from the schema registry. STRING No No
fail.on.missing.attribute This can either have value true or false. By default it will be true.
This attribute allows user to handle unknown attributes.
 By default if an json execution fails or returns null system will drop that message.
However setting this property to false will prompt system to send event with null value to Siddhi where user can handle it accordingly.
(ie. Assign a default value)
true BOOL Yes No

Examples EXAMPLE 1

@source(type='inMemory', topic='user', @map(type='avro', schema .def = """{"type":"record","name":"userInfo","namespace":"user.example","fields":[{"name":"name","type":"string"}, {"name":"age","type":"int"}]}"""))
define stream userStream (name string, age int );

Above configuration will do a default Avro input mapping. The input avro message containing user info will be converted to a siddhi event.
Expected input is a byte array.

EXAMPLE 2

@source(type='inMemory', topic='user', @map(type='avro', schema .def = """{"type":"record","name":"userInfo","namespace":"avro.userInfo","fields":[{"name":"username","type":"string"}, {"name":"age","type":"int"}]}""",@attributes(name="username",age="age")))
define stream userStream (name string, age int );

Above configuration will do a custom Avro input mapping. The input avro message containing user info will be converted to a siddhi event.
 Expected input is a byte array.

EXAMPLE 3

@source(type='inMemory', topic='user', @map(type='avro',schema.registry='http://192.168.2.5:9090', schema.id='1',@attributes(name="username",age="age")))
define stream userStream (name string, age int );

Above configuration will do a custom Avro input mapping. The input avro message containing user info will be converted to a siddhi event using the schema retrived from given schema registry(localhost:8081).
Expected input is a byte array.