EventReaderService
EventReaderService/ERs is a subsystem designed to read events coming from external sources and convert them into internal ones. The converted events are then sent to other CGRateS subsystems, such as SessionS, where further processing logic is applied to them.
The translation between external and internal events is done based on field mapping, defined in JSON configuration.
Configuration
The EventReaderService is configured within the ers section of the JSON configuration.
Sample config
With explanations in the comments:
"ers": {
"enabled": true, // enable the service
"sessions_conns": ["*internal"], // connection towards SessionS
"readers": [ // list of active readers
{
"id": "file_reader2", // file_reader2 reader
"run_delay": "-1", // reading of events is triggered outside of ERs
"opts": {
"csvFieldSeparator":";" // field separator definition
},
"type": "*file_csv", // type of reader, *file_csv can read .csv files
"flags": [ // influence processing logic within CGRateS workflow
"*cdrs", // *cdrs will create CDRs
"*log" // *log will log the events to syslog
],
"source_path": "/tmp/ers2/in", // location of the files
"processed_path": "/tmp/ers2/out", // move the files here once processed
"fields":[ // mapping definition between line index in the file and CGRateS field
{
"tag": "OriginID", // OriginID together with OriginHost will
"path": "*cgreq.OriginID", // uniquely identify the session on CGRateS side
"type": "*variable",
"value": "~*req.0", // take the content from line index 0
"mandatory": true // in the request file
},
{
"tag": "RequestType", // RequestType instructs SessionS
"path": "*cgreq.RequestType",// about charging type to apply for the event
"type": "*variable",
"value": "~*req.1",
"mandatory": true
},
{
"tag": "Category", // Category serves for attaching Account
"path": "*cgreq.Category", // and RatingProfile to the request
"type": "*constant",
"value": "call",
"mandatory": true
},
{
"tag": "Account", // Account is required by charging
"path": "*cgreq.Account",
"type": "*variable",
"value": "~*req.3",
"mandatory": true
},
{
"tag": "Subject", // Subject is required by charging
"path": "*cgreq.Subject",
"type": "*variable",
"value": "~*req.3",
"mandatory": true
},
{
"tag": "Destination", // Destination is required by charging
"path": "*cgreq.Destination",
"type": "*variable",
"value": "~*req.4:s/0([1-9]\\d+)/+49${1}/",
"mandatory": true // Additional mediation is performed on number format
},
{
"tag": "AnswerTime", // AnswerTime is required by charging
"path": "*cgreq.AnswerTime",
"type": "*variable",
"value": "~*req.5",
"mandatory": true
},
{
"tag": "Usage", // Usage is required by charging
"path": "*cgreq.Usage",
"type": "*variable",
"value": "~*req.6",
"mandatory": true
},
{
"tag": "HDRExtra1", // HDRExtra1 is transparently stored into CDR
"path": "*cgreq.HDRExtra1", // as extra field not used by CGRateS
"type": "*composed",
"value": "~*req.6",
"mandatory": true
}
]
}
]
}
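To make the mapping in the sample more concrete, the following Python sketch applies a trimmed-down copy of the fields list to one semicolon-separated line. It is an illustration only and not ERs code: FIELD_MAP, normalize_destination, map_line and the sample line are hypothetical names and data, and the search/replace rule on Destination is approximated with re.sub on the first match, which may differ in detail from how CGRateS evaluates it.

```python
import csv
import io
import re

# Hypothetical, trimmed-down version of the "fields" list in the sample config:
# (CGRateS field name, field type, value). An integer stands for ~*req.<index>.
FIELD_MAP = [
    ("OriginID", "*variable", 0),
    ("RequestType", "*variable", 1),
    ("Category", "*constant", "call"),
    ("Account", "*variable", 3),
    ("Destination", "*variable", 4),
]


def normalize_destination(number):
    """Approximate the s/0([1-9]\\d+)/+49${1}/ rule from the sample config."""
    return re.sub(r"0([1-9]\d+)", r"+49\g<1>", number, count=1)


def map_line(line, separator=";"):
    """Turn one CSV line into a dict keyed by CGRateS field name."""
    row = next(csv.reader(io.StringIO(line), delimiter=separator))
    event = {}
    for name, kind, value in FIELD_MAP:
        # Constants are copied as-is, variables are looked up by column index.
        event[name] = value if kind == "*constant" else row[value]
    event["Destination"] = normalize_destination(event["Destination"])
    return event


# Hypothetical input line using the ";" separator set via csvFieldSeparator.
event = map_line("abc123;*prepaid;unused;1001;01721234567;2024-01-01T10:00:00;60s")
print(event)
```

The integer indexes play the role of ~*req.0, ~*req.1 and so on, which is why the order of columns in the file matters for this reader type.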
Config params
Most parameters are explained in the JSON configuration, so only those that need additional information or behave in a way specific to EventReaderService are described here.
- readers
List of reader profiles which ERs manages. Simultaneous readers of the same type are possible.
- id
Reader identifier, used mostly for debugging. The id should be unique for each reader since it can influence how configuration is updated from different .json configuration files.
- type
Reader type. The following types are implemented:
- *file_csv
Reader for comma separated files.
- *file_xml
Reader for .xml formatted files.
- *file_fwv
Reader for fixed width value formatted files.
- *file_json
Reader for .json formatted files.
- *kafka_json_map
Reader for JSON-encoded maps consumed from Kafka.
- *sql
Reader for generic content out of SQL databases. Supported databases are: MySQL, PostgreSQL and MSSQL.
- *amqp_json_map
Reader for AMQP v0.9.1 messaging.
- *amqpv1_json_map
Reader for AMQP v1.0 messaging.
- *s3_json_map
Reader for S3 events.
- *sqs_json_map
Reader for SQS events.
- *nats_json_map
Reader for NATS events.
- run_delay
Duration interval between consecutive reads from the source. If 0 or less, ERs relies on an external trigger (e.g. Linux inotify for files) to start the reading process.
- start_delay
A duration to delay the reader from starting to read events on engine start.
- concurrent_requests
Limits the number of concurrent reads from the source (e.g. the number of simultaneously opened files).
- source_path
Path towards the events source.
- processed_path
Optional path where the events source is moved after processing.
- tenant
Will auto-populate the Tenant within the API calls sent to CGRateS. It has the form of an RSRParser. If undefined, the default from the general section will be used.
- timezone
Defines the timezone for source content which does not carry that information. If undefined, the default from the general section will be used.
- filters
List of filters an event must pass for the reader to process it. For dynamic content (prefixed with ~), the following special variables are available:
- *vars
Request-related variables shared between processors, populated especially by core functions. The data put in there is not automatically transferred into requests sent to CGRateS, unless instructed inside templates.
- *tmp
Temporary container to be used when exchanging information between fields.
- *req
Request read from the source. In case of file content without field name, the index will be passed instead of field source path.
- *hdr
Header values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.
- *trl
Trailer values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.
- flags
Special tags enforcing the actions/verbs done on an event. There are two types of flags: main and auxiliary.
Any number of flags can be specified in the list. However, the flags take priority over one another, and only some combinations of main flags can be used simultaneously.
The main flags will select mostly the action taken on a request.
The auxiliary flags only make sense in combination with main ones.
Implemented main flags are (in order of priority, and mutually exclusive unless stated otherwise):
- *log
Logs the Event read. Can be used together with other main flags.
- *none
Disables transferring the Event from the reader to the CGRateS side.
- *dryrun
Besides not transferring the Event to the CGRateS side, it also logs the Event, which is useful for troubleshooting.
- *authorize
Sends the Event for authorization on CGRateS.
Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts, *routes, *routes_ignore_errors, *routes_event_cost, *routes_maxcost which are used to influence the auth behavior on CGRateS side. More info on that can be found on the SessionS component’s API behavior.
- *initiate
Initiates a session out of Event on CGRateS side.
Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts which are used to influence the behavior on CGRateS side.
- *update
Updates a session with the Event on CGRateS side.
Auxiliary flags available: *attributes, *accounts which are used to influence the behavior on CGRateS side.
- *terminate
Terminates a session using the Event on CGRateS side.
Auxiliary flags available: *thresholds, *stats, *resources, *accounts which are used to influence the behavior on CGRateS side.
- *message
Process the Event as individual message charging on CGRateS side.
Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts, *routes, *routes_ignore_errors, *routes_event_cost, *routes_maxcost which are used to influence the behavior on CGRateS side.
- *event
Process the Event as generic event on CGRateS side.
Auxiliary flags available: all flags supported by the “SessionSv1.ProcessEvent” generic API
- *cdrs
Build a CDR out of the Event on CGRateS side. Can be used simultaneously with other flags (except *dryrun)
- *export
Process the event read, and send the processed event to EEs. Can be used simultaneously with other flags.
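One possible reading of the priority and combination rules above can be sketched in Python as follows. This is an interpretation for illustration, not the CGRateS implementation: classify_flags is a hypothetical helper, and both picking the first exclusive main flag by priority and rejecting *cdrs together with *dryrun with an error are choices made by this sketch.

```python
# Main flags in the priority order listed above.
MAIN_FLAGS = ["*log", "*none", "*dryrun", "*authorize", "*initiate", "*update",
              "*terminate", "*message", "*event", "*cdrs", "*export"]

# Main flags documented as usable together with other main flags.
COMBINABLE = {"*log", "*cdrs", "*export"}


def classify_flags(flags):
    """Split flags into (exclusive main flag, combinable main flags, auxiliary flags)."""
    if "*cdrs" in flags and "*dryrun" in flags:
        # Sketch's choice: treat the documented exception as a hard error.
        raise ValueError("*cdrs cannot be combined with *dryrun")
    present = [f for f in MAIN_FLAGS if f in flags]
    exclusive = [f for f in present if f not in COMBINABLE]
    combinable = [f for f in present if f in COMBINABLE]
    auxiliary = [f for f in flags if f not in MAIN_FLAGS]
    # The first exclusive flag in priority order wins, if there is one.
    return (exclusive[0] if exclusive else None), combinable, auxiliary


print(classify_flags(["*cdrs", "*log"]))
print(classify_flags(["*accounts", "*terminate", "*initiate", "*log"]))
```

With the flags from the sample config (*cdrs and *log), no exclusive main flag is selected and both flags end up in the combinable group.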
- reconnects
The number of retries to reestablish the connection after a connection loss for AMQP. Use -1 to retry indefinitely.
- max_reconnect_interval
The duration to wait in between retries to reconnect on a connection loss for AMQP.
- ees_ids
The IDs of the EEs exporters to use when the *export flag is present in the reader. When an event is read and processed by the reader, the processed event is sent to those exporter IDs in EEs.
- ees_success_ids
When an ERs reader processes an event successfully, it sends the raw (unprocessed) event that it read to the EEs exporters whose IDs are listed in ees_success_ids.
- ees_failed_ids
When an ERs reader fails to process an event, it sends the raw (unprocessed) event that it read to the EEs exporters whose IDs are listed in ees_failed_ids.
- opts
Additional options, some specific to a reader type and some common to several.
Partial:
- partialPath
The path where the partial events will be sent.
- partialCacheAction
The action that will be executed for the partial events that are not matched with other events:
*none - Nothing happens.
*post_cdr - Try to merge partial events and post them back to ERs for processing.
*dump_to_file - Apply the cache_dump_fields to the partial events and write the record to file in CSV format.
*dump_to_json - Apply the cache_dump_fields to the partial events and write the record to file in JSON format.
- partialOrderField
The field by which the events are ordered when merged.
- partialcsvFieldSeparator
The separator used when dumping the event fields.
FileCSV:
- csvRowLength
Expected number of fields per record in the csv file: -1 disables the check, 0 inherits the length of the first record.
- csvFieldSeparator
Define what separator is used in the CSV fields that will be read.
- csvHeaderDefineChar
The starting character for the header definition used in CSV files.
- csvLazyQuotes
Set it to true if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field.
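The row length rule described for csvRowLength can be illustrated with Python's standard csv module. This is a minimal sketch under assumptions: read_rows and its parameters are hypothetical names, raising an error on a length mismatch is a choice of the sketch, and csvHeaderDefineChar and csvLazyQuotes are not modelled.

```python
import csv
import io


def read_rows(text, field_separator=",", row_length=0):
    """Parse CSV text and check record lengths as described for csvRowLength.

    row_length > 0  -> every record must have exactly that many fields
    row_length == 0 -> every record must match the length of the first record
    row_length < 0  -> no length check
    """
    rows = []
    expected = row_length
    for row in csv.reader(io.StringIO(text), delimiter=field_separator):
        if expected == 0:
            expected = len(row)  # inherit the length of the first record
        if expected > 0 and len(row) != expected:
            raise ValueError(f"expected {expected} fields, got {len(row)}: {row}")
        rows.append(row)
    return rows


# Hypothetical content using ";" as the field separator.
print(read_rows("a;b;c\n1;2;3\n", field_separator=";"))
print(read_rows("a;b\n1;2;3\n", field_separator=";", row_length=-1))
```

In the second call the records have different lengths, which is accepted only because the check is disabled with -1.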
FileXML reader:
- xmlRootPath
The prefix path applied to each xml element read.
AMQP and AMQPv1:
- amqpQueueID
ID for the primary queue where messages are consumed. (Used for AMQP 0.9.1 and 1.0)
- amqpUsername
Username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name.
- amqpPassword
Password for authentication, exclusive to AMQP 1.0.
- amqpConsumerTag
Unique tag for the consumer, useful for message tracking and consumer management. (Used for AMQP 0.9.1 and 1.0)
- amqpExchange
Name of the primary exchange where messages will be published. Exclusive to AMQP 0.9.1
- amqpExchangeType
Type of the primary exchange (direct, topic, fanout, headers). Exclusive to AMQP 0.9.1
- amqpRoutingKey
Key used for routing messages to the primary queue. Exclusive to AMQP 0.9.1
Kafka:
- kafkaTopic
The topic from where the events are read.
- kafkaGroupID
The group that reads the events.
- kafkaMaxWait
The maximum amount of time to wait for new data to come.
- kafkaTLS
If true it will try to authenticate the client.
- kafkaCAPath
Path to certificate authority file.
- kafkaSkipTLSVerify
If true it will skip certificate verification.
SQL:
- sqlDBName
The name of the database from where the events are read.
- sqlTableName
The name of the table from where the events are read.
- sqlBatchSize
Number of SQL rows that can be selected at a time. 0 or lower for unlimited.
- sqlDeleteIndexedFields
List of fields to DELETE from the table.
- pgSSLMode
The SSL mode for postgres db.
SQS:
- sqsQueueID
The queue ID for SQS readers from where the events are read.
- awsRegion
The region of the AWS SQS queue.
- awsKey
The key of the AWS SQS queue.
- awsSecret
The secret of the AWS SQS queue.
- awsToken
The token of the AWS SQS queue.
S3:
- s3BucketID
The bucket ID for S3 readers from where the events are read.
- awsRegion
The region of the AWS S3 bucket.
- awsKey
The key of the AWS S3 bucket.
- awsSecret
The secret of the AWS S3 bucket.
- awsToken
The token of the AWS S3 bucket.
NATS:
- natsJetStream
When true, the NATS reader uses JetStream.
- natsConsumerName
Name of the JetStream consumer. Used when natsJetStream is enabled.
- natsStreamName
JetStream stream name from which the consumer will read messages.
- natsSubject
Specifies the NATS subject to subscribe to for receiving messages.
- natsQueueID
Queue ID for the consumer to listen to.
- natsJWTFile
Path to the NATS JWT file. Can be a chained JWT or a user JWT file.
- natsSeedFile
Path to the NATS seed file used for signing the JWT. Only used if natsJWTFile is provided.
- natsCertificateAuthority
Path to the custom certificate authority file.
- natsClientCertificate
Path to the client certificate used for TLS.
- natsClientKey
Path to the client private key used for TLS.
- natsJetStreamMaxWait
Maximum time to wait for a JetStream response.
- fields
List of field templates applied to the event read. A field template can contain the following parameters:
- path
Defined within field, specifies the path where the value will be written. Possible values:
- *vars
Write the value in the special container, *vars, available for the duration of the request.
- *cgreq
Write the value in the request object which will be sent to CGRateS side.
- *hdr
Header values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.
- *trl
Trailer values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.
- type
Defined within field, specifies the logic type to be used when writing the value of the field. Possible values:
- *none
Pass
- *filler
Fills the values with an empty string
- *constant
Writes out a constant
- *variable
Writes out the variable value, overwriting the previously set one.
- *composed
Writes out the variable value, appending it to the previously set value.
- *usage_difference
Calculates the usage difference between two arguments passed in the value. Requires 2 arguments: $stopTime;$startTime
- *sum
Calculates the sum of all arguments passed within value. It supports summing up duration, time, float, int autodetecting them in this order.
- *difference
Calculates the difference between all arguments passed within value. Possible value types are (in this order): duration, time, float, int.
- *value_exponent
Calculates the exponent of a value. It requires two values: $val;$exp
- *template
Specifies a template of fields to be injected here. Value should be one of the template ids defined.
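The difference between some of these types can be sketched in Python. The snippet is a simplified illustration, not the CGRateS logic: apply_field is a hypothetical helper, only a subset of the types is covered, and timestamps are assumed to be ISO 8601 strings without a timezone suffix.

```python
from datetime import datetime


def apply_field(event, name, kind, value=""):
    """Write value into event[name] according to a simplified field type."""
    if kind == "*none":
        return event                                  # pass: event left untouched
    if kind == "*filler":
        event[name] = ""                              # fill with an empty string
    elif kind in ("*constant", "*variable"):
        event[name] = value                           # overwrite the previous value
    elif kind == "*composed":
        event[name] = event.get(name, "") + value     # append to the previous value
    elif kind == "*usage_difference":
        # Two arguments separated by ";": stopTime first, then startTime.
        stop, start = (datetime.fromisoformat(v) for v in value.split(";"))
        event[name] = stop - start                    # a datetime.timedelta
    else:
        raise ValueError(f"type not covered by this sketch: {kind}")
    return event


event = {}
apply_field(event, "Extra", "*variable", "a")
apply_field(event, "Extra", "*composed", "b")   # Extra is now "ab"
apply_field(event, "Usage", "*usage_difference",
            "2024-01-01T10:01:30;2024-01-01T10:00:00")
print(event)
```

Applying *variable to Extra a second time would replace "ab" entirely, whereas *composed keeps extending it.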
- value
The captured value. Possible prefixes for dynamic values are:
- *req
Take data from current request coming from the reader.
- *vars
Take data from internal container labeled *vars. This is valid for the duration of the request.
- *cgreq
Take data from the request being sent to SessionS. This is valid for one active request.
- *cgrep
Take data from the reply coming from SessionS. This is valid for one active reply.
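How a dynamic value selects data from one of these containers can be pictured with a small Python sketch. It is an assumption-laden illustration: resolve and the containers dictionary are hypothetical, and any search/replace suffix on the value (such as the one used for Destination in the sample config) is deliberately ignored.

```python
def resolve(value, containers):
    """Resolve a value such as "~*req.3" against named containers."""
    if not value.startswith("~"):
        return value                      # static value, used as-is
    prefix, _, path = value[1:].partition(".")
    data = containers[prefix]
    for key in (path.split(".") if path else []):
        # Sources without field names (e.g. CSV rows) are addressed by index.
        data = data[int(key)] if isinstance(data, (list, tuple)) else data[key]
    return data


# Hypothetical containers: a CSV row for *req and a map for *vars.
containers = {
    "*req": ["abc123", "*prepaid", "unused", "1001"],
    "*vars": {"Source": "file_reader2"},
}
print(resolve("~*req.3", containers))
print(resolve("~*vars.Source", containers))
print(resolve("call", containers))
```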
- mandatory
Makes sure that the field cannot have empty value (errors otherwise).
- tag
Used for debug purposes in logs.
- width
Used to control the formatting, enforcing the final value to a specific number of characters.
- strip
Used when the value is longer than width allows, specifying the strip strategy. Possible values are:
- *right
Strip the suffix.
- *xright
Strip the suffix, appending one x character to mark the stripping.
- *left
Strip the prefix.
- *xleft
Strip the prefix, prepending one x character to mark the stripping.
- padding
Used to control the formatting. Applied when the data is smaller than the width. Possible values are:
- *right
Suffix with spaces.
- *left
Prefix with spaces.
- *zeroleft
Prefix with 0 chars.
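The interplay of width, strip and padding can be sketched in Python as below. The helper format_width is hypothetical, and two points are assumptions of the sketch rather than documented behavior: the x marker is counted inside the width, and a value that does not fit without a matching strategy raises an error.

```python
def format_width(value, width, strip=None, padding=None):
    """Force value to exactly `width` characters using strip/padding rules."""
    if len(value) > width:
        if strip == "*right":
            return value[:width]                          # drop the suffix
        if strip == "*xright":
            return value[:width - 1] + "x"                # drop suffix, mark with x
        if strip == "*left":
            return value[len(value) - width:]             # drop the prefix
        if strip == "*xleft":
            return "x" + value[len(value) - width + 1:]   # drop prefix, mark with x
        raise ValueError("value longer than width and no strip strategy given")
    if len(value) < width:
        if padding == "*right":
            return value.ljust(width)                     # suffix with spaces
        if padding == "*left":
            return value.rjust(width)                     # prefix with spaces
        if padding == "*zeroleft":
            return value.rjust(width, "0")                # prefix with 0 chars
        raise ValueError("value shorter than width and no padding strategy given")
    return value


print(format_width("1234567890", 5, strip="*xright"))
print(format_width("42", 5, padding="*zeroleft"))
```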
- partial_commit_fields
The fields are written in the same way as the import fields template. They are used for events which were read but were not fully processed. If incoming events match the filters set in these partial_commit_fields, the fields are used to complete and process the corresponding partial event.
- cache_dump_fields
The fields are written in the same way as import fields template. The fields are used as a template to write the fields of the partial events into dump files, when the TTL expires for that partial event, or when that cache element is evicted.
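The idea of merging partial events in the order given by partialOrderField can be sketched in Python. This is a rough illustration under stated assumptions, not the actual ERs merge logic: merge_partials is a hypothetical helper, the events are plain dictionaries with made-up values, and letting non-empty values from later events override earlier ones is a choice made by the sketch.

```python
def merge_partials(events, order_field):
    """Merge partial events into one, ordered by order_field."""
    merged = {}
    for event in sorted(events, key=lambda e: e[order_field]):
        # Later events override earlier ones, but empty values never do.
        merged.update({k: v for k, v in event.items() if v != ""})
    return merged


# Hypothetical partial events belonging to the same session.
partials = [
    {"OriginID": "abc123", "AnswerTime": "2024-01-01T10:00:05", "Usage": "60s"},
    {"OriginID": "abc123", "AnswerTime": "2024-01-01T10:00:00",
     "Account": "1001", "Usage": ""},
]
print(merge_partials(partials, "AnswerTime"))
```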