Showing
24 changed files
with
159 additions
and
107 deletions
... | ... | @@ -42,21 +42,26 @@ import org.thingsboard.server.common.msg.queue.RuleEngineException; |
42 | 42 | import org.thingsboard.server.common.msg.queue.ServiceType; |
43 | 43 | import org.thingsboard.server.dao.model.ModelConstants; |
44 | 44 | import org.thingsboard.server.dao.tenant.TenantService; |
45 | +import org.thingsboard.server.service.transport.msg.TransportToDeviceActorMsgWrapper; | |
45 | 46 | import scala.concurrent.duration.Duration; |
46 | 47 | |
48 | +import java.util.HashSet; | |
47 | 49 | import java.util.Optional; |
50 | +import java.util.Set; | |
48 | 51 | |
49 | 52 | public class AppActor extends ContextAwareActor { |
50 | 53 | |
51 | 54 | private static final TenantId SYSTEM_TENANT = new TenantId(ModelConstants.NULL_UUID); |
52 | 55 | private final TenantService tenantService; |
53 | 56 | private final BiMap<TenantId, ActorRef> tenantActors; |
57 | + private final Set<TenantId> deletedTenants; | |
54 | 58 | private boolean ruleChainsInitialized; |
55 | 59 | |
56 | 60 | private AppActor(ActorSystemContext systemContext) { |
57 | 61 | super(systemContext); |
58 | 62 | this.tenantService = systemContext.getTenantService(); |
59 | 63 | this.tenantActors = HashBiMap.create(); |
64 | + this.deletedTenants = new HashSet<>(); | |
60 | 65 | } |
61 | 66 | |
62 | 67 | @Override |
... | ... | @@ -139,7 +144,11 @@ public class AppActor extends ContextAwareActor { |
139 | 144 | if (SYSTEM_TENANT.equals(msg.getTenantId())) { |
140 | 145 | msg.getTbMsg().getCallback().onFailure(new RuleEngineException("Message has system tenant id!")); |
141 | 146 | } else { |
142 | - getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); | |
147 | + if (!deletedTenants.contains(msg.getTenantId())) { | |
148 | + getOrCreateTenantActor(msg.getTenantId()).tell(msg, self()); | |
149 | + } else { | |
150 | + msg.getTbMsg().getCallback().onSuccess(); | |
151 | + } | |
143 | 152 | } |
144 | 153 | } |
145 | 154 | |
... | ... | @@ -154,8 +163,10 @@ public class AppActor extends ContextAwareActor { |
154 | 163 | } else { |
155 | 164 | if (msg.getEntityId().getEntityType() == EntityType.TENANT |
156 | 165 | && msg.getEvent() == ComponentLifecycleEvent.DELETED) { |
157 | - log.debug("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); | |
158 | - ActorRef tenantActor = tenantActors.remove(new TenantId(msg.getEntityId().getId())); | |
166 | + log.info("[{}] Handling tenant deleted notification: {}", msg.getTenantId(), msg); | |
167 | + TenantId tenantId = new TenantId(msg.getEntityId().getId()); | |
168 | + deletedTenants.add(tenantId); | |
169 | + ActorRef tenantActor = tenantActors.get(tenantId); | |
159 | 170 | if (tenantActor != null) { |
160 | 171 | log.debug("[{}] Deleting tenant actor: {}", msg.getTenantId(), tenantActor); |
161 | 172 | context().stop(tenantActor); |
... | ... | @@ -172,16 +183,22 @@ public class AppActor extends ContextAwareActor { |
172 | 183 | } |
173 | 184 | |
174 | 185 | private void onToDeviceActorMsg(TenantAwareMsg msg) { |
175 | - getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender()); | |
186 | + if (!deletedTenants.contains(msg.getTenantId())) { | |
187 | + getOrCreateTenantActor(msg.getTenantId()).tell(msg, ActorRef.noSender()); | |
188 | + } else { | |
189 | + if (msg instanceof TransportToDeviceActorMsgWrapper) { | |
190 | + ((TransportToDeviceActorMsgWrapper) msg).getCallback().onSuccess(); | |
191 | + } | |
192 | + } | |
176 | 193 | } |
177 | 194 | |
178 | 195 | private ActorRef getOrCreateTenantActor(TenantId tenantId) { |
179 | 196 | return tenantActors.computeIfAbsent(tenantId, k -> { |
180 | - log.debug("[{}] Creating tenant actor.", tenantId); | |
197 | + log.info("[{}] Creating tenant actor.", tenantId); | |
181 | 198 | ActorRef tenantActor = context().actorOf(Props.create(new TenantActor.ActorCreator(systemContext, tenantId)) |
182 | 199 | .withDispatcher(DefaultActorService.CORE_DISPATCHER_NAME), tenantId.toString()); |
183 | 200 | context().watch(tenantActor); |
184 | - log.debug("[{}] Created tenant actor: {}.", tenantId, tenantActor); | |
201 | + log.info("[{}] Created tenant actor: {}.", tenantId, tenantActor); | |
185 | 202 | return tenantActor; |
186 | 203 | }); |
187 | 204 | } | ... | ... |
... | ... | @@ -66,6 +66,8 @@ public class TenantActor extends RuleChainManagerActor { |
66 | 66 | return strategy; |
67 | 67 | } |
68 | 68 | |
69 | + boolean cantFindTenant = false; | |
70 | + | |
69 | 71 | @Override |
70 | 72 | public void preStart() { |
71 | 73 | log.info("[{}] Starting tenant actor.", tenantId); |
... | ... | @@ -78,10 +80,14 @@ public class TenantActor extends RuleChainManagerActor { |
78 | 80 | isCore = systemContext.getServiceInfoProvider().isService(ServiceType.TB_CORE); |
79 | 81 | |
80 | 82 | if (isRuleEngineForCurrentTenant) { |
81 | - if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) { | |
82 | - initRuleChains(); | |
83 | - } else { | |
84 | - isRuleEngineForCurrentTenant = false; | |
83 | + try { | |
84 | + if (isolatedTenantId.map(id -> id.equals(tenantId)).orElseGet(() -> !tenant.isIsolatedTbRuleEngine())) { | |
85 | + initRuleChains(); | |
86 | + } else { | |
87 | + isRuleEngineForCurrentTenant = false; | |
88 | + } | |
89 | + } catch (Exception e) { | |
90 | + cantFindTenant = true; | |
85 | 91 | } |
86 | 92 | } |
87 | 93 | log.info("[{}] Tenant actor started.", tenantId); |
... | ... | @@ -97,6 +103,9 @@ public class TenantActor extends RuleChainManagerActor { |
97 | 103 | |
98 | 104 | @Override |
99 | 105 | protected boolean process(TbActorMsg msg) { |
106 | + if (cantFindTenant) { | |
107 | + log.info("Missing Tenant msg: {}", msg); | |
108 | + } | |
100 | 109 | switch (msg.getMsgType()) { |
101 | 110 | case PARTITION_CHANGE_MSG: |
102 | 111 | PartitionChangeMsg partitionChangeMsg = (PartitionChangeMsg) msg; | ... | ... |
... | ... | @@ -34,6 +34,7 @@ import org.thingsboard.server.common.data.Device; |
34 | 34 | import org.thingsboard.server.common.data.EntityType; |
35 | 35 | import org.thingsboard.server.common.data.EntityView; |
36 | 36 | import org.thingsboard.server.common.data.HasName; |
37 | +import org.thingsboard.server.common.data.HasTenantId; | |
37 | 38 | import org.thingsboard.server.common.data.Tenant; |
38 | 39 | import org.thingsboard.server.common.data.User; |
39 | 40 | import org.thingsboard.server.common.data.alarm.Alarm; |
... | ... | @@ -667,7 +668,13 @@ public abstract class BaseController { |
667 | 668 | } |
668 | 669 | } |
669 | 670 | TbMsg tbMsg = TbMsg.newMsg(msgType, entityId, metaData, TbMsgDataType.JSON, json.writeValueAsString(entityNode)); |
670 | - tbClusterService.pushMsgToRuleEngine(user.getTenantId(), entityId, tbMsg, null); | |
671 | + TenantId tenantId = user.getTenantId(); | |
672 | + if (tenantId.isNullUid()) { | |
673 | + if (entity instanceof HasTenantId) { | |
674 | + tenantId = ((HasTenantId) entity).getTenantId(); | |
675 | + } | |
676 | + } | |
677 | + tbClusterService.pushMsgToRuleEngine(tenantId, entityId, tbMsg, null); | |
671 | 678 | } catch (Exception e) { |
672 | 679 | log.warn("[{}] Failed to push entity action to rule engine: {}", entityId, actionType, e); |
673 | 680 | } | ... | ... |
... | ... | @@ -325,6 +325,8 @@ public class DefaultDeviceStateService implements DeviceStateService { |
325 | 325 | }); |
326 | 326 | }); |
327 | 327 | |
328 | + addedPartitions.forEach(tpi -> partitionedDevices.computeIfAbsent(tpi, key -> ConcurrentHashMap.newKeySet())); | |
329 | + | |
328 | 330 | //TODO 3.0: replace this dummy search with new functionality to search by partitions using SQL capabilities. |
329 | 331 | // Adding only devices that are in new partitions |
330 | 332 | List<Tenant> tenants = tenantService.findTenants(new TextPageLink(Integer.MAX_VALUE)).getData(); | ... | ... |
... | ... | @@ -27,6 +27,7 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; |
27 | 27 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
28 | 28 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
29 | 29 | import org.thingsboard.server.dao.asset.AssetService; |
30 | +import org.thingsboard.server.dao.exception.DataValidationException; | |
30 | 31 | import org.thingsboard.server.queue.discovery.TbServiceInfoProvider; |
31 | 32 | import org.thingsboard.server.queue.util.TbRuleEngineComponent; |
32 | 33 | import org.thingsboard.server.service.queue.TbRuleEngineConsumerStats; |
... | ... | @@ -76,13 +77,19 @@ public class DefaultRuleEngineStatisticsService implements RuleEngineStatisticsS |
76 | 77 | String queueName = ruleEngineStats.getQueueName(); |
77 | 78 | ruleEngineStats.getTenantStats().forEach((id, stats) -> { |
78 | 79 | TenantId tenantId = new TenantId(id); |
79 | - AssetId serviceAssetId = getServiceAssetId(tenantId, queueName); | |
80 | - if (stats.getTotalMsgCounter().get() > 0) { | |
81 | - List<TsKvEntry> tsList = stats.getCounters().entrySet().stream() | |
82 | - .map(kv -> new BasicTsKvEntry(ts, new LongDataEntry(kv.getKey(), (long) kv.getValue().get()))) | |
83 | - .collect(Collectors.toList()); | |
84 | - if (!tsList.isEmpty()) { | |
85 | - tsService.saveAndNotify(tenantId, serviceAssetId, tsList, CALLBACK); | |
80 | + try { | |
81 | + AssetId serviceAssetId = getServiceAssetId(tenantId, queueName); | |
82 | + if (stats.getTotalMsgCounter().get() > 0) { | |
83 | + List<TsKvEntry> tsList = stats.getCounters().entrySet().stream() | |
84 | + .map(kv -> new BasicTsKvEntry(ts, new LongDataEntry(kv.getKey(), (long) kv.getValue().get()))) | |
85 | + .collect(Collectors.toList()); | |
86 | + if (!tsList.isEmpty()) { | |
87 | + tsService.saveAndNotify(tenantId, serviceAssetId, tsList, CALLBACK); | |
88 | + } | |
89 | + } | |
90 | + } catch (DataValidationException e) { | |
91 | + if (!e.getMessage().equalsIgnoreCase("Asset is referencing to non-existent tenant!")) { | |
92 | + throw e; | |
86 | 93 | } |
87 | 94 | } |
88 | 95 | }); | ... | ... |
... | ... | @@ -27,7 +27,6 @@ |
27 | 27 | |
28 | 28 | <logger name="org.thingsboard.server" level="INFO" /> |
29 | 29 | <logger name="akka" level="INFO" /> |
30 | - <logger name="org.springframework.boot.autoconfigure.logging" level="DEBUG" /> | |
31 | 30 | |
32 | 31 | <!-- <logger name="org.thingsboard.server.service.queue" level="TRACE" />--> |
33 | 32 | <!-- <logger name="org.thingsboard.server.service.transport" level="TRACE" />--> | ... | ... |
... | ... | @@ -31,7 +31,7 @@ server: |
31 | 31 | key-store-type: "${SSL_KEY_STORE_TYPE:PKCS12}" |
32 | 32 | # Alias that identifies the key in the key store |
33 | 33 | key-alias: "${SSL_KEY_ALIAS:tomcat}" |
34 | - log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:true}" | |
34 | + log_controller_error_stack_trace: "${HTTP_LOG_CONTROLLER_ERROR_STACK_TRACE:false}" | |
35 | 35 | ws: |
36 | 36 | send_timeout: "${TB_SERVER_WS_SEND_TIMEOUT:5000}" |
37 | 37 | limits: |
... | ... | @@ -412,7 +412,7 @@ audit-log: |
412 | 412 | state: |
413 | 413 | defaultInactivityTimeoutInSec: "${DEFAULT_INACTIVITY_TIMEOUT:10}" |
414 | 414 | defaultStateCheckIntervalInSec: "${DEFAULT_STATE_CHECK_INTERVAL:10}" |
415 | - persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:true}" | |
415 | + persistToTelemetry: "${PERSIST_STATE_TO_TELEMETRY:false}" | |
416 | 416 | |
417 | 417 | js: |
418 | 418 | evaluator: "${JS_EVALUATOR:local}" # local/remote |
... | ... | @@ -513,7 +513,7 @@ swagger: |
513 | 513 | version: "${SWAGGER_VERSION:2.0}" |
514 | 514 | |
515 | 515 | queue: |
516 | - type: "${TB_QUEUE_TYPE:in-memory}" # kafka or in-memory or aws-sqs or pubsub or service-bus | |
516 | + type: "${TB_QUEUE_TYPE:in-memory}" # in-memory or kafka (Apache Kafka) or aws-sqs (AWS SQS) or pubsub (PubSub) or service-bus (Azure Service Bus) or rabbitmq (RabbitMQ) | |
517 | 517 | kafka: |
518 | 518 | bootstrap.servers: "${TB_KAFKA_SERVERS:localhost:9092}" |
519 | 519 | acks: "${TB_KAFKA_ACKS:all}" |
... | ... | @@ -588,7 +588,7 @@ queue: |
588 | 588 | enabled: "${TB_QUEUE_RULE_ENGINE_STATS_ENABLED:true}" |
589 | 589 | print-interval-ms: "${TB_QUEUE_RULE_ENGINE_STATS_PRINT_INTERVAL_MS:10000}" |
590 | 590 | queues: |
591 | - - name: "Main" | |
591 | + - name: "${TB_QUEUE_RE_MAIN_QUEUE_NAME:Main}" | |
592 | 592 | topic: "${TB_QUEUE_RE_MAIN_TOPIC:tb_rule_engine.main}" |
593 | 593 | poll-interval: "${TB_QUEUE_RE_MAIN_POLL_INTERVAL_MS:25}" |
594 | 594 | partitions: "${TB_QUEUE_RE_MAIN_PARTITIONS:10}" | ... | ... |
... | ... | @@ -33,6 +33,7 @@ import org.junit.rules.TestRule; |
33 | 33 | import org.junit.rules.TestWatcher; |
34 | 34 | import org.junit.runner.Description; |
35 | 35 | import org.junit.runner.RunWith; |
36 | +import org.mockito.Mockito; | |
36 | 37 | import org.springframework.beans.factory.annotation.Autowired; |
37 | 38 | import org.springframework.boot.test.context.SpringBootContextLoader; |
38 | 39 | import org.springframework.boot.test.context.SpringBootTest; |
... | ... | @@ -197,6 +198,7 @@ public abstract class AbstractControllerTest { |
197 | 198 | createUserAndLogin(customerUser, CUSTOMER_USER_PASSWORD); |
198 | 199 | |
199 | 200 | logout(); |
201 | + | |
200 | 202 | log.info("Executed setup"); |
201 | 203 | } |
202 | 204 | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.controller; |
17 | 17 | |
18 | 18 | import com.datastax.driver.core.utils.UUIDs; |
19 | 19 | import com.fasterxml.jackson.core.type.TypeReference; |
20 | +import lombok.extern.slf4j.Slf4j; | |
20 | 21 | import org.apache.commons.lang3.RandomStringUtils; |
21 | 22 | import org.eclipse.paho.client.mqttv3.MqttAsyncClient; |
22 | 23 | import org.eclipse.paho.client.mqttv3.MqttConnectOptions; |
... | ... | @@ -24,6 +25,7 @@ import org.eclipse.paho.client.mqttv3.MqttMessage; |
24 | 25 | import org.junit.After; |
25 | 26 | import org.junit.Assert; |
26 | 27 | import org.junit.Before; |
28 | +import org.junit.Ignore; | |
27 | 29 | import org.junit.Test; |
28 | 30 | import org.thingsboard.server.common.data.Customer; |
29 | 31 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -46,6 +48,7 @@ import java.util.HashSet; |
46 | 48 | import java.util.List; |
47 | 49 | import java.util.Map; |
48 | 50 | import java.util.Set; |
51 | +import java.util.concurrent.TimeUnit; | |
49 | 52 | |
50 | 53 | import static org.hamcrest.Matchers.containsString; |
51 | 54 | import static org.junit.Assert.assertEquals; |
... | ... | @@ -55,6 +58,7 @@ import static org.junit.Assert.assertTrue; |
55 | 58 | import static org.springframework.test.web.servlet.result.MockMvcResultMatchers.status; |
56 | 59 | import static org.thingsboard.server.dao.model.ModelConstants.NULL_UUID; |
57 | 60 | |
61 | +@Slf4j | |
58 | 62 | public abstract class BaseEntityViewControllerTest extends AbstractControllerTest { |
59 | 63 | |
60 | 64 | private IdComparator<EntityView> idComparator; |
... | ... | @@ -417,12 +421,22 @@ public abstract class BaseEntityViewControllerTest extends AbstractControllerTes |
417 | 421 | MqttConnectOptions options = new MqttConnectOptions(); |
418 | 422 | options.setUserName(accessToken); |
419 | 423 | client.connect(options); |
420 | - Thread.sleep(3000); | |
421 | - | |
424 | + awaitConnected(client, TimeUnit.SECONDS.toMillis(30)); | |
422 | 425 | MqttMessage message = new MqttMessage(); |
423 | 426 | message.setPayload(strKvs.getBytes()); |
424 | 427 | client.publish("v1/devices/me/telemetry", message); |
425 | 428 | Thread.sleep(1000); |
429 | +// client.disconnect(); | |
430 | + } | |
431 | + | |
432 | + private void awaitConnected(MqttAsyncClient client, long ms) throws InterruptedException { | |
433 | + long start = System.currentTimeMillis(); | |
434 | + while (!client.isConnected()) { | |
435 | + Thread.sleep(100); | |
436 | + if (start + ms < System.currentTimeMillis()) { | |
437 | + throw new RuntimeException("Client is not connected!"); | |
438 | + } | |
439 | + } | |
426 | 440 | } |
427 | 441 | |
428 | 442 | private Set<String> getTelemetryKeys(String type, String id) throws Exception { | ... | ... |
... | ... | @@ -80,7 +80,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr |
80 | 80 | String deviceId = savedDevice.getId().getId().toString(); |
81 | 81 | |
82 | 82 | Thread.sleep(1000); |
83 | - List<String> actualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", List.class); | |
83 | + List<String> actualKeys = doGetAsync("/api/plugins/telemetry/DEVICE/" + deviceId + "/keys/timeseries", List.class); | |
84 | 84 | Set<String> actualKeySet = new HashSet<>(actualKeys); |
85 | 85 | |
86 | 86 | List<String> expectedKeys = Arrays.asList("key1", "key2", "key3", "key4"); |
... | ... | @@ -88,7 +88,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr |
88 | 88 | |
89 | 89 | assertEquals(expectedKeySet, actualKeySet); |
90 | 90 | |
91 | - String getTelemetryValuesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?keys=" + String.join(",", actualKeySet); | |
91 | + String getTelemetryValuesUrl = "/api/plugins/telemetry/DEVICE/" + deviceId + "/values/timeseries?keys=" + String.join(",", actualKeySet); | |
92 | 92 | Map<String, List<Map<String, String>>> values = doGetAsync(getTelemetryValuesUrl, Map.class); |
93 | 93 | |
94 | 94 | assertEquals("value1", values.get("key1").get(0).get("value")); |
... | ... | @@ -104,13 +104,17 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr |
104 | 104 | |
105 | 105 | MqttConnectOptions options = new MqttConnectOptions(); |
106 | 106 | options.setUserName(accessToken); |
107 | - client.connect(options).waitForCompletion(3000); | |
108 | 107 | CountDownLatch latch = new CountDownLatch(1); |
109 | 108 | TestMqttCallback callback = new TestMqttCallback(client, latch); |
110 | 109 | client.setCallback(callback); |
110 | + client.connect(options).waitForCompletion(3000); | |
111 | 111 | client.subscribe("v1/devices/me/attributes", MqttQoS.AT_MOST_ONCE.value()); |
112 | 112 | String payload = "{\"key\":\"value\"}"; |
113 | - String result = doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); | |
113 | +// TODO 3.1: we need to acknowledge subscription only after it is processed by device actor and not when the message is pushed to queue. | |
114 | +// MqttClient -> SUB REQUEST -> Transport -> Kafka -> Device Actor (subscribed) | |
115 | +// MqttClient <- SUB_ACK <- Transport | |
116 | + Thread.sleep(1000); | |
117 | + doPostAsync("/api/plugins/telemetry/" + savedDevice.getId() + "/SHARED_SCOPE", payload, String.class, status().isOk()); | |
114 | 118 | latch.await(10, TimeUnit.SECONDS); |
115 | 119 | assertEquals(payload, callback.getPayload()); |
116 | 120 | assertEquals(MqttQoS.AT_MOST_ONCE.value(), callback.getQoS()); |
... | ... | @@ -120,8 +124,8 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr |
120 | 124 | |
121 | 125 | private final MqttAsyncClient client; |
122 | 126 | private final CountDownLatch latch; |
123 | - private Integer qoS; | |
124 | - private String payload; | |
127 | + private volatile Integer qoS; | |
128 | + private volatile String payload; | |
125 | 129 | |
126 | 130 | String getPayload() { |
127 | 131 | return payload; |
... | ... | @@ -138,6 +142,7 @@ public abstract class AbstractMqttTelemetryIntegrationTest extends AbstractContr |
138 | 142 | |
139 | 143 | @Override |
140 | 144 | public void connectionLost(Throwable throwable) { |
145 | + log.error("Client connection lost", throwable); | |
141 | 146 | } |
142 | 147 | |
143 | 148 | @Override | ... | ... |
... | ... | @@ -15,14 +15,17 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.rules.flow; |
17 | 17 | |
18 | +import akka.actor.ActorRef; | |
18 | 19 | import com.datastax.driver.core.utils.UUIDs; |
19 | 20 | import lombok.extern.slf4j.Slf4j; |
20 | 21 | import org.junit.After; |
21 | 22 | import org.junit.Assert; |
22 | 23 | import org.junit.Before; |
23 | 24 | import org.junit.Test; |
25 | +import org.mockito.Mockito; | |
24 | 26 | import org.springframework.beans.factory.annotation.Autowired; |
25 | 27 | import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
28 | +import org.thingsboard.server.actors.ActorSystemContext; | |
26 | 29 | import org.thingsboard.server.actors.service.ActorService; |
27 | 30 | import org.thingsboard.server.common.data.*; |
28 | 31 | import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; |
... | ... | @@ -35,6 +38,8 @@ import org.thingsboard.server.common.data.security.Authority; |
35 | 38 | import org.thingsboard.server.common.msg.TbMsg; |
36 | 39 | import org.thingsboard.server.common.msg.TbMsgDataType; |
37 | 40 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
41 | +import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; | |
42 | +import org.thingsboard.server.common.msg.queue.TbMsgCallback; | |
38 | 43 | import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; |
39 | 44 | import org.thingsboard.server.dao.attributes.AttributesService; |
40 | 45 | |
... | ... | @@ -55,7 +60,7 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule |
55 | 60 | protected User tenantAdmin; |
56 | 61 | |
57 | 62 | @Autowired |
58 | - protected ActorService actorService; | |
63 | + protected ActorSystemContext actorSystem; | |
59 | 64 | |
60 | 65 | @Autowired |
61 | 66 | protected AttributesService attributesService; |
... | ... | @@ -142,15 +147,12 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule |
142 | 147 | |
143 | 148 | Thread.sleep(1000); |
144 | 149 | |
150 | + TbMsgCallback tbMsgCallback = Mockito.mock(TbMsgCallback.class); | |
151 | + TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); | |
152 | + QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); | |
145 | 153 | // Pushing Message to the system |
146 | - TbMsg tbMsg = TbMsg.newMsg( | |
147 | - "CUSTOM", | |
148 | - device.getId(), | |
149 | - new TbMsgMetaData(), TbMsgDataType.JSON, "{}"); | |
150 | - //TODO 2.5 | |
151 | -// actorService.onMsg(new SendToClusterMsg(device.getId(), new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg))); | |
152 | - | |
153 | - Thread.sleep(3000); | |
154 | + actorSystem.tell(qMsg, ActorRef.noSender()); | |
155 | + Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); | |
154 | 156 | |
155 | 157 | TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); |
156 | 158 | List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); |
... | ... | @@ -257,17 +259,13 @@ public abstract class AbstractRuleEngineFlowIntegrationTest extends AbstractRule |
257 | 259 | |
258 | 260 | Thread.sleep(1000); |
259 | 261 | |
262 | + TbMsgCallback tbMsgCallback = Mockito.mock(TbMsgCallback.class); | |
263 | + TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); | |
264 | + QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); | |
260 | 265 | // Pushing Message to the system |
261 | - TbMsg tbMsg = TbMsg.newMsg( | |
262 | - "CUSTOM", | |
263 | - device.getId(), | |
264 | - new TbMsgMetaData(), | |
265 | - TbMsgDataType.JSON, | |
266 | - "{}"); | |
267 | - //TODO 2.5 | |
268 | -// actorService.onMsg(new SendToClusterMsg(device.getId(), new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg))); | |
269 | - | |
270 | - Thread.sleep(3000); | |
266 | + actorSystem.tell(qMsg, ActorRef.noSender()); | |
267 | + | |
268 | + Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); | |
271 | 269 | |
272 | 270 | TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), rootRuleChain.getFirstRuleNodeId(), 1000); |
273 | 271 | List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); | ... | ... |
... | ... | @@ -15,14 +15,17 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.rules.lifecycle; |
17 | 17 | |
18 | +import akka.actor.ActorRef; | |
18 | 19 | import com.datastax.driver.core.utils.UUIDs; |
19 | 20 | import lombok.extern.slf4j.Slf4j; |
20 | 21 | import org.junit.After; |
21 | 22 | import org.junit.Assert; |
22 | 23 | import org.junit.Before; |
23 | 24 | import org.junit.Test; |
25 | +import org.mockito.Mockito; | |
24 | 26 | import org.springframework.beans.factory.annotation.Autowired; |
25 | 27 | import org.thingsboard.rule.engine.metadata.TbGetAttributesNodeConfiguration; |
28 | +import org.thingsboard.server.actors.ActorSystemContext; | |
26 | 29 | import org.thingsboard.server.actors.service.ActorService; |
27 | 30 | import org.thingsboard.server.common.data.DataConstants; |
28 | 31 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -39,6 +42,8 @@ import org.thingsboard.server.common.data.security.Authority; |
39 | 42 | import org.thingsboard.server.common.msg.TbMsg; |
40 | 43 | import org.thingsboard.server.common.msg.TbMsgDataType; |
41 | 44 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
45 | +import org.thingsboard.server.common.msg.queue.QueueToRuleEngineMsg; | |
46 | +import org.thingsboard.server.common.msg.queue.TbMsgCallback; | |
42 | 47 | import org.thingsboard.server.controller.AbstractRuleEngineControllerTest; |
43 | 48 | import org.thingsboard.server.dao.attributes.AttributesService; |
44 | 49 | |
... | ... | @@ -58,7 +63,7 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac |
58 | 63 | protected User tenantAdmin; |
59 | 64 | |
60 | 65 | @Autowired |
61 | - protected ActorService actorService; | |
66 | + protected ActorSystemContext actorSystem; | |
62 | 67 | |
63 | 68 | @Autowired |
64 | 69 | protected AttributesService attributesService; |
... | ... | @@ -133,17 +138,13 @@ public abstract class AbstractRuleEngineLifecycleIntegrationTest extends Abstrac |
133 | 138 | |
134 | 139 | Thread.sleep(1000); |
135 | 140 | |
141 | + TbMsgCallback tbMsgCallback = Mockito.mock(TbMsgCallback.class); | |
142 | + TbMsg tbMsg = TbMsg.newMsg("CUSTOM", device.getId(), new TbMsgMetaData(), "{}", tbMsgCallback); | |
143 | + QueueToRuleEngineMsg qMsg = new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg, null, null); | |
136 | 144 | // Pushing Message to the system |
137 | - TbMsg tbMsg = TbMsg.newMsg( | |
138 | - "CUSTOM", | |
139 | - device.getId(), | |
140 | - new TbMsgMetaData(), | |
141 | - TbMsgDataType.JSON, | |
142 | - "{}"); | |
143 | - //TODO 2.5 | |
144 | -// actorService.onMsg(new SendToClusterMsg(device.getId(), new QueueToRuleEngineMsg(savedTenant.getId(), tbMsg))); | |
145 | - | |
146 | - Thread.sleep(3000); | |
145 | + actorSystem.tell(qMsg, ActorRef.noSender()); | |
146 | + Mockito.verify(tbMsgCallback, Mockito.timeout(3000)).onSuccess(); | |
147 | + | |
147 | 148 | |
148 | 149 | TimePageData<Event> eventsPage = getDebugEvents(savedTenant.getId(), ruleChain.getFirstRuleNodeId(), 1000); |
149 | 150 | List<Event> events = eventsPage.getData().stream().filter(filterByCustomEvent()).collect(Collectors.toList()); | ... | ... |
... | ... | @@ -7,7 +7,7 @@ |
7 | 7 | </encoder> |
8 | 8 | </appender> |
9 | 9 | |
10 | - <logger name="org.thingsboard.server" level="WARN"/> | |
10 | + <logger name="org.thingsboard.server" level="INFO"/> | |
11 | 11 | <logger name="org.springframework" level="WARN"/> |
12 | 12 | <logger name="org.springframework.boot.test" level="WARN"/> |
13 | 13 | <logger name="org.apache.cassandra" level="WARN"/> | ... | ... |
... | ... | @@ -65,6 +65,10 @@ public final class TbMsg implements Serializable { |
65 | 65 | return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), dataType, data, ruleChainId, ruleNodeId, TbMsgCallback.EMPTY); |
66 | 66 | } |
67 | 67 | |
68 | + public static TbMsg newMsg(String type, EntityId originator, TbMsgMetaData metaData, String data, TbMsgCallback callback) { | |
69 | + return new TbMsg(UUID.randomUUID(), type, originator, metaData.copy(), TbMsgDataType.JSON, data, null, null, callback); | |
70 | + } | |
71 | + | |
68 | 72 | public static TbMsg transformMsg(TbMsg origMsg, String type, EntityId originator, TbMsgMetaData metaData, String data) { |
69 | 73 | return new TbMsg(origMsg.getId(), type, originator, metaData.copy(), origMsg.getDataType(), |
70 | 74 | data, origMsg.getRuleChainId(), origMsg.getRuleNodeId(), origMsg.getCallback()); | ... | ... |
... | ... | @@ -95,14 +95,13 @@ public class DefaultTbQueueRequestTemplate<Request extends TbQueueMsg, Response |
95 | 95 | continue; |
96 | 96 | } |
97 | 97 | responses.forEach(response -> { |
98 | - log.trace("Received response to Queue Template request: {}", response); | |
99 | 98 | byte[] requestIdHeader = response.getHeaders().get(REQUEST_ID_HEADER); |
100 | 99 | UUID requestId; |
101 | 100 | if (requestIdHeader == null) { |
102 | 101 | log.error("[{}] Missing requestId in header and body", response); |
103 | 102 | } else { |
104 | 103 | requestId = bytesToUuid(requestIdHeader); |
105 | - log.trace("[{}] Response received", requestId); | |
104 | + log.trace("[{}] Response received: {}", requestId, response); | |
106 | 105 | ResponseMetaData<Response> expectedResponse = pendingRequests.remove(requestId); |
107 | 106 | if (expectedResponse == null) { |
108 | 107 | log.trace("[{}] Invalid or stale request", requestId); | ... | ... |
... | ... | @@ -41,7 +41,7 @@ public class TBKafkaAdmin implements TbQueueAdmin { |
41 | 41 | client = AdminClient.create(settings.toProps()); |
42 | 42 | } |
43 | 43 | |
44 | - //TODO 2.5 | |
44 | + //TODO 2.5 - ybondarenko Need to pass not only settings but also properties for topic creation. Somewhere in thingsboard.yml, in KV format. | |
45 | 45 | @Override |
46 | 46 | public void createTopicIfNotExists(String topic) { |
47 | 47 | try { |
... | ... | @@ -57,29 +57,6 @@ public class TBKafkaAdmin implements TbQueueAdmin { |
57 | 57 | log.warn("[{}] Failed to create topic", topic, e); |
58 | 58 | throw new RuntimeException(e); |
59 | 59 | } |
60 | -// | |
61 | -// KafkaFuture<TopicDescription> topicDescriptionFuture = client.describeTopics(Collections.singleton(topic)).values().get(topic); | |
62 | -// | |
63 | -// ListenableFuture<TopicDescription> topicFuture = JdkFutureAdapters.listenInPoolThread(topicDescriptionFuture); | |
64 | -// | |
65 | -// return Futures.transformAsync(topicFuture, topicDescription -> { | |
66 | -// KafkaFuture<Void> resultFuture = createTopic(new NewTopic(topic, 1, (short) 1)).values().get(topic); | |
67 | -// return JdkFutureAdapters.listenInPoolThread(resultFuture); | |
68 | -// }); | |
69 | - } | |
70 | - | |
71 | - public void waitForTopic(String topic, long timeout, TimeUnit timeoutUnit) throws InterruptedException, TimeoutException { | |
72 | - synchronized (this) { | |
73 | - long timeoutExpiredMs = System.currentTimeMillis() + timeoutUnit.toMillis(timeout); | |
74 | - while (!topicExists(topic)) { | |
75 | - long waitMs = timeoutExpiredMs - System.currentTimeMillis(); | |
76 | - if (waitMs <= 0) { | |
77 | - throw new TimeoutException("Timeout occurred while waiting for topic [" + topic + "] to be available!"); | |
78 | - } else { | |
79 | - wait(1000); | |
80 | - } | |
81 | - } | |
82 | - } | |
83 | 60 | } |
84 | 61 | |
85 | 62 | public CreateTopicsResult createTopic(NewTopic topic) { | ... | ... |
... | ... | @@ -60,16 +60,13 @@ public final class InMemoryStorage { |
60 | 60 | if (first != null) { |
61 | 61 | entities = new ArrayList<>(); |
62 | 62 | entities.add(first); |
63 | - } else { | |
64 | - entities = Collections.emptyList(); | |
65 | 63 | List<TbQueueMsg> otherList = new ArrayList<>(); |
66 | - storage.get(topic).drainTo(otherList, 100); | |
64 | + storage.get(topic).drainTo(otherList, 999); | |
67 | 65 | for (TbQueueMsg other : otherList) { |
68 | 66 | entities.add((T) other); |
69 | 67 | } |
70 | - } | |
71 | - if (entities.size() > 0) { | |
72 | - storage.computeIfAbsent(topic, (t) -> new LinkedBlockingQueue<>()).addAll(entities); | |
68 | + } else { | |
69 | + entities = Collections.emptyList(); | |
73 | 70 | } |
74 | 71 | return entities; |
75 | 72 | } catch (InterruptedException e) { | ... | ... |
... | ... | @@ -36,11 +36,12 @@ public class TbQueueRuleEngineSettings { |
36 | 36 | private String topic; |
37 | 37 | private List<TbRuleEngineQueueConfiguration> queues; |
38 | 38 | |
39 | - //TODO 2.5 ybondarenko: make sure the queue names are valid to all queue providers. See how ther are used in TbRuleEngineQueueFactory.createToRuleEngineMsgConsumer and all producers | |
39 | + //TODO 2.5 ybondarenko: make sure the queue names are valid to all queue providers. | |
40 | + // See how they are used in TbRuleEngineQueueFactory.createToRuleEngineMsgConsumer and all producers | |
40 | 41 | @PostConstruct |
41 | 42 | public void validate() { |
42 | 43 | queues.stream().filter(queue -> queue.getName().equals("Main")).findFirst().orElseThrow(() -> { |
43 | - log.warn("Main queue is not configured in thingsboard.yml"); | |
44 | + log.error("Main queue is not configured in thingsboard.yml"); | |
44 | 45 | return new RuntimeException("No \"Main\" queue configured!"); |
45 | 46 | }); |
46 | 47 | } | ... | ... |
... | ... | @@ -400,11 +400,11 @@ message ToRuleEngineNotificationMsg { |
400 | 400 | |
401 | 401 | /* Messages that are handled by ThingsBoard Transport Service */ |
402 | 402 | message ToTransportMsg { |
403 | - int64 sessionIdMSB = 1; | |
404 | - int64 sessionIdLSB = 2; | |
405 | - SessionCloseNotificationProto sessionCloseNotification = 3; | |
406 | - GetAttributeResponseMsg getAttributesResponse = 4; | |
407 | - AttributeUpdateNotificationMsg attributeUpdateNotification = 5; | |
408 | - ToDeviceRpcRequestMsg toDeviceRequest = 6; | |
409 | - ToServerRpcResponseMsg toServerResponse = 7; | |
403 | + int64 sessionIdMSB = 1; | |
404 | + int64 sessionIdLSB = 2; | |
405 | + SessionCloseNotificationProto sessionCloseNotification = 3; | |
406 | + GetAttributeResponseMsg getAttributesResponse = 4; | |
407 | + AttributeUpdateNotificationMsg attributeUpdateNotification = 5; | |
408 | + ToDeviceRpcRequestMsg toDeviceRequest = 6; | |
409 | + ToServerRpcResponseMsg toServerResponse = 7; | |
410 | 410 | } | ... | ... |
... | ... | @@ -16,6 +16,8 @@ spring.datasource.url=jdbc:hsqldb:file:/tmp/testDb;sql.enforce_size=false |
16 | 16 | spring.datasource.driverClassName=org.hsqldb.jdbc.JDBCDriver |
17 | 17 | spring.datasource.hikari.maximumPoolSize = 50 |
18 | 18 | |
19 | +service.type=monolith | |
20 | + | |
19 | 21 | #database.ts.type=timescale |
20 | 22 | #database.ts.type=sql |
21 | 23 | #database.entities.type=sql | ... | ... |
... | ... | @@ -25,6 +25,7 @@ import org.junit.Before; |
25 | 25 | import org.junit.Test; |
26 | 26 | import org.junit.runner.RunWith; |
27 | 27 | import org.mockito.ArgumentCaptor; |
28 | +import org.mockito.Captor; | |
28 | 29 | import org.mockito.Mock; |
29 | 30 | import org.mockito.runners.MockitoJUnitRunner; |
30 | 31 | import org.mockito.stubbing.Answer; |
... | ... | @@ -47,6 +48,7 @@ import org.thingsboard.server.dao.alarm.AlarmService; |
47 | 48 | import javax.script.ScriptException; |
48 | 49 | import java.io.IOException; |
49 | 50 | import java.util.concurrent.Callable; |
51 | +import java.util.function.Consumer; | |
50 | 52 | |
51 | 53 | import static org.junit.Assert.assertEquals; |
52 | 54 | import static org.junit.Assert.assertNotSame; |
... | ... | @@ -82,6 +84,11 @@ public class TbAlarmNodeTest { |
82 | 84 | @Mock |
83 | 85 | private ScriptEngine detailsJs; |
84 | 86 | |
87 | + @Captor | |
88 | + private ArgumentCaptor<Runnable> successCaptor; | |
89 | + @Captor | |
90 | + private ArgumentCaptor<Consumer<Throwable>> failureCaptor; | |
91 | + | |
85 | 92 | private RuleChainId ruleChainId = new RuleChainId(UUIDs.timeBased()); |
86 | 93 | private RuleNodeId ruleNodeId = new RuleNodeId(UUIDs.timeBased()); |
87 | 94 | |
... | ... | @@ -119,11 +126,12 @@ public class TbAlarmNodeTest { |
119 | 126 | |
120 | 127 | when(detailsJs.executeJsonAsync(msg)).thenReturn(Futures.immediateFuture(null)); |
121 | 128 | when(alarmService.findLatestByOriginatorAndType(tenantId, originator, "SomeType")).thenReturn(Futures.immediateFuture(null)); |
122 | - | |
123 | 129 | doAnswer((Answer<Alarm>) invocationOnMock -> (Alarm) (invocationOnMock.getArguments())[0]).when(alarmService).createOrUpdateAlarm(any(Alarm.class)); |
124 | 130 | |
125 | 131 | node.onMsg(ctx, msg); |
126 | 132 | |
133 | + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture()); | |
134 | + successCaptor.getValue().run(); | |
127 | 135 | verify(ctx).tellNext(any(), eq("Created")); |
128 | 136 | |
129 | 137 | ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); |
... | ... | @@ -191,6 +199,8 @@ public class TbAlarmNodeTest { |
191 | 199 | |
192 | 200 | node.onMsg(ctx, msg); |
193 | 201 | |
202 | + verify(ctx).enqueue(any(), successCaptor.capture(), failureCaptor.capture()); | |
203 | + successCaptor.getValue().run(); | |
194 | 204 | verify(ctx).tellNext(any(), eq("Created")); |
195 | 205 | |
196 | 206 | ArgumentCaptor<TbMsg> msgCaptor = ArgumentCaptor.forClass(TbMsg.class); | ... | ... |
... | ... | @@ -256,7 +256,7 @@ public class TbGetCustomerAttributeNodeTest { |
256 | 256 | .thenReturn(Futures.immediateFuture(timeseries)); |
257 | 257 | |
258 | 258 | node.onMsg(ctx, msg); |
259 | - verify(ctx).tellNext(msg, SUCCESS); | |
259 | + verify(ctx).tellSuccess(msg); | |
260 | 260 | assertEquals(msg.getMetaData().getValue("tempo"), "highest"); |
261 | 261 | } |
262 | 262 | |
... | ... | @@ -268,7 +268,7 @@ public class TbGetCustomerAttributeNodeTest { |
268 | 268 | .thenReturn(Futures.immediateFuture(attributes)); |
269 | 269 | |
270 | 270 | node.onMsg(ctx, msg); |
271 | - verify(ctx).tellNext(msg, SUCCESS); | |
271 | + verify(ctx).tellSuccess(msg); | |
272 | 272 | assertEquals(msg.getMetaData().getValue("tempo"), "high"); |
273 | 273 | } |
274 | 274 | } |
\ No newline at end of file | ... | ... |
... | ... | @@ -77,7 +77,7 @@ public class TbTransformMsgNodeTest { |
77 | 77 | node.onMsg(ctx, msg); |
78 | 78 | verify(ctx).getDbCallbackExecutor(); |
79 | 79 | ArgumentCaptor<TbMsg> captor = ArgumentCaptor.forClass(TbMsg.class); |
80 | - verify(ctx).tellNext(captor.capture(), eq(SUCCESS)); | |
80 | + verify(ctx).tellSuccess(captor.capture()); | |
81 | 81 | TbMsg actualMsg = captor.getValue(); |
82 | 82 | assertEquals(transformedMsg, actualMsg); |
83 | 83 | } | ... | ... |