Orchestrator
The Orchestrator Service acts as the control plane for the Logwise system, managing critical operations including metadata synchronization, log sync delay tracking, Spark job monitoring, and Spark cluster scaling. Currently supports AWS for component sync and delay metrics.
Architecture in Logwise
Vector → Kafka → Spark Jobs → Object Storage (S3)
                     ↑              ↑
                     |              |
        Orchestrator Service (Manages Discovery, Sync & Monitoring)
The Orchestrator Service enables:
- Metadata Synchronization - Syncs service metadata between object storage and database
- Log Sync Delay Tracking - Monitors delay between log generation and storage
- Spark Job Monitoring - Monitors and auto-submits Spark jobs for log processing
- Spark Cluster Scaling - Scales Spark workers up and down based on workload patterns
Key Features
1. Metadata Management
- Service Discovery: Automatically discovers services from object storage partitions (AWS only)
- Database Sync: Syncs service metadata between S3 and database
- Auto-Onboarding: Automatically onboards new services to database
- Cleanup: Removes services that no longer exist in object storage
2. Spark Job Monitoring
- Auto-Monitoring: Monitors Spark driver status periodically
- Auto-Submission: Automatically submits Spark jobs when driver is not running
- State Management: Handles Spark state cleanup and recovery scenarios
- Timestamp-based Processing: Supports submitting jobs with specific Kafka offsets for historical processing
- Fault Tolerance: Automatically recovers from Spark driver failures
3. Spark Cluster Scaling
- Intelligent Auto-Scaling: Automatically scales Spark workers based on workload patterns and stage history
- Upscaling & Downscaling: Supports both scale-up and scale-down operations with configurable thresholds
- Scale Override: Provides API to enable/disable upscaling and downscaling independently per tenant
- Workload-Based Calculation: Calculates optimal worker count based on input records, processing capacity, and growth patterns
- Multi-Platform Support: Works with both Kubernetes deployments and AWS Auto Scaling Groups
- Safety Mechanisms: Includes WAL file checks, proportional downscaling limits, and minimum threshold enforcement
4. Log Sync Delay Metrics
- Delay Calculation: Computes delay between log generation and storage in S3
- AWS Only: Currently supports AWS (S3) for delay calculation
- Application Logs: Tracks delay for application logs only
- Real-time Monitoring: Provides real-time delay metrics via REST API
Metadata Management
Service Discovery
The Orchestrator automatically discovers services by scanning object storage partitions. Services are identified by their path structure:
Format: logs/service_name={service}/
Example: logs/service_name=api-service/
Process:
- Scans S3 prefixes to discover all unique service combinations
- Compares discovered services with database entries
- Onboards new services to database
- Removes services that no longer exist in object storage
Supported Platforms:
- AWS: Discovers services from S3 partitions (only AWS is currently supported)
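As a rough illustration of the discovery step, the sketch below lists unique service names from S3 partition prefixes with the AWS SDK v2. The bucket name and the class/method names are placeholders, not the orchestrator's actual code.

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.CommonPrefix;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;

import java.util.Set;
import java.util.stream.Collectors;

public class ServiceDiscoverySketch {

    // Collects the unique service names found under logs/service_name={service}/ prefixes.
    static Set<String> discoverServices(S3Client s3, String bucket) {
        ListObjectsV2Request request = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix("logs/")
                .delimiter("/") // one CommonPrefix per service_name partition
                .build();

        return s3.listObjectsV2Paginator(request).commonPrefixes().stream()
                .map(CommonPrefix::prefix)                                  // e.g. "logs/service_name=api-service/"
                .filter(p -> p.contains("service_name="))
                .map(p -> p.substring(p.indexOf('=') + 1, p.length() - 1))  // -> "api-service"
                .collect(Collectors.toSet());
    }
}
```

The discovered set is then diffed against the database entries to decide which services to onboard and which to remove.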
Spark Job Monitoring
Monitoring Process
The Orchestrator monitors Spark jobs through API calls:
Monitoring Cycle:
- Polls Spark Master API every 15 seconds
- Checks driver status (RUNNING, FINISHED, FAILED, etc.)
- Monitors for up to 60 seconds (4 polls total)
- Stops monitoring once job is submitted
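A minimal sketch of a single poll, assuming the Spark standalone Master exposes its REST submission API and that the orchestrator keeps the submissionId from the last submission; the class and parameter names are hypothetical.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class DriverStatusPollSketch {

    // Asks the Spark Master REST API for the state of a previously submitted driver.
    static String pollDriverState(HttpClient client, String sparkMasterRestUrl,
                                  String submissionId) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(sparkMasterRestUrl + "/v1/submissions/status/" + submissionId))
                .GET()
                .build();

        HttpResponse<String> response = client.send(request, HttpResponse.BodyHandlers.ofString());
        // The JSON body carries a "driverState" field such as RUNNING, FINISHED,
        // FAILED, or KILLED; JSON parsing is elided in this sketch.
        return response.body();
    }
}
```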
Decision Logic:
if (driver is not running) {
    submit job immediately
} else if (pending timestamp-based submission exists) {
    clean Spark state
    submit job with specific timestamp
} else if (resume to subscribe pattern needed) {
    clean Spark state
    submit job with resume configuration
}
Spark Job Submission
Configuration:
- Kafka Configuration: Broker hosts, topic patterns, starting offsets (passed to Spark jobs - orchestrator doesn't manage Kafka directly)
- Spark Resources: Driver/executor cores, memory allocation
- Processing Parameters: Max rate per partition, subscribe patterns
- Cloud Storage: S3 paths for checkpoints and logs
Job Lifecycle:
- Validates Spark Master connectivity
- Cleans previous state (checkpoints, WAL files) if needed
- Submits job to Spark Master REST API
- Updates submission status in database
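For orientation, here is a sketch of a submission against the Spark standalone REST API (/v1/submissions/create). The jar location, main class, Spark version, and property values are placeholders; in the real service they come from the tenant and Spark configuration described above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SparkSubmitSketch {

    // Submits a driver through the Spark standalone REST submission endpoint.
    static String submitJob(HttpClient client, String sparkMasterRestUrl) throws Exception {
        String body = "{"
                + "\"action\":\"CreateSubmissionRequest\","
                + "\"appResource\":\"s3a://example-bucket/jars/log-processor.jar\","
                + "\"mainClass\":\"com.example.LogProcessorJob\","
                + "\"appArgs\":[],"
                + "\"clientSparkVersion\":\"3.3.0\","
                + "\"sparkProperties\":{"
                + "\"spark.app.name\":\"log-processor\","
                + "\"spark.driver.cores\":\"2\","
                + "\"spark.driver.memory\":\"4g\"},"
                + "\"environmentVariables\":{}"
                + "}";

        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(sparkMasterRestUrl + "/v1/submissions/create"))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(body))
                .build();

        // The response includes a submissionId that later status polls can use.
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }
}
```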
Spark State Management
State Cleanup:
- Deletes checkpoint files from object storage
- Removes WAL (Write-Ahead Log) files
- Cleans Spark metadata files
- Required before timestamp-based job submissions
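As a sketch of the cleanup step, the snippet below removes every object under a given prefix with the AWS SDK v2; the idea of passing a per-service checkpoint or WAL prefix is an assumption, and the class name is illustrative.

```java
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.Delete;
import software.amazon.awssdk.services.s3.model.DeleteObjectsRequest;
import software.amazon.awssdk.services.s3.model.ListObjectsV2Request;
import software.amazon.awssdk.services.s3.model.ObjectIdentifier;

import java.util.List;
import java.util.stream.Collectors;

public class SparkStateCleanupSketch {

    // Deletes all objects under the given prefix, e.g. a checkpoint or WAL directory.
    static void deletePrefix(S3Client s3, String bucket, String prefix) {
        ListObjectsV2Request listRequest = ListObjectsV2Request.builder()
                .bucket(bucket)
                .prefix(prefix)
                .build();

        s3.listObjectsV2Paginator(listRequest).stream().forEach(page -> {
            List<ObjectIdentifier> keys = page.contents().stream()
                    .map(obj -> ObjectIdentifier.builder().key(obj.key()).build())
                    .collect(Collectors.toList());
            if (keys.isEmpty()) {
                return; // nothing to delete on this page
            }
            s3.deleteObjects(DeleteObjectsRequest.builder()
                    .bucket(bucket)
                    .delete(Delete.builder().objects(keys).build())
                    .build());
        });
    }
}
```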
Spark Scaling
The Orchestrator provides auto-scaling capabilities for Spark clusters, dynamically adjusting worker nodes based on workload patterns.
How It Works
Scaling decisions are based on Spark stage history, WAL file status, and current worker count:
- Calculates expected workers from stage history: ceil((maxInputRecords + buffer) / perCoreLogsProcess) / executorCoresPerMachine
- Compares expected vs actual workers
- Scales if difference exceeds thresholds (min upscale: 1, min downscale: 2)
- Supports Kubernetes deployments and AWS Auto Scaling Groups
Safety: WAL file checks prevent scaling during critical operations. Downscaling limited to 25% of current workers.
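The worker calculation above can be read as the sketch below. The rounding of the outer division and the clamping to minWorkerCount/maxWorkerCount are assumptions layered on top of the formula; the parameter names mirror the tenantConfig.spark settings.

```java
public class SparkScalingSketch {

    // expected = ceil((maxInputRecords + buffer) / perCoreLogsProcess) / executorCoresPerMachine,
    // then clamped to the configured worker bounds (the clamping is an assumption here).
    static int expectedWorkers(long maxInputRecords, long buffer,
                               long perCoreLogsProcess, int executorCoresPerMachine,
                               int minWorkerCount, int maxWorkerCount) {
        long requiredCores = (long) Math.ceil((double) (maxInputRecords + buffer) / perCoreLogsProcess);
        int workers = (int) Math.ceil((double) requiredCores / executorCoresPerMachine);
        return Math.max(minWorkerCount, Math.min(maxWorkerCount, workers));
    }
}
```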
Spark Scale Override API
POST /update-spark-scale-override - Enable/disable upscaling or downscaling per tenant
Headers: X-Tenant-Name (required)
Request Body:
{
"enableUpScale": true, // null = no override
"enableDownScale": false // null = no override
}
The override persists in the database and takes precedence over API-level flags.
Examples:
- Enable upscaling only: {"enableUpScale": true, "enableDownScale": false}
- Disable all scaling: {"enableUpScale": false, "enableDownScale": false}
- Clear override: {"enableUpScale": null, "enableDownScale": null}
Configuration: Controlled via tenantConfig.spark (minWorkerCount, maxWorkerCount, perCoreLogsProcess, executorCoresPerMachine)
Metrics & Monitoring
Log Sync Delay Metrics
The Orchestrator computes and reports log processing delays:
Application Logs Delay (AWS Only):
- Checks S3 object timestamps in partitioned paths
- Scans S3 prefixes for configured service logs
- Extracts timestamp from object path: hour={HH}/minute={MM}/
- Calculates delay in minutes: currentTime - latestLogTime
- Searches up to 3 hours back (MAX_LOGS_SYNC_DELAY_HOURS)
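A sketch of the delay math, assuming the date of the scanned partition is already known from the prefix; the regular expression and method names are illustrative, not the service's actual code.

```java
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogSyncDelaySketch {

    private static final Pattern HOUR_MINUTE = Pattern.compile("hour=(\\d{2})/minute=(\\d{2})/");

    // Delay in minutes between "now" and the latest hour/minute partition found for a service.
    static long delayMinutes(String objectKey, LocalDate partitionDate, LocalDateTime now) {
        Matcher matcher = HOUR_MINUTE.matcher(objectKey);
        if (!matcher.find()) {
            return -1; // key does not follow the expected hour={HH}/minute={MM}/ layout
        }
        LocalDateTime latestLogTime = partitionDate.atTime(
                Integer.parseInt(matcher.group(1)),
                Integer.parseInt(matcher.group(2)));
        return Duration.between(latestLogTime, now).toMinutes();
    }
}
```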
Metrics Response:
- Returns LogSyncDelayResponse with:
  - tenant: Tenant name
  - appLogsDelayMinutes: Delay in minutes for application logs
- Returns the max delay (180 minutes) when no logs are found, which indicates a potential issue
API Endpoints
Component Management
POST /api/v1/component/sync - Sync application services (AWS only)
- Headers: X-Tenant-Name (required)
- Request Body: ComponentSyncRequest with componentType (must be "application")
- Response: DefaultSuccessResponse with sync status message
Spark Management
POST /monitor-spark-job - Monitor and submit Spark jobs
- Headers: X-Tenant-Name (required)
- Request Body: MonitorSparkJobRequest (optional driverCores, driverMemoryInGb)
- Response: DefaultSuccessResponse with success message
POST /update-spark-scale-override - Update Spark scaling override configuration
- Headers: X-Tenant-Name (required)
- Request Body: UpdateSparkScaleOverrideRequest with optional enableUpScale and enableDownScale (Boolean, null to clear override)
- Response: DefaultSuccessResponse with success message
- Description: Sets persistent override flags for enabling/disabling upscaling and downscaling. The override takes precedence over API-level flags in scaling operations.
POST /scale-spark-cluster - Trigger a Spark cluster scaling operation
- Headers: X-Tenant-Name (required)
- Request Body: ScaleSparkClusterRequest with enableUpScale, enableDownScale (Boolean, default: true), and sparkStageHistory (required)
- Response: DefaultSuccessResponse with success message
- Timeout: 60 seconds
- Description: Performs an immediate scaling operation based on the provided stage history. Respects scale override settings if configured.
Metrics
GET /api/v1/metric/sync-delay - Get log sync delay metrics (AWS only)
- Headers: X-Tenant-Name (required)
- Response: LogSyncDelayResponse with:
  - tenant: Tenant name
  - appLogsDelayMinutes: Delay in minutes (null if error)
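For example, a client could fetch these metrics as in the sketch below; the host, port, and tenant name are placeholders.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class SyncDelayClientSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder host/port and tenant; the X-Tenant-Name header is required.
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create("http://orchestrator.example.internal:8080/api/v1/metric/sync-delay"))
                .header("X-Tenant-Name", "acme")
                .GET()
                .build();

        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // Expected shape: {"tenant":"acme","appLogsDelayMinutes":12}
        System.out.println(response.body());
    }
}
```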
Health Check
GET /healthcheck - Application health status
- Response: Health check status including MySQL connectivity
GET /pipeline/health - Complete pipeline health check
- Headers: X-Tenant-Name (required)
- Response: JsonObject with:
  - status: Overall pipeline status (UP/DOWN)
  - message: Overall health message
  - tenant: Tenant name
  - checks: Array of component health checks (vector, kafka, spark, s3)
    - Each check contains a component name and a check object with:
      - status: Component status (UP/DOWN/WARNING)
      - message: Component-specific health message
      - Additional component-specific details (e.g., responseCode, drivers, recentObjects)
Configuration
The Orchestrator uses HOCON configuration files with environment variable substitution:
Key Configuration Sections:
- Tenants: Multi-tenant configuration with isolated settings
- Kafka: Broker hosts, manager URLs, rate limits (used to configure Spark jobs - orchestrator doesn't manage Kafka)
- Spark: Master hosts, resource allocation, job configuration
- Object Storage: S3 buckets, regions, role ARNs
- Delay Metrics: Sample service configuration (serviceName) for delay calculation
Environment Variables:
- Database credentials
- AWS credentials
- Kafka broker hosts
- Spark master hosts
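As an illustration of reading these settings, the sketch below uses Typesafe Config, a common HOCON library for Java; the config file name and the tenants.acme path are placeholders, while the spark key names follow this page.

```java
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class OrchestratorConfigSketch {

    public static void main(String[] args) {
        // load() reads application.conf by default and resolves ${?ENV_VAR}
        // substitutions; the orchestrator's actual file name may differ.
        Config config = ConfigFactory.load();

        // "tenants.acme" is a hypothetical path; the spark keys mirror tenantConfig.spark.
        Config spark = config.getConfig("tenants.acme.tenantConfig.spark");
        int minWorkers = spark.getInt("minWorkerCount");
        int maxWorkers = spark.getInt("maxWorkerCount");
        long perCoreLogsProcess = spark.getLong("perCoreLogsProcess");
        int coresPerMachine = spark.getInt("executorCoresPerMachine");

        System.out.printf("workers %d..%d, perCoreLogsProcess=%d, executorCoresPerMachine=%d%n",
                minWorkers, maxWorkers, perCoreLogsProcess, coresPerMachine);
    }
}
```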
Technology Stack
- Framework: Vert.x (reactive Java framework)
- Language: Java 11
- Dependency Injection: Google Guice
- Database: MySQL
- Cloud Support: AWS (S3) - primary support
- Build Tool: Maven
- Reactive Programming: RxJava
Requirements and Setup
See the Orchestrator Setup Guide.
