Part 2: Creating a Data Node
Quick Summary
In this part, you will:
- create your first DataNode and run it locally
- add a second DataNode that depends on the first one
- run launcher scripts from the terminal and inspect persisted tables from the CLI
- learn the difference between `update_hash` and `storage_hash`
DataNodes created in this part: DailyRandomNumber and DailyRandomAddition.
1. Create Your First DataNode
Key concepts: data DAGs, DataNode, dependencies, update_hash, and storage_hash.
Main Sequence encourages you to model workflows as data DAGs (directed acyclic graphs), composing your work into small steps called data nodes, each performing a single transformation.
In this chapter, you will start with one standalone node, run it locally, and then extend it with a dependent node.
Create a new file at src\data_nodes\example_nodes.py (Windows) or src/data_nodes/example_nodes.py (macOS/Linux), and define your first node, DailyRandomNumber, by subclassing DataNode.
```python
import os
from typing import Dict, Union

import numpy as np
import pandas as pd
from pydantic import BaseModel, Field

import mainsequence.client as msc
from mainsequence.tdag import (
    APIDataNode,
    DataNode,
    DataNodeConfiguration,
    DataNodeMetaData,
    RecordDefinition,
)

PROJECT_ID = os.getenv("MAIN_SEQUENCE_PROJECT_ID", "local").strip() or "local"


class VolatilityConfig(BaseModel):
    center: float = Field(
        ...,
        title="Standard Deviation",
        description="Standard deviation of the normal distribution (must be > 0).",
        examples=[0.1, 1.0, 2.5],
        gt=0,  # constraint: strictly positive
        le=1e6,  # example upper bound (optional)
        multiple_of=0.0001,  # example precision step (optional)
    )
    skew: bool


class RandomDataNodeConfig(DataNodeConfiguration):
    mean: float = Field(
        ...,
        title="Mean",
        description="Mean for the random normal distribution generator",
    )
    std: VolatilityConfig = Field(
        VolatilityConfig(center=1, skew=True),
        title="Vol Config",
        description="Vol Configuration",
        json_schema_extra={"update_only": True},
    )
    records: list[RecordDefinition] = Field(
        default_factory=lambda: [
            RecordDefinition(
                column_name="random_number",
                dtype="float64",
                label="Random Number",
                description="Daily random number generated by the tutorial node.",
            )
        ]
    )
    node_metadata: DataNodeMetaData = Field(
        default_factory=lambda: DataNodeMetaData(
            identifier=f"example_random_number_{PROJECT_ID}",
            description="Example Data Node",
        ),
        json_schema_extra={"runtime_only": True},
    )


class DailyRandomNumber(DataNode):
    """
    Example Data Node that generates one random number every day
    """

    def __init__(
        self,
        config: RandomDataNodeConfig,
        *,
        hash_namespace: str | None = None,
        test_node: bool = False,
    ):
        """
        :param config: Configuration containing mean, volatility, records, and metadata
        """
        self.mean = config.mean
        self.std = config.std
        super().__init__(
            config=config,
            hash_namespace=hash_namespace,
            test_node=test_node,
        )

    def update(self) -> pd.DataFrame:
        """Draw daily samples from N(mean, std) since last run (UTC days)."""
        today = pd.Timestamp.now("UTC").normalize()
        last = self.update_statistics.max_time_index_value
        if last is not None and last >= today:
            return pd.DataFrame()
        return pd.DataFrame(
            {"random_number": [np.random.normal(self.mean, self.std.center)]},
            index=pd.DatetimeIndex([today], name="time_index", tz="UTC"),
        )

    def dependencies(self) -> Dict[str, Union["DataNode", "APIDataNode"]]:
        """
        This node does not depend on any other data nodes.
        """
        return {}
```
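The `Field` constraints on `VolatilityConfig` are enforced by Pydantic at construction time, so an invalid configuration never reaches the node. A standalone sketch (a plain `BaseModel` stand-in, not the SDK class) shows `gt=0` rejecting a non-positive standard deviation:

```python
from pydantic import BaseModel, Field, ValidationError


class VolConfigSketch(BaseModel):
    """Stripped-down stand-in for VolatilityConfig (illustration only)."""

    center: float = Field(..., gt=0)  # must be strictly positive
    skew: bool


ok = VolConfigSketch(center=0.5, skew=False)  # passes validation

try:
    VolConfigSketch(center=-1.0, skew=False)  # violates gt=0
    raise AssertionError("expected a ValidationError")
except ValidationError:
    pass  # invalid configs are rejected before any node code runs
```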
For simple cases, table metadata can now live directly in DataNodeConfiguration
under node_metadata. When you do that, the base DataNode.get_table_metadata()
builds msc.TableMetaData for you, so you do not need to override
get_table_metadata() unless you need custom logic.
Important
TableMetaData.identifier must be unique across your organization. In tutorial code, generic names like example_random_number are very likely to collide because someone else in your organization has probably already run the same tutorial.
That is why this example includes MAIN_SEQUENCE_PROJECT_ID from the generated .env file. It gives each project a stable table name while still keeping the identifier readable.
identifier is published runtime metadata, not hash identity. That means you can
later repoint a published identifier to a different backing table during a migration
without rotating storage_hash or update_hash.
This is different from the unique_identifier field used later in MultiIndex asset tables. Here, you are naming the table itself, not an individual asset row.
If you want to inspect the organization-visible DataNode table identifiers before choosing one, run:
```shell
mainsequence data-node org-unique-identifiers
```
This command lists DataNode table identifiers, not asset unique_identifier values.
In Pydantic v2, mark updater-scope fields with json_schema_extra={"update_only": True} when they should affect update_hash but not storage_hash.
If a field should be kept only for UI or descriptive purposes and must affect
neither hash, mark it with json_schema_extra={"runtime_only": True}.
Typical examples are labels and descriptions attached to config-driven column
definitions. RecordDefinition is already provided by the SDK:
```python
from mainsequence.tdag import RecordDefinition

records: list[RecordDefinition] = Field(
    default_factory=lambda: [
        RecordDefinition(
            column_name="random_number",
            dtype="float64",
            label="Random Number",
            description="Daily random number generated by the tutorial node.",
        )
    ]
)
```
Use runtime_only only for descriptive metadata. If changing the field would
change output values, dependencies, or schema, it should not be runtime_only.
The other important runtime-only case is config-driven table metadata. DataNodeMetaData
is also provided by the SDK:
```python
from mainsequence.tdag import DataNodeMetaData

node_metadata: DataNodeMetaData = Field(
    default_factory=lambda: DataNodeMetaData(
        identifier=f"example_random_number_{PROJECT_ID}",
        description="Example Data Node",
    ),
    json_schema_extra={"runtime_only": True},
)
```
In DataNodeConfiguration, this lives under node_metadata. The whole block is
runtime-only so published metadata can evolve independently from the underlying
hashed table identity.
There is no separate portable_identifier flag. In the current SDK,
DataNodeMetaData.identifier is already treated as portable published metadata.
DataNode Recipe
Every DataNode follows the same basic recipe:
- Extend the base class `mainsequence.tdag.DataNode`
- Implement the constructor method `__init__()`
- Implement the `dependencies()` method
- Implement the `update()` method
The update() Method
The update() method has one hard requirement: it must return a pandas.DataFrame.
DataFrame structure requirements
- `update()` must always return a `pd.DataFrame()`
- the first index level must always be `datetime.datetime(timezone="UTC")`
- the first index level must always be named `time_index`, and it should represent the observation time of the data
- all column names must be lowercase and no more than 63 characters long
- column types should be `float`, `int`, or `str`; date values should live in the index or be converted to numeric timestamps
- if there is new data to return, the DataFrame must contain rows; if there is no new data, return an empty `pd.DataFrame()`
- a MultiIndex DataFrame is only allowed when the first index level is UTC datetimes and the second index level is a string named `unique_identifier`
- a single-index DataFrame must not contain duplicate index values; a MultiIndex DataFrame must not contain duplicate `(time_index, unique_identifier)` pairs
- if dates are stored in columns, they should be represented as timestamps
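Both accepted shapes can be sketched with plain pandas; the inline assertions mirror the rules above (the column names `random_number` and `close` are illustrative only):

```python
import pandas as pd

today = pd.Timestamp.now("UTC").normalize()

# Single-index frame: UTC datetimes named "time_index", lowercase column names.
single = pd.DataFrame(
    {"random_number": [0.42]},
    index=pd.DatetimeIndex([today], name="time_index", tz="UTC"),
)
assert single.index.name == "time_index"
assert not single.index.duplicated().any()  # no duplicate timestamps

# MultiIndex frame: (time_index, unique_identifier) pairs, no duplicates.
multi = pd.DataFrame(
    {"close": [1.0, 2.0]},
    index=pd.MultiIndex.from_tuples(
        [(today, "asset_a"), (today, "asset_b")],
        names=["time_index", "unique_identifier"],
    ),
)
assert list(multi.index.names) == ["time_index", "unique_identifier"]
assert not multi.index.duplicated().any()  # no duplicate (time, id) pairs
```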
Next, create scripts/random_number_launcher.py to run the node:

```python
from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def main():
    daily_node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
    daily_node.run()


if __name__ == "__main__":
    main()
```
Test the node in a namespace first
Before you start running a new DataNode against a shared backend, use a namespace first.
Why this matters:
- it isolates your first test runs from shared tables
- it gives you a safe way to validate schema and update behavior
- it keeps experimentation separate from production-like resources
Use hash_namespace(...) while you are developing or testing:
```python
from mainsequence.tdag.data_nodes import hash_namespace

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def main():
    with hash_namespace("tutorial_daily_random_number"):
        daily_node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
        daily_node.run(debug_mode=True, force_update=True)


if __name__ == "__main__":
    main()
```
This should be your default habit when you are validating a new node for the first time.
For real projects, also keep a small smoke test under tests/, for example tests/test_daily_random_number.py:
```python
from mainsequence.tdag.data_nodes import hash_namespace

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def test_daily_random_number_smoke():
    with hash_namespace("pytest_daily_random_number_smoke"):
        node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
        err, df = node.run(debug_mode=True, force_update=True)
        assert err is False
        assert df is not None
```
Once that namespaced run behaves as expected, you can run the same node without a namespace when you are ready to publish or share the real dataset.
Run the launcher directly from the terminal:

```shell
python scripts/random_number_launcher.py
```

If your shell uses `python3` instead of `python`, run:

```shell
python3 scripts/random_number_launcher.py
```
2. Verify From the CLI
Confirm that the launcher created update records:

```shell
mainsequence project data-node-updates list
```

Then locate the published table by its identifier:

```shell
mainsequence data-node list --filter identifier__contains=example_random_number_
```

If you want the full record for one row, inspect it directly:

```shell
mainsequence data-node detail <DATA_NODE_STORAGE_ID>
```

If your local project auth has expired or your .env does not yet contain fresh project JWTs, refresh them first:

```shell
mainsequence project refresh_token --path .
```
The CLI output lists the update ID, update hash, data node storage, and update details for the current project. Run it again after random_daily_addition_launcher.py or after the updated random_number_launcher.py to confirm that additional update processes were created.
3. Add a Dependent Data Node
Now extend the workflow with a node that depends on DailyRandomNumber. Add the following to src/data_nodes/example_nodes.py:

```python
class DailyRandomAdditionConfig(DataNodeConfiguration):
    mean: float
    std: float


class DailyRandomAddition(DataNode):
    def __init__(
        self,
        config: DailyRandomAdditionConfig,
        *,
        hash_namespace: str | None = None,
        test_node: bool = False,
    ):
        self.mean = config.mean
        self.std = config.std
        self.daily_random_number_data_node = DailyRandomNumber(
            config=RandomDataNodeConfig(mean=0.0),
            hash_namespace=hash_namespace,
            test_node=test_node,
        )
        super().__init__(
            config=config,
            hash_namespace=hash_namespace,
            test_node=test_node,
        )

    def dependencies(self):
        return {"number_generator": self.daily_random_number_data_node}

    def update(self) -> pd.DataFrame:
        """Draw daily samples from N(mean, std) since last run (UTC days)."""
        today = pd.Timestamp.now("UTC").normalize()
        last = self.update_statistics.max_time_index_value
        if last is not None and last >= today:
            return pd.DataFrame()
        random_number = np.random.normal(self.mean, self.std)
        dependency_noise = self.daily_random_number_data_node.get_df_between_dates(
            start_date=today, great_or_equal=True
        ).iloc[0]["random_number"]
        self.logger.info(
            f"random_number={random_number} dependency_noise={dependency_noise}"
        )
        return pd.DataFrame(
            {"random_number": [random_number + dependency_noise]},
            index=pd.DatetimeIndex([today], name="time_index", tz="UTC"),
        )
```
This adds a dependent node, DailyRandomAddition, that reads the output of DailyRandomNumber and uses it in its own update logic.
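Note that the `.iloc[0]` access in update() assumes the dependency frame already contains a row for today. If you want a defensive variant, a small helper in plain pandas can fall back to a default when the frame comes back empty (the name `first_value_or_default` is hypothetical, not an SDK function):

```python
import pandas as pd


def first_value_or_default(df: pd.DataFrame, column: str, default: float = 0.0) -> float:
    """Return the first value of `column`, or `default` when the frame is empty."""
    if df.empty or column not in df.columns:
        return default
    return float(df[column].iloc[0])


# Empty frame: falls back to the default instead of raising IndexError.
assert first_value_or_default(pd.DataFrame(), "random_number") == 0.0
# Populated frame: returns the first observation.
assert first_value_or_default(pd.DataFrame({"random_number": [0.7]}), "random_number") == 0.7
```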
Create a launcher at scripts/random_daily_addition_launcher.py:

```python
from src.data_nodes.example_nodes import DailyRandomAddition, DailyRandomAdditionConfig

daily_node = DailyRandomAddition(config=DailyRandomAdditionConfig(mean=0.0, std=1.0))
daily_node.run(debug_mode=True, force_update=True)
```
Run the new launcher from the terminal:

```shell
python scripts/random_daily_addition_launcher.py
```

If your shell uses `python3`, run:

```shell
python3 scripts/random_daily_addition_launcher.py
```
Because DailyRandomAdditionConfig does not set node_metadata in this example, the new table may not have a friendly identifier yet. Use mainsequence project data-node-updates list and mainsequence data-node list to locate the newest table created by this run. If you want a predictable human-readable table name, add node_metadata=DataNodeMetaData(...) to DailyRandomAdditionConfig.
The important thing to verify here is that the dependent node ran successfully and created a new update process in the current project.
4. update_hash vs. storage_hash
A DataNode does two critical things in Main Sequence:
- Controls the update process for your data (sequential or time-series based).
- Persists data in the Data Engine (think of it as a managed database—no need to handle schemas, sessions, etc.).
To support both, each DataNode uses two identifiers:
- `update_hash`: a unique hash derived from the combination of arguments that define an update process. In the random-number example, that might include `mean` and `std`.
- `storage_hash`: an identifier for where data is stored. It can ignore specific arguments so multiple update processes can write to the same table.
Why do this? Sometimes you want to store data from different processes in a single table. While the simple example here is contrived, this pattern becomes very useful with multi-index tables.
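One way to picture the split is to hash the config twice: once over every field (the update_hash analogue) and once with update-only fields excluded (the storage_hash analogue). This is an illustration only; the SDK computes the real hashes internally and its scheme differs from this sketch.

```python
import hashlib
import json


def config_hash(config: dict, exclude: frozenset = frozenset()) -> str:
    """Hash a config dict deterministically, skipping excluded fields."""
    payload = {k: v for k, v in config.items() if k not in exclude}
    return hashlib.sha256(json.dumps(payload, sort_keys=True).encode()).hexdigest()


UPDATE_ONLY = frozenset({"std"})  # fields marked update_only in the config

low = {"mean": 0.0, "std": 0.5}
high = {"mean": 0.0, "std": 2.0}

# Two distinct update processes...
assert config_hash(low) != config_hash(high)
# ...writing to one shared table, because storage ignores update-only fields.
assert config_hash(low, exclude=UPDATE_ONLY) == config_hash(high, exclude=UPDATE_ONLY)
```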
Now update your daily random number launcher to run two update processes with different volatility configurations but the same storage.
To do this, modify scripts/random_number_launcher.py as follows:

```python
from src.data_nodes.example_nodes import (
    DailyRandomNumber,
    RandomDataNodeConfig,
    VolatilityConfig,
)

low_vol = VolatilityConfig(center=0.5, skew=False)
high_vol = VolatilityConfig(center=2.0, skew=True)

daily_node_low = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0, std=low_vol))
daily_node_high = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0, std=high_vol))

daily_node_low.run(debug_mode=True, force_update=True)
daily_node_high.run(debug_mode=True, force_update=True)
```
Here we create two DailyRandomNumber nodes with different std (volatility) configurations but the same mean. Because the std field is marked update_only=True in RandomDataNodeConfig, it contributes to update_hash but not to storage_hash, so both nodes write to the same underlying table. The tutorial identifier also stays stable, since it is derived from the project id rather than from std.
Run the updated launcher from the terminal as before:

```shell
python scripts/random_number_launcher.py
```

If your shell uses `python3`, run:

```shell
python3 scripts/random_number_launcher.py
```
Then inspect the result from the CLI:

```shell
mainsequence project data-node-updates list
mainsequence data-node list --filter identifier__contains=example_random_number_
```
You should see that you still have one tutorial table identifier, but additional update processes were created for the different updater configurations.
Running `mainsequence project data-node-updates list` again at this point produces output similar to:

```
Project Data Node Updates
 ID    Update Hash                                         Data Node Storage  Update Details
 ───────────────────────────────────────────────────────────────────────────────────────────
 8005  dailyrandomnumber_009e3dfd8059e97933414c8e54b13af1  5016               -
 8004  dailyrandomnumber_f32b575aa53142a50fa10c2fbff4d658  5016               -
```
At this point, you have built your first DataNodes in Main Sequence. In the next part of the tutorial, you will move from local execution to shared access control and then to orchestration.
For further reference on DataNode concepts and best practices, see Data Nodes Knowledge Guide.