# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

"""Helpers that publish issue-related realtime events to per-project Redis channels.

Every public ``publish_*_on_commit`` helper registers its work with
``transaction.on_commit`` and logs (rather than raises) any failure that
happens while publishing.
"""

import json
import logging
from uuid import uuid4

from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone

from plane.settings.redis import redis_instance

logger = logging.getLogger(__name__)

ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project"


def issue_event_channel(project_id):
    """Return the Redis channel name used for events of ``project_id``."""
    return f"{ISSUE_EVENT_CHANNEL_PREFIX}:{project_id}"


def _publish_payload(project_id, payload):
    """Publish ``payload`` as JSON on the channel of ``project_id``.

    The payload is copied and its ``project_id`` key is set (or overwritten)
    with the stringified channel project, so the caller's dict is not mutated.
    """
    next_payload = {
        **payload,
        "project_id": str(project_id),
    }
    redis_instance().publish(
        issue_event_channel(project_id),
        json.dumps(next_payload, cls=DjangoJSONEncoder),
    )


def _external_contour_project_ids(contour_request):
    """Return the sorted, de-duplicated project ids (as strings) tied to a contour request.

    Candidates are the ``source_project_id`` / ``target_project_id`` entries of
    ``contour_request.extra``, the request's own ``project_id`` and the
    ``project_id`` of its issue. Falsy candidates are skipped.
    """
    extra = contour_request.extra or {}
    project_ids = {
        str(project_id)
        for project_id in [
            extra.get("source_project_id"),
            extra.get("target_project_id"),
            contour_request.project_id,
            getattr(contour_request.issue, "project_id", None),
        ]
        if project_id
    }
    return sorted(project_ids)


def publish_external_contour_event_on_commit(event_type, contour_request, actor_id=None, changed_fields=None):
    """Publish an external-contour event to every project related to ``contour_request``.

    The payload is built when this function is called; the publish itself is
    registered with ``transaction.on_commit``.

    Args:
        event_type: Value stored under the payload's ``type`` key.
        contour_request: Object exposing ``id``, ``workspace_id``, ``workspace``,
            ``project_id``, ``issue``, ``extra`` and ``updated_at``.
        actor_id: Optional id of the acting user; stringified when truthy.
        changed_fields: Optional iterable of field names; de-duplicated and sorted.
    """
    issue = contour_request.issue
    extra = contour_request.extra or {}
    payload = {
        "event_id": str(uuid4()),
        "type": event_type,
        "workspace_id": str(contour_request.workspace_id),
        "workspace_slug": contour_request.workspace.slug if getattr(contour_request, "workspace", None) else None,
        "request_id": str(contour_request.id),
        "issue_id": str(issue.id) if issue else None,
        "sequence_id": issue.sequence_id if issue else None,
        "source_project_id": str(extra.get("source_project_id")) if extra.get("source_project_id") else None,
        "target_project_id": str(extra.get("target_project_id") or contour_request.project_id),
        "updated_at": contour_request.updated_at or timezone.now(),
        "actor_id": str(actor_id) if actor_id else None,
        "changed_fields": sorted(set(changed_fields or [])),
    }

    def _publish():
        # Best effort: a publish failure is logged and never propagated.
        try:
            for project_id in _external_contour_project_ids(contour_request):
                _publish_payload(project_id, payload)
        except Exception:
            logger.exception("Failed to publish external contour realtime event")

    transaction.on_commit(_publish)


def publish_external_contour_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None):
    """Mirror an issue event onto the external-contour requests bridged to ``issue``.

    Inside the ``transaction.on_commit`` callback, every ``IntakeIssue`` row for
    this issue whose ``extra`` has ``bridge == "external-contours"`` is loaded,
    and one event per row is published to each related project channel.

    Args:
        event_type: Source issue event type; ``"issue.deleted"`` maps to
            ``"external_contour.deleted"``, anything else to
            ``"external_contour.updated"``.
        issue: Issue object exposing ``id``, ``sequence_id`` and ``updated_at``.
        actor_id: Optional id of the acting user; stringified when truthy.
        changed_fields: Optional iterable of field names; de-duplicated and sorted.
    """
    # The mapped event name does not depend on the individual contour request.
    event_name = "external_contour.deleted" if event_type == "issue.deleted" else "external_contour.updated"

    def _publish():
        # Best effort: a query or publish failure is logged and never propagated.
        try:
            # Imported lazily; presumably to avoid a circular import with the
            # models package - TODO confirm.
            from plane.db.models import IntakeIssue

            contour_requests = (
                IntakeIssue.objects.filter(issue_id=issue.id, extra__bridge="external-contours")
                .select_related("issue", "workspace")
                .only("id", "workspace_id", "workspace__slug", "project_id", "issue_id", "extra", "updated_at")
            )
            for contour_request in contour_requests:
                # Normalise a null ``extra`` the same way the sibling helpers do,
                # so one such row cannot abort publishing for all the others.
                extra = contour_request.extra or {}
                payload = {
                    "event_id": str(uuid4()),
                    "type": event_name,
                    "workspace_id": str(contour_request.workspace_id),
                    "workspace_slug": contour_request.workspace.slug
                    if getattr(contour_request, "workspace", None)
                    else None,
                    "request_id": str(contour_request.id),
                    "issue_id": str(issue.id),
                    "sequence_id": issue.sequence_id,
                    "source_project_id": str(extra.get("source_project_id"))
                    if extra.get("source_project_id")
                    else None,
                    "target_project_id": str(extra.get("target_project_id") or contour_request.project_id),
                    "updated_at": issue.updated_at or timezone.now(),
                    "actor_id": str(actor_id) if actor_id else None,
                    "changed_fields": sorted(set(changed_fields or [])),
                }
                for project_id in _external_contour_project_ids(contour_request):
                    _publish_payload(project_id, payload)
        except Exception:
            logger.exception("Failed to publish external contour bridge event")

    transaction.on_commit(_publish)


def publish_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None, publish_external_bridge=True):
    """Publish an issue event on the issue's project channel.

    The payload is built when this function is called; the publish itself is
    registered with ``transaction.on_commit``.

    Args:
        event_type: Value stored under the payload's ``type`` key.
        issue: Issue object exposing ``id``, ``workspace_id``, ``workspace``,
            ``project_id``, ``sequence_id`` and ``updated_at``.
        actor_id: Optional id of the acting user; stringified when truthy.
        changed_fields: Optional iterable of field names; de-duplicated and sorted.
        publish_external_bridge: When true, also schedule the matching
            external-contour bridge events for this issue.
    """
    payload = {
        "event_id": str(uuid4()),
        "type": event_type,
        "workspace_id": str(issue.workspace_id),
        "workspace_slug": issue.workspace.slug if getattr(issue, "workspace", None) else None,
        "project_id": str(issue.project_id),
        "issue_id": str(issue.id),
        "sequence_id": issue.sequence_id,
        "updated_at": issue.updated_at or timezone.now(),
        "actor_id": str(actor_id) if actor_id else None,
        "changed_fields": sorted(set(changed_fields or [])),
    }

    def _publish():
        # Best effort: a publish failure is logged and never propagated.
        try:
            _publish_payload(issue.project_id, payload)
        except Exception:
            logger.exception("Failed to publish issue realtime event")

    transaction.on_commit(_publish)

    if publish_external_bridge:
        publish_external_contour_issue_event_on_commit(
            event_type,
            issue,
            actor_id=actor_id,
            changed_fields=changed_fields,
        )


def publish_assignee_cleanup_issue_events_on_commit(project_id=None, workspace_id=None, assignee_id=None, actor_id=None):
    """Publish ``issue.updated`` (``assignees`` changed) for every issue assigned to ``assignee_id``.

    Only non-deleted issues with a non-deleted assignee link are considered.
    The set can be narrowed by ``project_id`` and/or ``workspace_id``. Nothing
    is done when ``assignee_id`` is falsy.

    Args:
        project_id: Optional project filter.
        workspace_id: Optional workspace filter.
        assignee_id: Id of the assignee whose issues are announced.
        actor_id: Optional id of the acting user, forwarded to each event.
    """
    if not assignee_id:
        return

    # Imported lazily; presumably to avoid a circular import with the models
    # package - TODO confirm.
    from plane.db.models import Issue

    issues = (
        Issue.objects.filter(
            deleted_at__isnull=True,
            issue_assignee__assignee_id=assignee_id,
            issue_assignee__deleted_at__isnull=True,
        )
        .select_related("workspace")
        .distinct()
    )
    if project_id:
        issues = issues.filter(project_id=project_id)
    if workspace_id:
        issues = issues.filter(workspace_id=workspace_id)

    for issue in issues:
        publish_issue_event_on_commit(
            "issue.updated",
            issue,
            actor_id=actor_id,
            changed_fields=["assignees"],
            publish_external_bridge=True,
        )