diff --git a/plane-src/apps/api/plane/app/realtime/issue_events.py b/plane-src/apps/api/plane/app/realtime/issue_events.py index 5389b2f..a58d916 100644 --- a/plane-src/apps/api/plane/app/realtime/issue_events.py +++ b/plane-src/apps/api/plane/app/realtime/issue_events.py @@ -135,3 +135,34 @@ def publish_issue_event_on_commit(event_type, issue, actor_id=None, changed_fiel transaction.on_commit(_publish) if publish_external_bridge: publish_external_contour_issue_event_on_commit(event_type, issue, actor_id=actor_id, changed_fields=changed_fields) + + +def publish_assignee_cleanup_issue_events_on_commit(project_id=None, workspace_id=None, assignee_id=None, actor_id=None): + if not assignee_id: + return + + from plane.db.models import Issue + + issues = ( + Issue.objects.filter( + deleted_at__isnull=True, + issue_assignee__assignee_id=assignee_id, + issue_assignee__deleted_at__isnull=True, + ) + .select_related("workspace") + .distinct() + ) + + if project_id: + issues = issues.filter(project_id=project_id) + if workspace_id: + issues = issues.filter(workspace_id=workspace_id) + + for issue in issues: + publish_issue_event_on_commit( + "issue.updated", + issue, + actor_id=actor_id, + changed_fields=["assignees"], + publish_external_bridge=True, + ) diff --git a/plane-src/apps/api/plane/app/realtime/nodedc_events.py b/plane-src/apps/api/plane/app/realtime/nodedc_events.py new file mode 100644 index 0000000..b891057 --- /dev/null +++ b/plane-src/apps/api/plane/app/realtime/nodedc_events.py @@ -0,0 +1,161 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +import json +import logging +from uuid import uuid4 + +from django.core.serializers.json import DjangoJSONEncoder +from django.db import transaction +from django.utils import timezone + +from plane.settings.redis import redis_instance + +logger = logging.getLogger(__name__) + +NODEDC_EVENT_CHANNEL_PREFIX = "plane:nodedc-events:user" + + +def nodedc_user_event_channel(user_id): + return f"{NODEDC_EVENT_CHANNEL_PREFIX}:{user_id}" + + +def _normalize_user_ids(user_ids): + normalized_user_ids = [] + seen_user_ids = set() + + for user_id in user_ids or []: + if not user_id: + continue + normalized_user_id = str(user_id) + if normalized_user_id in seen_user_ids: + continue + seen_user_ids.add(normalized_user_id) + normalized_user_ids.append(normalized_user_id) + + return normalized_user_ids + + +def _publish_payload_to_users(user_ids, payload): + client = redis_instance() + for user_id in _normalize_user_ids(user_ids): + client.publish( + nodedc_user_event_channel(user_id), + json.dumps({**payload, "target_user_id": user_id}, cls=DjangoJSONEncoder), + ) + + +def publish_nodedc_event_to_users_on_commit(event_type, user_ids, payload=None): + event_payload = { + "event_id": str(uuid4()), + "type": event_type, + "emitted_at": timezone.now(), + **(payload or {}), + } + + def _publish(): + try: + _publish_payload_to_users(user_ids, event_payload) + except Exception: + logger.exception("Failed to publish NODE.DC realtime event") + + transaction.on_commit(_publish) + + +def publish_nodedc_workspace_event_on_commit(workspace, event_type, payload=None, extra_user_ids=None): + workspace_payload = { + "workspace_id": str(workspace.id), + "workspace_slug": workspace.slug, + **(payload or {}), + } + extra_user_ids = _normalize_user_ids(extra_user_ids or []) + + def _publish(): + try: + from plane.db.models import WorkspaceMember + + workspace_user_ids = WorkspaceMember.objects.filter( + workspace_id=workspace.id, + is_active=True, + member__is_bot=False, + deleted_at__isnull=True, + ).values_list("member_id", flat=True) + + _publish_payload_to_users( + [*workspace_user_ids, *extra_user_ids], + { + "event_id": str(uuid4()), + "type": event_type, + "emitted_at": timezone.now(), + **workspace_payload, + }, + ) + except Exception: + logger.exception("Failed to publish NODE.DC workspace realtime event") + + transaction.on_commit(_publish) + + +def publish_nodedc_user_profile_event_on_commit(user, changed_fields=None): + changed_fields = sorted(set(changed_fields or [])) + payload = { + "member_id": str(user.id), + "email": user.email, + "display_name": user.display_name, + "avatar": user.avatar or None, + "changed_fields": changed_fields, + } + + def _publish(): + try: + from plane.db.models import WorkspaceMember + + memberships = ( + WorkspaceMember.objects.filter( + member_id=user.id, + is_active=True, + deleted_at__isnull=True, + ) + .select_related("workspace") + .only("workspace_id", "workspace__slug") + ) + + workspace_ids = [] + client = redis_instance() + for membership in memberships: + workspace_ids.append(str(membership.workspace_id)) + workspace_user_ids = WorkspaceMember.objects.filter( + workspace_id=membership.workspace_id, + is_active=True, + member__is_bot=False, + deleted_at__isnull=True, + ).values_list("member_id", flat=True) + event_payload = { + "event_id": str(uuid4()), + "type": "user.profile.updated", + "emitted_at": timezone.now(), + "workspace_id": str(membership.workspace_id), + "workspace_slug": membership.workspace.slug, + **payload, + } + for target_user_id in _normalize_user_ids([*workspace_user_ids, user.id]): + client.publish( + nodedc_user_event_channel(target_user_id), + json.dumps({**event_payload, "target_user_id": target_user_id}, cls=DjangoJSONEncoder), + ) + + if not workspace_ids: + _publish_payload_to_users( + [user.id], + { + "event_id": str(uuid4()), + "type": "user.profile.updated", + "emitted_at": timezone.now(), + **payload, + }, + ) + except Exception: + logger.exception("Failed to publish NODE.DC profile realtime event") + + transaction.on_commit(_publish) diff --git a/plane-src/apps/api/plane/app/views/project/member.py b/plane-src/apps/api/plane/app/views/project/member.py index 47abc01..d3a0ec4 100644 --- a/plane-src/apps/api/plane/app/views/project/member.py +++ b/plane-src/apps/api/plane/app/views/project/member.py @@ -9,6 +9,8 @@ from django.db.models import Min # Module imports from .base import BaseViewSet, BaseAPIView +from plane.app.realtime.issue_events import publish_assignee_cleanup_issue_events_on_commit +from plane.app.realtime.nodedc_events import publish_nodedc_workspace_event_on_commit from plane.authentication.nodedc_workspace_policy import ( is_nodedc_launcher_managed_workspace, nodedc_launcher_managed_workspace_response, @@ -176,6 +178,18 @@ class ProjectMemberViewSet(BaseViewSet): ], batch_size=10, ) + for project_member in project_members: + publish_nodedc_workspace_event_on_commit( + project_member.workspace, + "project_member.created", + payload={ + "project_id": str(project_member.project_id), + "member_id": str(project_member.member_id), + "role": project_member.role, + "source": "tasker", + }, + extra_user_ids=[project_member.member_id], + ) # Send emails to notify the users [ project_add_user_email.delay( @@ -301,6 +315,17 @@ class ProjectMemberViewSet(BaseViewSet): if serializer.is_valid(): serializer.save() + publish_nodedc_workspace_event_on_commit( + project_member.workspace, + "project_member.updated", + payload={ + "project_id": str(project_member.project_id), + "member_id": str(project_member.member_id), + "role": project_member.role, + "source": "tasker", + }, + extra_user_ids=[project_member.member_id], + ) return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @@ -336,9 +361,24 @@ class ProjectMemberViewSet(BaseViewSet): status=status.HTTP_400_BAD_REQUEST, ) + publish_assignee_cleanup_issue_events_on_commit( + project_id=project_id, + assignee_id=project_member.member_id, + actor_id=request.user.id, + ) project_member.is_active = False project_member.save() IssueAssignee.objects.filter(project_id=project_id, assignee_id=project_member.member_id).delete() + publish_nodedc_workspace_event_on_commit( + project_member.workspace, + "project_member.deleted", + payload={ + "project_id": str(project_member.project_id), + "member_id": str(project_member.member_id), + "source": "tasker", + }, + extra_user_ids=[project_member.member_id], + ) return Response(status=status.HTTP_204_NO_CONTENT) @allow_permission([ROLE.ADMIN, ROLE.MEMBER, ROLE.GUEST]) @@ -367,10 +407,25 @@ class ProjectMemberViewSet(BaseViewSet): }, status=status.HTTP_400_BAD_REQUEST, ) + publish_assignee_cleanup_issue_events_on_commit( + project_id=project_id, + assignee_id=project_member.member_id, + actor_id=request.user.id, + ) # Deactivate the user project_member.is_active = False project_member.save() IssueAssignee.objects.filter(project_id=project_id, assignee_id=project_member.member_id).delete() + publish_nodedc_workspace_event_on_commit( + project_member.workspace, + "project_member.deleted", + payload={ + "project_id": str(project_member.project_id), + "member_id": str(project_member.member_id), + "source": "tasker", + }, + extra_user_ids=[project_member.member_id], + ) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/app/views/workspace/invite.py b/plane-src/apps/api/plane/app/views/workspace/invite.py index 5164cc8..ad09c73 100644 --- a/plane-src/apps/api/plane/app/views/workspace/invite.py +++ b/plane-src/apps/api/plane/app/views/workspace/invite.py @@ -20,6 +20,7 @@ from rest_framework.response import Response # Module imports from plane.app.permissions import WorkSpaceAdminPermission +from plane.app.realtime.nodedc_events import publish_nodedc_workspace_event_on_commit from plane.authentication.nodedc_workspace_policy import ( get_nodedc_workspace_creation_policy, is_nodedc_launcher_managed_workspace, @@ -159,6 +160,18 @@ class WorkspaceInvitationsViewset(BaseViewSet): invitation.nodedc_approval_request_id = approval_request_id invitation.save(update_fields=["nodedc_approval_request_id", "updated_at"]) approved_requests.append(approval_request_id) + invited_user = User.objects.filter(email__iexact=invitation.email, is_bot=False).first() + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_invite.created", + payload={ + "invite_id": str(invitation.id), + "email": invitation.email, + "status": invitation.nodedc_approval_status, + "source": "tasker", + }, + extra_user_ids=[request.user.id, getattr(invited_user, "id", None)], + ) except Exception: WorkspaceMemberInvite.objects.filter(id__in=[invitation.id for invitation in workspace_invitations]).delete() return Response( @@ -197,6 +210,18 @@ class WorkspaceInvitationsViewset(BaseViewSet): "invitee_email": invitation.email, }, ) + invited_user = User.objects.filter(email__iexact=invitation.email, is_bot=False).first() + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_invite.created", + payload={ + "invite_id": str(invitation.id), + "email": invitation.email, + "status": invitation.nodedc_approval_status, + "source": "tasker", + }, + extra_user_ids=[request.user.id, getattr(invited_user, "id", None)], + ) return Response({"message": "Emails sent successfully"}, status=status.HTTP_200_OK) @@ -214,7 +239,23 @@ class WorkspaceInvitationsViewset(BaseViewSet): status=status.HTTP_502_BAD_GATEWAY, ) + invited_user = User.objects.filter(email__iexact=workspace_member_invite.email, is_bot=False).first() + workspace = workspace_member_invite.workspace + invite_payload = { + "invite_id": str(workspace_member_invite.id), + "email": workspace_member_invite.email, + "status": workspace_member_invite.nodedc_approval_status, + "source": "tasker", + } + extra_user_ids = [workspace_member_invite.created_by_id, getattr(invited_user, "id", None)] + workspace_member_invite.delete() + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_invite.deleted", + payload=invite_payload, + extra_user_ids=extra_user_ids, + ) return Response(status=status.HTTP_204_NO_CONTENT) @@ -340,6 +381,17 @@ class WorkspaceJoinEndpoint(BaseAPIView): }, ) + publish_nodedc_workspace_event_on_commit( + workspace_invite.workspace, + "workspace_member.created", + payload={ + "member_id": str(user.id), + "role": workspace_invite.role, + "source": "tasker", + }, + extra_user_ids=[user.id, workspace_invite.created_by_id], + ) + # Delete the invitation workspace_invite.delete() @@ -447,6 +499,16 @@ class UserWorkspaceInvitationsViewSet(BaseViewSet): }, ) ) + publish_nodedc_workspace_event_on_commit( + invitation.workspace, + "workspace_member.created", + payload={ + "member_id": str(request.user.id), + "role": invitation.role, + "source": "tasker", + }, + extra_user_ids=[request.user.id, invitation.created_by_id], + ) # Bulk create the user for all the workspaces WorkspaceMember.objects.bulk_create( diff --git a/plane-src/apps/api/plane/app/views/workspace/member.py b/plane-src/apps/api/plane/app/views/workspace/member.py index e35a3b6..2fc3837 100644 --- a/plane-src/apps/api/plane/app/views/workspace/member.py +++ b/plane-src/apps/api/plane/app/views/workspace/member.py @@ -12,6 +12,8 @@ from rest_framework import status from rest_framework.response import Response from plane.app.permissions import WorkspaceEntityPermission, allow_permission, ROLE +from plane.app.realtime.issue_events import publish_assignee_cleanup_issue_events_on_commit +from plane.app.realtime.nodedc_events import publish_nodedc_workspace_event_on_commit from plane.authentication.nodedc_workspace_policy import ( get_nodedc_workspace_creation_policy, is_nodedc_launcher_managed_workspace, @@ -54,7 +56,7 @@ class WorkSpaceMemberViewSet(BaseViewSet): workspace_member = WorkspaceMember.objects.get(member=request.user, workspace__slug=slug, is_active=True) # Get all active workspace members - workspace_members = self.get_queryset() + workspace_members = self.get_queryset().filter(is_active=True) if workspace_member.role > 5: serializer = WorkspaceMemberAdminSerializer(workspace_members, fields=("id", "member", "role"), many=True) else: @@ -102,6 +104,16 @@ class WorkSpaceMemberViewSet(BaseViewSet): if serializer.is_valid(): serializer.save() + publish_nodedc_workspace_event_on_commit( + workspace_member.workspace, + "workspace_member.updated", + payload={ + "member_id": str(workspace_member.member_id), + "role": workspace_member.role, + "source": "tasker", + }, + extra_user_ids=[workspace_member.member_id], + ) return Response(serializer.data, status=status.HTTP_200_OK) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @@ -166,6 +178,20 @@ class WorkSpaceMemberViewSet(BaseViewSet): status=status.HTTP_502_BAD_GATEWAY, ) + project_ids = list( + ProjectMember.objects.filter( + workspace__slug=slug, + member_id=workspace_member.member_id, + is_active=True, + ).values_list("project_id", flat=True) + ) + + publish_assignee_cleanup_issue_events_on_commit( + workspace_id=workspace_member.workspace_id, + assignee_id=workspace_member.member_id, + actor_id=request.user.id, + ) + # Deactivate the users from the projects where the user is part of _ = ProjectMember.objects.filter( workspace__slug=slug, member_id=workspace_member.member_id, is_active=True @@ -174,6 +200,16 @@ class WorkSpaceMemberViewSet(BaseViewSet): workspace_member.is_active = False workspace_member.save() + publish_nodedc_workspace_event_on_commit( + workspace_member.workspace, + "workspace_member.deleted", + payload={ + "member_id": str(workspace_member.member_id), + "project_ids": [str(project_id) for project_id in project_ids], + "source": "tasker", + }, + extra_user_ids=[workspace_member.member_id], + ) return Response(status=status.HTTP_204_NO_CONTENT) @invalidate_cache( @@ -224,6 +260,20 @@ class WorkSpaceMemberViewSet(BaseViewSet): status=status.HTTP_400_BAD_REQUEST, ) + project_ids = list( + ProjectMember.objects.filter( + workspace__slug=slug, + member_id=workspace_member.member_id, + is_active=True, + ).values_list("project_id", flat=True) + ) + + publish_assignee_cleanup_issue_events_on_commit( + workspace_id=workspace_member.workspace_id, + assignee_id=workspace_member.member_id, + actor_id=request.user.id, + ) + # # Deactivate the users from the projects where the user is part of _ = ProjectMember.objects.filter( workspace__slug=slug, member_id=workspace_member.member_id, is_active=True @@ -233,6 +283,16 @@ class WorkSpaceMemberViewSet(BaseViewSet): # # Deactivate the user workspace_member.is_active = False workspace_member.save() + publish_nodedc_workspace_event_on_commit( + workspace_member.workspace, + "workspace_member.deleted", + payload={ + "member_id": str(workspace_member.member_id), + "project_ids": [str(project_id) for project_id in project_ids], + "source": "tasker", + }, + extra_user_ids=[workspace_member.member_id], + ) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/api/plane/authentication/views/nodedc_workspace_adapter.py b/plane-src/apps/api/plane/authentication/views/nodedc_workspace_adapter.py index d7e91a7..04e6d31 100644 --- a/plane-src/apps/api/plane/authentication/views/nodedc_workspace_adapter.py +++ b/plane-src/apps/api/plane/authentication/views/nodedc_workspace_adapter.py @@ -10,6 +10,11 @@ from django.views import View from django.views.decorators.csrf import csrf_exempt from plane.authentication.views.nodedc_logout import is_internal_logout_request_authorized +from plane.app.realtime.issue_events import publish_assignee_cleanup_issue_events_on_commit +from plane.app.realtime.nodedc_events import ( + publish_nodedc_user_profile_event_on_commit, + publish_nodedc_workspace_event_on_commit, +) from plane.utils.host import base_host from plane.db.models import ( ExternalIdentityLink, @@ -361,6 +366,9 @@ class NodeDCInternalUserProfileSyncEndpoint(View): with transaction.atomic(): updated_fields = sync_user_profile_from_payload(user, payload) + if updated_fields: + publish_nodedc_user_profile_event_on_commit(user, changed_fields=updated_fields) + return JsonResponse( { "ok": True, @@ -434,6 +442,17 @@ class NodeDCInternalWorkspaceMembershipEnsureEndpoint(View): profile.last_workspace_id = workspace.id profile.save(update_fields=["last_workspace_id", "updated_at"]) + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_member.created" if created else "workspace_member.updated", + payload={ + "member_id": str(user.id), + "role": membership.role, + "source": "launcher", + }, + extra_user_ids=[user.id], + ) + return JsonResponse({"ok": True, "membership": serialize_membership(membership, created)}) @@ -475,7 +494,16 @@ class NodeDCInternalWorkspaceMembershipRemoveEndpoint(View): } ) + project_ids = list( + ProjectMember.objects.filter( + project__workspace=workspace, + member=user, + is_active=True, + ).values_list("project_id", flat=True) + ) + with transaction.atomic(): + publish_assignee_cleanup_issue_events_on_commit(workspace_id=workspace.id, assignee_id=user.id) ProjectMember.objects.filter( project__workspace=workspace, member=user, @@ -488,6 +516,17 @@ class NodeDCInternalWorkspaceMembershipRemoveEndpoint(View): membership.is_active = False membership.save(update_fields=["is_active", "updated_at"]) + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_member.deleted", + payload={ + "member_id": str(user.id), + "project_ids": [str(project_id) for project_id in project_ids], + "source": "launcher", + }, + extra_user_ids=[user.id], + ) + return JsonResponse( { "ok": True, @@ -530,6 +569,19 @@ class NodeDCInternalWorkspaceInviteApproveEndpoint(View): ] ) + invited_user = User.objects.filter(email__iexact=invitation.email, is_bot=False).first() + publish_nodedc_workspace_event_on_commit( + invitation.workspace, + "workspace_invite.approved", + payload={ + "invite_id": str(invitation.id), + "email": invitation.email, + "status": invitation.nodedc_approval_status, + "source": "launcher", + }, + extra_user_ids=[invitation.created_by_id, getattr(invited_user, "id", None)], + ) + return JsonResponse({"ok": True, "invite": serialize_workspace_invite(request, invitation)}) @@ -561,6 +613,19 @@ class NodeDCInternalWorkspaceInviteRejectEndpoint(View): ] ) + invited_user = User.objects.filter(email__iexact=invitation.email, is_bot=False).first() + publish_nodedc_workspace_event_on_commit( + invitation.workspace, + "workspace_invite.rejected", + payload={ + "invite_id": str(invitation.id), + "email": invitation.email, + "status": invitation.nodedc_approval_status, + "source": "launcher", + }, + extra_user_ids=[invitation.created_by_id, getattr(invited_user, "id", None)], + ) + return JsonResponse({"ok": True, "invite": serialize_workspace_invite(request, invitation)}) @@ -648,6 +713,28 @@ class NodeDCInternalProjectMembershipEnsureEndpoint(View): profile.last_workspace_id = workspace.id profile.save(update_fields=["last_workspace_id", "updated_at"]) + publish_nodedc_workspace_event_on_commit( + workspace, + "workspace_member.updated", + payload={ + "member_id": str(user.id), + "role": workspace_membership.role, + "source": "launcher", + }, + extra_user_ids=[user.id], + ) + publish_nodedc_workspace_event_on_commit( + workspace, + "project_member.created" if created else "project_member.updated", + payload={ + "project_id": str(project.id), + "member_id": str(user.id), + "role": project_member.role, + "source": "launcher", + }, + extra_user_ids=[user.id], + ) + return JsonResponse({"ok": True, "membership": serialize_project_membership(project_member, created)}) @@ -695,10 +782,22 @@ class NodeDCInternalProjectMembershipRemoveEndpoint(View): } ) + publish_assignee_cleanup_issue_events_on_commit(project_id=project.id, assignee_id=user.id) project_member.is_active = False project_member.save(update_fields=["is_active", "updated_at"]) IssueAssignee.objects.filter(project=project, assignee=user).delete() + publish_nodedc_workspace_event_on_commit( + workspace, + "project_member.deleted", + payload={ + "project_id": str(project.id), + "member_id": str(user.id), + "source": "launcher", + }, + extra_user_ids=[user.id], + ) + return JsonResponse( { "ok": True, diff --git a/plane-src/apps/live/Dockerfile.live b/plane-src/apps/live/Dockerfile.live index 801afca..864bd0d 100644 --- a/plane-src/apps/live/Dockerfile.live +++ b/plane-src/apps/live/Dockerfile.live @@ -3,7 +3,7 @@ FROM node:22-alpine AS base # Setup pnpm package manager with corepack and configure global bin directory for caching ENV PNPM_HOME="/pnpm" -ENV PATH="$PNPM_HOME:$PATH" +ENV PATH="$PNPM_HOME:$PNPM_HOME/bin:$PATH" RUN corepack enable # ***************************************************************************** diff --git a/plane-src/apps/live/src/controllers/index.ts b/plane-src/apps/live/src/controllers/index.ts index db4e458..5eda76d 100644 --- a/plane-src/apps/live/src/controllers/index.ts +++ b/plane-src/apps/live/src/controllers/index.ts @@ -8,6 +8,7 @@ import { CollaborationController } from "./collaboration.controller"; import { DocumentController } from "./document.controller"; import { HealthController } from "./health.controller"; import { IssueStreamController } from "./issue-stream.controller"; +import { NodeDCStreamController } from "./nodedc-stream.controller"; import { PdfExportController } from "./pdf-export.controller"; export const CONTROLLERS = [ @@ -15,5 +16,6 @@ export const CONTROLLERS = [ DocumentController, HealthController, IssueStreamController, + NodeDCStreamController, PdfExportController, ]; diff --git a/plane-src/apps/live/src/controllers/nodedc-stream.controller.ts b/plane-src/apps/live/src/controllers/nodedc-stream.controller.ts new file mode 100644 index 0000000..5127816 --- /dev/null +++ b/plane-src/apps/live/src/controllers/nodedc-stream.controller.ts @@ -0,0 +1,126 @@ +/** + * Copyright (c) 2023-present Plane Software, Inc. and contributors + * SPDX-License-Identifier: AGPL-3.0-only + * See the LICENSE file for details. + */ + +import type { Request } from "express"; +import type Redis from "ioredis"; +import type { WebSocket as WSSocket } from "ws"; +// plane imports +import { Controller, WebSocket as WSDecorator } from "@plane/decorators"; +import { logger } from "@plane/logger"; +// redis +import { redisManager } from "@/redis"; +// services +import { UserService } from "@/services/user.service"; + +const NODEDC_EVENT_CHANNEL_PREFIX = "plane:nodedc-events:user"; +const HEARTBEAT_INTERVAL_MS = 25_000; + +const sendJson = (ws: WSSocket, payload: Record) => { + if (ws.readyState !== 1) return; + ws.send(JSON.stringify(payload)); +}; + +@Controller("/nodedc") +export class NodeDCStreamController { + [key: string]: unknown; + + @WSDecorator("/stream") + handleConnection(ws: WSSocket, req: Request) { + void this.handleNodeDCStream(ws, req); + } + + private async handleNodeDCStream(ws: WSSocket, req: Request) { + const cookie = req.headers.cookie?.toString(); + + if (!cookie) { + ws.close(1008, "Missing NODE.DC stream credentials"); + return; + } + + let subscriber: Redis | undefined; + let heartbeat: NodeJS.Timeout | undefined; + let isCleanedUp = false; + + const cleanup = async () => { + if (isCleanedUp) return; + isCleanedUp = true; + + if (heartbeat) clearInterval(heartbeat); + if (!subscriber) return; + + try { + await subscriber.unsubscribe(); + subscriber.disconnect(); + } catch (error) { + logger.error("NODEDC_STREAM_CONTROLLER: Redis cleanup failed:", error); + } + }; + + try { + const userService = new UserService(); + const user = await userService.currentUser(cookie); + const redisClient = redisManager.getClient(); + + if (!redisClient) { + ws.close(1011, "NODE.DC stream unavailable"); + return; + } + + const channel = `${NODEDC_EVENT_CHANNEL_PREFIX}:${user.id}`; + subscriber = redisClient.duplicate({ lazyConnect: true }); + await subscriber.connect(); + await subscriber.subscribe(channel); + + subscriber.on("message", (_channel, message) => { + try { + const event = JSON.parse(message) as Record; + sendJson(ws, event); + } catch (error) { + logger.error("NODEDC_STREAM_CONTROLLER: Failed to forward event:", error); + } + }); + + subscriber.on("error", (error) => { + logger.error("NODEDC_STREAM_CONTROLLER: Redis subscriber error:", error); + ws.close(1011, "NODE.DC stream subscriber failed"); + }); + + heartbeat = setInterval(() => { + sendJson(ws, { type: "nodedc.stream.ping", server_ts: new Date().toISOString() }); + }, HEARTBEAT_INTERVAL_MS); + + sendJson(ws, { + type: "nodedc.stream.ready", + user_id: user.id, + server_ts: new Date().toISOString(), + }); + } catch (error) { + logger.error("NODEDC_STREAM_CONTROLLER: WebSocket authentication failed:", error); + ws.close(1008, "NODE.DC stream authentication failed"); + await cleanup(); + return; + } + + ws.on("message", (message) => { + try { + const payload = JSON.parse(message.toString()) as { type?: string }; + if (payload.type === "nodedc.stream.pong") return; + } catch { + // Client messages are optional for this stream. + } + }); + + ws.on("close", () => { + void cleanup(); + }); + + ws.on("error", (error: Error) => { + logger.error("NODEDC_STREAM_CONTROLLER: WebSocket connection error:", error); + ws.close(1011, "NODE.DC stream connection failed"); + void cleanup(); + }); + } +} diff --git a/plane-src/apps/web/core/components/project/confirm-project-member-remove.tsx b/plane-src/apps/web/core/components/project/confirm-project-member-remove.tsx index 52a7c30..47218c9 100644 --- a/plane-src/apps/web/core/components/project/confirm-project-member-remove.tsx +++ b/plane-src/apps/web/core/components/project/confirm-project-member-remove.tsx @@ -6,7 +6,6 @@ import { useState } from "react"; import { observer } from "mobx-react"; -import { useParams } from "next/navigation"; import { AlertTriangle } from "lucide-react"; // types import { Button } from "@plane/propel/button"; @@ -22,12 +21,11 @@ type Props = { onSubmit: () => Promise; isOpen: boolean; onClose: () => void; + projectId: string; }; export const ConfirmProjectMemberRemove = observer(function ConfirmProjectMemberRemove(props: Props) { - const { data, onSubmit, isOpen, onClose } = props; - // router - const { projectId } = useParams(); + const { data, onSubmit, isOpen, onClose, projectId } = props; // states const [isDeleteLoading, setIsDeleteLoading] = useState(false); // store hooks @@ -50,44 +48,62 @@ export const ConfirmProjectMemberRemove = observer(function ConfirmProjectMember if (!projectId) return <>; const isCurrentUser = currentUser?.id === data?.id; - const currentProjectDetails = getProjectById(projectId.toString()); + const currentProjectDetails = getProjectById(projectId); + const memberName = data?.display_name || "участника"; return ( - -
-
-
-