Change data capture (CDC) is a well-established software design pattern for a system that monitors and captures data changes so that other software can respond to those events. Using KafkaConnect, along with Debezium Connectors and the Apache Camel Kafka Connector, we can build a configuration-driven data pipeline to bridge traditional data stores and new event-driven architectures.
This article walks through a simple example.
Why use change data capture?
The advantages of CDC compared to a simple poll-based or query-based process are:
- All changes are captured: Intermediate changes (updates and deletes) that happen between two runs of the poll loop might otherwise be missed.
- Low overhead: Near real-time reaction to data changes avoids increased CPU load due to frequent polling.
- No data model impact: Timestamp columns are no longer needed to determine the last data update.
The example
In this example, we build a simple cloud-native CDC pipeline from scratch. The goal is to send every change from a simple customers table to a message queue for further processing. Once the infrastructure is provisioned, we implement the data pipeline using configuration files, without writing any code. This is the high-level overview of the architecture:
MySQL  -->  KafkaConnect cluster                          -->  AMQ
            [Worker0 JVM (TaskA0, TaskB0, TaskB1), ...]
                          |
                          v
            Kafka (offsets, config, status)
Even though this is a simple use case, it includes the deployment and setup of different integration products on top of OpenShift 4: AMQ 7.7 (message broker), AMQ Streams 1.5 (Kafka and KafkaConnect), and Debezium (CDC engine). This infrastructure pays off when you expand this simple example into a group of microservices that communicate with each other by sending events. The nice thing about using Debezium is that you don't need to change your application's logic.
You can find all of the required configuration files here.
Note: For the sake of simplicity, we use unsecured components that are not suitable for production. For any serious use, you should add TLS encryption and increase the allocated resources.
Setting up
To provision the infrastructure for our CDC pipeline, we have to set up the OpenShift project, MySQL source system, AMQ sink system, AMQ Streams, and the Debezium connector. You can also use Red Hat CodeReady Containers (CRC), but make sure to reserve eight cores and at least 14 GB of memory.
We will do all of this from the command line, so open your favorite shell. No prompt is added so that you can easily copy and paste the code you find here. First, set your environment variables:
API_ENDPOINT="https://api.crc.testing:6443"
ADMIN_NAME="kubeadmin"
ADMIN_PASS="7z6T5-qmTth-oxaoD-p3xQF"
USER_NAME="developer"
USER_PASS="developer"
PROJECT_NAME="cdc"
Next, create a new project:
TMP="/tmp/ocp" && rm -rf $TMP && mkdir -p $TMP oc login -u $ADMIN_NAME -p $ADMIN_PASS $API_ENDPOINT oc new-project $PROJECT_NAME oc adm policy add-role-to-user admin $USER_NAME
Then set up Red Hat registry authentication (use your own credentials here):
REG_SECRET="registry-secret"

oc create secret docker-registry $REG_SECRET \
  --docker-server="registry.redhat.io" \
  --docker-username="my-user" \
  --docker-password="my-pass"

oc secrets link default $REG_SECRET --for=pull
oc secrets link builder $REG_SECRET --for=pull
oc secrets link deployer $REG_SECRET --for=pull
MySQL setup (source system)
Our source system is MySQL. Here, we use a custom DeploymentConfig that contains a post lifecycle hook to initialize our database and enable binary log access for the Debezium user. To start, create your ConfigMap and your secret:
oc create configmap db-config --from-file=./mysql/my.cnf
oc create configmap db-init --from-file=./mysql/initdb.sql

oc create secret generic db-creds \
  --from-literal=database-name=cdcdb \
  --from-literal=database-user=cdcadmin \
  --from-literal=database-password=cdcadmin \
  --from-literal=database-admin-password=cdcadmin
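For reference, the important part of my.cnf is that it enables row-based binary logging, which Debezium relies on, while initdb.sql creates the customers table and grants the Debezium user the usual replication privileges (SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT). A minimal sketch of those binlog settings might look like this (the actual files are in the repository linked above; values are illustrative):

# my.cnf (sketch): row-based binlog settings required by Debezium
[mysqld]
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL
expire_logs_days = 10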
Next, deploy your resources:
oc create -f ./mysql/my-mysql.yaml
Check the status:
MYSQL_POD=$(oc get pods | grep my-mysql | grep Running | cut -d " " -f1)

oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e "SELECT * FROM customers"'
Make some data changes:
oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
  "INSERT INTO customers (first_name, last_name, email) VALUES (\"John\", \"Doe\", \"jdoe@example.com\")"'

oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
  "UPDATE customers SET first_name = \"Jane\" WHERE id = 1"'

oc exec -i $MYSQL_POD -- /bin/sh -c 'MYSQL_PWD="cdcadmin" $MYSQL_PREFIX/bin/mysql -u cdcadmin cdcdb -e \
  "INSERT INTO customers (first_name, last_name, email) VALUES (\"Chuck\", \"Norris\", \"cnorris@example.com\")"'
AMQ Broker setup (sink system)
Our destination system is the AMQ message broker, which you can download for free from the Red Hat developer portal. Here, we simply create a single-instance broker and our destination queue:
mkdir $TMP/amq
unzip -qq /path/to/amq-broker-operator-7.7.0-ocp-install-examples.zip -d $TMP/amq

AMQ_DEPLOY="$(find $TMP/amq -name "deploy" -type d)"
Deploy the Operator:
oc create -f $AMQ_DEPLOY/service_account.yaml
oc create -f $AMQ_DEPLOY/role.yaml
oc create -f $AMQ_DEPLOY/role_binding.yaml

sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemis_crd.yaml
sed -i -e "s/v2alpha1/v2alpha2/g" $AMQ_DEPLOY/crds/broker_v2alpha1_activemqartemisaddress_crd.yaml
oc apply -f $AMQ_DEPLOY/crds

oc secrets link amq-broker-operator $REG_SECRET --for=pull
oc create -f $AMQ_DEPLOY/operator.yaml
Deploy the resources:
oc create -f ./amq/my-broker.yaml
Once the broker pod is running, create the destination address:
oc create -f ./amq/my-address.yaml
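For reference, my-address.yaml is expected to be an ActiveMQArtemisAddress resource roughly like the following sketch (the actual file is in the repository; the address and queue names here are illustrative):

# Sketch of an ActiveMQArtemisAddress resource; names are illustrative
apiVersion: broker.amq.io/v2alpha2
kind: ActiveMQArtemisAddress
metadata:
  name: my-address
spec:
  addressName: sink.queue
  queueName: sink.queue
  routingType: anycast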
Check the status:
oc get activemqartemises
oc get activemqartemisaddresses
AMQ Streams setup (Kafka)
Red Hat AMQ Streams, which is required to run KafkaConnect, can also be downloaded from the same portal page. Here we create a simple cluster with just one node. There is no need to create any topics, because Debezium takes care of this, creating its internal topics and our destination topic following the pattern serverName.databaseName.tableName (in this example, my-mysql.cdcdb.customers).
mkdir $TMP/streams
unzip -qq /path/to/amq-streams-1.5.0-ocp-install-examples.zip -d $TMP/streams

STREAMS_DEPLOY="$(find $TMP/streams -name "install" -type d)"
Deploy the Operator:
sed -i -e "s/namespace: .*/namespace: $PROJECT_NAME/g" $STREAMS_DEPLOY/cluster-operator/*RoleBinding*.yaml oc apply -f $STREAMS_DEPLOY/cluster-operator oc set env deploy/strimzi-cluster-operator STRIMZI_NAMESPACE=$PROJECT_NAME oc secrets link builder $REG_SECRET --for=pull oc secrets link strimzi-cluster-operator $REG_SECRET --for=pull oc set env deploy/strimzi-cluster-operator STRIMZI_IMAGE_PULL_SECRETS=$REG_SECRET oc apply -f $STREAMS_DEPLOY/strimzi-admin oc adm policy add-cluster-role-to-user strimzi-admin $USER_NAME
Deploy the resources:
oc apply -f ./streams/my-kafka.yaml
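A single-node cluster for this AMQ Streams version would look roughly like the following sketch of my-kafka.yaml (the actual file is in the repository):

# Sketch of a minimal single-node Kafka resource (AMQ Streams 1.5 API)
apiVersion: kafka.strimzi.io/v1beta1
kind: Kafka
metadata:
  name: my-kafka-cluster
spec:
  kafka:
    replicas: 1
    listeners:
      plain: {}
    config:
      # a single broker cannot host more than one replica
      offsets.topic.replication.factor: 1
      transaction.state.log.replication.factor: 1
      transaction.state.log.min.isr: 1
    storage:
      type: ephemeral
  zookeeper:
    replicas: 1
    storage:
      type: ephemeral
  entityOperator:
    topicOperator: {}
    userOperator: {}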
Check the status:
oc get kafkas
AMQ Streams setup (KafkaConnect)
From the same AMQ Streams package, we can also deploy our KafkaConnect custom image. We will build it on top of the official one, adding our specific connector plugins: the Debezium MySQL connector and the Camel Kafka SJMS2 connector. (I'm using the latest upstream releases for convenience, but you can download the Red Hat releases from the portal.)
Note: The nice thing about this new Camel sub-project is that you can use any of its 300+ components as Kafka connectors to integrate with almost any external system.
Download and extract the connector plugins that will be added to the image:
KAFKA_CLUSTER="my-kafka-cluster"
CONNECTOR_URLS=(
  "https://repo.maven.apache.org/maven2/io/debezium/debezium-connector-mysql/1.1.2.Final/debezium-connector-mysql-1.1.2.Final-plugin.zip"
  "https://repository.apache.org/content/groups/public/org/apache/camel/kafkaconnector/camel-sjms2-kafka-connector/0.3.0/camel-sjms2-kafka-connector-0.3.0-package.zip"
)

CONNECTORS="$TMP/connectors" && mkdir -p $CONNECTORS

for url in "${CONNECTOR_URLS[@]}"; do
  curl -sL $url -o $CONNECTORS/file.zip && unzip -qq $CONNECTORS/file.zip -d $CONNECTORS
done
sleep 2
rm -rf $CONNECTORS/file.zip
Deploy the resources:
oc create secret generic debezium-config --from-file=./streams/connectors/mysql.properties
oc create secret generic camel-config --from-file=./streams/connectors/amq.properties

oc apply -f ./streams/my-connect-s2i.yaml
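The my-connect-s2i.yaml resource enables connector management through KafkaConnector resources and mounts the two secrets as external configuration; here is a sketch of what it is expected to contain (the actual file is in the repository):

# Sketch of the KafkaConnectS2I resource (AMQ Streams 1.5 API)
apiVersion: kafka.strimzi.io/v1beta1
kind: KafkaConnectS2I
metadata:
  name: my-connect-cluster
  annotations:
    # let the Cluster Operator manage connectors declared as KafkaConnector resources
    strimzi.io/use-connector-resources: "true"
spec:
  replicas: 1
  bootstrapServers: my-kafka-cluster-kafka-bootstrap:9092
  config:
    # expose the mounted property files to the connector configuration
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
  externalConfiguration:
    volumes:
      - name: debezium-config
        secret:
          secretName: debezium-config
      - name: camel-config
        secret:
          secretName: camel-config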
Once the Connect cluster is running, start the custom image build:
oc start-build my-connect-cluster-connect --from-dir $CONNECTORS --follow
Check the status:
oc get kafkaconnects2i
These are all running pods up to this point:
amq-broker-operator-5d4559677-dpzf5                 1/1   Running   0   21m
my-broker-ss-0                                      1/1   Running   0   19m
my-connect-cluster-connect-2-xr58q                  1/1   Running   0   6m28s
my-kafka-cluster-entity-operator-56c9868474-kfdx2   3/3   Running   1   14m
my-kafka-cluster-kafka-0                            2/2   Running   0   14m
my-kafka-cluster-zookeeper-0                        1/1   Running   0   15m
my-mysql-1-vmbbw                                    1/1   Running   0   25m
strimzi-cluster-operator-666fcd8b96-q8thc           1/1   Running   0   15m
Now that we have our infrastructure ready, we can finally configure the CDC pipeline:
oc apply -f ./streams/connectors/mysql-source.yaml oc apply -f ./streams/connectors/amq-sink.yaml
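For reference, mysql-source.yaml wraps the Debezium MySQL connector configuration in a KafkaConnector resource, pulling credentials from the mounted secret through the file config provider; amq-sink.yaml follows the same pattern with the Camel SJMS2 sink connector class and the broker connection details from amq.properties. A sketch of the source connector (the actual files are in the repository; hostnames and IDs are illustrative):

# Sketch of the Debezium source connector; hostnames and IDs are illustrative
apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: mysql-source
  labels:
    # must match the KafkaConnect cluster name
    strimzi.io/cluster: my-connect-cluster
spec:
  class: io.debezium.connector.mysql.MySqlConnector
  tasksMax: 1
  config:
    database.hostname: my-mysql
    database.port: 3306
    database.user: "${file:/opt/kafka/external-configuration/debezium-config/mysql.properties:database.user}"
    database.password: "${file:/opt/kafka/external-configuration/debezium-config/mysql.properties:database.password}"
    database.server.id: 184054
    database.server.name: my-mysql
    database.whitelist: cdcdb
    table.whitelist: cdcdb.customers
    database.history.kafka.bootstrap.servers: my-kafka-cluster-kafka-bootstrap:9092
    database.history.kafka.topic: schema-changes.cdcdb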
Check the status:
oc get kafkaconnectors
oc describe kafkaconnector mysql-source
oc describe kafkaconnector amq-sink

CONNECT_POD=$(oc get pods | grep my-connect-cluster | grep Running | cut -d " " -f1)
oc logs $CONNECT_POD

oc get kafkatopics
oc exec -i $KAFKA_CLUSTER-kafka-0 -c kafka -- bin/kafka-console-consumer.sh \
  --bootstrap-server my-kafka-cluster-kafka-bootstrap:9092 --topic my-mysql.cdcdb.customers --from-beginning
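The consumer should print Debezium change events for the statements executed earlier. Trimmed down, the update of the first customer looks roughly like this (a sketch of the change-event envelope; metadata fields are omitted or illustrative):

{
  "before": { "id": 1, "first_name": "John", "last_name": "Doe", "email": "jdoe@example.com" },
  "after":  { "id": 1, "first_name": "Jane", "last_name": "Doe", "email": "jdoe@example.com" },
  "source": { "connector": "mysql", "name": "my-mysql", "db": "cdcdb", "table": "customers" },
  "op": "u",
  "ts_ms": 1593000000000
}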
Once your connectors are up and running (see the status from the describe commands above), you can make further changes to the customers table and verify that they are streamed to the queue. Open the AMQ web console route to check the queue's contents:
echo http://$(oc get routes my-broker-wconsj-0-svc-rte -o=jsonpath='{.status.ingress[0].host}{"\n"}')/console
Cleanup
When you have finished experimenting with your new cloud-native CDC pipeline, you can delete the whole project with the following commands to free up some resources:
rm -rf $TMP
oc delete project $PROJECT_NAME

oc delete crd/activemqartemises.broker.amq.io
oc delete crd/activemqartemisaddresses.broker.amq.io
oc delete crd/activemqartemisscaledowns.broker.amq.io

oc delete crd -l app=strimzi
oc delete clusterrolebinding -l app=strimzi
oc delete clusterrole -l app=strimzi
Considerations
No matter what technology you use, the change data capture process must run as a single thread to maintain ordering. Since Debezium records the log offset asynchronously, any final consumer of these changes must be idempotent.
A benefit of running on top of KafkaConnect in distributed mode is that you get a fault-tolerant CDC process. Debezium offers great performance thanks to its direct access to the data source's internal transaction log, but there is no standard for transaction logs, so a change in the database implementation may require a connector rewrite. This also means that every data source has its own procedure for enabling access to its transaction log.
The connector configuration allows you to transform message payloads by using single message transformations (SMTs). SMTs can be chained and extended with custom implementations, but they are designed for simple modifications: long chains of SMTs are hard to maintain and reason about. Moreover, transformations are synchronous and applied to every message, so heavy processing or external service calls inside an SMT can slow down the whole streaming pipeline.
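For example (not part of this example's configuration), Debezium ships the ExtractNewRecordState SMT, which unwraps the change-event envelope so that downstream consumers only see the new row state; it could be chained into the source connector by adding something like this to its config block:

  config:
    # ...existing Debezium configuration...
    transforms: unwrap
    transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    # keep tombstone records for deletes instead of dropping them
    transforms.unwrap.drop.tombstones: "false"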
In cases where you need to do heavy processing, split, enrich, or aggregate records, or call external services, you should add a stream processing layer between the connectors, such as Kafka Streams or plain Camel. Just remember that Kafka Streams creates internal topics and forces you to write the transformed data back to Kafka (data duplication), while with Camel this is only an option.
Last updated: June 26, 2020