# Source: NODEDC_TASKMANAGER/plane-src/apps/api/plane/app/realtime/nodedc_events.py
#
# 162 lines
# 5.2 KiB
# Python

# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import json
import logging
from uuid import uuid4
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone
from plane.settings.redis import redis_instance
# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)
# Common prefix of the per-user channel names; nodedc_user_event_channel()
# appends ":<user_id>" to it.
NODEDC_EVENT_CHANNEL_PREFIX = "plane:nodedc-events:user"
def nodedc_user_event_channel(user_id):
    """Return the channel name used for events addressed to one user.

    The name is ``NODEDC_EVENT_CHANNEL_PREFIX``, a colon, and the formatted
    ``user_id``.
    """
    return "{}:{}".format(NODEDC_EVENT_CHANNEL_PREFIX, user_id)
def _normalize_user_ids(user_ids):
    """Return the given user ids as unique strings in first-seen order.

    Falsy entries (``None``, ``""``, ``0``, ...) are dropped, and a falsy
    ``user_ids`` argument yields an empty list.
    """
    stringified = (str(candidate) for candidate in user_ids or [] if candidate)
    # dict keys are unique and keep insertion order, so this de-duplicates
    # while preserving the position of each id's first occurrence.
    return list(dict.fromkeys(stringified))
def _publish_payload_to_users(user_ids, payload):
    """Publish ``payload`` on the personal channel of every given user.

    ``user_ids`` is normalized first (stringified, de-duplicated, falsy
    entries dropped). Each recipient receives its own JSON message: a copy of
    ``payload`` with ``target_user_id`` set to that recipient's id.
    """
    redis_client = redis_instance()
    for recipient_id in _normalize_user_ids(user_ids):
        channel = nodedc_user_event_channel(recipient_id)
        message = json.dumps(
            {**payload, "target_user_id": recipient_id},
            cls=DjangoJSONEncoder,
        )
        redis_client.publish(channel, message)
def publish_nodedc_event_to_users_on_commit(event_type, user_ids, payload=None):
    """Schedule a realtime event for ``user_ids`` once the transaction commits.

    The envelope (``event_id``, ``type``, ``emitted_at``) is built when this
    function is called, not when the callback runs. Keys from ``payload`` are
    merged in after the envelope, so a ``payload`` key with the same name
    replaces the envelope value. Errors raised while publishing are logged
    and not re-raised.
    """
    envelope = {
        "event_id": str(uuid4()),
        "type": event_type,
        "emitted_at": timezone.now(),
    }
    event_payload = {**envelope, **(payload or {})}

    def _send_after_commit():
        try:
            _publish_payload_to_users(user_ids, event_payload)
        except Exception:
            # Best effort: log and swallow so the on_commit hook never raises.
            logger.exception("Failed to publish NODE.DC realtime event")

    transaction.on_commit(_send_after_commit)
def publish_nodedc_workspace_event_on_commit(workspace, event_type, payload=None, extra_user_ids=None):
    """Schedule a realtime event for a workspace's members after commit.

    The workspace id/slug, ``payload`` and the normalized ``extra_user_ids``
    are captured when this function is called. The member lookup (active,
    non-bot, not soft-deleted ``WorkspaceMember`` rows) and the envelope
    (``event_id``, ``type``, ``emitted_at``) are produced when the on_commit
    callback runs. Errors raised inside the callback are logged and not
    re-raised.
    """
    workspace_fields = {
        "workspace_id": str(workspace.id),
        "workspace_slug": workspace.slug,
    }
    event_fields = {**workspace_fields, **(payload or {})}
    extra_recipients = _normalize_user_ids(extra_user_ids or [])

    def _send_after_commit():
        try:
            # Imported at call time rather than at module load
            # (presumably to avoid a circular import -- confirm).
            from plane.db.models import WorkspaceMember

            member_ids = WorkspaceMember.objects.filter(
                workspace_id=workspace.id,
                is_active=True,
                member__is_bot=False,
                deleted_at__isnull=True,
            ).values_list("member_id", flat=True)
            recipients = [*member_ids, *extra_recipients]
            event = {
                "event_id": str(uuid4()),
                "type": event_type,
                "emitted_at": timezone.now(),
                **event_fields,
            }
            _publish_payload_to_users(recipients, event)
        except Exception:
            # Best effort: log and swallow so the on_commit hook never raises.
            logger.exception("Failed to publish NODE.DC workspace realtime event")

    transaction.on_commit(_send_after_commit)
def publish_nodedc_user_profile_event_on_commit(user, changed_fields=None):
    """Schedule ``user.profile.updated`` events after the transaction commits.

    A snapshot of the user's id, email, display name, avatar and the sorted,
    de-duplicated ``changed_fields`` is taken when this function is called.
    When the on_commit callback runs, one event is emitted per workspace in
    which the user has an active, not soft-deleted membership; it goes to
    that workspace's active non-bot members and to the user. If the user has
    no such membership, a single event without workspace keys is sent to the
    user alone. Errors raised inside the callback are logged and not
    re-raised.
    """
    unique_changed_fields = sorted(set(changed_fields or []))
    profile_snapshot = {
        "member_id": str(user.id),
        "email": user.email,
        "display_name": user.display_name,
        "avatar": user.avatar or None,
        "changed_fields": unique_changed_fields,
    }

    def _send_after_commit():
        try:
            # Imported at call time rather than at module load
            # (presumably to avoid a circular import -- confirm).
            from plane.db.models import WorkspaceMember

            own_memberships = (
                WorkspaceMember.objects.filter(
                    member_id=user.id,
                    is_active=True,
                    deleted_at__isnull=True,
                )
                .select_related("workspace")
                .only("workspace_id", "workspace__slug")
            )
            has_membership = False
            # One client is shared by every workspace below; going through
            # _publish_payload_to_users would obtain a client per call.
            redis_client = redis_instance()
            for own_membership in own_memberships:
                has_membership = True
                workspace_id = own_membership.workspace_id
                member_ids = WorkspaceMember.objects.filter(
                    workspace_id=workspace_id,
                    is_active=True,
                    member__is_bot=False,
                    deleted_at__isnull=True,
                ).values_list("member_id", flat=True)
                event = {
                    "event_id": str(uuid4()),
                    "type": "user.profile.updated",
                    "emitted_at": timezone.now(),
                    "workspace_id": str(workspace_id),
                    "workspace_slug": own_membership.workspace.slug,
                    **profile_snapshot,
                }
                # The user is appended explicitly so they are notified even
                # when the member query does not return them.
                for recipient_id in _normalize_user_ids([*member_ids, user.id]):
                    message = json.dumps(
                        {**event, "target_user_id": recipient_id},
                        cls=DjangoJSONEncoder,
                    )
                    redis_client.publish(nodedc_user_event_channel(recipient_id), message)
            if not has_membership:
                # No workspace to fan out to: notify only the user, without
                # workspace_id / workspace_slug keys.
                fallback_event = {
                    "event_id": str(uuid4()),
                    "type": "user.profile.updated",
                    "emitted_at": timezone.now(),
                    **profile_snapshot,
                }
                _publish_payload_to_users([user.id], fallback_event)
        except Exception:
            # Best effort: log and swallow so the on_commit hook never raises.
            logger.exception("Failed to publish NODE.DC profile realtime event")

    transaction.on_commit(_send_after_commit)