DEV Community

Jaeyoun Nam
Jaeyoun Nam

Posted on

Langgraph Human In The Loop with socket

langgraph 의 interruption 기능을 통해서 Agent의 수행 중간에 human이 개입할 수 있다는 것을 알았다.

하지만 예시들을 보면 전부 human interaction은 한 셈치고~ 넘어간다. 실제로 User에게 확인을 받으려면 어떻게 해야할까? 크게 세가지 방법이 있을 것 같다.

Langgraph API 서버 사용

langgraph cli 로 langgraph API 서버를 docker로 실행한 후 langgraph SDK로 그래프를 실행하고, 스테이트를 변경하고, 재게하고 할 수 있다.

langgraph에서 제공하는 것들을 제공하는 방법대로 사용해야한다. 뭔간 설정이 많아지고, 내 코드랑 융합하기 까다로울 수 있어보인다.

서버에서 그래프 관리

위의 Langgraph API 서버에서 필요한 부분만 내 커스텀 서버에 구현하는 방법이다. 예를 들어 그래프 실행하면 그래프를 실행한 클라이언트와 그래프 체크포인트를 저장해야하고, 유저의 확인 후에 다시 그래프를 불러와서 유저의 응답에 맞게 상태를 변경해서 재게해야한다.

짜야할게 은근 많을 수도 있다.

소켓 연결

Agent실행 시에 소켓을 연결하고 소켓을 통해서 유저와 인터렉션 하는 것이다. 기존 예시 코드에서 소켓연결과 소켓 통신으로 유저 확인 받는 단계만 추가하면 동작한다.

대신, 글자 타이핑하듯 쳐지는 streaming을 구현하기 까다로울 수도 있다.

소켓 연결로 구현

일단 최대한 복잡성을 늘리지 않는 방향에서 구현을 해보고 싶어서 소켓연결로 구현해보았다.

서버는 NestJs를 사용하고 클라이언트는 NextJs를 사용한다.

서버

일단 Websocket 연결을 위해 Gateway를 만든다. agent/start 시에 커넥션을 만들고 바로 agent를 시행하도록 했다.

@WebSocketGateway({
  namespace: "/",
  transport: ["websocket", "polling"],
  path: "/agent/start",
  cors: {
    origin: "*",
    methods: ["GET", "POST"],
    credentials: true,
  },
})
export class AgentGateway implements OnGatewayConnection, OnGatewayDisconnect {
  @WebSocketServer()
  server: Server;
  protected readonly logger = new Logger(this.constructor.name);

  constructor(
    private readonly agentFactory: AgentFactory
  ) {}

  private pendingConfirmations = new Map<string, (response: boolean) => void>();

  // Handle new connections
  handleConnection(client: Socket) {
    console.log(`Client connected: ${client.id}`);

    // Option 1: Get actionData from query parameters
    const actionData: { agent: AgentName } = client.handshake.query.actionData
      ? JSON.parse(client.handshake.query.actionData as string)
      : null;

    if (actionData) {
      this.startAgentProcess(client, actionData);
    } else {
      // If no actionData is provided, you can wait for an event
      client.emit("error", "No action data provided");
      client.disconnect();
    }
  }

  // Handle disconnections
  handleDisconnect(client: Socket) {
    console.log(`Client disconnected: ${client.id}`);
    this.pendingConfirmations.delete(client.id);
  }

  // Send confirmation request
  async sendConfirmationRequest(clientId: string, data: any): Promise<boolean> {
    return new Promise((resolve) => {
      this.pendingConfirmations.set(clientId, resolve);
      this.server.to(clientId).emit("confirmation_request", data);

      // Optional timeout for response
      setTimeout(() => {
        if (this.pendingConfirmations.has(clientId)) {
          this.pendingConfirmations.delete(clientId);
          resolve(false); // Default to 'false' if timeout occurs
        }
      }, 3000000); // 3000 seconds timeout
    });
  }

  // Handle client's confirmation response
  @SubscribeMessage("confirmation_response")
  handleClientResponse(
    @MessageBody() data: { confirmed: boolean },
    @ConnectedSocket() client: Socket
  ) {
    const resolve = this.pendingConfirmations.get(client.id);
    if (resolve) {
      resolve(data.confirmed);
      this.pendingConfirmations.delete(client.id);
    }
  }

  // Start the agent process
  private async startAgentProcess(
    client: Socket,
    actionData: { agent: AgentName }
  ) {
    const graph = await this.agentFactory.create({
      agentName: actionData.agent,
    });

    const initialInput = { input: "hello world" };

    // Thread
    const graphStateConfig = {
      configurable: { thread_id: "1" },
      streamMode: "values" as const,
    };

    // Run the graph until the first interruption
    for await (const event of await graph.stream(
      initialInput,
      graphStateConfig
    )) {
      this.logAndEmit(client, `--- ${event.input} ---`);
    }

    // Will log when the graph is interrupted, after step 2.
    this.logAndEmit(client, "---GRAPH INTERRUPTED---");

    const userConfirmed = await this.sendConfirmationRequest(client.id, {
      message: "Do you want to proceed with this action?",
      actionData,
    });

    if (userConfirmed) {
      // If approved, continue the graph execution. We must pass `null` as
      // the input here, or the graph will
      for await (const event of await graph.stream(null, graphStateConfig)) {
        this.logAndEmit(client, `--- ${event.input} ---`);
      }
      this.logAndEmit(client, "---ACTION EXECUTED---");
    } else {
      this.logAndEmit(client, "---ACTION CANCELLED---");
    }

    // Optionally disconnect the client
    client.disconnect();
  }

  private logAndEmit(client: Socket, message: string) {
    console.log(message);
    client.emit("message", { message });
  }
}
Enter fullscreen mode Exit fullscreen mode

핵심은 간단하다. Socket이 연결되면 바로 agent를 생성하여 수행하고, 수행해서 interrupt 당하면 Client에게 confirmation request message를 보내고 기다린다. confirmation이 resolve되면 이어서 graph를 진행한다.

위 코드에서 사용한 agent는 langgraph 문서에 있는 아래 스텝 1 2 3 을 순차적으로 사용하는 에이전트이다.

  const GraphState = Annotation.Root({
    input: Annotation<string>,
  });

  const step1 = (state: typeof GraphState.State) => {
    console.log("---Step 1---");
    return state;
  };

  const step2 = (state: typeof GraphState.State) => {
    console.log("---Step 2---");
    return state;
  };

  const step3 = (state: typeof GraphState.State) => {
    console.log("---Step 3---");
    return state;
  };

  const builder = new StateGraph(GraphState)
    .addNode("step1", step1)
    .addNode("step2", step2)
    .addNode("step3", step3)
    .addEdge(START, "step1")
    .addEdge("step1", "step2")
    .addEdge("step2", "step3")
    .addEdge("step3", END);

  // Set up memory
  const graphStateMemory = new MemorySaver();

  const graph = builder.compile({
    checkpointer: graphStateMemory,
    interruptBefore: ["step3"],
  });
  return graph;
Enter fullscreen mode Exit fullscreen mode

클라이언트

클라이언트에서는 훅을 만들어서 agent start와 그 상태를 관리한다.

import { useRef, useState } from "react";
import io, { Socket } from "socket.io-client";

export const useAgentSocket = () => {
  const socketRef = useRef<Socket | null>(null);
  const [confirmationRequest, setConfirmationRequest] = useState<any>(null);
  const [messages, setMessages] = useState<string[]>([]);

  const connectAndRun = (actionData: any) => {
    return new Promise((resolve, reject) => {
      socketRef.current = io("http://localhost:8000", {
        path: "/agent/start",
        transports: ["websocket", "polling"],
        query: {
          actionData: JSON.stringify(actionData),
        },
      });

      socketRef.current.on("connect", () => {
        console.log("Connected:", socketRef.current?.id);
        resolve(void 0);
      });

      socketRef.current.on("connect_error", (error) => {
        console.error("Connection error:", error);
        reject(error);
      });

      // Listen for confirmation requests
      socketRef.current.on("confirmation_request", (data) => {
        setConfirmationRequest(data);
      });

      // Listen for messages
      socketRef.current.on("message", (data) => {
        console.log("Received message:", data);
        setMessages((prevMessages) => [...prevMessages, data.message]);
      });

      socketRef.current.on("disconnect", () => {
        console.log("Disconnected from server");
      });
    });
  };

  const sendConfirmationResponse = (confirmed: boolean) => {
    if (socketRef.current) {
      socketRef.current.emit("confirmation_response", { confirmed });
      setConfirmationRequest(null);
    }
  };

  const disconnectSocket = () => {
    if (socketRef.current) {
      socketRef.current.disconnect();
    }
  };

  const clearMessages = () => {
    setMessages([]);
  };

  return {
    confirmationRequest,
    sendConfirmationResponse,
    connectAndRun,
    disconnectSocket,
    messages,
    clearMessages,
  };
};
Enter fullscreen mode Exit fullscreen mode

커넥션을 맺고, confirmation request가 오면 confirmationRequest 상태를 업데이트한다. UI component에서 confirmationRequest 상태를 보고 유저에게 창을 띄워주면 된다.

Top comments (0)