Data Nodes
DataNodes are the core unit of data production in Main Sequence. A good DataNode is not just code that runs - it is a stable data product that other people, jobs, dashboards, and agents can trust.
This guide translates the internal rules into practical, human-friendly guidance.
1) Start with the right mental model
A DataNode has two separate objects:
- a registered
PlatformTimeIndexMetaTablestorage class, backed by a MetaTable UID, that owns the dataset contract - a DataNode update process, identified by
update_hash, that writes into that registered storage table
This is what allows multiple update processes to write safely into the same dataset contract.
Rule of thumb
If a change modifies what the dataset means, change or migrate the
storage contract with Alembic. If a change only modifies how one job updates
data, it should affect update_hash.
Use Alembic from the first version of a DataNode storage table when value columns or other table contract fields are expected to evolve. The MetaTable binding remains catalog metadata; Alembic owns the physical schema migration.
2) Guiding principles
- A DataNode is a product.
- Incremental updates are the default.
- Deterministic behavior beats clever behavior.
- Metadata is required for production-quality tables.
In practice, this means:
- treat
identifierand schema as API contracts, - avoid full-history fetches in production,
- avoid hidden global state and implicit time behavior,
- always document table and columns with metadata.
3) Naming conventions
Use these conventions consistently:
- Class name:
PascalCase(DailyFxRatesECB) - File name:
snake_case.py(daily_fx_rates_ecb.py) - Config model:
<Something>Configbased onDataNodeConfiguration - Published identifier: lowercase
snake_case, readable, and stable when possible (fx_ecb_daily_rates) - Dependency keys: short and descriptive (
"prices","rates","raw")
3.1 Table identifiers must be unique across the organization
Treat a table identifier as an organization-level published name.
That means:
- two teams should not publish different datasets under the same identifier,
- tutorial-style identifiers are likely to collide on shared backends,
- project-specific suffixes are often the safest starting point while learning.
Important architectural point:
identifieris published metadata, not update identity,- it belongs to the registered MetaTable-backed storage contract,
- you can repoint an identifier from one backing table to another without rotating update-process identity.
There is no separate portable_identifier flag in the current SDK. Portability
is the default meaning of the storage-table identifier.
This is useful when you want a portable public handle like daily_prices while
moving the actual storage implementation underneath it.
If you want to inspect existing DataNode table identifiers from the CLI, run:
mainsequence data-node list
The Identifier column lists DataNode table identifiers exposed by TimeIndexMetaTable.
It does not list row-level identity dimension values.
3.2 Labels are organization metadata only
TimeIndexMetaTable objects can also carry labels.
Those labels do not change:
- runtime behavior
- table identity
- hashing
- functionality
Use them only for grouping and discovery.
4) Config design: meaning vs scope vs operational knobs
A simple and scalable pattern is:
- one Pydantic config object for update-scope fields,
- one registered
PlatformTimeIndexMetaTableclass for storage, - make the node constructor accept both
configandstorage_table, - operational runtime knobs outside
__init__.
Preferred constructor shape:
from mainsequence.meta_tables import (
DataNode,
DataNodeConfiguration,
PlatformTimeIndexMetaTable,
)
class MyNodeConfig(DataNodeConfiguration):
...
class MyNode(DataNode):
def __init__(
self,
config: MyNodeConfig,
storage_table: type[PlatformTimeIndexMetaTable],
):
self.my_field = config.my_field
super().__init__(config=config, storage_table=storage_table)
DataNode is strict about this contract. New nodes should not rely on raw
constructor args being reflected back into hashed configuration automatically,
and they should not create storage manually inside update().
The output storage table itself is not config. It remains the explicit
storage_table: type[PlatformTimeIndexMetaTable] constructor argument. If a node
needs to select another DataNode's storage model as a dependency, that
dependency storage reference is update-scope config because changing it changes
the dependency graph. Type that field as type[PlatformTimeIndexMetaTable].
When a PlatformTimeIndexMetaTable class appears inside a config model, the SDK
hashes it by the bound
TimeIndexMetaTable.uid available through
StorageClass.get_time_index_meta_table(). If the class is not yet bound,
config serialization fails and tells the user to run the MetaTable migration
workflow. Do not pass dependency storage classes as extra constructor arguments,
manually attach an existing UID, reconstruct a generic MetaTable, or call
StorageClass.register() directly.
4.1 Storage meaning belongs to the storage table
These define the dataset contract and table identity and should be represented
in the PlatformTimeIndexMetaTable SQLAlchemy model:
Examples:
- frequency (
1m,1d) - price type (
mid,close) - transformation type (log return vs simple return)
- source choice when it changes dataset meaning
- index columns and foreign keys
When the output has a known stable observation frequency, declare
__cadence__ on the PlatformTimeIndexMetaTable model. Cadence is
time-indexed table metadata, not a runtime configuration knob, and should be
included whenever possible so the catalog can expose the intended observation
interval.
4.2 Config fields affect update_hash
Every DataNodeConfiguration field participates in update_hash. Declare a
normal config field when a value changes output values, dependencies, source
choice, or updater scope.
Examples:
asset_universeshard_id- partition keys
Use Field(...) for config fields and include a useful description plus
examples=[...] when possible:
from typing import ClassVar
from pydantic import Field
from mainsequence.meta_tables import DataNodeConfiguration
class PricesConfig(DataNodeConfiguration):
shard_id: str = Field(
...,
description="Stable updater partition for this price job.",
examples=["us_equities_daily"],
)
reference_dimension: ClassVar[str] = "unique_identifier"
Class-level invariants and implementation constants should be ClassVar[...]
so they are not Pydantic fields and do not enter update_hash.
4.3 Runtime values stay out of config
Do not put these in __init__ args.
Examples:
- batch size
- retries/timeouts/backoff
- debug flags
- secrets/credentials
Keep them in env vars or runtime config read inside update().
Published table identity and schema belong on the registered storage class,
not on DataNodeConfiguration.
5) Hashing rules in plain English
The storage table should change only when downstream users should treat the result as a different dataset.
update_hash can change for job-level differences while still writing to the same table.
Example:
PricesBars(frequency="1m")defines table meaning.- Updater A writes one configured row subset.
- Updater B writes another configured row subset.
- Both should share one registered storage table, but have different
update_hash.
5.1) Hash namespaces: what they actually do
DataNode also supports a separate testing and isolation mechanism called hash_namespace.
This is not the same thing as table identity, row identity, or business meaning. It is an extra namespace added to hashing so you can safely isolate runs on a shared backend.
When to use it
Use hash_namespace when you want:
- integration tests that do not collide with production-like tables
- temporary experimentation on a shared tenant
- parallel test runs that must not write into each other's tables
How namespace is resolved
The code resolves namespace in this order:
- explicit
hash_namespace="..." - the active
with hash_namespace("..."):context manager - otherwise, an empty namespace
What changes when the namespace is non-empty
If the namespace is empty, nothing changes and hashes behave exactly as they do in normal production-style runs.
If the namespace is non-empty, DataNode injects hash_namespace into the
build configuration. That changes update_hash.
Storage identity is not created by DataNode. If a test needs isolated
storage, pass a separately registered PlatformTimeIndexMetaTable storage class.
What happens during run()
If a node has a non-empty namespace, run() re-activates that namespace around the full execution.
That matters because dependencies created inside dependencies() will inherit the same namespace automatically during the run.
So the isolation is not only for the top-level node. It propagates across the run tree.
What to prefer
For tests, prefer:
from mainsequence.meta_tables.data_nodes import hash_namespace
with hash_namespace("pytest_case_123"):
node = MyNode(...)
node.run(debug_mode=True, force_update=True)
What not to use it for
Do not use hash_namespace to represent business meaning such as:
- frequency
- venue
- row subset
- transformation logic
Those belong in the actual constructor/config fields and should be reflected through the normal hashing rules.
Think of hash_namespace as test and isolation plumbing, not dataset definition.
6) Update strategy: incremental by default
Use UpdateStatistics to minimize work and control windows intentionally.
6.1 Single-index pattern
- first run: start at
config.offset_startwhen provided - subsequent runs: start at
last_time + frequency_step - daily datasets typically end at yesterday 00:00 UTC
6.2 Multidimensional index pattern
- the first index is the UTC
time_index - every remaining index is an identity dimension
- uniqueness is enforced across the full index tuple by the generated
SQLAlchemy unique index on
PlatformTimeIndexMetaTable - compute incremental start points from canonical
UpdateStatistics - return only new rows, or as close as practical
For a two-dimensional table, the index may be:
["time_index", "entity_uid"]
For a three-dimensional table, include each identity dimension explicitly:
["time_index", "portfolio_uid", "entity_uid"]
The nested UpdateStatistics.index_progress shape follows the identity
dimensions in order. For ["time_index", "account_uid", "unique_identifier"],
the progress path is:
account_uid -> unique_identifier -> timestamp
Use canonical range maps when reading only the needed incremental window:
range_map = update_statistics.get_dimension_range_map_great_or_equal(
identity_dimensions=["account_uid", "unique_identifier"],
)
last_observation = self.get_df_between_dates(dimension_range_map=range_map)
6.3 Backfills
Backfills should be explicit and controlled (separate job/updater, intentional overwrite policy).
6.4 Do not rely on implicit filtering
Even if runtime filtering removes already persisted rows, you should still:
- avoid fetching full history every run,
- avoid returning full history every run.
This is a cost and performance requirement.
7) DataFrame quality rules
Must-have rules:
time_indexis UTC-aware datetime indextime_indexis the observation point of the row, not just a generic date label- rows aligned on the same
time_indexshould be comparable across series in the dataset - for bar datasets,
time_indexshould usually be the right edge of the bar, so the timestamp represents the completed observation window - output columns are lowercase and stable
- payload columns may be
dateor timezone-aware datetime when declared in records;time_indexremains the row observation timestamp - no duplicate rows for same index keys
- consistent dtypes across runs
Recommended rules:
- declare
__cadence__on the storage model when the dataset has a stable observation interval - keep column names short (<= 63 chars)
- avoid mixed
objectdtype when possible - replace
inf/-infwithNaN - keep index sorted ascending
8) Dependencies best practices
Do:
- instantiate dependency nodes in
__init__ - return them in
dependencies() - put dependency storage-table selection in
DataNodeConfiguration - keep dependency graph deterministic
Do not:
- create dependencies inside
update() - pass dependency storage tables as ad hoc constructor arguments
- make dependency construction depend on current time or hidden env state
9) Records, foreign keys, and metadata
For production nodes, define the table contract on the registered
PlatformTimeIndexMetaTable SQLAlchemy model. Use Alembic for physical schema
migrations.
That is the source of truth for:
- index columns
- value columns
- foreign keys
- table identifier, namespace, labels, and description
Declare durable table-level discovery text with __metatable_description__.
The description should explain the table's intention, row grain, and analytical
use, not only list its columns. Column-level descriptions still belong in
mapped_column(info={"description": ...}).
DataNodeConfiguration no longer accepts table metadata or output records.
Stable output contracts are declared on the registered PlatformTimeIndexMetaTable
storage model and exposed through the MetaTable time-indexed profile/contract.
When a DataNode source table should reference another table, declare the
relationship on the PlatformTimeIndexMetaTable storage model with normal
SQLAlchemy ForeignKey(...) metadata. Foreign keys are not part of
DataNodeConfiguration and are not serialized into the platform-managed
MetaTable registration contract. Alembic, SQLAlchemy, and the database own the
physical FK DDL. Prefer project-prefixed table names when using explicit FK
string targets so project tables do not collide in shared schemas.
Log useful operational facts:
- chosen update window,
- number of assets processed,
- rows produced,
- data coverage issues.
Avoid logging secrets.
10.1) Searchability and semantic discovery
Good metadata is not just for humans reading code. It also powers search and discovery across published data nodes.
TimeIndexMetaTable exposes separate discovery and lookup paths:
description_search(q, ...)column_search(q, ...)
Use them differently:
description_search(...)is the default semantic discovery path for publishedDataNodestorage andMetaTablemetadata:- "close price"
- "daily allocation weights"
- "crypto funding rates"
column_search(...)is for schema-oriented discovery:- "close"
- "unique_identifier"
- "implied_volatility"
description_search(...)
This search hits:
GET <object_url>/description-search/?q=...
Server behavior:
- if
q_embeddingis omitted, the server generates it fromq - results are returned as paginated metadata with
count,next,previous, andresults
The ranking blends two signals:
- trigram similarity
- embedding similarity
That is why the method exposes:
trigram_kembed_kw_trgmw_embembedding_model
Use this when the user knows what they want conceptually, but not the exact table name.
Example:
import mainsequence.client as msc
results = msc.TimeIndexMetaTable.description_search(
"daily close price",
data_source__id=2,
)
CLI equivalent:
mainsequence data_node search "daily close price" --data-source-id 2
The CLI also exposes the ranking knobs:
mainsequence data-node search "daily btc price table" \
--trigram-k 200 \
--embed-k 200 \
--w-trgm 0.65 \
--w-emb 0.35
The response shape is metadata, not raw table rows:
{
"count": 2,
"next": null,
"previous": null,
"results": [
{
"orm_class": "TimeIndexMetaTable",
"uid": "0b0f5733-b1ee-4d09-b51f-9e29073007fd",
"data_source_uid": "864e7c22-482a-464a-8758-0d3408abd77f",
"identifier": "ms_markets__externalpricests__mainsequence_examples",
"namespace": "mainsequence.examples",
"description": "...",
"physical_table_name": "ms_markets__externalpricests__mainsequence_examples",
"table_contract": {},
"columns": [],
"indexes_meta": [],
"foreign_keys": [],
"cadence": "1d",
"time_indexed_profile": {}
}
]
}
column_search(...)
This search hits:
GET <object_url>/column-search/?q=...
Extra keyword arguments are passed through as normal DRF filters, which makes it useful when you want to constrain the search to a known area such as one data source or one identifier family.
Use this when the user remembers a column name or schema fragment, but not the data node name. Do not use this as the default semantic dataset discovery path.
Example:
import mainsequence.client as msc
results = msc.TimeIndexMetaTable.column_search(
"close",
data_source__id=2,
)
CLI equivalent:
mainsequence data_node search "close" --mode column --data-source-id 2
Refreshing the search index
refresh_table_search_index() exists for the cases where search quality depends on metadata or code that changed after the table was created.
This method:
- joins the table's column definitions with the code used to generate the data node
- builds one consolidated textual description
- embeds that description into the vector representation used for semantic search
It hits:
POST /{data_node_storage_uid}/refresh-table-search-index/
Use it when:
- you improved
get_table_metadata()orget_column_metadata() - you changed code comments or table-generation logic in a way that should improve discovery
- search results feel stale compared with the current node implementation
Example:
import mainsequence.client as msc
storage = msc.TimeIndexMetaTable.get(uid="<DATA_NODE_STORAGE_UID>")
storage.refresh_table_search_index()
CLI equivalent:
mainsequence data-node refresh-search-index <DATA_NODE_STORAGE_UID>
Running Read-Only SQL Against A Time-Indexed Table
TimeIndexMetaTable.run_query(...) executes a raw SQL query directly against one
published time-indexed table.
This is for inspection and diagnostics on the storage that already exists. It is not a substitute for building a reusable DataNode API contract.
The SDK uses:
POST /orm/api/ts_manager/dynamic_table/{data_node_storage_uid}/run_query/
Request contract:
- the request body is the raw SQL string
- content type is
text/plain - do not send JSON like
{"sql": "SELECT ..."} - do not send
X-MS-SYNC-TOKEN
Example:
import mainsequence.client as msc
storage = msc.TimeIndexMetaTable.get(uid="<DATA_NODE_STORAGE_UID>")
result = storage.run_query("SELECT * FROM my_table LIMIT 100")
Expected success envelope:
{
"ok": True,
"query_id": "abc123",
"meta_table_uid": "<DATA_NODE_STORAGE_UID>",
"results": [
{
"column_a": "value",
"column_b": 10,
}
],
"truncated": False,
"max_rows": 1000,
"row_count": 1,
"error": None,
}
The method returns the backend query envelope directly. If the backend rejects the SQL with a structured error payload, the SDK still returns that envelope so callers can inspect error.kind, error.message, and retryable.
CLI:
mainsequence data-node run_query <DATA_NODE_STORAGE_UID> "SELECT 1 AS ok"
mainsequence data-node run_query <DATA_NODE_STORAGE_UID> "SELECT * FROM my_table LIMIT 100"
Tail deleting rows after a cutoff
TimeIndexMetaTable.delete_after_date(...) removes the tail of a time-indexed
table starting at an inclusive cutoff timestamp.
This is not arbitrary range deletion:
- there is no
end_date - the cutoff is inclusive
- rows at or after
after_dateare deleted - the caller must be authenticated and have edit access to the storage table
The SDK uses:
POST /orm/api/ts_manager/dynamic_table/{data_node_storage_uid}/delete_after_date/
Use this for rollback-style cleanup when a bad tail load or backfill needs to be removed.
For a normal table:
import mainsequence.client as msc
storage = msc.TimeIndexMetaTable.get(uid="<DATA_NODE_STORAGE_UID>")
result = storage.delete_after_date("2026-04-01T00:00:00Z")
For a multidimensional table, scope the delete to one identity coordinate:
result = storage.delete_after_date(
"2026-04-01T00:00:00Z",
dimension_filters={"entity_uid": ["entity-a"]},
)
Or scope it to multiple identity coordinates:
result = storage.delete_after_date(
"2026-04-01T00:00:00Z",
dimension_filters={"entity_uid": ["entity-a", "entity-b"]},
)
For a three-index holdings table, scope the delete to one coordinate:
result = storage.delete_after_date(
"2026-04-01T00:00:00Z",
index_coordinates=[
{
"portfolio_uid": "00000000-0000-0000-0000-000000000001",
"entity_uid": "entity-a",
}
],
)
The response contains the authoritative post-delete table state:
{
"ok": True,
"meta_table_uid": "<DATA_NODE_STORAGE_UID>",
"deleted_count": 123,
"table_empty": False,
"dimension_filters": {"unique_identifier": ["AAPL", "MSFT"]},
"stats": {
"last_time_index_value": "2026-03-31T23:59:00Z",
"earliest_index_value": "2024-01-01T00:00:00Z",
"multi_index_stats": None,
"multi_index_column_stats": None,
},
}
Use stats to update visible table metadata, or refetch the table detail after the delete.
Search quality depends on metadata quality
If you want description_search(...) to work well, write table and column metadata for humans, not just for machines. Clear descriptions, stable naming, and meaningful column docs directly improve discovery.
11) Testing safely
When tests hit shared backends, isolate update hashes and use the intended storage table explicitly.
Use with hash_namespace("..."): with an explicit, collision-resistant
namespace.
This intentionally changes update_hash so tests do not collide with
production-like update records. It does not create or select storage for you.
Keep test runs bounded:
- prefer a narrow
config.offset_startfor test runs, - or pass controlled update statistics/checkpoints,
- keep integration tests small and deterministic.
Example: keep integration tests in tests/ and set config.offset_start
In real projects, keep these tests in the tests/ folder, for example:
tests/test_my_node.py
A practical pattern is to keep the production class unchanged, pass a narrow
offset_start in the test config, and run the node inside a namespace:
from mainsequence.meta_tables.data_nodes import hash_namespace
from src.data_nodes.my_node import MyNode, MyNodeConfig, MyNodeStorage
def test_my_node_smoke():
config = MyNodeConfig(
...,
offset_start="2025-01-01T00:00:00+00:00",
)
with hash_namespace("pytest_my_node_smoke"):
node = MyNode(config=config, storage_table=MyNodeStorage)
err, df = node.run(debug_mode=True, force_update=True)
assert err is False
assert df is not None
Why this pattern works well:
- the namespace isolates update hashes from production-like update records
- the registered storage table keeps the table contract explicit
- the production node stays unchanged
- the narrower
config.offset_startprevents large first-run backfills during tests
If you already have older nodes that still use class-level OFFSET_START, that
fallback remains supported, but config-driven offset_start is the preferred
pattern for new code and new documentation.
12) Schema evolution policy
Schema is an API contract and should be treated as immutable.
Safe changes:
- metadata text updates,
- tags/documentation improvements,
- bug fixes that do not change schema/meaning.
Breaking changes (create a new table identifier instead):
- adding/removing/renaming columns,
- changing dtypes,
- changing index shape,
- changing meaning of existing fields.
Never make breaking contract changes in place
If semantics or the table contract changes, publish a new table identifier or apply an Alembic schema migration and refresh the MetaTable catalog binding separately.
13) Security basics
- keep credentials in env vars or secret management,
- never hardcode API keys,
- validate external inputs,
- prefer batched calls over excessive network chatter.
13.1) Reading an existing table from the client
When you are consuming a table that already exists in the platform, the first question is simple:
"Do I just want to read one known table, or do I need a more flexible query?"
Start with APIDataNode.build_from_identifier(...) when you already know the published table identifier.
from mainsequence.meta_tables import APIDataNode
prices = APIDataNode.build_from_identifier("simulated_prices_tutorial")
df = prices.get_df_between_dates(
start_date="2026-01-01",
columns=["open", "high", "low", "close"],
dimension_filters={"unique_identifier": ["NVDA", "AAPL"]},
)
This helper resolves the table metadata for you and returns an APIDataNode that is ready for normal read methods.
Use it when:
- you are reading one table by its stable identifier,
- the read shape is fairly standard,
- you want the shortest path from published dataset to DataFrame.
This is especially useful in dashboards, notebooks, and integration code where the table already exists and you are not constructing a dependency graph.
14) Quick pre-ship checklist
Before shipping a DataNode, verify:
- identifier is stable and globally meaningful,
- config fields are correctly split across meaning/scope/runtime,
update()is incremental and usesUpdateStatistics,- assets are resolved for 2D tables,
- metadata methods are implemented,
- schema is intentional and stable,
- logs are useful and secret-safe,
- integration test runs in an isolated namespace.