Part 2: Creating a Data Node

Quick Summary

In this part, you will:

  • create your first DataNode and run it locally
  • add a second DataNode that depends on the first one
  • run launcher scripts from the terminal and inspect persisted tables from the CLI
  • learn the difference between update_hash and storage_hash

DataNodes created in this part: DailyRandomNumber and DailyRandomAddition.

1. Create Your First DataNode

Key concepts: data DAGs, DataNode, dependencies, update_hash, and storage_hash.

Main Sequence encourages you to model workflows as data DAGs (directed acyclic graphs), composing your work into small steps called data nodes, each performing a single transformation.

In this chapter, you will start with one standalone node, run it locally, and then extend it with a dependent node.
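Before touching the SDK, it can help to see the DAG idea in miniature. The sketch below is framework-free Python, not Main Sequence code (`run_in_order` and the node table are illustrative names): each node declares its dependencies, and a simple recursive walk runs parents before children, which is exactly what the framework does for you at a larger scale.

```python
# Minimal illustration of a data DAG: each "node" is a function plus the
# names of the nodes it depends on. Purely illustrative, not SDK code.
def run_in_order(nodes):
    """Run every node after its dependencies, collecting outputs by name."""
    results = {}

    def run(name):
        if name in results:
            return results[name]
        func, deps = nodes[name]
        # Resolve dependencies first; a DAG guarantees this recursion terminates.
        inputs = {d: run(d) for d in deps}
        results[name] = func(inputs)
        return results[name]

    for name in nodes:
        run(name)
    return results


nodes = {
    "random_number": (lambda inputs: 0.5, []),
    "random_addition": (lambda inputs: inputs["random_number"] + 1.0, ["random_number"]),
}
```

Calling `run_in_order(nodes)` runs `random_number` before `random_addition`, mirroring the two nodes you will build in this part.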

Create a new file at src\data_nodes\example_nodes.py (Windows) or src/data_nodes/example_nodes.py (macOS/Linux), and define your first node, DailyRandomNumber, by subclassing DataNode.

import os
from typing import Dict, Union

import pandas as pd

from mainsequence.tdag import (
    APIDataNode,
    DataNode,
    DataNodeConfiguration,
    DataNodeMetaData,
    RecordDefinition,
)
import mainsequence.client as msc
import numpy as np
from pydantic import BaseModel, Field

PROJECT_ID = os.getenv("MAIN_SEQUENCE_PROJECT_ID", "local").strip() or "local"


class VolatilityConfig(BaseModel):
    center: float = Field(
        ...,
        title="Standard Deviation",
        description="Standard deviation of the normal distribution (must be > 0).",
        examples=[0.1, 1.0, 2.5],
        gt=0,  # constraint: strictly positive
        le=1e6,  # example upper bound (optional)
        multiple_of=0.0001,  # example precision step (optional)
    )
    skew: bool


class RandomDataNodeConfig(DataNodeConfiguration):
    mean: float = Field(
        ...,
        title="Mean",
        description="Mean for the random normal distribution generator",
    )
    std: VolatilityConfig = Field(
        VolatilityConfig(center=1, skew=True),
        title="Vol Config",
        description="Vol Configuration",
        json_schema_extra={"update_only": True},
    )
    records: list[RecordDefinition] = Field(
        default_factory=lambda: [
            RecordDefinition(
                column_name="random_number",
                dtype="float64",
                label="Random Number",
                description="Daily random number generated by the tutorial node.",
            )
        ]
    )
    node_metadata: DataNodeMetaData = Field(
        default_factory=lambda: DataNodeMetaData(
            identifier=f"example_random_number_{PROJECT_ID}",
            description="Example Data Node",
        ),
        json_schema_extra={"runtime_only": True},
    )


class DailyRandomNumber(DataNode):
    """
    Example Data Node that generates one random number every day
    """

    def __init__(
        self,
        config: RandomDataNodeConfig,
        *,
        hash_namespace: str | None = None,
        test_node: bool = False,
    ):
        """
        :param config: Configuration containing mean, volatility, records, and metadata
        """
        self.mean = config.mean
        self.std = config.std
        super().__init__(
            config=config,
            hash_namespace=hash_namespace,
            test_node=test_node,
        )

    def update(self) -> pd.DataFrame:
        """Draw daily samples from N(mean, std) since last run (UTC days)."""
        today = pd.Timestamp.now("UTC").normalize()
        last = self.update_statistics.max_time_index_value
        if last is not None and last >= today:
            return pd.DataFrame()
        return pd.DataFrame(
            {"random_number": [np.random.normal(self.mean, self.std.center)]},
            index=pd.DatetimeIndex([today], name="time_index", tz="UTC"),
        )

    def dependencies(self) -> Dict[str, Union["DataNode", "APIDataNode"]]:
        """
        This node does not depend on any other data nodes.
        """
        return {}

For simple cases, table metadata can now live directly in DataNodeConfiguration under node_metadata. When you do that, the base DataNode.get_table_metadata() builds msc.TableMetaData for you, so you do not need to override get_table_metadata() unless you need custom logic.

Important

TableMetaData.identifier must be unique across your organization. In tutorial code, generic names like example_random_number are very likely to collide because someone else in your organization has probably already run the same tutorial.

That is why this example includes MAIN_SEQUENCE_PROJECT_ID from the generated .env file. It gives each project a stable table name while still keeping the identifier readable.

identifier is published runtime metadata, not hash identity. That means you can later repoint a published identifier to a different backing table during a migration without rotating storage_hash or update_hash.

This is different from the unique_identifier field used later in MultiIndex asset tables. Here, you are naming the table itself, not an individual asset row.

If you want to inspect the organization-visible DataNode table identifiers before choosing one, run:

mainsequence data-node org-unique-identifiers

This command lists DataNode table identifiers, not asset unique_identifier values.

In Pydantic v2, mark updater-scope fields with json_schema_extra={"update_only": True} when they should affect update_hash but not storage_hash.

If a field should be kept only for UI or descriptive purposes and must affect neither hash, mark it with json_schema_extra={"runtime_only": True}.

Typical examples are labels and descriptions attached to config-driven column definitions. RecordDefinition is already provided by the SDK:

from mainsequence.tdag import RecordDefinition


records: list[RecordDefinition] = Field(
    default_factory=lambda: [
        RecordDefinition(
            column_name="random_number",
            dtype="float64",
            label="Random Number",
            description="Daily random number generated by the tutorial node.",
        )
    ]
)

Use runtime_only only for descriptive metadata. If changing the field would change output values, dependencies, or schema, it should not be runtime_only.

The other important runtime-only case is config-driven table metadata. DataNodeMetaData is also provided by the SDK:

from mainsequence.tdag import DataNodeMetaData


node_metadata: DataNodeMetaData = Field(
    default_factory=lambda: DataNodeMetaData(
        identifier=f"example_random_number_{PROJECT_ID}",
        description="Example Data Node",
    ),
    json_schema_extra={"runtime_only": True},
)

In DataNodeConfiguration, this lives under node_metadata. The whole block is runtime-only so published metadata can evolve independently from the underlying hashed table identity.

There is no separate portable_identifier flag. In the current SDK, DataNodeMetaData.identifier is already treated as portable published metadata.

DataNode Recipe

Every DataNode follows the same basic recipe:

  1. Extend the base class mainsequence.tdag.DataNode
  2. Implement the constructor method __init__()
  3. Implement the dependencies() method
  4. Implement the update() method

The update() Method

The update() method has one hard requirement: it must return a pandas.DataFrame.

DataFrame structure requirements
  • update() must always return a pd.DataFrame
  • the first index level must always be named time_index, contain timezone-aware UTC datetimes, and represent the observation time of the data
  • all column names must be lowercase and no more than 63 characters long
  • column types should be float, int, or str; date values should live in the index or be stored in columns as numeric timestamps
  • if there is new data to return, the DataFrame must contain rows; if there is no new data, return an empty pd.DataFrame()
  • a MultiIndex DataFrame is only allowed when the first index level holds UTC datetimes and the second index level is a string named unique_identifier
  • a single-index DataFrame must not contain duplicate index values; a MultiIndex DataFrame must not contain duplicate (time_index, unique_identifier) pairs
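The checklist above can be condensed into a small validation helper. This is an illustrative sketch using only pandas (`validate_update_frame` is not an SDK function); it raises on the most common contract violations.

```python
import pandas as pd


def validate_update_frame(df: pd.DataFrame) -> None:
    """Raise ValueError if df violates the update() contract sketched above."""
    if df.empty:
        return  # an empty frame means "no new data" and is always valid
    idx = df.index
    time_level = idx.get_level_values(0)
    if time_level.name != "time_index":
        raise ValueError("first index level must be named 'time_index'")
    if not isinstance(time_level, pd.DatetimeIndex) or str(time_level.tz) != "UTC":
        raise ValueError("first index level must be UTC datetimes")
    if isinstance(idx, pd.MultiIndex):
        if idx.nlevels != 2 or idx.names[1] != "unique_identifier":
            raise ValueError("second index level must be named 'unique_identifier'")
    if idx.duplicated().any():
        raise ValueError("index values (or index pairs) must be unique")
    for col in df.columns:
        if col != col.lower() or len(col) > 63:
            raise ValueError(f"bad column name: {col!r}")
```

You can call this at the end of your own `update()` implementations while developing to catch schema mistakes before the frame reaches the Data Engine.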

Next, create scripts/random_number_launcher.py to run the node:

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def main():
    daily_node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
    daily_node.run()


if __name__ == "__main__":
    main()

Test the node in a namespace first

Before you start running a new DataNode against a shared backend, use a namespace first.

Why this matters:

  • it isolates your first test runs from shared tables
  • it gives you a safe way to validate schema and update behavior
  • it keeps experimentation separate from production-like resources

Use hash_namespace(...) while you are developing or testing:

from mainsequence.tdag.data_nodes import hash_namespace

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def main():
    with hash_namespace("tutorial_daily_random_number"):
        daily_node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
        daily_node.run(debug_mode=True, force_update=True)


if __name__ == "__main__":
    main()

This should be your default habit when you are validating a new node for the first time.

For real projects, also keep a small smoke test under tests/, for example tests/test_daily_random_number.py:

from mainsequence.tdag.data_nodes import hash_namespace

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig


def test_daily_random_number_smoke():
    with hash_namespace("pytest_daily_random_number_smoke"):
        node = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0))
        err, df = node.run(debug_mode=True, force_update=True)

    assert err is False
    assert df is not None

Once that namespaced run behaves as expected, you can run the same node without a namespace when you are ready to publish or share the real dataset.

Run the launcher directly from the terminal:

python scripts/random_number_launcher.py

If your shell uses python3 instead of python, run:

python3 scripts/random_number_launcher.py

Verify From the CLI

Confirm that the launcher created update records:

mainsequence project data-node-updates list

Then locate the published table by its identifier:

mainsequence data-node list --filter identifier__contains=example_random_number_

If you want the full record for one row, inspect it directly:

mainsequence data-node detail <DATA_NODE_STORAGE_ID>

If your local project auth has expired or your .env does not yet contain fresh project JWTs, refresh them first:

mainsequence project refresh_token --path .

The CLI output lists the update ID, update hash, data node storage, and update details for the current project. Run it again after random_daily_addition_launcher.py or after the updated random_number_launcher.py to confirm that additional update processes were created.

2. Add a Dependent Data Node

Now extend the workflow with a node that depends on DailyRandomNumber. Add the following to src/data_nodes/example_nodes.py:

class DailyRandomAdditionConfig(DataNodeConfiguration):
    mean: float
    std: float


class DailyRandomAddition(DataNode):
    def __init__(
        self,
        config: DailyRandomAdditionConfig,
        *,
        hash_namespace: str | None = None,
        test_node: bool = False,
    ):
        self.mean = config.mean
        self.std = config.std
        self.daily_random_number_data_node = DailyRandomNumber(
            config=RandomDataNodeConfig(mean=0.0),
            hash_namespace=hash_namespace,
            test_node=test_node,
        )
        super().__init__(
            config=config,
            hash_namespace=hash_namespace,
            test_node=test_node,
        )

    def dependencies(self):
        return {"number_generator": self.daily_random_number_data_node}

    def update(self) -> pd.DataFrame:
        """Draw daily samples from N(mean, std) since last run (UTC days)."""
        today = pd.Timestamp.now("UTC").normalize()
        last = self.update_statistics.max_time_index_value
        if last is not None and last >= today:
            return pd.DataFrame()
        random_number = np.random.normal(self.mean, self.std)
        dependency_noise = self.daily_random_number_data_node.get_df_between_dates(
            start_date=today, great_or_equal=True
        ).iloc[0]["random_number"]
        self.logger.info(f"random_number={random_number} dependency_noise={dependency_noise}")

        return pd.DataFrame(
            {"random_number": [random_number + dependency_noise]},
            index=pd.DatetimeIndex([today], name="time_index", tz="UTC"),
        )

This adds a dependent node, DailyRandomAddition, that reads the output of DailyRandomNumber and uses it in its own update logic.

Create a launcher at scripts/random_daily_addition_launcher.py:

from src.data_nodes.example_nodes import DailyRandomAddition, DailyRandomAdditionConfig


def main():
    daily_node = DailyRandomAddition(config=DailyRandomAdditionConfig(mean=0.0, std=1.0))
    daily_node.run(debug_mode=True, force_update=True)


if __name__ == "__main__":
    main()

Run the new launcher from the terminal:

python scripts/random_daily_addition_launcher.py

If your shell uses python3, run:

python3 scripts/random_daily_addition_launcher.py

Because DailyRandomAdditionConfig does not set node_metadata in this example, the new table may not have a friendly identifier yet. Use mainsequence project data-node-updates list and mainsequence data-node list to locate the newest table created by this run. If you want a predictable human-readable table name, add node_metadata=DataNodeMetaData(...) to DailyRandomAdditionConfig.
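As a sketch of that last suggestion, DailyRandomAdditionConfig could be given the same treatment as RandomDataNodeConfig. This is a config fragment that assumes the imports already present in example_nodes.py, and the identifier shown is just a suggested naming pattern, not a required one:

```python
class DailyRandomAdditionConfig(DataNodeConfiguration):
    mean: float
    std: float
    # Runtime-only published metadata: gives the table a readable,
    # project-scoped name without affecting either hash.
    node_metadata: DataNodeMetaData = Field(
        default_factory=lambda: DataNodeMetaData(
            identifier=f"example_random_addition_{PROJECT_ID}",
            description="Example dependent Data Node",
        ),
        json_schema_extra={"runtime_only": True},
    )
```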

The important thing to verify here is that the dependent node ran successfully and created a new update process in the current project.

3. update_hash vs. storage_hash

A DataNode does two critical things in Main Sequence:

  1. Controls the update process for your data (sequential or time-series based).
  2. Persists data in the Data Engine (think of it as a managed database—no need to handle schemas, sessions, etc.).

To support both, each DataNode uses two identifiers:

  • update_hash: a unique hash derived from the combination of arguments that define an update process. In the random-number example, that might include mean and std.
  • storage_hash: an identifier for where data is stored. It can ignore specific arguments so multiple update processes can write to the same table.

Why do this? Sometimes you want to store data from different processes in a single table. While the simple example here is contrived, this pattern becomes very useful with multi-index tables.
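The split can be illustrated with plain Python. This is a conceptual sketch, not the SDK's actual hashing algorithm: runtime_only fields affect neither hash, update_only fields feed update_hash but are dropped before computing storage_hash, so two configs that differ only in update_only fields map to the same table.

```python
import hashlib
import json


def _digest(payload: dict) -> str:
    """Stable hash of a JSON-serializable config dict."""
    return hashlib.md5(json.dumps(payload, sort_keys=True).encode()).hexdigest()


def compute_hashes(config: dict, update_only: set, runtime_only: set):
    """Conceptual sketch: runtime_only fields affect neither hash,
    update_only fields affect update_hash but not storage_hash."""
    hashed = {k: v for k, v in config.items() if k not in runtime_only}
    update_hash = _digest(hashed)
    storage_hash = _digest({k: v for k, v in hashed.items() if k not in update_only})
    return update_hash, storage_hash


# Two configs that differ only in the update-only "std" block:
low = {"mean": 0.0, "std": {"center": 0.5, "skew": False}}
high = {"mean": 0.0, "std": {"center": 2.0, "skew": True}}

u_low, s_low = compute_hashes(low, update_only={"std"}, runtime_only=set())
u_high, s_high = compute_hashes(high, update_only={"std"}, runtime_only=set())
```

Here `u_low != u_high` (two distinct update processes) while `s_low == s_high` (one shared table), which is exactly the behavior the next launcher demonstrates.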

Now update your daily random number launcher to run two update processes with different volatility configurations but the same storage.

To do this, modify scripts/random_number_launcher.py to be as follows:

from src.data_nodes.example_nodes import DailyRandomNumber, RandomDataNodeConfig, VolatilityConfig

low_vol = VolatilityConfig(center=0.5, skew=False)
high_vol = VolatilityConfig(center=2.0, skew=True)


def main():
    daily_node_low = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0, std=low_vol))
    daily_node_high = DailyRandomNumber(config=RandomDataNodeConfig(mean=0.0, std=high_vol))

    daily_node_low.run(debug_mode=True, force_update=True)
    daily_node_high.run(debug_mode=True, force_update=True)


if __name__ == "__main__":
    main()

Here we create two DailyRandomNumber nodes with different std (volatility) configurations but the same mean. Because the std field in RandomDataNodeConfig is marked with update_only=True, it affects update_hash but not storage_hash, so both nodes write to the same underlying table. The tutorial identifier also stays stable because it is set explicitly in node_metadata from the project id; it does not depend on std.

Run the updated launcher from the terminal as before:

python scripts/random_number_launcher.py

If your shell uses python3, run:

python3 scripts/random_number_launcher.py

Then inspect the result from the CLI:

mainsequence project data-node-updates list
mainsequence data-node list --filter identifier__contains=example_random_number_

You should see that you still have one tutorial table identifier, but additional update processes were created for the different updater configurations.

You can also monitor the data node updates via the CLI by running:

mainsequence project data-node-updates list

                                    Project Data Node Updates                                     

  ID     Update Hash                                          Data Node Storage   Update Details  
 ──────────────────────────────────────────────────────────────────────────────────────────────── 
  8005   dailyrandomnumber_009e3dfd8059e97933414c8e54b13af1   5016                -               
  8004   dailyrandomnumber_f32b575aa53142a50fa10c2fbff4d658   5016                -     

At this point, you have built your first DataNodes in Main Sequence. In the next part of the tutorial, you will move from local execution to shared access control and then to orchestration.

For further reference on DataNode concepts and best practices, see Data Nodes Knowledge Guide.