Skip to content

Commit

Permalink
test: ensure broker.subscribe can be used by consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
getlarge committed Sep 22, 2023
1 parent 961db1a commit 15de1a4
Showing 1 changed file with 75 additions and 19 deletions.
94 changes: 75 additions & 19 deletions test/aedes-instrumentation.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,16 @@ import type { IPublishPacket } from 'mqtt-packet'
import * as semver from 'semver'
import assert from 'node:assert/strict'
import { createServer } from 'node:net'
import { afterEach, beforeEach, describe, it } from 'node:test'
import { afterEach, beforeEach, describe, it, mock } from 'node:test'
import { promisify } from 'node:util'

import {
AedesAttributes,
AedesInstrumentation,
CONNECTION_ATTRIBUTES,
MqttPacketInstrumentation,
} from '../src'
import { AedesClient } from '../src/internal-types'
import { AedesClient, HandleConnect } from '../src/internal-types'
import { NO_RESOLVE, waitForEvent } from './helpers'

// polyfill for JS new feature
Expand All @@ -44,6 +45,7 @@ if (semver.lt(process.versions.node, '20.0.0')) {

let Client: typeof MqttClient
let Broker: typeof Aedes
let ConnectHandler: HandleConnect

const getBroker = async () => {
const broker = new Broker()
Expand Down Expand Up @@ -112,6 +114,7 @@ describe('Aedes', () => {
/* eslint-disable @typescript-eslint/no-var-requires */
Client = require('mqtt')
Broker = require('aedes')
ConnectHandler = require('aedes/lib/handlers/connect')
/* eslint-enable @typescript-eslint/no-var-requires */
})

Expand All @@ -124,6 +127,34 @@ describe('Aedes', () => {
instrumentation.disable()
})

/*
* this test is flaky.
* `handleConnect` seems to be correctly unpatched by the instrumentation disable method
* but still the `mqtt.connect` span is created when a client connects to the broker
*
*/
it('should not generate any spans when disabled', async () => {
mqttPacketInstrumentation.disable()
instrumentation.disable()
assert.strictEqual(instrumentation.getConfig().enabled, false)
assert.strictEqual(
instrumentation.isWrapped(Broker.prototype, 'handle'),
false
)
assert.strictEqual(
instrumentation.isWrapped(ConnectHandler, 'handleConnect'),
false
)

// eslint-disable-next-line @typescript-eslint/no-unused-vars
await using brokerWrapper = await getBroker()
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await using clientWrapper = await getMqttClient(true)

const spans = memorySpanExporter.getFinishedSpans()
assert.strictEqual(spans.length, 0)
})

it('should create a span when a client is connected', async () => {
await using brokerWrapper = await getBroker()
// eslint-disable-next-line @typescript-eslint/no-unused-vars
Expand Down Expand Up @@ -162,18 +193,31 @@ describe('Aedes', () => {
assert.strictEqual(span?.attributes[AedesAttributes.CLIENT_ID], client.id)
})

// this test is reliable when running second.
// when running first, it might fail because the modules are not correctly unpatched
it('should not generate any spans when disabled', async () => {
mqttPacketInstrumentation.disable()
instrumentation.disable()
// eslint-disable-next-line @typescript-eslint/no-unused-vars
it('should allow usage with broker instance as a client', async () => {
const onDeliver = mock.fn((packet, cb) => cb())
const topic = 'test'
await using brokerWrapper = await getBroker()
// eslint-disable-next-line @typescript-eslint/no-unused-vars
await using clientWrapper = await getMqttClient(true)
const { broker } = brokerWrapper
await new Promise((resolve) => {
broker.subscribe(
topic,
(packet, cb) => onDeliver(packet, cb),
() => resolve(null)
)
})
const packet = {
topic,
payload: Buffer.from('test'),
qos: 0,
retain: false,
}
assert.strictEqual(onDeliver.mock.calls.length, 0)
await promisify(broker.publish.bind(broker))(packet)

const spans = memorySpanExporter.getFinishedSpans()
assert.strictEqual(spans.length, 0)
assert.strictEqual(onDeliver.mock.calls.length, 1)
const call = onDeliver.mock.calls[0]
assert.strictEqual(call.error, undefined)
mock.reset()
})

it('should create a span when a client is publishing a message (JSON payload)', async () => {
Expand Down Expand Up @@ -215,14 +259,26 @@ describe('Aedes', () => {
const topic = 'test'
await subscriber.subscribeAsync(topic)
await publisher.publishAsync(topic, JSON.stringify({ message: 'test' }))
await waitForEvent(
subscriber,
'message',
(topic, message, packet: IPublishPacket) => {
return packet
}
)
const promises = [
waitForEvent(
brokerWrapper.broker,
'publish',
(packet: AedesPublishPacket) => {
if (packet.topic.startsWith('$SYS')) {
return NO_RESOLVE
}
return packet
}
),
waitForEvent(
subscriber,
'message',
(topic, message, packet: IPublishPacket) => packet
),
]
await Promise.all(promises)

await memorySpanExporter.forceFlush()
const subscriberSpan = memorySpanExporter
.getFinishedSpans()
.find((span) => span.name.includes(`${topic} receive`))
Expand Down

0 comments on commit 15de1a4

Please sign in to comment.