Elasticsearch Integration
Deploying an Elasticsearch integration for JanusGraph requires strict alignment between the graph transaction log and the external search index. In production clusters, the JanusGraph Storage Backend & Index Synchronization layer operates as a dual-write architecture. Graph mutations commit to the primary storage backend first and are then written to Elasticsearch through the mixed index subsystem. The two writes are not atomic. Misaligned refresh intervals, improper client routing, or unbounded bulk queues typically surface as index lag, stale query results, or transaction timeouts. This guide covers production-grade configuration, pipeline orchestration, and consistency tuning for maintaining reliability under sustained write throughput.
The diagram below shows the dispatch path from a graph mutation to a searchable document, including the bulk retry loop.
```mermaid
flowchart LR
    M["Graph mutation"] --> Q["Index mutation queue"]
    Q --> B["Bulk request builder"]
    B -->|"_bulk"| ES["Elasticsearch"]
    ES -->|"429 / 5xx"| RT["Retry with backoff"]
    RT --> B
    ES -->|"refresh"| S["Searchable"]
```
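The "Bulk request builder" stage in the diagram produces the newline-delimited JSON (NDJSON) body that the `_bulk` endpoint expects. Each document contributes one action/metadata line followed by one source line, and the body ends with a trailing newline. This is a minimal sketch. The mutation field names (`doc_id`, `routing`, `source`) and the function name are assumptions chosen for illustration.

```python
import json
from typing import Iterable


def build_bulk_body(mutations: Iterable[dict], index: str) -> str:
    """Serialize mutations into an NDJSON body for the Elasticsearch _bulk API.

    Hypothetical input shape: each mutation has 'doc_id', 'routing' and 'source'.
    """
    lines = []
    for m in mutations:
        # Action/metadata line: operation type plus target index, id and routing key.
        action = {"index": {"_index": index, "_id": m["doc_id"], "routing": m["routing"]}}
        lines.append(json.dumps(action))
        # Source line: the document body itself.
        lines.append(json.dumps(m["source"]))
    # The _bulk API requires the body to be terminated by a newline.
    return ("\n".join(lines) + "\n") if lines else ""
```

In practice the elasticsearch-py bulk helpers build this body for you. The sketch only shows what goes over the wire on each pass through the builder.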
Core Configuration & Client Routing
JanusGraph delegates mixed index operations through a dedicated configuration namespace. In the examples below this is `index.search.*`, where `search` is the name chosen for the index backend. The legacy transport client was deprecated in Elasticsearch 7 and removed in Elasticsearch 8. Current JanusGraph releases talk to Elasticsearch only through its REST client over HTTP. Production systems should configure that client with an explicit list of cluster hosts and appropriate retry settings.
```properties
# janusgraph.properties
storage.backend=cql
storage.hostname=10.0.1.10,10.0.1.11,10.0.1.12
storage.cql.keyspace=graph_prod
index.search.backend=elasticsearch
index.search.hostname=es-cluster-01.internal:9200,es-cluster-02.internal:9200,es-cluster-03.internal:9200
# Index creation settings: applied only when JanusGraph creates the index
index.search.elasticsearch.create.ext.number_of_shards=3
index.search.elasticsearch.create.ext.number_of_replicas=1
index.search.elasticsearch.create.ext.refresh_interval=1s
index.search.elasticsearch.create.ext.mapping.total_fields.limit=5000
# Consistency: bulk requests return once a refresh makes the changes searchable
index.search.elasticsearch.bulk-refresh=wait_for
```

JanusGraph reaches Elasticsearch over HTTP through the REST client, so index nodes run in their own JVMs and their heap pressure stays out of the graph process. Keys under `index.search.elasticsearch.create.ext.*` are copied into the settings of the create-index request. They therefore take effect only when JanusGraph first creates an index. Later changes must go through the Elasticsearch index settings API, and the primary shard count cannot be changed in place.

The `bulk-refresh=wait_for` setting makes each bulk request return only after a refresh has made its changes searchable. This gives read-after-write visibility at the cost of up to one `refresh_interval` of extra latency per request. It does not simply wait for the primary shard to acknowledge the write. For clusters requiring strict cross-datacenter parity or alternative routing logic, review Mixed Index Routing to align shard assignment with your graph partitioning strategy.
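JanusGraph documents an `index.search.elasticsearch.create.ext.*` namespace. Keys in that namespace are copied into the settings of the create-index request it sends to Elasticsearch. To see which lines of a properties file will actually shape a new index, you can filter on that prefix.

The sketch below is hypothetical tooling, not part of JanusGraph. Its parser handles only simple `key=value` lines, not the full Java properties syntax.

```python
CREATE_EXT_PREFIX = "index.search.elasticsearch.create.ext."


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple key=value lines, skipping blank lines and '#' comments."""
    props = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def index_creation_settings(props: dict[str, str]) -> dict[str, str]:
    """Return the create.ext.* overrides with the JanusGraph prefix stripped."""
    return {
        key[len(CREATE_EXT_PREFIX):]: value
        for key, value in props.items()
        if key.startswith(CREATE_EXT_PREFIX)
    }
```

You can compare the result with `GET /<index>/_settings` on an existing index. This catches settings that were edited in the file after the index was created and therefore never applied.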
Consistency Models & Transaction Propagation
JanusGraph treats mixed indexes as eventually consistent by default: a committed mutation becomes searchable only after the next Elasticsearch refresh. The `wait_for` refresh policy closes that gap by making the index write, and therefore the commit call, wait for the refresh. The storage write is already durable at that point, but commit latency grows by up to one refresh interval.

Storage and index are still not updated atomically. An index failure after a successful storage commit leaves the mixed index stale until the affected elements are reindexed. For cross-datacenter deployments or strict read-after-write guarantees, review External Index Synchronization & Consistency Tuning to align replication factors with your SLA.
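A simplified timing model makes the visibility trade-off concrete. For illustration only, it assumes refreshes fire at exact multiples of the refresh interval. Real Elasticsearch schedules refreshes per shard rather than on one shared clock. The function names are hypothetical.

```python
import math


def next_refresh(t: float, refresh_interval: float) -> float:
    """Time of the first scheduled refresh at or after t (refreshes at k * interval)."""
    return math.ceil(t / refresh_interval) * refresh_interval


def commit_timeline(write_time: float, refresh_interval: float, bulk_refresh: str) -> tuple[float, float]:
    """Return (commit_returns_at, searchable_at) under the simplified model."""
    if bulk_refresh == "true":
        # Forced refresh per request: visible immediately, refresh cost ignored here.
        return write_time, write_time
    searchable_at = next_refresh(write_time, refresh_interval)
    if bulk_refresh == "wait_for":
        # The bulk request, and therefore the commit, blocks until the refresh.
        return searchable_at, searchable_at
    if bulk_refresh == "false":
        # The commit returns at once; search sees the document only after the refresh.
        return write_time, searchable_at
    raise ValueError(f"unsupported bulk_refresh value: {bulk_refresh}")
```

With a 1 s interval and a write at t = 0.3 s, `false` returns at 0.3 s but the document stays invisible until 1.0 s. With `wait_for`, the commit itself returns at 1.0 s. The total time until visibility is the same; what changes is who waits.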
Teams migrating from legacy search stacks or evaluating OpenSearch Sync Patterns should note that OpenSearch forked from Elasticsearch 7.10. As a result, Lucene segment merging and translog flushing behave largely the same in both engines. Index lifecycle management differs: Elasticsearch uses ILM, while OpenSearch uses Index State Management (ISM). Refer to the official Elasticsearch Bulk API documentation for payload limits and error response formats when designing retry boundaries.
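The Bulk API reports success or failure per item, so retry boundaries are usually drawn per document rather than per request. This is a minimal sketch that assumes the policy shown in the dispatch diagram: retry 429 and 5xx, and treat every other error as permanent. It splits a parsed `_bulk` response body into succeeded, retryable, and permanent document IDs. The function names are hypothetical.

```python
def is_retryable(status: int) -> bool:
    """Assumed policy: throttling (429) and server errors (5xx) are transient."""
    return status == 429 or 500 <= status < 600


def classify_bulk_items(response: dict) -> tuple[list[str], list[str], list[str]]:
    """Split a _bulk response into (succeeded, retryable, permanent) document ids."""
    succeeded, retryable, permanent = [], [], []
    for item in response.get("items", []):
        # Each item is keyed by its operation type, e.g. {"index": {...}}.
        (result,) = item.values()
        status = result["status"]
        doc_id = result.get("_id")
        if "error" not in result and status < 300:
            succeeded.append(doc_id)
        elif is_retryable(status):
            retryable.append(doc_id)
        else:
            permanent.append(doc_id)
    return succeeded, retryable, permanent
```

Only the retryable IDs go back to the bulk builder. Mapping errors (400) and version conflicts (409) land in the permanent bucket, because resending them cannot succeed.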
Pipeline Orchestration & Bulk Indexing
Direct JanusGraph-to-Elasticsearch writes suffice for low-throughput workloads. Production pipelines require explicit backpressure handling, idempotent document generation, and failure isolation. A Python-based sync worker consuming graph mutation logs (CDC or transaction journal) must implement exponential backoff, chunked bulk submission, and deterministic routing keys.
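```python
import logging

from elasticsearch import ConnectionError as ESConnectionError
from elasticsearch import ConnectionTimeout, Elasticsearch, helpers
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential

logger = logging.getLogger("janusgraph_sync")

ES_CLIENT = Elasticsearch(
    hosts=["https://es-cluster-01.internal:9200"],
    api_key=("YOUR_API_KEY_ID", "YOUR_API_KEY_SECRET"),
    retry_on_timeout=True,
    max_retries=3,
    verify_certs=True,
)

CHUNK_SIZE = 500
MAX_QUEUE_DEPTH = 10000


# Retry the whole batch only on transport failures; the deterministic _id
# values make resending already-indexed documents harmless.
@retry(
    stop=stop_after_attempt(4),
    wait=wait_exponential(multiplier=1, min=2, max=30),
    retry=retry_if_exception_type((ESConnectionError, ConnectionTimeout)),
    reraise=True,
)
def push_to_index(doc_batch: list[dict]) -> bool:
    try:
        success, _ = helpers.bulk(
            ES_CLIENT,
            doc_batch,
            chunk_size=CHUNK_SIZE,
            max_retries=3,        # re-send documents rejected with HTTP 429
            initial_backoff=2,
            max_backoff=30,
            raise_on_error=True,
            raise_on_exception=True,
            refresh="wait_for",
        )
    except helpers.BulkIndexError as e:
        # Per-document failures (e.g. mapping conflicts) are permanent: log, do not retry.
        logger.error("Bulk indexing failed for %d documents: %s", len(e.errors), e.errors[:5])
        raise
    except (ESConnectionError, ConnectionTimeout) as e:
        logger.warning("Transport error during bulk submission: %s", e)
        raise
    logger.info("Indexed %d documents successfully", success)
    return True


def process_mutation_stream(mutation_queue: list[dict]) -> None:
    if len(mutation_queue) > MAX_QUEUE_DEPTH:
        # Backpressure signal: the upstream log consumer should slow down.
        logger.warning("Mutation backlog %d exceeds %d", len(mutation_queue), MAX_QUEUE_DEPTH)
    batch = []
    for mutation in mutation_queue:
        # Deterministic ID: re-processing a mutation overwrites, never duplicates.
        doc_id = f"{mutation['graph_id']}_{mutation['element_id']}"
        batch.append({
            "_op_type": "index",
            "_index": "janusgraph_mixed",
            "_id": doc_id,
            "_routing": mutation["partition_key"],
            "_source": mutation["payload"],
        })
        if len(batch) >= CHUNK_SIZE:
            push_to_index(batch)
            batch = []
    if batch:
        push_to_index(batch)
```

The implementation above enforces idempotency through deterministic document IDs derived from graph vertex and edge identifiers, so resending a batch overwrites documents instead of duplicating them. Chunked submission bounds request size and worker memory. Error handling is split by failure type:

- Documents rejected with HTTP 429 are re-sent by `helpers.bulk` itself (`max_retries`, `initial_backoff`, `max_backoff`).
- The tenacity decorator retries the whole batch on connection errors and timeouts.
- Per-document failures such as mapping conflicts are raised as `BulkIndexError` without a retry.

For complete deployment workflows, reference Syncing JanusGraph with Elasticsearch Step by Step.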
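The backpressure requirement at the start of this section can be enforced before mutations ever reach `process_mutation_stream`. The approach is to put a bounded buffer between the log reader and the indexer. This is a minimal sketch using the standard library's `queue.Queue`. The helper names `make_mutation_buffer`, `offer`, and `drain_batch` are hypothetical, and `MAX_QUEUE_DEPTH` reuses the value from the worker above.

```python
import queue

MAX_QUEUE_DEPTH = 10000  # same bound as the sync worker


def make_mutation_buffer(max_depth: int = MAX_QUEUE_DEPTH) -> queue.Queue:
    # Bounded queue: put() blocks (or times out) once max_depth items are waiting.
    return queue.Queue(maxsize=max_depth)


def offer(buffer: queue.Queue, mutation: dict, timeout: float = 0.5) -> bool:
    """Try to enqueue a mutation; return False if the buffer stayed full for `timeout` seconds."""
    try:
        buffer.put(mutation, timeout=timeout)
        return True
    except queue.Full:
        return False


def drain_batch(buffer: queue.Queue, max_items: int) -> list[dict]:
    """Take up to max_items mutations without blocking, for one bulk submission."""
    batch = []
    while len(batch) < max_items:
        try:
            batch.append(buffer.get_nowait())
        except queue.Empty:
            break
    return batch
```

The log reader calls `offer()` and stops advancing its log offset while it returns `False`. Unindexed mutations therefore stay in the durable log rather than piling up in worker memory. The indexer loop calls `drain_batch(buffer, CHUNK_SIZE)` and hands the result to `process_mutation_stream`.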
Operational Guardrails & Failure Modes
- Monitor index lag by comparing transaction log offsets with what has been indexed, and track indexing activity through the `_stats` API. Alert when index lag exceeds 30 seconds.
- Implement circuit breakers for bulk queue saturation. Pause or buffer mutations when the write thread pool queue (visible via `_cat/thread_pool/write`) reaches 90% of `thread_pool.write.queue_size`. Dropped mutations leave the index stale until they are replayed.
- Resolve conflicting or out-of-order writes deterministically. Index with `version_type=external` and a graph-side version that increases monotonically, so stale writes are rejected instead of overwriting newer documents.
- Tune `indices.memory.index_buffer_size` to 15-20% of node heap for high-write workloads (the default is 10%).
- Detect schema drift by setting `"dynamic": "strict"` on mixed index mappings, so documents containing unregistered property keys are rejected at ingestion.
- Archive stale transaction logs after successful sync, retaining them for at least 24 hours to support point-in-time recovery.
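The `version_type=external` guardrail relies on a specific Elasticsearch rule. A write is accepted only when its version is greater than the stored version, or when the document does not exist yet; otherwise the answer is 409. The in-memory model below illustrates that rule and is not Elasticsearch code.

`versioned_action()` shows the matching action for the elasticsearch-py bulk helpers, which pass `_version` and `_version_type` through as bulk metadata. Both names are hypothetical. The version is assumed to come from a monotonically increasing graph-side counter, such as a transaction log offset.

```python
from typing import Optional


class ExternalVersionStore:
    """In-memory model of the version_type=external check (illustration only)."""

    def __init__(self) -> None:
        self._docs: dict[str, tuple[int, dict]] = {}

    def index(self, doc_id: str, version: int, source: dict) -> int:
        """Return an HTTP-like status: 201 created, 200 updated, 409 stale write rejected."""
        current = self._docs.get(doc_id)
        if current is not None and version <= current[0]:
            return 409  # Elasticsearch reports a version conflict here
        self._docs[doc_id] = (version, source)
        return 201 if current is None else 200

    def get(self, doc_id: str) -> Optional[dict]:
        entry = self._docs.get(doc_id)
        return entry[1] if entry else None


def versioned_action(doc_id: str, version: int, source: dict, index: str) -> dict:
    """Bulk helper action carrying an externally supplied version."""
    return {
        "_op_type": "index",
        "_index": index,
        "_id": doc_id,
        "_version": version,
        "_version_type": "external",
        "_source": source,
    }
```

A late-arriving older mutation is rejected and the newer document survives. Replaying a batch with the same versions also yields 409s, which should be treated as "already applied" rather than retried.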