MemoryEventBus
Let's start by creating a bus using EventEmitter in NodeJS as the message broker.
We define a class that use EventEmitter as the message broker. In the constructor function, we create an instance of GraphQLEventbus class. While most of the constructor argument fields are simply passed to the GraphQLEventbus, we have to implement the custom logic for each step above for EventEmitter.
Step 1: We emit an event corresponding to a topic with the name
message-${topic}. Thebaggage, which is an object with the event payload and the metadata is serialized string usingJSON.stringifyand published with the event.Step 2: We subscribe to the topics that the consumer has subscribed to and trigger the callback
DataCbwith the name of the topic and the deserialized baggage. TheGraphQLEventBusvalidates the payload, extracts the fields and triggers thesubscriber.cbevent handler provided by the consumer of our bus.Step 3: We add an
initmethod to initialize our bus. In this case, we just initialize theGraphQLEventbus. However, in other case, we might add other startup logic. We also add aclosemethod to doEventEmittercleanup. Finally, we expose a methodpublishto publish events. It simply callspublishonGraphQLEventbusand passes through the topic, payload, and metadata to be propagated.
import { EventEmitter } from "events";
import { DocumentNode, GraphQLSchema } from "graphql";
import { EventBusSubscriberCb } from "./EventBus";
import {
DataCb,
EventBusPlugin,
GraphQLEventbusMetadata,
GraphQLEventbus,
} from "./GraphQLEventbus";
export type MemoryEventBusConfig = {
schema: GraphQLSchema;
subscriber?: {
cb: EventBusSubscriberCb;
queries: DocumentNode;
};
plugins?: EventBusPlugin[];
allowPublishNonExistingTopic?: boolean;
};
export class MemoryEventBus {
public eventEmitter = new EventEmitter();
private bus: GraphQLEventbus;
constructor(public config: MemoryEventBusConfig) {
this.eventEmitter.setMaxListeners(100000);
this.bus = new GraphQLEventbus({
plugins: config.plugins,
publisher: {
schema: config.schema,
// Step 1
publish: async (args) => {
this.eventEmitter.emit(
`message-${args.topic}`,
JSON.stringify(args.baggage),
);
},
allowInvalidTopic: config.allowPublishNonExistingTopic,
},
subscriber: this.config.subscriber
? {
cb: this.config.subscriber!.cb,
// Step 2
subscribe: (topics, cb: DataCb) => {
topics.forEach((topic) => {
this.eventEmitter.on(`message-${topic}`, async (baggage) => {
await cb({
baggage: JSON.parse(baggage),
topic,
});
});
});
},
queries: this.config.subscriber.queries,
schema: this.config.schema,
}
: undefined,
});
}
// Step 3 Startup Logic
init = async () => {
// this must be called
await this.bus.init();
};
publish = async (args: {
topic: string;
payload: {};
metadata?: Partial<GraphQLEventbusMetadata>;
}) => {
await this.bus.publish({
topic: args.topic,
payload: args.payload,
metadata: args.metadata,
});
};
// Step 3 Cleanup logic
close = async () => {
this.eventEmitter.removeAllListeners();
};
}
Next, let's do a more complex case of using RabbitMQ as our message broker.