Architecture Deep Dive: Data Flow & State Management - Part 2
Comprehensive technical breakdown of TS Starter's React Query + oRPC integration, real-time updates, and caching strategies
Part 2: Data Flow & State Management
Welcome to Part 2 of our architecture deep-dive series! In this part, we'll explore how TS Starter manages data flow, state, and real-time updates using React Query + oRPC integration patterns.
🎯 What You'll Learn
- React Query + oRPC integration patterns
- Real-time updates with Event Publisher
- Caching strategies and optimization
- State management best practices
1. React Query + oRPC Integration Patterns
1.1. The Perfect Type-Safe Duo
TS Starter combines React Query's caching with oRPC's type-safe RPC calls, so query results are cached on the client while procedure inputs and outputs stay typed from server to component:
// src/lib/orpc.ts - The Integration Foundation
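import { createORPCReactQueryUtils } from "@orpc/react-query";
import { createORPCClient } from "@orpc/client";
import { RPCLink } from "@orpc/client/fetch";
import { createRouterClient, type RouterClient } from "@orpc/server";
import { createIsomorphicFn } from "@tanstack/react-start";
import { getWebRequest } from "@tanstack/react-start/server";
// serverRouter and createServerOrpcContext come from the project's own server modules (imports omitted here)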
const getORPCClient = createIsomorphicFn()
.server(() =>
createRouterClient(serverRouter, {
context: async () => {
const req = getWebRequest();
const ctx = await createServerOrpcContext({
headers: req.headers,
req,
});
return ctx;
},
}),
)
.client((): RouterClient<typeof serverRouter> => {
const link = new RPCLink({
url: new URL("/api/rpc", import.meta.env.VITE_BASE_URL || window.location.origin),
fetch: (input, init) =>
fetch(input, {
...init,
credentials: "include", // Include auth cookies
}),
});
return createORPCClient(link);
});
export const client: RouterClient<typeof serverRouter> = getORPCClient();
export const orpc = createORPCReactQueryUtils(client); // ← The magic integration
1.2. Query Patterns: Suspense vs Regular Queries
TS Starter uses different query patterns based on the use case:
Suspense Queries for Critical Data
// src/routes/_authed/task.tsx
export function TaskComponent() {
// CRITICAL DATA: Use Suspense for immediate loading
const { data: tasks } = useSuspenseQuery({
queryKey: ["tasks"] as const,
queryFn: () => orpc.task.getAll.call(),
staleTime: 1000, // Fresh for 1 second
refetchOnWindowFocus: true,
refetchOnMount: true,
});
// CRITICAL DATA: User session with Suspense
const { data: session } = useSuspenseQuery({
queryKey: ["session"] as const,
queryFn: () => orpc.auth.getSession.call(),
staleTime: 1000 * 60 * 5, // 5 minutes
});
return (
<div>
{tasks?.tasks?.map(task => (
<TaskItem key={task.id} task={task} />
))}
</div>
);
}
Regular Queries for Optional Data
// src/routes/_authed/dashboard.tsx
export function DashboardPage() {
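// Session query: gates the optional queries below (same "session" key as on the task page)
const { data: session } = useQuery({
queryKey: ["session"],
queryFn: () => orpc.auth.getSession.call(),
staleTime: 1000 * 60 * 5, // 5 minutes
});
// OPTIONAL DATA: Use regular queries for non-critical data
const { data: taskStats } = useQuery({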
queryKey: ["taskStats"],
queryFn: () => orpc.task.getStats.call(),
staleTime: 30000, // 30 seconds
refetchOnWindowFocus: true,
enabled: !!session, // Only fetch when authenticated
});
// OPTIONAL DATA: Analytics with longer stale time
const { data: analytics } = useQuery({
queryKey: ["analytics"],
queryFn: () => orpc.task.getAnalytics.call(),
staleTime: 1000 * 60 * 2, // 2 minutes
refetchOnWindowFocus: false, // Don't refetch on focus
});
return (
<div>
{taskStats && <StatsWidget data={taskStats} />}
{analytics && <AnalyticsWidget data={analytics} />}
</div>
);
}
1.3. Mutation Patterns with Optimistic Updates
TS Starter pairs its mutations with optimistic updates: the cache is updated before the server responds, and the previous value is restored if the request fails:
// src/routes/_authed/task.tsx
export function TaskComponent() {
const queryClient = useQueryClient();
// CREATE MUTATION: With optimistic updates
const createTaskMutation = useMutation({
mutationFn: (data: CreateTaskInput) => orpc.task.create.call(data),
onMutate: async (newTask) => {
// Cancel outgoing refetches
await queryClient.cancelQueries({ queryKey: ["tasks"] });
// Snapshot previous value
const previousTasks = queryClient.getQueryData(["tasks"]);
// Optimistically update
queryClient.setQueryData(["tasks"], (old: any) => ({
...old,
tasks: [...(old?.tasks || []), {
...newTask,
id: `temp-${Date.now()}`, // Temporary ID
status: "Not Started",
createdAt: new Date(),
updatedAt: new Date(),
}]
}));
return { previousTasks };
},
onError: (err, newTask, context) => {
// Rollback on error
if (context?.previousTasks) {
queryClient.setQueryData(["tasks"], context.previousTasks);
}
toast.error("Failed to create task. Please try again.");
},
onSuccess: () => {
// The cache refresh happens in onSettled below, so only notify here
toast.success("Task created successfully!");
},
onSettled: () => {
// Always refetch after error or success
queryClient.invalidateQueries({ queryKey: ["tasks"] });
},
});
// UPDATE MUTATION: With optimistic updates
const updateTaskMutation = useMutation({
mutationFn: ({ id, data }: { id: string; data: Partial<Task> }) =>
orpc.task.update.call({ id, ...data }),
onMutate: async ({ id, data }) => {
await queryClient.cancelQueries({ queryKey: ["tasks"] });
const previousTasks = queryClient.getQueryData(["tasks"]);
// Optimistically update the specific task
queryClient.setQueryData(["tasks"], (old: any) => ({
...old,
tasks: old?.tasks?.map((task: Task) =>
task.id === id ? { ...task, ...data, updatedAt: new Date() } : task
) || []
}));
return { previousTasks };
},
onError: (err, variables, context) => {
if (context?.previousTasks) {
queryClient.setQueryData(["tasks"], context.previousTasks);
}
toast.error("Failed to update task. Please try again.");
},
onSuccess: () => {
queryClient.invalidateQueries({ queryKey: ["tasks"] });
},
});
return (
<div>
<button
onClick={() => createTaskMutation.mutate({
text: "New task",
priority: "medium",
hrs: 1,
isPublic: false
})}
disabled={createTaskMutation.isPending}
>
{createTaskMutation.isPending ? "Creating..." : "Create Task"}
</button>
</div>
);
}
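To make the snapshot, optimistic write, and rollback steps easier to follow in isolation, here is a minimal sketch that runs without React Query. `TinyCache` and `createTaskOptimistically` are hypothetical names used only for illustration, and the cache is a plain in-memory map rather than the real query cache.

```typescript
// Hypothetical stand-in for the query cache; not React Query's API.
interface Task { id: string; text: string; status: string }
interface TaskList { tasks: Task[] }

class TinyCache {
  private store = new Map<string, TaskList>();
  get(key: string): TaskList | undefined {
    return this.store.get(key);
  }
  set(key: string, value: TaskList | undefined): void {
    if (value === undefined) this.store.delete(key);
    else this.store.set(key, value);
  }
}

// Writes a temporary task to the cache, calls `send`, and restores the
// snapshot if `send` rejects.
async function createTaskOptimistically(
  cache: TinyCache,
  text: string,
  send: (text: string) => Promise<Task>,
): Promise<Task> {
  // 1. Snapshot the current value so it can be restored later.
  const previous = cache.get("tasks");
  // 2. Optimistic write with a temporary ID.
  const temp: Task = { id: `temp-${Date.now()}`, text, status: "Not Started" };
  cache.set("tasks", { tasks: [...(previous?.tasks ?? []), temp] });
  try {
    // 3. Ask the server to create the task.
    const saved = await send(text);
    // 4. Replace the temporary entry with the server's record.
    const current = cache.get("tasks");
    cache.set("tasks", {
      tasks: (current?.tasks ?? []).map((t) => (t.id === temp.id ? saved : t)),
    });
    return saved;
  } catch (error) {
    // 5. Roll back to the snapshot and let the caller handle the error.
    cache.set("tasks", previous);
    throw error;
  }
}
```

In the component above, React Query spreads the same steps across `onMutate` (steps 1 and 2), `onError` (step 5), and `onSettled`, which refetches instead of patching the cache by hand.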
2. Real-Time Updates with Event Publisher
2.1. Event Publisher Architecture
TS Starter uses oRPC's Event Publisher for real-time updates across the application:
// src/server/orpc.ts - Event Publisher Setup
import { EventPublisher, withEventMeta } from "@orpc/server";
// Define event types with TypeScript
export const taskPublisher = new EventPublisher<{
"task-created": {
taskId: string;
userId: string;
task: Task;
};
"task-updated": {
taskId: string;
userId: string;
task: Task;
};
"task-deleted": {
taskId: string;
userId: string;
};
"task-status-changed": {
taskId: string;
userId: string;
oldStatus: string;
newStatus: string;
task: Task;
};
}>();
// Real-time task updates using Event Iterator
export const realtimeTasks = protectedProcedure.handler(async function* ({
context,
signal,
}) {
if (!context.session?.user) {
console.warn("Real-time tasks: No authenticated user");
return;
}
try {
// Send initial connection confirmation
yield withEventMeta(
{
type: "connected",
message: "Connected to real-time task updates",
userId: context.session.user.id,
},
{ id: "connection", retry: 5000 },
);
// Subscribe to task events for this user
for await (const event of taskPublisher.subscribe("task-created", {
signal,
})) {
// Only send events relevant to the current user
if (
event.userId === context.session.user.id ||
event.task.isPublic ||
(event.task.sharedWith &&
JSON.parse(event.task.sharedWith).includes(
context.session.user.email,
))
) {
yield withEventMeta(
{
type: "task-created",
task: event.task,
timestamp: new Date().toISOString(),
},
{ id: `task-created-${event.taskId}`, retry: 3000 },
);
}
}
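// NOTE: this loop starts only after the "task-created" loop above ends
// (normally when the signal aborts), so "task-updated" events are not
// delivered while the first subscription is still active. Consuming several
// subscriptions at the same time requires merging them into one stream.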
for await (const event of taskPublisher.subscribe("task-updated", {
signal,
})) {
if (
event.userId === context.session.user.id ||
event.task.isPublic ||
(event.task.sharedWith &&
JSON.parse(event.task.sharedWith).includes(
context.session.user.email,
))
) {
yield withEventMeta(
{
type: "task-updated",
task: event.task,
timestamp: new Date().toISOString(),
},
{ id: `task-updated-${event.taskId}`, retry: 3000 },
);
}
}
} catch (error) {
console.error("Real-time connection error:", error);
yield withEventMeta(
{
type: "error",
message: "Connection lost",
timestamp: new Date().toISOString(),
},
{ id: "error", retry: 5000 },
);
}
});
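The two `for await` loops in the handler above run one after the other, so the second subscription is only read once the first has ended. One possible way to read several subscriptions concurrently is to merge them into a single async iterable. The helper below is a sketch under that assumption: `mergeAsync` is a hypothetical name, it is not part of oRPC, and it only relies on the sources being async iterables (as the `for await` loops above suggest `subscribe` returns).

```typescript
// Merges several async iterables into one, yielding each value as soon as
// any source produces it. Hypothetical helper, not an oRPC API.
async function* mergeAsync<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T> {
  const iterators = sources.map((source) => source[Symbol.asyncIterator]());
  // One pending next() call per active iterator, tagged with its index.
  const pending = new Map<number, Promise<{ index: number; result: IteratorResult<T> }>>();
  const advance = (index: number) =>
    pending.set(
      index,
      iterators[index].next().then((result) => ({ index, result })),
    );
  iterators.forEach((_, index) => advance(index));
  try {
    while (pending.size > 0) {
      // Wait for whichever source delivers next.
      const { index, result } = await Promise.race(pending.values());
      if (result.done) {
        pending.delete(index); // this source is finished
      } else {
        advance(index); // request the following value from the same source
        yield result.value;
      }
    }
  } finally {
    // If the consumer stops early or a source fails, close the sources
    // that are still open.
    for (const index of pending.keys()) void iterators[index].return?.();
  }
}
```

With a helper like this, the handler could loop once over the merged stream, with each subscription wrapped so that its events carry their event name, and apply the visibility check in a single place.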
2.2. Publishing Events from Business Logic
Events are published from oRPC procedures when data changes:
// src/server/routes/task.ts
export const taskRouter = {
create: protectedProcedure
.input(z.object({
text: z.string(),
priority: z.enum(["high", "medium", "low"]).default("medium"),
hrs: z.number().min(1).max(8).default(1),
isPublic: z.boolean().default(false)
}))
.handler(async ({ input, context }) => {
// Create task in database
const task = await db.insert(tasks).values({
id: crypto.randomUUID(),
text: input.text,
priority: input.priority,
hrs: input.hrs,
isPublic: input.isPublic,
status: "Not Started",
userId: context.session.user.id,
createdAt: new Date(),
updatedAt: new Date(),
}).returning();
// PUBLISH REAL-TIME EVENT
taskPublisher.publish("task-created", {
taskId: task[0].id,
userId: context.session.user.id,
task: task[0],
});
return task[0];
}),
update: protectedProcedure
.input(z.object({
id: z.string(),
text: z.string().optional(),
status: z.string().optional(),
priority: z.enum(["high", "medium", "low"]).optional(),
}))
.handler(async ({ input, context }) => {
const { id, ...updateData } = input;
// Get current task for comparison
const currentTask = await db.query.tasks.findFirst({
where: eq(tasks.id, id),
});
if (!currentTask) {
throw new Error("Task not found");
}
// Update task in database
const updatedTask = await db.update(tasks)
.set({
...updateData,
updatedAt: new Date(),
})
.where(eq(tasks.id, id))
.returning();
// PUBLISH REAL-TIME EVENT
taskPublisher.publish("task-updated", {
taskId: id,
userId: context.session.user.id,
task: updatedTask[0],
});
// PUBLISH STATUS CHANGE EVENT if status changed
if (updateData.status && updateData.status !== currentTask.status) {
taskPublisher.publish("task-status-changed", {
taskId: id,
userId: context.session.user.id,
oldStatus: currentTask.status,
newStatus: updateData.status,
task: updatedTask[0],
});
}
return updatedTask[0];
}),
delete: protectedProcedure
.input(z.object({ id: z.string() }))
.handler(async ({ input, context }) => {
const { id } = input;
// Delete task from database
await db.delete(tasks).where(eq(tasks.id, id));
// PUBLISH REAL-TIME EVENT
taskPublisher.publish("task-deleted", {
taskId: id,
userId: context.session.user.id,
});
return { success: true };
}),
};
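If the typed event map is new to you, the following sketch shows the underlying idea: the event name passed to `publish` determines which payload shape the compiler accepts. `TinyPublisher` is a hypothetical, callback-based illustration; it is not oRPC's `EventPublisher`, which exposes subscriptions as async iterators.

```typescript
// Hypothetical minimal publisher that illustrates typed publish/subscribe.
type Listener<T> = (payload: T) => void;

class TinyPublisher<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<Listener<any>>>();

  // Registers a listener and returns a function that removes it again.
  subscribe<K extends keyof Events>(event: K, listener: Listener<Events[K]>): () => void {
    const set = this.listeners.get(event) ?? new Set<Listener<any>>();
    set.add(listener);
    this.listeners.set(event, set);
    return () => {
      set.delete(listener);
    };
  }

  // Delivers the payload to every listener currently registered for the event.
  publish<K extends keyof Events>(event: K, payload: Events[K]): void {
    this.listeners.get(event)?.forEach((listener) => listener(payload));
  }
}

// Event map reduced to two of the events from this article.
type TaskEvents = {
  "task-created": { taskId: string; userId: string };
  "task-deleted": { taskId: string; userId: string };
};

const publisher = new TinyPublisher<TaskEvents>();
const createdIds: string[] = [];
const unsubscribe = publisher.subscribe("task-created", (event) => {
  createdIds.push(event.taskId);
});

publisher.publish("task-created", { taskId: "t1", userId: "u1" });
publisher.publish("task-deleted", { taskId: "t1", userId: "u1" }); // no listener registered
unsubscribe();
publisher.publish("task-created", { taskId: "t2", userId: "u1" }); // no longer delivered
```

Publishing `"task-created"` with a payload that lacks `taskId` would fail to compile, which is the same guarantee the typed `taskPublisher` gives the procedures above.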
2.3. Client-Side Real-Time Hook
TS Starter provides a custom hook for handling real-time updates:
// src/lib/hooks/useRealtimeTasks.ts
export interface RealtimeTaskEvent {
type: "connected" | "task-created" | "task-updated" | "task-deleted" | "task-status-changed" | "error";
message?: string;
task?: Task;
userId?: string;
oldStatus?: string;
newStatus?: string;
timestamp?: string;
}
export function useRealtimeTasks() {
return useQuery({
queryKey: ["realtime-tasks"],
queryFn: () => client.realtimeTasks(),
refetchInterval: false, // Disable polling since we're using SSE
staleTime: Infinity, // Never treat the stream handle as stale, so it is not refetched
gcTime: 0, // Drop the entry as soon as no component uses it
});
}
// Hook to handle real-time task updates
export function useTaskUpdates(onUpdate?: (event: RealtimeTaskEvent) => void) {
const { data: eventGenerator } = useRealtimeTasks();
// Keep the latest callback in a ref so that an inline callback
// does not restart the effect (and the read loop) on every render
const onUpdateRef = React.useRef(onUpdate);
React.useEffect(() => {
onUpdateRef.current = onUpdate;
});
// Process incoming events
React.useEffect(() => {
if (!eventGenerator) return;
const processEvents = async () => {
try {
for await (const event of eventGenerator) {
onUpdateRef.current?.(event);
}
} catch (error) {
console.error("Error reading real-time stream:", error);
}
};
void processEvents();
// No explicit cleanup: the loop ends when the stream closes
}, [eventGenerator]);
return eventGenerator;
}
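The hook above does not stop its read loop when the component unmounts. As a sketch of one way to add that, the helper below reads an async iterable and exposes a `stop()` function that an effect cleanup could call. `consumeStream` and `StreamHandle` are hypothetical names, and the stop request only takes effect when the next event arrives; aborting the underlying request would need a separate abort signal.

```typescript
interface StreamHandle {
  stop(): void; // ask the read loop to end
  done: Promise<void>; // resolves once the loop has ended
}

// Reads `stream` and forwards each event to `onEvent` until stopped.
function consumeStream<T>(
  stream: AsyncIterable<T>,
  onEvent: (event: T) => void,
): StreamHandle {
  let stopped = false;
  const done = (async () => {
    for await (const event of stream) {
      // Breaking out of for await also calls the iterator's return(),
      // which lets the source run its own cleanup.
      if (stopped) break;
      onEvent(event);
    }
  })();
  return {
    stop: () => {
      stopped = true;
    },
    done,
  };
}
```

Inside an effect, the handle returned by `consumeStream(eventGenerator, handler)` could be kept in a local variable, with `handle.stop` returned as the cleanup function.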
2.4. Real-Time Updates in Components
Components use the real-time hook to stay synchronized:
// src/routes/_authed/task.tsx
export function TaskComponent() {
const queryClient = useQueryClient();
// Real-time task updates
useTaskUpdates((event: RealtimeTaskEvent) => {
console.log("Real-time task event:", event);
switch (event.type) {
case "connected":
console.log("Connected to real-time task updates");
break;
case "task-created":
toast.success(`New task created: "${event.task?.text}"! 🎉`);
queryClient.invalidateQueries({ queryKey: ["tasks"] });
break;
case "task-updated":
toast.info(`Task updated: "${event.task?.text}"`);
queryClient.invalidateQueries({ queryKey: ["tasks"] });
break;
case "task-deleted":
toast.info(`Task deleted`);
queryClient.invalidateQueries({ queryKey: ["tasks"] });
break;
case "task-status-changed": {
const statusEmoji =
event.newStatus === "Completed"
? "✅"
: event.newStatus === "In Progress"
? "🚀"
: "📋";
toast.success(
`Task status changed to ${event.newStatus} ${statusEmoji}`,
);
queryClient.invalidateQueries({ queryKey: ["tasks"] });
break;
}
case "error":
console.error("Real-time connection error:", event.message);
toast.error("Real-time connection lost. Trying to reconnect...");
break;
}
});
return (
<div>
{/* Task list automatically updates via real-time events */}
<TaskList />
</div>
);
}
3. Caching Strategies and Optimization
3.1. Global Query Client Configuration
TS Starter sets React Query's defaults once on the QueryClient: no refetch on window focus and a 2-minute stale time, which individual queries can override:
// src/router.tsx
export function createRouter() {
const queryClient = new QueryClient({
defaultOptions: {
queries: {
refetchOnWindowFocus: false, // Disable global refetch on focus
staleTime: 1000 * 60 * 2, // 2 minutes default stale time
},
},
});
const router = createTanStackRouter({
routeTree,
context: { queryClient, user: null },
defaultPreload: "intent",
defaultPreloadStaleTime: 0,
defaultErrorComponent: DefaultCatchBoundary,
defaultNotFoundComponent: DefaultNotFound,
scrollRestoration: true,
defaultStructuralSharing: true,
});
setupRouterSsrQueryIntegration({ router, queryClient });
return router;
}
3.2. Query-Specific Caching Strategies
Different types of data use different caching strategies:
// CRITICAL DATA: Short stale time, always refetch
const { data: tasks } = useSuspenseQuery({
queryKey: ["tasks"] as const,
queryFn: () => orpc.task.getAll.call(),
staleTime: 1000, // 1 second - very fresh
refetchOnWindowFocus: true,
refetchOnMount: true,
});
// USER SESSION: Medium stale time, refetch on focus
const { data: session } = useSuspenseQuery({
queryKey: ["session"] as const,
queryFn: () => orpc.auth.getSession.call(),
staleTime: 1000 * 60 * 5, // 5 minutes
refetchOnWindowFocus: true,
});
// ANALYTICS: 2-minute stale time, no refetch on focus
const { data: analytics } = useQuery({
queryKey: ["analytics"],
queryFn: () => orpc.task.getAnalytics.call(),
staleTime: 1000 * 60 * 2, // 2 minutes
refetchOnWindowFocus: false,
enabled: !!session,
});
// BLOG CONTENT: 30-second stale time, refetch on focus and mount
const { data: latestBlogPost } = useSuspenseQuery({
queryKey: ["latestBlogPost"],
queryFn: async () => {
const { allBlogs } = await import("content-collections");
// Copy before sorting so the imported array is not mutated
const sortedBlogs = [...allBlogs].sort((a, b) => {
const dateA = new Date(a.date || "").getTime();
const dateB = new Date(b.date || "").getTime();
return dateB - dateA || (a.order || 999) - (b.order || 999);
});
return sortedBlogs.length > 0 ? sortedBlogs[0] : null;
},
staleTime: 30000, // 30 seconds for content
refetchOnWindowFocus: true,
refetchOnMount: true,
});
// REAL-TIME DATA: No caching, streaming
const { data: eventGenerator } = useQuery({
queryKey: ["realtime-tasks"],
queryFn: () => client.realtimeTasks(),
refetchInterval: false,
staleTime: Infinity, // Never stale
gcTime: 0, // Don't cache
});
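To see what these `staleTime` values mean in practice, the sketch below applies a simplified freshness rule: cached data counts as stale once at least `staleTime` milliseconds have passed since it was last updated. The stale times are the ones used in the snippets above; `isStale` and `staleKeys` are hypothetical helpers, and React Query's own logic covers more cases (for example queries that were invalidated manually).

```typescript
// Simplified freshness rule: stale once `staleTime` ms have elapsed.
function isStale(updatedAt: number, staleTime: number, now: number): boolean {
  return now - updatedAt >= staleTime;
}

// Stale times taken from the queries in this section (milliseconds).
const staleTimes: Record<string, number> = {
  tasks: 1000,
  session: 1000 * 60 * 5,
  analytics: 1000 * 60 * 2,
  latestBlogPost: 30000,
  "realtime-tasks": Infinity,
};

// Lists the query keys whose data would be stale `ageMs` after being fetched.
function staleKeys(ageMs: number): string[] {
  return Object.keys(staleTimes).filter((key) => isStale(0, staleTimes[key], ageMs));
}
```

One minute after everything was fetched, `staleKeys(60000)` would list only `tasks` and `latestBlogPost`; the session and analytics data would still be served from the cache without a refetch, and the real-time entry never becomes stale.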
3.3. Cache Invalidation Patterns
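TS Starter invalidates the cache at three points: after a mutation succeeds, when a real-time event arrives, and selectively by writing the server response into the cache and invalidating only the dependent queries: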
// MUTATION-BASED INVALIDATION
const createTaskMutation = useMutation({
mutationFn: (data) => orpc.task.create.call(data),
onSuccess: () => {
// Invalidate specific queries
queryClient.invalidateQueries({ queryKey: ["tasks"] });
queryClient.invalidateQueries({ queryKey: ["taskStats"] });
queryClient.invalidateQueries({ queryKey: ["analytics"] });
},
});
// REAL-TIME INVALIDATION
useTaskUpdates((event: RealtimeTaskEvent) => {
switch (event.type) {
case "task-created":
case "task-updated":
case "task-deleted":
case "task-status-changed":
// Invalidate all task-related queries
queryClient.invalidateQueries({ queryKey: ["tasks"] });
queryClient.invalidateQueries({ queryKey: ["taskStats"] });
queryClient.invalidateQueries({ queryKey: ["productivityInsights"] });
queryClient.invalidateQueries({ queryKey: ["mostRecentCompleted"] });
queryClient.invalidateQueries({ queryKey: ["mostRecentCreated"] });
break;
}
});
// SELECTIVE INVALIDATION
const updateTaskMutation = useMutation({
mutationFn: ({ id, data }) => orpc.task.update.call({ id, ...data }),
onSuccess: (updatedTask) => {
// Update specific task in cache
queryClient.setQueryData(["tasks"], (old: any) => ({
...old,
tasks: old?.tasks?.map((task: Task) =>
task.id === updatedTask.id ? updatedTask : task
) || []
}));
// Invalidate related queries
queryClient.invalidateQueries({ queryKey: ["taskStats"] });
},
});
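The real-time handler above needs one `invalidateQueries` call per flat key. By default, `invalidateQueries` matches every query whose key starts with the given key, so a hierarchical key layout could cover related queries with a single call. The sketch below illustrates that prefix rule with a hypothetical key layout (`["tasks", "list"]`, `["tasks", "stats"]`, and so on, which is not the layout TS Starter uses) and a simplified matcher that only handles strings and numbers.

```typescript
type QueryKey = readonly (string | number)[];

// True when `key` starts with every element of `prefix`.
function matchesPrefix(key: QueryKey, prefix: QueryKey): boolean {
  return prefix.length <= key.length && prefix.every((part, i) => key[i] === part);
}

// Hypothetical hierarchical key layout for the queries in this article.
const cachedKeys: QueryKey[] = [
  ["tasks", "list"],
  ["tasks", "stats"],
  ["tasks", "detail", "42"],
  ["analytics"],
  ["session"],
];

// Returns the cached keys that an invalidation with `prefix` would reach.
function keysToInvalidate(prefix: QueryKey): QueryKey[] {
  return cachedKeys.filter((key) => matchesPrefix(key, prefix));
}
```

Under this layout, invalidating `["tasks"]` would reach the list, the stats, and every task detail query, while `["analytics"]` and `["session"]` stay untouched.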
3.4. Performance Optimization Techniques
TS Starter implements several performance optimizations:
// 1. QUERY DEDUPLICATION
// Multiple components can use the same query key without duplicate requests
const { data: tasks } = useSuspenseQuery({
queryKey: ["tasks"] as const, // Same key across components
queryFn: () => orpc.task.getAll.call(),
});
// 2. CONTROLLED BACKGROUND REFETCHING
// Skip refetches on focus and mount; refetch only when the connection returns
const { data: analytics } = useQuery({
queryKey: ["analytics"],
queryFn: () => orpc.task.getAnalytics.call(),
staleTime: 1000 * 60 * 2, // 2 minutes
refetchOnWindowFocus: false,
refetchOnMount: false,
refetchOnReconnect: true, // Refetch when connection restored
});
// 3. CONDITIONAL QUERIES
const { data: userTasks } = useQuery({
queryKey: ["userTasks", userId],
queryFn: () => orpc.task.getUserTasks.call({ userId }),
enabled: !!userId && !!session, // Only fetch when conditions are met
staleTime: 1000 * 60 * 5,
});
// 4. PREFETCHING
const prefetchTask = (taskId: string) => {
queryClient.prefetchQuery({
queryKey: ["task", taskId],
queryFn: () => orpc.task.getById.call({ id: taskId }),
staleTime: 1000 * 60 * 5,
});
};
// 5. STRUCTURAL SHARING
// React Query keeps the references of unchanged parts of a result between
// refetches, so components that read them do not re-render needlessly
const { data: tasks } = useSuspenseQuery({
queryKey: ["tasks"] as const,
queryFn: () => orpc.task.getAll.call(),
structuralSharing: true, // Default behavior
});
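Query deduplication (item 1 above) is easier to picture with a small model: while a request for a key is in flight, later callers receive the same promise instead of starting a new request. The sketch below shows only that idea; `dedupe` and `inFlight` are hypothetical names, and React Query's implementation additionally handles caching, retries, and cancellation.

```typescript
// Requests currently in flight, by key.
const inFlight = new Map<string, Promise<unknown>>();

// Returns the in-flight promise for `key` if there is one; otherwise starts
// `fetcher` and remembers its promise until it settles.
function dedupe<T>(key: string, fetcher: () => Promise<T>): Promise<T> {
  const existing = inFlight.get(key);
  if (existing) return existing as Promise<T>;
  const request = fetcher().finally(() => {
    inFlight.delete(key); // allow a fresh request once this one has settled
  });
  inFlight.set(key, request);
  return request;
}
```

Two components that call `dedupe("tasks", fetchTasks)` during the same render pass would share one network request, which mirrors what happens when several components use the same query key.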
🎯 Part 2 Complete: Data Flow & State Management
Congratulations! You've now completed Part 2 of our architecture deep-dive series. In this part, we've covered:
What We've Learned
- ✅ React Query + oRPC Integration: Type-safe data fetching with powerful caching
- ✅ Real-Time Updates: Event Publisher architecture with Server-Sent Events
- ✅ Caching Strategies: Optimized cache invalidation and performance patterns
- ✅ State Management: Optimistic updates and error handling
- ✅ Performance Optimization: Query deduplication, prefetching, and background refetching
🚀 What's Next in the Series?
Part 3: Authentication & Security
- Better Auth configuration details
- CSRF protection implementation
- User data isolation patterns
Part 4: External Service Integration
- Plunk email service architecture
- Polar payment webhook processing
- Third-party API patterns
This is Part 2 of the TS Starter Architecture Deep-Dive series. Each part builds on the previous one to give you a complete understanding of the system.