How to Connect Python to Elasticsearch: Complete Guide
Elasticsearch is a distributed search and analytics engine built on Apache Lucene. Combined with Python, it powers full-text search, real-time analytics, and log analysis. This guide covers connecting Python to Elasticsearch with the official client, then indexing, searching, updating, and aggregating data.
Why Use Elasticsearch with Python?
Elasticsearch with Python offers powerful capabilities:
- Lightning-fast full-text search across large datasets
- Real-time data indexing and searching
- Complex aggregations and analytics
- Distributed architecture for horizontal scaling
- Near real-time data availability
- RESTful API with comprehensive features
- Perfect for log analysis, search engines, and analytics
- Excellent support for geospatial queries
Prerequisites
Before connecting Python to Elasticsearch, ensure you have:
- Python 3.6 or higher installed
- Elasticsearch installed and running (version 7.x or 8.x)
- Basic understanding of JSON and REST APIs
- Elasticsearch connection details (host, port, credentials)
Installing the Elasticsearch Client
Install the official Elasticsearch Python client:
bash
pip install elasticsearch
For an Elasticsearch 8.x cluster, install the matching 8.x client (the client's major version should match the server's); quote the requirement so the shell does not treat `>` as a redirect:
bash
pip install "elasticsearch>=8.0.0"
Basic Connection to Elasticsearch
Let's start with a simple connection:
python
from elasticsearch import Elasticsearch
# Connect to local Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Test connection
if es.ping():
    print("Connected to Elasticsearch successfully!")
else:
    print("Could not connect to Elasticsearch")
# Get cluster info
info = es.info()
print(f"Cluster name: {info['cluster_name']}")
print(f"Elasticsearch version: {info['version']['number']}")
Connection with Authentication
For secured Elasticsearch instances:
python
from elasticsearch import Elasticsearch
# Connect with basic authentication
es = Elasticsearch(
    ['https://localhost:9200'],
    basic_auth=('username', 'password'),
    verify_certs=True
)
print("Connected with authentication!")
# Alternative: API key authentication
es = Elasticsearch(
    ['https://localhost:9200'],
    api_key=('id', 'api_key')
)
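If your cluster is hosted on Elastic Cloud, the client can also connect via a `cloud_id` instead of explicit host URLs. A minimal sketch, assuming an Elastic Cloud deployment; the cloud ID and API key values below are placeholders:
python
from elasticsearch import Elasticsearch

# Connect to an Elastic Cloud deployment; both values below are placeholders
es = Elasticsearch(
    cloud_id="my-deployment:...",
    api_key="base64_encoded_api_key"
)
print(es.info()['cluster_name'])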
Connection with Error Handling
python
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import ConnectionError, AuthenticationException
def create_es_connection():
    try:
        es = Elasticsearch(
            ['http://localhost:9200'],
            request_timeout=30,
            max_retries=3,
            retry_on_timeout=True
        )
        if es.ping():
            print("Successfully connected to Elasticsearch")
            # Get cluster health
            health = es.cluster.health()
            print(f"Cluster status: {health['status']}")
            return es
        else:
            print("Could not ping Elasticsearch")
            return None
    except ConnectionError as e:
        print(f"Connection failed: {e}")
        return None
    except AuthenticationException as e:
        print(f"Authentication failed: {e}")
        return None
    except Exception as e:
        print(f"Error: {e}")
        return None
# Usage
es = create_es_connection()
Creating an Index
An index in Elasticsearch is a collection of documents, roughly comparable to a table in a relational database:
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Create index with settings
index_body = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 1
    },
    "mappings": {
        "properties": {
            "employee_id": {"type": "integer"},
            "first_name": {"type": "text"},
            # keyword sub-field enables exact sorting/aggregations on last_name (used later for pagination)
            "last_name": {"type": "text", "fields": {"keyword": {"type": "keyword"}}},
            "email": {"type": "keyword"},
            "department": {"type": "keyword"},
            "salary": {"type": "float"},
            "hire_date": {"type": "date"},
            "description": {"type": "text"}
        }
    }
}

# Create the index only if it does not already exist
if not es.indices.exists(index='employees'):
    es.indices.create(index='employees', body=index_body)
    print("Index created successfully!")
Indexing Documents
Index Single Document
python
from elasticsearch import Elasticsearch
from datetime import datetime
es = Elasticsearch(['http://localhost:9200'])
# Index a single document
doc = {
    'employee_id': 1,
    'first_name': 'John',
    'last_name': 'Doe',
    'email': 'john.doe@company.com',
    'department': 'Engineering',
    'salary': 75000.00,
    'hire_date': datetime.now(),
    'description': 'Senior software engineer with expertise in Python and distributed systems'
}
# Index with auto-generated ID
response = es.index(index='employees', document=doc)
print(f"Document indexed with ID: {response['_id']}")
# Index with specific ID
response = es.index(index='employees', id=1, document=doc)
print(f"Document indexed with ID: {response['_id']}")
Bulk Indexing
For better performance with multiple documents:
python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
es = Elasticsearch(['http://localhost:9200'])
# Prepare bulk data
documents = [
    {
        '_index': 'employees',
        '_id': 2,
        '_source': {
            'employee_id': 2,
            'first_name': 'Jane',
            'last_name': 'Smith',
            'email': 'jane@company.com',
            'department': 'Marketing',
            'salary': 68000.00,
            'description': 'Marketing specialist with 5 years of experience'
        }
    },
    {
        '_index': 'employees',
        '_id': 3,
        '_source': {
            'employee_id': 3,
            'first_name': 'Bob',
            'last_name': 'Johnson',
            'email': 'bob@company.com',
            'department': 'Sales',
            'salary': 72000.00,
            'description': 'Sales executive focused on enterprise clients'
        }
    },
    {
        '_index': 'employees',
        '_id': 4,
        '_source': {
            'employee_id': 4,
            'first_name': 'Alice',
            'last_name': 'Williams',
            'email': 'alice@company.com',
            'department': 'Engineering',
            'salary': 80000.00,
            'description': 'Full-stack developer specializing in React and Node.js'
        }
    }
]
# Bulk index
# raise_on_error=False returns failures as a list instead of raising BulkIndexError
success, errors = bulk(es, documents, raise_on_error=False)
print(f"Successfully indexed: {success} documents")
print(f"Failed: {len(errors)} documents")
Searching Documents
Simple Search
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Search all documents
response = es.search(index='employees', body={"query": {"match_all": {}}})
print(f"Total hits: {response['hits']['total']['value']}\n")
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"Name: {doc['first_name']} {doc['last_name']}, Dept: {doc['department']}")
Match Query
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Search with match query
query = {
    "query": {
        "match": {
            "description": "Python engineer"
        }
    }
}
response = es.search(index='employees', body=query)
print("Search Results:")
for hit in response['hits']['hits']:
    doc = hit['_source']
    score = hit['_score']
    print(f"Score: {score} - {doc['first_name']} {doc['last_name']}")
    print(f"Description: {doc['description']}\n")
Term Query
For exact matches on keyword fields:
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Exact match on keyword field
query = {
    "query": {
        "term": {
            "department": "Engineering"
        }
    }
}
response = es.search(index='employees', body=query)
print("Engineering Employees:")
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"{doc['first_name']} {doc['last_name']} - ${doc['salary']}")
Range Query
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Find employees with salary in range
query = {
    "query": {
        "range": {
            "salary": {
                "gte": 70000,
                "lte": 80000
            }
        }
    }
}
response = es.search(index='employees', body=query)
print("Employees with salary between $70k-$80k:")
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"{doc['first_name']} {doc['last_name']} - ${doc['salary']}")
Boolean Query
Combine multiple conditions:
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Complex boolean query
query = {
    "query": {
        "bool": {
            "must": [
                {"match": {"description": "engineer"}}
            ],
            "filter": [
                {"term": {"department": "Engineering"}},
                {"range": {"salary": {"gte": 75000}}}
            ],
            "should": [
                {"match": {"description": "Python"}},
                {"match": {"description": "Java"}}
            ],
            "minimum_should_match": 1
        }
    }
}
response = es.search(index='employees', body=query)
print("Engineers matching criteria:")
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"{doc['first_name']} {doc['last_name']} - ${doc['salary']}")
Full-Text Search with Highlighting
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Search with highlighting
query = {
    "query": {
        "match": {
            "description": "software engineer"
        }
    },
    "highlight": {
        "fields": {
            "description": {}
        }
    }
}
response = es.search(index='employees', body=query)
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"{doc['first_name']} {doc['last_name']}")
    if 'highlight' in hit:
        print(f"Highlighted: {hit['highlight']['description'][0]}\n")
Updating Documents
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Update specific fields
update_body = {
    "doc": {
        "salary": 85000.00,
        "department": "Senior Engineering"
    }
}
response = es.update(index='employees', id=1, body=update_body)
print(f"Document updated: {response['result']}")

# Update with script
script_body = {
    "script": {
        "source": "ctx._source.salary += params.raise_amount",
        "params": {
            "raise_amount": 5000
        }
    }
}
response = es.update(index='employees', id=1, body=script_body)
print(f"Salary updated via script: {response['result']}")
Deleting Documents
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Delete by ID (raises NotFoundError if no document with that ID exists)
response = es.delete(index='employees', id=10)
print(f"Document deleted: {response['result']}")

# Delete by query
delete_query = {
    "query": {
        "term": {
            "department": "Sales"
        }
    }
}
response = es.delete_by_query(index='employees', body=delete_query)
print(f"Deleted {response['deleted']} documents")
Aggregations
Elasticsearch provides powerful aggregation capabilities:
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Statistics aggregation
agg_body = {
    "size": 0,
    "aggs": {
        "by_department": {
            "terms": {
                "field": "department"
            },
            "aggs": {
                "avg_salary": {
                    "avg": {"field": "salary"}
                },
                "max_salary": {
                    "max": {"field": "salary"}
                },
                "min_salary": {
                    "min": {"field": "salary"}
                }
            }
        },
        "overall_stats": {
            "stats": {
                "field": "salary"
            }
        }
    }
}
response = es.search(index='employees', body=agg_body)

# Process aggregation results
print("Department Statistics:")
for bucket in response['aggregations']['by_department']['buckets']:
    dept = bucket['key']
    count = bucket['doc_count']
    avg_sal = bucket['avg_salary']['value']
    max_sal = bucket['max_salary']['value']
    min_sal = bucket['min_salary']['value']
    print(f"\n{dept}:")
    print(f"  Employees: {count}")
    print(f"  Avg Salary: ${avg_sal:.2f}")
    print(f"  Salary Range: ${min_sal:.2f} - ${max_sal:.2f}")

# Overall statistics
stats = response['aggregations']['overall_stats']
print("\nOverall Statistics:")
print(f"  Total Employees: {stats['count']}")
print(f"  Average Salary: ${stats['avg']:.2f}")
Pagination
python
from elasticsearch import Elasticsearch
es = Elasticsearch(['http://localhost:9200'])
# Pagination using from and size
page_size = 10
page_number = 0
query = {
    "query": {"match_all": {}},
    "from": page_number * page_size,
    "size": page_size,
    "sort": [
        # Sorting on a text field requires its keyword sub-field (defined in the mapping above)
        {"last_name.keyword": "asc"}
    ]
}
response = es.search(index='employees', body=query)
print(f"Page {page_number + 1}:")
for hit in response['hits']['hits']:
    doc = hit['_source']
    print(f"{doc['last_name']}, {doc['first_name']}")

# Scroll API for large result sets
query = {
    "query": {"match_all": {}},
    "size": 100
}

# Initial scroll request
response = es.search(index='employees', body=query, scroll='2m')
scroll_id = response['_scroll_id']
hits = response['hits']['hits']

# Continue scrolling
while len(hits) > 0:
    # Process current batch
    for hit in hits:
        doc = hit['_source']
        print(f"{doc['first_name']} {doc['last_name']}")
    # Get next batch
    response = es.scroll(scroll_id=scroll_id, scroll='2m')
    scroll_id = response['_scroll_id']
    hits = response['hits']['hits']

# Clear scroll
es.clear_scroll(scroll_id=scroll_id)
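The scroll bookkeeping above can also be delegated to the scan helper, which wraps the scroll API and clears the scroll context when it finishes; a minimal sketch. (On recent Elasticsearch versions, search_after with a point-in-time is generally preferred over scroll for deep pagination.)
python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan

es = Elasticsearch(['http://localhost:9200'])

# scan() lazily pages through every matching document and cleans up after itself
for hit in scan(es, index='employees', query={"query": {"match_all": {}}}):
    doc = hit['_source']
    print(f"{doc['first_name']} {doc['last_name']}")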
Complete Application Example
python
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from datetime import datetime
class EmployeeSearch:
    def __init__(self, host='localhost', port=9200):
        self.es = Elasticsearch([f'http://{host}:{port}'])
        self.index_name = 'employees'
        self.create_index()

    def create_index(self):
        if not self.es.indices.exists(index=self.index_name):
            mapping = {
                "settings": {
                    "number_of_shards": 1,
                    "number_of_replicas": 1
                },
                "mappings": {
                    "properties": {
                        "employee_id": {"type": "integer"},
                        "first_name": {"type": "text"},
                        "last_name": {"type": "text"},
                        "email": {"type": "keyword"},
                        "department": {"type": "keyword"},
                        "salary": {"type": "float"},
                        "hire_date": {"type": "date"},
                        "skills": {"type": "keyword"},
                        "description": {"type": "text"}
                    }
                }
            }
            self.es.indices.create(index=self.index_name, body=mapping)
            print(f"Index '{self.index_name}' created")

    def add_employee(self, employee_data):
        response = self.es.index(
            index=self.index_name,
            document=employee_data,
            # make the document searchable immediately (handy for demos; avoid in bulk loads)
            refresh='wait_for'
        )
        return response['_id']

    def search_by_name(self, name):
        query = {
            "query": {
                "multi_match": {
                    "query": name,
                    "fields": ["first_name", "last_name"]
                }
            }
        }
        response = self.es.search(index=self.index_name, body=query)
        return [hit['_source'] for hit in response['hits']['hits']]

    def search_by_department(self, department):
        query = {
            "query": {
                "term": {
                    "department": department
                }
            }
        }
        response = self.es.search(index=self.index_name, body=query)
        return [hit['_source'] for hit in response['hits']['hits']]

    def search_by_skills(self, skills):
        query = {
            "query": {
                "terms": {
                    "skills": skills
                }
            }
        }
        response = self.es.search(index=self.index_name, body=query)
        return [hit['_source'] for hit in response['hits']['hits']]

    def full_text_search(self, text):
        query = {
            "query": {
                "match": {
                    "description": {
                        "query": text,
                        "fuzziness": "AUTO"
                    }
                }
            }
        }
        response = self.es.search(index=self.index_name, body=query)
        return [hit['_source'] for hit in response['hits']['hits']]

    def get_department_stats(self):
        agg_body = {
            "size": 0,
            "aggs": {
                "departments": {
                    "terms": {"field": "department"},
                    "aggs": {
                        "avg_salary": {"avg": {"field": "salary"}}
                    }
                }
            }
        }
        response = self.es.search(index=self.index_name, body=agg_body)
        return response['aggregations']['departments']['buckets']

# Usage
search_engine = EmployeeSearch()

# Add employees
emp_id = search_engine.add_employee({
    'employee_id': 1,
    'first_name': 'Sarah',
    'last_name': 'Connor',
    'email': 'sarah@company.com',
    'department': 'Security',
    'salary': 78000,
    'hire_date': datetime.now(),
    'skills': ['Security', 'Risk Management', 'Compliance'],
    'description': 'Security specialist with expertise in threat assessment'
})
print(f"Employee added with ID: {emp_id}")

# Search by name
results = search_engine.search_by_name('Sarah')
print(f"Found {len(results)} employees named Sarah")

# Department stats
stats = search_engine.get_department_stats()
for bucket in stats:
    print(f"{bucket['key']}: {bucket['doc_count']} employees, "
          f"Avg Salary: ${bucket['avg_salary']['value']:.2f}")
Best Practices
Follow these best practices when using Elasticsearch with Python:
- Use bulk operations for indexing multiple documents
- Define appropriate mappings before indexing data
- Use the keyword type for fields that need exact matching, sorting, or aggregations, as sketched below
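To illustrate the last point, here is a minimal sketch of a multi-field mapping that keeps full-text search on a field while exposing a keyword sub-field for exact matches; the index and field names are just examples:
python
from elasticsearch import Elasticsearch

es = Elasticsearch(['http://localhost:9200'])

# 'name' is analyzed for full-text search; 'name.raw' is an exact-match
# keyword sub-field usable for term queries, sorting, and aggregations
mapping = {
    "mappings": {
        "properties": {
            "name": {
                "type": "text",
                "fields": {
                    "raw": {"type": "keyword"}
                }
            }
        }
    }
}
if not es.indices.exists(index='example_index'):
    es.indices.create(index='example_index', body=mapping)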