diff --git a/README.md b/README.md index 8e04bca..617cd5f 100644 --- a/README.md +++ b/README.md @@ -60,6 +60,7 @@ cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/p docker build -t nodedc/plane-backend:local -f Dockerfile.api . cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/plane-src +docker build -t nodedc/plane-live:local -f apps/live/Dockerfile.live . docker build -t nodedc/plane-frontend:ru -f apps/web/Dockerfile.web . docker build -t nodedc/plane-admin:ru -f apps/admin/Dockerfile.admin . docker build -t nodedc/plane-space:ru -f apps/space/Dockerfile.space . diff --git a/README_RUN_RU.md b/README_RUN_RU.md index 1207ef2..fd0a414 100644 --- a/README_RUN_RU.md +++ b/README_RUN_RU.md @@ -120,6 +120,7 @@ cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER ```bash cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/plane-src +docker build -t nodedc/plane-live:local -f apps/live/Dockerfile.live . docker build -t nodedc/plane-frontend:ru -f apps/web/Dockerfile.web . docker build -t nodedc/plane-admin:ru -f apps/admin/Dockerfile.admin . docker build -t nodedc/plane-space:ru -f apps/space/Dockerfile.space . diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 94c4a27..531ad48 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -99,7 +99,7 @@ services: - web live: - image: makeplane/plane-live:${APP_RELEASE:-v1.3.0} + image: nodedc/plane-live:local environment: <<: [*live-env, *redis-env] deploy: diff --git a/plane-app/docker-compose.yaml b/plane-app/docker-compose.yaml index d78b7eb..1e30dfc 100644 --- a/plane-app/docker-compose.yaml +++ b/plane-app/docker-compose.yaml @@ -99,7 +99,7 @@ services: - web live: - image: makeplane/plane-live:${APP_RELEASE:-v1.3.0} + image: nodedc/plane-live:local environment: <<: [*live-env, *redis-env] deploy: diff --git a/plane-src/apps/api/plane/app/realtime/__init__.py b/plane-src/apps/api/plane/app/realtime/__init__.py new file mode 100644 index 0000000..fcc34a7 --- /dev/null +++ b/plane-src/apps/api/plane/app/realtime/__init__.py @@ -0,0 +1,3 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. diff --git a/plane-src/apps/api/plane/app/realtime/issue_events.py b/plane-src/apps/api/plane/app/realtime/issue_events.py new file mode 100644 index 0000000..5887991 --- /dev/null +++ b/plane-src/apps/api/plane/app/realtime/issue_events.py @@ -0,0 +1,47 @@ +# Copyright (c) 2023-present Plane Software, Inc. and contributors +# SPDX-License-Identifier: AGPL-3.0-only +# See the LICENSE file for details. + +import json +import logging +from uuid import uuid4 + +from django.core.serializers.json import DjangoJSONEncoder +from django.db import transaction +from django.utils import timezone + +from plane.settings.redis import redis_instance + +logger = logging.getLogger(__name__) + +ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project" + + +def issue_event_channel(project_id): + return f"{ISSUE_EVENT_CHANNEL_PREFIX}:{project_id}" + + +def publish_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None): + payload = { + "event_id": str(uuid4()), + "type": event_type, + "workspace_id": str(issue.workspace_id), + "workspace_slug": issue.workspace.slug if getattr(issue, "workspace", None) else None, + "project_id": str(issue.project_id), + "issue_id": str(issue.id), + "sequence_id": issue.sequence_id, + "updated_at": issue.updated_at or timezone.now(), + "actor_id": str(actor_id) if actor_id else None, + "changed_fields": sorted(set(changed_fields or [])), + } + + def _publish(): + try: + redis_instance().publish( + issue_event_channel(issue.project_id), + json.dumps(payload, cls=DjangoJSONEncoder), + ) + except Exception: + logger.exception("Failed to publish issue realtime event") + + transaction.on_commit(_publish) diff --git a/plane-src/apps/api/plane/app/views/issue/base.py b/plane-src/apps/api/plane/app/views/issue/base.py index 045c83e..81cbe14 100644 --- a/plane-src/apps/api/plane/app/views/issue/base.py +++ b/plane-src/apps/api/plane/app/views/issue/base.py @@ -33,6 +33,7 @@ from rest_framework.response import Response # Module imports from plane.app.permissions import ROLE, allow_permission +from plane.app.realtime.issue_events import publish_issue_event_on_commit from plane.app.serializers import ( IssueCreateSerializer, IssueDetailSerializer, @@ -429,7 +430,7 @@ class IssueViewSet(BaseViewSet): ) if serializer.is_valid(): - serializer.save() + issue_instance = serializer.save() # Track the issue issue_activity.delay( @@ -502,6 +503,12 @@ class IssueViewSet(BaseViewSet): user_id=request.user.id, is_creating=True, ) + publish_issue_event_on_commit( + "issue.created", + issue_instance, + actor_id=request.user.id, + changed_fields=request.data.keys(), + ) return Response(issue, status=status.HTTP_201_CREATED) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @@ -695,7 +702,7 @@ class IssueViewSet(BaseViewSet): requested_data = json.dumps(self.request.data, cls=DjangoJSONEncoder) serializer = IssueCreateSerializer(issue, data=request.data, partial=True, context={"project_id": project_id}) if serializer.is_valid(): - serializer.save() + updated_issue = serializer.save() # Check if the update is a migration description update is_migration_description_update = skip_activity and is_description_update # Log all the updates @@ -726,12 +733,18 @@ class IssueViewSet(BaseViewSet): issue_id=str(serializer.data.get("id", None)), user_id=request.user.id, ) + publish_issue_event_on_commit( + "issue.updated", + updated_issue, + actor_id=request.user.id, + changed_fields=request.data.keys(), + ) return Response(status=status.HTTP_204_NO_CONTENT) return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST) @allow_permission([ROLE.ADMIN], creator=True, model=Issue) def destroy(self, request, slug, project_id, pk=None): - issue = Issue.objects.get(workspace__slug=slug, project_id=project_id, pk=pk) + issue = Issue.objects.select_related("workspace").get(workspace__slug=slug, project_id=project_id, pk=pk) issue.delete() # delete the issue from recent visits @@ -753,6 +766,12 @@ class IssueViewSet(BaseViewSet): origin=base_host(request=request, is_app=True), subscriber=False, ) + publish_issue_event_on_commit( + "issue.deleted", + issue, + actor_id=request.user.id, + changed_fields=["deleted_at"], + ) return Response(status=status.HTTP_204_NO_CONTENT) diff --git a/plane-src/apps/live/src/controllers/index.ts b/plane-src/apps/live/src/controllers/index.ts index 2ae3bce..db4e458 100644 --- a/plane-src/apps/live/src/controllers/index.ts +++ b/plane-src/apps/live/src/controllers/index.ts @@ -7,6 +7,13 @@ import { CollaborationController } from "./collaboration.controller"; import { DocumentController } from "./document.controller"; import { HealthController } from "./health.controller"; +import { IssueStreamController } from "./issue-stream.controller"; import { PdfExportController } from "./pdf-export.controller"; -export const CONTROLLERS = [CollaborationController, DocumentController, HealthController, PdfExportController]; +export const CONTROLLERS = [ + CollaborationController, + DocumentController, + HealthController, + IssueStreamController, + PdfExportController, +]; diff --git a/plane-src/apps/live/src/controllers/issue-stream.controller.ts b/plane-src/apps/live/src/controllers/issue-stream.controller.ts new file mode 100644 index 0000000..0b09367 --- /dev/null +++ b/plane-src/apps/live/src/controllers/issue-stream.controller.ts @@ -0,0 +1,139 @@ +/** + * Copyright (c) 2023-present Plane Software, Inc. and contributors + * SPDX-License-Identifier: AGPL-3.0-only + * See the LICENSE file for details. + */ + +import type { Request } from "express"; +import type Redis from "ioredis"; +import type { WebSocket as WSSocket } from "ws"; +// plane imports +import { Controller, WebSocket as WSDecorator } from "@plane/decorators"; +import { logger } from "@plane/logger"; +// redis +import { redisManager } from "@/redis"; +// services +import { ProjectMemberService } from "@/services/project-member.service"; +import { UserService } from "@/services/user.service"; + +const ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project"; +const HEARTBEAT_INTERVAL_MS = 25_000; + +type TIssueRealtimeEvent = { + event_id?: string; + type?: string; + project_id?: string; +}; + +const getQueryValue = (value: unknown) => (typeof value === "string" && value.trim() ? value.trim() : undefined); + +const sendJson = (ws: WSSocket, payload: Record) => { + if (ws.readyState !== 1) return; + ws.send(JSON.stringify(payload)); +}; + +@Controller("/issues") +export class IssueStreamController { + [key: string]: unknown; + + @WSDecorator("/stream") + handleConnection(ws: WSSocket, req: Request) { + void this.handleIssueStream(ws, req); + } + + private async handleIssueStream(ws: WSSocket, req: Request) { + const workspaceSlug = getQueryValue(req.query.workspaceSlug); + const projectId = getQueryValue(req.query.projectId); + const cookie = req.headers.cookie?.toString(); + + if (!workspaceSlug || !projectId || !cookie) { + ws.close(1008, "Missing issue stream credentials"); + return; + } + + let subscriber: Redis | undefined; + let heartbeat: NodeJS.Timeout | undefined; + + const cleanup = async () => { + if (heartbeat) clearInterval(heartbeat); + + if (subscriber) { + try { + await subscriber.unsubscribe(); + subscriber.disconnect(); + } catch (error) { + logger.error("ISSUE_STREAM_CONTROLLER: Redis cleanup failed:", error); + } + } + }; + + try { + const userService = new UserService(); + const projectMemberService = new ProjectMemberService(); + const user = await userService.currentUser(cookie); + + await projectMemberService.currentProjectMember(cookie, workspaceSlug, projectId); + + const redisClient = redisManager.getClient(); + if (!redisClient) { + ws.close(1011, "Issue stream unavailable"); + return; + } + + const channel = `${ISSUE_EVENT_CHANNEL_PREFIX}:${projectId}`; + subscriber = redisClient.duplicate({ lazyConnect: true }); + await subscriber.connect(); + await subscriber.subscribe(channel); + + subscriber.on("message", (_channel, message) => { + try { + const event = JSON.parse(message) as TIssueRealtimeEvent; + if (event.project_id !== projectId || !event.type?.startsWith("issue.")) return; + + sendJson(ws, event as Record); + } catch (error) { + logger.error("ISSUE_STREAM_CONTROLLER: Failed to forward issue event:", error); + } + }); + + subscriber.on("error", (error) => { + logger.error("ISSUE_STREAM_CONTROLLER: Redis subscriber error:", error); + ws.close(1011, "Issue stream subscriber failed"); + }); + + heartbeat = setInterval(() => { + sendJson(ws, { type: "issue.stream.ping", server_ts: new Date().toISOString() }); + }, HEARTBEAT_INTERVAL_MS); + + sendJson(ws, { + type: "issue.stream.ready", + project_id: projectId, + user_id: user.id, + }); + } catch (error) { + logger.error("ISSUE_STREAM_CONTROLLER: WebSocket authentication failed:", error); + ws.close(1008, "Issue stream authentication failed"); + await cleanup(); + return; + } + + ws.on("message", (message) => { + try { + const payload = JSON.parse(message.toString()) as { type?: string }; + if (payload.type === "issue.stream.pong") return; + } catch { + // Client messages are optional for this stream. + } + }); + + ws.on("close", () => { + void cleanup(); + }); + + ws.on("error", (error: Error) => { + logger.error("ISSUE_STREAM_CONTROLLER: WebSocket connection error:", error); + ws.close(1011, "Issue stream connection failed"); + void cleanup(); + }); + } +} diff --git a/plane-src/apps/live/src/services/project-member.service.ts b/plane-src/apps/live/src/services/project-member.service.ts new file mode 100644 index 0000000..bcbd06f --- /dev/null +++ b/plane-src/apps/live/src/services/project-member.service.ts @@ -0,0 +1,18 @@ +/** + * Copyright (c) 2023-present Plane Software, Inc. and contributors + * SPDX-License-Identifier: AGPL-3.0-only + * See the LICENSE file for details. + */ + +// services +import { APIService } from "@/services/api.service"; + +export class ProjectMemberService extends APIService { + async currentProjectMember(cookie: string, workspaceSlug: string, projectId: string) { + return this.get(`/api/workspaces/${workspaceSlug}/projects/${projectId}/project-members/me/`, { + headers: { + Cookie: cookie, + }, + }).then((response) => response?.data); + } +} diff --git a/plane-src/apps/web/core/components/issues/issue-layouts/issue-layout-HOC.tsx b/plane-src/apps/web/core/components/issues/issue-layouts/issue-layout-HOC.tsx index 50a0323..2750f4b 100644 --- a/plane-src/apps/web/core/components/issues/issue-layouts/issue-layout-HOC.tsx +++ b/plane-src/apps/web/core/components/issues/issue-layouts/issue-layout-HOC.tsx @@ -5,6 +5,7 @@ */ import { observer } from "mobx-react"; +import { useParams } from "next/navigation"; // plane imports import { EIssueLayoutTypes } from "@plane/types"; // components @@ -15,6 +16,7 @@ import { ListLayoutLoader } from "@/components/ui/loader/layouts/list-layout-loa import { SpreadsheetLayoutLoader } from "@/components/ui/loader/layouts/spreadsheet-layout-loader"; // hooks import { useIssues } from "@/hooks/store/use-issues"; +import { useIssueRealtimeEvents } from "@/hooks/use-issue-realtime-events"; import { useIssueStoreType } from "@/hooks/use-issue-layout-store"; // local imports import { IssueLayoutEmptyState } from "./empty-states"; @@ -44,9 +46,11 @@ interface Props { export const IssueLayoutHOC = observer(function IssueLayoutHOC(props: Props) { const { layout } = props; + const { workspaceSlug, projectId } = useParams(); const storeType = useIssueStoreType(); const { issues } = useIssues(storeType); + useIssueRealtimeEvents(storeType, workspaceSlug?.toString(), projectId?.toString()); const issueCount = issues.getGroupIssueCount(undefined, undefined, false); diff --git a/plane-src/apps/web/core/hooks/use-issue-realtime-events.ts b/plane-src/apps/web/core/hooks/use-issue-realtime-events.ts new file mode 100644 index 0000000..647199a --- /dev/null +++ b/plane-src/apps/web/core/hooks/use-issue-realtime-events.ts @@ -0,0 +1,238 @@ +/** + * Copyright (c) 2023-present Plane Software, Inc. and contributors + * SPDX-License-Identifier: AGPL-3.0-only + * See the LICENSE file for details. + */ + +import { useEffect, useRef } from "react"; +// plane imports +import { LIVE_BASE_PATH, LIVE_BASE_URL } from "@plane/constants"; +import type { TIssue } from "@plane/types"; +import { EIssuesStoreType } from "@plane/types"; +// hooks +import { useIssues } from "@/hooks/store/use-issues"; +// services +import { IssueService } from "@/services/issue"; + +type TIssueRealtimeEvent = { + event_id: string; + type: "issue.created" | "issue.updated" | "issue.deleted" | "issue.stream.ready" | "issue.stream.ping"; + workspace_slug?: string; + project_id?: string; + issue_id?: string; + updated_at?: string; +}; + +type TRealtimeIssueStore = { + addIssue?: (issue: TIssue, shouldUpdateList?: boolean) => void; + groupedIssueIds?: Record; + removeIssueFromList?: (issueId: string) => void; + updateIssueList?: (issue?: TIssue, issueBeforeUpdate?: TIssue) => void; + rootIssueStore?: { + issues?: { + removeIssue?: (issueId: string) => void; + }; + }; +}; + +type TIssueFilterSnapshot = { + appliedFilters?: Record; +}; + +const REALTIME_STORE_TYPES = new Set([EIssuesStoreType.PROJECT, EIssuesStoreType.PROJECT_VIEW]); +const MAX_PROCESSED_EVENTS = 250; + +const hasIssueId = (value: unknown, issueId: string): boolean => { + if (Array.isArray(value)) return value.includes(issueId); + if (!value || typeof value !== "object") return false; + + return Object.values(value).some((nestedValue) => hasIssueId(nestedValue, issueId)); +}; + +const buildIssueStreamUrl = (workspaceSlug: string, projectId: string) => { + const liveBaseUrl = LIVE_BASE_URL?.trim() || window.location.origin; + const liveBasePath = LIVE_BASE_PATH?.trim() || "/live"; + const url = new URL(liveBaseUrl); + + url.protocol = window.location.protocol === "https:" ? "wss:" : "ws:"; + url.pathname = `${liveBasePath.replace(/\/$/, "")}/issues/stream`; + url.searchParams.set("workspaceSlug", workspaceSlug); + url.searchParams.set("projectId", projectId); + + return url.toString(); +}; + +export const useIssueRealtimeEvents = (storeType: EIssuesStoreType, workspaceSlug?: string, projectId?: string) => { + const { issueMap, issues, issuesFilter } = useIssues(storeType); + const issueServiceRef = useRef(new IssueService()); + const issueMapRef = useRef(issueMap); + const issuesRef = useRef(issues as TRealtimeIssueStore); + const issueFilterRef = useRef(issuesFilter as TIssueFilterSnapshot); + const processedEventIdsRef = useRef([]); + const processedEventSetRef = useRef(new Set()); + const lastSeenUpdatedAtRef = useRef(); + + useEffect(() => { + issueMapRef.current = issueMap; + }, [issueMap]); + + useEffect(() => { + issuesRef.current = issues as TRealtimeIssueStore; + }, [issues]); + + useEffect(() => { + issueFilterRef.current = issuesFilter as TIssueFilterSnapshot; + }, [issuesFilter]); + + useEffect(() => { + if (!workspaceSlug || !projectId || !REALTIME_STORE_TYPES.has(storeType) || typeof window === "undefined") return; + + let socket: WebSocket | undefined; + let reconnectTimer: ReturnType | undefined; + let cancelled = false; + let reconnectAttempt = 0; + let hasConnectedOnce = false; + + const getFilterParams = () => { + const filters = { ...(issueFilterRef.current?.appliedFilters ?? {}) }; + delete filters.cursor; + delete filters.group_by; + delete filters.per_page; + delete filters.sub_group_by; + + return filters; + }; + + const rememberEvent = (eventId: string) => { + if (processedEventSetRef.current.has(eventId)) return false; + + processedEventIdsRef.current.push(eventId); + processedEventSetRef.current.add(eventId); + + if (processedEventIdsRef.current.length > MAX_PROCESSED_EVENTS) { + const removedEventId = processedEventIdsRef.current.shift(); + if (removedEventId) processedEventSetRef.current.delete(removedEventId); + } + + return true; + }; + + const applyIssue = (issue: TIssue) => { + const realtimeStore = issuesRef.current; + const issueBeforeUpdate = issueMapRef.current?.[issue.id]; + + if (!issueBeforeUpdate || !hasIssueId(realtimeStore.groupedIssueIds, issue.id)) { + realtimeStore.addIssue?.(issue, true); + return; + } + + realtimeStore.addIssue?.(issue, false); + realtimeStore.updateIssueList?.(issue, issueBeforeUpdate); + }; + + const removeIssue = (issueId: string, removeFromMap = false) => { + const realtimeStore = issuesRef.current; + + realtimeStore.removeIssueFromList?.(issueId); + if (removeFromMap) realtimeStore.rootIssueStore?.issues?.removeIssue?.(issueId); + }; + + const fetchIssue = async (issueId: string) => { + const issues = await issueServiceRef.current.retrieveIssues(workspaceSlug, projectId, [issueId], getFilterParams()); + + return issues?.[0]; + }; + + const handleIssueEvent = async (event: TIssueRealtimeEvent) => { + if (!event.event_id || !event.issue_id) return; + if (!rememberEvent(event.event_id)) return; + if (event.updated_at) lastSeenUpdatedAtRef.current = event.updated_at; + + if (event.type === "issue.deleted") { + removeIssue(event.issue_id, true); + return; + } + + const issue = await fetchIssue(event.issue_id); + if (!issue) { + removeIssue(event.issue_id); + return; + } + + applyIssue(issue); + }; + + const catchUpMissedEvents = async () => { + const updatedAt = lastSeenUpdatedAtRef.current; + if (!updatedAt) return; + + const response = await issueServiceRef.current.getIssues(workspaceSlug, projectId, { + ...getFilterParams(), + updated_at__gt: updatedAt, + per_page: "100", + }); + + const results = response?.results; + if (!Array.isArray(results)) return; + + results.forEach(applyIssue); + }; + + const scheduleReconnect = () => { + if (cancelled) return; + const delay = Math.min(1000 * 2 ** reconnectAttempt, 15000); + reconnectAttempt += 1; + reconnectTimer = setTimeout(connect, delay); + }; + + const connect = () => { + try { + socket = new WebSocket(buildIssueStreamUrl(workspaceSlug, projectId)); + + socket.onopen = () => { + reconnectAttempt = 0; + if (hasConnectedOnce) void catchUpMissedEvents(); + hasConnectedOnce = true; + }; + + socket.onmessage = (message) => { + try { + const event = JSON.parse(message.data) as TIssueRealtimeEvent; + + if (event.type === "issue.stream.ping") { + socket?.send(JSON.stringify({ type: "issue.stream.pong" })); + return; + } + + if (event.type === "issue.stream.ready") return; + if (event.workspace_slug && event.workspace_slug !== workspaceSlug) return; + if (event.project_id && event.project_id !== projectId) return; + + void handleIssueEvent(event); + } catch (error) { + console.error("Failed to process issue realtime event", error); + } + }; + + socket.onclose = () => { + scheduleReconnect(); + }; + + socket.onerror = () => { + socket?.close(); + }; + } catch (error) { + console.error("Failed to connect issue realtime stream", error); + scheduleReconnect(); + } + }; + + connect(); + + return () => { + cancelled = true; + if (reconnectTimer) clearTimeout(reconnectTimer); + socket?.close(); + }; + }, [storeType, workspaceSlug, projectId]); +}; diff --git a/plane-src/apps/web/core/services/issue/issue.service.ts b/plane-src/apps/web/core/services/issue/issue.service.ts index f80ed38..6abaa10 100644 --- a/plane-src/apps/web/core/services/issue/issue.service.ts +++ b/plane-src/apps/web/core/services/issue/issue.service.ts @@ -80,7 +80,7 @@ export class IssueService extends APIService { async getIssues( workspaceSlug: string, projectId: string, - queries?: Partial>, + queries?: Partial> | Record, config = {} ): Promise { return this.getIssuesFromServer(workspaceSlug, projectId, queries, config); @@ -126,9 +126,14 @@ export class IssueService extends APIService { }); } - async retrieveIssues(workspaceSlug: string, projectId: string, issueIds: string[]): Promise { + async retrieveIssues( + workspaceSlug: string, + projectId: string, + issueIds: string[], + queries?: Record + ): Promise { return this.get(`/api/workspaces/${workspaceSlug}/projects/${projectId}/${this.serviceType}/list/`, { - params: { issues: issueIds.join(",") }, + params: { ...queries, issues: issueIds.join(",") }, }) .then(async (response) => response?.data) .catch((error) => {