Generic PostgreSQL CDC Setup Guide

This guide covers setting up Change Data Capture (CDC) for PostgreSQL running on any server infrastructure including EC2 instances, virtual machines, bare metal servers, or on-premises installations. The setup uses logical replication with the pgoutput plugin (native to PostgreSQL 10+) for real-time data streaming.

Prerequisites:

  • PostgreSQL version 10+ (pgoutput is built-in from version 10 onwards)
  • Superuser access or ability to modify postgresql.conf and pg_hba.conf
  • TLS/SSL enabled for secure connections (recommended)
  • Sufficient disk space for WAL retention

Read Replica Support

PostgreSQL 16+: Logical decoding from a read replica (standby) is supported, with caveats. Row removal and vacuum activity on the primary can conflict with replication slots on the replica and invalidate them.

PostgreSQL 15 and earlier: Read replicas cannot act as publishers for logical replication. Connect OLake to the primary instance for CDC functionality.

Recommendation: Use the primary instance for production CDC workloads to ensure reliability.

Steps:

1. Verify pgoutput availability​

The pgoutput plugin is built into PostgreSQL 10 and later versions. No separate installation is required.

Verify your PostgreSQL version:

SELECT version();

If you see PostgreSQL 10.0 or higher, pgoutput is automatically available.
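
A numeric check can be easier to script than reading the version banner. The sketch below relies only on the built-in server_version_num setting, which encodes PostgreSQL 10.0 as 100000; the column alias is just an illustrative name.

```sql
-- Returns true when the server is PostgreSQL 10 or later,
-- the first release that ships pgoutput in core.
SELECT current_setting('server_version_num')::int >= 100000 AS pgoutput_available;
```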

2. Configure PostgreSQL Parameters​

Edit postgresql.conf (usually in /etc/postgresql/15/main/ or /var/lib/pgsql/15/data/):

# Enable logical replication
wal_level = logical

# Set replication limits
max_replication_slots = 10
max_wal_senders = 10

# Optional: Prevent timeouts during long snapshots
wal_sender_timeout = 0

# Optional: Cap WAL retained by replication slots (PostgreSQL 13+ only; omit on older versions)
max_slot_wal_keep_size = 1GB

# Performance tuning (adjust based on your workload)
checkpoint_timeout = 15min
max_wal_size = 2GB

Parameter               | Recommended Value | Purpose
wal_level               | logical           | Enable logical decoding
max_replication_slots   | ≥5                | Number of concurrent CDC connections
max_wal_senders         | ≥7                | Should exceed replication slots
wal_sender_timeout      | 0                 | Disables the sender timeout to prevent snapshot timeouts
max_slot_wal_keep_size  | 1GB+              | Caps WAL retained by slots (PostgreSQL 13+); a slot that falls further behind is invalidated
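
If editing postgresql.conf by hand is inconvenient, the same parameters can usually be set from a superuser SQL session instead. This is a sketch of that alternative, not part of the original procedure: ALTER SYSTEM writes the values to postgresql.auto.conf, and wal_level, max_replication_slots, and max_wal_senders still take effect only after a restart.

```sql
-- Run as a superuser, outside a transaction block.
ALTER SYSTEM SET wal_level = 'logical';
ALTER SYSTEM SET max_replication_slots = 10;
ALTER SYSTEM SET max_wal_senders = 10;

-- PostgreSQL 13+ only; skip this statement on older versions.
ALTER SYSTEM SET max_slot_wal_keep_size = '1GB';
```

Values set this way override the same parameters in postgresql.conf, so avoid maintaining them in both places.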

3. Configure Client Authentication​

Edit pg_hba.conf to allow replication connections. Add entries for your OLake client:

# Allow replication connections from OLake
host replication cdc_user 10.0.0.0/16 md5
host all cdc_user 10.0.0.0/16 md5

# To require SSL (recommended), use these hostssl entries in place of the host entries above
hostssl replication cdc_user 10.0.0.0/16 md5
hostssl all cdc_user 10.0.0.0/16 md5

Replace 10.0.0.0/16 with your actual network range or specific IP addresses.
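
Before restarting, it can help to confirm that PostgreSQL parses the new entries. The query below is a sketch that assumes PostgreSQL 10+ and a superuser session; it reads the pg_hba_file_rules view, which reflects the file on disk rather than the rules currently loaded.

```sql
-- Show the rules that mention cdc_user, plus any line that failed to parse.
SELECT line_number, type, database, user_name, address, auth_method, error
FROM pg_hba_file_rules
WHERE 'cdc_user' = ANY (user_name)
   OR error IS NOT NULL;
```

A non-null error column points at a malformed line that should be fixed before the file is loaded.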

4. Restart PostgreSQL​

Apply configuration changes by restarting PostgreSQL:

# Ubuntu/Debian
sudo systemctl restart postgresql

# CentOS/RHEL
sudo systemctl restart postgresql-15 # Replace with your version

# Or using pg_ctl
sudo -u postgres pg_ctl restart -D /var/lib/pgsql/15/data/

Verify Configuration:

SELECT name, setting FROM pg_settings 
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
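
A slightly extended version of this check also shows whether a value has been changed in the configuration but is still waiting for a restart. It uses only standard pg_settings columns.

```sql
-- pending_restart = true means the file holds a new value
-- that the running server has not picked up yet.
SELECT name, setting, source, pending_restart
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders');
```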

5. Create Replication User​

Connect as a superuser (usually postgres) and create a dedicated CDC user:

-- Create user with replication privileges
CREATE USER cdc_user WITH PASSWORD 'strongpassword';
ALTER ROLE cdc_user WITH REPLICATION;

Grant Database Access:

-- Grant connection to specific database
GRANT CONNECT ON DATABASE mydb TO cdc_user;

Optional: Read-only Access for Initial Snapshot

For stricter security, grant explicit read permissions:

-- Example for the "public" schema in database mydb
GRANT USAGE ON SCHEMA public TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO cdc_user;
-- Cover future tables created by the role running this command
-- (add FOR ROLE <owner_role> to cover tables created by another role)
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO cdc_user;

Multiple Schema Access:

-- Repeat for each schema you want to sync
GRANT USAGE ON SCHEMA schema1 TO cdc_user;
GRANT SELECT ON ALL TABLES IN SCHEMA schema1 TO cdc_user;
ALTER DEFAULT PRIVILEGES IN SCHEMA schema1 GRANT SELECT ON TABLES TO cdc_user;
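
To confirm the grants took effect, the built-in privilege functions can be queried directly. This sketch assumes the example names used above (cdc_user, mydb, and the public schema); substitute your own.

```sql
-- Role attributes: rolreplication should be true.
SELECT rolname, rolreplication, rolcanlogin
FROM pg_roles
WHERE rolname = 'cdc_user';

-- Database and schema access.
SELECT has_database_privilege('cdc_user', 'mydb', 'CONNECT') AS can_connect,
       has_schema_privilege('cdc_user', 'public', 'USAGE')   AS can_use_schema;

-- Tables in the schema that cdc_user still cannot read (ideally none).
SELECT schemaname, tablename
FROM pg_tables
WHERE schemaname = 'public'
  AND NOT has_table_privilege('cdc_user',
                              format('%I.%I', schemaname, tablename),
                              'SELECT');
```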

6. Configure Replica Identity for Tables​

Critical for pgoutput: Tables must have a proper replica identity to track UPDATE and DELETE operations.

Understanding Replica Identity:

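  • DEFAULT: Uses the primary key; if the table has no primary key, this behaves like NOTHING
  • FULL: Uses all columns (highest overhead, works without primary key)
  • INDEX: Uses a specific unique index
  • NOTHING: Records no old-row information, so only INSERT operations can be replicated; UPDATE and DELETE on a table whose publication publishes them fail with an error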

Set Replica Identity for Tables:

-- Option 1: Use primary key (recommended, works if table has PK)
ALTER TABLE public.my_table REPLICA IDENTITY DEFAULT;

-- Option 2: Use all columns (works for tables without PK, but higher overhead)
ALTER TABLE public.my_table REPLICA IDENTITY FULL;

-- Option 3: Use a specific unique index (must be non-partial, non-deferrable, and cover only NOT NULL columns)
ALTER TABLE public.my_table REPLICA IDENTITY USING INDEX my_unique_index;

-- Option 4: Do not log old row values (NOTHING)
ALTER TABLE public.my_table REPLICA IDENTITY NOTHING;

Verify Replica Identity:

SELECT
    n.nspname AS schema_name,
    c.relname AS table_name,
    CASE c.relreplident
        WHEN 'd' THEN 'DEFAULT'
        WHEN 'n' THEN 'NOTHING'
        WHEN 'f' THEN 'FULL'
        WHEN 'i' THEN 'INDEX'
    END AS replica_identity
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public';
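
The tables most likely to cause trouble are those left on DEFAULT without a primary key, because they have no usable replica identity. The following sketch lists them for the public schema using only system catalogs; adjust the schema filter as needed.

```sql
-- Tables with REPLICA IDENTITY DEFAULT but no primary key.
-- These need a primary key, REPLICA IDENTITY FULL, or USING INDEX.
SELECT n.nspname AS schema_name,
       c.relname AS table_name
FROM pg_class c
JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE c.relkind = 'r'
  AND n.nspname = 'public'
  AND c.relreplident = 'd'
  AND NOT EXISTS (
      SELECT 1
      FROM pg_index i
      WHERE i.indrelid = c.oid
        AND i.indisprimary
  );
```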

7. Create Publication​

Publications define which tables will be replicated. Create a publication for the tables you want to monitor:

Publication Name Configuration

Use the exact publication name that you configure in OLake UI or CLI. The publication name must match the publication parameter in your OLake source configuration.

-- Create publication for all tables in the database
CREATE PUBLICATION olake_publication FOR ALL TABLES;

-- Or create publication for specific tables
CREATE PUBLICATION olake_publication FOR TABLE public.table1, public.table2, public.table3;

-- Or create publication for all tables in a specific schema (PostgreSQL 15+)
CREATE PUBLICATION olake_publication FOR TABLES IN SCHEMA public;

Publication Options:

-- Customize what operations are published (the truncate option requires PostgreSQL 11+)
CREATE PUBLICATION olake_publication FOR ALL TABLES
WITH (publish = 'insert,update,delete,truncate');

-- Only publish INSERT and UPDATE operations
CREATE PUBLICATION olake_publication FOR ALL TABLES
WITH (publish = 'insert,update');

-- For partitioned tables (PostgreSQL 13+): publish changes via the partition root
CREATE PUBLICATION olake_publication FOR ALL TABLES
WITH (publish = 'insert,update,delete,truncate', publish_via_partition_root = true);

Verify Publication:

-- List all publications
SELECT * FROM pg_publication;

-- View tables in a publication
SELECT * FROM pg_publication_tables WHERE pubname = 'olake_publication';
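
When the publication lists specific tables, it is easy to miss one. As a sketch, the query below compares the public schema against the example publication olake_publication and returns tables that are not yet published.

```sql
-- Tables in the schema that are not part of the publication.
SELECT t.schemaname, t.tablename
FROM pg_tables t
LEFT JOIN pg_publication_tables p
       ON p.pubname    = 'olake_publication'
      AND p.schemaname = t.schemaname
      AND p.tablename  = t.tablename
WHERE t.schemaname = 'public'
  AND p.tablename IS NULL;
```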

Add Tables to Existing Publication:

-- Add a single table
ALTER PUBLICATION olake_publication ADD TABLE public.new_table;

-- Add multiple tables
ALTER PUBLICATION olake_publication ADD TABLE public.table4, public.table5;

-- Remove a table
ALTER PUBLICATION olake_publication DROP TABLE public.old_table;

8. Create Logical Replication Slot​

Set up the replication slot for OLake to consume changes:

Slot Name Configuration

Use the exact replication slot name that you configure in OLake UI or CLI. The slot name must match the replication_slot parameter in your OLake source configuration.

Best practices:

  • Create the publication before creating the replication slot
  • Create the slot in the same database that OLake will connect to (slots are per-database)
  • Use a unique slot name per OLake connection; slot names cannot start with a number, and PostgreSQL accepts only lowercase letters, digits, and underscores in them

-- Replace 'your_olake_slot_name' with the slot name from your OLake configuration
SELECT pg_create_logical_replication_slot('your_olake_slot_name', 'pgoutput');

Example with common slot names:

-- If your OLake source config uses "olake_slot"
SELECT pg_create_logical_replication_slot('olake_slot', 'pgoutput');

-- If your OLake source config uses "postgres_slot"
SELECT pg_create_logical_replication_slot('postgres_slot', 'pgoutput');

Verify Slot Creation:

SELECT * FROM pg_replication_slots WHERE slot_name = 'your_olake_slot_name';

9. Test Logical Decoding​

Verify that CDC is working correctly with pgoutput:

-- Create test data
CREATE TABLE IF NOT EXISTS test_table (
    id SERIAL PRIMARY KEY,
    data TEXT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- Set replica identity
ALTER TABLE test_table REPLICA IDENTITY DEFAULT;

-- Add table to publication (skip this statement if the publication was created
-- FOR ALL TABLES: it already includes the table and rejects ADD TABLE)
ALTER PUBLICATION olake_publication ADD TABLE test_table;

-- Insert test data
INSERT INTO test_table (data) VALUES ('PostgreSQL pgoutput CDC test - INSERT');

Check for changes using pg_logical_slot_peek_binary_changes:

SELECT *
FROM pg_logical_slot_peek_binary_changes(
    'olake_slot', NULL, NULL,
    'proto_version', '1',
    'publication_names', 'olake_publication'
);

Expected Results:

✅ Success: Returns binary change records from pgoutput
❌ Permission denied: Check user has REPLICATION role
❌ Publication not found: Verify publication exists and contains tables
❌ Slot not found: Check replication slot was created with pgoutput plugin
❌ No changes returned: Verify table has replica identity configured and is in the publication

Important

Ensure your source configuration includes replication_slot and publication names. OLake will stream changes via the pgoutput logical slot.
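
The INSERT test in this step does not exercise replica identity. A possible follow-up, sketched below with the same example names (test_table, olake_slot, olake_publication), also runs an UPDATE and a DELETE and then cleans up. It assumes the slot is not currently in use by a consumer, since an active slot cannot be peeked.

```sql
-- Exercise UPDATE and DELETE, which depend on the replica identity.
-- No WHERE clause: this deliberately touches every row of the throwaway table.
UPDATE test_table SET data = 'PostgreSQL pgoutput CDC test - UPDATE';
DELETE FROM test_table;

-- Count pending change records without consuming them.
SELECT count(*) AS pending_records
FROM pg_logical_slot_peek_binary_changes(
    'olake_slot', NULL, NULL,
    'proto_version', '1',
    'publication_names', 'olake_publication'
);

-- Remove the test table once finished.
DROP TABLE IF EXISTS test_table;
```

Peeking leaves the records in the slot, so the first consumer that streams from it should expect to see these test changes.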

10. Configure SSL/TLS

For secure connections, configure SSL in postgresql.conf:

# Enable SSL
ssl = on
ssl_cert_file = 'server.crt'
ssl_key_file = 'server.key'
ssl_ca_file = 'ca.crt' # Optional for client certificate verification

Generate Self-signed Certificate (for testing):

sudo -u postgres openssl req -new -x509 -days 365 -nodes -text \
-out /var/lib/pgsql/15/data/server.crt \
-keyout /var/lib/pgsql/15/data/server.key \
-subj "/CN=your-server-hostname"

sudo -u postgres chmod 600 /var/lib/pgsql/15/data/server.key

11. Firewall Configuration​

Ensure PostgreSQL port (default 5432) is accessible:

Ubuntu/Debian (ufw):

sudo ufw allow from 10.0.0.0/16 to any port 5432

CentOS/RHEL (firewalld):

sudo firewall-cmd --permanent --add-rich-rule="rule family='ipv4' source address='10.0.0.0/16' port protocol='tcp' port='5432' accept"
sudo firewall-cmd --reload

AWS EC2 Security Groups:

  • Add inbound rule for port 5432 from OLake's IP addresses
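  • Security groups are stateful, so response traffic for an allowed inbound connection is permitted automatically; no separate outbound rule is needed for replies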

Now that you have configured the PostgreSQL server and created the CDC user, you can add the PostgreSQL source in OLake to build an ELT pipeline to Iceberg or Parquet. See the PostgreSQL connector overview for a high-level walkthrough.

Troubleshooting​

Configuration Issues​

  • pgoutput Plugin Not Found:

    ERROR: could not access file "pgoutput": No such file or directory

    Solution: Confirm the server is running PostgreSQL 10 or later (SELECT version();), since pgoutput ships only with those versions.

  • WAL Level Not Logical:

    ERROR: logical decoding requires wal_level >= logical

    Solution: Set wal_level = logical in postgresql.conf and restart PostgreSQL.

  • Configuration Not Applied: If settings don't take effect:

    • Verify you edited the correct postgresql.conf file: SHOW config_file;
    • Ensure PostgreSQL was restarted after changes
    • Check PostgreSQL logs for configuration errors

Publication Issues​

Publication Not Found:

ERROR: publication "publication_name" does not exist

Solution: Create the publication or verify the name matches exactly (case-sensitive):

-- Check existing publications
SELECT * FROM pg_publication;

-- Create publication if missing
CREATE PUBLICATION olake_publication FOR ALL TABLES;

Publication Required Error:

ERROR: publication is required for pgoutput
ERROR: failed to validate cdc configuration for slot: publication is required for pgoutput

Solution: Ensure the publication parameter is specified in your source.json configuration with the exact name of an existing PostgreSQL publication.

Permission Issues​

  • Replication Slot Creation Failed:

    ERROR: permission denied to create replication slot

    Solution: Ensure the user has REPLICATION role: ALTER ROLE cdc_user WITH REPLICATION;

  • Table Access Denied:

    ERROR: permission denied for table "some_table"

    Solution: Grant USAGE on the schema and SELECT on the tables the user needs to access.

Connection Issues​

  • Connection Refused: Verify:

    • PostgreSQL is running: sudo systemctl status postgresql
    • Firewall allows connections on port 5432
    • pg_hba.conf has appropriate entries for your client IP
    • listen_addresses = '*' in postgresql.conf (or specific IP)
  • SSL/TLS Issues:

    • Ensure certificates have correct permissions (600 for private keys)
    • Verify certificate hostname matches connection string
    • Check client SSL configuration matches server requirements

Performance and Monitoring​

  • WAL Accumulation: Monitor disk usage as inactive slots retain WAL files:

    SELECT slot_name, active, restart_lsn, confirmed_flush_lsn 
    FROM pg_replication_slots;

    Solution: Drop unused slots or ensure CDC consumers are actively reading.

  • High CPU/Memory Usage: Tune checkpoint settings:

    checkpoint_timeout = 15min
    checkpoint_completion_target = 0.9
    max_wal_size = 2GB

  • Slow Snapshots: For large tables, consider:

    • Increasing work_mem for the CDC session
    • Using wal_sender_timeout = 0 to prevent timeouts
    • Running snapshots during low-activity periods
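
For the WAL accumulation item above, raw LSN values are hard to interpret at a glance. The sketch below converts them to sizes with built-in functions. It assumes it runs on the primary, because pg_current_wal_lsn() is not available during recovery, and the slot name in the commented-out statement is a placeholder.

```sql
-- How much WAL each logical slot is holding back, in human-readable units.
SELECT slot_name,
       active,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn))         AS retained_wal,
       pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), confirmed_flush_lsn)) AS unconfirmed_wal
FROM pg_replication_slots
WHERE slot_type = 'logical';

-- To release WAL held by a slot that is no longer needed (the slot must be inactive):
-- SELECT pg_drop_replication_slot('unused_slot_name');
```

A retained_wal figure that keeps growing while active is false usually indicates an abandoned slot or a stopped consumer.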

Schema and Data Issues​

  • Missing Changes for Updates/Deletes: Ensure tables have primary keys or set replica identity:

    ALTER TABLE my_table REPLICA IDENTITY FULL;  -- For tables without PK

  • Large Transaction Handling: For very large transactions, monitor:

    • WAL disk space usage
    • Replication slot lag
    • Memory usage during decoding

Security Best Practices​

  1. Use Dedicated User: Create a specific user for CDC with minimal required privileges
  2. Enable TLS: Always use encrypted connections for production
  3. Network Security: Restrict access using firewalls and security groups
  4. Regular Monitoring: Set up alerts for replication lag and disk usage
  5. Password Security: Use strong passwords and consider certificate-based authentication

This setup provides a robust foundation for CDC from any PostgreSQL server to OLake, enabling real-time data integration with your data lake infrastructure.


