درس تقني
متقدم
⏱ قراءة لمدة ٤٥ دقيقة
© بوابة الذكاء الاصطناعي ٢٠٢٦-٠٦-٠٣
متقدم
⏱ قراءة لمدة ٤٥ دقيقة
© بوابة الذكاء الاصطناعي ٢٠٢٦-٠٦-٠٣
قم ببناء سير عمل متعدد الوكلاء بجودة إنتاجية باستخدام LangGraph v1.2. استخدم التنسيق القائم على الحالة لإدارة حلقات التفكير الذاتي بأمان.
المتطلبات الأساسية
- Python 3.10+
- LangChain v1.3.4+ وLangGraph v1.2.4+
- OpenAI API Key (GPT-4o)
- فهم Pydantic وTypedDict لإدارة الحالة
التثبيت
pip install langchain==1.3.4 langgraph==1.2.4 langchain-openaiالخطوة ١: تعريف مخطط الحالة
في سير العمل الحديث للوكيل، يتم استبدال “الذاكرة” بمخطط حالة صريح. يتيح ذلك للرسوم البيانية تمرير البيانات بين العقد بأمان نوعي.
from typing import TypedDict, Annotated
import operator
from langchain_core.messages import BaseMessage
class AgentState(TypedDict):
# Annotate as 'list' to append new messages instead of overwriting
messages: Annotated[list[BaseMessage], operator.add]
task_goal: str
generated_topology: strالخطوة ٢: التنسيق باستخدام LangGraph
نستبدل فئة Agent القديمة بـ Nodes وEdges. يتيح ذلك “تصحيح السفر عبر الزمن” ونقاط التحقق البشرية في الحلقة.
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
llm = ChatOpenAI(model="gpt-4o", temperature=0.2)
# Define Node Logic
def understand_task(state: AgentState):
# ... logic to parse natural language ...
return {"task_goal": "Optimized Ethylene Cracking"}
def generate_topology(state: AgentState):
# ... logic to output process structure ...
return {"generated_topology": "C2H4 -> C2H2 + H2"}
# Build the Graph
workflow = StateGraph(AgentState)
workflow.add_node("understand", understand_task)
workflow.add_node("topology", generate_topology)
workflow.set_entry_point("understand")
workflow.add_edge("understand", "topology")
workflow.add_edge("topology", END)
app = workflow.compile()اختبار سير العمل
لتشغيل الوكيل بجودة إنتاجية، نقوم باستدعاء الرسم البياني بحالة أولية.
result = app.invoke({"messages": ["Design an ethylene cracking process"]})
print(result["generated_topology"])