RabbitMQEventBus
Let create a bus that uses RabbitMQ as the message broker.
We define our custom class, RabbitMQEventBus that use RabbitMQ as the message broker. In the constructor function, we create an instance of GraphQLEventbus class. While most of the constructor argument fields are simply passed to the GraphQLEventbus, we have to implement the custom logic publishing, subscribing, and initialization for RabbitQM. First, we create a topic exchange (idempotent operation) and initialize consume and publish channel depending on if instance of the bus publisher and/or consumes (see the init method).
Step 1: We publish an event with the routing key as the name of the topic. The
baggage, which is an object with the event payload and the metadata is serialized string usingJSON.stringifyand published as a buffer.Step 2: We bind the
consumeChannelwith all the topics that that the consumer has subscribed to. Note that to consume an event inRabbitMQ, we have to create a queue and bind it to a channel. Although not required, to load balance across multiple instance of the same service, we use a unique queue name for each service. Once we idempotently create the queue, we find all the topics that we subscribe to using thebindQueuemethod on theconsumeChannel. Finally, we callconsumemethod on the channel and invote theDataCbcallback every time a message is received. We deserialize the payload and pass theBaggageto the callback. Finally, once the message is consumed successfully, weackthe message.Step 3: We add an
initmethod to initialize our bus. We must initialize theGraphQLEventbus. In addition, we also initialize the connection and create the channels for publishing and consuming events. We also add acloseConsumerand aclosePublishermethod to close these channels when we close the connection. Finally, we expose a methodpublishto publish events. It simply callspublishonGraphQLEventbusand passes through the topic, payload, and metadata to be propagated.
import amqp from "amqplib";
import { DocumentNode, GraphQLSchema } from "graphql";
import {
EventBusPlugin,
EventBusSubscriberCb,
GraphQLEventbus,
GraphQLEventbusMetadata,
} from "graphql-eventbus";
export type RabbitMQEventBusConfig = {
publisher?: {
schema: GraphQLSchema;
};
subscriber?: {
schema: GraphQLSchema;
queries: DocumentNode;
cb: EventBusSubscriberCb;
};
serviceName: string;
plugins?: EventBusPlugin[];
};
const EXCHANGE = "event-hub";
const QUEUE_INITIALS = "graphql-eventbus-";
export class RabbitMQEventBus {
bus!: GraphQLEventbus;
connection!: amqp.Connection;
consumeChannel?: amqp.Channel;
publishChannel?: amqp.Channel;
constructor(public config: RabbitMQEventBusConfig) {
const queueName = `${QUEUE_INITIALS}-${config.serviceName}`;
this.bus = new GraphQLEventbus({
plugins: config.plugins,
publisher: config.publisher
? {
schema: config.publisher?.schema,
publish: async (args) => {
// Step 1
this.publishChannel?.publish(
EXCHANGE,
args.topic,
Buffer.from(
JSON.stringify({
payload: args.baggage.payload,
metadata: args.baggage.metadata,
}),
),
);
},
}
: undefined,
subscriber: config.subscriber
? {
schema: config.subscriber.schema,
queries: config.subscriber.queries,
cb: config.subscriber.cb,
subscribe: (topics, dataCb) => {
// Step 2
this.consumeChannel
?.assertQueue(queueName, {
exclusive: false,
})
.then((q) => {
topics.forEach((topic) => {
this.consumeChannel?.bindQueue(queueName, EXCHANGE, topic);
});
this.consumeChannel?.consume(queueName, (msg) => {
if (msg?.content) {
dataCb({
baggage: JSON.parse(msg!.content.toString("utf-8")),
topic: msg?.fields.routingKey!,
})
.then(() => {
this.consumeChannel?.ack(msg);
})
.catch((e) => {
this.consumeChannel?.nack(msg);
throw e;
});
}
});
});
},
}
: undefined,
});
}
// Step 3: Startup logic
init = async () => {
const connection = await amqp.connect("amqp://localhost");
if (this.config.publisher) {
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", {
durable: false,
});
this.publishChannel = channel;
}
if (this.config.subscriber) {
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", {
durable: false,
});
this.consumeChannel = channel;
}
this.connection = connection;
await this.bus.init();
};
publish = (args: {
payload: {};
topic: string;
metadata?: Partial<GraphQLEventbusMetadata>;
}) => {
return this.bus.publish({
payload: args.payload,
topic: args.topic,
metadata: args.metadata,
});
};
// Step 3: Close consumer logic
closeConsumer = async () => {
if (this.consumeChannel) {
await this.consumeChannel.close();
}
};
// Step 3: Close publisher logic
closePublisher = async () => {
if (this.publishChannel) {
await this.publishChannel.close();
}
};
// Step 3: Close the connection
close = async () => {
this.connection.close();
};
}