10-step approach to migrate Confluent Kafka to Amazon MSK for a B2B digital sales leader - Impetus

10-step approach to migrate Confluent Kafka to Amazon MSK for a B2B digital sales leader

Many enterprises want to migrate from Confluent to Amazon MSK to scale storage capacity, save operational expenses, and enhance network security. Impetus Technologies, one of the ten launch partners for Amazon Managed Streaming for Apache Kafka (MSK) Delivery specialization, helped a global market leader in B2B digital sales migrate from Confluent to MSK.

The B2B digital sales leader helps enterprises maximize recurring revenue and provides sales teams, executives, and channel partners with a complete contract and subscription renewal system. Their existing on-premises platform integrates with the customer’s backend system to gather data and automate various sales processes like pricing and quotes.

The client had observed a data volume surge of up to 5 GB from 500 MB on weekends. Processing high volumes of data led to frequent outrages, causing message loss and impacting customer businesses. In addition, to support regular operations, the client had to maintain 2 DevOps resources. Moreover, with 4 different Kafka-dependent consumers in Confluent, there was a delay in the near-real-time data processing.

The B2B digital sales leader wanted to migrate their customer digital journey platform to Amazon MSK to address these challenges, ensure high data availability, better management and monitoring, and reduce server management and operational costs.

This blog illustrates the 10 steps that we used to migrate from Confluent Cloud to Amazon MSK, with zero downtime and message loss, to ensure automatic scalability in case of a sudden surge in data volumes.

Step 1

Estimating the size of the required MSK environment

Understanding the existing workload dynamics was necessary to size the Amazon MSK cluster. For that, from our existing Confluent Kafka clusters, we retrieved the following:

  • Ingestion rate, the rate at which data is generated and pushed to the Kafka queue, by:
    • Considering broker-level metric
      kafka.server:type=BrokerTopicMetrics,name=BytesInPerSec
    • Choosing the average value for individual brokers
    • Aggregating the value across all brokers present in the cluster
    • Considering broker-level metric
  • Consumption rate, the rate at which data is consumed from the Kafka queue, by:
    • Considering broker level metric
      kafka.server:type=BrokerTopicMetrics,name=BytesOutPerSec
    • Choosing the average value for individual broker
    • Aggregating the value across all brokers present in the cluster
  • Data replication strategy: Use the property “default.replication.factor” and consider the highest value of the replication factor configured using the global cluster parameter for individual topics.
  • Data retention strategy and disk utilization percentage on target cluster: Use the property “log.retention.ms” to consider the global highest data retention value and “retention.ms” to get the data retention value at the topic level.

Based on the above metrics, calculated the disk utilization percentage on the target cluster and estimated that a 500 GB cluster would be ideal for the platform.

Step 2

Setting up the Amazon MSK cluster

According to the estimate in Step 1, the Amazon MSK cluster was set up.

Step 3

Migrating Confluent topics to Amazon MSK

Using MirrorMaker 2.0 (MM2), we migrated Confluent Kafka clusters to Amazon MSK. The tool uses a Kafka consumer to consume messages from the source Confluent Kafka cluster and re-publishes them to the target Amazon MSK cluster, thereby eliminating any downtime. Migrating involved the following steps:

1. Setting up MM2 cluster on EC2 instance

  • Configured MM2 on EC2 instance
  • Configured source Confluent Kafka and target Amazon MSK cluster details in MM2 configuration properties file.

A sample properties file is given below:

#Apache Kafka clusters

clusters = #comma separated list of Apache Kafka cluster aliases
source.bootstrap.servers = :9092
target.bootstrap.servers = :9092
source -> target.enabled = true

Source and target cluster configuration

source.config.storage.replication.factor = 1
target.config.storage.replication.factor = 1
source.offset.storage.replication.factor = 1
target.offset.storage.replication.factor = 1

Mirror Maker configuration

offset-sync.topoc.replication.factor = 1
heartbeat.topic.replication.factor = 1
checkpoint.topic.replication.factor = 1
topics = .*
groups = .*

tasks.max = 1
replication.factor = 1
refresh.topics.enabled = true
sync.topic.configs.enabled = true

Enable heartbeats and checkpoints

source->target.emit.heartbeats.enabled = true
source->target.emit.checkpoints.enabled = true

2. Replicating Kafka topics on the target environment

Next, we extracted the existing topics from the source Confluent Kafka cluster. A sample command used is given below:

bin/kafka-topics –describe –zookeeper <ZooKeeper_Url> | grep ReplicationFactor | awk ‘{print “/opt/confluent/confluent-5.5.1/bin/kafka-topics –create –bootstrap-server <Broker_Url> –topic ” $1 ” –replication-factor ” $2 ” –partitions ” $3 ” –config ” $4}’ >> ./create_topic_list

Example:

bin/kafka-topics –describe –zookeeper zkp-1.int.dev:2181, zkp-2.int.dev:2181, zkp-3.int.dev:2181/env/kafka | grep ReplicationFactor | awk ‘{print”/opt/confluent/confluent-5.5.1/bin/kafka-topics –create –bootstrap-server 10.3.20.61:9092 –topic ” $1 ” –replication-factor ” $2 ” –partitions ” $3 ” –config ” $4}’ >> ./create_topic_list

Broker_Url is the target cluster broker URL, and create_topic_list will have all the topic creation commands ready for the target cluster.

The configuration parameters used are given below:

  1. $1: List of topics name we wanted to extract
  2. $2: Replication factor in the target Amazon MSK cluster
  3. $3: # of partitions we wanted to create in target Amazon MSK cluster
  4. $4: Configuration details of the target Amazon MSK cluster

Based on the above command, a sample output data is given below:

../../confluent-5.5.1/bin/kafka-topics –create –zookeeper z-1.kafka.us-east-1.amazonaws.com:2181,z-3.kafka.us-east-1.amazonaws.com:2181,z-2.kafka.us-east-1.amazonaws.com:2181 –topic <topic_name> –replication-factor 3 –partitions 1 –config min.insync.replicas=1 follower.replication.throttled.replicas=*

../../confluent-5.5.1/bin/kafka-topics –create –zookeeper z-1.kafka.us-east-1.amazonaws.com:2181,z-3.kafka.us-east-1.amazonaws.com:2181,z-2.kafka.us-east-1.amazonaws.com:2181 –topic <topic_name> –replication-factor 2 –partitions 1 –config min.insync.replicas=1 follower.replication.throttled.replicas=*

../../confluent-5.5.1/bin/kafka-topics –create –zookeeper z-1.kafka.us-east-1.amazonaws.com:2181,z-3.kafka.us-east-1.amazonaws.com:2181,z-2.kafka.us-east-1.amazonaws.com:2181 –topic <topic_name> –replication-factor 3 –partitions 1 –config min.insync.replicas=1

3. Creating topics in Amazon MSK

Create topics in Amazon MSK using command in create_topic_list file that are created in the previous step.

Step 4

Validating the target Amazon MSK cluster

We used Kafka Console Producer and Kafka Console Consumer to validate the target cluster using the following command:

bin/kafka-console-producer –topic –broker-list <broker-host:port>, we generated standard messages and pushed into specified topics. To read the messages from specified topic and validate them against the messages pushed from producer, we used the following command: kafka-console-consumer –topic orders –bootstrap-server broker:9092

Step 5

Repointing producers to the target MSK cluster

Once the Amazon MSK clusters are up and running with the required topics and configuration, we paused all producers and reconfigured the properties file to update the bootstrap brokers’ values of the target MSK cluster. Once complete, we started the producers with the updated configuration.

Step 6

Mirroring schema registry and Kafka Connect

For mirroring the schema registry and Kafka connect, MM2 takes one or more consumer configurations, a producer configuration, and either a whitelist or a blacklist. We pointed the consumer to the source environment Zookeeper and the producer to the mirror environment’s Zookeeper (or use the broker.list parameter).

You can use the following sample mirromaker.consumer.properties for Mirroring Schema Registry And Kafka Connect properties file for mirromaker.consumer.properties parameter value:

bin/kafka-mirror-maker –consumer.config mirromaker.consumer.properties –num.streams 1 –producer.config mirromaker.producer.properties –whitelist=”_schemas,connect-configs,connect-configs-es,connect-configs-jd,connect-configs-morph,connect-configs-s3,connect-configs-sfdc,connect-configs-streamx,connect-offsets,connect-offsets-es,connect-offsets-jd,connect-offsets-morph,connect-offsets-s3,connect-offsets-sfdc,connect-offsets-streamx,connect-status,connect-status-es,connect-status-jd,connect-status-morph,connect-status-s3,connect-status-sfdc,connect-status-stream”

Sample mirromaker.consumer.properties:

bootstrap.servers=
exclude.internal.topics=true
client.id=consumer-group
#consumer group id
group.id=consumer-group
auto.offset.reset=earliest

Sample mirromaker.producer.properties:

bootstrap.servers=
acks=all
max.request.size=400000
batch.size=50
max.in.flight.requests.per.connection=1
client.id=__admin_client
retries=3

The additional configuration properties are given below:

  • Whitelist or blacklist: MirrorMaker accepts either a whitelist OR a blacklist. Give a comma-separated list of topics that you want to mirror.
  • Number of consumption streams: Use the –num.streams option to specify the number of mirror consumer threads to create.
    Note: If you start multiple MirrorMaker processes, look at the partition distribution on the source cluster. If the number of consumption streams per MirrorMaker process is too high, some mirroring threads will be idle due to the consumer rebalancing algorithm (if they do not own any partitions for consumption).

Step 7

Mirroring application-specific topics

Use sample mirromaker.consumer.properties for Mirroring Application Specific Topics file for consumer.properties parameter value:

bin/kafka-mirror-maker –consumer.config mirromaker.consumer.properties –num.streams 1 –producer.config mirromaker.producer.properties –whitelist= “.sbx.*”

Repeat above command for all source topics. You can choose to execute this command using java regular expression as well.

Step 8

Relaunching MSK services with updated configuration

Launch new Kafka connect services, schema registry, and rest services for target MSK cluster after updating the configuration. Update Zookeeper and Broker URLs in connect-distributed.properties, schema-registry.properties, kafka-rest.properties files from AWS MSK.

  • To change Kafka Connect service configuration, follow these steps:
    • Edit connect-distributed.properties to change below properties
      bootstrap.servers=<BROKER-URLS COMMA SEPARATED for AWS MSK>
    • Execute below command to update the Kafka connect properties
      bin/connect-distributed /opt/confluent/config/connect/connect-distributed.properties
  • To change Kafka Schema Registry configuration, follow these steps:
    • Edit schema-registry. properties to change below properties
      kafkastore.connection.url=<ZOO-KEEPER-PATH USED FOR AWS MSK KAFKA CLUSTER>
    • Execute below command to update the Kafka schema registry properties
      bin/schema-registry-start /opt/confluent/config/registry/schema-registry.properties
  • To change the Kafka Rest Service configuration, follow these steps:
    • Edit kafka-rest.properties to change the below properties
      bootstrap.servers=<BROKER-URLS for AWS MSK>
    • Execute the below command to update the Kafka rest properties
      bin/kafka-rest-start /opt/confluent/config/rest/kafka-rest.properties
  • Edit below properties in environment-specific default files
    • #kafka configurations
      kafka.url: “<BROKER-URLS COMMA SEPARATED for AWS MSK>
      kafka.zookeeper.connect: “<ZOO-KEEPER-PATH USED FOR TARGET KAFKA CLUSTER SETUP
      e.g.msk-zkp-1:2181,msk-zkp-2:2181,msk-zkp-2:2181>”
      kafka.schema.registry.url: “<KAFKA_REGISTRY_URL FOR TARGET CLUSTER e.g http://msk-kafkaschreg>”

Step 9

Repointing consumers to the target Amazon MSK cluster

Stop all consumer services and reconfigure the properties file with bootstrap brokers values of the target MSK cluster. Now start the consumer services with the updated configuration.

Keep MirrorMaker running in the background and continue to use the source Confluent Kafka cluster environment.

Step 10

Monitoring the mirror lagging

Check the progress of mirroring by inspecting the lag between the last offset for each topic and the current offset from which MirrorMaker is consuming. Using kafka-consumer-groups.sh tool to check offset. Below is the sample command executed for the same.

bin/kafka-consumer-groups –bootstrap-server –group –describe
bin/kafka-consumer-groups –bootstrap-server :9092 –describe

Using the command: bin/kafka-console-producer –topic –broker-list <broker-host:port>, we generated standard messages and pushed into specified topics. To read the messages from specified topic and validate them against the messages pushed from producer, we used the following command: kafka-console-consumer –topic orders –bootstrap-server broker:9092

To find the consumer group name, look inside the mirrormaker-consumer.properties file for the group.id, and use its value. If there is no such key in the file, you can create it. For example, set group.id=mirrormaker-consumer-group.

If lag for all groups is ZERO, stop the MirrorMaker process

Monitoring an Amazon MSK cluster:

Amazon MSK gathers Apache Kafka metrics and sends them to Amazon CloudWatch, where you can view them. The different levels of monitoring that Amazon MSK provides are given below:

  • Monitoring with CloudWatch
    • DEFAULT Level monitoring
    • PER_BROKER Level monitoring
    • PER_TOPIC_PER_BROKER Level monitoring
  • Consumer-lag monitoring

Impact delivered:

By migrating open-source Confluent brokers to Amazon MSK, the client could scale storage capacity from 500 GB to 16 TiB per broker. MSK also ensured zero outrage and no downtime with automated load balancing. By migrating 5 Confluent environments to Amazon MSK in less than 2 months, the client could save more than $14000 monthly on operational expenses.

This is just one example of how we can help you take advantage of Kafka seamlessly using Amazon MSK. To know more about how we can help you choose the right tools to achieve your business goals, write to us at inquiry@impetus.com.

Choose a lab aligned to your Data & AI journey

Address your desired use case across critical analytic dimensions

  • Explore architecture options with experts

  • Ensure strategic alignment of business and technology

  • Architect an ideal solution for a pressing problem


  • Validate new or refactor existing architecture

  • Develop a prototype with expert guidance

  • Establish a roadmap to production


Learn more about how our work can support your enterprise

Author
Manoj Kumar