API Docs - v2.0.3¶
Tested Siddhi Core version: 5.1.5
It could also support other Siddhi Core minor versions.
Sink¶
file (Sink)¶
File Sink can be used to publish (write) event data which is processed within siddhi to files.
Siddhi-io-file sink provides support to write both textual and binary data into files
@sink(type="file", file.uri="<STRING>", append="<BOOL>", add.line.separator="<BOOL>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
file.uri | Used to specify the file for data to be written. |
STRING | No | Yes | |
append | This parameter is used to specify whether the data should be append to the file or not. |
true | BOOL | Yes | No |
add.line.separator | This parameter is used to specify whether events added to the file should be separated by a newline. |
true. (However, if csv mapper is used, it is false) | BOOL | Yes | No |
Examples EXAMPLE 1
@sink(type='file', @map(type='json'), append='false', file.uri='/abc/{{symbol}}.txt') define stream BarStream (symbol string, price float, volume long);
Under above configuration, for each event, a file will be generated if there's no such a file,and then data will be written to that file as json messagesoutput will looks like below.
{
"event":{
"symbol":"WSO2",
"price":55.6,
"volume":100
}
}
Source¶
file (Source)¶
File Source provides the functionality for user to feed data to siddhi from files. Both text and binary files are supported by file source.
Syntax@source(type="file", dir.uri="<STRING>", file.uri="<STRING>", mode="<STRING>", tailing="<BOOL>", action.after.process="<STRING>", action.after.failure="<STRING>", move.after.process="<STRING>", move.after.failure="<STRING>", begin.regex="<STRING>", end.regex="<STRING>", file.polling.interval="<STRING>", dir.polling.interval="<STRING>", timeout="<STRING>", file.read.wait.timeout="<STRING>", @map(...)))
QUERY PARAMETERS
Name | Description | Default Value | Possible Data Types | Optional | Dynamic |
---|---|---|---|---|---|
dir.uri | Used to specify a directory to be processed. |
STRING | No | No | |
file.uri | Used to specify a file to be processed. |
STRING | No | No | |
mode | This parameter is used to specify how files in given directory should.Possible values for this parameter are, |
line | STRING | Yes | No |
tailing | This can either have value true or false. By default it will be true. |
true | BOOL | Yes | No |
action.after.process | This parameter is used to specify the action which should be carried out |
delete | STRING | Yes | No |
action.after.failure | This parameter is used to specify the action which should be carried out if a failure occurred during the process. |
delete | STRING | Yes | No |
move.after.process | If action.after.process is MOVE, user must specify the location to move consumed files using 'move.after.process' parameter. |
STRING | No | No | |
move.after.failure | If action.after.failure is MOVE, user must specify the location to move consumed files using 'move.after.failure' parameter. |
STRING | No | No | |
begin.regex | This will define the regex to be matched at the beginning of the retrieved content. |
None | STRING | Yes | No |
end.regex | This will define the regex to be matched at the end of the retrieved content. |
None | STRING | Yes | No |
file.polling.interval | This parameter is used to specify the time period (in milliseconds) of a polling cycle for a file. |
1000 | STRING | Yes | No |
dir.polling.interval | This parameter is used to specify the time period (in milliseconds) of a polling cycle for a directory. |
1000 | STRING | Yes | No |
timeout | This parameter is used to specify the maximum time period (in milliseconds) for waiting until a file is processed. |
5000 | STRING | Yes | No |
file.read.wait.timeout | This parameter is used to specify the maximum time period (in milliseconds) till it waits before retrying to read the full file content. |
1000 | STRING | Yes | No |
Examples EXAMPLE 1
@source(type='file', mode='text.full', tailing='false' dir.uri='file://abc/xyz', action.after.process='delete', @map(type='json')) define stream FooStream (symbol string, price float, volume long);
Under above configuration, all the files in directory will be picked and read one by one.
In this case, it's assumed that all the files contains json valid json strings with keys 'symbol','price' and 'volume'.
Once a file is read, its content will be converted to an event using siddhi-map-json extension and then, that event will be received to the FooStream.
Finally, after reading is finished, the file will be deleted.
EXAMPLE 2
@source(type='file', mode='files.repo.line', tailing='true', dir.uri='file://abc/xyz', @map(type='json')) define stream FooStream (symbol string, price float, volume long);
Under above configuration, the first file in directory '/abc/xyz' will be picked and read line by line.
In this case, it is assumed that the file contains lines json strings.
For each line, line content will be converted to an event using siddhi-map-json extension and then, that event will be received to the FooStream.
Once file content is completely read, it will keep checking whether a new entry is added to the file or not.
If such entry is added, it will be immediately picked up and processed.