Real-time CDC from Oracle to PostgreSQL using Debezium and Docker Compose, with Grafana monitoring

This Proof of Concept demonstrates Debezium’s ability to enable real-time data synchronization between Oracle and PostgreSQL databases through CDC. In addition to replicating data, the solution supports conditional data filtering. A secondary flow is also configured to support analytics and monitoring, feeding live stats into Grafana dashboards.

Simple diagram:

[image: architecture diagram]

Configuration Steps

These are the different files we will be using for this demo:

[image: project file layout]

Get Oracle Docker Image

Oracle Docker images are not available in the public Docker Hub repository; they are hosted in Oracle's own container registry. Here are the steps to get the image.
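A minimal sketch of those steps (this assumes you have an Oracle account and have accepted the database image licence terms on Oracle's container registry website):

```shell
# Log in to Oracle's container registry with your Oracle account credentials.
docker login container-registry.oracle.com

# Pull the Oracle 19c enterprise image referenced by the compose file below.
docker pull container-registry.oracle.com/database/enterprise:19.3.0.0
```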

Create Debezium Connect Image

As Oracle is not open source, the Debezium Connect image does not come with the Oracle driver integrated. We need to customise the image with the Oracle driver, and also add the Kafka Connect JDBC sink connector.

Download the Oracle Instant Client driver & unzip at oracle_instantclient/: https://download.oracle.com/otn_software/linux/instantclient/1914000/instantclient-basic-linux.x64-19.14.0.0.0dbru.zip

Download the JDBC sink connector & unzip at confluentinc-kafka-connect-jdbc-10.3.3/: https://www.confluent.io/hub/confluentinc/kafka-connect-jdbc

Download the Debezium scripting dependency & unzip at debezium-scripting/: https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.8.1.Final/debezium-scripting-1.8.1.Final.tar.gz

Download the following Groovy dependencies for filter expressions at groovy/:

https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-jsr223/3.0.9/groovy-jsr223-3.0.9.jar

https://repo1.maven.org/maven2/org/codehaus/groovy/groovy-json/3.0.9/groovy-json-3.0.9.jar

https://repo1.maven.org/maven2/org/codehaus/groovy/groovy/3.0.9/groovy-3.0.9.jar
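The download steps above can be scripted roughly as follows (a sketch, assuming the URLs listed above are still live; the Confluent JDBC connector zip must be fetched manually from Confluent Hub, and the layout inside the scripting archive may vary):

```shell
# Create the directories the custom Dockerfile copies from.
mkdir -p oracle_instantclient debezium-scripting groovy

# Oracle Instant Client (contains ojdbc8.jar and xstreams.jar).
# -j flattens the zip's internal directory so the jars land directly in oracle_instantclient/.
curl -sLo instantclient.zip \
  https://download.oracle.com/otn_software/linux/instantclient/1914000/instantclient-basic-linux.x64-19.14.0.0.0dbru.zip
unzip -jo instantclient.zip -d oracle_instantclient/

# Debezium scripting jars for the filter SMT.
curl -sL https://repo1.maven.org/maven2/io/debezium/debezium-scripting/1.8.1.Final/debezium-scripting-1.8.1.Final.tar.gz \
  | tar -xz -C debezium-scripting/

# Groovy runtime jars used by the jsr223.groovy expression language.
for jar in groovy groovy-json groovy-jsr223; do
  curl -sLo groovy/${jar}-3.0.9.jar \
    https://repo1.maven.org/maven2/org/codehaus/groovy/${jar}/3.0.9/${jar}-3.0.9.jar
done
```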

ARG DEBEZIUM_VERSION
FROM debezium/connect:$DEBEZIUM_VERSION
ENV KAFKA_CONNECT_JDBC_DIR=$KAFKA_CONNECT_PLUGINS_DIR/kafka-connect-jdbc
 
ARG JMX_AGENT_VERSION
RUN mkdir /kafka/etc && cd /kafka/etc &&\
        curl -so jmx_prometheus_javaagent.jar \
        https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/$JMX_AGENT_VERSION/jmx_prometheus_javaagent-$JMX_AGENT_VERSION.jar
 
COPY config.yml /kafka/etc/config.yml
 
USER kafka
 
# Add Oracle client and drivers
COPY oracle_instantclient/xstreams.jar /kafka/libs
COPY oracle_instantclient/ojdbc8.jar /kafka/libs
 
# Add Confluent kafka connect for jdbc sink connector
COPY confluentinc-kafka-connect-jdbc-10.3.3/lib/*.jar /kafka/connect/kafka-connect-jdbc/
COPY confluentinc-connect-transforms-1.4.2/lib/*.jar /kafka/connect/kafka-connect-transforms/
 
# Add filter dependencies
COPY debezium-scripting/*.jar /kafka/connect/debezium-connector-oracle/
COPY groovy/*.jar /kafka/connect/debezium-connector-oracle/

Run Containers

The following is the docker compose file to start all the required containers.

If you are running Docker Desktop, it may run short of resources, as there are seven containers in the compose file. You may need to increase the memory to 6 GB in the Docker Desktop settings.

The Oracle container may take ~20 minutes to spin up. Use a volume mount for the Oracle container so initialisation is faster from the second run onwards.

version: '2'
services:
  oracledb19:
    image: container-registry.oracle.com/database/enterprise:19.3.0.0
    ports:
      - 1521:1521
    environment:
      - ORACLE_PWD=top_secret
    volumes:
      - /tmp/oradata:/opt/oracle/oradata
  postgres:
    image: debezium/postgres:9.6
    ports:
      - 5432:5432
    environment:
      - POSTGRES_USER=postgresuser
      - POSTGRES_PASSWORD=postgrespw
      - POSTGRES_DB=inventory
  zookeeper:
    image: debezium/zookeeper:${DEBEZIUM_VERSION}
    ports:
      - 2181:2181
      - 2888:2888
      - 3888:3888
  kafka:
    image: debezium/kafka:${DEBEZIUM_VERSION}
    ports:
      - 9092:9092
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181
    depends_on:
      - zookeeper
  prometheus:
    build:
      context: debezium-prometheus
      args:
        PROMETHEUS_VERSION: v2.26.0
    ports:
      - 9090:9090
    links:
      - connect
    depends_on:
      - connect
  grafana:
    build:
      context: debezium-grafana
      args:
        GRAFANA_VERSION: 7.5.5
    ports:
      - 3000:3000
    links:
      - prometheus
    depends_on:
      - prometheus
    environment:
      - DS_PROMETHEUS=prometheus
  connect:
    image: debezium/connect-with-oracle-jdbc:${DEBEZIUM_VERSION}
    build:
      context: debezium-with-oracle-jdbc
      args:
        DEBEZIUM_VERSION: ${DEBEZIUM_VERSION}
        JMX_AGENT_VERSION: 0.15.0
    ports:
      - 8083:8083
      - 5005:5005
      - 1976:1976
    links:
      - kafka
    depends_on:
      - kafka
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses
      - LD_LIBRARY_PATH=/instant_client
      - KAFKA_DEBUG=true
      - DEBUG_SUSPEND_FLAG=n
      - JAVA_DEBUG_PORT=0.0.0.0:5005
      - KAFKA_OPTS=-javaagent:/kafka/etc/jmx_prometheus_javaagent.jar=8080:/kafka/etc/config.yml
      - JMXPORT=1976
      - JMXHOST=localhost

export DEBEZIUM_VERSION=1.8
docker-compose -f docker-compose-oracle.yaml up --build

Set Up LogMiner in Oracle

This step sets up LogMiner in Oracle. It creates the users and grants the privileges required for log mining.

docker exec -it debezium_oracledb19_1 /bin/bash
mkdir /opt/oracle/oradata/recovery_area
curl https://raw.githubusercontent.com/debezium/oracle-vagrant-box/main/setup-logminer.sh | sh

Populate Dummy Data

In order to play around with the CDC capabilities, let's populate the database with a few tables and records. The following script creates the tables CUSTOMERS, ORDERS, PRODUCTS and PRODUCTS_ON_HAND, and inserts a few records into each table.

cat inventory.sql | sqlplus debezium/dbz@//localhost:1521/ORCLPDB1

Register Connectors

The connector 'inventory-connector' is an OracleConnector which starts the LogMiner process, detects changes, and pushes messages to Kafka topics. By default, a separate topic is created for each table in the captured schemas.

Route SMT

By default, the topic name is created as <Server Name>.<Schema>.<Table Name>, but this format is not recognised by the JDBC Sink Connector. So we need to transform the topic name into a simpler one: <Table Name>.

{
  "name": "inventory-connector",
  "config": {
    "connector.class" : "io.debezium.connector.oracle.OracleConnector",
    "tasks.max" : "1",
    "database.server.name" : "server1",
    "database.hostname" : "oracledb19",
    "database.port" : "1521",
    "database.user" : "c##dbzuser",
    "database.password" : "dbz",
    "database.dbname" : "ORCLCDB",
    "database.pdb.name" : "ORCLPDB1",
    "database.connection.adapter" : "logminer",
    "database.history.kafka.bootstrap.servers" : "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory",

    "transforms": "route",
    "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
    "transforms.route.replacement": "$3"
  }
}
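To illustrate what the RegexRouter transform above does, here is the same regex and replacement applied in plain Python (illustrative only; the actual transform runs inside Kafka Connect):

```python
import re

# Same pattern and replacement as the RegexRouter SMT config:
# three dot-separated groups, keeping only the third (the table name).
pattern = r"([^.]+)\.([^.]+)\.([^.]+)"
replacement = r"\3"

topic = "server1.DEBEZIUM.CUSTOMERS"
print(re.sub(pattern, replacement, topic))  # CUSTOMERS
```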


Filter SMT

Optionally, you may include a filter expression to filter events based on Groovy expressions over their content.

"transforms": "filter",
"transforms.filter.type": "io.debezium.transforms.Filter",
"transforms.filter.language": "jsr223.groovy",
"transforms.filter.condition": "value.op == 'u'",
"transforms.filter.topic.regex": "server1.DEBEZIUM.*",
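The Groovy condition above keeps only update events (op == 'u'). For intuition, a rough Python equivalent of that predicate applied to some hypothetical change events:

```python
# Hypothetical change events; Debezium sets "op" to c (create),
# u (update), d (delete), or r (snapshot read).
events = [
    {"op": "c", "after": {"ID": 1010, "FIRST_NAME": "Test"}},
    {"op": "u", "after": {"ID": 1010, "FIRST_NAME": "Sample"}},
    {"op": "d", "after": None},
]

# Equivalent of the Groovy condition value.op == 'u':
updates = [e for e in events if e["op"] == "u"]
print(len(updates))  # 1
```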

The following connector 'jdbc-sink' is a sink connector which listens to the change events on the Kafka topics and pushes them to the PostgreSQL database.

{
    "name": "jdbc-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "CUSTOMERS,ORDERS,PRODUCTS,PRODUCTS_ON_HAND",
        "connection.url": "jdbc:postgresql://postgres:5432/inventory?user=postgresuser&password=postgrespw",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "transforms.unwrap.drop.tombstones": "false",
        "auto.create": "true",
        "insert.mode": "upsert",
        "delete.enabled": "true",
        "pk.mode": "record_key"
    }
}


curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-oracle-logminer.json

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d @jdbc-sink.json
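After both POST calls succeed, you can verify the registrations through the standard Kafka Connect REST API:

```shell
# List all registered connectors.
curl -s http://localhost:8083/connectors

# Check that each connector and its tasks are in RUNNING state.
curl -s http://localhost:8083/connectors/inventory-connector/status
curl -s http://localhost:8083/connectors/jdbc-sink/status
```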

Sample Database Operations

Initial Snapshot & Sync

By default, the Oracle connector creates read events from all tables during the initial snapshot and pushes them to Kafka topics. This behaviour can be changed by setting "snapshot.mode" to "schema_only", in which case the initial data sync does not happen.
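For example, an abbreviated config fragment (the extra property would be merged into the source connector config shown earlier; the rest of the config is elided here):

```json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.oracle.OracleConnector",
    "snapshot.mode": "schema_only"
  }
}
```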

Oracle Database

[screenshot]

PostgreSQL Database

[screenshot]

Insert Data in Oracle

INSERT INTO DEBEZIUM.CUSTOMERS VALUES (1010, 'Test', 'Test', 'test@example.com');

Oracle Database

[screenshot]

CDC Event Generated

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int16", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "FIRST_NAME" },
          { "type": "string", "optional": false, "field": "LAST_NAME" },
          { "type": "string", "optional": false, "field": "EMAIL" }
        ],
        "optional": true,
        "name": "server1.DEBEZIUM.CUSTOMERS.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int16", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "FIRST_NAME" },
          { "type": "string", "optional": false, "field": "LAST_NAME" },
          { "type": "string", "optional": false, "field": "EMAIL" }
        ],
        "optional": true,
        "name": "server1.DEBEZIUM.CUSTOMERS.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false,incremental" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "string", "optional": true, "field": "txId" },
          { "type": "string", "optional": true, "field": "scn" },
          { "type": "string", "optional": true, "field": "commit_scn" },
          { "type": "string", "optional": true, "field": "lcr_position" }
        ],
        "optional": false,
        "name": "io.debezium.connector.oracle.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
  },
  "payload": {
    "before": null,
    "after": {
      "ID": 1010,
      "FIRST_NAME": "Test",
      "LAST_NAME": "Test",
      "EMAIL": "test@example.com"
    },
    "source": {
      "version": "1.8.1.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1645967024000,
      "snapshot": "false",
      "db": "ORCLPDB1",
      "sequence": null,
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId": "09002100b3020000",
      "scn": "5494041",
      "commit_scn": "5494042",
      "lcr_position": null
    },
    "op": "c",
    "ts_ms": 1645967029112,
    "transaction": null
  }
}
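A downstream consumer typically only needs the payload portion of the envelope above. A minimal sketch of extracting the new row state from such an event (abbreviated to the payload for brevity):

```python
import json

# Abbreviated version of the insert event shown above.
raw = """
{
  "payload": {
    "before": null,
    "after": {"ID": 1010, "FIRST_NAME": "Test", "LAST_NAME": "Test", "EMAIL": "test@example.com"},
    "op": "c",
    "ts_ms": 1645967029112
  }
}
"""

event = json.loads(raw)
payload = event["payload"]
if payload["op"] == "c":      # create event: the "after" image is the inserted row
    row = payload["after"]
    print(row["EMAIL"])       # test@example.com
```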

PostgreSQL Database

[screenshot]

Update Data in Oracle

UPDATE DEBEZIUM.CUSTOMERS SET FIRST_NAME = 'Sample', LAST_NAME = 'Sample', EMAIL = 'sample@example.com' WHERE ID = 1010;

Oracle Database

[screenshot]

CDC Event Generated

{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          { "type": "int16", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "FIRST_NAME" },
          { "type": "string", "optional": false, "field": "LAST_NAME" },
          { "type": "string", "optional": false, "field": "EMAIL" }
        ],
        "optional": true,
        "name": "server1.DEBEZIUM.CUSTOMERS.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "int16", "optional": false, "field": "ID" },
          { "type": "string", "optional": false, "field": "FIRST_NAME" },
          { "type": "string", "optional": false, "field": "LAST_NAME" },
          { "type": "string", "optional": false, "field": "EMAIL" }
        ],
        "optional": true,
        "name": "server1.DEBEZIUM.CUSTOMERS.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "version" },
          { "type": "string", "optional": false, "field": "connector" },
          { "type": "string", "optional": false, "field": "name" },
          { "type": "int64", "optional": false, "field": "ts_ms" },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": { "allowed": "true,last,false,incremental" },
            "default": "false",
            "field": "snapshot"
          },
          { "type": "string", "optional": false, "field": "db" },
          { "type": "string", "optional": true, "field": "sequence" },
          { "type": "string", "optional": false, "field": "schema" },
          { "type": "string", "optional": false, "field": "table" },
          { "type": "string", "optional": true, "field": "txId" },
          { "type": "string", "optional": true, "field": "scn" },
          { "type": "string", "optional": true, "field": "commit_scn" },
          { "type": "string", "optional": true, "field": "lcr_position" }
        ],
        "optional": false,
        "name": "io.debezium.connector.oracle.Source",
        "field": "source"
      },
      { "type": "string", "optional": false, "field": "op" },
      { "type": "int64", "optional": true, "field": "ts_ms" },
      {
        "type": "struct",
        "fields": [
          { "type": "string", "optional": false, "field": "id" },
          { "type": "int64", "optional": false, "field": "total_order" },
          { "type": "int64", "optional": false, "field": "data_collection_order" }
        ],
        "optional": true,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "server1.DEBEZIUM.CUSTOMERS.Envelope"
  },
  "payload": {
    "before": {
      "ID": 1010,
      "FIRST_NAME": "Test",
      "LAST_NAME": "Test",
      "EMAIL": "test@example.com"
    },
    "after": {
      "ID": 1010,
      "FIRST_NAME": "Sample",
      "LAST_NAME": "Sample",
      "EMAIL": "sample@example.com"
    },
    "source": {
      "version": "1.8.1.Final",
      "connector": "oracle",
      "name": "server1",
      "ts_ms": 1645967651000,
      "snapshot": "false",
      "db": "ORCLPDB1",
      "sequence": null,
      "schema": "DEBEZIUM",
      "table": "CUSTOMERS",
      "txId": "08001400ea020000",
      "scn": "5542287",
      "commit_scn": "5542288",
      "lcr_position": null
    },
    "op": "u",
    "ts_ms": 1645967653034,
    "transaction": null
  }
}

PostgreSQL Database

[screenshot]

Monitoring Dashboard

Runtime statistics are exposed via JMX MBeans from the Debezium Connect docker image.

jconsole (localhost:1976)

[screenshot]

Grafana Dashboard (localhost:3000 (admin/admin))

All JMX MBean attributes are pushed to Prometheus. The following is a basic dashboard which shows a few interesting statistics about the snapshot and streaming phases.

[screenshot]

Conclusion

Debezium is an industry-standard tool for data replication from one database to other databases and storage systems. It offers both source and destination/sink connectors, enabling it to connect to almost any storage system. Regular data replication needs no coding at all and can be done through configuration only; the above POC application can be set up and run in less than 10 minutes to test the features of Debezium. Running Debezium in a production system, however, requires a more involved setup to ensure availability, fault tolerance, and recovery from catastrophic failure.
