Create a Datasource Plugin
A datasource plugin handles the load_graph action. It reads data from an external source (a file, API, database, service…) and returns a Graph object containing Nodes and Edges with their attributes.
Step 1 — Scaffold with the CLI
Inside your Nessie project, scaffold a new plugin:
nessie new my-csv-datasourceSelect datasource as the plugin type. The resulting structure:
my_plugins/
└── my-csv-datasource/
├── my_csv_datasource/
│ ├── __init__.py
│ └── plugin.py
├── setup.py
└── README.md
Step 2 — Write the Handler
Here is a complete datasource plugin that reads a CSV file where each row is a node and a second column represents an edge target:
import csv
from nessie_api.models import (
Action, Graph, GraphType, Node, Edge, Attribute,
plugin, SetupRequirementType
)
from nessie_api.protocols import Context
def load_graph(action: Action, context: Context) -> Graph:
csv_path = action.payload["CSV File Path"]
id_col = action.payload.get("ID Column", "id")
edge_col = action.payload.get("Edge Target Column", "")
graph = Graph("csv_graph", GraphType.DIRECTED)
node_map: dict[str, Node] = {}
with open(csv_path, newline="", encoding="utf-8") as f:
reader = csv.DictReader(f)
rows = list(reader)
# ── Create nodes ──────────────────────────────────────
for row in rows:
node_id = row.get(id_col, str(hash(str(row))))
node = Node(node_id)
for col, val in row.items():
if val: # skip empty cells
node.add_attribute(Attribute(col, val))
graph.add_node(node)
node_map[node_id] = node
# ── Create edges ──────────────────────────────────────
if edge_col:
edge_counter = 0
for row in rows:
src_id = row.get(id_col)
tgt_id = row.get(edge_col)
src = node_map.get(src_id)
tgt = node_map.get(tgt_id)
if src and tgt and src is not tgt:
edge = Edge(f"e{edge_counter}", src, tgt)
graph.add_edge(edge)
edge_counter += 1
return graph
@plugin("CSV Datasource")
def csv_datasource_plugin():
return {
"handlers": {"load_graph": load_graph},
"requires": [],
"setup_requires": {
"CSV File Path": SetupRequirementType.STRING,
"ID Column": SetupRequirementType.STRING,
"Edge Target Column": SetupRequirementType.STRING,
},
}
Step 3 — Register in pyproject.toml
[project.entry-points."nessie_plugins"]
csv_datasource = "my_csv_datasource:csv_datasource_plugin"
Step 4 — Install
nessie install my-csv-datasourceRestart the server. Your plugin will appear in the datasource picker when you open a new workspace.
Graph Building Reference
Graph
from nessie_api.models import Graph, GraphType
graph = Graph(
name="my_graph",
graph_type=GraphType.DIRECTED # or UNDIRECTED
)
Nodes
from nessie_api.models import Node, Attribute
node = Node("unique-node-id")
# Add any key-value attributes
node.add_attribute(Attribute("name", "Alice"))
node.add_attribute(Attribute("age", 30))
node.add_attribute(Attribute("salary", 75000.0))
# Or use dict syntax at construction time
node = Node("alice", attributes={
"name": Attribute("name", "Alice"),
"role": Attribute("role", "Engineer"),
})
graph.add_node(node)
Edges
from nessie_api.models import Edge
src = graph.get_node("alice")
tgt = graph.get_node("bob")
edge = Edge("edge-1", source=src, target=tgt)
edge.add_attribute(Attribute("type", "reports_to"))
graph.add_edge(edge)
Node IDs must be unique within a graph
If you call graph.get_node("id") for a non-existent node it returns None. Always check before creating edges to avoid NoneType errors.
Reporting Progress via Console
For long-running datasource operations, write status messages to the UI console using the context:
from nessie_api.models import Action
def load_graph(action, context):
context.perform_action(Action("add_console_message", {
"message": {"message": "Connecting to database…", "type": "info"}
}))
# ... load data ...
context.perform_action(Action("add_console_message", {
"message": {"message": f"Loaded {len(nodes)} nodes.", "type": "ok"}
}))
return graph
Message Types
| Type | Color in UI | Use for |
|---|---|---|
"ok" | green | Success, completion |
"error" | red | Errors, failures |
"info" | gray | Informational progress |
"warn" | amber | Warnings, partial failures |
"input" | cyan | User input prompts (CLI) |
Async Datasources
If your datasource needs async I/O (like the npm plugin), wrap the async logic with asyncio.run():
import asyncio
async def _build_graph_async(url: str) -> Graph:
# async fetching ...
pass
def load_graph(action, context):
url = action.payload["URL"]
return asyncio.run(_build_graph_async(url))