This is the v3 HyperIndex documentation. Still on an older version? Open the v2 documentation and consider migrating to v3.

RabbitMQ

Publish decoded events to a RabbitMQ exchange — direct, topic, or fanout — using amqplib.

Installation

pnpm add amqplib
pnpm add -D @types/amqplib

Configure the client

src/clients/rabbitmq.ts
import amqp from "amqplib";

// Cache the promise so concurrent callers share one connection and channel.
let chanPromise: Promise<amqp.Channel> | undefined;

export const getChannel = () =>
  (chanPromise ??= (async () => {
    const conn = await amqp.connect(process.env.ENVIO_RABBITMQ_URL!);
    const ch = await conn.createChannel();
    // Declare the exchange up front; a durable exchange survives broker restarts.
    await ch.assertExchange("transfer", "direct", { durable: true });
    return ch;
  })());
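
One caveat with the `??=` pattern above: it caches the promise itself, so if the first connection attempt rejects, every later call receives the same rejection. A minimal sketch of a retry-friendly variant follows. `memoizeAsync` is a hypothetical helper introduced for illustration (it is not part of amqplib or envio), and the factory passed to it stands in for the connect-and-assert logic.

```typescript
// Hypothetical helper: caches the in-flight or resolved promise,
// but forgets it if the factory rejects so that the next call retries.
function memoizeAsync<T>(factory: () => Promise<T>): () => Promise<T> {
  let cached: Promise<T> | undefined;
  return () => {
    if (cached === undefined) {
      const attempt = factory();
      cached = attempt;
      // Clear the cache only if this failed attempt is still the cached one.
      attempt.catch(() => {
        if (cached === attempt) cached = undefined;
      });
    }
    return cached;
  };
}
```

Under this assumption, `getChannel` could be defined as `memoizeAsync(async () => { ... })` with the same body as the client above. Callers still see the original rejection; only subsequent calls start a fresh attempt.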

Define the effect

The exchange and routing key are static configuration, so define them as constants in the effect module. The effect's input then carries only the per-event values.

src/effects/rabbitmq.ts
import { createEffect, S } from "envio";
import { getChannel } from "../clients/rabbitmq";

const EXCHANGE = "transfer";
const ROUTING_KEY = "my-routing-key";

export const publishTransfer = createEffect(
  {
    name: "publishTransfer",
    input: {
      from: S.string,
      to: S.string,
      value: S.bigint,
      txHash: S.string,
    },
    rateLimit: { calls: 200, per: "second" },
    mode: "unorderedAfterCommit",
  },
  async ({ input }) => {
    const ch = await getChannel();
    // JSON.stringify cannot serialize bigint, so send value as a decimal string.
    const body = JSON.stringify({ ...input, value: input.value.toString() });
    ch.publish(EXCHANGE, ROUTING_KEY, Buffer.from(body), {
      contentType: "application/json",
      persistent: true, // ask the broker to write the message to disk
    });
  }
);
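
The effect converts `value` to a string because `JSON.stringify` throws a `TypeError` on `bigint` values. A minimal sketch of the round trip a consumer might perform is shown here. `TransferMessage`, `encodeTransfer`, and `decodeTransfer` are hypothetical names, and the wire shape is assumed to match the object built in the effect above.

```typescript
// Hypothetical message shape, mirroring the effect's input.
interface TransferMessage {
  from: string;
  to: string;
  value: bigint;
  txHash: string;
}

// Producer side: bigint is not JSON-serializable, so send it as a decimal string.
function encodeTransfer(msg: TransferMessage): string {
  return JSON.stringify({ ...msg, value: msg.value.toString() });
}

// Consumer side: restore the bigint from its decimal string form.
function decodeTransfer(body: string): TransferMessage {
  const raw = JSON.parse(body) as {
    from: string;
    to: string;
    value: string;
    txHash: string;
  };
  return { ...raw, value: BigInt(raw.value) };
}
```

Parsing with `BigInt` rather than `Number` keeps full precision; token amounts in wei routinely exceed `Number.MAX_SAFE_INTEGER`.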

Call it from a handler

The rindexer config…

streams:
  rabbitmq:
    exchanges:
      - exchange: transfer
        exchange_type: direct
        routing_key: my-routing-key
        events:
          - event_name: Transfer
            conditions:
              - "value": ">=2000000000000000000 && value <=4000000000000000000"
              - "from": "0x0338ce5020c447f7e668dc2ef778025ce3982662"

…becomes:

src/handlers/RocketPoolETH.ts
import { indexer } from "envio";
import { publishTransfer } from "../effects/rabbitmq";

// Same filter values as the rindexer conditions above.
const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
const MIN = 2_000_000_000_000_000_000n;
const MAX = 4_000_000_000_000_000_000n;

indexer.onEvent(
  { contract: "RocketPoolETH", event: "Transfer" },
  async ({ event, context }) => {
    const { from, to, value } = event.params;

    // Lowercase the sender so a checksummed address still matches WHALE.
    if (from.toLowerCase() === WHALE && value >= MIN && value <= MAX) {
      context.effect(publishTransfer, {
        from,
        to,
        value,
        txHash: event.transaction.hash,
      });
    }
  },
);
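
The rindexer conditions map onto an ordinary boolean check. A minimal sketch that pulls the check into a pure function, which makes the inclusive bounds easy to unit test without running an indexer, could look like this. `matchesWhaleTransfer` is a hypothetical name, and the constants are copied from the handler above.

```typescript
const WHALE = "0x0338ce5020c447f7e668dc2ef778025ce3982662";
const MIN = 2_000_000_000_000_000_000n; // 2 * 10^18, inclusive lower bound
const MAX = 4_000_000_000_000_000_000n; // 4 * 10^18, inclusive upper bound

// Hypothetical helper: true when a transfer satisfies both rindexer conditions.
function matchesWhaleTransfer(from: string, value: bigint): boolean {
  // Lowercase the address so checksummed input compares equal to WHALE.
  return from.toLowerCase() === WHALE && value >= MIN && value <= MAX;
}
```

The handler's `if` condition could then call `matchesWhaleTransfer(from, value)` instead of repeating the comparison inline.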

Pick mode: "orderedAfterCommit" if your consumer needs strict per-batch ordering. For fanout exchanges, set ROUTING_KEY to an empty string and assert the exchange with type "fanout" instead of "direct". If your consumer is idempotent and you want lower latency, switch the effect to mode: "unordered" (or "ordered") to publish inline before the DB commit.