diff --git a/CHANGELOG.md b/CHANGELOG.md index 5f80b18eaa..423c191b48 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ([#1889](/~https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1889)) - Fixed union typing error not compatible with Python 3.7 introduced in `opentelemetry-util-http`, fix tests introduced by patch related to sanitize method for wsgi ([#1913](/~https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1913)) +- `opentelemetry-instrumentation-celery` Unwrap Celery's `ExceptionInfo` errors and report the actual exception that was raised. ([#1863](/~https://github.com/open-telemetry/opentelemetry-python-contrib/pull/1863)) ### Added diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py index a216765fb0..bb83a5c192 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/__init__.py @@ -63,6 +63,7 @@ def add(x, y): from timeit import default_timer from typing import Collection, Iterable +from billiard.einfo import ExceptionInfo from celery import signals # pylint: disable=no-name-in-module from opentelemetry import trace @@ -75,6 +76,13 @@ def add(x, y): from opentelemetry.propagators.textmap import Getter from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.trace.status import Status, StatusCode +from billiard import VERSION + + +if VERSION >= (4, 0, 1): + from billiard.einfo import ExceptionWithTraceback +else: + ExceptionWithTraceback = None logger = logging.getLogger(__name__) @@ -271,6 +279,18 @@ def _trace_failure(*args, **kwargs): return if ex is not None: + # Unwrap the actual exception wrapped by billiard's + # `ExceptionInfo` and `ExceptionWithTraceback`. + if isinstance(ex, ExceptionInfo) and ex.exception is not None: + ex = ex.exception + + if ( + ExceptionWithTraceback is not None + and isinstance(ex, ExceptionWithTraceback) + and ex.exc is not None + ): + ex = ex.exc + status_kwargs["description"] = str(ex) span.record_exception(ex) span.set_status(Status(**status_kwargs)) diff --git a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py index 6f4f9cbc3a..f92c5e03c8 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py +++ b/instrumentation/opentelemetry-instrumentation-celery/src/opentelemetry/instrumentation/celery/utils.py @@ -15,6 +15,7 @@ import logging from celery import registry # pylint: disable=no-name-in-module +from billiard import VERSION from opentelemetry.semconv.trace import SpanAttributes diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py index d9660412f0..9ac78f6d8b 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/celery_test_tasks.py @@ -24,6 +24,15 @@ class Config: app.config_from_object(Config) +class CustomError(Exception): + pass + + @app.task def task_add(num_a, num_b): return num_a + num_b + + +@app.task +def task_raises(): + raise CustomError("The task failed!") diff --git a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py index 47f79d7e1c..ed4dbb5b1d 100644 --- a/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py +++ b/instrumentation/opentelemetry-instrumentation-celery/tests/test_tasks.py @@ -18,9 +18,9 @@ from opentelemetry.instrumentation.celery import CeleryInstrumentor from opentelemetry.semconv.trace import SpanAttributes from opentelemetry.test.test_base import TestBase -from opentelemetry.trace import SpanKind +from opentelemetry.trace import SpanKind, StatusCode -from .celery_test_tasks import app, task_add +from .celery_test_tasks import app, task_add, task_raises class TestCeleryInstrumentation(TestBase): @@ -66,6 +66,10 @@ def test_task(self): }, ) + self.assertEqual(consumer.status.status_code, StatusCode.UNSET) + + self.assertEqual(0, len(consumer.events)) + self.assertEqual( producer.name, "apply_async/tests.celery_test_tasks.task_add" ) @@ -84,6 +88,70 @@ def test_task(self): self.assertEqual(consumer.parent.span_id, producer.context.span_id) self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + def test_task_raises(self): + CeleryInstrumentor().instrument() + + result = task_raises.delay() + + timeout = time.time() + 60 * 1 # 1 minutes from now + while not result.ready(): + if time.time() > timeout: + break + time.sleep(0.05) + + spans = self.sorted_spans(self.memory_exporter.get_finished_spans()) + self.assertEqual(len(spans), 2) + + consumer, producer = spans + + self.assertEqual( + consumer.name, "run/tests.celery_test_tasks.task_raises" + ) + self.assertEqual(consumer.kind, SpanKind.CONSUMER) + self.assertSpanHasAttributes( + consumer, + { + "celery.action": "run", + "celery.state": "FAILURE", + SpanAttributes.MESSAGING_DESTINATION: "celery", + "celery.task_name": "tests.celery_test_tasks.task_raises", + }, + ) + + self.assertEqual(consumer.status.status_code, StatusCode.ERROR) + + self.assertEqual(1, len(consumer.events)) + event = consumer.events[0] + + self.assertIn(SpanAttributes.EXCEPTION_STACKTRACE, event.attributes) + + self.assertEqual( + event.attributes[SpanAttributes.EXCEPTION_TYPE], "CustomError" + ) + + self.assertEqual( + event.attributes[SpanAttributes.EXCEPTION_MESSAGE], + "The task failed!", + ) + + self.assertEqual( + producer.name, "apply_async/tests.celery_test_tasks.task_raises" + ) + self.assertEqual(producer.kind, SpanKind.PRODUCER) + self.assertSpanHasAttributes( + producer, + { + "celery.action": "apply_async", + "celery.task_name": "tests.celery_test_tasks.task_raises", + SpanAttributes.MESSAGING_DESTINATION_KIND: "queue", + SpanAttributes.MESSAGING_DESTINATION: "celery", + }, + ) + + self.assertNotEqual(consumer.parent, producer.context) + self.assertEqual(consumer.parent.span_id, producer.context.span_id) + self.assertEqual(consumer.context.trace_id, producer.context.trace_id) + def test_uninstrument(self): CeleryInstrumentor().instrument() CeleryInstrumentor().uninstrument() diff --git a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py index f284272c5d..0b9cb3dac5 100644 --- a/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py +++ b/tests/opentelemetry-docker-tests/tests/celery/test_celery_functional.py @@ -279,7 +279,7 @@ def fn_exception(): assert len(span.events) == 1 event = span.events[0] assert event.name == "exception" - assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "ExceptionInfo" + assert event.attributes[SpanAttributes.EXCEPTION_TYPE] == "Exception" assert SpanAttributes.EXCEPTION_MESSAGE in event.attributes assert ( span.attributes.get(SpanAttributes.MESSAGING_MESSAGE_ID) @@ -420,7 +420,7 @@ def run(self): assert "Task class is failing" in span.status.description -def test_class_task_exception_excepted(celery_app, memory_exporter): +def test_class_task_exception_expected(celery_app, memory_exporter): class BaseTask(celery_app.Task): throws = (MyException,)