Streaming Table Deltas¶
The Hyperion Stream Client enables you to subscribe to a stream of table delta traces, providing real-time or historical insight into how data within smart contract tables changes over time. This is invaluable for tracking state changes, such as account balances, NFT ownership, game states, or any data stored in on-chain tables.
You initiate a table delta stream using the client.streamDeltas(request) method.
client.streamDeltas(request)¶
This asynchronous method sends a request to the Hyperion server to start streaming table delta traces based on the
criteria defined in the request object. It returns a Promise that resolves to a HyperionStream instance, which you
can then use to listen for messages.
import { HyperionStreamClient, IncomingData, DeltaContent } from "@eosrio/hyperion-stream-client";
// Assuming 'client' is an initialized and connected HyperionStreamClient instance
try {
const deltaStream = await client.streamDeltas({
code: "eosio.token",
table: "accounts",
scope: "USER_ACCOUNT_NAME", // Optional: filter by table scope
start_from: 0, // Start from HEAD block
read_until: 0, // Stream indefinitely
});
console.log("Delta stream request successful. UUID:", deltaStream.reqUUID);
deltaStream.on("message", (message: IncomingData<DeltaContent>) => {
console.log("Received delta:", message.content.data);
console.log(` Present (row exists after delta): ${message.content.present === 1}`);
});
deltaStream.on("error", (error) => {
console.error("Stream error:", error);
});
} catch (error) {
console.error("Failed to initiate delta stream:", error);
}
Request Parameters (StreamDeltasRequest)¶
The request object passed to client.streamDeltas() can contain the following properties:
code¶
- Type:
string - Required: Yes
- Description: The account name of the smart contract that owns the table(s) you want to monitor.
- Example:
"eosio.token","atomicassets"
table¶
- Type:
string - Required: Yes
- Description: The name of the table within the
codecontract for which you want to stream deltas.- You can specify a single table name (e.g.,
"accounts"). - Use an asterisk (
"*") to stream deltas from all tables within the specifiedcodecontract.
- You can specify a single table name (e.g.,
- Example:
"accounts","assets","*"
scope¶
- Type:
string - Optional: Yes
- Default:
""(empty string, meaning deltas from any scope within the table will be included) - Description: The specific scope within the table to filter by. Many contracts use account names as scopes.
- Example:
"myuseraccnt1","eosio"
payer¶
- Type:
string - Optional: Yes
- Default:
""(empty string, meaning deltas paid for by any account will be included) - Description: Filter deltas by the account name that paid for the RAM for the table row.
- Example:
"rampayeracct"
start_from¶
- Type:
number | string - Optional: Yes
- Default:
0 - Description: Defines the starting point of the stream.
0: Start from the current head block of the blockchain (live streaming).- Positive Number: Start from this specific absolute block number (e.g.,
150000000). - Negative Number: Start from a block relative to the current head block. For example,
-100starts streaming 100 blocks before the current head. - ISO 8601 Timestamp String: Start from the block closest to the specified date and time (e.g.,
"2023-01-01T00:00:00.000Z").
- See Also: Block Range Parameters for a more detailed explanation.
read_until¶
- Type:
number | string - Optional: Yes
- Default:
0 - Description: Defines the ending point of the stream.
0: Stream indefinitely (or untilignore_live: trueand historical data is complete).- Positive Number: Stop at this specific absolute block number.
- Negative Number: Stop at a block relative to the current head block.
- ISO 8601 Timestamp String: Stop at the block closest to this specified date and time.
- See Also: Block Range Parameters
filters¶
- Type:
RequestFilter[](Array ofRequestFilterobjects) - Optional: Yes
- Default:
[](empty array, no additional data filters) - Description: An array of filter objects to perform server-side filtering based on the content of the table row
data. Each
RequestFilterobject has:field(string): A dot-notation path to the field within the delta'sdataobject (e.g.,"data.balance","payer","primary_key"). Hyperion might also provide special@prefixed fields for decoded data from specific common tables (e.g.,@accounts.balance).value(string | number | boolean): The value to compare against.operator(string, optional): The comparison operator. Defaults to"eq"(equals). Supported operators include:"eq": equals"ne": not equals"gt": greater than"lt": less than"gte": greater than or equal to"lte": less than or equal to"contains": (for string fields) field contains the value"starts_with": (for string fields) field starts with the value"ends_with": (for string fields) field ends with the value
- Example:
filters: [ { field: "data.owner", value: "myuseraccnt1" }, // Row's owner field === 'myuseraccnt1' { field: "data.value", value: 100, operator: "gt" } // Row's value field > 100 { field: "payer", value: "otheraccount" } // RAM payer for the row is 'otheraccount' ] - Note: Refer to
Hyperion's index template definitions
to understand available indexed fields for deltas and the structure of
data.
filter_op¶
- Type:
'and' | 'or' - Optional: Yes
- Default:
"and" - Description: Specifies the logical operator to use when multiple
filtersare provided."and": All filter conditions must be met."or": At least one filter condition must be met.
- Example:
// Deltas where data.fieldA IS 'X' OR data.fieldB IS 'Y' filters: [ { field: "data.fieldA", value: "X" }, { field: "data.fieldB", value: "Y" } ], filter_op: "or"
ignore_live¶
- Type:
boolean - Optional: Yes
- Default:
false - Description:
- If
true, the stream will stop after all historical data (up toread_untilor the current head block ifread_untilis 0) has been sent. It will not send live blocks. - If
false, the stream will transition to sending live blocks after historical data is complete.
- If
replayOnReconnect¶
- Type:
boolean - Optional: Yes
- Default:
false - Description:
- If
true, and the client disconnects and then reconnects, it will attempt to resend this stream request, automatically adjustingstart_fromto the block number after the last successfully received block for this stream. - If
false, the stream request will not automatically replay data from the offline time on reconnect. Current live data will keep streaming after reconnection, but you will miss the data from the offline period.
- If
- Recommendation: Set to
trueif you need the full missed sequence of data even after reconnections.
Handling the Delta Stream¶
The await client.streamDeltas(request) method returns a HyperionStream object. You interact with it similarly to
action streams:
-
stream.on("message", (data: IncomingData<DeltaContent>) => { ... }): This event is fired for each table delta received. Thedataobject contains:data.uuid: The unique identifier for this stream request.data.type: Will be"delta".data.mode:"live"or"history".data.content: ADeltaContentobject containing details of the table row change (e.g.,code,table,scope,data,present).data.content.present:1if the row exists (or was created/updated) after this delta,0if the row was deleted.
data.irreversible:trueif this message is confirmed irreversible (relevant ifclient.options.libStreamis true). See Handling Stream Data for more onDeltaContent.
-
stream.on("error", (error: any) => { ... }): Fired if an error occurs specific to this stream. -
stream.on("start", (response: { status: string, reqUUID: string, startingBlock: number | string }) => { ... }): Fired when the Hyperion server acknowledges and successfully starts the stream request.
Stopping a Delta Stream¶
To stop receiving data for a specific delta stream:
// Assuming 'deltaStream' is the object returned by client.streamDeltas()
deltaStream.stop();
console.log("Requested to stop delta stream:", deltaStream.reqUUID);
Examples¶
1. Monitor Live Changes to a Specific Account's Balances¶
const stream = await client.streamDeltas({
code: "eosio.token",
table: "accounts",
scope: USER_NAME, // Scope is often the user's account name for balances
start_from: 0, // Live
replayOnReconnect: true
});
stream.on("message", (msg) => {
console.log(`Live balance update for ${USER_NAME}:`, msg.content.data.balance);
});
2. Get Historical Deltas for a Specific Table with Data Filtering¶
const stream = await client.streamDeltas({
code: "atomicassets", // Example contract
table: "assets", // Example table
start_from: "2023-01-01T00:00:00.000Z",
read_until: "2023-01-02T00:00:00.000Z",
ignore_live: true,
filters: [
{ field: "data.owner", value: OWNER_NAME }
]
});
console.log(`Fetching historical asset deltas for owner ${OWNER_NAME}:`);
for await (const message of stream) {
if (message === null) break;
console.log(` Asset ID ${message.content.primary_key}:`, message.content.data);
}
Next Steps¶
- Handling Stream Data: Learn more about the
DeltaContentstructure and using the AsyncIterator pattern. - Streaming Actions: If you need to track contract actions rather than table state.
- Client Configuration: Review all Client Configuration options.