ФУНКЦИИ - МЕЖПРОЕКТНАЯ КОММУНИКАЦИЯ: realtime канал карточек задач

This commit is contained in:
DCCONSTRUCTIONS 2026-04-29 11:41:58 +03:00
parent 83c61a85b4
commit b2a710a7ec
13 changed files with 491 additions and 9 deletions

View File

@ -60,6 +60,7 @@ cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/p
docker build -t nodedc/plane-backend:local -f Dockerfile.api .
cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/plane-src
docker build -t nodedc/plane-live:local -f apps/live/Dockerfile.live .
docker build -t nodedc/plane-frontend:ru -f apps/web/Dockerfile.web .
docker build -t nodedc/plane-admin:ru -f apps/admin/Dockerfile.admin .
docker build -t nodedc/plane-space:ru -f apps/space/Dockerfile.space .

View File

@ -120,6 +120,7 @@ cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER
```bash
cd /Users/dcconstructions/Downloads/mnt/data/dc_taskmanager/NODEDC_TASKMANAGER/plane-src
docker build -t nodedc/plane-live:local -f apps/live/Dockerfile.live .
docker build -t nodedc/plane-frontend:ru -f apps/web/Dockerfile.web .
docker build -t nodedc/plane-admin:ru -f apps/admin/Dockerfile.admin .
docker build -t nodedc/plane-space:ru -f apps/space/Dockerfile.space .

View File

@ -99,7 +99,7 @@ services:
- web
live:
image: makeplane/plane-live:${APP_RELEASE:-v1.3.0}
image: nodedc/plane-live:local
environment:
<<: [*live-env, *redis-env]
deploy:

View File

@ -99,7 +99,7 @@ services:
- web
live:
image: makeplane/plane-live:${APP_RELEASE:-v1.3.0}
image: nodedc/plane-live:local
environment:
<<: [*live-env, *redis-env]
deploy:

View File

@ -0,0 +1,3 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.

View File

@ -0,0 +1,47 @@
# Copyright (c) 2023-present Plane Software, Inc. and contributors
# SPDX-License-Identifier: AGPL-3.0-only
# See the LICENSE file for details.
import json
import logging
from uuid import uuid4
from django.core.serializers.json import DjangoJSONEncoder
from django.db import transaction
from django.utils import timezone
from plane.settings.redis import redis_instance
logger = logging.getLogger(__name__)
ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project"
def issue_event_channel(project_id):
return f"{ISSUE_EVENT_CHANNEL_PREFIX}:{project_id}"
def publish_issue_event_on_commit(event_type, issue, actor_id=None, changed_fields=None):
payload = {
"event_id": str(uuid4()),
"type": event_type,
"workspace_id": str(issue.workspace_id),
"workspace_slug": issue.workspace.slug if getattr(issue, "workspace", None) else None,
"project_id": str(issue.project_id),
"issue_id": str(issue.id),
"sequence_id": issue.sequence_id,
"updated_at": issue.updated_at or timezone.now(),
"actor_id": str(actor_id) if actor_id else None,
"changed_fields": sorted(set(changed_fields or [])),
}
def _publish():
try:
redis_instance().publish(
issue_event_channel(issue.project_id),
json.dumps(payload, cls=DjangoJSONEncoder),
)
except Exception:
logger.exception("Failed to publish issue realtime event")
transaction.on_commit(_publish)

View File

@ -33,6 +33,7 @@ from rest_framework.response import Response
# Module imports
from plane.app.permissions import ROLE, allow_permission
from plane.app.realtime.issue_events import publish_issue_event_on_commit
from plane.app.serializers import (
IssueCreateSerializer,
IssueDetailSerializer,
@ -429,7 +430,7 @@ class IssueViewSet(BaseViewSet):
)
if serializer.is_valid():
serializer.save()
issue_instance = serializer.save()
# Track the issue
issue_activity.delay(
@ -502,6 +503,12 @@ class IssueViewSet(BaseViewSet):
user_id=request.user.id,
is_creating=True,
)
publish_issue_event_on_commit(
"issue.created",
issue_instance,
actor_id=request.user.id,
changed_fields=request.data.keys(),
)
return Response(issue, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@ -695,7 +702,7 @@ class IssueViewSet(BaseViewSet):
requested_data = json.dumps(self.request.data, cls=DjangoJSONEncoder)
serializer = IssueCreateSerializer(issue, data=request.data, partial=True, context={"project_id": project_id})
if serializer.is_valid():
serializer.save()
updated_issue = serializer.save()
# Check if the update is a migration description update
is_migration_description_update = skip_activity and is_description_update
# Log all the updates
@ -726,12 +733,18 @@ class IssueViewSet(BaseViewSet):
issue_id=str(serializer.data.get("id", None)),
user_id=request.user.id,
)
publish_issue_event_on_commit(
"issue.updated",
updated_issue,
actor_id=request.user.id,
changed_fields=request.data.keys(),
)
return Response(status=status.HTTP_204_NO_CONTENT)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
@allow_permission([ROLE.ADMIN], creator=True, model=Issue)
def destroy(self, request, slug, project_id, pk=None):
issue = Issue.objects.get(workspace__slug=slug, project_id=project_id, pk=pk)
issue = Issue.objects.select_related("workspace").get(workspace__slug=slug, project_id=project_id, pk=pk)
issue.delete()
# delete the issue from recent visits
@ -753,6 +766,12 @@ class IssueViewSet(BaseViewSet):
origin=base_host(request=request, is_app=True),
subscriber=False,
)
publish_issue_event_on_commit(
"issue.deleted",
issue,
actor_id=request.user.id,
changed_fields=["deleted_at"],
)
return Response(status=status.HTTP_204_NO_CONTENT)

View File

@ -7,6 +7,13 @@
import { CollaborationController } from "./collaboration.controller";
import { DocumentController } from "./document.controller";
import { HealthController } from "./health.controller";
import { IssueStreamController } from "./issue-stream.controller";
import { PdfExportController } from "./pdf-export.controller";
export const CONTROLLERS = [CollaborationController, DocumentController, HealthController, PdfExportController];
export const CONTROLLERS = [
CollaborationController,
DocumentController,
HealthController,
IssueStreamController,
PdfExportController,
];

View File

@ -0,0 +1,139 @@
/**
* Copyright (c) 2023-present Plane Software, Inc. and contributors
* SPDX-License-Identifier: AGPL-3.0-only
* See the LICENSE file for details.
*/
import type { Request } from "express";
import type Redis from "ioredis";
import type { WebSocket as WSSocket } from "ws";
// plane imports
import { Controller, WebSocket as WSDecorator } from "@plane/decorators";
import { logger } from "@plane/logger";
// redis
import { redisManager } from "@/redis";
// services
import { ProjectMemberService } from "@/services/project-member.service";
import { UserService } from "@/services/user.service";
const ISSUE_EVENT_CHANNEL_PREFIX = "plane:issue-events:project";
const HEARTBEAT_INTERVAL_MS = 25_000;
type TIssueRealtimeEvent = {
event_id?: string;
type?: string;
project_id?: string;
};
const getQueryValue = (value: unknown) => (typeof value === "string" && value.trim() ? value.trim() : undefined);
const sendJson = (ws: WSSocket, payload: Record<string, unknown>) => {
if (ws.readyState !== 1) return;
ws.send(JSON.stringify(payload));
};
@Controller("/issues")
export class IssueStreamController {
[key: string]: unknown;
@WSDecorator("/stream")
handleConnection(ws: WSSocket, req: Request) {
void this.handleIssueStream(ws, req);
}
private async handleIssueStream(ws: WSSocket, req: Request) {
const workspaceSlug = getQueryValue(req.query.workspaceSlug);
const projectId = getQueryValue(req.query.projectId);
const cookie = req.headers.cookie?.toString();
if (!workspaceSlug || !projectId || !cookie) {
ws.close(1008, "Missing issue stream credentials");
return;
}
let subscriber: Redis | undefined;
let heartbeat: NodeJS.Timeout | undefined;
const cleanup = async () => {
if (heartbeat) clearInterval(heartbeat);
if (subscriber) {
try {
await subscriber.unsubscribe();
subscriber.disconnect();
} catch (error) {
logger.error("ISSUE_STREAM_CONTROLLER: Redis cleanup failed:", error);
}
}
};
try {
const userService = new UserService();
const projectMemberService = new ProjectMemberService();
const user = await userService.currentUser(cookie);
await projectMemberService.currentProjectMember(cookie, workspaceSlug, projectId);
const redisClient = redisManager.getClient();
if (!redisClient) {
ws.close(1011, "Issue stream unavailable");
return;
}
const channel = `${ISSUE_EVENT_CHANNEL_PREFIX}:${projectId}`;
subscriber = redisClient.duplicate({ lazyConnect: true });
await subscriber.connect();
await subscriber.subscribe(channel);
subscriber.on("message", (_channel, message) => {
try {
const event = JSON.parse(message) as TIssueRealtimeEvent;
if (event.project_id !== projectId || !event.type?.startsWith("issue.")) return;
sendJson(ws, event as Record<string, unknown>);
} catch (error) {
logger.error("ISSUE_STREAM_CONTROLLER: Failed to forward issue event:", error);
}
});
subscriber.on("error", (error) => {
logger.error("ISSUE_STREAM_CONTROLLER: Redis subscriber error:", error);
ws.close(1011, "Issue stream subscriber failed");
});
heartbeat = setInterval(() => {
sendJson(ws, { type: "issue.stream.ping", server_ts: new Date().toISOString() });
}, HEARTBEAT_INTERVAL_MS);
sendJson(ws, {
type: "issue.stream.ready",
project_id: projectId,
user_id: user.id,
});
} catch (error) {
logger.error("ISSUE_STREAM_CONTROLLER: WebSocket authentication failed:", error);
ws.close(1008, "Issue stream authentication failed");
await cleanup();
return;
}
ws.on("message", (message) => {
try {
const payload = JSON.parse(message.toString()) as { type?: string };
if (payload.type === "issue.stream.pong") return;
} catch {
// Client messages are optional for this stream.
}
});
ws.on("close", () => {
void cleanup();
});
ws.on("error", (error: Error) => {
logger.error("ISSUE_STREAM_CONTROLLER: WebSocket connection error:", error);
ws.close(1011, "Issue stream connection failed");
void cleanup();
});
}
}

View File

@ -0,0 +1,18 @@
/**
* Copyright (c) 2023-present Plane Software, Inc. and contributors
* SPDX-License-Identifier: AGPL-3.0-only
* See the LICENSE file for details.
*/
// services
import { APIService } from "@/services/api.service";
export class ProjectMemberService extends APIService {
async currentProjectMember(cookie: string, workspaceSlug: string, projectId: string) {
return this.get(`/api/workspaces/${workspaceSlug}/projects/${projectId}/project-members/me/`, {
headers: {
Cookie: cookie,
},
}).then((response) => response?.data);
}
}

View File

@ -5,6 +5,7 @@
*/
import { observer } from "mobx-react";
import { useParams } from "next/navigation";
// plane imports
import { EIssueLayoutTypes } from "@plane/types";
// components
@ -15,6 +16,7 @@ import { ListLayoutLoader } from "@/components/ui/loader/layouts/list-layout-loa
import { SpreadsheetLayoutLoader } from "@/components/ui/loader/layouts/spreadsheet-layout-loader";
// hooks
import { useIssues } from "@/hooks/store/use-issues";
import { useIssueRealtimeEvents } from "@/hooks/use-issue-realtime-events";
import { useIssueStoreType } from "@/hooks/use-issue-layout-store";
// local imports
import { IssueLayoutEmptyState } from "./empty-states";
@ -44,9 +46,11 @@ interface Props {
export const IssueLayoutHOC = observer(function IssueLayoutHOC(props: Props) {
const { layout } = props;
const { workspaceSlug, projectId } = useParams();
const storeType = useIssueStoreType();
const { issues } = useIssues(storeType);
useIssueRealtimeEvents(storeType, workspaceSlug?.toString(), projectId?.toString());
const issueCount = issues.getGroupIssueCount(undefined, undefined, false);

View File

@ -0,0 +1,238 @@
/**
* Copyright (c) 2023-present Plane Software, Inc. and contributors
* SPDX-License-Identifier: AGPL-3.0-only
* See the LICENSE file for details.
*/
import { useEffect, useRef } from "react";
// plane imports
import { LIVE_BASE_PATH, LIVE_BASE_URL } from "@plane/constants";
import type { TIssue } from "@plane/types";
import { EIssuesStoreType } from "@plane/types";
// hooks
import { useIssues } from "@/hooks/store/use-issues";
// services
import { IssueService } from "@/services/issue";
type TIssueRealtimeEvent = {
event_id: string;
type: "issue.created" | "issue.updated" | "issue.deleted" | "issue.stream.ready" | "issue.stream.ping";
workspace_slug?: string;
project_id?: string;
issue_id?: string;
updated_at?: string;
};
type TRealtimeIssueStore = {
addIssue?: (issue: TIssue, shouldUpdateList?: boolean) => void;
groupedIssueIds?: Record<string, unknown>;
removeIssueFromList?: (issueId: string) => void;
updateIssueList?: (issue?: TIssue, issueBeforeUpdate?: TIssue) => void;
rootIssueStore?: {
issues?: {
removeIssue?: (issueId: string) => void;
};
};
};
type TIssueFilterSnapshot = {
appliedFilters?: Record<string, string | boolean>;
};
const REALTIME_STORE_TYPES = new Set<EIssuesStoreType>([EIssuesStoreType.PROJECT, EIssuesStoreType.PROJECT_VIEW]);
const MAX_PROCESSED_EVENTS = 250;
const hasIssueId = (value: unknown, issueId: string): boolean => {
if (Array.isArray(value)) return value.includes(issueId);
if (!value || typeof value !== "object") return false;
return Object.values(value).some((nestedValue) => hasIssueId(nestedValue, issueId));
};
const buildIssueStreamUrl = (workspaceSlug: string, projectId: string) => {
const liveBaseUrl = LIVE_BASE_URL?.trim() || window.location.origin;
const liveBasePath = LIVE_BASE_PATH?.trim() || "/live";
const url = new URL(liveBaseUrl);
url.protocol = window.location.protocol === "https:" ? "wss:" : "ws:";
url.pathname = `${liveBasePath.replace(/\/$/, "")}/issues/stream`;
url.searchParams.set("workspaceSlug", workspaceSlug);
url.searchParams.set("projectId", projectId);
return url.toString();
};
export const useIssueRealtimeEvents = (storeType: EIssuesStoreType, workspaceSlug?: string, projectId?: string) => {
const { issueMap, issues, issuesFilter } = useIssues(storeType);
const issueServiceRef = useRef(new IssueService());
const issueMapRef = useRef(issueMap);
const issuesRef = useRef<TRealtimeIssueStore>(issues as TRealtimeIssueStore);
const issueFilterRef = useRef<TIssueFilterSnapshot>(issuesFilter as TIssueFilterSnapshot);
const processedEventIdsRef = useRef<string[]>([]);
const processedEventSetRef = useRef(new Set<string>());
const lastSeenUpdatedAtRef = useRef<string | undefined>();
useEffect(() => {
issueMapRef.current = issueMap;
}, [issueMap]);
useEffect(() => {
issuesRef.current = issues as TRealtimeIssueStore;
}, [issues]);
useEffect(() => {
issueFilterRef.current = issuesFilter as TIssueFilterSnapshot;
}, [issuesFilter]);
useEffect(() => {
if (!workspaceSlug || !projectId || !REALTIME_STORE_TYPES.has(storeType) || typeof window === "undefined") return;
let socket: WebSocket | undefined;
let reconnectTimer: ReturnType<typeof setTimeout> | undefined;
let cancelled = false;
let reconnectAttempt = 0;
let hasConnectedOnce = false;
const getFilterParams = () => {
const filters = { ...(issueFilterRef.current?.appliedFilters ?? {}) };
delete filters.cursor;
delete filters.group_by;
delete filters.per_page;
delete filters.sub_group_by;
return filters;
};
const rememberEvent = (eventId: string) => {
if (processedEventSetRef.current.has(eventId)) return false;
processedEventIdsRef.current.push(eventId);
processedEventSetRef.current.add(eventId);
if (processedEventIdsRef.current.length > MAX_PROCESSED_EVENTS) {
const removedEventId = processedEventIdsRef.current.shift();
if (removedEventId) processedEventSetRef.current.delete(removedEventId);
}
return true;
};
const applyIssue = (issue: TIssue) => {
const realtimeStore = issuesRef.current;
const issueBeforeUpdate = issueMapRef.current?.[issue.id];
if (!issueBeforeUpdate || !hasIssueId(realtimeStore.groupedIssueIds, issue.id)) {
realtimeStore.addIssue?.(issue, true);
return;
}
realtimeStore.addIssue?.(issue, false);
realtimeStore.updateIssueList?.(issue, issueBeforeUpdate);
};
const removeIssue = (issueId: string, removeFromMap = false) => {
const realtimeStore = issuesRef.current;
realtimeStore.removeIssueFromList?.(issueId);
if (removeFromMap) realtimeStore.rootIssueStore?.issues?.removeIssue?.(issueId);
};
const fetchIssue = async (issueId: string) => {
const issues = await issueServiceRef.current.retrieveIssues(workspaceSlug, projectId, [issueId], getFilterParams());
return issues?.[0];
};
const handleIssueEvent = async (event: TIssueRealtimeEvent) => {
if (!event.event_id || !event.issue_id) return;
if (!rememberEvent(event.event_id)) return;
if (event.updated_at) lastSeenUpdatedAtRef.current = event.updated_at;
if (event.type === "issue.deleted") {
removeIssue(event.issue_id, true);
return;
}
const issue = await fetchIssue(event.issue_id);
if (!issue) {
removeIssue(event.issue_id);
return;
}
applyIssue(issue);
};
const catchUpMissedEvents = async () => {
const updatedAt = lastSeenUpdatedAtRef.current;
if (!updatedAt) return;
const response = await issueServiceRef.current.getIssues(workspaceSlug, projectId, {
...getFilterParams(),
updated_at__gt: updatedAt,
per_page: "100",
});
const results = response?.results;
if (!Array.isArray(results)) return;
results.forEach(applyIssue);
};
const scheduleReconnect = () => {
if (cancelled) return;
const delay = Math.min(1000 * 2 ** reconnectAttempt, 15000);
reconnectAttempt += 1;
reconnectTimer = setTimeout(connect, delay);
};
const connect = () => {
try {
socket = new WebSocket(buildIssueStreamUrl(workspaceSlug, projectId));
socket.onopen = () => {
reconnectAttempt = 0;
if (hasConnectedOnce) void catchUpMissedEvents();
hasConnectedOnce = true;
};
socket.onmessage = (message) => {
try {
const event = JSON.parse(message.data) as TIssueRealtimeEvent;
if (event.type === "issue.stream.ping") {
socket?.send(JSON.stringify({ type: "issue.stream.pong" }));
return;
}
if (event.type === "issue.stream.ready") return;
if (event.workspace_slug && event.workspace_slug !== workspaceSlug) return;
if (event.project_id && event.project_id !== projectId) return;
void handleIssueEvent(event);
} catch (error) {
console.error("Failed to process issue realtime event", error);
}
};
socket.onclose = () => {
scheduleReconnect();
};
socket.onerror = () => {
socket?.close();
};
} catch (error) {
console.error("Failed to connect issue realtime stream", error);
scheduleReconnect();
}
};
connect();
return () => {
cancelled = true;
if (reconnectTimer) clearTimeout(reconnectTimer);
socket?.close();
};
}, [storeType, workspaceSlug, projectId]);
};

View File

@ -80,7 +80,7 @@ export class IssueService extends APIService {
async getIssues(
workspaceSlug: string,
projectId: string,
queries?: Partial<Record<TIssueParams, string | boolean>>,
queries?: Partial<Record<TIssueParams, string | boolean>> | Record<string, string | boolean>,
config = {}
): Promise<TIssuesResponse> {
return this.getIssuesFromServer(workspaceSlug, projectId, queries, config);
@ -126,9 +126,14 @@ export class IssueService extends APIService {
});
}
async retrieveIssues(workspaceSlug: string, projectId: string, issueIds: string[]): Promise<TIssue[]> {
async retrieveIssues(
workspaceSlug: string,
projectId: string,
issueIds: string[],
queries?: Record<string, string | boolean>
): Promise<TIssue[]> {
return this.get(`/api/workspaces/${workspaceSlug}/projects/${projectId}/${this.serviceType}/list/`, {
params: { issues: issueIds.join(",") },
params: { ...queries, issues: issueIds.join(",") },
})
.then(async (response) => response?.data)
.catch((error) => {