Commit 2eeb6d43b073e250a9be842be9f9d040323b99f9

Authored by Igor Kulikov
2 parents 483a69fe 522a31c0

Merge branch 'master' of github.com:thingsboard/thingsboard

Showing 15 changed files with 36 additions and 21 deletions
... ... @@ -882,10 +882,15 @@ queue:
882 882 sasl.mechanism: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_MECHANISM:PLAIN}"
883 883 sasl.config: "${TB_QUEUE_KAFKA_CONFLUENT_SASL_JAAS_CONFIG:org.apache.kafka.common.security.plain.PlainLoginModule required username=\"CLUSTER_API_KEY\" password=\"CLUSTER_API_SECRET\";}"
884 884 security.protocol: "${TB_QUEUE_KAFKA_CONFLUENT_SECURITY_PROTOCOL:SASL_SSL}"
  885 + # Key-value properties for Kafka consumer per specific topic, e.g. tb_ota_package is a topic name for ota, tb_rule_engine.sq is a topic name for default SequentialByOriginator queue.
  886 + # Check TB_QUEUE_CORE_OTA_TOPIC and TB_QUEUE_RE_SQ_TOPIC params
885 887 consumer-properties-per-topic:
886 888 tb_ota_package:
887 889 - key: max.poll.records
888   - value: 10
  890 + value: "${TB_QUEUE_KAFKA_OTA_MAX_POLL_RECORDS:10}"
  891 +# tb_rule_engine.sq:
  892 +# - key: max.poll.records
  893 +# value: "${TB_QUEUE_KAFKA_SQ_MAX_POLL_RECORDS:1024}"
889 894 other: # In this section you can specify custom parameters for Kafka consumer/producer and expose the env variables to configure outside
890 895 - key: "request.timeout.ms" # refer to https://docs.confluent.io/platform/current/installation/configuration/producer-configs.html#producerconfigs_request.timeout.ms
891 896 value: "${TB_QUEUE_KAFKA_REQUEST_TIMEOUT_MS:30000}" # (30 seconds)
... ...
... ... @@ -117,7 +117,7 @@ public class AwsSqsMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
117 117
118 118 @Override
119 119 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
120   - return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(),
  120 + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
121 121 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
122 122 }
123 123
... ...
... ... @@ -109,7 +109,7 @@ public class AwsSqsTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
109 109
110 110 @Override
111 111 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
112   - return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, ruleEngineSettings.getTopic(),
  112 + return new TbAwsSqsConsumerTemplate<>(ruleEngineAdmin, sqsSettings, configuration.getTopic(),
113 113 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
114 114 }
115 115
... ...
... ... @@ -92,7 +92,7 @@ public class InMemoryMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
92 92
93 93 @Override
94 94 public TbQueueConsumer<TbProtoQueueMsg<TransportProtos.ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
95   - return new InMemoryTbQueueConsumer<>(ruleEngineSettings.getTopic());
  95 + return new InMemoryTbQueueConsumer<>(configuration.getTopic());
96 96 }
97 97
98 98 @Override
... ...
... ... @@ -160,7 +160,7 @@ public class KafkaMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEngi
160 160 String queueName = configuration.getName();
161 161 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
162 162 consumerBuilder.settings(kafkaSettings);
163   - consumerBuilder.topic(ruleEngineSettings.getTopic());
  163 + consumerBuilder.topic(configuration.getTopic());
164 164 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
165 165 consumerBuilder.groupId("re-" + queueName + "-consumer");
166 166 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
... ...
... ... @@ -159,7 +159,7 @@ public class KafkaTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory {
159 159 String queueName = configuration.getName();
160 160 TbKafkaConsumerTemplate.TbKafkaConsumerTemplateBuilder<TbProtoQueueMsg<ToRuleEngineMsg>> consumerBuilder = TbKafkaConsumerTemplate.builder();
161 161 consumerBuilder.settings(kafkaSettings);
162   - consumerBuilder.topic(ruleEngineSettings.getTopic());
  162 + consumerBuilder.topic(configuration.getTopic());
163 163 consumerBuilder.clientId("re-" + queueName + "-consumer-" + serviceInfoProvider.getServiceId() + "-" + consumerCount.incrementAndGet());
164 164 consumerBuilder.groupId("re-" + queueName + "-consumer");
165 165 consumerBuilder.decoder(msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
... ...
... ... @@ -127,7 +127,7 @@ public class PubSubMonolithQueueFactory implements TbCoreQueueFactory, TbRuleEng
127 127
128 128 @Override
129 129 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
130   - return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
  130 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
131 131 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
132 132 }
133 133
... ...
... ... @@ -114,7 +114,7 @@ public class PubSubTbRuleEngineQueueFactory implements TbRuleEngineQueueFactory
114 114
115 115 @Override
116 116 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
117   - return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, ruleEngineSettings.getTopic(),
  117 + return new TbPubSubConsumerTemplate<>(ruleEngineAdmin, pubSubSettings, configuration.getTopic(),
118 118 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
119 119 }
120 120
... ...
... ... @@ -125,7 +125,7 @@ public class RabbitMqMonolithQueueFactory implements TbCoreQueueFactory, TbRuleE
125 125
126 126 @Override
127 127 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
128   - return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(),
  128 + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
129 129 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
130 130 }
131 131
... ...
... ... @@ -112,7 +112,7 @@ public class RabbitMqTbRuleEngineQueueFactory implements TbRuleEngineQueueFactor
112 112
113 113 @Override
114 114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
115   - return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, ruleEngineSettings.getTopic(),
  115 + return new TbRabbitMqConsumerTemplate<>(ruleEngineAdmin, rabbitMqSettings, configuration.getTopic(),
116 116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
117 117 }
118 118
... ...
... ... @@ -124,7 +124,7 @@ public class ServiceBusMonolithQueueFactory implements TbCoreQueueFactory, TbRul
124 124
125 125 @Override
126 126 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
127   - return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
  127 + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
128 128 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
129 129 }
130 130
... ...
... ... @@ -112,7 +112,7 @@ public class ServiceBusTbRuleEngineQueueFactory implements TbRuleEngineQueueFact
112 112
113 113 @Override
114 114 public TbQueueConsumer<TbProtoQueueMsg<ToRuleEngineMsg>> createToRuleEngineMsgConsumer(TbRuleEngineQueueConfiguration configuration) {
115   - return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, ruleEngineSettings.getTopic(),
  115 + return new TbServiceBusConsumerTemplate<>(ruleEngineAdmin, serviceBusSettings, configuration.getTopic(),
116 116 msg -> new TbProtoQueueMsg<>(msg.getKey(), ToRuleEngineMsg.parseFrom(msg.getData()), msg.getHeaders()));
117 117 }
118 118
... ...
... ... @@ -95,7 +95,7 @@ public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
95 95 if (objectModel != null)
96 96 return objectModel.resources.get(resourceId);
97 97 else
98   - log.trace("TbResources (Object model) with id [{}/0/{}] not found on the server", objectId, resourceId);
  98 + log.trace("Tenant hasn't such the TbResources: Object model with id [{}/0/{}].", objectId, resourceId);
99 99 return null;
100 100 } catch (Exception e) {
101 101 log.error("", e);
... ... @@ -128,14 +128,17 @@ public class LwM2mVersionedModelProvider implements LwM2mModelProvider {
128 128 private ObjectModel getObjectModelDynamic(Integer objectId, String version) {
129 129 String key = getKeyIdVer(objectId, version);
130 130 ObjectModel objectModel = models.get(tenantId).get(key);
131   -
132 131 if (objectModel == null) {
133 132 modelsLock.lock();
134 133 try {
135 134 objectModel = models.get(tenantId).get(key);
136 135 if (objectModel == null) {
137 136 objectModel = getObjectModel(key);
  137 + }
  138 + if (objectModel != null) {
138 139 models.get(tenantId).put(key, objectModel);
  140 + } else {
  141 + log.error("Tenant hasn't such the resource: Object model with id [{}] version [{}].", objectId, version);
139 142 }
140 143 } finally {
141 144 modelsLock.unlock();
... ...
... ... @@ -186,6 +186,9 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
186 186 }
187 187 sendSimpleRequest(client, downlink, request.getTimeout(), callback);
188 188 }
  189 + else {
  190 + callback.onValidationError(toString(request), "Tenant hasn't such the TbResources: " + request.getVersionedId() + "!");
  191 + }
189 192 }
190 193
191 194 @Override
... ... @@ -245,7 +248,7 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
245 248 callback.onError(toString(request), e);
246 249 }
247 250 } else {
248   - callback.onValidationError(toString(request), "Resource " + request.getVersionedId() + " is not configured in the device profile!");
  251 + callback.onValidationError(toString(request), "Tenant hasn't such the TbResources: " + request.getVersionedId() + "!");
249 252 }
250 253 }
251 254
... ... @@ -271,10 +274,15 @@ public class DefaultLwM2mDownlinkMsgHandler extends LwM2MExecutorAwareService im
271 274 **/
272 275 Collection<LwM2mResource> resources = client.getNewResourceForInstance(request.getVersionedId(), request.getValue(), modelProvider, this.converter);
273 276 ResourceModel resourceModelWrite = client.getResourceModel(request.getVersionedId(), modelProvider);
274   - ContentFormat contentFormat = request.getObjectContentFormat() != null ? request.getObjectContentFormat() : convertResourceModelTypeToContentFormat(client, resourceModelWrite.type);
275   - WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, contentFormat, resultIds.getObjectId(),
276   - resultIds.getObjectInstanceId(), resources);
277   - sendSimpleRequest(client, downlink, request.getTimeout(), callback);
  277 + if (resourceModelWrite != null) {
  278 + ContentFormat contentFormat = request.getObjectContentFormat() != null ? request.getObjectContentFormat() : convertResourceModelTypeToContentFormat(client, resourceModelWrite.type);
  279 + WriteRequest downlink = new WriteRequest(WriteRequest.Mode.UPDATE, contentFormat, resultIds.getObjectId(),
  280 + resultIds.getObjectInstanceId(), resources);
  281 + sendSimpleRequest(client, downlink, request.getTimeout(), callback);
  282 + }
  283 + else {
  284 + callback.onValidationError(toString(request), "Tenant hasn't such the TbResources: " + request.getVersionedId() + " !");
  285 + }
278 286 } else if (resultIds.isObjectInstance()) {
279 287 /*
280 288 * params = "{\"id\":0,\"resources\":[{\"id\":14,\"value\":\"+5\"},{\"id\":15,\"value\":\"+9\"}]}"
... ...
... ... @@ -995,7 +995,6 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
995 995 @Override
996 996 public void onAttributeUpdate(UUID sessionId, TransportProtos.AttributeUpdateNotificationMsg notification) {
997 997 log.trace("[{}] Received attributes update notification to device", sessionId);
998   - log.info("[{}] : attrSubTopicType: {}", notification.toString(), attrSubTopicType);
999 998 String topic;
1000 999 MqttTransportAdaptor adaptor;
1001 1000 switch (attrSubTopicType) {
... ... @@ -1031,7 +1030,7 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
1031 1030
1032 1031 @Override
1033 1032 public void onToDeviceRpcRequest(UUID sessionId, TransportProtos.ToDeviceRpcRequestMsg rpcRequest) {
1034   - log.info("[{}] Received RPC command to device", sessionId);
  1033 + log.trace("[{}] Received RPC command to device", sessionId);
1035 1034 String baseTopic;
1036 1035 MqttTransportAdaptor adaptor;
1037 1036 switch (rpcSubTopicType) {
... ...