How to Get Started with Substreams-Sink Library

Substreams By Jan 03, 2024 No Comments

The Substreams-Sink Library is a tool that makes it easy to create sinks in TypeScript to route blockchain data to any destination you need.

Whether you need to stream data to a database, a messaging queue, or a text file, the Substreams-Sink Library has you covered.

This article will cover how to install Substreams-Sink, its benefits, and how to create and use sinks.

What is Substreams?

Substreams is an indexing technology that builds on top of Firehose, and it allows developers to write code executed on the Firehose server to define exact data to be streamed for use in many different kinds of sinks.

Examples:

A developer could use Substreams to create a stream of:

  • All new transactions on a blockchain. This stream could then be used to power a real-time wallet or exchange.
  • All events related to a particular smart contract. This stream could then be used to power a dashboard that tracks the activity of the smart contract.
  • All data relevant to a particular analytical tool. This stream could then be used to power the tool’s real-time analytics.

What is a Substreams sink?

Sinks are the final destinations for blockchain data acquired through Substreams. They can route the data anywhere, including databases, messaging queue solutions, and logging solutions, etc. Here are some examples of where a sink can send data:

  • In SQL or NoSQL databases, such as MySQL, PostgreSQL, MongoDB, and Redis.
  • Send messages to messaging queue solutions, such as RabbitMQ and Kafka. This can be useful for decoupling applications and making them more scalable.
  • Send data to logging solutions, such as Elasticsearch. This can be useful for debugging and troubleshooting applications.
  • Send messages to a Discord channel containing blockchain data on a specific message channel. This could be used to create a bot that keeps users informed of the latest blockchain activity.
  • Anywhere!

Sinks can be developed using any programming language. However, the Substreams-Sink Library provides several benefits:

  • Removes duplicate code between sinks.
  • Comes with a bunch of built-in command-line options.
  • Provides Prometheus metrics.
  • Has a built-in logger.
  • Handles the cursor automatically.
  • Makes it easy to update and stay consistent between sinks during development.
  • Reduces the amount of boilerplate code when creating a new sink.
💡 Using the Substreams-Sink Library, developers can easily create sinks to route blockchain data to any desired destination.

Create your sink

This section covers how to quickly create a text file sink in TypeScript using the Substreams-Sink Library in this tutorial.

Prerequisites:

  • Node.js
  • TypeScript
  • Substreams CLI

Steps:

1. Create a new Node.js project (in this example we will create a text file sink):

mkdir substreams-sink-text
cd substreams-sink-text
npm init -y

2. Install the substreams-sink library:

npm install substreams-sink

3. Install TypeScript dependencies:

npm install -D typescript ts-node

4. Add a build script and start script to your package.json file:

{
  "scripts": {
    "start": "node index.js",
    "build": "tsc"
  }
}

5. Create a tsconfig.json file with the following contents:

{
    "extends": "@tsconfig/recommended/tsconfig.json",
    "compilerOptions": {
        "target": "ESNext",
        "module": "nodenext",
        "moduleResolution": "nodenext",
        "resolveJsonModule": true,
        "declaration": true,
        "declarationMap": true,
        "sourceMap": true,
        "verbatimModuleSyntax": true,
    },
}

6. Create an index.ts file and add the following code:

import { setup, logger, commander, http } from "substreams-sink";
import fs from "fs";

import pkg from "./package.json" assert { type: "json" };

logger.setName(pkg.name);
export { logger };

// Add cli options
const program = commander.program(pkg);
const command = commander.run(program, pkg);

command.requiredOption('--text-file-path <string>', 'Path for the text file');

command.action(action);
program.parse();

// Custom user options interface
interface ActionOptions extends commander.RunOptions {
    textFilePath: string
}

async function action(options: ActionOptions) {
    // Setup sink for Block Emitter
    const { emitter, substreamPackage, moduleHash, startCursor } = await setup(options);

    // Get command options
    const { textFilePath } = options;

    // Stream Blocks
    emitter.on("anyMessage", (message) => {
        let text = JSON.stringify(message.operations);
        logger.info(text);
        fs.appendFileSync(textFilePath, `${text}\n`);
    });

    // Setup HTTP server & Prometheus metrics
    http.listen(options);

    // Start streaming
    await emitter.start();
    http.server.close();
}import { setup, logger, commander, http } from "substreams-sink";
import fs from "fs";

import pkg from "./package.json" assert { type: "json" };

logger.setName(pkg.name);
export { logger };

// Add cli options
const program = commander.program(pkg);
const command = commander.run(program, pkg);

command.requiredOption('--text-file-path <string>', 'Path for the text file');

command.action(action);
program.parse();

// Custom user options interface
interface ActionOptions extends commander.RunOptions {
    textFilePath: string
}

async function action(options: ActionOptions) {
    // Setup sink for Block Emitter
    const { emitter, substreamPackage, moduleHash, startCursor } = await setup(options);

    // Get command options
    const { textFilePath } = options;

    // Stream Blocks
    emitter.on("anyMessage", (message) => {
        let text = JSON.stringify(message.operations);
        logger.info(text);
        fs.appendFileSync(textFilePath, `${text}\n`);
    });

    // Setup HTTP server & Prometheus metrics
    http.listen(options);

    // Start streaming
    await emitter.start();
    http.server.close();
}

Detailed code explanation

import { setup, logger, commander, http } from "substreams-sink";
import fs from "fs";

import pkg from "./package.json" assert { type: "json" };

logger.setName(pkg.name);
export { logger };

This code imports the necessary libraries for the Substreams sink, including the setup() function, the logger object, the commander library, and the http library. It also imports the fs library to write to a text file.

The pkg variable contains the contents of the package.json file, which is used to set the name of the logger.

// Add cli options
const program = commander.program(pkg);
const command = commander.run(program, pkg);

command.requiredOption('--text-file-path <string>', 'Path for the text file');

command.action(action);
program.parse();

This code adds a command-line option to the sink for the path to the text file. It also parses the command-line options.

// Custom user options interface
interface ActionOptions extends commander.RunOptions {
    textFilePath: string
}

This code defines a custom user options interface that extends the commander.RunOptions interface. This interface includes a new property called textFilePath, which is the path to the text file.

async function action(options: ActionOptions) {
    // Setup sink for Block Emitter
    const { emitter, substreamPackage, moduleHash, startCursor } = await setup(options);

This code sets up the sink for the block emitter. The setup() function returns an object containing the following properties:

  • emitter: The block emitter object.
  • substreamPackage: The Substreams package object.
  • moduleHash: The hash of the Substreams module.
  • startCursor: The start cursor for the stream.
// Get command options
const { textFilePath } = options;

This code gets the path to the text file from the command-line options.

// Stream Blocks
    emitter.on("anyMessage", (message) => {
        let text = JSON.stringify(message.operations);
        logger.info(text);
        fs.appendFileSync(textFilePath, `${text}\\n`);
    });

This code streams blocks from the block emitter. For each message, it converts the message’s operations to JSON and writes them to the text file.

// Setup HTTP server & Prometheus metrics
http.listen(options);

This code starts the HTTP server and Prometheus metrics.

// Start streaming
await emitter.start();
http.server.close();

This code starts the block emitter and closes the HTTP server when the stream is finished.

Conclusion

The library is easy to use and offers a number of features and benefits, such as flexibility, performance, and built-in logging and metrics.

We want to create more sinks using the Substreams-Sink Library and encourage the community to use it to create new sinks.

Author

A passionate, highly organized, innovative Open source Technical Documentation Engineer with 4+ years of experience crafting internal and user-facing support/learning documentation. Leverages a background in computer science to write for highly technical audiences and API docs and is the leader of the technical writing mentorship program.

No Comments

Leave a comment

Your email address will not be published. Required fields are marked *