Real-Time CDC from Oracle to PostgreSQL Using Debezium and Docker Compose, with Grafana Monitoring

This proof of concept demonstrates how Debezium enables real-time data synchronization between Oracle and PostgreSQL databases through change data capture (CDC). In addition to replicating data, the solution supports conditional filtering of the replicated events. A secondary flow feeds live connector statistics into Grafana dashboards for monitoring and analytics.

[Figure: simple architecture diagram]

Configuration Steps

The demo uses the following files:

[Figure: project file layout]

Get Oracle Docker Image

Oracle Docker images are not available on the public Docker Hub; they are hosted in Oracle's own container registry. To get the image:

  • Visit the Oracle Container Registry: https://container-registry.oracle.com/ords/f?p=113:1:117569610822412:::1:P1_BUSINESS_AREA:3
  • Sign in.
  • Accept the terms and conditions for the Enterprise Database image.
  • Log in from the command line, using the same username and password as for the web login: docker login container-registry.oracle.com
  • Pull the image: docker pull container-registry.oracle.com/database/enterprise:19.3.0.0

Create Debezium Connect Image

Because the Oracle JDBC driver is proprietary, the Debezium Connect image does not include it. The image therefore has to be customised to add the Oracle driver, together with the Confluent JDBC connector that serves as the sink connector.

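Download the Oracle Instant Client and unzip it into oracle_instantclient/. Make sure ojdbc8.jar and xstreams.jar end up directly inside that directory, because the Dockerfile copies them from there: https://download.oracle.com/otn_software/linux/instantclient/1914000/instantclient-basic-linux.x64-19.14.0.0.0dbru.zip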

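Download the Confluent JDBC connector (used as the sink connector) and unzip it into confluentinc-kafka-connect-jdbc-10.3.3/: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

The Dockerfile also copies jars from confluentinc-connect-transforms-1.4.2/lib, so the Confluent Connect transforms package (version 1.4.2) must be unzipped into that directory as well.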

Download the Debezium scripting module and extract it so that its jars end up in debezium-scripting/: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.8.1.Final/debezium-scripting-1.8.1.Final.tar.gz

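Download the following Groovy 3.0.9 jars, which are needed to evaluate filter expressions, into groovy/:

  • https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.9/groovy-jsr223-3.0.9.jar
  • https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json/3.0.9/groovy-json-3.0.9.jar
  • https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.9/groovy-3.0.9.jar

All of these directories belong in the debezium-with-oracle-jdbc/ build context used by the Compose file. They sit next to the following Dockerfile, along with the config.yml used by the Prometheus JMX exporter.
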
ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc

# Download the Prometheus JMX exporter agent used for monitoring
ARG JMX_AGENT_VERSION
RUN mkdir /kafka/etc && cd /kafka/etc && \
    curl -so jmx_prometheus_javaagent.jar \
    https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/$JMX_AGENT_VERSION/jmx_prometheus_javaagent-$JMX_AGENT_VERSION.jar
# JMX exporter rules (config.yml sits next to this Dockerfile)
COPY config.yml /kafka/etc/config.yml
USER kafka

# Add Oracle client and drivers
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs

# Add Confluent Kafka Connect JDBC plugin (sink connector) and transforms
COPY confluentinc-kafka-connect-jdbc-10.3.3/lib/*.jar /kafka/connect/kafka-connect-jdbc/
COPY confluentinc-connect-transforms-1.4.2/lib/*.jar /kafka/connect/kafka-connect-transforms/

# Add filter (scripting) dependencies to the Oracle connector's plugin directory
COPY debezium-scripting/*.jar /kafka/connect/debezium-connector-oracle/
COPY groovy/*.jar /kafka/connect/debezium-connector-oracle/

Run Containers

The following Docker Compose file starts all the required containers.

If you are running Docker Desktop, the seven containers in this Compose file may exhaust its default resources. You may need to raise the memory limit to 6 GB in the Docker Desktop settings.

The Oracle container may take around 20 minutes to start the first time. Mounting a host directory for its data files (/tmp/oradata below) makes subsequent starts much faster, because the database does not have to be created again.

version: '2'
services:
  oracledb19:
    image: container-registry.oracle.com/database/enterprise:19.3.0.0
    ports:
      - 1521:1521
    environment:
      - ORACLE_PWD=top_secret
    volumes:
      - /tmp/oradata:/opt/oracle/oradata
  postgres:
    image: debezium/postgres:9.6
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgresuser
      - POSTGRES_PASSWORD=postgrespw
      - POSTGRES_DB=inventory
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  prometheus:
    build:
      context: debezium-prometheus
      args:
        PROMETHEUS_VERSION: v2.26.0
    ports:
      - 9090:9090
    links:
      - connect
    depends_on:
      - connect
  grafana:
    build:
      context: debezium-grafana
      args:
        GRAFANA_VERSION: 7.5.5
    ports:
      - 3000:3000
    links:
      - prometheus
    depends_on:
      - prometheus
    environment:
      - DS_PROMETHEUS=prometheus
  connect:
    image: debezium/connect-with-oracle-jdbc:${DEBEZIUM_VERSION}
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
        JMX_AGENT_VERSION: 0.15.0
    ports:
      - 8083:8083
      - 5005:5005
      - 1976:1976
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
      - KAFKA_DEBUG=true
      - DEBUG_SUSPEND_FLAG=n
      - JAVA_DEBUG_PORT=0.0.0.0:5005
      - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
      - JMXPORT=1976
      - JMXHOST=localhost
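
Save the Compose file as docker-compose-oracle.yaml, then build the custom Connect image and start all the containers:

export DEBEZIUM_VERSION=1.8
docker-compose -f docker-compose-oracle.yaml up --build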
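
Oracle can take a long time on its first start, so it can help to wait until Kafka Connect's REST API answers before registering connectors. The Python sketch below polls GET /connectors on localhost:8083, the port published in the Compose file. wait_until and connect_is_up are illustrative helper names, not part of Debezium. Connect being up does not mean Oracle has finished initialising, so keep an eye on the Oracle container's logs as well.

```python
import time
import urllib.error
import urllib.request
from typing import Callable


def wait_until(check: Callable[[], bool], timeout_s: float, interval_s: float = 5.0) -> bool:
    """Call `check` repeatedly until it returns True or `timeout_s` seconds elapse."""
    deadline = time.monotonic() + timeout_s
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


def connect_is_up(base_url: str = "http://localhost:8083") -> bool:
    """Return True if Kafka Connect answers GET /connectors with HTTP 200.

    Assumes the 8083 port mapping from the Compose file; connection errors
    (refused, reset, timeouts) simply count as "not up yet".
    """
    try:
        with urllib.request.urlopen(f"{base_url}/connectors", timeout=5) as resp:
            return resp.status == 200
    except (urllib.error.URLError, OSError):
        return False
```

For example, wait_until(connect_is_up, timeout_s=30 * 60, interval_s=10) waits up to half an hour, which leaves room for Oracle's slow first start.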

Set Up LogMiner in Oracle

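This step enables LogMiner in Oracle. The setup script switches the database to the archive logging that LogMiner relies on, and creates the users and privileges the connector needs (such as c##dbzuser). Open a shell in the Oracle container, create the recovery area directory and run the script. The container name depends on your Compose project name; docker ps shows it.

docker exec -it debezium_oracledb19_1 /bin/bash
mkdir /opt/oracle/oradata/recovery_area
curl https://raw.githubusercontent.com/debezium/oracle-vagrant-box/main/setup-logminer.sh | sh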

Populate Dummy Data

To exercise the CDC capabilities, populate the database with a few tables and records. The following script creates the tables CUSTOMERS, ORDERS, PRODUCTS and PRODUCTS_ON_HAND and inserts a few records into each of them.

cat inventory.sql | sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

Register Connectors

The first connector, inventory-connector, is an OracleConnector. It starts the LogMiner process, detects changes and publishes them as messages to Kafka topics. By default, a separate topic is created for each captured table.

Route SMT

By default, each topic is named <Server Name>.<Schema>.<Table Name>, for example server1.DEBEZIUM.CUSTOMERS. The JDBC sink connector derives the target table name from the topic name, so a dotted name like this does not map to the intended PostgreSQL table. A RegexRouter transform therefore renames each topic to just <Table Name>.

{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "tasks.max": "1",
    "database.server.name": "server1",
    "database.hostname": "oracledb19",
    "database.port": "1521",
    "database.user": "c##dbzuser",
    "database.password": "dbz",
    "database.dbname": "ORCLCDB",
    "database.pdb.name": "ORCLPDB1",
    "database.connection.adapter": "logminer",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",
    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}

Save this configuration as register-oracle-logminer.json.
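
To see what the route transform does, here is a rough Python equivalent of the RegexRouter settings above. It is a sketch, not Kafka Connect's implementation. RegexRouter is written in Java and only renames a topic when the whole name matches the pattern, and its $3 replacement is spelled \3 in Python's re module. route_topic is a hypothetical helper name.

```python
import re

# Same pattern as "transforms.route.regex": three dot-free segments separated by dots
ROUTE_REGEX = re.compile(r"([^.]+)\.([^.]+)\.([^.]+)")


def route_topic(topic: str) -> str:
    """Rename <server>.<schema>.<table> to <table>; leave non-matching names unchanged."""
    match = ROUTE_REGEX.fullmatch(topic)
    return match.expand(r"\3") if match else topic


print(route_topic("server1.DEBEZIUM.CUSTOMERS"))  # CUSTOMERS
print(route_topic("server1"))                     # server1 (no match, unchanged)
```

This renaming is why the jdbc-sink configuration can subscribe to plain table names such as CUSTOMERS and ORDERS.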

Filter SMT

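Optionally, you can add a Filter SMT to the source connector. It keeps or drops change events according to a Groovy expression evaluated on the event content; the debezium-scripting and Groovy jars added to the Connect image are there for this purpose.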
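
As an illustration, the Python sketch below adds Debezium's Filter SMT (io.debezium.transforms.Filter, with language jsr223.groovy) to the inventory-connector registration, ahead of the route transform. Both the helper add_filter and the example condition are hypothetical. The condition keeps delete events and only those CUSTOMERS rows whose EMAIL ends in @example.com. Check the Debezium Filter SMT documentation for the exact expression syntax your version supports.

```python
import copy
import json
from typing import Optional


def add_filter(registration: dict, condition: str, topic_regex: Optional[str] = None) -> dict:
    """Return a copy of a connector registration with a Groovy Filter SMT added.

    The filter goes first in the transform chain, so it sees the original
    Debezium topic name (e.g. server1.DEBEZIUM.CUSTOMERS) before RegexRouter renames it.
    """
    reg = copy.deepcopy(registration)
    cfg = reg["config"]
    existing = [t for t in cfg.get("transforms", "").split(",") if t]
    cfg["transforms"] = ",".join(["filter"] + existing)
    cfg["transforms.filter.type"] = "io.debezium.transforms.Filter"
    cfg["transforms.filter.language"] = "jsr223.groovy"
    cfg["transforms.filter.condition"] = condition
    if topic_regex is not None:
        # Only evaluate the condition for events on matching topics
        cfg["transforms.filter.topic.regex"] = topic_regex
    return reg


# Trimmed-down registration; in practice, load register-oracle-logminer.json instead
base = {"name": "inventory-connector", "config": {"transforms": "route"}}
filtered = add_filter(
    base,
    # Hypothetical condition: keep deletes and customers with an @example.com address
    condition="value.op == 'd' || value.after.EMAIL.endsWith('@example.com')",
    topic_regex=r"server1\.DEBEZIUM\.CUSTOMERS",
)
print(json.dumps(filtered["config"], indent=2))
```

Placing the filter before route is a design choice. The alternative is to put it after route and match on the short topic name instead.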

The second connector, jdbc-sink, is a sink connector. It consumes the change events from the Kafka topics and writes them to the PostgreSQL database. Its unwrap transform (ExtractNewRecordState) flattens each Debezium change event to the row state. Upsert mode with delete.enabled keeps the target tables in step with inserts, updates and deletes.

{
  "name": "jdbc-sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "topics": "CUSTOMERS,ORDERS,PRODUCTS,PRODUCTS_ON_HAND",
    "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "auto.create": "true",
    "insert.mode": "upsert",
    "delete.enabled": "true",
    "pk.mode": "record_key"
  }
}

Save this configuration as jdbc-sink.json.
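
Roughly speaking, the unwrap transform turns Debezium's change-event envelope into a flat row that the JDBC sink can upsert. The Python sketch below approximates that on the JSON form of an event, such as the ones shown under Sample Database Operations. The real SMT works on Kafka Connect records rather than JSON, and unwrap is a hypothetical helper name. It assumes the SMT's default delete handling, which drops delete events.

```python
from typing import Optional


def unwrap(event: dict) -> Optional[dict]:
    """Approximate ExtractNewRecordState for one JSON change event.

    Returns the "after" row for creates (c), snapshot reads (r) and updates (u),
    and None for deletes (d), which the SMT drops by default. Because
    drop.tombstones is false, the tombstone Debezium emits after a delete still
    reaches the sink, which deletes the row by key (delete.enabled=true).
    """
    payload = event["payload"]
    if payload["op"] == "d":
        return None
    return payload["after"]


insert_event = {
    "payload": {
        "before": None,
        "after": {"ID": 1010, "FIRST_NAME": "Test", "LAST_NAME": "Test", "EMAIL": "test@example.com"},
        "op": "c",
    }
}
print(unwrap(insert_event))
# {'ID': 1010, 'FIRST_NAME': 'Test', 'LAST_NAME': 'Test', 'EMAIL': 'test@example.com'}
```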

Register both connectors with the Kafka Connect REST API:

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle-logminer.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
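
If you prefer to script the registration, the Python sketch below builds the same POST /connectors/ request as the curl commands. It also checks GET /connectors/{name}/status; both are standard Kafka Connect REST endpoints. The helper names are hypothetical, and the base URL assumes the 8083 port mapping from the Compose file.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # assumption: port published in the Compose file


def registration_request(registration: dict, base_url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the same POST /connectors/ request as the curl commands."""
    return urllib.request.Request(
        f"{base_url}/connectors/",
        data=json.dumps(registration).encode("utf-8"),
        headers={"Accept": "application/json", "Content-Type": "application/json"},
        method="POST",
    )


def is_running(status: dict) -> bool:
    """True if the connector and all of its tasks report RUNNING.

    `status` is the JSON body returned by GET /connectors/{name}/status.
    """
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)
        and all(t.get("state") == "RUNNING" for t in tasks)
    )


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Fetch the status document for one connector."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status", timeout=10) as resp:
        return json.load(resp)
```

For example, passing registration_request(json.load(open("jdbc-sink.json"))) to urllib.request.urlopen registers the sink. After that, is_running(fetch_status("jdbc-sink")) should become True once its task has started.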

Sample Database Operations

Initial Snapshot & Sync

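By default, the Oracle connector starts with a snapshot. It emits a read event (op "r") for every existing row in the captured tables and publishes it to the Kafka topics, so PostgreSQL starts with a full copy of the data. Setting "snapshot.mode" to "schema_only" changes this behaviour: only the table schemas are captured, no initial data sync takes place, and only changes made after the connector starts are streamed.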
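
For example, the following Python sketch derives a schema-only variant of the inventory-connector registration by setting the snapshot.mode property. The helper name is hypothetical. The snapshot decision is made when the connector first starts without stored offsets, so set the mode before registering the connector rather than changing it afterwards.

```python
import copy
import json


def with_snapshot_mode(registration: dict, mode: str) -> dict:
    """Return a copy of a connector registration with snapshot.mode set."""
    reg = copy.deepcopy(registration)
    reg["config"]["snapshot.mode"] = mode
    return reg


# Trimmed-down registration; in practice, load register-oracle-logminer.json
base = {"name": "inventory-connector",
        "config": {"connector.class": "io.debezium.connector.oracle.OracleConnector"}}
schema_only = with_snapshot_mode(base, "schema_only")
print(json.dumps(schema_only, indent=2))
```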

[Screenshot: Oracle Database]

[Screenshot: PostgreSQL Database]

Insert Data in Oracle

INSERT INTO DEBEZIUM.CUSTOMERS VALUES (1010, 'Test', 'Test', 'test@example.com');
COMMIT;

[Screenshot: Oracle Database]

CDC Event Generated

{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int16","optional": false,"field": "ID"},{"type": "string","optional": false,"field": "FIRST_NAME"},{"type": "string","optional": false,"field": "LAST_NAME"},{"type": "string","optional": false,"field": "EMAIL"}],"optional": true,"name": "server1.DEBEZIUM.CUSTOMERS.Value","field": "before"},{"type": "struct","fields": [{"type": "int16","optional": false,"field": "ID"},{"type": "string","optional": false,"field": "FIRST_NAME"},{"type": "string","optional": false,"field": "LAST_NAME"},{"type": "string","optional": false,"field": "EMAIL"}],"optional": true,"name": "server1.DEBEZIUM.CUSTOMERS.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "string","optional": true,"name": "io.debezium.data.Enum","version": 1,"parameters": {"allowed": "true,last,false,incremental"},"default": "false","field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "sequence"},{"type": "string","optional": false,"field": "schema"},{"type": "string","optional": false,"field": "table"},{"type": "string","optional": true,"field": "txId"},{"type": "string","optional": true,"field": "scn"},{"type": "string","optional": true,"field": "commit_scn"},{"type": "string","optional": true,"field": "lcr_position"}],"optional": false,"name": "io.debezium.connector.oracle.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "id"},{"type": "int64","optional": false,"field": "total_order"},{"type": "int64","optional": false,"field": "data_collection_order"}],"optional": true,"field": 
"transaction"}],"optional": false,"name": "server1.DEBEZIUM.CUSTOMERS.Envelope"},"payload": {"before": null,"after": {"ID": 1010,"FIRST_NAME": "Test","LAST_NAME": "Test","EMAIL": "test@example.com"},"source": {"version": "1.8.1.Final","connector": "oracle","name": "server1","ts_ms": 1645967024000,"snapshot": "false","db": "ORCLPDB1","sequence": null,"schema": "DEBEZIUM","table": "CUSTOMERS","txId": "09002100b3020000","scn": "5494041","commit_scn": "5494042","lcr_position": null},"op": "c","ts_ms": 1645967029112,"transaction": null}}

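The two timestamps in this event give a rough idea of capture latency. payload.source.ts_ms is when the change was made in the database, and payload.ts_ms is when the connector processed it. A small Python sketch (capture_lag_ms is a hypothetical helper) applied to the values above gives 1645967029112 - 1645967024000 = 5112 ms. Treat this as approximate: the source timestamp here appears rounded to whole seconds, and the database and Connect container clocks may differ.

```python
def capture_lag_ms(event: dict) -> int:
    """Milliseconds between the database change and Debezium processing the event."""
    payload = event["payload"]
    return payload["ts_ms"] - payload["source"]["ts_ms"]


# Only the fields the calculation needs, copied from the insert event above
insert_event = {"payload": {"op": "c", "ts_ms": 1645967029112, "source": {"ts_ms": 1645967024000}}}
print(capture_lag_ms(insert_event))  # 5112
```
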
[Screenshot: PostgreSQL Database]

Update Data in Oracle

UPDATE DEBEZIUM.CUSTOMERS SET FIRST_NAME = 'Sample', LAST_NAME = 'Sample', EMAIL = 'sample@example.com' WHERE ID = 1010;
COMMIT;

[Screenshot: Oracle Database]

CDC Event Generated

{"schema": {"type": "struct","fields": [{"type": "struct","fields": [{"type": "int16","optional": false,"field": "ID"},{"type": "string","optional": false,"field": "FIRST_NAME"},{"type": "string","optional": false,"field": "LAST_NAME"},{"type": "string","optional": false,"field": "EMAIL"}],"optional": true,"name": "server1.DEBEZIUM.CUSTOMERS.Value","field": "before"},{"type": "struct","fields": [{"type": "int16","optional": false,"field": "ID"},{"type": "string","optional": false,"field": "FIRST_NAME"},{"type": "string","optional": false,"field": "LAST_NAME"},{"type": "string","optional": false,"field": "EMAIL"}],"optional": true,"name": "server1.DEBEZIUM.CUSTOMERS.Value","field": "after"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "version"},{"type": "string","optional": false,"field": "connector"},{"type": "string","optional": false,"field": "name"},{"type": "int64","optional": false,"field": "ts_ms"},{"type": "string","optional": true,"name": "io.debezium.data.Enum","version": 1,"parameters": {"allowed": "true,last,false,incremental"},"default": "false","field": "snapshot"},{"type": "string","optional": false,"field": "db"},{"type": "string","optional": true,"field": "sequence"},{"type": "string","optional": false,"field": "schema"},{"type": "string","optional": false,"field": "table"},{"type": "string","optional": true,"field": "txId"},{"type": "string","optional": true,"field": "scn"},{"type": "string","optional": true,"field": "commit_scn"},{"type": "string","optional": true,"field": "lcr_position"}],"optional": false,"name": "io.debezium.connector.oracle.Source","field": "source"},{"type": "string","optional": false,"field": "op"},{"type": "int64","optional": true,"field": "ts_ms"},{"type": "struct","fields": [{"type": "string","optional": false,"field": "id"},{"type": "int64","optional": false,"field": "total_order"},{"type": "int64","optional": false,"field": "data_collection_order"}],"optional": true,"field": 
"transaction"}],"optional": false,"name": "server1.DEBEZIUM.CUSTOMERS.Envelope"},"payload": {"before": {"ID": 1010,"FIRST_NAME": "Test","LAST_NAME": "Test","EMAIL": "test@example.com"},"after": {"ID": 1010,"FIRST_NAME": "Sample","LAST_NAME": "Sample","EMAIL": "sample@example.com"},"source": {"version": "1.8.1.Final","connector": "oracle","name": "server1","ts_ms": 1645967651000,"snapshot": "false","db": "ORCLPDB1","sequence": null,"schema": "DEBEZIUM","table": "CUSTOMERS","txId": "08001400ea020000","scn": "5542287","commit_scn": "5542288","lcr_position": null},"op": "u","ts_ms": 1645967653034,"transaction": null}}

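An update event carries both before and after images, so the changed columns can be computed directly from it. Below is a minimal Python sketch (changed_columns is a hypothetical helper) using the row images from the update event above. It relies on the before image containing every column, as it does here.

```python
def changed_columns(event: dict) -> dict:
    """Map each column whose value differs between before and after to (old, new)."""
    payload = event["payload"]
    before = payload.get("before") or {}  # None for inserts
    after = payload.get("after") or {}    # None for deletes
    return {
        col: (before.get(col), after.get(col))
        for col in sorted(set(before) | set(after))
        if before.get(col) != after.get(col)
    }


update_event = {"payload": {
    "op": "u",
    "before": {"ID": 1010, "FIRST_NAME": "Test", "LAST_NAME": "Test", "EMAIL": "test@example.com"},
    "after": {"ID": 1010, "FIRST_NAME": "Sample", "LAST_NAME": "Sample", "EMAIL": "sample@example.com"},
}}
for col, (old, new) in changed_columns(update_event).items():
    print(f"{col}: {old!r} -> {new!r}")
# EMAIL: 'test@example.com' -> 'sample@example.com'
# FIRST_NAME: 'Test' -> 'Sample'
# LAST_NAME: 'Test' -> 'Sample'
```
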
[Screenshot: PostgreSQL Database]

Monitoring Dashboard

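The Debezium connector exposes its runtime statistics as JMX MBeans in the Connect container. They can be inspected with jconsole on JMX port 1976. The Prometheus JMX exporter agent added to the image also serves them over HTTP on port 8080, where Prometheus scrapes them.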

[Screenshot: jconsole connected to localhost:1976]

Grafana Dashboard (http://localhost:3000, login admin/admin)

Prometheus collects all the JMX MBean attributes by scraping the exporter. The following basic dashboard shows a few interesting statistics about the snapshot and streaming phases.

[Screenshot: Grafana dashboard with snapshot and streaming statistics]
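
Besides Grafana, the same metrics can be read directly from Prometheus's HTTP API (/api/v1/query on port 9090, as published in the Compose file). The Python sketch below builds an instant query and parses the response. The helper names are hypothetical, and so is any metric name you pass in. Metric names depend on the JMX exporter rules in config.yml, which are not shown here, so look them up in the Prometheus UI first.

```python
import json
import urllib.parse
import urllib.request

PROMETHEUS_URL = "http://localhost:9090"  # published by the Compose file


def query_url(promql: str, base_url: str = PROMETHEUS_URL) -> str:
    """URL for a Prometheus instant query."""
    return f"{base_url}/api/v1/query?" + urllib.parse.urlencode({"query": promql})


def instant_values(response: dict) -> dict:
    """Map each series' sorted label set to its float value (instant-vector result)."""
    if response.get("status") != "success":
        raise ValueError(f"query failed: {response.get('error')}")
    out = {}
    for series in response["data"]["result"]:
        labels = tuple(sorted(series["metric"].items()))
        _, value = series["value"]  # [unix_timestamp, "value as string"]
        out[labels] = float(value)
    return out


def query(promql: str, base_url: str = PROMETHEUS_URL) -> dict:
    """Run an instant query against a live Prometheus and return its values."""
    with urllib.request.urlopen(query_url(promql, base_url), timeout=10) as resp:
        return instant_values(json.load(resp))
```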

Conclusion

Debezium is a widely used tool for replicating data from one database to other databases or storage systems. It provides source connectors for many databases, and Kafka Connect sink connectors, such as the Confluent JDBC sink used here, cover most destination systems. Routine replication needs no code at all, only configuration. Once the images are available and Oracle has initialised, the POC above can be set up and run in under 10 minutes to try out Debezium's features. Running Debezium in production needs a more involved setup to ensure availability, fault tolerance and recovery from catastrophic failures.

Influencer Magazine UK