Architecture Deep Dive: Data Flow & State Management - Part 2

Comprehensive technical breakdown of TS Starter's React Query + oRPC integration, real-time updates, and caching strategies

Part 2: Data Flow & State Management

Welcome to Part 2 of our architecture deep-dive series! In this part, we'll explore how TS Starter manages data flow, state, and real-time updates using React Query + oRPC integration patterns.

🎯 What You'll Learn

  • React Query + oRPC integration patterns
  • Real-time updates with Event Publisher
  • Caching strategies and optimization
  • State management best practices

1. React Query + oRPC Integration Patterns

1.1. The Perfect Type-Safe Duo

TS Starter combines React Query's caching with oRPC's type-safe RPC calls, so procedure inputs and outputs stay typed from the server router to the component that reads the cached data:

// src/lib/orpc.ts - The Integration Foundation
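import { createORPCClient } from "@orpc/client";
import { RPCLink } from "@orpc/client/fetch";
import { createORPCReactQueryUtils } from "@orpc/react-query";
import { createRouterClient, type RouterClient } from "@orpc/server";
import { createIsomorphicFn } from "@tanstack/react-start";
import { getWebRequest } from "@tanstack/react-start/server";
// serverRouter and createServerOrpcContext are imported from the app's server modules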

const getORPCClient = createIsomorphicFn()
  .server(() =>
    createRouterClient(serverRouter, {
      context: async () => {
        const req = getWebRequest();
        const ctx = await createServerOrpcContext({
          headers: req.headers,
          req,
        });
        return ctx;
      },
    }),
  )
  .client((): RouterClient<typeof serverRouter> => {
    const link = new RPCLink({
      url: new URL("/api/rpc", import.meta.env.VITE_BASE_URL || window.location.origin),
      fetch: (input, init) =>
        fetch(input, {
          ...init,
          credentials: "include", // Include auth cookies
        }),
    });

    return createORPCClient(link);
  });

export const client: RouterClient<typeof serverRouter> = getORPCClient();
export const orpc = createORPCReactQueryUtils(client); // ← The magic integration
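
The client branch above builds its endpoint with `new URL("/api/rpc", base)`. The following minimal sketch shows how that resolution behaves. The helper name `resolveRpcUrl` and the example hosts are hypothetical and are not part of TS Starter.

```typescript
// Hypothetical helper that mirrors the `new URL("/api/rpc", base)` call above.
function resolveRpcUrl(configuredBase: string | undefined, origin: string): string {
  // Fall back to the page origin when no base URL is configured (or it is empty)
  const base = configuredBase || origin;
  // The leading "/" makes the path absolute, so any path on `base` is discarded
  return new URL("/api/rpc", base).href;
}

const fromEnv = resolveRpcUrl("https://app.example.com/dashboard", "http://localhost:3000");
const fromOrigin = resolveRpcUrl(undefined, "http://localhost:3000");
const fromEmpty = resolveRpcUrl("", "http://localhost:3000");
```

Because the path starts with a slash, a base URL that includes its own path (such as `/dashboard`) still resolves to `/api/rpc` at the root of that host.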

1.2. Query Patterns: Suspense vs Regular Queries

TS Starter uses different query patterns based on the use case:

Suspense Queries for Critical Data

// src/routes/_authed/task.tsx
export function TaskComponent() {
  // CRITICAL DATA: Use Suspense for immediate loading
  const { data: tasks } = useSuspenseQuery({
    queryKey: ["tasks"] as const,
    queryFn: () => orpc.task.getAll.call(),
    staleTime: 1000, // Fresh for 1 second
    refetchOnWindowFocus: true,
    refetchOnMount: true,
  });

  // CRITICAL DATA: User session with Suspense
  const { data: session } = useSuspenseQuery({
    queryKey: ["session"] as const,
    queryFn: () => orpc.auth.getSession.call(),
    staleTime: 1000 * 60 * 5, // 5 minutes
  });

  return (
    <div>
      {tasks?.tasks?.map(task => (
        <TaskItem key={task.id} task={task} />
      ))}
    </div>
  );
}

Regular Queries for Optional Data

// src/routes/_authed/dashboard.tsx
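export function DashboardPage() {
  // Session is required below to gate the stats query
  const { data: session } = useSuspenseQuery({
    queryKey: ["session"] as const,
    queryFn: () => orpc.auth.getSession.call(),
    staleTime: 1000 * 60 * 5, // 5 minutes
  });

  // OPTIONAL DATA: Use regular queries for non-critical data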
  const { data: taskStats } = useQuery({
    queryKey: ["taskStats"],
    queryFn: () => orpc.task.getStats.call(),
    staleTime: 30000, // 30 seconds
    refetchOnWindowFocus: true,
    enabled: !!session, // Only fetch when authenticated
  });

  // OPTIONAL DATA: Analytics with longer stale time
  const { data: analytics } = useQuery({
    queryKey: ["analytics"],
    queryFn: () => orpc.task.getAnalytics.call(),
    staleTime: 1000 * 60 * 2, // 2 minutes
    refetchOnWindowFocus: false, // Don't refetch on focus
  });

  return (
    <div>
      {taskStats && <StatsWidget data={taskStats} />}
      {analytics && <AnalyticsWidget data={analytics} />}
    </div>
  );
}

1.3. Mutation Patterns with Optimistic Updates

Mutations in TS Starter update the cache optimistically: the UI changes before the server responds, and the previous cache state is restored if the request fails:

// src/routes/_authed/task.tsx
export function TaskComponent() {
  const queryClient = useQueryClient();

  // CREATE MUTATION: With optimistic updates
  const createTaskMutation = useMutation({
    mutationFn: (data: CreateTaskInput) => orpc.task.create.call(data),
    onMutate: async (newTask) => {
      // Cancel outgoing refetches
      await queryClient.cancelQueries({ queryKey: ["tasks"] });
      
      // Snapshot previous value
      const previousTasks = queryClient.getQueryData(["tasks"]);
      
      // Optimistically update
      queryClient.setQueryData(["tasks"], (old: any) => ({
        ...old,
        tasks: [...(old?.tasks || []), {
          ...newTask,
          id: `temp-${Date.now()}`, // Temporary ID
          status: "Not Started",
          createdAt: new Date(),
          updatedAt: new Date(),
        }]
      }));
      
      return { previousTasks };
    },
    onError: (err, newTask, context) => {
      // Rollback on error
      if (context?.previousTasks) {
        queryClient.setQueryData(["tasks"], context.previousTasks);
      }
      toast.error("Failed to create task. Please try again.");
    },
    onSuccess: () => {
      toast.success("Task created successfully!");
    },
    onSettled: () => {
      // Always refetch after error or success; this also replaces the
      // temporary task with the record returned by the server
      queryClient.invalidateQueries({ queryKey: ["tasks"] });
    },
  });

  // UPDATE MUTATION: With optimistic updates
  const updateTaskMutation = useMutation({
    mutationFn: ({ id, data }: { id: string; data: Partial<Task> }) => 
      orpc.task.update.call({ id, ...data }),
    onMutate: async ({ id, data }) => {
      await queryClient.cancelQueries({ queryKey: ["tasks"] });
      
      const previousTasks = queryClient.getQueryData(["tasks"]);
      
      // Optimistically update the specific task
      queryClient.setQueryData(["tasks"], (old: any) => ({
        ...old,
        tasks: old?.tasks?.map((task: Task) => 
          task.id === id ? { ...task, ...data, updatedAt: new Date() } : task
        ) || []
      }));
      
      return { previousTasks };
    },
    onError: (err, variables, context) => {
      if (context?.previousTasks) {
        queryClient.setQueryData(["tasks"], context.previousTasks);
      }
      toast.error("Failed to update task. Please try again.");
    },
    onSettled: () => {
      // Refetch after success or error so the cache matches the server
      queryClient.invalidateQueries({ queryKey: ["tasks"] });
    },
  });

  return (
    <div>
      <button 
        onClick={() => createTaskMutation.mutate({
          text: "New task",
          priority: "medium",
          hrs: 1,
          isPublic: false
        })}
        disabled={createTaskMutation.isPending}
      >
        {createTaskMutation.isPending ? "Creating..." : "Create Task"}
      </button>
    </div>
  );
}
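
The three steps of the optimistic pattern (snapshot and apply, roll back, confirm) can be isolated as pure functions. This is a dependency-free sketch, not TS Starter's code: the `Task` and `TaskList` shapes are reduced to a few fields, and the function names are hypothetical.

```typescript
interface Task {
  id: string;
  text: string;
  status: string;
}

interface TaskList {
  tasks: Task[];
}

// onMutate: append a placeholder and keep the snapshot for a possible rollback
function applyOptimisticCreate(
  current: TaskList | undefined,
  text: string,
  tempId: string,
): { next: TaskList; previous: TaskList | undefined } {
  const placeholder: Task = { id: tempId, text, status: "Not Started" };
  return {
    previous: current,
    next: { ...current, tasks: [...(current?.tasks ?? []), placeholder] },
  };
}

// onError: restore the snapshot taken before the optimistic write
function rollback(previous: TaskList | undefined): TaskList {
  return previous ?? { tasks: [] };
}

// onSuccess: swap the placeholder for the record returned by the server
function confirmCreate(current: TaskList, tempId: string, saved: Task): TaskList {
  return {
    ...current,
    tasks: current.tasks.map((task) => (task.id === tempId ? saved : task)),
  };
}

const initial: TaskList = { tasks: [{ id: "1", text: "Existing", status: "Completed" }] };
const { next, previous } = applyOptimisticCreate(initial, "New task", "temp-1");
const confirmed = confirmCreate(next, "temp-1", { id: "srv-1", text: "New task", status: "Not Started" });
const restored = rollback(previous);
```

Each function returns a new object instead of mutating its input, which is what lets the snapshot taken in `onMutate` remain valid for the rollback.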

2. Real-Time Updates with Event Publisher

2.1. Event Publisher Architecture

TS Starter uses oRPC's Event Publisher for real-time updates across the application:

// src/server/orpc.ts - Event Publisher Setup
import { EventPublisher, withEventMeta } from "@orpc/server";

// Define event types with TypeScript
export const taskPublisher = new EventPublisher<{
  "task-created": {
    taskId: string;
    userId: string;
    task: Task;
  };
  "task-updated": {
    taskId: string;
    userId: string;
    task: Task;
  };
  "task-deleted": {
    taskId: string;
    userId: string;
  };
  "task-status-changed": {
    taskId: string;
    userId: string;
    oldStatus: string;
    newStatus: string;
    task: Task;
  };
}>();

// Real-time task updates using Event Iterator
export const realtimeTasks = protectedProcedure.handler(async function* ({
  context,
  signal,
}) {
  if (!context.session?.user) {
    console.warn("Real-time tasks: No authenticated user");
    return;
  }

  try {
    // Send initial connection confirmation
    yield withEventMeta(
      {
        type: "connected",
        message: "Connected to real-time task updates",
        userId: context.session.user.id,
      },
      { id: "connection", retry: 5000 },
    );

    // Subscribe to task events for this user
    for await (const event of taskPublisher.subscribe("task-created", {
      signal,
    })) {
      // Only send events relevant to the current user
      if (
        event.userId === context.session.user.id ||
        event.task.isPublic ||
        (event.task.sharedWith &&
          JSON.parse(event.task.sharedWith).includes(
            context.session.user.email,
          ))
      ) {
        yield withEventMeta(
          {
            type: "task-created",
            task: event.task,
            timestamp: new Date().toISOString(),
          },
          { id: `task-created-${event.taskId}`, retry: 3000 },
        );
      }
    }

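    // NOTE: the loop above only ends when `signal` aborts, so as written the
    // subscription below is not reached while the connection is open. To stream
    // several event types at once, merge the subscriptions into a single iterator.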
    for await (const event of taskPublisher.subscribe("task-updated", {
      signal,
    })) {
      if (
        event.userId === context.session.user.id ||
        event.task.isPublic ||
        (event.task.sharedWith &&
          JSON.parse(event.task.sharedWith).includes(
            context.session.user.email,
          ))
      ) {
        yield withEventMeta(
          {
            type: "task-updated",
            task: event.task,
            timestamp: new Date().toISOString(),
          },
          { id: `task-updated-${event.taskId}`, retry: 3000 },
        );
      }
    }
  } catch (error) {
    console.error("Real-time connection error:", error);
    yield withEventMeta(
      {
        type: "error",
        message: "Connection lost",
        timestamp: new Date().toISOString(),
      },
      { id: "error", retry: 5000 },
    );
  }
});
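
Consecutive `for await` loops run one after another, so streaming several event types over one connection requires merging the subscriptions. The sketch below shows one way to merge async iterables. `mergeAsync`, `ticker`, and `collect` are hypothetical helpers, not part of oRPC, and `ticker` stands in for a `taskPublisher.subscribe(...)` call.

```typescript
// Merge several async iterables, yielding each value as soon as any source produces it.
async function* mergeAsync<T>(...sources: AsyncIterable<T>[]): AsyncGenerator<T> {
  const iterators = sources.map((source) => source[Symbol.asyncIterator]());
  // One pending next() per live iterator, tagged with the iterator's index
  const pending = new Map<number, Promise<{ index: number; result: IteratorResult<T> }>>();
  const advance = (index: number) => {
    pending.set(index, iterators[index].next().then((result) => ({ index, result })));
  };
  iterators.forEach((_, index) => advance(index));

  try {
    while (pending.size > 0) {
      const { index, result } = await Promise.race(Array.from(pending.values()));
      if (result.done) {
        pending.delete(index); // this source is exhausted
      } else {
        advance(index); // request the next value before handing this one out
        yield result.value;
      }
    }
  } finally {
    // Ask every source to close; not awaited, because a source may be idle
    for (const iterator of iterators) {
      iterator.return?.().catch(() => {});
    }
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms));

// Hypothetical stand-in for one event subscription
async function* ticker(label: string, delaysMs: number[]): AsyncGenerator<string> {
  let count = 0;
  for (const delay of delaysMs) {
    await sleep(delay);
    count += 1;
    yield `${label}${count}`;
  }
}

async function collect<T>(source: AsyncIterable<T>): Promise<T[]> {
  const values: T[] = [];
  for await (const value of source) values.push(value);
  return values;
}
```

In a handler, the merged iterator would be consumed by a single `for await` loop, with each yielded event carrying enough information (for example a `type` field) to tell the event kinds apart.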

2.2. Publishing Events from Business Logic

Events are published from oRPC procedures when data changes:

// src/server/routes/task.ts
export const taskRouter = {
  create: protectedProcedure
    .input(z.object({ 
      text: z.string(), 
      priority: z.enum(["high", "medium", "low"]).default("medium"),
      hrs: z.number().min(1).max(8).default(1),
      isPublic: z.boolean().default(false)
    }))
    .handler(async ({ input, context }) => {
      // Create task in database
      const task = await db.insert(tasks).values({
        id: crypto.randomUUID(),
        text: input.text,
        priority: input.priority,
        hrs: input.hrs,
        isPublic: input.isPublic,
        status: "Not Started",
        userId: context.session.user.id,
        createdAt: new Date(),
        updatedAt: new Date(),
      }).returning();

      // PUBLISH REAL-TIME EVENT
      taskPublisher.publish("task-created", {
        taskId: task[0].id,
        userId: context.session.user.id,
        task: task[0],
      });

      return task[0];
    }),

  update: protectedProcedure
    .input(z.object({
      id: z.string(),
      text: z.string().optional(),
      status: z.string().optional(),
      priority: z.enum(["high", "medium", "low"]).optional(),
    }))
    .handler(async ({ input, context }) => {
      const { id, ...updateData } = input;
      
      // Get current task for comparison
      const currentTask = await db.query.tasks.findFirst({
        where: eq(tasks.id, id),
      });

      if (!currentTask) {
        throw new Error("Task not found");
      }

      // Update task in database
      const updatedTask = await db.update(tasks)
        .set({
          ...updateData,
          updatedAt: new Date(),
        })
        .where(eq(tasks.id, id))
        .returning();

      // PUBLISH REAL-TIME EVENT
      taskPublisher.publish("task-updated", {
        taskId: id,
        userId: context.session.user.id,
        task: updatedTask[0],
      });

      // PUBLISH STATUS CHANGE EVENT if status changed
      if (updateData.status && updateData.status !== currentTask.status) {
        taskPublisher.publish("task-status-changed", {
          taskId: id,
          userId: context.session.user.id,
          oldStatus: currentTask.status,
          newStatus: updateData.status,
          task: updatedTask[0],
        });
      }

      return updatedTask[0];
    }),

  delete: protectedProcedure
    .input(z.object({ id: z.string() }))
    .handler(async ({ input, context }) => {
      const { id } = input;

      // Delete task from database
      await db.delete(tasks).where(eq(tasks.id, id));

      // PUBLISH REAL-TIME EVENT
      taskPublisher.publish("task-deleted", {
        taskId: id,
        userId: context.session.user.id,
      });

      return { success: true };
    }),
};
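
To make the publish/subscribe contract concrete, here is a dependency-free sketch of a typed publisher. It is not oRPC's `EventPublisher`: `TinyPublisher` is hypothetical, uses callbacks rather than async iterators, and only illustrates that a payload reaches the subscribers of that event name and nothing else.

```typescript
type Listener<T> = (payload: T) => void;

// Hypothetical callback-based stand-in; NOT oRPC's EventPublisher
class TinyPublisher<Events extends Record<string, unknown>> {
  private listeners = new Map<keyof Events, Set<Listener<any>>>();

  // Register a listener and return a function that removes it again
  subscribe<K extends keyof Events>(event: K, listener: Listener<Events[K]>): () => void {
    const set = this.listeners.get(event) ?? new Set<Listener<any>>();
    set.add(listener);
    this.listeners.set(event, set);
    return () => {
      set.delete(listener);
    };
  }

  // Deliver the payload to every listener registered for this event name
  publish<K extends keyof Events>(event: K, payload: Events[K]): void {
    this.listeners.get(event)?.forEach((listener) => listener(payload));
  }
}

type TaskEvents = {
  "task-created": { taskId: string; userId: string };
  "task-deleted": { taskId: string; userId: string };
};

const publisher = new TinyPublisher<TaskEvents>();
const seen: string[] = [];
const unsubscribe = publisher.subscribe("task-created", (event) => {
  seen.push(event.taskId);
});

publisher.publish("task-created", { taskId: "t1", userId: "u1" });
publisher.publish("task-deleted", { taskId: "t1", userId: "u1" }); // no listener for this event
unsubscribe();
publisher.publish("task-created", { taskId: "t2", userId: "u1" }); // ignored after unsubscribe
```

The event map type plays the same role as the generic argument passed to `EventPublisher` in section 2.1: a misspelled event name or a payload with the wrong shape fails at compile time.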

2.3. Client-Side Real-Time Hook

TS Starter provides a custom hook for handling real-time updates:

// src/lib/hooks/useRealtimeTasks.ts
export interface RealtimeTaskEvent {
  type: "connected" | "task-created" | "task-updated" | "task-deleted" | "task-status-changed" | "error";
  message?: string;
  task?: Task;
  userId?: string;
  oldStatus?: string;
  newStatus?: string;
  timestamp?: string;
}

export function useRealtimeTasks() {
  return useQuery({
    queryKey: ["realtime-tasks"],
    queryFn: () => client.realtimeTasks(),
    refetchInterval: false, // Disable polling since we're using SSE
    staleTime: Infinity, // Never mark the stream as stale, so it is not refetched
    gcTime: 0, // Drop the stream from the cache as soon as no component uses it
  });
}

// Hook to handle real-time task updates
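export function useTaskUpdates(onUpdate?: (event: RealtimeTaskEvent) => void) {
  const { data: eventGenerator } = useRealtimeTasks();

  // Keep the latest callback in a ref so that an inline callback does not
  // restart the stream reader on every render
  const onUpdateRef = React.useRef(onUpdate);
  React.useEffect(() => {
    onUpdateRef.current = onUpdate;
  }, [onUpdate]);

  // Process incoming events
  React.useEffect(() => {
    if (!eventGenerator) return;

    let cancelled = false;

    const processEvents = async () => {
      try {
        for await (const event of eventGenerator) {
          if (cancelled) break;
          onUpdateRef.current?.(event);
        }
      } catch (error) {
        if (!cancelled) console.error("Error reading real-time stream:", error);
      }
    };

    processEvents();

    return () => {
      // Stop dispatching after unmount; the loop exits when the next event arrives
      cancelled = true;
    };
  }, [eventGenerator]);

  return eventGenerator;
}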

2.4. Real-Time Updates in Components

Components use the real-time hook to stay synchronized:

// src/routes/_authed/task.tsx
export function TaskComponent() {
  const queryClient = useQueryClient();

  // Real-time task updates
  useTaskUpdates((event: RealtimeTaskEvent) => {
    console.log("Real-time task event:", event);

    switch (event.type) {
      case "connected":
        console.log("Connected to real-time task updates");
        break;

      case "task-created":
        toast.success(`New task created: "${event.task?.text}"! 🎉`);
        queryClient.invalidateQueries({ queryKey: ["tasks"] });
        break;

      case "task-updated":
        toast.info(`Task updated: "${event.task?.text}"`);
        queryClient.invalidateQueries({ queryKey: ["tasks"] });
        break;

      case "task-deleted":
        toast.info(`Task deleted`);
        queryClient.invalidateQueries({ queryKey: ["tasks"] });
        break;

      case "task-status-changed": {
        const statusEmoji =
          event.newStatus === "Completed"
            ? "✅"
            : event.newStatus === "In Progress"
              ? "🚀"
              : "📋";
        toast.success(
          `Task status changed to ${event.newStatus} ${statusEmoji}`,
        );
        queryClient.invalidateQueries({ queryKey: ["tasks"] });
        break;
      }

      case "error":
        console.error("Real-time connection error:", event.message);
        toast.error("Real-time connection lost. Trying to reconnect...");
        break;
    }
  });

  return (
    <div>
      {/* Task list automatically updates via real-time events */}
      <TaskList />
    </div>
  );
}

3. Caching Strategies and Optimization

3.1. Global Query Client Configuration

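TS Starter sets conservative global defaults for React Query (no refetch on window focus, a 2-minute stale time), which individual queries override where needed: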

// src/router.tsx
export function createRouter() {
  const queryClient = new QueryClient({
    defaultOptions: {
      queries: {
        refetchOnWindowFocus: false, // Disable global refetch on focus
        staleTime: 1000 * 60 * 2, // 2 minutes default stale time
      },
    },
  });

  const router = createTanStackRouter({
    routeTree,
    context: { queryClient, user: null },
    defaultPreload: "intent",
    defaultPreloadStaleTime: 0,
    defaultErrorComponent: DefaultCatchBoundary,
    defaultNotFoundComponent: DefaultNotFound,
    scrollRestoration: true,
    defaultStructuralSharing: true,
  });

  setupRouterSsrQueryIntegration({ router, queryClient });

  return router;
}

3.2. Query-Specific Caching Strategies

Different types of data use different caching strategies:

// CRITICAL DATA: Short stale time, always refetch
const { data: tasks } = useSuspenseQuery({
  queryKey: ["tasks"] as const,
  queryFn: () => orpc.task.getAll.call(),
  staleTime: 1000, // 1 second - very fresh
  refetchOnWindowFocus: true,
  refetchOnMount: true,
});

// USER SESSION: 5-minute stale time, refetch on focus
const { data: session } = useSuspenseQuery({
  queryKey: ["session"] as const,
  queryFn: () => orpc.auth.getSession.call(),
  staleTime: 1000 * 60 * 5, // 5 minutes
  refetchOnWindowFocus: true,
});

// ANALYTICS: 2-minute stale time, no refetch on focus
const { data: analytics } = useQuery({
  queryKey: ["analytics"],
  queryFn: () => orpc.task.getAnalytics.call(),
  staleTime: 1000 * 60 * 2, // 2 minutes
  refetchOnWindowFocus: false,
  enabled: !!session,
});

// STATIC CONTENT: 30-second stale time, refetch on focus and mount
const { data: latestBlogPost } = useSuspenseQuery({
  queryKey: ["latestBlogPost"],
  queryFn: async () => {
    const { allBlogs } = await import("content-collections");
    // Copy before sorting: Array.prototype.sort mutates the array in place
    const sortedBlogs = [...allBlogs].sort((a, b) => {
      const dateA = new Date(a.date || "").getTime();
      const dateB = new Date(b.date || "").getTime();
      return dateB - dateA || (a.order || 999) - (b.order || 999);
    });
    return sortedBlogs.length > 0 ? sortedBlogs[0] : null;
  },
  staleTime: 30000, // 30 seconds for content
  refetchOnWindowFocus: true,
  refetchOnMount: true,
});

// REAL-TIME DATA: No caching, streaming
const { data: eventGenerator } = useQuery({
  queryKey: ["realtime-tasks"],
  queryFn: () => client.realtimeTasks(),
  refetchInterval: false,
  staleTime: Infinity, // Never stale
  gcTime: 0, // Don't cache
});
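
The `staleTime` values above decide when cached data stops counting as fresh. Stale data is still served from the cache, but it becomes eligible for a refetch on mount, window focus, or reconnect. The check can be modeled as follows. This is a simplified sketch, not React Query's source, and `isStale` is a hypothetical name.

```typescript
// Simplified model: data is stale once `staleTimeMs` has elapsed since it was fetched
function isStale(dataUpdatedAt: number, staleTimeMs: number, now: number): boolean {
  return now >= dataUpdatedAt + staleTimeMs;
}

const fetchedAt = 1000000; // arbitrary timestamp in milliseconds
const threeSecondsLater = fetchedAt + 3000;

// The same three seconds evaluated against three of the settings used above
const tasksStale = isStale(fetchedAt, 1000, threeSecondsLater); // staleTime: 1000
const sessionStale = isStale(fetchedAt, 1000 * 60 * 5, threeSecondsLater); // 5 minutes
const streamStale = isStale(fetchedAt, Infinity, threeSecondsLater); // staleTime: Infinity
```

Three seconds after a fetch, only the task list counts as stale under this model. The session stays fresh for five minutes, and a query with `staleTime: Infinity` never becomes stale through elapsed time.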

3.3. Cache Invalidation Patterns

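TS Starter invalidates cached data in three ways: after mutations, in response to real-time events, and selectively by writing the server's response straight into the cache: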

// MUTATION-BASED INVALIDATION
const createTaskMutation = useMutation({
  mutationFn: (data) => orpc.task.create.call(data),
  onSuccess: () => {
    // Invalidate specific queries
    queryClient.invalidateQueries({ queryKey: ["tasks"] });
    queryClient.invalidateQueries({ queryKey: ["taskStats"] });
    queryClient.invalidateQueries({ queryKey: ["analytics"] });
  },
});

// REAL-TIME INVALIDATION
useTaskUpdates((event: RealtimeTaskEvent) => {
  switch (event.type) {
    case "task-created":
    case "task-updated":
    case "task-deleted":
    case "task-status-changed":
      // Invalidate all task-related queries
      queryClient.invalidateQueries({ queryKey: ["tasks"] });
      queryClient.invalidateQueries({ queryKey: ["taskStats"] });
      queryClient.invalidateQueries({ queryKey: ["productivityInsights"] });
      queryClient.invalidateQueries({ queryKey: ["mostRecentCompleted"] });
      queryClient.invalidateQueries({ queryKey: ["mostRecentCreated"] });
      break;
  }
});

// SELECTIVE INVALIDATION
const updateTaskMutation = useMutation({
  mutationFn: ({ id, data }) => orpc.task.update.call({ id, ...data }),
  onSuccess: (updatedTask) => {
    // Update specific task in cache
    queryClient.setQueryData(["tasks"], (old: any) => ({
      ...old,
      tasks: old?.tasks?.map((task: Task) => 
        task.id === updatedTask.id ? updatedTask : task
      ) || []
    }));
    
    // Invalidate related queries
    queryClient.invalidateQueries({ queryKey: ["taskStats"] });
  },
});
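
`invalidateQueries` matches query keys by prefix, which is why the flat keys above (`["tasks"]`, `["taskStats"]`, and so on) each need their own call. The sketch below models that matching for primitive key parts only. `matchesPrefix` is a hypothetical helper, and the nested key `["tasks", "stats"]` is an assumption for illustration, not a key TS Starter uses.

```typescript
type QueryKey = readonly unknown[];

// Simplified prefix match: compares primitive key parts with ===
function matchesPrefix(queryKey: QueryKey, filterKey: QueryKey): boolean {
  if (filterKey.length > queryKey.length) return false;
  return filterKey.every((part, index) => part === queryKey[index]);
}

const cachedKeys: QueryKey[] = [
  ["tasks"],
  ["tasks", "stats"], // hypothetical nested key
  ["task", "42"],
  ["analytics"],
];

// Which cached queries would an invalidation of ["tasks"] reach?
const invalidated = cachedKeys.filter((key) => matchesPrefix(key, ["tasks"]));
```

Under this model, nesting related keys beneath a shared first element would let a single invalidation cover all of them, at the cost of refetching more than a targeted call would.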

3.4. Performance Optimization Techniques

TS Starter implements several performance optimizations:

// 1. QUERY DEDUPLICATION
// Multiple components can use the same query key without duplicate requests
const { data: tasks } = useSuspenseQuery({
  queryKey: ["tasks"] as const, // Same key across components
  queryFn: () => orpc.task.getAll.call(),
});

// 2. BACKGROUND REFETCHING
const { data: analytics } = useQuery({
  queryKey: ["analytics"],
  queryFn: () => orpc.task.getAnalytics.call(),
  staleTime: 1000 * 60 * 2, // 2 minutes
  refetchOnWindowFocus: false,
  refetchOnMount: false,
  refetchOnReconnect: true, // Refetch when connection restored
});

// 3. CONDITIONAL QUERIES
const { data: userTasks } = useQuery({
  queryKey: ["userTasks", userId],
  queryFn: () => orpc.task.getUserTasks.call({ userId }),
  enabled: !!userId && !!session, // Only fetch when conditions are met
  staleTime: 1000 * 60 * 5,
});

// 4. PREFETCHING
const prefetchTask = (taskId: string) => {
  queryClient.prefetchQuery({
    queryKey: ["task", taskId],
    queryFn: () => orpc.task.getById.call({ id: taskId }),
    staleTime: 1000 * 60 * 5,
  });
};

// 5. STRUCTURAL SHARING
// React Query keeps references to the unchanged parts of a result across
// refetches, so components re-render only when the data actually changes
const { data: tasks } = useSuspenseQuery({
  queryKey: ["tasks"] as const,
  queryFn: () => orpc.task.getAll.call(),
  structuralSharing: true, // Default behavior
});
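
Query deduplication (item 1 above) comes down to sharing one in-flight promise per key. The following is a minimal sketch of that idea, not React Query's implementation. `dedupe`, `inFlight`, and `fetchTasks` are hypothetical names.

```typescript
// In-flight requests by key; an entry is removed once its request settles
const inFlight = new Map<string, Promise<unknown>>();

function dedupe<T>(key: string, fetcher: () => Promise<T>): Promise<T> {
  const existing = inFlight.get(key);
  if (existing) return existing as Promise<T>; // reuse the request already running

  const request = fetcher().finally(() => {
    inFlight.delete(key);
  });
  inFlight.set(key, request);
  return request;
}

let calls = 0;
const fetchTasks = (): Promise<string[]> => {
  calls += 1;
  return Promise.resolve(["task-1", "task-2"]);
};

// Two callers asking for the same key while the first request is still pending
const first = dedupe("tasks", fetchTasks);
const second = dedupe("tasks", fetchTasks);
```

Both callers receive the same promise and `fetchTasks` runs once. A request made after the first has settled starts a new fetch, because the entry is removed in `finally`.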

🎯 Part 2 Complete: Data Flow & State Management

Congratulations! You've now completed Part 2 of our architecture deep-dive series. In this part, we've covered:

What We've Learned

  • React Query + oRPC Integration: Type-safe data fetching with powerful caching
  • Real-Time Updates: Event Publisher architecture with Server-Sent Events
  • Caching Strategies: Optimized cache invalidation and performance patterns
  • State Management: Optimistic updates and error handling
  • Performance Optimization: Query deduplication, prefetching, and background refetching

🚀 What's Next in the Series?

Part 3: Authentication & Security

  • Better Auth configuration details
  • CSRF protection implementation
  • User data isolation patterns

Part 4: External Service Integration

  • Plunk email service architecture
  • Polar payment webhook processing
  • Third-party API patterns

This is Part 2 of the TS Starter Architecture Deep-Dive series. Each part builds on the previous one to give you a complete understanding of the system.