Reading and Writing Data from Kafka

As a security data lake, Guardium Big Data Intelligence can consume data from Kafka (as a Kafka consumer) and can generate data to be sent over Kafka (as a Kafka producer).

Producing Data and Sending it to Kafka

The data created by any job (relying on any report or query) can be sent over Kafka topics by using a producer job. Schedule your job as you would schedule any report and select the Kafka radio button. The pipeline you are using should create the data as you want it to appear on your queue. Normally you would project key and value strings, and these will be placed onto the Kafka queue. If Guardium Big Data Intelligence does not find a key it will try to use an _id, and if that is not found either it will use the current time since epoch in milliseconds as the key. If it cannot find a value attribute that is a string, it will try to serialize the entire document as a JSON document.
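For example, if the report's query is a MongoDB-style aggregation pipeline, the final stage can project the two fields explicitly. The following is a minimal sketch only; the field names Session ID and Full SQL are hypothetical report attributes, while key and value are the attributes the producer looks for:

[
    {
        "$project": {
            "key": "$Session ID",
            "value": "$Full SQL"
        }
    }
]

Assuming both source fields are strings, the fallbacks described above (_id, time since epoch, whole-document JSON serialization) will not be needed.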

If you can connect to Kafka without authentication, you can enter the server, port and topic directly within the job definition (but in production you should still use aliases). If you use SSL or Kerberos authentication, you need to use an alias that points to a configuration definition in /etc/sonar/sonar_kafka_aliases.yaml. In this release the combination of SSL and Kerberos is not supported when Guardium Big Data Intelligence acts as a Kafka producer (just one or the other; both are supported for a consumer).

In the job definition you can specify an alias, or a list of aliases in an array, for the servers. Each alias must already be defined in the yaml file mentioned above. Aliases can be plain, SSL, Kerberos or SSL+Kerberos:

[
    {
        "name": "alias-plain",
        "bootstrap.servers": "192.168.1.1:9092",
        "security.protocol": "PLAINTEXT",
        "kafka.topics": ["topic1"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    },
    {
        "name": "alias-ssl",
        "bootstrap.servers": "192.168.1.1:9092",
        "group.id": "group-ID",
        "security.protocol": "SSL",
        "ssl.truststore.location": "/trust_store",
        "ssl.truststore.password": "123456",
        "ssl.keystore.location": "/key_store",
        "ssl.keystore.password": "123456",
        "ssl.key.password": "123456",
        "ssl.key.location": "/path/to/key.pem",
        "ssl.certificate.location": "/pat/to/certificate.pem",
        "ssl.ca.location": "/path/to/ca-cert",
        "kafka-topics": ["topic1"],
        "rsyslog-host": "localhost",
        "rsyslog-port": 10546
    },
    {
        "name": "alias-kerberos",
        "bootstrap.servers": "192.168.1.1:9092",
        "group.id": "group-ID",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "my-kafka-service",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required \n
        useKeyTab=true \n useCache=false \n storeKey=false \n keyTab=
        '/home/user/my_keytab/user.keytab' \n serviceName='kafka' \n principal=
        'Principle_Name';",
        "kafka.topics": ["topic1", "topic2"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    },
    {
        "name": "alias-kerberos-ssl",
        "kafka-host": "192.168.1.1",
        "kafka-port": 9095,
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "GSSAPI",
        "sasl.kerberos.service.name": "my-kafka-service",
        "sasl.jaas.config": "com.sun.security.auth.module.Krb5LoginModule required \n
        useKeyTab=true \n useCache=false \n storeKey=false \n keyTab=
        '/home/user/my_keytab/user.keytab' \n serviceName='kafka' \n principal=
        'Principle_Name';",
        "security.protocol": "SSL",
        "ssl.truststore.location": "/trust_store",
        "ssl.truststore.password": "123456",
        "ssl.keystore.location": "/key_store",
        "ssl.keystore.password": "123456",
        "ssl.key.password": "123456",
        "ssl.key.location": "/path/to/key.pem",
        "ssl.certificate.location": "/path/to/certificate.pem",
        "ssl.ca.location": "/path/to/ca-cert",
        "kafka.topics": ["topic1", "topic2"],
        "rsyslog.host": "localhost",
        "rsyslog.port": 10546
    }
]
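Because JSON is valid YAML, the array above can be placed in /etc/sonar/sonar_kafka_aliases.yaml as-is. Assuming the file is read by a standard YAML parser, the same definition can also be written in YAML block style; the following sketch shows only the first (plain) alias:

- name: alias-plain
  bootstrap.servers: "192.168.1.1:9092"
  security.protocol: PLAINTEXT
  kafka.topics:
    - topic1
  rsyslog.host: localhost
  rsyslog.port: 10546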

Parameters:

  • "bootstrap.servers": <kafka-host>:<kafka-port>
  • "ssl.truststore.location": <full path name to the trust store> - used for the Kafka producer.
  • "ssl.keystore.location": <full path name to the key store> - used for the Kafka producer.
  • "ssl.key.location": <full path to the key file> - used for the Kafka consumer. Can be generated from the key store - see "Extract the certificate and key from JKS keystores" below.
  • "ssl.key.password": password for the client key.
  • "ssl.certificate.location": <full path to the signed certificate pem file> - used for the Kafka consumer; needed only if ssl.client.auth=required in the Kafka server settings. Can be generated from the key store - see "Extract the certificate and key from JKS keystores" below.
  • "ssl.ca.location": <full path to the CA file> - used for the Kafka consumer.
  • "sasl.jaas.config": there are 3 parameters that require setting (a quick way to verify the keytab is shown after this list):
    1. keyTab=<full path name to the keytab file>
    2. serviceName=<name of the Kerberos service> (same as "sasl.kerberos.service.name")
    3. principal=<the principal logged in to Kerberos>
  • "kafka.topics": an array of 1 or more topics (used by the Kafka consumer)
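As a quick sanity check of the keyTab parameter, you can list the entries in the keytab and confirm that the principal matches the one in sasl.jaas.config. This uses the standard MIT Kerberos klist tool; the path below is the one from the alias example and is illustrative only:

klist -kt /home/user/my_keytab/user.keytab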

Make sure the sonar user has access to the files referenced by the following parameters:

  • "ssl.keystore.location"
  • "ssl.truststore.location"
  • "ssl.key.location"
  • "ssl.certificate.location"
  • "ssl.ca.location"

This means that these files should be in the /etc/sonar directory and owned by the sonar-kafka-consumer user:

sudo chown sonar-kafka-consumer:sonar <file name>
sudo chmod 440 <file name>

When using SSL you need to create a trust store and a key store using standard tools such as OpenSSL. The following is an example sequence, but it is no different from what you will already have done within your Kafka environment:

  1. Create private keys (server and client) and certificates using OpenSSL.

  2. Create a trust chain (CA) certificate file using OpenSSL. For example, follow the instructions at https://jamielinux.com/docs/openssl-certificate-authority/introduction.html

  3. Create the trust store with the trust chain:

$ ~/dev/jdk1.8.0_112/bin/keytool -import -file ca-chain.cert.pem -alias localhost -keystore ca.truststore.jks -storepass 123456

  4. Create client/server keys and certificates, e.g.:

$ openssl genrsa -out intermediate/private/user1.key.pem 2048

$ openssl genrsa -out intermediate/private/server1.key.pem 2048

$ openssl req -new -sha256 -nodes -config intermediate/openssl.cnf -subj "/C=CA/ST=BC/O=jSonar/CN=aditya/emailAddress=user1@jsonar.com/L=Vancouver" -out intermediate/csr/user1.csr.pem -key intermediate/private/user1.key.pem

$ openssl req -new -sha256 -nodes -config intermediate/openssl.cnf -subj "/C=CA/ST=BC/O=jSonar/CN=localhost/emailAddress=user1@jsonar.com/L=Vancouver" -out intermediate/csr/server1.csr.pem -key intermediate/private/server1.key.pem

$ openssl ca -config intermediate/openssl.cnf -extensions usr_cert -days 365 -notext -md sha256 -in intermediate/csr/user1.csr.pem -out intermediate/certs/user1.cert.pem

$ openssl ca -config intermediate/openssl.cnf -extensions server_cert -days 365 -notext -md sha256 -in intermediate/csr/server1.csr.pem -out intermediate/certs/server1.cert.pem

$ openssl pkcs12 -export -in intermediate/certs/server1.cert.pem -inkey intermediate/private/server1.key.pem -out intermediate/certs/server1.pkcs12 -name localhost -noiter -nomaciter

  5. Create the server keystore:

$ keytool -importkeystore -destkeystore sonar_key_store -srckeystore intermediate/certs/server1.pkcs12 -srcstoretype pkcs12 -alias localhost

$ openssl pkcs12 -export -in intermediate/certs/user1.cert.pem -inkey intermediate/private/user1.key.pem -out intermediate/certs/user1.pkcs12 -name localhost -noiter -nomaciter

  6. Create the client keystore:

    $ /opt/jdk1.8.0_141/bin/keytool -importkeystore -destkeystore user_key_store -srckeystore intermediate/certs/user1.pkcs12 -srcstoretype pkcs12 -alias localhost

    $ openssl s_client -connect aditya:9093 -verify 10 -state -cert intermediate/certs/user1.cert.pem -key intermediate/private/user1.key.pem -CAfile intermediate/certs/ca-chain.cert.pem

    $ openssl pkcs12 -export -in user1.pem -out user1.pkcs12 -name user1 -noiter -nomaciter

    $ ~/dev/jdk1.8.0_112/bin/keytool -importkeystore -destkeystore sonar_key_store -srckeystore user1.pkcs12 -srcstoretype pkcs12 -alias user1 -storepass 123456 -keypass 123456

Extract the certificate and key from JKS keystores

In order to connect the Kafka consumer when the server requires SSL, everything we need is contained in the 'kafka.client.keystore.jks' file. To get an overview of its contents you can call:

keytool -list -rfc -keystore kafka.client.keystore.jks

First, we will extract the client certificate:

keytool -exportcert -alias localhost -keystore kafka.client.keystore.jks -rfc -file certificate.pem -storepass test1234

Next we will extract the client’s key. This is not supported directly by keytool, which is why we have to convert the keystore to pkcs12 format first and then extract the private key from that:

keytool -importkeystore -srckeystore kafka.client.keystore.jks -destkeystore cert_and_key.p12 -srcstoretype JKS -deststoretype PKCS12 -srcstorepass test1234 -deststorepass test1234 -srcalias localhost -srckeypass test1234 -destkeypass test1234 -noprompt

Now we convert it to key.pem:

openssl pkcs12 -in cert_and_key.p12 -nocerts -nodes -passin pass:test1234 -passout pass:test1234 | sed -n '/-----BEGIN/,$p' - > key.pem

In the commands above, kafka.client.keystore.jks needs to be changed to the keystore file name on your system, and test1234 needs to be changed to the password of your key file. The password parameters can be omitted if you wish to be prompted for them.
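As an optional sanity check, you can confirm that the extracted certificate.pem and key.pem belong together by comparing their public-key moduli (this assumes an RSA key, as in the OpenSSL examples above); both commands should print the same digest:

openssl x509 -noout -modulus -in certificate.pem | openssl md5
openssl rsa -noout -modulus -in key.pem | openssl md5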

rsyslog-kafka - Consuming Data from Kafka

We use the rsyslog imkafka input module to read data from Kafka before inserting it into the Guardium Big Data Intelligence security lake. The imkafka module implements an Apache Kafka consumer, which permits rsyslog to receive data from Kafka. For more information, see the rsyslog imkafka documentation.

Follow the steps below to enable the imkafka extension:

  1. Edit the relevant configuration in /etc/rsyslog.d/sonar/gateway/rulesets/kafka_forward.conf.

For Plain text:

topic=<"Give Topic Name Used in Producer">
broker=["<Broker IP>:<Port>"]

target="<Give IP/HostName of SonarG Machine>"

For Kafka with SSL:

topic=<"Give Topic Name Used in Producer">
broker=["<Broker IP>:<Port>"]
confParam=["security.protocol=SSL",
        "ssl.ca.location=<Path to CA Certificate>",
        "ssl.key.password=<Client Key Password>",
        "ssl.key.location=<Path to Client Key>",
        "ssl.certificate.location=<Path to Client Certificate>"]

target="<IP/HostName of SonarG Machine>"

Notes:

  • Port 9092 is used for Plaintext and 9093 is used for SSL.
  • "broker" value is a list of one or more IP:Port combinations, e.g.: ["<Broker IP>:9092", "<Broker IP>:9093"]
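Putting it together, a filled-in kafka_forward.conf fragment for SSL might look like the sketch below. The broker address, topic and certificate paths are taken from the earlier examples in this section, and the target hostname is a placeholder; substitute your own values:

topic="topic1"
broker=["192.168.1.1:9093"]
confParam=["security.protocol=SSL",
        "ssl.ca.location=/path/to/ca-cert",
        "ssl.key.password=123456",
        "ssl.key.location=/path/to/key.pem",
        "ssl.certificate.location=/path/to/certificate.pem"]

target="sonarg.example.com"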

To add multiple topics, you can define multiple inputs, for example:

input(type="imkafka"
        topic="Topic1"
        broker=["<Broker IP or resolvable hostname>:9092"]
        consumergroup="default"
        ruleset="pRuleset"
     )

input(type="imkafka"
        topic="Topic2"
        broker=["<Broker IP or resolvable hostname>:9092"]
        consumergroup="default"
        ruleset="pRuleset"
)
  2. Add the Kafka configuration to SonarGateway.

Uncomment the following lines in /etc/rsyslog.d/sonargateway.conf:

#$IncludeConfig /etc/rsyslog.d/sonar/gateway/rulesets/kafka_forward.conf
#$IncludeConfig /etc/rsyslog.d/sonar/gateway/rulesets/kafka_consumer.conf
  3. Restart rsyslog for the change to take effect:
sudo systemctl restart rsyslog

The messages collected by imkafka are forwarded to SonarGateway, for example:

Sep 25 17:17:26 localhost {"foo": 1, "bar": 2}
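To verify the path end to end, you can publish a test message to the topic with the standard Kafka console producer and confirm it appears in the forwarded messages. This is only a sketch; the script location and broker address depend on your Kafka installation, and newer Kafka versions use --bootstrap-server instead of --broker-list:

$ bin/kafka-console-producer.sh --broker-list 192.168.1.1:9092 --topic topic1
> {"foo": 1, "bar": 2}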

The data can then be transformed, mapped and directed to the appropriate Guardium Big Data Intelligence collections using SonarGateway (see SonarGateway documentation).

More information about setting up Apache Kafka can be found in the Apache Kafka documentation at https://kafka.apache.org/documentation/.