API Docs - v3.0.1
Tested Siddhi Core version: 5.1.21
It could also support other Siddhi Core minor versions.
Sink
sqs (Sink)
The SQS sink allows users to connect to and publish messages to an AWS SQS queue. It can publish only text messages.
Syntax
@sink(type="sqs", queue="<STRING>", access.key="<STRING>", secret.key="<STRING>", region="<STRING>", message.group.id="<STRING>", deduplication.id="<STRING>", delay.interval="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
queue | Queue URL to which the SQS sink should connect. | | STRING | No | No |
access.key | Access key for Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself.) | none | STRING | Yes | No |
secret.key | Secret key of the Amazon user. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself.) | none | STRING | Yes | No |
region | Amazon Web Services region. | | STRING | No | No |
message.group.id | ID of the group that the message belongs to (only applicable to FIFO queues). | null | STRING | Yes | Yes |
deduplication.id | ID by which a FIFO queue identifies duplicates in the queue (only applicable to FIFO queues). | null | STRING | Yes | Yes |
delay.interval | Time in seconds for which the message remains in the queue before it becomes available for consumers to consume. | -1 | INT | Yes | No |
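As a quick reference, the sketch below shows a sink definition for a standard (non-FIFO) queue that sets only the mandatory parameters and leaves the FIFO and delay options at their defaults; the queue URL, credential values, and stream name are placeholders, not part of the extension.
@sink(type='sqs',
      queue='https://sqs.us-east-1.amazonaws.com/123456789012/example-queue',
      access.key='aws.access.key',
      secret.key='aws.secret.key',
      region='us-east-1',
      @map(type='xml'))
define stream alertStream(message string);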
Examples
EXAMPLE 1
@sink(type='sqs',
      queue='https://amazon.sqs.queue.url',
      access.key='aws.access.key',
      secret.key='aws.secret.key',
      region='us-east-1',
      delay.interval='5',
      message.group.id='group-1',
      @map(type='xml'))
define stream outStream(symbol string, deduplicationID string);
The above example demonstrates how an SQS sink is configured to publish messages to an SQS queue.
Once an event is received by outStream, the 'xml' mapper generates an XML message from the attribute values of the event. The SQS sink then connects to the queue using the provided configuration and sends the message to the queue.
EXAMPLE 2
@sink(type='sqs',
      queue='https://amazon.sqs.queue.fifo',
      access.key='aws.access.key',
      secret.key='aws.secret.key',
      region='us-east-1',
      delay.interval='5',
      deduplication.id='{{deduplicationID}}',
      message.group.id='group-1',
      @map(type='xml'))
define stream outStream(symbol string, deduplicationID string);
The above example demonstrates how an SQS sink is configured to publish messages to an SQS FIFO queue.
Once an event is received by outStream, the 'xml' mapper generates an XML message from the attribute values of the event. The SQS sink connects to the queue using the provided configuration and sends the message to the queue.
For each message, the deduplication ID is taken from the attribute 'deduplicationID' of outStream.
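To illustrate where the dynamic deduplication.id value comes from, the hypothetical query below feeds outStream from an assumed orderStream and reuses each event's orderId as its deduplication ID; the stream and attribute names are illustrative only.
define stream orderStream(symbol string, orderId string);

-- Each order event is forwarded to outStream; its orderId becomes the
-- deduplicationID attribute that the sink reads via {{deduplicationID}}.
from orderStream
select symbol, orderId as deduplicationID
insert into outStream;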
Source
sqs (Source)
The SQS source allows users to connect to and consume messages from an AWS SQS queue. It can receive text messages.
Syntax
@source(type="sqs", queue="<STRING>", access.key="<STRING>", secret.key="<STRING>", region="<STRING>", polling.interval="<INT>", wait.time="<INT>", max.number.of.messages="<INT>", visibility.timeout="<INT>", delete.messages="<BOOL>", delete.retry.interval="<INT>", max.number.of.delete.retry.attempts="<INT>", number.of.parallel.consumers="<INT>", @map(...))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
queue | Name of the queue to which the SQS source should subscribe. | | STRING | No | No |
access.key | Access key for Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself.) | null | STRING | Yes | No |
secret.key | Secret key of the Amazon user. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself.) | null | STRING | Yes | No |
region | Amazon Web Services region. | | STRING | No | No |
polling.interval | Interval (in milliseconds) between two message-retrieval operations. | | INT | No | No |
wait.time | Maximum time (in seconds) a polling call waits for a message to become available in the queue. | -1 | INT | Yes | No |
max.number.of.messages | Maximum number of messages retrieved from the queue per polling call. (The actual number may be smaller than this even if more messages are available in the queue.) | 1 | INT | No | No |
visibility.timeout | The length of time (in seconds) for which a message received from the queue remains invisible to other consumers. (Only applicable if the consumer does not delete the received messages from the queue.) | -1 | INT | Yes | No |
delete.messages | Whether the message should be deleted from the queue after consuming it. | true | BOOL | Yes | No |
delete.retry.interval | Time interval (in milliseconds) at which the consumer retries to delete a message after a failed delete operation. | 5000 | INT | Yes | No |
max.number.of.delete.retry.attempts | Maximum number of retry attempts to be performed in case of a failure. | 10 | INT | Yes | No |
number.of.parallel.consumers | Size of the thread pool used for polling. | 1 | INT | No | No |
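As a quick reference, the sketch below shows a source definition that sets only the mandatory parameters plus the credentials and leaves the delete/retry options at their defaults; the queue URL, credential values, and stream name are placeholders, not part of the extension.
@source(type='sqs',
        queue='https://sqs.us-east-2.amazonaws.com/123456789012/example-queue',
        access.key='aws.access.key',
        secret.key='aws.secret.key',
        region='us-east-2',
        polling.interval='5000',
        max.number.of.messages='10',
        number.of.parallel.consumers='1',
        @map(type='xml', enclosing.element="//events",
             @attributes(symbol='symbol')))
define stream rawStream(symbol string);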
Examples
EXAMPLE 1
@source(type='sqs',
        queue='http://aws.sqs.queue.url',
        access.key='aws.access.key',
        secret.key='aws.secret.key',
        region='us-east-2',
        polling.interval='5000',
        max.number.of.messages='10',
        number.of.parallel.consumers='1',
        delete.messages='true',
        wait.time='2',
        visibility.timeout='30',
        delete.retry.interval='1000',
        max.number.of.delete.retry.attempts='10',
        @map(type='xml',
             enclosing.element="//events",
             @attributes(symbol='symbol', message_id='trp:MESSAGE_ID')))
define stream inStream (symbol string, message_id string);
The above example demonstrates how an SQS source is configured to consume messages from an SQS queue.
The SQS source establishes a connection to the queue using the given configuration and starts consuming XML messages from it.
Once a message is received from the queue, the 'xml' mapper generates a Siddhi event from that message and passes it to inStream.
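As a follow-up, the hypothetical query below shows one way the consumed events, including the message_id mapped from the trp:MESSAGE_ID transport property, could be processed further; processedStream and the filter condition are assumptions for illustration.
define stream processedStream(symbol string, message_id string);

-- Forward only events for the 'AAPL' symbol, keeping the SQS message ID
-- that the 'xml' mapper captured from the trp:MESSAGE_ID transport property.
from inStream[symbol == 'AAPL']
select symbol, message_id
insert into processedStream;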