EventReaderService

EventReaderService/ERs is a subsystem designed to read events coming from external sources and convert them into internal ones. The converted events are then sent to other CGRateS subsystems, such as SessionS, where further processing logic is applied to them.

The translation between external and internal events is done based on field mapping, defined in JSON configuration.

Configuration

The EventReaderService is configured within the ers section of the JSON configuration.

Sample config

With explanations in the comments:

"ers": {
       "enabled": true,                                        // enable the service
       "sessions_conns": ["*internal"],        // connection towards SessionS
       "readers": [                                            // list of active readers
               {
                       "id": "file_reader2",           // file_reader2 reader
                       "run_delay":  "-1",                     // reading of events is triggered outside of ERs
                       "opts": {
                               "csvFieldSeparator":";" // field separator definition
                       },
                       "type": "*file_csv",            // type of reader, *file_csv can read .csv files
                       "flags": [                                      // influence processing logic within CGRateS workflow
                               "*cdrs",                                //   *cdrs will create CDRs
                               "*log"                                  //   *log will log the events to syslog
                       ],
                       "source_path": "/tmp/ers2/in",          // location of the files
                       "processed_path": "/tmp/ers2/out",      // move the files here once processed
                       "fields":[                                      // mapping definition between line index in the file and CGRateS field
                               {
                                       "tag": "OriginID",                      // OriginID together with OriginHost will
                                       "path": "*cgreq.OriginID",      //   uniquely identify the session on CGRateS side
                                       "type": "*variable",
                                       "value": "~*req.0",             // take the content from line index 0
                                       "mandatory": true                       //   in the request file
                               },
                               {
                                       "tag": "RequestType",           // RequestType instructs SessionS
                                       "path": "*cgreq.RequestType",//   about charging type to apply for the event
                                       "type": "*variable",
                                       "value": "~*req.1",
                                       "mandatory": true
                               },
                               {
                                       "tag": "Category",                      // Category serves for attaching Account
                                       "path": "*cgreq.Category",      //   and RatingProfile to the request
                                       "type": "*constant",
                                       "value": "call",
                                       "mandatory": true
                               },
                               {
                                       "tag": "Account",                       // Account is required by charging
                                       "path": "*cgreq.Account",
                                       "type": "*variable",
                                       "value": "~*req.3",
                                       "mandatory": true
                               },
                               {
                                       "tag": "Subject",                       // Subject is required by charging
                                       "path": "*cgreq.Subject",
                                       "type": "*variable",
                                       "value": "~*req.3",
                                       "mandatory": true
                               },
                               {
                                       "tag": "Destination",           // Destination is required by charging
                                       "path": "*cgreq.Destination",
                                       "type": "*variable",
                                       "value": "~*req.4:s/0([1-9]\\d+)/+49${1}/",
                                       "mandatory": true                       // Additional mediation is performed on number format
                               },
                               {
                                       "tag": "AnswerTime",            // AnswerTime is required by charging
                                       "path": "*cgreq.AnswerTime",
                                       "type": "*variable",
                                       "value": "~*req.5",
                                       "mandatory": true
                               },
                               {
                                       "tag": "Usage",                         // Usage is required by charging
                                       "path": "*cgreq.Usage",
                                       "type": "*variable",
                                       "value": "~*req.6",
                                       "mandatory": true
                               },
                               {
                                       "tag": "HDRExtra1",                     // HDRExtra1 is transparently stored into CDR
                                       "path": "*cgreq.HDRExtra1",     //   as extra field not used by CGRateS
                                       "type": "*composed",
                                       "value": "~*req.6",
                                       "mandatory": true
                               }
                       ]
               }
       ]
}
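
To make the mapping concrete, the following Python sketch applies a subset of the sample fields to one semicolon-separated line. It only illustrates the semantics described in the comments above and is not how ERs is implemented: the input line, the FIELDS table and the map_line helper are hypothetical. The Go-style replacement +49${1} from the Destination rule is written as +49\g<1>, because Python's re module uses a different group syntax.

```python
import re

# Hypothetical subset of the sample "fields" list: (path, type, value).
FIELDS = [
    ("*cgreq.OriginID", "*variable", "~*req.0"),
    ("*cgreq.RequestType", "*variable", "~*req.1"),
    ("*cgreq.Category", "*constant", "call"),
    ("*cgreq.Account", "*variable", "~*req.3"),
    ("*cgreq.Destination", "*variable", "~*req.4"),
]


def normalize_destination(number):
    # Python rendering of the sample rule s/0([1-9]\d+)/+49${1}/
    return re.sub(r"0([1-9]\d+)", r"+49\g<1>", number, count=1)


def map_line(line, separator=";"):
    """Build an event dict from one CSV line using the FIELDS table."""
    columns = line.split(separator)
    event = {}
    for path, field_type, value in FIELDS:
        name = path.split(".", 1)[1]  # drop the *cgreq prefix
        if field_type == "*constant":
            event[name] = value
        else:
            index = int(value.rsplit(".", 1)[1])  # "~*req.4" -> 4
            event[name] = columns[index]
    event["Destination"] = normalize_destination(event["Destination"])
    return event


# Hypothetical input line; the column order follows the sample mapping.
event = map_line("abc123;*prepaid;unused;1001;0172123456;2024-01-01T10:00:00Z;60s")
print(event["Destination"])  # +49172123456
```

The separator argument plays the role of csvFieldSeparator from the sample opts.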

Config params

Most of the parameters are explained in the JSON configuration, so only the ones that need additional information or have an implementation particular to EventReaderService are mentioned here.

readers

List of reader profiles which ERs manages. Simultaneous readers of the same type are possible.

id

Reader identifier, used mostly for debugging. The id should be unique per reader, since it can influence how the configuration is updated from different .json configuration files.

type

Reader type. Following types are implemented:

*file_csv

Reader for comma-separated files.

*file_xml

Reader for .xml formatted files.

*file_fwv

Reader for fixed width value formatted files.

*file_json

Reader for .json formatted files.

*kafka_json_map

Reader for hashmaps consumed from Kafka.

*sql

Reader for generic content out of SQL databases. Supported databases are: MySQL, PostgreSQL and MSSQL.

*amqp_json_map

Reader for AMQP v0.9.1 messaging.

*amqpv1_json_map

Reader for AMQP v1.0 messaging.

*s3_json_map

Reader for S3 events.

*sqs_json_map

Reader for SQS events.

*nats_json_map

Reader for NATS events.

run_delay

Duration interval between consecutive reads from source. If 0 or less, ERs relies on an external source (e.g. Linux inotify for files) to start the reading process.

start_delay

A duration to delay the reader from starting to read events on engine start.

concurrent_requests

Limits the number of concurrent reads from source (e.g. the number of simultaneously opened files).

source_path

Path towards the events source.

processed_path

Optional path for moving the events source to after processing.

tenant

Auto-populates the Tenant within the API calls sent to CGRateS. It has the form of an RSRParser. If undefined, the default one from the general section is used.

timezone

Defines the timezone for source content which does not carry that information. If undefined, the default one from the general section is used.

filters

List of filters the event must pass for the reader to process it. For dynamic content (prefixed with ~), the following special variables are available:

*vars

Request-related variables shared between processors, populated especially by core functions. The data put in there is not automatically transferred into requests sent to CGRateS, unless instructed inside templates.

*tmp

Temporary container to be used when exchanging information between fields.

*req

Request read from the source. In case of file content without field name, the index will be passed instead of field source path.

*hdr

Header values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.

*trl

Trailer values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.

flags

Special tags enforcing the actions/verbs done on an event. There are two types of flags: main and auxiliary.

Any number or combination of flags can be specified in the list. However, flags have priority over one another, and only some simultaneous combinations of main flags are possible.

The main flags mostly select the action taken on a request.

The auxiliary flags only make sense in combination with main ones.

Implemented main flags are (in order of priority, and not working simultaneously unless specified):

*log

Logs the Event read. Can be used together with other main flags.

*none

Disables transferring the Event from the Reader to the CGRateS side.

*dryrun

Besides not transferring the Event to the CGRateS side, it also logs the Event, which is useful for troubleshooting.

*authorize

Sends the Event for authorization on CGRateS.

Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts, *routes, *routes_ignore_errors, *routes_event_cost, *routes_maxcost which are used to influence the auth behavior on CGRateS side. More info on that can be found on the SessionS component’s API behavior.

*initiate

Initiates a session out of Event on CGRateS side.

Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts which are used to influence the behavior on CGRateS side.

*update

Updates a session with the Event on CGRateS side.

Auxiliary flags available: *attributes, *accounts which are used to influence the behavior on CGRateS side.

*terminate

Terminates a session using the Event on CGRateS side.

Auxiliary flags available: *thresholds, *stats, *resources, *accounts which are used to influence the behavior on CGRateS side.

*message

Process the Event as individual message charging on CGRateS side.

Auxiliary flags available: *attributes, *thresholds, *stats, *resources, *accounts, *routes, *routes_ignore_errors, *routes_event_cost, *routes_maxcost which are used to influence the behavior on CGRateS side.

*event

Process the Event as generic event on CGRateS side.

Auxiliary flags available: all flags supported by the “SessionSv1.ProcessEvent” generic API

*cdrs

Build a CDR out of the Event on CGRateS side. Can be used simultaneously with other flags (except *dryrun)

*export

Process the event read, and send the processed event to EEs. Can be used simultaneously with other flags.

reconnects

The number of retries to reestablish the connection in case of connection loss for AMQP. -1 retries indefinitely.

max_reconnect_interval

The duration to wait in between retries to reconnect on a connection loss for AMQP.

ees_ids

The IDs of exporters in EEs which you want to make use of, when *export flag is present in the reader. When an event is read and processed from the reader in use, the processed event will be sent to those specific IDs in EEs.

ees_success_ids

When an ERs reader processes an event successfully, it sends the raw (unprocessed) event that it read to the EEs exporters whose IDs are listed in ees_success_ids.

ees_failed_ids

When an ERs reader fails to process an event, it sends the raw (unprocessed) event that it read to the EEs exporters whose IDs are listed in ees_failed_ids.

opts

Additional options specific and non specific to reader types.

Partial:

partialPath

The path where the partial events will be sent.

partialCacheAction

The action that will be executed for the partial events that are not matched with other events:

*none - Nothing happens.

*post_cdr - Try to merge partial events and post them back to ERs for processing.

*dump_to_file - Apply the cache_dump_fields to the partial events and write the record to file in CSV format.

*dump_to_json - Apply the cache_dump_fields to the partial events and write the record to file in JSON format.

partialOrderField

The field by which the events are ordered when merged.

partialcsvFieldSeparator

The separator used when dumping the event fields.

FileCSV:

csvRowLength

Number of fields in the csv file: -1 to disable checking, 0 to inherit the length of the first record.

csvFieldSeparator

Define what separator is used in the CSV fields that will be read.

csvHeaderDefineChar

The starting character for the header definition used in CSV files.

csvLazyQuotes

Make it true if a quote may appear in an unquoted field and a non-doubled quote may appear in a quoted field.
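
The row-length rule can be sketched in Python as follows. This is an illustration of the behavior described for csvRowLength and csvFieldSeparator, not the ERs implementation; the read_rows helper and its argument names are hypothetical.

```python
import csv
import io


def read_rows(text, separator=",", row_length=0):
    """Yield CSV rows, enforcing a field count as csvRowLength describes.

    row_length -1 disables the check, 0 inherits the length of the first
    record, and any positive number is the required field count.
    """
    expected = row_length
    for row in csv.reader(io.StringIO(text), delimiter=separator):
        if expected == 0:
            expected = len(row)  # inherit from the first record
        if expected > 0 and len(row) != expected:
            raise ValueError(f"expected {expected} fields, got {len(row)}")
        yield row


rows = list(read_rows("a;b;c\nd;e;f\n", separator=";"))
print(rows)  # [['a', 'b', 'c'], ['d', 'e', 'f']]
```

With row_length=-1 a file with rows of different lengths is read without complaint, while the default of 0 in this sketch rejects any row whose length differs from the first one.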

FileXML reader:

xmlRootPath

The prefix path applied to each xml element read.

AMQP and AMQPv1:

amqpQueueID

ID for the primary queue where messages are consumed. (Used for AMQP 0.9.1 and 1.0)

amqpUsername

Username for SASL PLAIN auth, exclusive to AMQP 1.0, often representing the policy name.

amqpPassword

Password for authentication, exclusive to AMQP 1.0.

amqpConsumerTag

Unique tag for the consumer, useful for message tracking and consumer management. (Used for AMQP 0.9.1 and 1.0)

amqpExchange

Name of the primary exchange where messages will be published. Exclusive to AMQP 0.9.1

amqpExchangeType

Type of the primary exchange (direct, topic, fanout, headers). Exclusive to AMQP 0.9.1

amqpRoutingKey

Key used for routing messages to the primary queue. Exclusive to AMQP 0.9.1

Kafka:

kafkaTopic

The topic from which the events are read.

kafkaGroupID

The group that reads the events.

kafkaMaxWait

The maximum amount of time to wait for new data to come.

kafkaTLS

If true it will try to authenticate the client.

kafkaCAPath

Path to certificate authority file.

kafkaSkipTLSVerify

If true it will skip certificate verification.

SQL:

sqlDBName

The name of the database from where the events are read.

sqlTableName

The name of the table from where the events are read.

sqlBatchSize

Number of SQL rows that can be selected at a time. 0 or lower for unlimited.

sqlDeleteIndexedFields

List of fields to DELETE from the table.

pgSSLMode

The SSL mode for postgres db.

SQS:

sqsQueueID

The queue ID for SQS readers from where the events are read.

awsRegion

The region of the AWS SQS queue.

awsKey

The AWS key used to access the SQS queue.

awsSecret

The AWS secret used to access the SQS queue.

awsToken

The AWS token used to access the SQS queue.

S3:

s3BucketID

The bucket ID for S3 readers from where the events are read.

awsRegion

The region of the AWS S3 bucket.

awsKey

The key of the AWS S3 bucket.

awsSecret

The secret of the AWS S3 bucket.

awsToken

The token of the AWS S3 bucket.

NATS:

natsJetStream

When true, the NATS reader uses JetStream.

natsConsumerName

Name of the JetStream consumer. Used when natsJetStream is enabled.

natsStreamName

JetStream stream name from which the consumer will read messages.

natsSubject

Specifies the NATS subject to subscribe to for receiving messages.

natsQueueID

Queue ID for the consumer to listen to.

natsJWTFile

Path to the NATS JWT file. Can be a chained JWT or a user JWT file.

natsSeedFile

Path to the NATS seed file used for signing the JWT. Only used if natsJWTFile is provided.

natsCertificateAuthority

Path to the custom certificate authority file.

natsClientCertificate

Path to the client certificate used for TLS.

natsClientKey

Path to the client private key used for TLS.

natsJetStreamMaxWait

Maximum time to wait for a JetStream response.

fields

List of fields for the read event. A field template can contain the following parameters.

path

Defined within field, specifies the path where the value will be written. Possible values:

*vars

Write the value in the special container, *vars, available for the duration of the request.

*cgreq

Write the value in the request object which will be sent to CGRateS side.

*hdr

Header values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.

*trl

Trailer values (available only in case of *file_fwv). In case of file content without field name, the index will be passed instead of field source path.

type

Defined within field, specifies the logic type to be used when writing the value of the field. Possible values:

*none

Pass; the field is skipped.

*filler

Fills the values with an empty string

*constant

Writes out a constant

*variable

Writes out the variable value, overwriting previous one set

*composed

Writes out the variable value, appending it to the previously set value.

*usage_difference

Calculates the usage difference between two arguments passed in the value. Requires 2 arguments: $stopTime;$startTime

*sum

Calculates the sum of all arguments passed within value. It supports summing up duration, time, float, int autodetecting them in this order.

*difference

Calculates the difference between all arguments passed within value. Possible value types are (in this order): duration, time, float, int.

*value_exponent

Calculates the exponent of a value. It requires two values: $val;$exp

*template

Specifies a template of fields to be injected here. Value should be one of the template ids defined.
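
The difference between the simpler types can be sketched in Python as below. This is a rough illustration of the descriptions above, not the CGRateS code: the apply_field helper is hypothetical, values are assumed to be already resolved into plain strings, and *usage_difference is assumed to receive ISO 8601 timestamps in the order stop time, start time.

```python
from datetime import datetime


def apply_field(event, name, field_type, values):
    """Write values into event[name] following the described field types."""
    if field_type == "*none":
        return  # nothing is written
    if field_type == "*filler":
        event[name] = ""
    elif field_type in ("*constant", "*variable"):
        event[name] = "".join(values)  # overwrite any previous value
    elif field_type == "*composed":
        event[name] = event.get(name, "") + "".join(values)  # append
    elif field_type == "*usage_difference":
        stop, start = (datetime.fromisoformat(v) for v in values)
        event[name] = stop - start  # a datetime.timedelta
    else:
        raise ValueError(f"unsupported type in this sketch: {field_type}")


event = {}
apply_field(event, "Extra", "*variable", ["first"])
apply_field(event, "Extra", "*composed", ["-second"])  # appended
apply_field(event, "Other", "*variable", ["a"])
apply_field(event, "Other", "*variable", ["b"])        # overwritten
apply_field(event, "Usage", "*usage_difference",
            ["2024-01-01T10:01:30", "2024-01-01T10:00:00"])

print(event["Extra"])  # first-second
print(event["Other"])  # b
print(event["Usage"])  # 0:01:30
```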

value

The captured value. Possible prefixes for dynamic values are:

*req

Take data from current request coming from the reader.

*vars

Take data from internal container labeled *vars. This is valid for the duration of the request.

*cgreq

Take data from the request being sent to SessionS. This is valid for one active request.

*cgrep

Take data from the reply coming from SessionS. This is valid for one active reply.

mandatory

Makes sure that the field cannot have empty value (errors otherwise).

tag

Used for debug purposes in logs.

width

Used to control the formatting, enforcing the final value to a specific number of characters.

strip

Used when the value is longer than the width allows, specifying the strip strategy. Possible values are:

*right

Strip the suffix.

*xright

Strip the suffix, appending one x character to mark the stripping.

*left

Strip the prefix.

*xleft

Strip the prefix, prepending one x character to mark the stripping.

padding

Used to control the formatting. Applied when the data is smaller than the width. Possible values are:

*right

Suffix with spaces.

*left

Prefix with spaces.

*zeroleft

Prefix with 0 chars.
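
How width, strip and padding interact can be sketched in Python as follows. The format_width helper is hypothetical and only mirrors the descriptions above. It assumes that width is at least 1 and that the x marker added by *xright and *xleft counts towards the width, so the result is always exactly width characters long.

```python
def format_width(value, width, strip, padding):
    """Force value to exactly `width` characters (width >= 1 assumed)."""
    if len(value) > width:
        if strip == "*right":
            return value[:width]                # drop the suffix
        if strip == "*xright":
            return value[:width - 1] + "x"      # drop the suffix, mark with x
        if strip == "*left":
            return value[len(value) - width:]   # drop the prefix
        if strip == "*xleft":
            return "x" + value[len(value) - width + 1:]  # drop prefix, mark
        raise ValueError(f"unknown strip strategy: {strip}")
    if padding == "*right":
        return value.ljust(width)               # suffix with spaces
    if padding == "*left":
        return value.rjust(width)               # prefix with spaces
    if padding == "*zeroleft":
        return value.rjust(width, "0")          # prefix with 0 chars
    raise ValueError(f"unknown padding strategy: {padding}")


print(format_width("1234567", 5, "*xright", "*right"))  # 1234x
print(format_width("1234567", 5, "*left", "*right"))    # 34567
print(format_width("42", 5, "*right", "*zeroleft"))     # 00042
```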

partial_commit_fields

The fields are written in the same way as the import fields template. They are used for events which were read but weren't fully processed. If incoming events match the filters set in these partial_commit_fields, they are used to fully create and process that partial event.

cache_dump_fields

The fields are written in the same way as import fields template. The fields are used as a template to write the fields of the partial events into dump files, when the TTL expires for that partial event, or when that cache element is evicted.