Celery Worker OpenTelemetry Setup
This document explains how to set up OpenTelemetry instrumentation in your Celery worker applications.
Background
Celery uses the prefork worker model by default. In this model, every process (the parent and each forked child) needs its own instance of any library that is not fork-safe. To achieve this, we connect to the worker_process_init signal and instantiate the OpenTelemetry SDK in each worker process. Add the following code to the app file of your Celery application.
from celery.signals import worker_process_init
from opentelemetry import metrics, trace
from opentelemetry.exporter.otlp.proto.grpc.metric_exporter import (
    OTLPMetricExporter,
)
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
    OTLPSpanExporter,
)
from opentelemetry.instrumentation.celery import CeleryInstrumentor
from opentelemetry.sdk.metrics import MeterProvider
from opentelemetry.sdk.metrics.export import PeriodicExportingMetricReader
from opentelemetry.sdk.resources import Resource
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor


# Runs once in every forked worker process, not in the parent.
@worker_process_init.connect(weak=False)
def init_celery_tracing(*args, **kwargs):
    # Instrument Celery so that task execution produces spans.
    CeleryInstrumentor().instrument()

    # An empty attribute dict still picks up OTEL_RESOURCE_ATTRIBUTES
    # and OTEL_SERVICE_NAME from the environment.
    resource = Resource.create({})

    # Tracing: provider -> batch processor -> OTLP exporter.
    trace.set_tracer_provider(TracerProvider(resource=resource))
    span_processor = BatchSpanProcessor(OTLPSpanExporter())
    trace.get_tracer_provider().add_span_processor(span_processor)

    # Metrics: periodic reader -> OTLP exporter.
    reader = PeriodicExportingMetricReader(
        OTLPMetricExporter()
    )
    metrics.set_meter_provider(
        MeterProvider(
            resource=resource,
            metric_readers=[reader],
        )
    )
A brief explanation of the code above:
- Import the worker_process_init signal so that each worker process can initialize its own OpenTelemetry SDK.
- Import the CeleryInstrumentor to instrument Celery.
- Import the necessary components from the OpenTelemetry library to set up the tracer and meter providers.
- Set up the tracer and meter providers with processors and exporters.
📝 Note
This adjustment is required only for the worker processes, not for the producer.
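To illustrate why the SDK is created inside each worker process, the following minimal sketch shows that a thread started in a parent process is not running in a forked child. It uses only the Python standard library and is not OpenTelemetry code: the function name thread_survives_fork and the idle background thread are hypothetical stand-ins for components such as BatchSpanProcessor, which export from a background thread. It assumes a POSIX system where os.fork is available.

```python
import os
import threading


def thread_survives_fork() -> tuple[bool, bool]:
    """Return (thread alive in parent, thread alive in forked child)."""
    stop = threading.Event()
    # Hypothetical stand-in for an exporter's background thread.
    background = threading.Thread(target=stop.wait, daemon=True)
    background.start()

    read_fd, write_fd = os.pipe()
    pid = os.fork()
    if pid == 0:
        # Child: only the thread that called fork() is copied.
        try:
            os.close(read_fd)
            os.write(write_fd, b"1" if background.is_alive() else b"0")
        finally:
            os._exit(0)  # leave immediately without running parent cleanup

    # Parent: read the child's report, then clean up.
    os.close(write_fd)
    alive_in_child = os.read(read_fd, 1) == b"1"
    os.close(read_fd)
    os.waitpid(pid, 0)
    alive_in_parent = background.is_alive()
    stop.set()
    background.join()
    return alive_in_parent, alive_in_child


if __name__ == "__main__":
    print(thread_survives_fork())  # expected on POSIX: (True, False)
```

If the SDK were set up only in the parent, a forked worker could end up in the same state as the child here: it would hold the objects but not the running thread behind them. Initializing in worker_process_init avoids relying on anything started before the fork.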