A Step-by-Step Guide to Build an Automated Knowledge Graph Pipeline Using LangGraph and NetworkX

0


In this tutorial, we demonstrate how to construct an automated Knowledge Graph (KG) pipeline using LangGraph and NetworkX. The pipeline simulates a sequence of intelligent agents that collaboratively perform tasks such as data gathering, entity extraction, relation identification, entity resolution, and graph validation. Starting from a user-provided topic, such as “Artificial Intelligence,” the system methodically extracts relevant entities and relationships, resolves duplicates, and integrates the information into a cohesive graphical structure. By visualizing the final knowledge graph, developers and data scientists gain clear insights into complex interrelations among concepts, making this approach highly beneficial for applications in semantic analysis, natural language processing, and knowledge management.

!pip install langgraph langchain_core

We install two essential Python libraries: LangGraph, which is used for creating and orchestrating agent-based computational workflows, and LangChain Core, which provides foundational classes and utilities for building language model-powered applications. These libraries enable seamless integration of agents into intelligent data pipelines.

import re
import networkx as nx
import matplotlib.pyplot as plt
from typing import TypedDict, List, Tuple, Dict, Any
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.graph import StateGraph, END

We import essential libraries to build an automated knowledge graph pipeline. It includes re for regular expression-based text processing, NetworkX and matplotlib for creating and visualizing graphs, TypedDict and typing annotations for structured data handling, and LangGraph along with langchain_core for orchestrating the interaction between AI agents within the workflow.

class KGState(TypedDict):
topic: str
raw_text: str
entities: List[str]
relations: List[Tuple[str, str, str]]
resolved_relations: List[Tuple[str, str, str]]
graph: Any
validation: Dict[str, Any]
messages: List[Any]
current_agent: str

We define a structured data type, KGState, using Python’s TypedDict. It outlines the schema for managing state across different steps of the knowledge graph pipeline. It includes details like the chosen topic, gathered text, identified entities and relationships, resolved duplicates, the constructed graph object, validation results, interaction messages, and tracking the currently active agent.

def data_gatherer(state: KGState) -> KGState:
topic = state[“topic”]
print(f”📚 Data Gatherer: Searching for information about ‘{topic}'”)

collected_text = f”{topic} is an important concept. It relates to various entities like EntityA, EntityB, and EntityC. EntityA influences EntityB. EntityC is a type of EntityB.”

state[“messages”].append(AIMessage(content=f”Collected raw text about {topic}”))

state[“raw_text”] = collected_text
state[“current_agent”] = “entity_extractor”

return state

This function, data_gatherer, acts as the first step in the pipeline. It simulates gathering raw text data about a provided topic (stored in state[“topic”]). It then stores this simulated data into state[“raw_text”], adds a message indicating the data collection completion, and updates the pipeline’s state by setting the next agent (entity_extractor) as active.

def entity_extractor(state: KGState) -> KGState:
print(“🔍 Entity Extractor: Identifying entities in the text”)
text = state[“raw_text”]

entities = re.findall(r”Entity[A-Z]”, text)

entities = [state[“topic”]] + entities
state[“entities”] = list(set(entities))

state[“messages”].append(AIMessage(content=f”Extracted entities: {state[‘entities’]}”))
print(f” Found entities: {state[‘entities’]}”)

state[“current_agent”] = “relation_extractor”

return state

The entity_extractor function identifies entities from the collected raw text using a simple regular expression pattern that matches terms like “EntityA”, “EntityB”, etc. It also includes the main topic as an entity and ensures uniqueness by converting the list to a set. The extracted entities are stored in the state, an AI message logs the result, and the pipeline advances to the relation_extractor agent.

def relation_extractor(state: KGState) -> KGState:
print(“🔗 Relation Extractor: Identifying relationships between entities”)
text = state[“raw_text”]
entities = state[“entities”]
relations = []

relation_patterns = [
(r”([A-Za-z]+) relates to ([A-Za-z]+)”, “relates_to”),
(r”([A-Za-z]+) influences ([A-Za-z]+)”, “influences”),
(r”([A-Za-z]+) is a type of ([A-Za-z]+)”, “is_type_of”)
]

for e1 in entities:
for e2 in entities:
if e1 != e2:
for pattern, rel_type in relation_patterns:
if re.search(f”{e1}.*{rel_type}.*{e2}”, text.replace(“_”, ” “), re.IGNORECASE) or \
re.search(f”{e1}.*{e2}”, text, re.IGNORECASE):
relations.append((e1, rel_type, e2))

state[“relations”] = relations
state[“messages”].append(AIMessage(content=f”Extracted relations: {relations}”))
print(f” Found relations: {relations}”)

state[“current_agent”] = “entity_resolver”

return state

The relation_extractor function detects semantic relationships between entities within the raw text. It uses predefined regex patterns to identify phrases like “influences” or “is a type of” between entity pairs. When a match is found, it adds the corresponding relation as a triple (subject, predicate, object) to the relations list. These extracted relations are stored in the state, a message is logged for agent communication, and control moves to the next agent: entity_resolver.

def entity_resolver(state: KGState) -> KGState:
print(“🔄 Entity Resolver: Resolving duplicate entities”)

entity_map = {}
for entity in state[“entities”]:
canonical_name = entity.lower().replace(” “, “_”)
entity_map[entity] = canonical_name

resolved_relations = []
for s, p, o in state[“relations”]:
s_resolved = entity_map.get(s, s)
o_resolved = entity_map.get(o, o)
resolved_relations.append((s_resolved, p, o_resolved))

state[“resolved_relations”] = resolved_relations
state[“messages”].append(AIMessage(content=f”Resolved relations: {resolved_relations}”))

state[“current_agent”] = “graph_integrator”

return state

The entity_resolver function standardizes entity names to avoid duplication and inconsistencies. It creates a mapping (entity_map) by converting each entity to lowercase and replacing spaces with underscores. Then, this mapping is applied to all subjects and objects in the extracted relations to produce resolved relations. These normalized triples are added to the state, a confirmation message is logged, and control is passed to the graph_integrator agent.

def graph_integrator(state: KGState) -> KGState:
print(“📊 Graph Integrator: Building the knowledge graph”)
G = nx.DiGraph()

for s, p, o in state[“resolved_relations”]:
if not G.has_node(s):
G.add_node(s)
if not G.has_node(o):
G.add_node(o)
G.add_edge(s, o, relation=p)

state[“graph”] = G
state[“messages”].append(AIMessage(content=f”Built graph with {len(G.nodes)} nodes and {len(G.edges)} edges”))

state[“current_agent”] = “graph_validator”

return state

The graph_integrator function constructs the actual knowledge graph using networkx.DiGraph() supports directed relationships. It iterates over the resolved triples (subject, predicate, object), ensures both nodes exist, and then adds a directed edge with the relation as metadata. The resulting graph is saved in the state, a summary message is appended, and the pipeline transitions to the graph_validator agent for final validation.

def graph_validator(state: KGState) -> KGState:
print(“✅ Graph Validator: Validating knowledge graph”)
G = state[“graph”]

validation_report = {
“num_nodes”: len(G.nodes),
“num_edges”: len(G.edges),
“is_connected”: nx.is_weakly_connected(G) if G.nodes else False,
“has_cycles”: not nx.is_directed_acyclic_graph(G) if G.nodes else False
}

state[“validation”] = validation_report
state[“messages”].append(AIMessage(content=f”Validation report: {validation_report}”))
print(f” Validation report: {validation_report}”)

state[“current_agent”] = END

return state

The graph_validator function performs a basic health check on the constructed knowledge graph. It compiles a validation report containing the number of nodes and edges, whether the graph is weakly connected (i.e., every node is reachable if direction is ignored), and whether the graph contains cycles. This report is added to the state and logged as an AI message. Once validation is complete, the pipeline is marked as finished by setting the current_agent to END.

def router(state: KGState) -> str:
return state[“current_agent”]

def visualize_graph(graph):
plt.figure(figsize=(10, 6))
pos = nx.spring_layout(graph)

nx.draw(graph, pos, with_labels=True, node_color=”skyblue”, node_size=1500, font_size=10)

edge_labels = nx.get_edge_attributes(graph, ‘relation’)
nx.draw_networkx_edge_labels(graph, pos, edge_labels=edge_labels)

plt.title(“Knowledge Graph”)
plt.tight_layout()
plt.show()

The router function directs the pipeline to the next agent based on the current_agent field in the state. Meanwhile, the visualize_graph function uses matplotlib and networkx to display the final knowledge graph, showing nodes, edges, and labeled relationships for intuitive visual understanding.

def build_kg_graph():
workflow = StateGraph(KGState)

workflow.add_node(“data_gatherer”, data_gatherer)
workflow.add_node(“entity_extractor”, entity_extractor)
workflow.add_node(“relation_extractor”, relation_extractor)
workflow.add_node(“entity_resolver”, entity_resolver)
workflow.add_node(“graph_integrator”, graph_integrator)
workflow.add_node(“graph_validator”, graph_validator)

workflow.add_conditional_edges(“data_gatherer”, router,
{“entity_extractor”: “entity_extractor”})
workflow.add_conditional_edges(“entity_extractor”, router,
{“relation_extractor”: “relation_extractor”})
workflow.add_conditional_edges(“relation_extractor”, router,
{“entity_resolver”: “entity_resolver”})
workflow.add_conditional_edges(“entity_resolver”, router,
{“graph_integrator”: “graph_integrator”})
workflow.add_conditional_edges(“graph_integrator”, router,
{“graph_validator”: “graph_validator”})
workflow.add_conditional_edges(“graph_validator”, router,
{END: END})

workflow.set_entry_point(“data_gatherer”)

return workflow.compile()

The build_kg_graph function defines the complete knowledge graph workflow using LangGraph. It sequentially adds each agent as a node, from data collection to graph validation, and connects them through conditional transitions based on the current agent. The entry point is set to data_gatherer, and the graph is compiled into an executable workflow that guides the automated pipeline from start to finish.

def run_knowledge_graph_pipeline(topic):
print(f”🚀 Starting knowledge graph pipeline for: {topic}”)

initial_state = {
“topic”: topic,
“raw_text”: “”,
“entities”: [],
“relations”: [],
“resolved_relations”: [],
“graph”: None,
“validation”: {},
“messages”: [HumanMessage(content=f”Build a knowledge graph about {topic}”)],
“current_agent”: “data_gatherer”
}

kg_app = build_kg_graph()
final_state = kg_app.invoke(initial_state)

print(f”✨ Knowledge graph construction complete for: {topic}”)

return final_state

The run_knowledge_graph_pipeline function initializes the pipeline by setting up an empty state dictionary with the provided topic. It builds the workflow using build_kg_graph(), then runs it by invoking the compiled graph with the initial state. As each agent processes the data, the state evolves, and the final result contains the complete knowledge graph, validated and ready for use.

if __name__ == “__main__”:
topic = “Artificial Intelligence”
result = run_knowledge_graph_pipeline(topic)

visualize_graph(result[“graph”])

Finally, this block serves as the script’s entry point. When executed directly, it triggers the knowledge graph pipeline for the topic “Artificial Intelligence,” runs through all agent stages, and finally visualizes the resulting graph using the visualize_graph() function. It provides an end-to-end demonstration of automated knowledge graph generation.

Output Generated from Knowledge Graph Execution

In conclusion, we have learned how to seamlessly integrate multiple specialized agents into a cohesive knowledge graph pipeline through this structured approach, leveraging LangGraph and NetworkX. This workflow automates entity and relation extraction processes and visualizes intricate relationships, offering a clear and actionable representation of gathered information. By adjusting and enhancing individual agents, such as employing more sophisticated entity recognition methods or integrating real-time data sources, this foundational framework can be scaled and customized for advanced knowledge graph construction tasks across various domains.

Check out the Colab Notebook. All credit for this research goes to the researchers of this project. Also, feel free to follow us on Twitter and don’t forget to join our 90k+ ML SubReddit.

Asif Razzaq is the CEO of Marktechpost Media Inc.. As a visionary entrepreneur and engineer, Asif is committed to harnessing the potential of Artificial Intelligence for social good. His most recent endeavor is the launch of an Artificial Intelligence Media Platform, Marktechpost, which stands out for its in-depth coverage of machine learning and deep learning news that is both technically sound and easily understandable by a wide audience. The platform boasts of over 2 million monthly views, illustrating its popularity among audiences.



Source link

You might also like
Leave A Reply

Your email address will not be published.