API Docs - v1.0.5
Sink
sqs (Sink)
SQS sink allows users to connect and publish messages to an AWS SQS Queue. It has the ability to only publish Text messages
Syntax
@sink(type="sqs", queue="<STRING>", access.key="<STRING>", secret.key="<STRING>", region="<STRING>", message.group.id="<STRING>", deduplication.id="<STRING>", delay.interval="<INT>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
queue | Queue url which SQS Sink should connect to | STRING | No | No | |
access.key | Access Key for the Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself) | none | STRING | Yes | No |
secret.key | Secret Key of the Amazon User. (This is a mandatory field and should be provided either in the deployment.yml or in the sink definition itself) | none | STRING | Yes | No |
region | Amazon Web Service Region | STRING | No | No | |
message.group.id | ID of the group that the message belong to(only applicable for FIFO Queues) | null | STRING | Yes | Yes |
deduplication.id | ID by which a FIFO queue identifies the duplication in the queue(only applicable for FIFO queues) | null | STRING | Yes | Yes |
delay.interval | Time in seconds for how long the message remain in the queue until it is available for the consumers to consume. | -1 | INT | Yes | No |
Examples EXAMPLE 1
@sink(type='sqs',queue='<queue_url>',access.key='<aws_access_key>',secret.key='<aws_secret_key>',region='<region>',delay.interval='5',deduplication.id='{{deduplicationID}}',message.group.id='charuka',@map(type='xml') )define stream outStream(symbol string, deduplicationID string);
Following Example shows how to define a SQS sink to publish messages to the service
Source
sqs (Source)
SQS source allows users to connect and consume messages from a AWS SQS Queue. It has the ability to receive Text messages
Syntax
@source(type="sqs", queue="<STRING>", access.key="<STRING>", secret.key="<STRING>", region="<STRING>", polling.interval="<INT>", wait.time="<INT>", max.number.of.messages="<INT>", visibility.timeout="<INT>", delete.messages="<BOOL>", delete.retry.interval="<INT>", max.number.of.delete.retry.attempts="<INT>", number.of.parallel.consumers="<INT>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
queue | Queue name which SQS Source should subscribe to | STRING | No | No | |
access.key | Access Key for the Amazon Web Services. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself) | null | STRING | Yes | No |
secret.key | Secret Key of the Amazon User. (This is a mandatory field and should be provided either in the deployment.yml or in the source definition itself) | null | STRING | Yes | No |
region | Amazon Web Service Region | STRING | No | No | |
polling.interval | Interval (in milliseconds) between two message retrieval operations | INT | No | No | |
wait.time | Maximum amount (in seconds) that a polling call will wait for a message to become available in the queue | -1 | INT | Yes | No |
max.number.of.messages | Maximum number of messages retrieved from the queue per polling call (Actual maybe smaller than this even if there's more messages in the queue) | 1 | INT | No | No |
visibility.timeout | The length of time (in seconds) for which a message received from a queue will be invisible to other consumers(only applicable if consumer doesn't purge the received messages from the queue). | -1 | INT | Yes | No |
delete.messages | Should the message be deleted from the queue after consuming it. | delete.messages | BOOL | Yes | No |
delete.retry.interval | Time interval (in milliseconds) consumer should retry to delete a message in the case of failure during a message delete operation. | 5000 | INT | Yes | No |
max.number.of.delete.retry.attempts | Maximum number retry attempts to be performed in case of a failure. | 10 | INT | Yes | No |
number.of.parallel.consumers | Size of the thread pool that should be used for polling. | 1 | INT | No | No |
Examples EXAMPLE 1
@source(type='sqs',queue='<queue url>',access.key='<access_key>',secret.key='<secret_key>',region='us-east-2',polling.interval='5000',max.number.of.messages='10',number.of.parallel.consumers='1',purge.messages='true',wait.time='2',visibility.timeout='30',delete.retry.interval='1000',max.number.of.delete.retry.attempts='10',@map(type='xml',enclosing.element="//events",@attributes(symbol='symbol', message_id='trp:MESSAGE_ID') ))define stream inStream (symbol string, message_id string);
Following Example shows how to define a SQS source to receive messages from the service