// server.js (2.5 KB)
/*
 * Copyright © 2016-2018 The Thingsboard Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
var config = require('config'),
    kafka = require('kafka-node'),
    Consumer = kafka.Consumer,
    Producer = kafka.Producer,
    JsMessageConsumer = require('./api/jsMessageConsumer');

// Logger for this file, obtained from the project's logger factory under the name 'main'.
const createLogger = require('./config/logger');
const logger = createLogger('main');

// Kafka connection settings, read once from the `config` module at startup.
const kafkaBootstrapServers = config.get('kafka.bootstrap.servers');
const kafkaRequestTopic = config.get('kafka.request_topic');

logger.info('Kafka Bootstrap Servers: %s', kafkaBootstrapServers);
logger.info('Kafka Requests Topic: %s', kafkaRequestTopic);

// Assigned during startup; remains undefined until then, which exit() checks for.
let kafkaClient;

// Service entry point: creates the Kafka client, a consumer on the request
// topic and a producer, and hands every consumed message to
// JsMessageConsumer once the producer reports that it is ready.
//
// Nothing is awaited here, so the try/catch only covers synchronous failures
// during setup; asynchronous Kafka failures are reported through the
// 'error' events handled below.
(async() => {
    try {
        logger.info('Starting ThingsBoard JavaScript Executor Microservice...');

        kafkaClient = new kafka.KafkaClient({kafkaHost: kafkaBootstrapServers});

        // Only partition 0 of the request topic is consumed.
        const consumer = new Consumer(
            kafkaClient,
            [
                { topic: kafkaRequestTopic, partition: 0 }
            ],
            {
                autoCommit: true
            }
        );
        // An EventEmitter that emits 'error' with no listener attached throws,
        // which would terminate the process; log consumer errors instead, the
        // same way producer errors are handled.
        consumer.on('error', (err) => {
            logger.error('Unexpected kafka consumer error');
            logger.error(err);
        });

        const producer = new Producer(kafkaClient);
        producer.on('error', (err) => {
            logger.error('Unexpected kafka producer error');
            logger.error(err);
        });

        // Messages are dispatched only after the producer is ready, because the
        // producer is passed to the message handler along with each message.
        producer.on('ready', () => {
            consumer.on('message', (message) => {
                JsMessageConsumer.onJsInvokeMessage(message, producer);
            });
        });

        logger.info('Started ThingsBoard JavaScript Executor Microservice.');
    } catch (e) {
        logger.error('Failed to start ThingsBoard JavaScript Executor Microservice: %s', e.message);
        logger.error(e);
        exit(-1);
    }
})();

// Run the shutdown routine when the process is about to exit. The listener
// receives the actual exit code and forwards it, so that a non-zero status
// (e.g. a failed startup) is not replaced by 0 during shutdown.
process.on('exit', function (code) {
    exit(code);
});

/**
 * Shuts the service down: closes the Kafka client (if one was created) and
 * then terminates the process with the given status.
 *
 * Only the first call does any work. `process.exit()` emits the 'exit' event,
 * and the 'exit' listener registered in this file calls `exit()` again; the
 * guard keeps that nested call from closing the client a second time or
 * replacing the status chosen by the first caller.
 *
 * @param {number} status - exit code passed to `process.exit`.
 */
function exit(status) {
    // The flag is stored on the function object rather than in a module-level
    // variable: this hoisted function may be called by the startup code before
    // the statements surrounding its declaration have executed.
    if (exit.inProgress) {
        return;
    }
    exit.inProgress = true;

    logger.info('Exiting with status: %d ...', status);
    if (kafkaClient) {
        logger.info('Stopping Kafka Client...');
        kafkaClient.close(() => {
            logger.info('Kafka Client stopped.');
            process.exit(status);
        });
    } else {
        process.exit(status);
    }
}