Commit bc50ef88a629b1dad57ca4a98a7b524fd89a0233

Authored by Igor Kulikov
2 parents 1abf320c 931c7409

Merge branch 'master' of github.com:thingsboard/thingsboard

@@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext { @@ -125,7 +125,7 @@ class DefaultTbContext implements TbContext {
125 125
126 @Override 126 @Override
127 public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) { 127 public void enqueue(TbMsg tbMsg, String queueName, Runnable onSuccess, Consumer<Throwable> onFailure) {
128 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); 128 + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
129 enqueue(tpi, tbMsg, onFailure, onSuccess); 129 enqueue(tpi, tbMsg, onFailure, onSuccess);
130 } 130 }
131 131
@@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext { @@ -142,46 +142,54 @@ class DefaultTbContext implements TbContext {
142 142
143 @Override 143 @Override
144 public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) { 144 public void enqueueForTellFailure(TbMsg tbMsg, String failureMessage) {
145 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 145 + TopicPartitionInfo tpi = resolvePartition(tbMsg);
146 enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null); 146 enqueueForTellNext(tpi, tbMsg, Collections.singleton(TbRelationTypes.FAILURE), failureMessage, null, null);
147 } 147 }
148 148
149 @Override 149 @Override
150 public void enqueueForTellNext(TbMsg tbMsg, String relationType) { 150 public void enqueueForTellNext(TbMsg tbMsg, String relationType) {
151 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 151 + TopicPartitionInfo tpi = resolvePartition(tbMsg);
152 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null); 152 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, null, null);
153 } 153 }
154 154
155 @Override 155 @Override
156 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) { 156 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes) {
157 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 157 + TopicPartitionInfo tpi = resolvePartition(tbMsg);
158 enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null); 158 enqueueForTellNext(tpi, tbMsg, relationTypes, null, null, null);
159 } 159 }
160 160
161 @Override 161 @Override
162 public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { 162 public void enqueueForTellNext(TbMsg tbMsg, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
163 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 163 + TopicPartitionInfo tpi = resolvePartition(tbMsg);
164 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); 164 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
165 } 165 }
166 166
167 @Override 167 @Override
168 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { 168 public void enqueueForTellNext(TbMsg tbMsg, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
169 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, getTenantId(), tbMsg.getOriginator()); 169 + TopicPartitionInfo tpi = resolvePartition(tbMsg);
170 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); 170 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
171 } 171 }
172 172
173 @Override 173 @Override
174 public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) { 174 public void enqueueForTellNext(TbMsg tbMsg, String queueName, String relationType, Runnable onSuccess, Consumer<Throwable> onFailure) {
175 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); 175 + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
176 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure); 176 enqueueForTellNext(tpi, tbMsg, Collections.singleton(relationType), null, onSuccess, onFailure);
177 } 177 }
178 178
179 @Override 179 @Override
180 public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) { 180 public void enqueueForTellNext(TbMsg tbMsg, String queueName, Set<String> relationTypes, Runnable onSuccess, Consumer<Throwable> onFailure) {
181 - TopicPartitionInfo tpi = mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator()); 181 + TopicPartitionInfo tpi = resolvePartition(tbMsg, queueName);
182 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure); 182 enqueueForTellNext(tpi, tbMsg, relationTypes, null, onSuccess, onFailure);
183 } 183 }
184 184
  185 + private TopicPartitionInfo resolvePartition(TbMsg tbMsg, String queueName) {
  186 + return mainCtx.resolve(ServiceType.TB_RULE_ENGINE, queueName, getTenantId(), tbMsg.getOriginator());
  187 + }
  188 +
  189 + private TopicPartitionInfo resolvePartition(TbMsg tbMsg) {
  190 + return resolvePartition(tbMsg, tbMsg.getQueueName());
  191 + }
  192 +
185 private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) { 193 private void enqueueForTellNext(TopicPartitionInfo tpi, TbMsg source, Set<String> relationTypes, String failureMessage, Runnable onSuccess, Consumer<Throwable> onFailure) {
186 RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId(); 194 RuleChainId ruleChainId = nodeCtx.getSelf().getRuleChainId();
187 RuleNodeId ruleNodeId = nodeCtx.getSelf().getId(); 195 RuleNodeId ruleNodeId = nodeCtx.getSelf().getId();
@@ -50,9 +50,9 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache { @@ -50,9 +50,9 @@ public class DefaultTbDeviceProfileCache implements TbDeviceProfileCache {
50 public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) { 50 public DeviceProfile get(TenantId tenantId, DeviceProfileId deviceProfileId) {
51 DeviceProfile profile = deviceProfilesMap.get(deviceProfileId); 51 DeviceProfile profile = deviceProfilesMap.get(deviceProfileId);
52 if (profile == null) { 52 if (profile == null) {
53 - deviceProfileFetchLock.lock();  
54 profile = deviceProfilesMap.get(deviceProfileId); 53 profile = deviceProfilesMap.get(deviceProfileId);
55 if (profile == null) { 54 if (profile == null) {
  55 + deviceProfileFetchLock.lock();
56 try { 56 try {
57 profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId); 57 profile = deviceProfileService.findDeviceProfileById(tenantId, deviceProfileId);
58 if (profile != null) { 58 if (profile != null) {
@@ -481,7 +481,7 @@ spring: @@ -481,7 +481,7 @@ spring:
481 database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.PostgreSQLDialect}" 481 database-platform: "${SPRING_JPA_DATABASE_PLATFORM:org.hibernate.dialect.PostgreSQLDialect}"
482 datasource: 482 datasource:
483 driverClassName: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}" 483 driverClassName: "${SPRING_DRIVER_CLASS_NAME:org.postgresql.Driver}"
484 - url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/thingsboard_32}" 484 + url: "${SPRING_DATASOURCE_URL:jdbc:postgresql://localhost:5432/thingsboard}"
485 username: "${SPRING_DATASOURCE_USERNAME:postgres}" 485 username: "${SPRING_DATASOURCE_USERNAME:postgres}"
486 password: "${SPRING_DATASOURCE_PASSWORD:postgres}" 486 password: "${SPRING_DATASOURCE_PASSWORD:postgres}"
487 hikari: 487 hikari:
@@ -608,6 +608,8 @@ transport: @@ -608,6 +608,8 @@ transport:
608 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" 608 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
609 # Type of the key store 609 # Type of the key store
610 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" 610 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
  611 + # Skip certificate validity check for client certificates.
  612 + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
611 # Local CoAP transport parameters 613 # Local CoAP transport parameters
612 coap: 614 coap:
613 # Enable/disable coap transport protocol. 615 # Enable/disable coap transport protocol.
@@ -52,6 +52,10 @@ public class MqttTransportContext extends TransportContext { @@ -52,6 +52,10 @@ public class MqttTransportContext extends TransportContext {
52 private Integer maxPayloadSize; 52 private Integer maxPayloadSize;
53 53
54 @Getter 54 @Getter
  55 + @Value("${transport.mqtt.netty.skip_validity_check_for_client_cert:false}")
  56 + private boolean skipValidityCheckForClientCert;
  57 +
  58 + @Getter
55 @Setter 59 @Setter
56 private SslHandler sslHandler; 60 private SslHandler sslHandler;
57 61
@@ -387,7 +387,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement @@ -387,7 +387,9 @@ public class MqttTransportHandler extends ChannelInboundHandlerAdapter implement
387 387
388 private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) { 388 private void processX509CertConnect(ChannelHandlerContext ctx, X509Certificate cert) {
389 try { 389 try {
390 - cert.checkValidity(new Date()); 390 + if(!context.isSkipValidityCheckForClientCert()){
  391 + cert.checkValidity();
  392 + }
391 String strCert = SslUtil.getX509CertificateString(cert); 393 String strCert = SslUtil.getX509CertificateString(cert);
392 String sha3Hash = EncryptionUtil.getSha3Hash(strCert); 394 String sha3Hash = EncryptionUtil.getSha3Hash(strCert);
393 transportService.process(DeviceTransportType.MQTT, ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(), 395 transportService.process(DeviceTransportType.MQTT, ValidateDeviceX509CertRequestMsg.newBuilder().setHash(sha3Hash).build(),
@@ -66,6 +66,8 @@ transport: @@ -66,6 +66,8 @@ transport:
66 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}" 66 key_password: "${MQTT_SSL_KEY_PASSWORD:server_key_password}"
67 # Type of the key store 67 # Type of the key store
68 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}" 68 key_store_type: "${MQTT_SSL_KEY_STORE_TYPE:JKS}"
  69 + # Skip certificate validity check for client certificates.
  70 + skip_validity_check_for_client_cert: "${MQTT_SSL_SKIP_VALIDITY_CHECK_FOR_CLIENT_CERT:false}"
69 sessions: 71 sessions:
70 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}" 72 inactivity_timeout: "${TB_TRANSPORT_SESSIONS_INACTIVITY_TIMEOUT:300000}"
71 report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}" 73 report_timeout: "${TB_TRANSPORT_SESSIONS_REPORT_TIMEOUT:30000}"