Migrating DataNodes To MetaTable-Driven Storage

This guide is for libraries that still use the old DataNode style where the node declares its output schema inline. The new SDK model is storage-first: a PlatformTimeIndexMetaTable SQLAlchemy model owns the table contract, and the DataNode only owns update logic.

What Changed

| Old DataNode | New DataNode |
| --- | --- |
| Node declares schema with methods such as get_table_metadata() and get_column_metadata(). | A PlatformTimeIndexMetaTable SQLAlchemy model declares the schema. |
| Node metadata mixes table identity, columns, and update behavior. | Storage identity lives on the storage class; update identity lives in DataNodeConfiguration. |
| Registration is implicit or manually glued together from table metadata. | Registration is migration-first: add the storage class to the MetaTable migration provider and run mainsequence migrations upgrade. |
| Foreign keys were sometimes glued through SDK/backend UID maps. | Foreign keys are normal SQLAlchemy/Alembic DDL metadata; they are not serialized into MetaTable registration contracts. |
| storage_table had to be registered before node construction. | Output storage_table must be registered by the MetaTable migration workflow before DataNode / PersistManager uses it. Config-stored storage classes fail if they are not already bound. |

The New Mental Model

A modern DataNode has two explicit objects:

  • a storage class: type[PlatformTimeIndexMetaTable]
  • an update process: DataNode

The storage class answers:

  • What table is this?
  • Which project/session data source owns it?
  • What columns and descriptions define the MetaTable contract?
  • What SQLAlchemy/Alembic DDL metadata, such as indexes and foreign keys, belongs in migrations?
  • What stable storage identity should downstream users depend on?

The DataNode answers:

  • What configuration controls this updater?
  • What dependencies does this updater read?
  • What rows does this update produce now?

Do not put storage schema, published table metadata, backend UIDs, or data-source UIDs in DataNodeConfiguration.

Public Registration Rule

For platform-managed SDK storage classes, the public registration path is the MetaTable migration workflow:

mainsequence migrations upgrade --provider sdk_examples.migrations:migration head

Do not call StorageClass.register() directly in application/bootstrap code. The method remains SDK plumbing for migration tooling.

Registration values are not passed as arguments. They are inferred from the storage class and the active session:

  • data source: active Main Sequence project/session
  • identifier: __metatable_identifier__
  • namespace: __metatable_namespace__
  • description: __metatable_description__
  • labels: __metatable_labels__
  • provisioning: __metatable_provisioning__ or SDK default
  • time index: __time_index_name__
  • index grain: __index_names__

introspect belongs to lower-level externally registered table flows. It is not part of normal platform-managed DataNode storage registration.
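To make the inferred values concrete, here is a minimal sketch that reads the class-level attributes listed above from a plain stand-in class. The helper declared_registration_values and the StandInStorage class are hypothetical names used only for illustration; they are not SDK APIs, and the data source is left out because it comes from the active project/session rather than from the class.

```python
# Mapping from the registration value to the class attribute that declares it.
INFERRED_ATTRS = {
    "identifier": "__metatable_identifier__",
    "namespace": "__metatable_namespace__",
    "description": "__metatable_description__",
    "labels": "__metatable_labels__",
    "provisioning": "__metatable_provisioning__",
    "time_index": "__time_index_name__",
    "index_grain": "__index_names__",
}


def declared_registration_values(storage_cls) -> dict:
    """Collect class-level registration values; undeclared ones map to None."""
    return {
        key: getattr(storage_cls, attr, None)
        for key, attr in INFERRED_ATTRS.items()
    }


class StandInStorage:
    """Hypothetical stand-in; a real storage class is a SQLAlchemy model."""

    __metatable_identifier__ = "example_project.daily_metrics"
    __metatable_namespace__ = "example-data"
    __metatable_description__ = "Daily metric observations."
    __metatable_labels__ = ["metrics", "daily"]
    __time_index_name__ = "time_index"
    __index_names__ = ["time_index", "entity_identifier"]


values = declared_registration_values(StandInStorage)
```

In this sketch provisioning resolves to None because the stand-in does not declare __metatable_provisioning__, which corresponds to the "or SDK default" case in the list.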

Migration Pattern

Every migrated node should end up with three declarations: a storage class, a configuration, and the DataNode itself.

Import Surface

Old libraries commonly import DataNode from mainsequence.tdag or from a domain-layer compatibility module. SDK-level migrations should import the modern DataNode surface from mainsequence.meta_tables:

from mainsequence.meta_tables import (
    DataNode,
    DataNodeConfiguration,
    PlatformTimeIndexMetaTable,
    schema_table_name,
)

Domain libraries may wrap these primitives. In those projects, map the same storage/config/node boundaries onto the library's public wrappers rather than mixing layers.

1. Storage Class

Move table identity and schema out of the node and into a PlatformTimeIndexMetaTable model.

import datetime

from sqlalchemy import DateTime, Float, String
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

from mainsequence.meta_tables import PlatformTimeIndexMetaTable, schema_table_name

PROJECT_TABLE_APP = "example_project"


class Base(DeclarativeBase):
    pass


class DailyMetricsStorage(PlatformTimeIndexMetaTable, Base):
    __tablename__ = schema_table_name(PROJECT_TABLE_APP, "daily_metrics")
    __metatable_namespace__ = "example-data"
    __metatable_identifier__ = "example_project.daily_metrics"
    __metatable_description__ = (
        "Daily metric observations keyed by entity identifier for downstream "
        "analytics."
    )
    __metatable_labels__ = ["metrics", "daily"]

    __time_index_name__ = "time_index"
    __index_names__ = ["time_index", "entity_identifier"]

    time_index: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        info={
            "label": "Time Index",
            "description": "UTC observation timestamp for the daily metric row.",
        },
    )
    entity_identifier: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        info={
            "label": "Entity Identifier",
            "description": "Stable identifier for the observed entity.",
        },
    )
    value: Mapped[float] = mapped_column(
        Float,
        nullable=False,
        info={
            "label": "Value",
            "description": "Daily numeric value produced by the source system.",
        },
    )

Rules:

  • Always include __metatable_description__.
  • Describe both schema and intention, not only column names.
  • Use SQLAlchemy column info for column labels and descriptions.
  • Use schema_table_name(project_or_app, concept) for authored SQLAlchemy table names so project/app prefixes are consistent and bounded for PostgreSQL.
  • If you need a deterministic contract fingerprint outside registration, call compute_metatable_contract_hash(..., extra_components={...}) explicitly. Do not put hash-only inputs on the storage class.

2. Configuration

Keep configuration focused on update identity.

import datetime

from pydantic import Field

from mainsequence.meta_tables import DataNodeConfiguration


class DailyMetricsConfig(DataNodeConfiguration):
    source_scope: str = Field(
        ...,
        description="Stable upstream scope selected by this updater.",
        examples=["default_scope", "regional_scope"],
    )
    offset_start: datetime.datetime | None = Field(
        default=None,
        description="Optional first-run lower bound for this updater.",
        examples=["2024-01-01T00:00:00+00:00"],
    )

Use Field(...) with descriptions for configuration fields. Add examples=[...] when useful.

If a field is descriptive UI metadata and must not affect update identity, mark it with json_schema_extra={"hash_excluded": True}:

display_label: str | None = Field(
    default=None,
    description="Human-facing label for UI display only.",
    json_schema_extra={"hash_excluded": True},
)

Do not use legacy schema extras such as update_only, runtime_only, or ignore_from_storage_hash. They are not the current SDK contract.
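The effect of hash_excluded can be illustrated without the SDK. The sketch below is an assumption-laden stand-in: it uses standard-library dataclass field metadata in place of pydantic's json_schema_extra, and it assumes an update hash is derived from a canonical JSON dump of the remaining fields. The names ExampleConfig and sketch_update_hash are hypothetical, and the SDK's real hashing may differ.

```python
import dataclasses
import hashlib
import json
from typing import Optional


@dataclasses.dataclass(frozen=True)
class ExampleConfig:
    """Hypothetical config; dataclass metadata stands in for json_schema_extra."""

    source_scope: str
    display_label: Optional[str] = dataclasses.field(
        default=None,
        metadata={"hash_excluded": True},  # UI-only, must not affect identity
    )


def sketch_update_hash(config) -> str:
    """Hash only the fields that are not flagged hash_excluded."""
    payload = {
        f.name: getattr(config, f.name)
        for f in dataclasses.fields(config)
        if not f.metadata.get("hash_excluded", False)
    }
    # sort_keys gives a stable serialization regardless of field order.
    canonical = json.dumps(payload, sort_keys=True, default=str)
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()


plain = sketch_update_hash(ExampleConfig(source_scope="default_scope"))
labeled = sketch_update_hash(
    ExampleConfig(source_scope="default_scope", display_label="Default")
)
other_scope = sketch_update_hash(ExampleConfig(source_scope="regional_scope"))
```

Here plain and labeled are equal because only display_label differs, while other_scope differs because source_scope is part of update identity.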

3. DataNode

The node receives the config and storage class, then produces rows.

import pandas as pd

from mainsequence.meta_tables import DataNode, PlatformTimeIndexMetaTable

# DailyMetricsConfig and DailyMetricsStorage are the classes declared in
# steps 1 and 2 above.


class DailyMetricsNode(DataNode):
    def __init__(
        self,
        config: DailyMetricsConfig,
        storage_table: type[PlatformTimeIndexMetaTable] = DailyMetricsStorage,
        *,
        hash_namespace: str | None = None,
    ) -> None:
        self.source_scope = config.source_scope
        super().__init__(
            config=config,
            storage_table=storage_table,
            hash_namespace=hash_namespace,
        )

    def dependencies(self) -> dict:
        # This node reads no upstream nodes.
        return {}

    def update(self) -> pd.DataFrame:
        # build_daily_metrics_frame is a placeholder for the library's own
        # row-building logic; it is not provided by the SDK.
        return build_daily_metrics_frame(self.source_scope)

The returned frame must match the storage table contract:

  • index includes the declared __index_names__
  • time_index is timezone-aware UTC
  • value columns match the SQLAlchemy storage columns
  • no extra payload columns appear

PlatformTimeIndexMetaTable automatically adds a SQLAlchemy unique index across the declared __index_names__ tuple. Do not manually repeat that full unique index in __table_args__; reserve explicit Index(...) declarations for additional lookup/performance paths.
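The frame contract above can be turned into a small offline check. This is a minimal sketch, not an SDK utility: frame_contract_problems is a hypothetical helper that takes plain lists so it runs without pandas. In a real test the inputs would typically come from frame.index.names, frame.columns, and the time_index level of the returned frame.

```python
import datetime


def frame_contract_problems(
    index_names,
    payload_columns,
    time_values,
    *,
    declared_index,
    declared_payload,
):
    """Return contract violations as strings; an empty list means the frame fits."""
    problems = []
    # Every declared index level must be present on the frame index.
    missing_index = [name for name in declared_index if name not in index_names]
    if missing_index:
        problems.append(f"missing index levels: {missing_index}")
    # Payload columns must match the storage columns, with no extras.
    extra = sorted(set(payload_columns) - set(declared_payload))
    missing = sorted(set(declared_payload) - set(payload_columns))
    if extra:
        problems.append(f"extra payload columns: {extra}")
    if missing:
        problems.append(f"missing payload columns: {missing}")
    # Naive datetimes return None from utcoffset(), so they fail this check too.
    for value in time_values:
        if value.utcoffset() != datetime.timedelta(0):
            problems.append(f"time_index value {value!r} is not timezone-aware UTC")
            break
    return problems


utc_row = datetime.datetime(2024, 1, 1, tzinfo=datetime.timezone.utc)
ok = frame_contract_problems(
    ["time_index", "entity_identifier"],
    ["value"],
    [utc_row],
    declared_index=["time_index", "entity_identifier"],
    declared_payload=["value"],
)
bad = frame_contract_problems(
    ["time_index"],
    ["value", "debug_note"],
    [datetime.datetime(2024, 1, 1)],  # naive timestamp
    declared_index=["time_index", "entity_identifier"],
    declared_payload=["value"],
)
```

In this example ok is empty, while bad reports the missing index level, the extra payload column, and the naive timestamp.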

Auto-Registration During Construction And Hashing

The SDK now registers storage classes at the places where it needs backend identity:

  • output storage_table passed to DataNode
  • output storage_table validated by PersistManager
  • type[PlatformTimeIndexMetaTable] values inside DataNodeConfiguration

That means this is valid:

node = DailyMetricsNode(
    config=DailyMetricsConfig(source_scope="default_scope"),
    storage_table=DailyMetricsStorage,
)

You do not need to call register() yourself before constructing or running the node; the storage class only has to be bound by the migration workflow first.

This is also valid when a dependency storage class is part of config:

class SpreadConfig(DataNodeConfiguration):
    base_storage: type[PlatformTimeIndexMetaTable] = Field(
        ...,
        description="Storage table for the base metric series.",
    )


node = DerivedMetricsNode(
    config=SpreadConfig(base_storage=DailyMetricsStorage),
    storage_table=SpreadStorage,
)

Before hashing that config, the SDK requires DailyMetricsStorage to already be bound by the migration workflow, then hashes by the bound TimeIndexMetaTable.uid. If it is not bound, config serialization fails and tells the user to run migrations.
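The fail-if-unbound rule can be pictured with a small stand-in. This is only a sketch under stated assumptions: the bound_uids mapping, UnboundStorageError, and hash_component_for_storage are hypothetical names, and the way the SDK actually tracks bindings and words its error is not shown in this guide.

```python
class UnboundStorageError(RuntimeError):
    """Hypothetical error type for a storage class with no bound backend UID."""


def hash_component_for_storage(storage_cls, bound_uids: dict) -> str:
    """Return the bound UID used for hashing, or fail with a migration hint."""
    uid = bound_uids.get(storage_cls)
    if uid is None:
        raise UnboundStorageError(
            f"{storage_cls.__name__} is not bound; run "
            "'mainsequence migrations upgrade' first."
        )
    return uid


class BoundStorage:
    """Stand-in for a storage class that migrations have already bound."""


class UnboundStorage:
    """Stand-in for a storage class that migrations have not seen yet."""


# Assumption: bindings are modeled here as a plain class-to-UID mapping.
bindings = {BoundStorage: "uid-123"}
component = hash_component_for_storage(BoundStorage, bindings)

try:
    hash_component_for_storage(UnboundStorage, bindings)
    unbound_message = ""
except UnboundStorageError as exc:
    unbound_message = str(exc)
```

The point of the design is that config identity depends on the bound backend UID, so hashing an unbound class has no meaningful result and should stop early with a pointer to the migration command.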

Old Code To Delete

Remove old inline schema declarations from the node:

def get_table_metadata(self):
    ...


def get_column_metadata(self):
    ...

Remove hand-built table metadata objects from update classes. The table contract must be declared once on the SQLAlchemy storage model.

Also remove manual binding or UID plumbing:

Storage.bind_meta_table(...)              # wrong
Storage.__metatable_uid__ = "..."         # wrong
storage_hash = "backend_table_name"       # wrong as config
data_source_uid = "..."                   # wrong as DataNode config

Foreign Keys

Use normal SQLAlchemy ForeignKey(...) for platform-managed storage relationships. The SDK does not wrap FK declarations and does not resolve FK target MetaTable UIDs. Alembic, SQLAlchemy, and the database own the physical FK DDL.

import datetime
import uuid

from sqlalchemy import DateTime, ForeignKey, Uuid
from sqlalchemy.orm import Mapped, mapped_column

from mainsequence.meta_tables import (
    PlatformManagedMetaTable,
    PlatformTimeIndexMetaTable,
    schema_table_name,
)

PROJECT_TABLE_APP = "example_project"
ACCOUNT_TABLE_NAME = schema_table_name(PROJECT_TABLE_APP, "account")
ACCOUNT_POSITIONS_TABLE_NAME = schema_table_name(PROJECT_TABLE_APP, "account_positions")

# Base is the DeclarativeBase subclass from the storage class example.


class Account(PlatformManagedMetaTable, Base):
    __tablename__ = ACCOUNT_TABLE_NAME
    __metatable_namespace__ = "accounts"
    __metatable_identifier__ = "example_project.account"
    __metatable_description__ = "Account master rows used to scope positions."

    uid: Mapped[uuid.UUID] = mapped_column(
        Uuid,
        primary_key=True,
        info={
            "label": "Account UID",
            "description": "Stable account identifier referenced by dependent observation tables.",
        },
    )


class AccountPositions(PlatformTimeIndexMetaTable, Base):
    __tablename__ = ACCOUNT_POSITIONS_TABLE_NAME
    __metatable_namespace__ = "positions"
    __metatable_identifier__ = "example_project.account_positions"
    __metatable_description__ = "Time-indexed position rows keyed by account."
    __time_index_name__ = "time_index"
    __index_names__ = ["time_index", "account_uid"]

    # Declared explicitly so every name in __index_names__ has a column.
    time_index: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        info={
            "label": "Time Index",
            "description": "UTC observation timestamp for the position row.",
        },
    )
    account_uid: Mapped[uuid.UUID] = mapped_column(
        Uuid,
        # The FK target uses the project-prefixed SQLAlchemy table name.
        ForeignKey(f"{ACCOUNT_TABLE_NAME}.uid", ondelete="RESTRICT"),
        nullable=False,
        info={
            "label": "Account UID",
            "description": "Account identifier that scopes this position observation.",
        },
    )

Prefer project-prefixed SQLAlchemy table names for FK string targets. Do not use backend physical table names or target MetaTable UIDs in FK declarations; the FK target is database DDL metadata, not a MetaTable registration contract.

Prefix explicit table identifiers and explicit physical table names with the project or package name. Prefer schema_table_name(project_or_app, concept) for authored SQLAlchemy names. Bare names such as account, positions, or alembic_version are easy to collide across projects sharing an organization or database schema.
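To show why a shared naming helper matters, here is a rough sketch of a prefixing function that stays within PostgreSQL's 63-byte identifier limit. It is not the implementation of schema_table_name, whose internals this guide does not describe; sketch_prefixed_name, the double-underscore separator, and the hash-suffix truncation are all assumptions made for illustration.

```python
import hashlib
import re

# PostgreSQL truncates identifiers longer than 63 bytes by default.
PG_MAX_IDENTIFIER_BYTES = 63


def sketch_prefixed_name(project: str, *parts: str) -> str:
    """Join project and concept parts into a bounded, lowercase table name."""
    tokens = [project, *parts]
    # Collapse anything outside [a-z0-9] into single underscores.
    cleaned = [re.sub(r"[^a-z0-9]+", "_", t.lower()).strip("_") for t in tokens]
    name = "__".join(t for t in cleaned if t)
    # After cleaning the name is ASCII, so characters and bytes coincide.
    if len(name) <= PG_MAX_IDENTIFIER_BYTES:
        return name
    # Too long: keep a readable head and append a short digest so distinct
    # long names are unlikely to collide after truncation.
    digest = hashlib.sha1(name.encode("utf-8")).hexdigest()[:8]
    keep = PG_MAX_IDENTIFIER_BYTES - len(digest) - 1
    return f"{name[:keep]}_{digest}"


short_name = sketch_prefixed_name("example_project", "account")
long_name = sketch_prefixed_name("p" * 80, "account_positions")
```

The short name keeps its project prefix intact, and the long name is cut to exactly the limit with a digest suffix rather than being silently truncated by the database.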

Parameterized Storage Tables

If a library used one node class for multiple table identities, create one storage class per identity. Use a memoized factory so repeated calls return the same SQLAlchemy mapped class object.

import datetime
from functools import lru_cache

from sqlalchemy import DateTime, Float, String
from sqlalchemy.orm import Mapped, mapped_column

from mainsequence.meta_tables import PlatformTimeIndexMetaTable, schema_table_name


class _ObservationColumns(PlatformTimeIndexMetaTable, Base):
    __abstract__ = True
    __time_index_name__ = "time_index"
    __index_names__ = ["time_index", "entity_identifier"]

    time_index: Mapped[datetime.datetime] = mapped_column(
        DateTime(timezone=True),
        nullable=False,
        info={
            "label": "Time Index",
            "description": "UTC observation timestamp for this dataset variant.",
        },
    )
    entity_identifier: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        info={
            "label": "Entity Identifier",
            "description": "Stable identifier for the observed entity.",
        },
    )
    value: Mapped[float] = mapped_column(
        Float,
        nullable=False,
        info={
            "label": "Value",
            "description": "Numeric observation value for this dataset variant.",
        },
    )


@lru_cache(maxsize=None)
def observation_storage(dataset_variant: str) -> type[_ObservationColumns]:
    normalized = dataset_variant.lower().replace("-", "_")
    identifier = f"{normalized}_observations"
    return type(
        f"ObservationStorage_{normalized}",
        (_ObservationColumns,),
        {
            "__module__": __name__,
            "__qualname__": f"ObservationStorage_{normalized}",
            "__tablename__": schema_table_name("example_project", "observations", normalized),
            "__metatable_namespace__": "example-data",
            "__metatable_identifier__": identifier,
            "__metatable_description__": (
                f"{dataset_variant} observations keyed by entity identifier."
            ),
        },
    )

Then pass the concrete class into the node:

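from mainsequence.meta_tables import DataNode

# ObservationConfig is assumed to be a DataNodeConfiguration subclass with a
# dataset_variant: str field; it is not shown in this guide.


class ObservationNode(DataNode):
    def __init__(
        self,
        config: ObservationConfig,
        *,
        hash_namespace: str | None = None,
    ) -> None:
        # The memoized factory returns the same class for the same variant.
        storage_table = observation_storage(config.dataset_variant)
        super().__init__(
            config=config,
            storage_table=storage_table,
            hash_namespace=hash_namespace,
        )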

Use this when table identity truly changes. If a value changes only updater scope, keep it in config and write into the same storage table.

Migration Checklist

For each old DataNode:

  1. [ ] Identify the output shape: index columns, payload columns, dtypes, nullable flags, and any SQLAlchemy/Alembic FK metadata.
  2. [ ] Replace old mainsequence.tdag imports with the modern SDK or domain-layer DataNode surface.
  3. [ ] Create a PlatformTimeIndexMetaTable SQLAlchemy storage class.
  4. [ ] Move table identifier, namespace, descriptions, labels, and storage disambiguation to class metadata.
  5. [ ] Move column labels and descriptions to SQLAlchemy column info.
  6. [ ] Replace old inline metadata methods with the storage class.
  7. [ ] Make the node constructor accept config and storage_table.
  8. [ ] Keep DataNodeConfiguration limited to update-scope fields.
  9. [ ] Use Field(...) descriptions and examples for config fields.
  10. [ ] Keep foreign keys as normal SQLAlchemy ForeignKey(...) declarations and use project-prefixed table names for explicit string targets.
  11. [ ] Use a memoized storage-class factory for parameterized table identities.
  12. [ ] Delete manual UID binding, manual data_source_uid threading, and keyword arguments passed directly to register().
  13. [ ] Add contract tests comparing the storage columns/indexes to the frame returned by update().
  14. [ ] Add an import smoke test for the library package.
  15. [ ] Run mainsequence migrations upgrade --provider <provider> head against an authenticated project to verify storage registration, then run one live update to verify row writes.

Validation Tests To Add

At minimum, add tests for:

  • storage class imports without backend access
  • storage contract columns match the expected output frame
  • index names match the returned DataFrame index names
  • time_index values are timezone-aware UTC
  • parameterized storage factories are memoized
  • different table identities produce different storage hashes
  • DataNodeConfiguration changes alter update_hash only when they should
  • hash_excluded fields do not alter update_hash
  • SQLAlchemy foreign keys remain present in Base.metadata for Alembic but are absent from MetaTable registration contracts

Offline tests should avoid backend calls by monkeypatching the migration registration plumbing or using SDK model constructors for returned metadata. Live tests should run with a real project/session and verify idempotency: a second migration run should reuse the same registered storage table, and a node run should append/update rows according to the node logic.
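As a sketch of the offline approach, the example below patches a register() classmethod with unittest.mock so that no backend call can happen inside the test. StandInStorage is a hypothetical stand-in whose register() raises to simulate an unwanted backend call; the actual plumbing to patch in the SDK is not named in this guide, so treat the patch target as an assumption.

```python
from unittest import mock


class StandInStorage:
    """Hypothetical stand-in for a storage class; not an SDK type."""

    @classmethod
    def register(cls):
        # Simulates what an accidental backend call would look like offline.
        raise RuntimeError("backend call attempted in an offline test")


def register_offline() -> str:
    """Call register() while it is replaced by a mock returning a fake UID."""
    with mock.patch.object(
        StandInStorage, "register", return_value="fake-uid"
    ) as fake_register:
        uid = StandInStorage.register()
        # The mock records the call, so the test can assert on it.
        fake_register.assert_called_once_with()
    return uid


offline_uid = register_offline()

# Outside the patch the original classmethod is restored.
try:
    StandInStorage.register()
    restored = False
except RuntimeError:
    restored = True
```

The same pattern works with pytest's monkeypatch fixture; mock.patch.object is used here only because it needs nothing beyond the standard library.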

Common Mistakes

  • Keeping get_table_metadata() or get_column_metadata() on the node.
  • Passing data_source_uid, description, labels, or introspect into platform-managed model register().
  • Putting storage_table in DataNodeConfiguration.
  • Putting backend UIDs or data-source UIDs in config.
  • Using hash_namespace to define real storage identity.
  • Using backend physical table names or target MetaTable UIDs in SQLAlchemy FK declarations.
  • Creating a new dynamic SQLAlchemy class every time instead of memoizing a parameterized storage factory.
  • Declaring config fields without Field(...) descriptions.
  • Using legacy schema extras instead of json_schema_extra={"hash_excluded": True}.