rabbitmqTemplate.ts
4.49 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
///
/// Copyright © 2016-2024 The Thingsboard Authors
///
/// Licensed under the Apache License, Version 2.0 (the "License");
/// you may not use this file except in compliance with the License.
/// You may obtain a copy of the License at
///
/// http://www.apache.org/licenses/LICENSE-2.0
///
/// Unless required by applicable law or agreed to in writing, software
/// distributed under the License is distributed on an "AS IS" BASIS,
/// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
/// See the License for the specific language governing permissions and
/// limitations under the License.
///
import config from 'config';
import { _logger } from '../config/logger';
import { JsInvokeMessageProcessor } from '../api/jsInvokeMessageProcessor'
import { IQueue } from './queue.models';
import amqp, { ConfirmChannel, Connection } from 'amqplib';
import { Options, Replies } from 'amqplib/properties';
/**
 * RabbitMQ-backed implementation of {@link IQueue}.
 *
 * Consumes JS-invoke requests from the configured request queue, hands them to a
 * {@link JsInvokeMessageProcessor} and publishes responses to per-response-topic queues
 * over a confirm channel.
 */
export class RabbitMqTemplate implements IQueue {

    private logger = _logger(`rabbitmqTemplate`);
    private queuePrefix: string = config.get('queue_prefix');
    private requestTopic: string = this.queuePrefix ? this.queuePrefix + "." + config.get('request_topic') : config.get('request_topic');
    private host = config.get('rabbitmq.host');
    private port = config.get('rabbitmq.port');
    private vhost = config.get('rabbitmq.virtual_host');
    private username = config.get('rabbitmq.username');
    private password = config.get('rabbitmq.password');
    private queueProperties: string = config.get('rabbitmq.queue_properties');
    private queueOptions: Options.AssertQueue = {
        durable: false,
        exclusive: false,
        autoDelete: false
    };

    // Both stay undefined until init() succeeds and are reset to undefined by destroy().
    private connection?: Connection;
    private channel?: ConfirmChannel;
    // Response queues that have already been asserted by send().
    private topics: string[] = [];

    name = 'RabbitMQ';

    /**
     * Connects to the broker, opens a confirm channel, asserts the request queue and
     * starts consuming requests from it.
     *
     * A message whose payload is not valid JSON is logged and rejected without requeue
     * so that it cannot break the consumer callback or be redelivered forever.
     */
    async init(): Promise<void> {
        const connection = await amqp.connect(this.buildConnectionUrl());
        this.connection = connection;
        const channel = await connection.createConfirmChannel();
        this.channel = channel;

        this.parseQueueProperties();
        await this.createQueue(this.requestTopic);

        const messageProcessor = new JsInvokeMessageProcessor(this);
        await channel.consume(this.requestTopic, (message) => {
            if (!message) {
                // Nothing to process or acknowledge.
                return;
            }
            let request;
            try {
                request = JSON.parse(message.content.toString('utf8'));
            } catch (e) {
                this.logger.info(`Dropping malformed RabbitMQ request: ${RabbitMqTemplate.describeError(e)}`);
                // allUpTo = false, requeue = false: reject only this message and do not redeliver it.
                channel.nack(message, false, false);
                return;
            }
            messageProcessor.onJsInvokeMessage(request);
            channel.ack(message);
        });
    }

    /**
     * Publishes a response to the given queue, asserting the queue on first use.
     *
     * @param responseTopic name of the queue to publish to
     * @param msgKey value stored under `key` in the published JSON envelope
     * @param rawResponse response bytes, serialized as an array of byte values under `data`
     * @param headers value stored under `headers` in the published JSON envelope
     * @returns the promise returned by the channel's `waitForConfirms()`
     * @throws Error if called before `init()` or after `destroy()`
     */
    async send(responseTopic: string, msgKey: string, rawResponse: Buffer, headers: any): Promise<any> {
        const channel = this.requireChannel();

        if (!this.topics.includes(responseTopic)) {
            await this.createQueue(responseTopic);
            // A concurrent send() may have registered the topic while we were awaiting.
            if (!this.topics.includes(responseTopic)) {
                this.topics.push(responseTopic);
            }
        }

        const payload = JSON.stringify({
            key: msgKey,
            data: [...rawResponse],
            headers: headers
        });
        channel.sendToQueue(responseTopic, Buffer.from(payload));
        return channel.waitForConfirms();
    }

    /**
     * Closes the channel and then the connection.
     *
     * Each close is guarded separately so that a failure to close the channel
     * does not prevent the connection from being closed.
     */
    async destroy(): Promise<void> {
        this.logger.info('Stopping RabbitMQ resources...');

        const channel = this.channel;
        if (channel) {
            this.logger.info('Stopping RabbitMQ chanel...');
            this.channel = undefined;
            try {
                await channel.close();
                this.logger.info('RabbitMQ chanel stopped');
            } catch (e) {
                this.logger.info(`RabbitMQ channel stop error: ${RabbitMqTemplate.describeError(e)}`);
            }
        }

        const connection = this.connection;
        if (connection) {
            this.logger.info('Stopping RabbitMQ connection...')
            this.connection = undefined;
            try {
                await connection.close();
                this.logger.info('RabbitMQ client connection.');
            } catch (e) {
                this.logger.info('RabbitMQ connection stop error.');
            }
        }

        this.logger.info('RabbitMQ resources stopped.')
    }

    /**
     * Builds the AMQP connection URL from the configured settings.
     *
     * Username and password are percent-encoded so that characters such as `@`, `:`
     * or `/` cannot corrupt the URL. This assumes the configuration holds the raw,
     * unencoded credentials. The virtual host is appended verbatim, as before.
     */
    private buildConnectionUrl(): string {
        const user = encodeURIComponent(String(this.username));
        const secret = encodeURIComponent(String(this.password));
        return `amqp://${user}:${secret}@${this.host}:${this.port}${this.vhost}`;
    }

    /**
     * Parses `queueProperties` (`key:value` pairs separated by `;`) into the
     * `arguments` of the queue options used for every asserted queue.
     *
     * Empty entries (for example from a trailing `;`) are skipped. Entries with no
     * `:` delimiter, an empty key or an empty value are logged and skipped. Numeric
     * values are stored as numbers; any other value is stored as the trimmed string.
     */
    private parseQueueProperties(): void {
        const args: { [n: string]: number | string } = {};
        for (const entry of (this.queueProperties ?? '').split(';')) {
            const property = entry.trim();
            if (property === '') {
                continue;
            }
            const delimiterPosition = property.indexOf(':');
            const key = delimiterPosition > 0 ? property.substring(0, delimiterPosition).trim() : '';
            const rawValue = delimiterPosition > 0 ? property.substring(delimiterPosition + 1).trim() : '';
            if (key === '' || rawValue === '') {
                this.logger.info(`Ignoring malformed RabbitMQ queue property: ${property}`);
                continue;
            }
            const numericValue = Number(rawValue);
            args[key] = Number.isNaN(numericValue) ? rawValue : numericValue;
        }
        this.queueOptions.arguments = args;
    }

    /** Asserts a queue with the shared queue options. */
    private async createQueue(topic: string): Promise<Replies.AssertQueue> {
        return this.requireChannel().assertQueue(topic, this.queueOptions);
    }

    /** Returns the open channel or throws when the template is not initialized. */
    private requireChannel(): ConfirmChannel {
        if (!this.channel) {
            throw new Error('RabbitMQ channel is not initialized.');
        }
        return this.channel;
    }

    /** Renders a caught value as text for log messages. */
    private static describeError(e: unknown): string {
        return e instanceof Error ? e.message : String(e);
    }

}