How to Connect Python to CockroachDB: Complete Guide
CockroachDB is a distributed SQL database designed for cloud-native applications. It combines the familiarity of SQL with the scalability of NoSQL, offering horizontal scaling, strong consistency, and automatic failover. This comprehensive guide covers everything you need to know about connecting Python to CockroachDB.
Why Use CockroachDB with Python?
CockroachDB with Python offers powerful capabilities:
- Distributed SQL database with PostgreSQL compatibility
- Automatic horizontal scaling across multiple nodes
- Strong consistency with ACID guarantees
- Built-in replication and automatic failover
- Geo-distributed capabilities for global applications
- Survives infrastructure failures automatically
- Compatible with PostgreSQL drivers and tools
- No manual sharding required
Prerequisites
Before connecting Python to CockroachDB, ensure you have:
- Python 3.6 or higher installed
- CockroachDB cluster running (local or cloud)
- psycopg2 driver (CockroachDB is PostgreSQL-compatible)
- Connection details (host, port, database, user, password/certificates)
- Basic understanding of SQL and distributed systems
Installing Required Libraries
Since CockroachDB is PostgreSQL-compatible, we use psycopg2:
bash
pip install psycopg2-binary
For SQLAlchemy-based applications, also install the ORM and its CockroachDB dialect:
bash
pip install sqlalchemy sqlalchemy-cockroachdb
Basic Connection to CockroachDB
Local Connection
python
import psycopg2

# Connect to local CockroachDB
connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='defaultdb',
    user='root',
    sslmode='disable'  # Only for local dev clusters started with --insecure
)
print("Connected to CockroachDB successfully!")

# Get version
cursor = connection.cursor()
cursor.execute("SELECT version()")
version = cursor.fetchone()[0]
print(f"CockroachDB version: {version}")
cursor.close()
connection.close()
Secure Connection with Certificates
python
import psycopg2

# Connect with TLS certificate verification (production)
connection = psycopg2.connect(
    host='your-cluster.cockroachlabs.cloud',
    port=26257,
    database='your_database',
    user='your_username',
    password='your_password',
    sslmode='verify-full',
    sslrootcert='/path/to/ca.crt'
)
print("Connected to CockroachDB Cloud!")
connection.close()
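Hardcoding credentials is fine for examples but not for real deployments. Because psycopg2 also accepts libpq-style connection URLs, one option is to assemble the URL from environment variables. A minimal sketch (the `CRDB_*` variable names are illustrative, not a convention CockroachDB defines):

```python
import os

def build_dsn():
    """Assemble a CockroachDB connection URL from environment variables."""
    user = os.environ.get("CRDB_USER", "root")
    password = os.environ.get("CRDB_PASSWORD", "")
    host = os.environ.get("CRDB_HOST", "localhost")
    port = os.environ.get("CRDB_PORT", "26257")
    database = os.environ.get("CRDB_DATABASE", "defaultdb")
    sslmode = os.environ.get("CRDB_SSLMODE", "disable")
    # Include the password only when one is set
    auth = f"{user}:{password}" if password else user
    return f"postgresql://{auth}@{host}:{port}/{database}?sslmode={sslmode}"

print(build_dsn())  # e.g. postgresql://root@localhost:26257/defaultdb?sslmode=disable
```

The resulting string can be passed directly to the driver: `psycopg2.connect(build_dsn())`.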
Connection with Error Handling
python
import psycopg2
from psycopg2 import OperationalError, Error

def create_connection():
    try:
        connection = psycopg2.connect(
            host='localhost',
            port=26257,
            database='company',
            user='root',
            sslmode='disable'
        )
        print("Successfully connected to CockroachDB")
        # Set default schema search path
        cursor = connection.cursor()
        cursor.execute("SET search_path = public")
        cursor.close()
        return connection
    except OperationalError as e:
        print(f"Operational error: {e}")
        return None
    except Error as e:
        print(f"Error connecting: {e}")
        return None

# Usage
conn = create_connection()
if conn:
    conn.close()
Creating Tables
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Create employees table (STRING is CockroachDB's alias for TEXT)
create_table_sql = """
CREATE TABLE IF NOT EXISTS employees (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    first_name STRING NOT NULL,
    last_name STRING NOT NULL,
    email STRING UNIQUE,
    department STRING,
    salary DECIMAL(10, 2),
    hire_date DATE,
    created_at TIMESTAMP DEFAULT now()
)
"""
cursor.execute(create_table_sql)
connection.commit()
print("Table created successfully!")
cursor.close()
connection.close()
Inserting Data
Insert Single Record
python
import psycopg2
from datetime import date

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Insert a single employee and fetch the generated UUID
insert_sql = """
INSERT INTO employees (first_name, last_name, email, department, salary, hire_date)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
"""
employee_data = ('John', 'Doe', 'john.doe@company.com', 'Engineering', 75000.00, date.today())
cursor.execute(insert_sql, employee_data)
employee_id = cursor.fetchone()[0]
connection.commit()
print(f"Employee inserted with ID: {employee_id}")
cursor.close()
connection.close()
Bulk Insert with Retry Logic
CockroachDB runs transactions at serializable isolation and aborts conflicting transactions with SQLSTATE 40001, which clients are expected to retry:
python
import psycopg2
from psycopg2 import Error
import time

def insert_employees_with_retry(connection, employees_data, max_retries=3):
    for attempt in range(max_retries):
        try:
            cursor = connection.cursor()
            insert_sql = """
            INSERT INTO employees (first_name, last_name, email, department, salary)
            VALUES (%s, %s, %s, %s, %s)
            """
            cursor.executemany(insert_sql, employees_data)
            connection.commit()
            # rowcount is unreliable after executemany, so report the batch size
            print(f"Successfully inserted {len(employees_data)} employees")
            cursor.close()
            return True
        except Error as e:
            if e.pgcode == '40001':  # Serialization failure: safe to retry
                print(f"Transaction retry needed (attempt {attempt + 1})")
                connection.rollback()
                time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
            else:
                print(f"Error: {e}")
                connection.rollback()
                return False
    print("Max retries exceeded")
    return False

# Usage
connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
employees_data = [
    ('Jane', 'Smith', 'jane@company.com', 'Marketing', 68000.00),
    ('Bob', 'Johnson', 'bob@company.com', 'Sales', 72000.00),
    ('Alice', 'Williams', 'alice@company.com', 'Engineering', 80000.00)
]
insert_employees_with_retry(connection, employees_data)
connection.close()
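The retry loop above is tied to one insert; the same pattern can be factored into a reusable, stdlib-only helper. This sketch adds random jitter to the backoff so colliding clients do not retry in lockstep, and is demonstrated with a stub operation rather than a live cluster; with psycopg2 you would match on `e.pgcode == '40001'` instead of the message text:

```python
import random
import time

def with_retry(fn, max_retries=5, base_delay=0.1,
               is_retryable=lambda e: "40001" in str(e)):
    """Run fn(), retrying with exponential backoff and jitter on retryable errors."""
    for attempt in range(max_retries):
        try:
            return fn()
        except Exception as e:
            if not is_retryable(e) or attempt == max_retries - 1:
                raise
            # Full jitter: sleep a random fraction of the backoff window
            time.sleep(random.uniform(0, base_delay * (2 ** attempt)))

# Stub operation: fails twice with a retryable error, then succeeds
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise RuntimeError("restart transaction (SQLSTATE 40001)")
    return "committed"

print(with_retry(flaky))  # → committed (after two retries)
```

In real code `fn` would wrap the cursor work plus `connection.commit()`, with a `connection.rollback()` in the retryable branch.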
Querying Data
Basic Queries
python
import psycopg2
from psycopg2.extras import RealDictCursor

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)

# Use RealDictCursor for dictionary access
cursor = connection.cursor(cursor_factory=RealDictCursor)

# Select all employees
select_sql = "SELECT * FROM employees ORDER BY last_name"
cursor.execute(select_sql)
employees = cursor.fetchall()
print(f"Total employees: {len(employees)}\n")
for emp in employees:
    print(f"Name: {emp['first_name']} {emp['last_name']}, Dept: {emp['department']}")
cursor.close()
connection.close()
Parameterized Queries
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Query with parameters
select_sql = "SELECT * FROM employees WHERE department = %s AND salary > %s"
cursor.execute(select_sql, ('Engineering', 70000))
employees = cursor.fetchall()
print("Senior Engineers:")
# Tuple positions follow the column order: id, first_name, last_name,
# email, department, salary, hire_date, created_at
for emp in employees:
    print(f"{emp[1]} {emp[2]} - ${emp[5]}")
cursor.close()
connection.close()
Updating Records with Retry Logic
python
import psycopg2
from psycopg2 import Error
import time

def update_with_retry(connection, update_func, max_retries=3):
    for attempt in range(max_retries):
        try:
            cursor = connection.cursor()
            update_func(cursor)
            connection.commit()
            cursor.close()
            return True
        except Error as e:
            if e.pgcode == '40001':  # Serialization failure: safe to retry
                print(f"Retry needed (attempt {attempt + 1})")
                connection.rollback()
                time.sleep(0.1 * (2 ** attempt))
            else:
                print(f"Error: {e}")
                connection.rollback()
                return False
    return False

# Usage
connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)

def update_salary(cursor):
    cursor.execute(
        "UPDATE employees SET salary = %s WHERE id = %s",
        (85000.00, 'some-uuid-here')  # Replace with a real employee UUID
    )

success = update_with_retry(connection, update_salary)
print(f"Update {'succeeded' if success else 'failed'}")
connection.close()
Distributed Transactions
python
import psycopg2
from psycopg2 import Error

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()
try:
    # psycopg2 opens a transaction implicitly on the first statement,
    # so no explicit BEGIN is needed
    # Transfer budget between departments
    cursor.execute("UPDATE departments SET budget = budget - 10000 WHERE name = 'Engineering'")
    cursor.execute("UPDATE departments SET budget = budget + 10000 WHERE name = 'Marketing'")
    # Commit both updates atomically
    connection.commit()
    print("Transaction completed successfully")
except Error as e:
    connection.rollback()
    print(f"Transaction failed: {e}")
finally:
    cursor.close()
    connection.close()
Working with Geo-Partitioned Data
CockroachDB excels at geo-distribution; note that table partitioning requires an Enterprise license:
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Partition columns must be a prefix of the primary key,
# so region is part of the PK here
create_table_sql = """
CREATE TABLE IF NOT EXISTS user_data (
    id UUID DEFAULT gen_random_uuid(),
    user_id INT,
    region STRING NOT NULL,
    data JSONB,
    created_at TIMESTAMP DEFAULT now(),
    PRIMARY KEY (region, id)
)
"""
cursor.execute(create_table_sql)

# Create partitions for different regions
cursor.execute("""
ALTER TABLE user_data PARTITION BY LIST (region) (
    PARTITION us_east VALUES IN ('us-east'),
    PARTITION us_west VALUES IN ('us-west'),
    PARTITION europe VALUES IN ('europe'),
    PARTITION asia VALUES IN ('asia')
)
""")
connection.commit()

# Insert geo-partitioned data
cursor.execute("""
INSERT INTO user_data (user_id, region, data)
VALUES (%s, %s, %s)
""", (1001, 'us-east', '{"preferences": {"theme": "dark"}}'))
connection.commit()
print("Geo-partitioned table created and data inserted!")
cursor.close()
connection.close()
Using JSONB Columns
python
import psycopg2
import json

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Create table with a JSONB column
cursor.execute("""
CREATE TABLE IF NOT EXISTS employee_metadata (
    employee_id UUID PRIMARY KEY,
    metadata JSONB
)
""")

# Insert JSONB data (psycopg2.extras.Json is an alternative to json.dumps)
metadata = {
    "skills": ["Python", "SQL", "Docker"],
    "certifications": ["AWS", "Azure"],
    "projects": [{"name": "Project A", "role": "Lead"}]
}
cursor.execute("""
INSERT INTO employee_metadata (employee_id, metadata)
VALUES (gen_random_uuid(), %s)
""", (json.dumps(metadata),))
connection.commit()

# Query JSONB data: ->> returns text, -> returns JSONB
cursor.execute("""
SELECT metadata->>'skills' AS skills,
       metadata->'certifications' AS certs
FROM employee_metadata
""")
for row in cursor.fetchall():
    print(f"Skills: {row[0]}")
    print(f"Certifications: {row[1]}")
cursor.close()
connection.close()
Using SQLAlchemy with CockroachDB
SQLAlchemy provides a higher-level ORM interface:
python
from sqlalchemy import create_engine, Column, Integer, String, Float, Date
from sqlalchemy.orm import declarative_base, sessionmaker
from datetime import date

# The cockroachdb:// URL requires the sqlalchemy-cockroachdb dialect package
engine = create_engine(
    'cockroachdb://root@localhost:26257/company?sslmode=disable',
    echo=True
)
Base = declarative_base()

# Define model
class Employee(Base):
    __tablename__ = 'employees_orm'
    id = Column(Integer, primary_key=True)
    first_name = Column(String(50), nullable=False)
    last_name = Column(String(50), nullable=False)
    email = Column(String(100), unique=True)
    department = Column(String(50))
    salary = Column(Float)
    hire_date = Column(Date)

# Create tables
Base.metadata.create_all(engine)

# Create session
Session = sessionmaker(bind=engine)
session = Session()

# Insert data
new_employee = Employee(
    first_name='John',
    last_name='Doe',
    email='john@company.com',
    department='Engineering',
    salary=75000.00,
    hire_date=date.today()
)
session.add(new_employee)
session.commit()
print(f"Employee created with ID: {new_employee.id}")

# Query data
engineers = session.query(Employee).filter_by(department='Engineering').all()
print(f"\nEngineers: {len(engineers)}")
for emp in engineers:
    print(f"{emp.first_name} {emp.last_name} - ${emp.salary}")

# Update data
employee = session.query(Employee).filter_by(email='john@company.com').first()
if employee:
    employee.salary = 80000.00
    session.commit()
    print(f"Updated {employee.first_name}'s salary")

# Delete data
if employee:
    session.delete(employee)
    session.commit()
session.close()
Change Data Capture (CDC)
CockroachDB supports changefeeds for streaming changes:
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Create a changefeed streaming to Kafka (Enterprise feature)
cursor.execute("""
CREATE CHANGEFEED FOR TABLE employees
INTO 'kafka://localhost:9092'
WITH updated, resolved
""")
print("Changefeed created for employees table")
cursor.close()
connection.close()
Backup and Restore
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Create a backup (nodelocal stores it on node 1's local disk)
backup_location = 'nodelocal://1/backups'
cursor.execute(f"BACKUP TABLE employees INTO '{backup_location}'")
print("Backup created successfully")

# RESTORE fails if the table still exists, so restore the latest
# backup into a separate database
cursor.execute("CREATE DATABASE IF NOT EXISTS company_restored")
cursor.execute(f"""
RESTORE TABLE company.employees
FROM LATEST IN '{backup_location}'
WITH into_db = 'company_restored'
""")
print("Backup restored successfully")
cursor.close()
connection.close()
Performance Optimization
Creating Indexes
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Create single-column index
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_department
ON employees (department)
""")

# Create composite index
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_dept_salary
ON employees (department, salary DESC)
""")

# Create covering index (STORING adds columns to the index without sorting them)
cursor.execute("""
CREATE INDEX IF NOT EXISTS idx_email_covering
ON employees (email) STORING (first_name, last_name)
""")
connection.commit()
print("Indexes created successfully")
cursor.close()
connection.close()
Query Optimization
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Use EXPLAIN ANALYZE to see how a query actually executes
cursor.execute("""
EXPLAIN ANALYZE
SELECT * FROM employees
WHERE department = 'Engineering' AND salary > 70000
""")
plan = cursor.fetchall()
print("Query Plan:")
for row in plan:
    print(row[0])

# Collect statistics for better query planning
cursor.execute("CREATE STATISTICS emp_stats FROM employees")
connection.commit()
print("\nStatistics created for query optimization")
cursor.close()
connection.close()
Complete Application Example
python
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2 import Error
import time
from contextlib import closing

class CockroachDBManager:
    def __init__(self, host, port, database, user, sslmode='disable'):
        self.config = {
            'host': host,
            'port': port,
            'database': database,
            'user': user,
            'sslmode': sslmode
        }

    def get_connection(self):
        return psycopg2.connect(**self.config)

    def execute_with_retry(self, operation, max_retries=3):
        """Execute an operation, retrying on transaction conflicts (SQLSTATE 40001)."""
        for attempt in range(max_retries):
            try:
                with closing(self.get_connection()) as conn:
                    with closing(conn.cursor(cursor_factory=RealDictCursor)) as cursor:
                        result = operation(cursor)
                        conn.commit()
                        return result
            except Error as e:
                if e.pgcode == '40001' and attempt < max_retries - 1:
                    time.sleep(0.1 * (2 ** attempt))  # Exponential backoff
                    continue
                raise
        return None

    def create_employee(self, first_name, last_name, email, department, salary):
        def operation(cursor):
            cursor.execute("""
                INSERT INTO employees (first_name, last_name, email, department, salary, hire_date)
                VALUES (%s, %s, %s, %s, %s, CURRENT_DATE)
                RETURNING id
            """, (first_name, last_name, email, department, salary))
            return cursor.fetchone()['id']
        return self.execute_with_retry(operation)

    def get_employee(self, employee_id):
        def operation(cursor):
            cursor.execute("SELECT * FROM employees WHERE id = %s", (employee_id,))
            return cursor.fetchone()
        return self.execute_with_retry(operation)

    def get_employees_by_department(self, department):
        def operation(cursor):
            cursor.execute(
                "SELECT * FROM employees WHERE department = %s ORDER BY last_name",
                (department,)
            )
            return cursor.fetchall()
        return self.execute_with_retry(operation)

    def update_salary(self, employee_id, new_salary):
        def operation(cursor):
            cursor.execute(
                "UPDATE employees SET salary = %s WHERE id = %s",
                (new_salary, employee_id)
            )
            return cursor.rowcount
        return self.execute_with_retry(operation)

    def transfer_budget(self, from_dept, to_dept, amount):
        """Atomic budget transfer between departments."""
        def operation(cursor):
            cursor.execute("""
                UPDATE departments
                SET budget = budget - %s
                WHERE name = %s AND budget >= %s
            """, (amount, from_dept, amount))
            if cursor.rowcount == 0:
                raise Exception("Insufficient budget")
            cursor.execute("""
                UPDATE departments
                SET budget = budget + %s
                WHERE name = %s
            """, (amount, to_dept))
            return True
        return self.execute_with_retry(operation)

    def get_department_statistics(self):
        def operation(cursor):
            cursor.execute("""
                SELECT
                    department,
                    COUNT(*) AS employee_count,
                    AVG(salary) AS avg_salary,
                    MIN(salary) AS min_salary,
                    MAX(salary) AS max_salary
                FROM employees
                WHERE department IS NOT NULL
                GROUP BY department
                ORDER BY employee_count DESC
            """)
            return cursor.fetchall()
        return self.execute_with_retry(operation)

    def search_employees(self, search_term):
        def operation(cursor):
            pattern = f'%{search_term}%'
            cursor.execute("""
                SELECT * FROM employees
                WHERE first_name ILIKE %s
                   OR last_name ILIKE %s
                   OR email ILIKE %s
            """, (pattern, pattern, pattern))
            return cursor.fetchall()
        return self.execute_with_retry(operation)

# Usage
db = CockroachDBManager(
    host='localhost',
    port=26257,
    database='company',
    user='root'
)

# Create employee
emp_id = None
try:
    emp_id = db.create_employee('Sarah', 'Connor', 'sarah@company.com', 'Security', 78000)
    print(f"Created employee with ID: {emp_id}")
except Exception as e:
    print(f"Error creating employee: {e}")

if emp_id:
    # Get employee
    employee = db.get_employee(emp_id)
    if employee:
        print(f"Employee: {employee['first_name']} {employee['last_name']}")

    # Update salary with automatic retry
    rows_updated = db.update_salary(emp_id, 82000)
    print(f"Updated {rows_updated} employee(s)")

# Get employees by department
engineers = db.get_employees_by_department('Engineering')
print(f"\nFound {len(engineers)} engineers")

# Get statistics
stats = db.get_department_statistics()
print("\nDepartment Statistics:")
for stat in stats:
    print(f"{stat['department']}: {stat['employee_count']} employees, "
          f"Avg Salary: ${stat['avg_salary']:.2f}")

# Search employees
results = db.search_employees('Sarah')
print(f"\nSearch results: {len(results)} found")
Multi-Region Configuration
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Make the database multi-region; the region names must match the
# localities the cluster nodes were started with (--locality=region=...)
cursor.execute('ALTER DATABASE company SET PRIMARY REGION "us-east"')
cursor.execute('ALTER DATABASE company ADD REGION "us-west"')
cursor.execute('ALTER DATABASE company ADD REGION "europe"')

# Pin each row to the region stored in its hidden crdb_region column
cursor.execute("ALTER TABLE employees SET LOCALITY REGIONAL BY ROW")
connection.commit()
print("Multi-region configuration applied")
cursor.close()
connection.close()
Monitoring and Observability
python
import psycopg2

connection = psycopg2.connect(
    host='localhost',
    port=26257,
    database='company',
    user='root',
    sslmode='disable'
)
cursor = connection.cursor()

# Check a cluster setting
cursor.execute("SHOW CLUSTER SETTING cluster.organization")
print(f"Cluster organization: {cursor.fetchone()[0]}")

# View active queries
cursor.execute("""
SELECT query_id, node_id, user_name, start, query
FROM crdb_internal.cluster_queries
WHERE query NOT LIKE '%crdb_internal%'
""")
print("\nActive Queries:")
for row in cursor.fetchall():
    print(f"Query ID: {row[0]}, User: {row[2]}")
    print(f"Query: {row[4][:80]}...")

# Check node status
cursor.execute("SELECT node_id, address, is_live FROM crdb_internal.gossip_nodes")
print("\nNode Status:")
for row in cursor.fetchall():
    status = "Live" if row[2] else "Dead"
    print(f"Node {row[0]} ({row[1]}): {status}")
cursor.close()
connection.close()
Best Practices
Follow these best practices when using CockroachDB with Python:
- Implement retry logic for transaction conflicts (error 40001)
- Use UUID primary keys for better distribution
- Design for distribution with appropriate partitioning
- Use batch operations when inserting multiple rows
- Create appropriate indexes for query optimization
- Monitor transaction contention and optimize hot spots
- Use connection pooling for better performance
- Set appropriate transaction isolation levels
- Leverage JSONB for flexible schemas
- Use REGIONAL tables for geo-distributed applications
- Implement proper error handling with exponential backoff
- Avoid sequential keys that create hotspots
- Use EXPLAIN ANALYZE to optimize queries
- Monitor cluster metrics regularly
- Plan for multi-region deployment from the start
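Of these practices, connection pooling is the only one not illustrated earlier. In production you would typically use psycopg2's built-in `psycopg2.pool.ThreadedConnectionPool` or an external pooler; the queue-based sketch below, driven by a stub factory so it runs without a cluster, just shows the mechanics a pool provides (reusing idle connections instead of reconnecting for every query):

```python
import queue

class SimplePool:
    """Minimal connection pool: hand out idle connections, create up to max_size."""
    def __init__(self, factory, max_size=5):
        self.factory = factory            # callable that opens a new connection
        self.idle = queue.Queue(max_size)
        self.created = 0
        self.max_size = max_size

    def acquire(self, timeout=5):
        try:
            return self.idle.get_nowait()  # reuse an idle connection if available
        except queue.Empty:
            if self.created < self.max_size:
                self.created += 1
                return self.factory()      # otherwise open a new one, up to the cap
            return self.idle.get(timeout=timeout)  # at the cap: wait for a release

    def release(self, conn):
        self.idle.put(conn)

# Demonstrate with a stub factory; in real code the factory would be
# something like: lambda: psycopg2.connect(host='localhost', port=26257, ...)
pool = SimplePool(factory=object, max_size=2)
c1 = pool.acquire()
pool.release(c1)
c2 = pool.acquire()
print(c1 is c2)  # → True: the released connection was reused
```

A real pool also needs to discard broken connections and close everything on shutdown, which is exactly what `ThreadedConnectionPool` handles for you.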
Conclusion
Connecting Python to CockroachDB provides a powerful foundation for building globally distributed, resilient applications. CockroachDB's PostgreSQL compatibility makes it easy to get started, while its distributed architecture provides automatic scaling and high availability. By following the examples and best practices in this guide, you'll be able to build robust applications that leverage CockroachDB's unique capabilities including automatic failover, geo-partitioning, and strong consistency guarantees. Whether you're building cloud-native applications, multi-region services, or systems requiring high availability, mastering Python-CockroachDB connectivity is essential for modern distributed application development.