

Summary

This document helps users who want to use Apache Kafka with z/IRIS to:

  • Configure their Kafka Cluster for z/IRIS.

  • Configure and use recommended settings for the Kafka Producer (z/IRIS z/OS Client).

  • Configure and use recommended settings for the Kafka Consumer (z/IRIS IronTap).


Kafka Cluster

Architecture

We suggest using:

  • an odd number of ZooKeeper instances, with a minimum of 3.

  • at least 3 Kafka Brokers deployed on physically separate machines, and a replication factor of 3. Because each replica then resides on a different machine, the loss of a single machine does not cause data loss or loss of availability, which gives high availability and resilience.
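The preference for an odd ensemble size follows from ZooKeeper's majority quorum: the ensemble stays available only while a strict majority of its servers is running. The following bash sketch (the function names are illustrative, not part of any tool) computes the quorum size and the number of tolerated server failures for a few ensemble sizes.

```bash
# Sketch: majority-quorum arithmetic for a ZooKeeper ensemble.
# quorum_size and tolerated_failures are illustrative helpers.

# Number of servers that form a strict majority of an ensemble of $1 servers.
quorum_size() {
  echo $(( $1 / 2 + 1 ))
}

# Number of servers that may fail while a majority keeps running.
tolerated_failures() {
  echo $(( $1 - $(quorum_size "$1") ))
}

for n in 3 4 5 6; do
  echo "ensemble=$n quorum=$(quorum_size "$n") tolerated_failures=$(tolerated_failures "$n")"
done
```

For 3, 4, 5 and 6 servers the loop reports 1, 1, 2 and 2 tolerated failures, so a fourth server adds load to the quorum without adding fault tolerance over three.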

Topic

For the purposes of z/IRIS, a Kafka topic is needed to produce and consume SMF data. When choosing the number of partitions for this topic, consider the following:

  • The number of partitions should be a multiple of the number of Kafka Brokers, so that partitions are balanced evenly across them.

  • Because a partition can be assigned to only one consumer in a consumer group, the number of partitions caps how many consumers in a group can read in parallel. More partitions therefore increase:

    • parallelism,

    • throughput,

    • the possibility to leverage more Kafka brokers in a cluster,

    • and scale capabilities. 

  • Fewer partitions mean fewer open files on the Kafka Brokers and less partition metadata for ZooKeeper to manage.

Taking this into consideration, we recommend around 15 partitions for the topic (a multiple of the 3 recommended brokers). Select a suitable topic name, e.g. smf, and create the topic with the Kafka CLI:

```bash
kafka-topics.sh --bootstrap-server <kafka-ip>:<kafka-port> --topic smf --partitions 15 --replication-factor 3 --create
```
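Before creating the topic, a proposed partition count can be checked against the guidance above. The bash sketch below uses the example values of 3 brokers and 15 partitions; is_balanced is a hypothetical helper, and the per-broker figure assumes the partition leaders end up evenly spread across the brokers.

```bash
# Sketch: check a proposed partition count against the broker count.
# BROKERS and PARTITIONS are example values; is_balanced is hypothetical.
BROKERS=3
PARTITIONS=15

# Succeeds if $1 (partitions) is a positive multiple of $2 (brokers).
# The broker check runs first, so the modulo never divides by zero.
is_balanced() {
  [ "$1" -gt 0 ] && [ "$2" -gt 0 ] && [ $(( $1 % $2 )) -eq 0 ]
}

if is_balanced "$PARTITIONS" "$BROKERS"; then
  echo "$PARTITIONS partitions: $(( PARTITIONS / BROKERS )) partition leaders per broker (if evenly spread)"
else
  echo "$PARTITIONS partitions cannot be spread evenly over $BROKERS brokers"
fi

# Each partition is read by only one consumer per group, so at most
# PARTITIONS consumers in a group receive data; extra consumers stay idle.
echo "max. active consumers per consumer group: $PARTITIONS"
```

After creation, `kafka-topics.sh --bootstrap-server <kafka-ip>:<kafka-port> --describe --topic smf` lists each partition together with its leader and replicas.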

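To further reduce the risk of data loss, we suggest a minimum of 2 in-sync replicas. With this setting, the cluster rejects writes sent with acks=all whenever fewer than 2 replicas are in sync, so every acknowledged write is stored on at least 2 replicas. It can be configured after creating the topic smf: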

```bash
kafka-configs.sh --zookeeper <zookeeper-ip>:<zookeeper-port> --entity-type topics --entity-name smf --add-config min.insync.replicas=2 --alter
```
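The interaction between the replication factor and min.insync.replicas can be worked out with simple arithmetic. The bash sketch below assumes producers use acks=all, as the z/IRIS z/OS Client's default producer properties do; write_tolerance is an illustrative helper.

```bash
# Sketch: how many replicas of a partition may be unavailable while
# producers using acks=all can still write. Values match the recommendations.
REPLICATION_FACTOR=3
MIN_INSYNC_REPLICAS=2

# Prints replication factor ($1) minus min.insync.replicas ($2).
# Fails if min.insync.replicas exceeds the replication factor, because
# then no acks=all write could ever succeed.
write_tolerance() {
  if [ "$2" -gt "$1" ]; then
    echo "min.insync.replicas ($2) exceeds the replication factor ($1)" >&2
    return 1
  fi
  echo $(( $1 - $2 ))
}

echo "replicas that may be down: $(write_tolerance "$REPLICATION_FACTOR" "$MIN_INSYNC_REPLICAS")"
```

With min.insync.replicas equal to the replication factor the result would be 0, meaning a single unavailable replica would already stop acks=all writes; a value of 2 with 3 replicas leaves room for one.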

Quotas

Once z/IRIS IronTap is set up to use the Kafka Cluster, we highly recommend using Apache Kafka's Quotas feature, which prevents a client from monopolizing broker resources and saturating the network. To apply a quota, Kafka needs to identify IronTap (i.e. its Kafka consumers) in the cluster. This is done by setting the client.id property in IronTap's application.conf file to a unique, logical application name (e.g. smf-consumer); all consumers that use the same client.id share the quota. Because Kafka enforces byte-rate quotas per broker, the limit applies to each broker separately. Quotas can be set dynamically via the Kafka CLI; we recommend limiting IronTap's network bandwidth to about 50 MB/s (52428800 bytes/s):

```bash
kafka-configs.sh --zookeeper <zookeeper-ip>:<zookeeper-port> --entity-type clients --entity-name <client-id> --add-config 'consumer_byte_rate=52428800' --alter
```
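The quota value is 50 MiB/s expressed in bytes per second. The bash sketch below derives it and prints the quota command for review instead of running it. CLIENT_ID=smf-consumer reuses the example name from above and is an assumption: it must match the client.id actually configured in IronTap. Generating the command this way also avoids invisible characters that can slip in when copying commands from a rendered page.

```bash
# Sketch: derive consumer_byte_rate and print the quota command for review.
# CLIENT_ID is an example value and must match IronTap's client.id.
CLIENT_ID=smf-consumer

# Kafka byte-rate quotas are expressed in bytes per second.
mib_per_s_to_bytes() {
  echo $(( $1 * 1024 * 1024 ))
}

RATE=$(mib_per_s_to_bytes 50)   # 52428800 bytes/s = 50 MiB/s

# Printed only; <zookeeper-ip>:<zookeeper-port> still has to be filled in.
printf '%s\n' "kafka-configs.sh --zookeeper <zookeeper-ip>:<zookeeper-port> --entity-type clients --entity-name $CLIENT_ID --add-config consumer_byte_rate=$RATE --alter"
```

Running the same tool with `--describe` instead of `--add-config ... --alter` shows the quota currently set for that client ID.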


Kafka Consumer (z/IRIS IronTap)

Kafka Mode Requirements

IronTap requires a list of bootstrap servers for the Kafka Cluster. Configure it with the bootstrap.servers parameter in IronTap's application.conf file.
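A minimal application.conf only needs the settings that differ from the defaults. The bash sketch below writes such a file with a heredoc. It assumes application.conf is merged over reference.conf key by key (the usual behaviour for this reference.conf/application.conf naming), mirrors the nesting of reference.conf, and uses hypothetical broker host names; client.id reuses the smf-consumer example from the Quotas section.

```bash
# Sketch: write a minimal IronTap application.conf that overrides only the
# bootstrap servers and client.id. Assumes key-by-key merging over
# reference.conf. Host names and the output location are placeholders.
CONF_FILE=${CONF_FILE:-./application.conf}

cat > "$CONF_FILE" <<'EOF'
irontap {
    sources {
        kafka {
            consumer {
                bootstrap.servers="kafka-1.example.com:9092,kafka-2.example.com:9092,kafka-3.example.com:9092"
                client.id="smf-consumer"
            }
        }
    }
}
EOF

echo "wrote $CONF_FILE"
```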

Default Configuration

We provide the following default configuration; any of its settings can be overridden in the application.conf file:

reference.conf
```hocon
########################################################################################################################
################################################ CONSUMER CONFIGURATION ################################################
################################################ for Apache Kafka 2.7.0 ################################################
########################################################################################################################
irontap {
    sources{
        kafka{
            # Name of the Topic the Consumer should subscribe to.
            topicPattern="smf"
            consumer{
                ########################################## IMPORTANT SETTINGS ##########################################
                # A list of host/port pairs to use for establishing the initial connection to the Kafka Cluster.
                ### NOTE: Kafka Cluster needs to be adjusted.
                bootstrap.servers="localhost:32400"

                # Unique string to identify the Consumer's Consumer Group. This is necessary to enable Load Balancing,
                # since each Consumer within a Consumer Group reads from exclusive Partitions.
                group.id="smf-group"

                # Deserializer Class for Key and Value that implements the
                # org.apache.kafka.common.serialization.Deserializer interface.
                key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
                value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

                # The Consumer can commit its Offsets either automatically in the background or manually. Manual
                # committing allows asynchronous Offset processing, since the application controls when and under
                # which conditions Consumer Offsets are committed. The default setting is true.
                enable.auto.commit=false

                # Determines the minimum amount of data the Broker should return for a fetch request. This causes
                # the Broker to wait for larger amounts of data to accumulate, which can improve server throughput at
                # the expense of introducing a small delay. The default setting is 1 byte.
                fetch.min.bytes=32768

                # The maximum amount of time the Broker will block before answering a fetch request if there isn't
                # sufficient data to immediately satisfy 'fetch.min.bytes'. This bounds the delay introduced by
                # 'fetch.min.bytes'. The default setting is 500 ms.
                fetch.max.wait.ms=5

                # Close idle connections. The default setting is 540000ms (9 mins).
                connections.max.idle.ms=30000

                # Allows automatic topic creation on the Broker when subscribing or assigning the Consumer to a Topic. A
                # Topic being subscribed to will be automatically created only if the respective Broker allows for it
                # using 'auto.create.topics.enable' Broker configuration. The default setting is true.
                allow.auto.create.topics=false

                # The Consumer can choose what to do when there is no initial Offset in Kafka or if the current Offset
                # does not exist any more on the Broker (e.g. because the data has expired). The default setting is
                # latest. Options are:
                # - earliest: Automatically reset its Offset to the earliest possible Offset.
                # - latest: Automatically reset its Offset to the latest possible Offset.
                # - none: Kafka throws an exception to the Consumer if no previous Offset is found.
                auto.offset.reset=latest

                ########################################################################################################
                ########################################### FURTHER SETTINGS ###########################################
                # The maximum amount of data the Broker should return for a fetch request. The default setting is
                # 52428800 (50 MB).
                fetch.max.bytes=52428800

                # The maximum amount of data per-partition the Broker will return. Messages are fetched in batches by
                # the Consumer. The default setting is 1048576 (1 MB).
                max.partition.fetch.bytes=1048576

                # Specifies the timeout for Consumer APIs that could block. This configuration is used as the default
                # timeout for all Consumer operations that do not explicitly accept a timeout parameter. The default
                # setting is 60000 ms (1 min).
                default.api.timeout.ms=60000

                # Controls the maximum amount of time the Consumer will wait for the response of a request. If the
                # response is not received before the timeout elapses the Consumer will resend the request if necessary.
                # The default setting is 30000 ms (30 s).
                request.timeout.ms=30000

                # Describes the amount of time to wait before attempting to retry a failed request to a given topic
                # partition. The default setting is 100 ms.
                retry.backoff.ms=100

                # The CRC32 of the consumed messages is checked automatically. This introduces some overhead, so it may
                # be disabled in cases seeking extreme performance. The default setting is true.
                check.crcs=true

                # Client ID in form of a string to pass to the server when making requests. The purpose is to be able
                # to track the source of requests beyond just ip/port by allowing a logical application name to be
                # included in server-side request logging. The default setting is "".
                client.id=""

                # Determines the maximum number of records returned in a single call to method poll(). The default
                # setting is 500.
                max.poll.records=500

                # Describes the maximum delay between invocations of the method poll(). If the method poll() is not
                # called before expiration of this timeout, then the Consumer is considered failed. The default setting
                # is 300000 ms (5 mins).
                max.poll.interval.ms=300000

                # Describes the time period within which a Consumer has to send a heartbeat before the Consumer is
                # considered to be dead by the Broker. The default setting is 10000 ms (10 s).
                session.timeout.ms=10000

                # Describes the expected time between heartbeats. The default setting is 3000 ms (3 s).
                heartbeat.interval.ms=3000

                # The period of time after which a refresh of metadata is forced, even if no partition leadership
                # changes have been seen, to proactively discover any new Kafka Brokers or Partitions. The default
                # setting is 300000 ms (5 mins).
                metadata.max.age.ms=300000

                # Determines the maximum amount of time to wait when reconnecting to a Kafka Broker that has repeatedly
                # failed to connect. If provided, the backoff per host will increase exponentially for each consecutive
                # connection failure, up to this maximum. The default setting is 1000 ms (1 s).
                reconnect.backoff.max.ms=1000

                # The base amount of time to wait before attempting to reconnect to a given host. The default setting is
                # 50 ms.
                reconnect.backoff.ms=50

                # The size of the TCP receive buffer to use when reading data. If the value is -1, the OS default will
                # be used. The default setting is 65536 (64 KB).
                receive.buffer.bytes=65536

                # The size of the TCP send buffer to use when sending data. If the value is -1, the OS default will be
                # used. The default setting is 131072 (128 KB).
                send.buffer.bytes=131072
            }
        }
    }
}
```
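Among the liveness settings above, heartbeat.interval.ms must be lower than session.timeout.ms, and the Apache Kafka documentation recommends keeping it at no more than one third of that value so that several heartbeats can be missed before the Consumer is considered dead. The bash sketch below checks the reference values; heartbeat_ok is an illustrative helper.

```bash
# Sketch: sanity-check the consumer liveness settings from reference.conf.
SESSION_TIMEOUT_MS=10000
HEARTBEAT_INTERVAL_MS=3000

# Succeeds if heartbeat ($2) < session timeout ($1) and 3 * heartbeat <= session timeout.
heartbeat_ok() {
  [ "$2" -lt "$1" ] && [ $(( $2 * 3 )) -le "$1" ]
}

if heartbeat_ok "$SESSION_TIMEOUT_MS" "$HEARTBEAT_INTERVAL_MS"; then
  echo "heartbeat.interval.ms=$HEARTBEAT_INTERVAL_MS is within session.timeout.ms / 3"
else
  echo "heartbeat.interval.ms should be <= session.timeout.ms / 3" >&2
fi
```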



Kafka Producer (z/IRIS z/OS Client)

$APPHOME refers to the z/OS Client's installation directory in z/OS UNIX System Services (USS).

Kafka Mode Requirements

To stream SMF records via a z/IRIS Kafka producer on z/OS, configure the following properties in the $APPHOME/lib/ziris.comm.config file:

For more information about the ziris.comm.config file, please refer to the latest Administration and User Guide.

Required ziris.comm.config settings for Kafka streaming
```properties
mode=kafka
# kafkaTopic defines the Kafka Cluster Topic used to stream SMF records.
# Required if mode=kafka
# syntax: kafkaTopic=kafka_topic_name
kafkaTopic=ziris_prod_smf
# host refers to the host address destination for SMF record streaming.
# If mode=kafka, host is a list of Kafka Bootstrap Servers that will overwrite
# any bootstrap servers configured in the producer.properties file linked by kafkaConfig
# syntax: host=kafka_bootstrap_server:port
host=kafka_bootstrap_server:port[,kafka_bootstrap_server:port]
# kafkaConfig is an optional parameter that configures an alternate
# Kafka producer.properties file to the z/IRIS z/OS default.
# default: $APP_HOME/kafka/default-ziris-producer.properties
# syntax: kafkaConfig=/producer/properties/file
# kafkaConfig=/usr/lpp/ziris/ziris-producer.properties
```
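To review the three Kafka-related settings before editing $APPHOME/lib/ziris.comm.config, they can be rendered from a topic name and a broker list. In the bash sketch below, render_kafka_settings is a hypothetical helper and the broker host names are placeholders; it only prints the lines and does not modify the file.

```bash
# Sketch: render_kafka_settings is a hypothetical helper that prints the
# Kafka-related ziris.comm.config lines for review; it does not edit the file.
# Usage: render_kafka_settings <topic> <broker:port>...
# The body runs in a subshell so that changing IFS does not leak out.
render_kafka_settings() (
  topic=$1
  shift
  IFS=,   # "$*" joins the remaining arguments with commas
  printf 'mode=kafka\n'
  printf 'kafkaTopic=%s\n' "$topic"
  printf 'host=%s\n' "$*"
)

# Broker host names are placeholders.
render_kafka_settings ziris_prod_smf kafka-1.example.com:9092 kafka-2.example.com:9092 kafka-3.example.com:9092
```

The call prints `mode=kafka`, `kafkaTopic=ziris_prod_smf` and a single comma-separated `host=` line, matching the `host=kafka_bootstrap_server:port[,kafka_bootstrap_server:port]` syntax.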

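The default configuration file, $APP_HOME/kafka/default-ziris-producer.properties, does not specify any Kafka brokers in the bootstrap.servers parameter (the entry is commented out). When the default file is used, the brokers must therefore be supplied through the host parameter.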

Default Kafka Producer Properties

The default Kafka producer properties for z/OS Clients are specified in $APP_HOME/kafka/default-ziris-producer.properties.

To use a different Kafka producer.properties configuration, specify the absolute path of the new producer.properties file in the kafkaConfig parameter of the ziris.comm.config file. Note that the host parameter in ziris.comm.config overrides the bootstrap.servers parameter in producer.properties.
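One way to build an alternate file is to copy the default and change individual keys. The bash sketch below defines set_property, a hypothetical helper that is not part of z/IRIS: it removes any active key=value line for the key and appends the new setting at the end. The paths and client ID in the usage comments are illustrative; kafkaConfig must then point to the new file's absolute path.

```bash
# Sketch: set_property is a hypothetical helper, not part of z/IRIS.
# It drops every active "key=value" line for key $2 in file $1 (comment lines
# such as "#key=..." are kept) and appends "$2=$3" at the end. The original
# file is rewritten in place so that its permissions are preserved.
set_property() {
  awk -v k="$2" 'index($0, k "=") != 1' "$1" > "$1.tmp" &&
    printf '%s=%s\n' "$2" "$3" >> "$1.tmp" &&
    cat "$1.tmp" > "$1" &&
    rm -f "$1.tmp"
}

# Example usage (paths and client ID are illustrative):
# cp "$APPHOME/kafka/default-ziris-producer.properties" /u/ziris/ziris-producer.properties
# set_property /u/ziris/ziris-producer.properties client.id ziris-zos-producer
# Then, in $APPHOME/lib/ziris.comm.config:
# kafkaConfig=/u/ziris/ziris-producer.properties
```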


default-ziris-producer.properties

```properties
####################################################################################
############################ DEFAULT PRODUCER CONFIGURATION ########################
############################# for Apache Kafka 2.7.0 ###############################
################################ z/IRIS z/OS Client ################################

# A list of host/port pairs to use for establishing the initial connection to the
# Kafka Cluster.
### NOTE: Kafka Cluster address needs to be adjusted.
#bootstrap.servers=localhost:9092

# Ensures that no duplicate messages will be introduced into Kafka due to network
# errors.
enable.idempotence=true

##################### Settings implied by enable.idempotence #####################
# The producer can choose which acknowledgements it requires to confirm a data
# write. With acks=0, the producer doesn't request any response from the Kafka
# Cluster. With acks=1, the producer requests an acknowledgement only from the
# leader. With acks=all, the producer requests acknowledgements from the leader
# and the corresponding in-sync replicas. The default setting is acks=1, however
# enable.idempotence requires acks=all.
acks=all

# The producer automatically retries sending messages. The default and
# recommended setting is 2147483647 since Kafka 2.1.0. When
# enable.idempotence=true, it has to be a value > 0, otherwise an exception will
# be thrown.
retries=2147483647

# The maximum number of unacknowledged requests the producer sends in parallel
# on a single connection before blocking. The default setting is 5. When
# enable.idempotence=true, the value must be between 1 and 5; message ordering
# is still guaranteed within this range.
max.in.flight.requests.per.connection=5
##################################################################################

# In general, enabling compression reduces latency through faster data
# transfers, quicker replication across Kafka Brokers and better Kafka disk
# utilisation (smaller data sizes), at the expense of CPU cycles. Compression
# becomes more effective the bigger the batch of messages sent to Kafka, due to
# a higher compression ratio.
compression.type=none

# The total bytes of memory the producer can use to buffer records waiting to be
# sent to the server. The default setting is 33554432 bytes (32 MB).
buffer.memory=33554432

# When the producer produces data faster than the Kafka Broker can currently
# process it, or the Broker is temporarily down, the data is buffered in the
# buffer memory. Once the buffer is full, the producer blocks and stops producing
# new data. If it is still blocked after this amount of time, it throws a
# timeout exception. The default setting is 60000 ms (1 min).
max.block.ms=60000

# Determines the maximum number of bytes that can be included in a batch. Any
# message that is bigger than the batch size will be sent as soon as possible
# regardless of linger.ms and thus not be batched. The default setting is 16384
# (16 KB). Don't set the batch size too high: a batch is allocated per
# partition, which can waste memory or even exhaust it.
batch.size=65536

# Describes the maximum size of a request in bytes. This setting limits the
# number of message batches the producer sends in a single request to avoid
# sending huge requests. Thus it is also effectively a cap on the maximum record
# batch size. The default setting is 1048576 (1 MB).
max.request.size=1048576

# The producer waits up to this amount of time before sending. At the expense of
# introducing a small delay, this increases the chance of messages being sent
# together in a batch. The default setting is 0 ms.
linger.ms=15

# Close idle connections. The default setting is 540000 ms (9 mins).
connections.max.idle.ms=30000

# The maximum amount of time the producer keeps retrying to send a message. If a
# message can't be acknowledged within this time frame, sending it counts as
# failed. The value should be >= linger.ms + request.timeout.ms. The default
# setting is 120000 ms (2 mins).
delivery.timeout.ms=120000

# Controls the maximum amount of time the producer will wait for the Kafka
# Leader's acknowledgement of a sent message. If the acknowledgement is not
# received in time, the message is sent again depending on retries and
# delivery.timeout.ms. The default setting is 30000 ms (30 s).
request.timeout.ms=30000

# Describes the amount of time to wait before attempting to retry a failed
# request to a given topic partition. The default setting is 100 ms.
retry.backoff.ms=100

# Client ID in form of a string to pass to the server when making requests. The
# purpose is to be able to track the source of requests beyond just ip/port by
# allowing a logical application name to be included in server-side request
# logging. The default setting is "".
client.id=""

# The size of the TCP receive buffer to use when reading data. If the value is
# -1, the OS default will be used. The default setting is 32768 (32 KB).
receive.buffer.bytes=32768

# The size of the TCP send buffer to use when sending data. If the value is
# -1, the OS default will be used. The default setting is 131072 (128 KB).
send.buffer.bytes=131072

# The period of time after which a refresh of metadata is forced, even if no
# partition leadership changes have been seen, to proactively discover any new
# Kafka Brokers or Partitions. The default setting is 300000 ms (5 mins).
metadata.max.age.ms=300000

# Determines the maximum amount of time to wait when reconnecting to a Kafka
# Broker that has repeatedly failed to connect. If provided, the backoff per
# host will increase exponentially for each consecutive connection failure, up
# to this maximum. The default setting is 1000 ms (1 s).
reconnect.backoff.max.ms=1000

# The base amount of time to wait before attempting to reconnect to a given host.
# The default setting is 50 ms.
reconnect.backoff.ms=50
```
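Two relationships stated in the comments above can be checked with arithmetic: delivery.timeout.ms should be at least linger.ms + request.timeout.ms, and because a batch is allocated per partition, the memory held by open batches grows with the partition count. The bash sketch below uses the values from this file and the 15 partitions recommended for the topic; delivery_timeout_ok is an illustrative helper.

```bash
# Sketch: check producer timing and batch memory using the default values.
LINGER_MS=15
REQUEST_TIMEOUT_MS=30000
DELIVERY_TIMEOUT_MS=120000
BATCH_SIZE=65536
BUFFER_MEMORY=33554432
PARTITIONS=15   # from the topic recommendation

# Succeeds if delivery.timeout.ms ($1) >= linger.ms ($2) + request.timeout.ms ($3).
delivery_timeout_ok() {
  [ "$1" -ge $(( $2 + $3 )) ]
}

if delivery_timeout_ok "$DELIVERY_TIMEOUT_MS" "$LINGER_MS" "$REQUEST_TIMEOUT_MS"; then
  echo "delivery.timeout.ms=$DELIVERY_TIMEOUT_MS satisfies >= linger.ms + request.timeout.ms"
else
  echo "delivery.timeout.ms must be >= linger.ms + request.timeout.ms" >&2
fi

# One open batch per partition, each up to batch.size bytes, is taken from
# buffer.memory.
echo "open batches for $PARTITIONS partitions: up to $(( BATCH_SIZE * PARTITIONS )) of $BUFFER_MEMORY bytes"
```

With these values the open batches need at most 983040 bytes, a small share of the 32 MB buffer.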

Required z/IRIS Kafka Producer Properties

The following properties are required for every z/IRIS z/OS Client with mode=kafka and should be present and enabled in any alternate producer.properties file. Failing to configure them may result in erroneous behavior, reduced reliability and/or reduced performance:

| Property | Value | Requirement description |
|---|---|---|
| enable.idempotence | true | The order in which SMF records are read and sent must be maintained to correlate data within SMF records. The z/IRIS z/OS Client enforces this order in its Kafka Producer by setting enable.idempotence=true, which prevents retries from duplicating or reordering records within a partition. |
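When an alternate producer.properties file is used, a quick check can confirm that it keeps enable.idempotence=true and does not contradict the settings it implies. The bash sketch below is a hypothetical helper, not a z/IRIS tool. It only understands plain key=value lines starting in column 1, lets later lines override earlier ones as Java properties files do, and accepts an absent acks, since the producer derives acks=all from enable.idempotence=true.

```bash
# Sketch: prop_value and check_required_props are hypothetical helpers.

# Prints the value of key $2 in properties file $1; the last occurrence wins.
prop_value() {
  awk -v k="$2" 'index($0, k "=") == 1 { v = substr($0, length(k) + 2) } END { print v }' "$1"
}

# Succeeds if file $1 keeps the settings required for z/IRIS Kafka mode.
check_required_props() {
  rc=0
  if [ "$(prop_value "$1" enable.idempotence)" != "true" ]; then
    echo "enable.idempotence=true is missing" >&2
    rc=1
  fi
  case "$(prop_value "$1" acks)" in
    all|-1|"") ;;   # absent: the producer derives acks=all from idempotence
    *) echo "acks must be all when enable.idempotence=true" >&2; rc=1 ;;
  esac
  inflight=$(prop_value "$1" max.in.flight.requests.per.connection)
  if [ -n "$inflight" ] && [ "$inflight" -gt 5 ]; then
    echo "max.in.flight.requests.per.connection must be <= 5" >&2
    rc=1
  fi
  return "$rc"
}

# Example usage (path is illustrative):
# check_required_props /u/ziris/ziris-producer.properties && echo "required properties present"
```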