NODEDC_TASKMANAGER/plane-src/apps/api/plane/app/realtime/issue_events.py

138 lines
5.4 KiB
Python

# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import json
import logging
from uuid import uuid4
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone
from plane.settings.redis import redis_instance
logger = logging.getLogger(__name__)
ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project"
def issue_event_channel(project_id):
return f"{ISSUE_EVENT_CHANNEL_PREFIX}:{project_id}"
def _publish_payload(project_id, payload):
next_payload = {
**payload,
"project_id": str(project_id),
}
redis_instance().publish(
issue_event_channel(project_id),
json.dumps(next_payload, cls=DjangoJSONEncoder),
)
def _external_contour_project_ids(contour_request):
extra = contour_request.extra or {}
project_ids = {
str(project_id)
for project_id in [
extra.get("source_project_id"),
extra.get("target_project_id"),
contour_request.project_id,
getattr(contour_request.issue, "project_id", None),
]
if project_id
}
return sorted(project_ids)
def publish_external_contour_event_on_commit(event_type, contour_request, actor_id=None, changed_fields=None):
issue = contour_request.issue
extra = contour_request.extra or {}
payload = {
"event_id": str(uuid4()),
"type": event_type,
"workspace_id": str(contour_request.workspace_id),
"workspace_slug": contour_request.workspace.slug if getattr(contour_request, "workspace", None) else None,
"request_id": str(contour_request.id),
"issue_id": str(issue.id) if issue else None,
"sequence_id": issue.sequence_id if issue else None,
"source_project_id": str(extra.get("source_project_id")) if extra.get("source_project_id") else None,
"target_project_id": str(extra.get("target_project_id") or contour_request.project_id),
"updated_at": contour_request.updated_at or timezone.now(),
"actor_id": str(actor_id) if actor_id else None,
"changed_fields": sorted(set(changed_fields or [])),
}
def _publish():
try:
for project_id in _external_contour_project_ids(contour_request):
_publish_payload(project_id, payload)
except Exception:
logger.exception("Failed to publish external contour realtime event")
transaction.on_commit(_publish)
def publish_external_contour_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None):
def _publish():
try:
from plane.db.models import IntakeIssue
contour_requests = (
IntakeIssue.objects.filter(issue_id=issue.id, extra__bridge="external-contours")
.select_related("issue", "workspace")
.only("id", "workspace_id", "workspace__slug", "project_id", "issue_id", "extra", "updated_at")
)
for contour_request in contour_requests:
event_name = "external_contour.deleted" if event_type == "issue.deleted" else "external_contour.updated"
payload = {
"event_id": str(uuid4()),
"type": event_name,
"workspace_id": str(contour_request.workspace_id),
"workspace_slug": contour_request.workspace.slug if getattr(contour_request, "workspace", None) else None,
"request_id": str(contour_request.id),
"issue_id": str(issue.id),
"sequence_id": issue.sequence_id,
"source_project_id": str(contour_request.extra.get("source_project_id"))
if contour_request.extra.get("source_project_id")
else None,
"target_project_id": str(contour_request.extra.get("target_project_id") or contour_request.project_id),
"updated_at": issue.updated_at or timezone.now(),
"actor_id": str(actor_id) if actor_id else None,
"changed_fields": sorted(set(changed_fields or [])),
}
for project_id in _external_contour_project_ids(contour_request):
_publish_payload(project_id, payload)
except Exception:
logger.exception("Failed to publish external contour bridge event")
transaction.on_commit(_publish)
def publish_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None, publish_external_bridge=True):
payload = {
"event_id": str(uuid4()),
"type": event_type,
"workspace_id": str(issue.workspace_id),
"workspace_slug": issue.workspace.slug if getattr(issue, "workspace", None) else None,
"project_id": str(issue.project_id),
"issue_id": str(issue.id),
"sequence_id": issue.sequence_id,
"updated_at": issue.updated_at or timezone.now(),
"actor_id": str(actor_id) if actor_id else None,
"changed_fields": sorted(set(changed_fields or [])),
}
def _publish():
try:
_publish_payload(issue.project_id, payload)
except Exception:
logger.exception("Failed to publish issue realtime event")
transaction.on_commit(_publish)
if publish_external_bridge:
publish_external_contour_issue_event_on_commit(event_type, issue, actor_id=actor_id, changed_fields=changed_fields)