The Data Aggregator service

sageaxcess-aggregator

The on-premise Data Aggregator is a service that resides in the client network. It accepts all network events from
collectors over ZMQ, packs them into Parquet files, and sends the files to our network,
namely to a receiver instance.

Since it's placed in the client network, aggregator relies on command-line args instead of Consul values.

+------------------------------------------------------+        +-------------------+
|                     +-----+                          |        |                   |
| Client network      | DNS +-------+---------------+  |        |  ChangeDynamix    |
|                     +-----+       |               |  |        |  network          |
|  +--------------<-------------+   | Aggregator    |  |        |                   |
|  |              |     HTTP    |   |               |  |        |  +-------------+  |
|  |Collector A   |  +----------+----->-----------+ |  | HTTP   |  |             |  |
|  |              |  |              | | Commands  <----------------+  Receiver   |  |
|  +--------------+---------+       | +-----------+ |  |        |  |             |  |
|                    |      |       |               |  |        |  |             |  |
|  +--------------<--+      |       | +-----------+ |  | HTTP   |  |             |  |
|  |              |         |ZMQ    | | Parquet   +---------------->             |  |
|  |Collector B   +------------------>+-----------+ |  |        |  |             |  |
|  |              |                 |               |  |        |  |             |  |
|  +--------------+                 +---------------+  |        |  +-------------+  |
|                                                      |        |                   |
+------------------------------------------------------+        +-------------------+

communication with collectors

ZMQ - network events

Aggregator accepts a pre-generated --secretKey and --publicKey for ZMQ Curve encryption, and every collector
is expected to know the public key. On start, Aggregator binds an encrypted ZMQ PULL socket to tcp://host:port using
the pre-generated keypair,
where host and port are configured by the --host and --port keys respectively.
Collectors connect their encrypted PUSH sockets to this endpoint, using the same public key the aggregator has,
and send their data, i.e. FlatBuffers-packed network events.
See the collector ZMQ writer plugin.
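
The sample keys in the docker run example of this README are 80 hex characters long and decode to 40 printable characters, which matches the Z85 text form ZMQ uses for Curve keys. Assuming that is what "hex-encoded" means here (the README does not state it explicitly), a key could be sanity-checked before launch with a sketch like this one. The function names are hypothetical and not part of the aggregator.

```python
# Z85 alphabet as defined by the ZeroMQ Z85 specification.
Z85_ALPHABET = (
    "0123456789abcdefghijklmnopqrstuvwxyz"
    "ABCDEFGHIJKLMNOPQRSTUVWXYZ.-:+=^!/*?&<>()[]{}@%$#"
)
Z85_INDEX = {ch: i for i, ch in enumerate(Z85_ALPHABET)}


def z85_decode(text: str) -> bytes:
    """Decode a Z85 string (length divisible by 5) into raw bytes."""
    if len(text) % 5 != 0:
        raise ValueError("Z85 input length must be a multiple of 5")
    out = bytearray()
    for start in range(0, len(text), 5):
        value = 0
        for ch in text[start:start + 5]:
            if ch not in Z85_INDEX:
                raise ValueError(f"not a Z85 character: {ch!r}")
            value = value * 85 + Z85_INDEX[ch]
        if value > 0xFFFFFFFF:
            raise ValueError("Z85 group out of range")
        out += value.to_bytes(4, "big")  # each 5-char group is 4 bytes
    return bytes(out)


def curve_key_from_hex(hex_key: str) -> bytes:
    """Assumption: the hex string spells out the 40-character Z85 key text."""
    z85_text = bytes.fromhex(hex_key).decode("ascii")
    if len(z85_text) != 40:
        raise ValueError("expected a 40-character Z85 key")
    return z85_decode(z85_text)  # 32 raw key bytes
```

A key that fails this check would most likely also be rejected when the sockets are configured, so checking early gives a clearer error.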

HTTP - command distribution

Periodically, each collector's remote control plugin
requests http://aggregator:8113/agent/ping?id=collector_id to get configuration updates. If there is an update
that has not yet been delivered to that collector, the collector receives it as JSON.
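
The polling side can be sketched as below. This is a minimal illustration and not the actual remote control plugin: the function names are hypothetical, and treating an empty response body as "no update" is an assumption, since the README does not say how the aggregator answers when nothing new is pending.

```python
import json
import urllib.parse
import urllib.request


def build_ping_url(aggregator_host: str, collector_id: str, port: int = 8113) -> str:
    """Build the ping URL for a given collector ID."""
    query = urllib.parse.urlencode({"id": collector_id})
    return f"http://{aggregator_host}:{port}/agent/ping?{query}"


def poll_once(aggregator_host: str, collector_id: str, timeout: float = 5.0):
    """Return the parsed JSON update, or None when the body is empty.

    Assumption: an empty body means there is nothing new to deliver.
    """
    url = build_ping_url(aggregator_host, collector_id)
    with urllib.request.urlopen(url, timeout=timeout) as response:
        body = response.read()
    return json.loads(body) if body.strip() else None
```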

communication with receiver

HTTP communication with receiver

Once per period configured by --batchTime, the aggregator sends a POST request to its receiver instance,
at the URL configured by --receiver, with the following fields:

  • api-key - the aggregator's credential, configured by --apiKey
  • aggregator-id - configured by --uniqueId
  • file - a binary field, contents of a Parquet file representing accumulated messages of a certain type
  • type - the type of the messages in the file transmitted, one of GENERAL, FTP, HTTP, LDAP, SMB, SMTP, TDS, DCE_RPC, TELNET, SYSTEM_STATS
  • time - file generation time, mostly for diagnostic purposes, to be later included in the file name when it's placed on S3 by the receiver
  • description - an optional field, sent only on the first request, meant to describe this aggregator instance beyond its mere ID
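
The request built from the fields above might look like the following sketch. It assumes the fields travel as multipart/form-data, which the README does not state; the helper names and the batch.parquet file name are hypothetical.

```python
import urllib.request
import uuid


def encode_multipart(fields, file_bytes, filename):
    """Return (content_type, body) for a multipart/form-data request."""
    boundary = uuid.uuid4().hex
    chunks = []
    for name, value in fields.items():
        header = f'--{boundary}\r\nContent-Disposition: form-data; name="{name}"\r\n\r\n'
        chunks.append(header.encode("utf-8") + str(value).encode("utf-8") + b"\r\n")
    file_header = (
        f"--{boundary}\r\n"
        f'Content-Disposition: form-data; name="file"; filename="{filename}"\r\n'
        "Content-Type: application/octet-stream\r\n\r\n"
    )
    chunks.append(file_header.encode("utf-8") + file_bytes + b"\r\n")
    chunks.append(f"--{boundary}--\r\n".encode("utf-8"))
    return f"multipart/form-data; boundary={boundary}", b"".join(chunks)


def build_upload_request(receiver_url, fields, file_bytes, filename="batch.parquet"):
    """Build (but do not send) the POST request for one Parquet batch."""
    content_type, body = encode_multipart(fields, file_bytes, filename)
    return urllib.request.Request(
        receiver_url, data=body, headers={"Content-Type": content_type}, method="POST"
    )
```

Passing the returned request to urllib.request.urlopen would perform the upload; the fields dictionary would carry api-key, aggregator-id, type, time and, on the first request, description.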

commands polling from receiver

Once per --commandsPollInterval, the aggregator polls the URL configured by --commands. If the response contents have changed,
it clears the distribution history and starts distributing the new command set to collectors.
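
The bookkeeping this implies can be sketched as follows. It is an illustration under assumptions, not the service's actual code: the class name is hypothetical, and comparing responses by SHA-256 digest is just one way to detect changed contents.

```python
import hashlib


class CommandDistributor:
    """Tracks the current command set and which collectors already got it."""

    def __init__(self):
        self._digest = None       # digest of the last polled response
        self._commands = None     # current command set, as raw bytes
        self._delivered = set()   # collector IDs that received it

    def on_poll_result(self, contents: bytes) -> bool:
        """Record a poll response; return True if it was a new command set."""
        digest = hashlib.sha256(contents).hexdigest()
        if digest == self._digest:
            return False
        self._digest = digest
        self._commands = contents
        self._delivered.clear()   # new contents: forget distribution history
        return True

    def commands_for(self, collector_id: str):
        """Return commands once per collector per command set, else None."""
        if self._commands is None or collector_id in self._delivered:
            return None
        self._delivered.add(collector_id)
        return self._commands
```

Each collector ping would call commands_for with its ID, so a collector sees a given command set at most once until the polled contents change.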

DNS resolution

After parsing incoming messages from their binary format, and before packing them into Parquet files,
aggregator uses an internal DNS server, configured by --dns, to resolve IPs from fields origIp and respIp
of GENERAL-typed messages and uses the values to fill in the fields origH and respH respectively.
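
Reverse lookups of this kind are usually cached, and the help output lists a --ttl option for cache entries (default 3600 seconds). A minimal sketch of such a cache is shown below. The class and function names are hypothetical, and the default lookup uses the system resolver rather than a server chosen by --dns, which is a simplification.

```python
import socket
import time


def system_reverse_lookup(ip: str):
    """Reverse-resolve via the system resolver; return None on failure.

    Assumption: the real service queries the server given by --dns instead.
    """
    try:
        return socket.gethostbyaddr(ip)[0]
    except OSError:
        return None


class ReverseDnsCache:
    """Caches reverse lookups, including failures, for ttl_seconds."""

    def __init__(self, lookup=system_reverse_lookup, ttl_seconds=3600,
                 clock=time.monotonic):
        self._lookup = lookup
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries = {}  # ip -> (expires_at, hostname or None)

    def resolve(self, ip: str):
        now = self._clock()
        cached = self._entries.get(ip)
        if cached is not None and cached[0] > now:
            return cached[1]
        hostname = self._lookup(ip)
        self._entries[ip] = (now + self._ttl, hostname)
        return hostname
```

Injecting the lookup and clock keeps the cache testable without network access.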

sent Parquet file contents

Parquet files are composed according to schema, following the rule that one Parquet file
contains only one type of message, i.e. just one optional field of GeneralData, FtpData etc. is present.

Use --help to check command-line args. Note that AGGREGATOR_ARGS_FILE, if set, overrides command-line input.

$ docker run -it sageaxcess/data-aggregator --help
name: aegis-aggregator, version: 0.7.5, scalaVersion: 2.11.7, sbtVersion: 0.13.11
accepts zmq messages on provided host and port, puts messages to rolling files and sends them further to data receiver
Usage: aegis data aggregator [options]

  --host <value>
        zeromq sub socket ip, default is 127.0.0.1, 'localhost' isn't acceptable
  -p <value> | --port <value>
        zeromq sub socket port, default is 5555
  -s <value> | --shutdownPort <value>
        zeromq shutdown listener rep socket port to accept shutdown messages, default is 5557
  -b <value> | --batchTime <value>
        batch time in seconds, default is 1
  --dns <value>
        dns server to use for resolving hostnames inside client network
  -r <value> | --receiver <value>
        receiver url to send accumulated data to, default is http://172.17.0.1:8080/s3
  -o <value> | --commands <value>
        commands distribution url,
        normally the same endpoint as receiver, but with different path, like /commands,
        to get commands for distributing them to collectors,
        default is http://172.17.0.1:8080/commands
  -i <value> | --commandsPollInterval <value>
        commands polling interval in seconds, default is 60
  -a <value> | --apiKey <value>
        API key to authenticate with the data receiver
  -k <value> | --keep-temp-files <value>
        keep the temp files for later inspection, default is false
  -q <value> | --uniqueId <value>
        Unique ID to denote this instance of Data Aggregator
  -e <value> | --description <value>
        Verbose description transferred together with Unique ID of this instance
  -t <value> | --secretKey <value>
        ZMQ-Curve secret key, hex-encoded
  -c <value> | --publicKey <value>
        ZMQ-Curve public key, hex-encoded
  -l <value> | --loglevel <value>
        Override Akka logging level, default is INFO, use one of DEBUG, WARNING, ERROR, OFF
  -g <value> | --prometheusPushGateway <value>
        Prometheus push gateway, like is localhost:9091
  --ttl <value>            reverse dns lookup cache entries TTL in seconds, default is 3600
  --help
        provide a subscribing zmq socket address and keys to accept messages

Shutdown client

Aggregator accepts shutdown messages with a ZMQ.REP socket on the port set by --shutdownPort, 5557 by default.
See the shutdown client README.

Fake data Docker image

You can build and run a Docker image for a dev environment that contains both an aggregator and one collector.
The collector plays a tape of pre-recorded messages in a loop, thus taking care of message generation.
You can run it and see it communicating with other services and producing Parquet files. See the fakedata image.

Getting the code

Use --recursive flag when running git clone:

git clone --recursive git@github.com:SageAxcess/scala-services-dependencies.git

Building Docker image

sbt docker:publishLocal

Running Docker image

Use the output of --help as a guide. Mind the --logsEndpoint key: it goes to the
log monitoring service, is not handled by the aggregator service, and is not explained by its help message.

$ docker run --restart="on-failure" -d sageaxcess/data-aggregator \
--host 0.0.0.0 -p 5555 --dns $DNS_HOST \
--receiver http://$RECEIVER_HOST/s3 \
--apiKey XXXXXXXXXXXXXXXXXXX \
--secretKey 693F66214F327A76785550413A5E253C3E6D70657A217B4155407A666A305876477B68495A2F5D36 \
--publicKey 5A5832576F434D2D57367D667148464865326E3C4B74586B733C574C7A624E6355642D5239282E72 \
--commands https://dev-receiver.sageaxcess.com/commands \
--prometheusPushGateway localhost:9091 \
--logsEndpoint https://dev-logs.sageaxcess.com

Metrics

Pass the address of a Prometheus push gateway with the -g key: https://github.com/prometheus/pushgateway
Query the gateway on the specified port to check the accumulated metrics.
The same endpoint is meant to be scraped by Prometheus.

$ curl localhost:9091/metrics

# HELP parquet_data summary parquet data size in bytes successfully written
# TYPE parquet_data counter
parquet_data{instance="010f6f8a-33d3-4ab0-a5d3-48be4d7b04d2",job="aggregator"} 5920498.0
# HELP parquet_writers simultaneously existing parquet writers
# TYPE parquet_writers gauge
parquet_writers{instance="010f6f8a-33d3-4ab0-a5d3-48be4d7b04d2",job="aggregator"} 7
# HELP uploads simultaneous uploads to receiver
# TYPE uploads gauge
uploads{instance="010f6f8a-33d3-4ab0-a5d3-48be4d7b04d2",job="aggregator"} 0
# HELP zmq ZMQ messages received
# TYPE zmq counter
zmq{instance="010f6f8a-33d3-4ab0-a5d3-48be4d7b04d2",job="aggregator"} 37957.0
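
For quick checks in scripts, output like the above can be parsed with a few lines of code. The sketch below is a simplified reader, not a full parser of the Prometheus text format: it keeps one sample per metric name and skips lines that carry a timestamp. The function name is hypothetical.

```python
import re

# metric_name{label="value",...} number
SAMPLE_LINE = re.compile(r'^([A-Za-z_:][A-Za-z0-9_:]*)(?:\{(.*)\})?\s+(\S+)$')
LABEL = re.compile(r'([A-Za-z_][A-Za-z0-9_]*)="((?:[^"\\]|\\.)*)"')


def parse_metrics(text: str) -> dict:
    """Map metric name -> (labels dict, float value); comments are skipped."""
    metrics = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue  # blank line or HELP/TYPE comment
        match = SAMPLE_LINE.match(line)
        if match is None:
            continue  # unsupported line shape, e.g. one with a timestamp
        name, label_text, value = match.groups()
        labels = dict(LABEL.findall(label_text or ""))
        metrics[name] = (labels, float(value))
    return metrics
```

Feeding it the body returned by the curl command above would give, for example, the zmq counter under the key "zmq".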

Some JVM flags to make your dev launch settings more like those of the image

sbt -J-server -J-Xms300m -J-Xmx300m -J-XX:+UseNUMA -J-XX:+UseG1GC -J-XX:+AlwaysPreTouch -J-XX:+PerfDisableSharedMem -J-XX:+ParallelRefProcEnabled -Djava.library.path=/usr/local/lib "run --host 0.0.0.0 ..."

-XX:MaxGCPauseMillis=10
Owner
sageaxcess
