Skip to content

Commit

Permalink
fix(exports): tasks stuck in processing TASK-1243 (#5436)
Browse files Browse the repository at this point in the history
### 📣 Summary
Fixes occasional problem where tasks end up showing as 'Processing'
forever.


### 💭 Notes
Tasks were failing when the celery task started too quickly and tried to
fetch the task before it had actually hit the database. This PR
implements 2 guards against this:
1. Call the tasks from within an on_commit block to ensure they only
start after the task object has made it to the database
2. Automatically catch ObjectDoesNotExist exceptions and retry the task
a hardcoded number of times. `task.retry` takes care of putting some
time in between retries



### 👀 Preview steps
This bug tends to only appear in low traffic when the celery queue is
empty. Make sure `CELERY_TASK_ALWAYS_EAGER` is false.

1. ℹ️ have an account and a project
3. Export the project submissions
4. 🔴 [on main] The job will show as 'Processing' indefinitely. In the
worker logs, you'll see
`kpi.models.import_export_task.SubmissionExportTask.DoesNotExist:
SubmissionExportTask matching query does not exist`
5. 🟢 [on PR] The job will only show as 'Processing' for a few seconds
and then will complete.
  • Loading branch information
rgraber authored Jan 28, 2025
1 parent 6ca7f16 commit aa5a20c
Show file tree
Hide file tree
Showing 8 changed files with 77 additions and 45 deletions.
35 changes: 21 additions & 14 deletions kobo/apps/audit_log/views.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from django.db import transaction
from rest_framework import mixins, status, viewsets
from rest_framework.decorators import action
from rest_framework.renderers import BrowsableAPIRenderer, JSONRenderer
Expand Down Expand Up @@ -618,12 +619,14 @@ def export(self, request, *args, **kwargs):
'type': 'project_history_logs_export',
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
)
)

return Response(
{f'status: {export_task.status}'},
status=status.HTTP_202_ACCEPTED,
Expand Down Expand Up @@ -961,10 +964,12 @@ def export(self, request, *args, **kwargs):
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.ProjectHistoryLogExportTask',
)
)
return Response(
{f'status: {export_task.status}'},
Expand All @@ -985,12 +990,14 @@ def create_task(self, request, get_all_logs):
'type': 'access_logs_export',
},
)

export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.AccessLogExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=export_task.user.username,
export_task_name='kpi.AccessLogExportTask',
)
)

return Response(
{f'status: {export_task.status}'},
status=status.HTTP_202_ACCEPTED,
Expand Down
23 changes: 10 additions & 13 deletions kobo/apps/project_views/views.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from typing import Union, Optional
from typing import Optional, Union

from django.conf import settings
from django.db import transaction
from django.db.models.query import QuerySet
from django.http import Http404
from rest_framework import viewsets
Expand All @@ -9,10 +10,7 @@

from kobo.apps.kobo_auth.shortcuts import User
from kpi.constants import ASSET_TYPE_SURVEY
from kpi.filters import (
AssetOrderingFilter,
SearchFilter,
)
from kpi.filters import AssetOrderingFilter, SearchFilter
from kpi.mixins.asset import AssetViewSetListMixin
from kpi.mixins.object_permission import ObjectPermissionViewSetMixin
from kpi.models import Asset, ProjectViewExportTask
Expand All @@ -22,10 +20,7 @@
from kpi.serializers.v2.user import UserListSerializer
from kpi.tasks import export_task_in_background
from kpi.utils.object_permission import get_database_user
from kpi.utils.project_views import (
get_region_for_view,
user_has_view_perms,
)
from kpi.utils.project_views import get_region_for_view, user_has_view_perms
from .models.project_view import ProjectView
from .serializers import ProjectViewSerializer

Expand Down Expand Up @@ -110,10 +105,12 @@ def export(self, request, uid, obj_type):
)

# Have Celery run the export in the background
export_task_in_background.delay(
export_task_uid=export_task.uid,
username=user.username,
export_task_name='kpi.ProjectViewExportTask',
transaction.on_commit(
lambda: export_task_in_background.delay(
export_task_uid=export_task.uid,
username=user.username,
export_task_name='kpi.ProjectViewExportTask',
)
)

return Response({'status': export_task.status})
Expand Down
2 changes: 2 additions & 0 deletions kobo/settings/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -872,6 +872,8 @@ def __init__(self, *args, **kwargs):
# REMOVE the oldest if a user exceeds this many exports for a particular form
MAXIMUM_EXPORTS_PER_USER_PER_FORM = 10

MAX_RETRIES_FOR_IMPORT_EXPORT_TASK = 10

# Private media file configuration
PRIVATE_STORAGE_ROOT = os.path.join(BASE_DIR, 'media')
PRIVATE_STORAGE_AUTH_FUNCTION = \
Expand Down
15 changes: 9 additions & 6 deletions kpi/serializers/v2/export_task.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,8 @@
# coding: utf-8
from typing import Optional

from django.db import transaction
from django.utils.translation import gettext as t
from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.reverse import reverse
from formpack.constants import (
EXPORT_SETTING_FIELDS,
EXPORT_SETTING_FIELDS_FROM_ALL_VERSIONS,
Expand All @@ -26,9 +24,12 @@
VALID_EXPORT_TYPES,
VALID_MULTIPLE_SELECTS,
)
from rest_framework import serializers
from rest_framework.request import Request
from rest_framework.reverse import reverse

from kpi.fields import ReadOnlyJSONField
from kpi.models import SubmissionExportTask, Asset
from kpi.models import Asset, SubmissionExportTask
from kpi.tasks import export_in_background
from kpi.utils.export_task import format_exception_values
from kpi.utils.object_permission import get_database_user
Expand Down Expand Up @@ -65,7 +66,9 @@ def create(self, validated_data: dict) -> SubmissionExportTask:
user=user, data=validated_data
)
# Have Celery run the export in the background
export_in_background.delay(export_task_uid=export_task.uid)
transaction.on_commit(
lambda: export_in_background.delay(export_task_uid=export_task.uid)
)

return export_task

Expand Down Expand Up @@ -152,7 +155,7 @@ def validate_fields(self, data: dict) -> list:
{EXPORT_SETTING_FIELDS: t('Must be an array')}
)

if not all((isinstance(field, str) for field in fields)):
if not all(isinstance(field, str) for field in fields):
raise serializers.ValidationError(
{
EXPORT_SETTING_FIELDS: t(
Expand Down
20 changes: 17 additions & 3 deletions kpi/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from django.apps import apps
from django.conf import settings
from django.core import mail
from django.core.exceptions import ObjectDoesNotExist
from django.core.management import call_command

from kobo.apps.kobo_auth.shortcuts import User
Expand All @@ -15,25 +16,38 @@
from kpi.models.import_export_task import ImportTask, SubmissionExportTask


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def import_in_background(import_task_uid):
import_task = ImportTask.objects.get(uid=import_task_uid)
import_task.run()
return import_task.uid


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def export_in_background(export_task_uid):
export_task = SubmissionExportTask.objects.get(uid=export_task_uid)
export_task.run()


@celery_app.task
@celery_app.task(
autoretry_for=(ObjectDoesNotExist,),
max_retries=settings.MAX_RETRIES_FOR_IMPORT_EXPORT_TASK,
retry_backoff=True,
)
def export_task_in_background(
export_task_uid: str, username: str, export_task_name: str
) -> None:
user = User.objects.get(username=username)
export_task_class = apps.get_model(export_task_name)

export_task = export_task_class.objects.get(uid=export_task_uid)
export = export_task.run()
if export.status == 'complete' and export.result:
Expand Down
8 changes: 5 additions & 3 deletions kpi/tests/api/v1/test_api_assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@
from urllib.parse import unquote_plus

from django.urls import reverse
from formpack.utils.expand_content import SCHEMA_VERSION
from rest_framework import status
from rest_framework.authtoken.models import Token

from formpack.utils.expand_content import SCHEMA_VERSION
from kobo.apps.kobo_auth.shortcuts import User
from kpi.constants import ASSET_TYPE_COLLECTION
from kpi.models import Asset, SubmissionExportTask
Expand All @@ -18,6 +18,7 @@
from kpi.tests.api.v2 import test_api_assets
from kpi.tests.base_test_case import BaseTestCase
from kpi.tests.kpi_test_case import KpiTestCase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.utils.xml import check_lxml_fromstring

EMPTY_SURVEY = {'survey': [], 'schema': SCHEMA_VERSION, 'settings': {}}
Expand Down Expand Up @@ -296,8 +297,9 @@ def test_owner_can_create_export(self):
'source': asset_url,
'type': 'csv',
}
# Create the export task
response = self.client.post(post_url, task_data)
with immediate_on_commit():
# Create the export task
response = self.client.post(post_url, task_data)
self.assertEqual(response.status_code, status.HTTP_201_CREATED)
# Task should complete right away due to `CELERY_TASK_ALWAYS_EAGER`
detail_response = self.client.get(response.data['url'])
Expand Down
13 changes: 8 additions & 5 deletions kpi/tests/api/v2/test_api_exports.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@
PERM_VIEW_ASSET,
PERM_VIEW_SUBMISSIONS,
)
from kpi.models import Asset, SubmissionExportTask, AssetExportSettings
from kpi.models import Asset, AssetExportSettings, SubmissionExportTask
from kpi.tests.base_test_case import BaseTestCase
from kpi.tests.test_mock_data_exports import MockDataExportsBase
from kpi.tests.utils.transaction import immediate_on_commit
from kpi.urls.router_api_v2 import URL_NAMESPACE as ROUTER_URL_NAMESPACE
from kpi.utils.object_permission import get_anonymous_user

Expand Down Expand Up @@ -335,9 +336,10 @@ def test_synchronous_csv_export_matches_async_export(self):
self._get_endpoint('asset-export-list'),
kwargs={'format': 'json', 'parent_lookup_asset': self.asset.uid},
)
exports_list_response = self.client.post(
exports_list_url, data=es.export_settings
)
with immediate_on_commit():
exports_list_response = self.client.post(
exports_list_url, data=es.export_settings
)
assert exports_list_response.status_code == status.HTTP_201_CREATED

exports_detail_response = self.client.get(
Expand Down Expand Up @@ -473,7 +475,8 @@ def test_export_asset_with_slashes(self):
'fields_from_all_versions': 'false',
'multiple_select': 'both',
}
response = self.client.post(list_url, data=data)
with immediate_on_commit():
response = self.client.post(list_url, data=data)
assert response.status_code == status.HTTP_201_CREATED
export_response = self.client.get(response.data['url'])
filepath = export_response.data['result']
Expand Down
6 changes: 5 additions & 1 deletion kpi/views/v1/export_task.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
# coding: utf-8
from django.db import transaction
from django.db.models import TextField
from django.db.models.functions import Cast
from rest_framework import exceptions, serializers, status
Expand Down Expand Up @@ -211,7 +212,10 @@ def create(self, request, *args, **kwargs):
user=request.user, data=task_data
)
# Have Celery run the export in the background
export_in_background.delay(export_task_uid=export_task.uid)
transaction.on_commit(
lambda: export_in_background.delay(export_task_uid=export_task.uid)
)

return Response({
'uid': export_task.uid,
'url': reverse(
Expand Down

0 comments on commit aa5a20c

Please sign in to comment.