Commit 37fd7f4988f703f5d0f6aed2a7cc5856ff2bba7f
1 parent
6ad78e54
Validation and Replacement of Queue names in the Device profile
Showing
11 changed files
with
127 additions
and
34 deletions
... | ... | @@ -15,7 +15,7 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.controller; |
17 | 17 | |
18 | -import org.springframework.beans.factory.annotation.Autowired; | |
18 | +import lombok.RequiredArgsConstructor; | |
19 | 19 | import org.springframework.security.access.prepost.PreAuthorize; |
20 | 20 | import org.springframework.web.bind.annotation.RequestMapping; |
21 | 21 | import org.springframework.web.bind.annotation.RequestMethod; |
... | ... | @@ -24,41 +24,26 @@ import org.springframework.web.bind.annotation.ResponseBody; |
24 | 24 | import org.springframework.web.bind.annotation.RestController; |
25 | 25 | import org.thingsboard.server.common.data.exception.ThingsboardException; |
26 | 26 | import org.thingsboard.server.common.msg.queue.ServiceType; |
27 | -import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | |
28 | -import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | |
27 | +import org.thingsboard.server.queue.QueueService; | |
29 | 28 | import org.thingsboard.server.queue.util.TbCoreComponent; |
30 | 29 | |
31 | -import java.util.Arrays; | |
32 | -import java.util.Collections; | |
33 | -import java.util.List; | |
34 | -import java.util.stream.Collectors; | |
30 | +import java.util.Set; | |
35 | 31 | |
36 | 32 | @RestController |
37 | 33 | @TbCoreComponent |
38 | 34 | @RequestMapping("/api") |
35 | +@RequiredArgsConstructor | |
39 | 36 | public class QueueController extends BaseController { |
40 | 37 | |
41 | - @Autowired(required = false) | |
42 | - private TbQueueRuleEngineSettings ruleEngineSettings; | |
38 | + private final QueueService queueService; | |
43 | 39 | |
44 | 40 | @PreAuthorize("hasAuthority('TENANT_ADMIN')") |
45 | 41 | @RequestMapping(value = "/tenant/queues", params = {"serviceType"}, method = RequestMethod.GET) |
46 | 42 | @ResponseBody |
47 | - public List<String> getTenantQueuesByServiceType(@RequestParam String serviceType) throws ThingsboardException { | |
43 | + public Set<String> getTenantQueuesByServiceType(@RequestParam String serviceType) throws ThingsboardException { | |
48 | 44 | checkParameter("serviceType", serviceType); |
49 | 45 | try { |
50 | - ServiceType type = ServiceType.valueOf(serviceType); | |
51 | - switch (type) { | |
52 | - case TB_RULE_ENGINE: | |
53 | - if (ruleEngineSettings == null) { | |
54 | - return Arrays.asList("Main", "HighPriority", "SequentialByOriginator"); | |
55 | - } | |
56 | - return ruleEngineSettings.getQueues().stream() | |
57 | - .map(TbRuleEngineQueueConfiguration::getName) | |
58 | - .collect(Collectors.toList()); | |
59 | - default: | |
60 | - return Collections.emptyList(); | |
61 | - } | |
46 | + return queueService.getQueuesByServiceType(ServiceType.valueOf(serviceType)); | |
62 | 47 | } catch (Exception e) { |
63 | 48 | throw handleException(e); |
64 | 49 | } | ... | ... |
... | ... | @@ -21,11 +21,13 @@ import org.junit.Assert; |
21 | 21 | import org.junit.Before; |
22 | 22 | import org.junit.Test; |
23 | 23 | import org.junit.runner.RunWith; |
24 | +import org.mockito.Mockito; | |
24 | 25 | import org.mockito.junit.MockitoJUnitRunner; |
25 | 26 | import org.springframework.context.ApplicationEventPublisher; |
26 | 27 | import org.springframework.test.util.ReflectionTestUtils; |
27 | 28 | import org.thingsboard.server.common.data.id.DeviceId; |
28 | 29 | import org.thingsboard.server.common.data.id.TenantId; |
30 | +import org.thingsboard.server.queue.QueueService; | |
29 | 31 | import org.thingsboard.server.queue.discovery.HashPartitionService; |
30 | 32 | import org.thingsboard.server.common.msg.queue.ServiceType; |
31 | 33 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
... | ... | @@ -57,6 +59,7 @@ public class HashPartitionServiceTest { |
57 | 59 | private TenantRoutingInfoService routingInfoService; |
58 | 60 | private ApplicationEventPublisher applicationEventPublisher; |
59 | 61 | private TbQueueRuleEngineSettings ruleEngineSettings; |
62 | + private QueueService queueService; | |
60 | 63 | |
61 | 64 | private String hashFunctionName = "sha256"; |
62 | 65 | |
... | ... | @@ -67,10 +70,12 @@ public class HashPartitionServiceTest { |
67 | 70 | applicationEventPublisher = mock(ApplicationEventPublisher.class); |
68 | 71 | routingInfoService = mock(TenantRoutingInfoService.class); |
69 | 72 | ruleEngineSettings = mock(TbQueueRuleEngineSettings.class); |
73 | + queueService = mock(QueueService.class); | |
70 | 74 | clusterRoutingService = new HashPartitionService(discoveryService, |
71 | 75 | routingInfoService, |
72 | 76 | applicationEventPublisher, |
73 | - ruleEngineSettings | |
77 | + ruleEngineSettings, | |
78 | + queueService | |
74 | 79 | ); |
75 | 80 | when(ruleEngineSettings.getQueues()).thenReturn(Collections.emptyList()); |
76 | 81 | ReflectionTestUtils.setField(clusterRoutingService, "coreTopic", "tb.core"); |
... | ... | @@ -82,6 +87,7 @@ public class HashPartitionServiceTest { |
82 | 87 | .setTenantIdLSB(TenantId.NULL_UUID.getLeastSignificantBits()) |
83 | 88 | .addAllServiceTypes(Collections.singletonList(ServiceType.TB_CORE.name())) |
84 | 89 | .build(); |
90 | +// when(queueService.resolve(Mockito.any(), Mockito.anyString())).thenAnswer(i -> i.getArguments()[1]); | |
85 | 91 | // when(discoveryService.getServiceInfo()).thenReturn(currentServer); |
86 | 92 | List<TransportProtos.ServiceInfo> otherServers = new ArrayList<>(); |
87 | 93 | for (int i = 1; i < SERVER_COUNT; i++) { | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.queue; | |
17 | + | |
18 | +import org.thingsboard.server.common.msg.queue.ServiceType; | |
19 | + | |
20 | +import java.util.Set; | |
21 | + | |
22 | +public interface QueueService { | |
23 | + | |
24 | + Set<String> getQueuesByServiceType(ServiceType serviceType); | |
25 | + | |
26 | + String resolve(ServiceType serviceType, String queueName); | |
27 | + | |
28 | +} | ... | ... |
... | ... | @@ -42,6 +42,10 @@ |
42 | 42 | </dependency> |
43 | 43 | <dependency> |
44 | 44 | <groupId>org.thingsboard.common</groupId> |
45 | + <artifactId>cluster-api</artifactId> | |
46 | + </dependency> | |
47 | + <dependency> | |
48 | + <groupId>org.thingsboard.common</groupId> | |
45 | 49 | <artifactId>message</artifactId> |
46 | 50 | </dependency> |
47 | 51 | <dependency> | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2021 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.queue; | |
17 | + | |
18 | +import lombok.RequiredArgsConstructor; | |
19 | +import org.springframework.stereotype.Service; | |
20 | +import org.springframework.util.StringUtils; | |
21 | +import org.thingsboard.server.common.msg.queue.ServiceQueue; | |
22 | +import org.thingsboard.server.common.msg.queue.ServiceType; | |
23 | +import org.thingsboard.server.queue.settings.TbQueueRuleEngineSettings; | |
24 | +import org.thingsboard.server.queue.settings.TbRuleEngineQueueConfiguration; | |
25 | + | |
26 | +import javax.annotation.PostConstruct; | |
27 | +import java.util.Collections; | |
28 | +import java.util.LinkedHashSet; | |
29 | +import java.util.Set; | |
30 | +import java.util.stream.Collectors; | |
31 | + | |
32 | +@Service | |
33 | +@RequiredArgsConstructor | |
34 | +public class DefaultQueueService implements QueueService { | |
35 | + | |
36 | + private final TbQueueRuleEngineSettings ruleEngineSettings; | |
37 | + private Set<String> ruleEngineQueues; | |
38 | + | |
39 | + @PostConstruct | |
40 | + public void init() { | |
41 | + ruleEngineQueues = ruleEngineSettings.getQueues().stream() | |
42 | + .map(TbRuleEngineQueueConfiguration::getName).collect(Collectors.toCollection(LinkedHashSet::new)); | |
43 | + } | |
44 | + | |
45 | + @Override | |
46 | + public Set<String> getQueuesByServiceType(ServiceType type) { | |
47 | + if (type == ServiceType.TB_RULE_ENGINE) { | |
48 | + return ruleEngineQueues; | |
49 | + } else { | |
50 | + return Collections.emptySet(); | |
51 | + } | |
52 | + } | |
53 | + | |
54 | + @Override | |
55 | + public String resolve(ServiceType serviceType, String queueName) { | |
56 | + if (StringUtils.isEmpty(queueName) || !getQueuesByServiceType(serviceType).contains(queueName)) { | |
57 | + return ServiceQueue.MAIN; | |
58 | + } else { | |
59 | + return queueName; | |
60 | + } | |
61 | + } | |
62 | +} | ... | ... |
... | ... | @@ -22,6 +22,7 @@ import lombok.extern.slf4j.Slf4j; |
22 | 22 | import org.springframework.beans.factory.annotation.Value; |
23 | 23 | import org.springframework.context.ApplicationEventPublisher; |
24 | 24 | import org.springframework.stereotype.Service; |
25 | +import org.springframework.util.StringUtils; | |
25 | 26 | import org.thingsboard.server.common.data.id.EntityId; |
26 | 27 | import org.thingsboard.server.common.data.id.TenantId; |
27 | 28 | import org.thingsboard.server.common.msg.queue.ServiceQueue; |
... | ... | @@ -30,6 +31,7 @@ import org.thingsboard.server.common.msg.queue.ServiceType; |
30 | 31 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
31 | 32 | import org.thingsboard.server.gen.transport.TransportProtos; |
32 | 33 | import org.thingsboard.server.gen.transport.TransportProtos.ServiceInfo; |
34 | +import org.thingsboard.server.queue.QueueService; | |
33 | 35 | import org.thingsboard.server.queue.discovery.event.ClusterTopologyChangeEvent; |
34 | 36 | import org.thingsboard.server.queue.discovery.event.PartitionChangeEvent; |
35 | 37 | import org.thingsboard.server.queue.discovery.event.ServiceListChangedEvent; |
... | ... | @@ -64,6 +66,7 @@ public class HashPartitionService implements PartitionService { |
64 | 66 | private final TbServiceInfoProvider serviceInfoProvider; |
65 | 67 | private final TenantRoutingInfoService tenantRoutingInfoService; |
66 | 68 | private final TbQueueRuleEngineSettings tbQueueRuleEngineSettings; |
69 | + private final QueueService queueService; | |
67 | 70 | private final ConcurrentMap<ServiceQueue, String> partitionTopics = new ConcurrentHashMap<>(); |
68 | 71 | private final ConcurrentMap<ServiceQueue, Integer> partitionSizes = new ConcurrentHashMap<>(); |
69 | 72 | private final ConcurrentMap<TenantId, TenantRoutingInfo> tenantRoutingInfoMap = new ConcurrentHashMap<>(); |
... | ... | @@ -81,11 +84,13 @@ public class HashPartitionService implements PartitionService { |
81 | 84 | public HashPartitionService(TbServiceInfoProvider serviceInfoProvider, |
82 | 85 | TenantRoutingInfoService tenantRoutingInfoService, |
83 | 86 | ApplicationEventPublisher applicationEventPublisher, |
84 | - TbQueueRuleEngineSettings tbQueueRuleEngineSettings) { | |
87 | + TbQueueRuleEngineSettings tbQueueRuleEngineSettings, | |
88 | + QueueService queueService) { | |
85 | 89 | this.serviceInfoProvider = serviceInfoProvider; |
86 | 90 | this.tenantRoutingInfoService = tenantRoutingInfoService; |
87 | 91 | this.applicationEventPublisher = applicationEventPublisher; |
88 | 92 | this.tbQueueRuleEngineSettings = tbQueueRuleEngineSettings; |
93 | + this.queueService = queueService; | |
89 | 94 | } |
90 | 95 | |
91 | 96 | @PostConstruct |
... | ... | @@ -106,6 +111,7 @@ public class HashPartitionService implements PartitionService { |
106 | 111 | |
107 | 112 | @Override |
108 | 113 | public TopicPartitionInfo resolve(ServiceType serviceType, String queueName, TenantId tenantId, EntityId entityId) { |
114 | + queueName = queueService.resolve(serviceType, queueName); | |
109 | 115 | return resolve(new ServiceQueue(serviceType, queueName), tenantId, entityId); |
110 | 116 | } |
111 | 117 | ... | ... |
... | ... | @@ -36,8 +36,6 @@ public class TbQueueRuleEngineSettings { |
36 | 36 | private String topic; |
37 | 37 | private List<TbRuleEngineQueueConfiguration> queues; |
38 | 38 | |
39 | - //TODO 2.5 ybondarenko: make sure the queue names are valid to all queue providers. | |
40 | - // See how they are used in TbRuleEngineQueueFactory.createToRuleEngineMsgConsumer and all producers | |
41 | 39 | @PostConstruct |
42 | 40 | public void validate() { |
43 | 41 | queues.stream().filter(queue -> queue.getName().equals("Main")).findFirst().orElseThrow(() -> { | ... | ... |
... | ... | @@ -28,7 +28,7 @@ import com.squareup.wire.schema.internal.parser.ProtoFileElement; |
28 | 28 | import com.squareup.wire.schema.internal.parser.ProtoParser; |
29 | 29 | import com.squareup.wire.schema.internal.parser.TypeElement; |
30 | 30 | import lombok.extern.slf4j.Slf4j; |
31 | -import org.apache.commons.lang3.StringUtils; | |
31 | +import org.thingsboard.server.common.data.StringUtils; | |
32 | 32 | import org.hibernate.exception.ConstraintViolationException; |
33 | 33 | import org.springframework.beans.factory.annotation.Autowired; |
34 | 34 | import org.springframework.cache.Cache; |
... | ... | @@ -63,6 +63,7 @@ import org.thingsboard.server.common.data.id.TenantId; |
63 | 63 | import org.thingsboard.server.common.data.page.PageData; |
64 | 64 | import org.thingsboard.server.common.data.page.PageLink; |
65 | 65 | import org.thingsboard.server.common.data.rule.RuleChain; |
66 | +import org.thingsboard.server.common.msg.queue.ServiceType; | |
66 | 67 | import org.thingsboard.server.dao.dashboard.DashboardService; |
67 | 68 | import org.thingsboard.server.dao.entity.AbstractEntityService; |
68 | 69 | import org.thingsboard.server.dao.exception.DataValidationException; |
... | ... | @@ -72,6 +73,7 @@ import org.thingsboard.server.dao.service.DataValidator; |
72 | 73 | import org.thingsboard.server.dao.service.PaginatedRemover; |
73 | 74 | import org.thingsboard.server.dao.service.Validator; |
74 | 75 | import org.thingsboard.server.dao.tenant.TenantDao; |
76 | +import org.thingsboard.server.queue.QueueService; | |
75 | 77 | |
76 | 78 | import java.util.Arrays; |
77 | 79 | import java.util.Collections; |
... | ... | @@ -82,7 +84,6 @@ import java.util.concurrent.locks.Lock; |
82 | 84 | import java.util.concurrent.locks.ReentrantLock; |
83 | 85 | import java.util.stream.Collectors; |
84 | 86 | |
85 | -import static com.google.protobuf.FieldType.MESSAGE; | |
86 | 87 | import static org.thingsboard.server.common.data.CacheConstants.DEVICE_PROFILE_CACHE; |
87 | 88 | import static org.thingsboard.server.dao.service.Validator.validateId; |
88 | 89 | |
... | ... | @@ -104,6 +105,9 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D |
104 | 105 | return "[Transport Configuration] invalid " + schemaName + " provided!"; |
105 | 106 | } |
106 | 107 | |
108 | + @Autowired(required = false) | |
109 | + private QueueService queueService; | |
110 | + | |
107 | 111 | @Autowired |
108 | 112 | private DeviceProfileDao deviceProfileDao; |
109 | 113 | |
... | ... | @@ -373,6 +377,11 @@ public class DeviceProfileServiceImpl extends AbstractEntityService implements D |
373 | 377 | throw new DataValidationException("Another default device profile is present in scope of current tenant!"); |
374 | 378 | } |
375 | 379 | } |
380 | + if (!StringUtils.isEmpty(deviceProfile.getDefaultQueueName()) && queueService != null){ | |
381 | + if(!queueService.getQueuesByServiceType(ServiceType.TB_RULE_ENGINE).contains(deviceProfile.getDefaultQueueName())){ | |
382 | + throw new DataValidationException("Device profile is referencing to non-existent queue!"); | |
383 | + } | |
384 | + } | |
376 | 385 | if (deviceProfile.getProvisionType() == null) { |
377 | 386 | deviceProfile.setProvisionType(DeviceProfileProvisionType.DISABLED); |
378 | 387 | } | ... | ... |
... | ... | @@ -187,6 +187,7 @@ export class AddDeviceProfileDialogComponent extends |
187 | 187 | name: this.deviceProfileDetailsFormGroup.get('name').value, |
188 | 188 | type: this.deviceProfileDetailsFormGroup.get('type').value, |
189 | 189 | image: this.deviceProfileDetailsFormGroup.get('image').value, |
190 | + defaultQueueName: this.deviceProfileDetailsFormGroup.get('defaultQueueName').value, | |
190 | 191 | transportType: this.transportConfigFormGroup.get('transportType').value, |
191 | 192 | provisionType: deviceProvisionConfiguration.type, |
192 | 193 | provisionDeviceKey, | ... | ... |
... | ... | @@ -155,12 +155,7 @@ export class QueueTypeListComponent implements ControlValueAccessor, OnInit, Aft |
155 | 155 | return searchText ? queue.toUpperCase().startsWith(searchText.toUpperCase()) : true; |
156 | 156 | }); |
157 | 157 | if (result.length) { |
158 | - if (searchText && searchText.length && result.indexOf(searchText) === -1) { | |
159 | - result.push(searchText); | |
160 | - } | |
161 | 158 | result.sort(); |
162 | - } else if (searchText && searchText.length) { | |
163 | - result.push(searchText); | |
164 | 159 | } |
165 | 160 | return result; |
166 | 161 | }) | ... | ... |