RabbitMQEventBus
Let create a bus that uses RabbitMQ as the message broker.
We define our custom class, RabbitMQEventBus
that use RabbitMQ
as the message broker. In the constructor function, we create an instance of GraphQLEventbus
class. While most of the constructor argument fields are simply passed to the GraphQLEventbus
, we have to implement the custom logic publishing, subscribing, and initialization for RabbitQM
. First, we create a topic exchange (idempotent operation) and initialize consume and publish channel depending on if instance of the bus publisher and/or consumes (see the init
method).
Step 1: We publish an event with the routing key as the name of the topic. The
baggage
, which is an object with the event payload and the metadata is serialized string usingJSON.stringify
and published as a buffer.Step 2: We bind the
consumeChannel
with all the topics that that the consumer has subscribed to. Note that to consume an event inRabbitMQ
, we have to create a queue and bind it to a channel. Although not required, to load balance across multiple instance of the same service, we use a unique queue name for each service. Once we idempotently create the queue, we find all the topics that we subscribe to using thebindQueue
method on theconsumeChannel
. Finally, we callconsume
method on the channel and invote theDataCb
callback every time a message is received. We deserialize the payload and pass theBaggage
to the callback. Finally, once the message is consumed successfully, weack
the message.Step 3: We add an
init
method to initialize our bus. We must initialize theGraphQLEventbus
. In addition, we also initialize the connection and create the channels for publishing and consuming events. We also add acloseConsumer
and aclosePublisher
method to close these channels when we close the connection. Finally, we expose a methodpublish
to publish events. It simply callspublish
onGraphQLEventbus
and passes through the topic, payload, and metadata to be propagated.
import amqp from "amqplib";
import { DocumentNode, GraphQLSchema } from "graphql";
import {
EventBusPlugin,
EventBusSubscriberCb,
GraphQLEventbus,
GraphQLEventbusMetadata,
} from "graphql-eventbus";
export type RabbitMQEventBusConfig = {
publisher?: {
schema: GraphQLSchema;
};
subscriber?: {
schema: GraphQLSchema;
queries: DocumentNode;
cb: EventBusSubscriberCb;
};
serviceName: string;
plugins?: EventBusPlugin[];
};
const EXCHANGE = "event-hub";
const QUEUE_INITIALS = "graphql-eventbus-";
export class RabbitMQEventBus {
bus!: GraphQLEventbus;
connection!: amqp.Connection;
consumeChannel?: amqp.Channel;
publishChannel?: amqp.Channel;
constructor(public config: RabbitMQEventBusConfig) {
const queueName = `${QUEUE_INITIALS}-${config.serviceName}`;
this.bus = new GraphQLEventbus({
plugins: config.plugins,
publisher: config.publisher
? {
schema: config.publisher?.schema,
publish: async (args) => {
// Step 1
this.publishChannel?.publish(
EXCHANGE,
args.topic,
Buffer.from(
JSON.stringify({
payload: args.baggage.payload,
metadata: args.baggage.metadata,
}),
),
);
},
}
: undefined,
subscriber: config.subscriber
? {
schema: config.subscriber.schema,
queries: config.subscriber.queries,
cb: config.subscriber.cb,
subscribe: (topics, dataCb) => {
// Step 2
this.consumeChannel
?.assertQueue(queueName, {
exclusive: false,
})
.then((q) => {
topics.forEach((topic) => {
this.consumeChannel?.bindQueue(queueName, EXCHANGE, topic);
});
this.consumeChannel?.consume(queueName, (msg) => {
if (msg?.content) {
dataCb({
baggage: JSON.parse(msg!.content.toString("utf-8")),
topic: msg?.fields.routingKey!,
})
.then(() => {
this.consumeChannel?.ack(msg);
})
.catch((e) => {
this.consumeChannel?.nack(msg);
throw e;
});
}
});
});
},
}
: undefined,
});
}
// Step 3: Startup logic
init = async () => {
const connection = await amqp.connect("amqp://localhost");
if (this.config.publisher) {
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", {
durable: false,
});
this.publishChannel = channel;
}
if (this.config.subscriber) {
const channel = await connection.createChannel();
await channel.assertExchange(EXCHANGE, "topic", {
durable: false,
});
this.consumeChannel = channel;
}
this.connection = connection;
await this.bus.init();
};
publish = (args: {
payload: {};
topic: string;
metadata?: Partial<GraphQLEventbusMetadata>;
}) => {
return this.bus.publish({
payload: args.payload,
topic: args.topic,
metadata: args.metadata,
});
};
// Step 3: Close consumer logic
closeConsumer = async () => {
if (this.consumeChannel) {
await this.consumeChannel.close();
}
};
// Step 3: Close publisher logic
closePublisher = async () => {
if (this.publishChannel) {
await this.publishChannel.close();
}
};
// Step 3: Close the connection
close = async () => {
this.connection.close();
};
}