Commit b7b1d849804f14a861ebde475d1337a66a604861

Authored by xp.Huang
2 parents 814fae23 9906e21c

Merge branch '20220214' into 'master'

refactor: 命令下发记录命令类型

See merge request yunteng/thingskit!158
Showing 27 changed files with 1243 additions and 22 deletions
@@ -58,6 +58,8 @@ import org.thingsboard.server.common.data.rpc.RpcStatus; @@ -58,6 +58,8 @@ import org.thingsboard.server.common.data.rpc.RpcStatus;
58 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody; 58 import org.thingsboard.server.common.data.rpc.ToDeviceRpcRequestBody;
59 import org.thingsboard.server.common.data.security.DeviceCredentials; 59 import org.thingsboard.server.common.data.security.DeviceCredentials;
60 import org.thingsboard.server.common.data.security.DeviceCredentialsType; 60 import org.thingsboard.server.common.data.security.DeviceCredentialsType;
  61 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
  62 +import org.thingsboard.server.common.data.yunteng.enums.CmdTypeEnum;
61 import org.thingsboard.server.common.msg.TbActorMsg; 63 import org.thingsboard.server.common.msg.TbActorMsg;
62 import org.thingsboard.server.common.msg.TbMsgMetaData; 64 import org.thingsboard.server.common.msg.TbMsgMetaData;
63 import org.thingsboard.server.common.msg.queue.TbCallback; 65 import org.thingsboard.server.common.msg.queue.TbCallback;
@@ -251,8 +253,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor { @@ -251,8 +253,8 @@ class DeviceActorMessageProcessor extends AbstractContextAwareMsgProcessor {
251 //Thingskit function 253 //Thingskit function
252 JsonNode old = JacksonUtil.toJsonNode(request.getAdditionalInfo()); 254 JsonNode old = JacksonUtil.toJsonNode(request.getAdditionalInfo());
253 ObjectNode additional = (old == null || old.isEmpty()) ?mapper.createObjectNode():(ObjectNode)old; 255 ObjectNode additional = (old == null || old.isEmpty()) ?mapper.createObjectNode():(ObjectNode)old;
254 - if(!additional.has("cmdType")){  
255 - additional.put("cmdType", DeviceTransportType.MQTT.name()); 256 + if(!additional.has(ModelConstants.TablePropertyMapping.COMMAND_TYPE)){
  257 + additional.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, CmdTypeEnum.DIY.ordinal());
256 } 258 }
257 rpc.setAdditionalInfo(additional); 259 rpc.setAdditionalInfo(additional);
258 260
@@ -39,12 +39,15 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; @@ -39,12 +39,15 @@ import org.thingsboard.server.common.data.kv.LongDataEntry;
39 import org.thingsboard.server.common.data.kv.StringDataEntry; 39 import org.thingsboard.server.common.data.kv.StringDataEntry;
40 import org.thingsboard.server.common.data.kv.TsKvEntry; 40 import org.thingsboard.server.common.data.kv.TsKvEntry;
41 import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; 41 import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult;
  42 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  43 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
42 import org.thingsboard.server.common.msg.queue.ServiceType; 44 import org.thingsboard.server.common.msg.queue.ServiceType;
43 import org.thingsboard.server.common.msg.queue.TbCallback; 45 import org.thingsboard.server.common.msg.queue.TbCallback;
44 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; 46 import org.thingsboard.server.common.msg.queue.TopicPartitionInfo;
45 import org.thingsboard.server.dao.attributes.AttributesService; 47 import org.thingsboard.server.dao.attributes.AttributesService;
46 import org.thingsboard.server.dao.entityview.EntityViewService; 48 import org.thingsboard.server.dao.entityview.EntityViewService;
47 import org.thingsboard.server.dao.timeseries.TimeseriesService; 49 import org.thingsboard.server.dao.timeseries.TimeseriesService;
  50 +import org.thingsboard.server.dao.yunteng.event.TkEventsService;
48 import org.thingsboard.server.gen.transport.TransportProtos; 51 import org.thingsboard.server.gen.transport.TransportProtos;
49 import org.thingsboard.server.queue.discovery.PartitionService; 52 import org.thingsboard.server.queue.discovery.PartitionService;
50 import org.thingsboard.server.queue.usagestats.TbApiUsageClient; 53 import org.thingsboard.server.queue.usagestats.TbApiUsageClient;
@@ -74,6 +77,8 @@ import java.util.concurrent.Executors; @@ -74,6 +77,8 @@ import java.util.concurrent.Executors;
74 public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionService implements TelemetrySubscriptionService { 77 public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionService implements TelemetrySubscriptionService {
75 78
76 private final AttributesService attrService; 79 private final AttributesService attrService;
  80 +
  81 + private final TkEventsService eventsService;
77 private final TimeseriesService tsService; 82 private final TimeseriesService tsService;
78 private final EntityViewService entityViewService; 83 private final EntityViewService entityViewService;
79 private final TbApiUsageClient apiUsageClient; 84 private final TbApiUsageClient apiUsageClient;
@@ -82,6 +87,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @@ -82,6 +87,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
82 private ExecutorService tsCallBackExecutor; 87 private ExecutorService tsCallBackExecutor;
83 88
84 public DefaultTelemetrySubscriptionService(AttributesService attrService, 89 public DefaultTelemetrySubscriptionService(AttributesService attrService,
  90 + TkEventsService eventsService,
85 TimeseriesService tsService, 91 TimeseriesService tsService,
86 EntityViewService entityViewService, 92 EntityViewService entityViewService,
87 TbClusterService clusterService, 93 TbClusterService clusterService,
@@ -90,6 +96,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @@ -90,6 +96,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
90 TbApiUsageStateService apiUsageStateService) { 96 TbApiUsageStateService apiUsageStateService) {
91 super(clusterService, partitionService); 97 super(clusterService, partitionService);
92 this.attrService = attrService; 98 this.attrService = attrService;
  99 + this.eventsService = eventsService;
93 this.tsService = tsService; 100 this.tsService = tsService;
94 this.entityViewService = entityViewService; 101 this.entityViewService = entityViewService;
95 this.apiUsageClient = apiUsageClient; 102 this.apiUsageClient = apiUsageClient;
@@ -241,6 +248,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer @@ -241,6 +248,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
241 saveAndNotifyInternal(tenantId, entityId, scope, attributes, notifyDevice, callback); 248 saveAndNotifyInternal(tenantId, entityId, scope, attributes, notifyDevice, callback);
242 } 249 }
243 250
  251 +
  252 + //Thingskit function
  253 + @Override
  254 + public void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) {
  255 + checkInternalEntity(entityId);
  256 + saveAndNotifyInternal(tenantId, entityId, eventId,eventType, attributes, callback);
  257 + }
  258 + @Override
  259 + public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) {
  260 + ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId, entityId,eventId,eventType, attributes);
  261 + addVoidCallback(saveFuture, callback);
  262 +// addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
  263 + }
  264 +
244 @Override 265 @Override
245 public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) { 266 public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) {
246 ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes); 267 ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes);
@@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.id.EntityId; @@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.id.EntityId;
21 import org.thingsboard.server.common.data.id.TenantId; 21 import org.thingsboard.server.common.data.id.TenantId;
22 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 22 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
23 import org.thingsboard.server.common.data.kv.TsKvEntry; 23 import org.thingsboard.server.common.data.kv.TsKvEntry;
  24 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  25 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
24 26
25 import java.util.List; 27 import java.util.List;
26 28
@@ -35,6 +37,9 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { @@ -35,6 +37,9 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
35 37
36 void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); 38 void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
37 39
  40 + //Thingskit function
  41 + void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback);
  42 +
38 void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); 43 void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
39 44
40 void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback); 45 void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback);
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.event;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.server.common.data.EntityType;
  20 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  21 +import org.thingsboard.server.common.data.id.EntityId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  24 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  25 +
  26 +import java.util.Collection;
  27 +import java.util.List;
  28 +import java.util.Optional;
  29 +
  30 +/**
  31 + * @author Andrew Shvayka
  32 + */
  33 +public interface TkEventsService {
  34 +
  35 + ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope,String eventKey);
  36 +
  37 + ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> eventKeys);
  38 +
  39 + ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType);
  40 +
  41 + ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes);
  42 +
  43 + ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys);
  44 +
  45 + List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
  46 +
  47 + List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds);
  48 +
  49 +}
@@ -131,6 +131,7 @@ public final class ModelConstants { @@ -131,6 +131,7 @@ public final class ModelConstants {
131 public static final String TB_DEVICE_ID = "tbDeviceId"; 131 public static final String TB_DEVICE_ID = "tbDeviceId";
132 public static final String DEVICE_STATE = "deviceState"; 132 public static final String DEVICE_STATE = "deviceState";
133 public static final String ACTIVE_TIME = "activeTime"; 133 public static final String ACTIVE_TIME = "activeTime";
  134 + public static final String COMMAND_TYPE = "cmdType";
134 public static final String ADDITIONALINFO_DESCRIPTION = "description"; 135 public static final String ADDITIONALINFO_DESCRIPTION = "description";
135 } 136 }
136 137
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.yunteng.dto;
  17 +
  18 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  19 +import org.thingsboard.server.common.data.kv.DataType;
  20 +import org.thingsboard.server.common.data.kv.KvEntry;
  21 +
  22 +import javax.validation.Valid;
  23 +import java.util.Optional;
  24 +
  25 +/**
  26 + * @author Andrew Shvayka
  27 + */
  28 +public class TkEventKvDto implements TkEventKvEntry {
  29 +
  30 + private static final long serialVersionUID = -6460767583563159407L;
  31 +
  32 + private final long eventTime;
  33 + @Valid
  34 + private final KvEntry kv;
  35 +
  36 + public TkEventKvDto(KvEntry kv, long eventTime) {
  37 + this.kv = kv;
  38 + this.eventTime = eventTime;
  39 + }
  40 +
  41 + public TkEventKvDto(long lastUpdateTs, KvEntry kv) {
  42 + this(kv, lastUpdateTs);
  43 + }
  44 +
  45 + @Override
  46 + public long getEventTime() {
  47 + return eventTime;
  48 + }
  49 +
  50 + @Override
  51 + public String getKey() {
  52 + return kv.getKey();
  53 + }
  54 +
  55 + @Override
  56 + public DataType getDataType() {
  57 + return kv.getDataType();
  58 + }
  59 +
  60 + @Override
  61 + public Optional<String> getStrValue() {
  62 + return kv.getStrValue();
  63 + }
  64 +
  65 + @Override
  66 + public Optional<Long> getLongValue() {
  67 + return kv.getLongValue();
  68 + }
  69 +
  70 + @Override
  71 + public Optional<Boolean> getBooleanValue() {
  72 + return kv.getBooleanValue();
  73 + }
  74 +
  75 + @Override
  76 + public Optional<Double> getDoubleValue() {
  77 + return kv.getDoubleValue();
  78 + }
  79 +
  80 + @Override
  81 + public Optional<String> getJsonValue() {
  82 + return kv.getJsonValue();
  83 + }
  84 +
  85 + @Override
  86 + public String getValueAsString() {
  87 + return kv.getValueAsString();
  88 + }
  89 +
  90 + @Override
  91 + public Object getValue() {
  92 + return kv.getValue();
  93 + }
  94 +
  95 + @Override
  96 + public boolean equals(Object o) {
  97 + if (this == o) return true;
  98 + if (o == null || getClass() != o.getClass()) return false;
  99 +
  100 + TkEventKvDto that = (TkEventKvDto) o;
  101 +
  102 + if (eventTime != that.eventTime) return false;
  103 + return kv.equals(that.kv);
  104 +
  105 + }
  106 +
  107 + @Override
  108 + public int hashCode() {
  109 + int result = (int) (eventTime ^ (eventTime >>> 32));
  110 + result = 31 * result + kv.hashCode();
  111 + return result;
  112 + }
  113 +
  114 + @Override
  115 + public String toString() {
  116 + return "BaseAttributeKvEntry{" +
  117 + "lastUpdateTs=" + eventTime +
  118 + ", kv=" + kv +
  119 + '}';
  120 + }
  121 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.common.data.yunteng.dto;
  17 +
  18 +import org.thingsboard.server.common.data.kv.KvEntry;
  19 +
  20 +/**
  21 + * @author Andrew Shvayka
  22 + */
  23 +public interface TkEventKvEntry extends KvEntry {
  24 +
  25 + long getEventTime();
  26 +
  27 +}
  1 +package org.thingsboard.server.common.data.yunteng.enums;
  2 +
  3 +/**
  4 + * 命令下发类型
  5 + */
  6 +public enum CmdTypeEnum {
  7 + //自定义
  8 + DIY,
  9 + //服务
  10 + SERVICE
  11 +}
@@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; @@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry;
34 import org.thingsboard.server.common.data.kv.KvEntry; 34 import org.thingsboard.server.common.data.kv.KvEntry;
35 import org.thingsboard.server.common.data.kv.LongDataEntry; 35 import org.thingsboard.server.common.data.kv.LongDataEntry;
36 import org.thingsboard.server.common.data.kv.StringDataEntry; 36 import org.thingsboard.server.common.data.kv.StringDataEntry;
  37 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  38 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
37 import org.thingsboard.server.gen.transport.TransportProtos; 39 import org.thingsboard.server.gen.transport.TransportProtos;
38 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; 40 import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
39 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; 41 import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
@@ -516,6 +518,13 @@ public class JsonConverter { @@ -516,6 +518,13 @@ public class JsonConverter {
516 return result; 518 return result;
517 } 519 }
518 520
  521 + public static Set<TkEventKvEntry> convertToEvents(JsonElement element) {
  522 + Set<TkEventKvEntry> result = new HashSet<>();
  523 + long ts = System.currentTimeMillis();
  524 + result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new TkEventKvDto(kv, ts)).collect(Collectors.toList()));
  525 + return result;
  526 + }
  527 +
519 private static List<KvEntry> parseValues(JsonObject valuesObject) { 528 private static List<KvEntry> parseValues(JsonObject valuesObject) {
520 List<KvEntry> result = new ArrayList<>(); 529 List<KvEntry> result = new ArrayList<>();
521 for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) { 530 for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) {
@@ -293,8 +293,8 @@ public class DefaultTransportService implements TransportService { @@ -293,8 +293,8 @@ public class DefaultTransportService implements TransportService {
293 293
294 @Override 294 @Override
295 public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) { 295 public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) {
296 - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =  
297 - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); 296 + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
  297 + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build());
298 try { 298 try {
299 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get(); 299 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
300 return response.getValue().getEntityProfileResponseMsg(); 300 return response.getValue().getEntityProfileResponseMsg();
@@ -305,8 +305,8 @@ public class DefaultTransportService implements TransportService { @@ -305,8 +305,8 @@ public class DefaultTransportService implements TransportService {
305 305
306 @Override 306 @Override
307 public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) { 307 public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) {
308 - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =  
309 - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); 308 + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
  309 + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build());
310 try { 310 try {
311 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get(); 311 TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get();
312 return response.getValue().getResourceResponseMsg(); 312 return response.getValue().getResourceResponseMsg();
@@ -317,8 +317,8 @@ public class DefaultTransportService implements TransportService { @@ -317,8 +317,8 @@ public class DefaultTransportService implements TransportService {
317 317
318 @Override 318 @Override
319 public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) { 319 public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) {
320 - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(  
321 - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() 320 + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
  321 + UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
322 .setSnmpDevicesRequestMsg(requestMsg) 322 .setSnmpDevicesRequestMsg(requestMsg)
323 .build() 323 .build()
324 ); 324 );
@@ -334,7 +334,7 @@ public class DefaultTransportService implements TransportService { @@ -334,7 +334,7 @@ public class DefaultTransportService implements TransportService {
334 @Override 334 @Override
335 public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) { 335 public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) {
336 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( 336 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
337 - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() 337 + UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
338 .setDeviceRequestMsg(requestMsg) 338 .setDeviceRequestMsg(requestMsg)
339 .build() 339 .build()
340 ); 340 );
@@ -354,7 +354,7 @@ public class DefaultTransportService implements TransportService { @@ -354,7 +354,7 @@ public class DefaultTransportService implements TransportService {
354 @Override 354 @Override
355 public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) { 355 public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) {
356 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( 356 TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>(
357 - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() 357 + UUID.randomUUID(), TransportApiRequestMsg.newBuilder()
358 .setDeviceCredentialsRequestMsg(requestMsg) 358 .setDeviceCredentialsRequestMsg(requestMsg)
359 .build() 359 .build()
360 ); 360 );
@@ -576,25 +576,26 @@ public class DefaultTransportService implements TransportService { @@ -576,25 +576,26 @@ public class DefaultTransportService implements TransportService {
576 @Override 576 @Override
577 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostEventMsg msg, 577 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostEventMsg msg,
578 TransportServiceCallback<Void> callback,String topicName) { 578 TransportServiceCallback<Void> callback,String topicName) {
579 - //Topic地址:v1/devices/event/{${identifier}}/{$eventType}  
580 - String[] topicInfo = topicName.split(MqttTopics.BASE_DEVICE_EVENT_TOPIC); 579 + //Topic地址:v1/devices/event/${deviceId}/${identifier}/{$eventType}
  580 + String[] topicInfo = topicName.split(MqttTopics.BASE_DEVICE_EVENT_TOPIC + "/");
581 if(null == topicInfo || topicInfo.length !=2){ 581 if(null == topicInfo || topicInfo.length !=2){
582 throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage()); 582 throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage());
583 } 583 }
584 String[] eventInfo = topicInfo[1].split("/"); 584 String[] eventInfo = topicInfo[1].split("/");
585 - if(null == eventInfo || eventInfo.length !=2){ 585 + if(null == eventInfo || eventInfo.length !=3){
586 throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage()); 586 throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage());
587 } 587 }
588 reportActivityInternal(sessionInfo); 588 reportActivityInternal(sessionInfo);
589 JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); 589 JsonObject json = JsonUtils.getJsonObject(msg.getKvList());
590 TbMsgMetaData metaData = new TbMsgMetaData(); 590 TbMsgMetaData metaData = new TbMsgMetaData();
591 - metaData.putValue("deviceName", sessionInfo.getDeviceName());  
592 - metaData.putValue("deviceType", sessionInfo.getDeviceType());  
593 - metaData.putValue("event_identifier", eventInfo[0]);  
594 - metaData.putValue("event_type",eventInfo[1]);  
595 CustomerId customerId = getCustomerId(sessionInfo); 591 CustomerId customerId = getCustomerId(sessionInfo);
596 TenantId tenantId = getTenantId(sessionInfo); 592 TenantId tenantId = getTenantId(sessionInfo);
597 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); 593 DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB()));
  594 + metaData.putValue("deviceName", sessionInfo.getDeviceName());
  595 + metaData.putValue("deviceType", sessionInfo.getDeviceType());
  596 + metaData.putValue("deviceId",eventInfo[0]);
  597 + metaData.putValue("event_identifier", eventInfo[1]);
  598 + metaData.putValue("event_type",eventInfo[2]);
598 sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST, 599 sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST,
599 new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); 600 new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback)));
600 } 601 }
@@ -726,8 +727,8 @@ public class DefaultTransportService implements TransportService { @@ -726,8 +727,8 @@ public class DefaultTransportService implements TransportService {
726 @Override 727 @Override
727 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> callback) { 728 public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> callback) {
728 if (checkLimits(sessionInfo, msg, callback)) { 729 if (checkLimits(sessionInfo, msg, callback)) {
729 - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg =  
730 - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build()); 730 + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg =
  731 + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build());
731 732
732 AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> { 733 AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> {
733 callback.onSuccess(response.getValue().getOtaPackageResponseMsg()); 734 callback.onSuccess(response.getValue().getOtaPackageResponseMsg());
@@ -870,7 +871,7 @@ public class DefaultTransportService implements TransportService { @@ -870,7 +871,7 @@ public class DefaultTransportService implements TransportService {
870 } 871 }
871 } 872 }
872 873
873 - protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { 874 + protected void processToTransportMsg(ToTransportMsg toSessionMsg) {
874 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); 875 UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB());
875 SessionMetaData md = sessions.get(sessionId); 876 SessionMetaData md = sessions.get(sessionId);
876 if (md != null) { 877 if (md != null) {
@@ -56,6 +56,12 @@ public class ModelConstants { @@ -56,6 +56,12 @@ public class ModelConstants {
56 public static final String ATTRIBUTE_KEY_COLUMN = "attribute_key"; 56 public static final String ATTRIBUTE_KEY_COLUMN = "attribute_key";
57 public static final String LAST_UPDATE_TS_COLUMN = "last_update_ts"; 57 public static final String LAST_UPDATE_TS_COLUMN = "last_update_ts";
58 58
  59 + //Thingskit function
  60 + public static final String EVENT_IDENTIFIER_COLUMN = "event_identifier";
  61 + public static final String EVENT_TIME_COLUMN = "event_time";
  62 + public static final String EVENT_TYPE_COLUMN = "event_type";
  63 + public static final String EVENT_KEY_COLUMN = "event_key";
  64 +
59 /** 65 /**
60 * Cassandra user constants. 66 * Cassandra user constants.
61 */ 67 */
@@ -9,8 +9,10 @@ import lombok.RequiredArgsConstructor; @@ -9,8 +9,10 @@ import lombok.RequiredArgsConstructor;
9 import org.apache.commons.lang3.StringUtils; 9 import org.apache.commons.lang3.StringUtils;
10 import org.springframework.stereotype.Service; 10 import org.springframework.stereotype.Service;
11 import org.springframework.transaction.annotation.Transactional; 11 import org.springframework.transaction.annotation.Transactional;
  12 +import org.thingsboard.server.common.data.DataConstants;
12 import org.thingsboard.server.common.data.id.EntityId; 13 import org.thingsboard.server.common.data.id.EntityId;
13 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants; 14 import org.thingsboard.server.common.data.yunteng.constant.FastIotConstants;
  15 +import org.thingsboard.server.common.data.yunteng.constant.ModelConstants;
14 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException; 16 import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException;
15 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage; 17 import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage;
16 import org.thingsboard.server.common.data.yunteng.dto.*; 18 import org.thingsboard.server.common.data.yunteng.dto.*;
@@ -251,6 +253,9 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM @@ -251,6 +253,9 @@ public class TkSceneLinkageServiceImpl extends AbstractBaseService<SceneLinkageM
251 ObjectNode doContext = JacksonUtil.newObjectNode(); 253 ObjectNode doContext = JacksonUtil.newObjectNode();
252 doContext.put("method", "methodThingskit"); 254 doContext.put("method", "methodThingskit");
253 doContext.put("params", doActionDTO.getDoContext()); 255 doContext.put("params", doActionDTO.getDoContext());
  256 + ObjectNode addtionalInfo = JacksonUtil.newObjectNode();
  257 + addtionalInfo.put(ModelConstants.TablePropertyMapping.COMMAND_TYPE, doActionDTO.getCommandType());
  258 + doContext.put(DataConstants.ADDITIONAL_INFO, addtionalInfo);
254 doActionDTO.setDoContext(doContext); 259 doActionDTO.setDoContext(doContext);
255 } 260 }
256 return doActionDTO.getEntity(TkDoActionEntity.class); 261 return doActionDTO.getEntity(TkDoActionEntity.class);
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.dao.event;
  17 +
  18 +import com.google.common.util.concurrent.Futures;
  19 +import com.google.common.util.concurrent.ListenableFuture;
  20 +import lombok.extern.slf4j.Slf4j;
  21 +import org.springframework.stereotype.Service;
  22 +import org.thingsboard.server.common.data.EntityType;
  23 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  24 +import org.thingsboard.server.common.data.id.EntityId;
  25 +import org.thingsboard.server.common.data.id.TenantId;
  26 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  27 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  28 +import org.thingsboard.server.dao.service.Validator;
  29 +import org.thingsboard.server.dao.yunteng.event.TkEventsService;
  30 +
  31 +import java.util.Collection;
  32 +import java.util.List;
  33 +import java.util.Optional;
  34 +import java.util.stream.Collectors;
  35 +
  36 +import static org.thingsboard.server.dao.yunteng.jpa.dao.event.EventUtils.validate;
  37 +
  38 +
  39 +/**
  40 + * @author Andrew Shvayka
  41 + */
  42 +@Service
  43 +@Slf4j
  44 +public class BaseEventsService implements TkEventsService {
  45 + private final EventsDao eventsDao;
  46 +
  47 + public BaseEventsService(EventsDao eventsDao) {
  48 + this.eventsDao = eventsDao;
  49 + }
  50 +
  51 + @Override
  52 + public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId,String attributeKey) {
  53 + validate(entityId, eventId);
  54 + Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
  55 + return eventsDao.find(tenantId, entityId, null, attributeKey);
  56 + }
  57 +
  58 + @Override
  59 + public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId, Collection<String> attributeKeys) {
  60 + validate(entityId, eventId);
  61 + attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
  62 + return eventsDao.find(tenantId, entityId, null, attributeKeys);
  63 + }
  64 +
  65 + @Override
  66 + public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType) {
  67 + validate(entityId,eventId);
  68 + return eventsDao.findAll(tenantId, entityId, eventType);
  69 + }
  70 +
  71 + @Override
  72 + public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
  73 + return eventsDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId);
  74 + }
  75 +
  76 + @Override
  77 + public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) {
  78 + return eventsDao.findAllKeysByEntityIds(tenantId, entityType, entityIds);
  79 + }
  80 +
  81 + @Override
  82 + public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes) {
  83 + validate(entityId, eventId,eventType);
  84 + attributes.forEach(attribute -> validate(attribute));
  85 +
  86 + List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> eventsDao.save(tenantId, entityId, eventType,eventId, attribute)).collect(Collectors.toList());
  87 + return Futures.allAsList(saveFutures);
  88 + }
  89 +
  90 + @Override
  91 + public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys) {
  92 + validate(entityId, eventId);
  93 + return eventsDao.removeAll(tenantId, entityId, eventId,eventType, eventKeys);
  94 + }
  95 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.dao.event;
  17 +
  18 +import org.thingsboard.server.common.data.id.EntityId;
  19 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  20 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  21 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  22 +import org.thingsboard.server.dao.exception.IncorrectParameterException;
  23 +import org.thingsboard.server.dao.service.Validator;
  24 +
  25 +public class EventUtils {
  26 + public static void validate(EntityId id, String eventIdentifier) {
  27 + Validator.validateId(id.getId(), "Incorrect id " + id);
  28 + Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier);
  29 + }
  30 +
  31 +
  32 + public static void validate(EntityId id, String eventIdentifier, TkEventType eventType) {
  33 + Validator.validateId(id.getId(), "Incorrect id " + id);
  34 + Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier);
  35 + if(eventType == null){
  36 + throw new IncorrectParameterException("eventType entry can't be null");
  37 + }
  38 + }
  39 +
  40 + public static void validate(TkEventKvEntry kvEntry) {
  41 + if (kvEntry == null) {
  42 + throw new IncorrectParameterException("Key value entry can't be null");
  43 + } else if (kvEntry.getDataType() == null) {
  44 + throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
  45 + } else {
  46 + Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
  47 + Validator.validatePositiveNumber(kvEntry.getEventTime(), "Incorrect last update ts. Ts should be positive");
  48 + }
  49 + }
  50 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.dao.event;
  17 +
  18 +import com.google.common.util.concurrent.ListenableFuture;
  19 +import org.thingsboard.server.common.data.EntityType;
  20 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  21 +import org.thingsboard.server.common.data.id.EntityId;
  22 +import org.thingsboard.server.common.data.id.TenantId;
  23 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  24 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  25 +
  26 +import java.util.Collection;
  27 +import java.util.List;
  28 +import java.util.Optional;
  29 +
  30 +/**
  31 + * @author Andrew Shvayka
  32 + */
  33 +public interface EventsDao {
  34 +
  35 + ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, String attributeKey);
  36 +
  37 + ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, Collection<String> attributeKey);
  38 +
  39 + ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType attributeType);
  40 +
  41 + ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TkEventType attributeType,String eventIdentifier, TkEventKvEntry attribute);
  42 +
  43 + ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys);
  44 +
  45 + List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
  46 +
  47 + List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds);
  48 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.entity.events;
  17 +
  18 +import lombok.AllArgsConstructor;
  19 +import lombok.Data;
  20 +import lombok.NoArgsConstructor;
  21 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  22 +
  23 +import javax.persistence.Column;
  24 +import javax.persistence.Embeddable;
  25 +import javax.persistence.EnumType;
  26 +import javax.persistence.Enumerated;
  27 +import java.io.Serializable;
  28 +import java.util.UUID;
  29 +
  30 +import static org.thingsboard.server.dao.model.ModelConstants.*;
  31 +
  32 +@Data
  33 +@AllArgsConstructor
  34 +@NoArgsConstructor
  35 +@Embeddable
  36 +public class TkEventKvCompositeKey implements Serializable {
  37 + @Enumerated(EnumType.STRING)
  38 + @Column(name = EVENT_TYPE_COLUMN)
  39 + private TkEventType eventType;
  40 + @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid")
  41 + private UUID entityId;
  42 + @Column(name = EVENT_IDENTIFIER_COLUMN)
  43 + private String eventIdentifier;
  44 + @Column(name = EVENT_TIME_COLUMN)
  45 + private Long eventTime;
  46 + @Column(name = EVENT_KEY_COLUMN)
  47 + private String eventKey;
  48 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.entity.events;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.server.common.data.kv.*;
  20 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  21 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  22 +import org.thingsboard.server.dao.model.ToData;
  23 +
  24 +import javax.persistence.Column;
  25 +import javax.persistence.EmbeddedId;
  26 +import javax.persistence.Entity;
  27 +import javax.persistence.Table;
  28 +import java.io.Serializable;
  29 +import java.util.UUID;
  30 +
  31 +import static org.thingsboard.server.dao.model.ModelConstants.*;
  32 +
  33 +@Data
  34 +@Entity
  35 +@Table(name = "tk_event_kv")
  36 +public class TkEventKvEntity implements ToData<TkEventKvEntry>, Serializable {
  37 +
  38 + @EmbeddedId
  39 + private TkEventKvCompositeKey id;
  40 +
  41 + @Column(name = BOOLEAN_VALUE_COLUMN)
  42 + private Boolean booleanValue;
  43 +
  44 + @Column(name = STRING_VALUE_COLUMN)
  45 + private String strValue;
  46 +
  47 + @Column(name = LONG_VALUE_COLUMN)
  48 + private Long longValue;
  49 +
  50 + @Column(name = DOUBLE_VALUE_COLUMN)
  51 + private Double doubleValue;
  52 +
  53 + @Column(name = JSON_VALUE_COLUMN)
  54 + private String jsonValue;
  55 +
  56 +
  57 + @Column(name = DEVICE_DEVICE_PROFILE_ID_PROPERTY, columnDefinition = "uuid")
  58 + private UUID deviceProfileId;
  59 +
  60 + @Override
  61 + public TkEventKvEntry toData() {
  62 + KvEntry kvEntry = null;
  63 + if (strValue != null) {
  64 + kvEntry = new StringDataEntry(id.getEventIdentifier(), strValue);
  65 + } else if (booleanValue != null) {
  66 + kvEntry = new BooleanDataEntry(id.getEventIdentifier(), booleanValue);
  67 + } else if (doubleValue != null) {
  68 + kvEntry = new DoubleDataEntry(id.getEventIdentifier(), doubleValue);
  69 + } else if (longValue != null) {
  70 + kvEntry = new LongDataEntry(id.getEventIdentifier(), longValue);
  71 + } else if (jsonValue != null) {
  72 + kvEntry = new JsonDataEntry(id.getEventIdentifier(), jsonValue);
  73 + }
  74 +
  75 + return new TkEventKvDto(kvEntry, id.getEventTime());
  76 + }
  77 +}
  1 +package org.thingsboard.server.dao.yunteng.jpa.repository.event;
  2 +
  3 +import lombok.extern.slf4j.Slf4j;
  4 +import org.springframework.beans.factory.annotation.Autowired;
  5 +import org.springframework.beans.factory.annotation.Value;
  6 +import org.springframework.jdbc.core.BatchPreparedStatementSetter;
  7 +import org.springframework.jdbc.core.JdbcTemplate;
  8 +import org.springframework.stereotype.Repository;
  9 +import org.springframework.transaction.TransactionStatus;
  10 +import org.springframework.transaction.support.TransactionCallbackWithoutResult;
  11 +import org.springframework.transaction.support.TransactionTemplate;
  12 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
  13 +
  14 +import java.sql.PreparedStatement;
  15 +import java.sql.SQLException;
  16 +import java.sql.Types;
  17 +import java.util.ArrayList;
  18 +import java.util.List;
  19 +import java.util.regex.Pattern;
  20 +
  21 +@Repository
  22 +@Slf4j
  23 +public abstract class EventKvInsertRepository {
  24 +
  25 + private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
  26 + private static final String EMPTY_STR = "";
  27 +
  28 + private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json) " +
  29 + "WHERE event_key = ? and event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;";
  30 +
  31 + private static final String INSERT_OR_UPDATE =
  32 + "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " +
  33 + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " +
  34 + "ON CONFLICT (event_type, entity_id, event_identifier, event_time, event_key) " +
  35 + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), event_key = ?;";
  36 +
  37 + @Autowired
  38 + protected JdbcTemplate jdbcTemplate;
  39 +
  40 + @Autowired
  41 + private TransactionTemplate transactionTemplate;
  42 +
  43 + @Value("${sql.remove_null_chars:true}")
  44 + private boolean removeNullChars;
  45 +
  46 + protected void saveOrUpdate(List<TkEventKvEntity> entities) {
  47 + transactionTemplate.execute(new TransactionCallbackWithoutResult() {
  48 + @Override
  49 + protected void doInTransactionWithoutResult(TransactionStatus status) {
  50 + int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() {
  51 + @Override
  52 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  53 + TkEventKvEntity kvEntity = entities.get(i);
  54 + ps.setString(1, replaceNullChars(kvEntity.getStrValue()));
  55 +
  56 + if (kvEntity.getLongValue() != null) {
  57 + ps.setLong(2, kvEntity.getLongValue());
  58 + } else {
  59 + ps.setNull(2, Types.BIGINT);
  60 + }
  61 +
  62 + if (kvEntity.getDoubleValue() != null) {
  63 + ps.setDouble(3, kvEntity.getDoubleValue());
  64 + } else {
  65 + ps.setNull(3, Types.DOUBLE);
  66 + }
  67 +
  68 + if (kvEntity.getBooleanValue() != null) {
  69 + ps.setBoolean(4, kvEntity.getBooleanValue());
  70 + } else {
  71 + ps.setNull(4, Types.BOOLEAN);
  72 + }
  73 +
  74 + ps.setString(5, replaceNullChars(kvEntity.getJsonValue()));
  75 +
  76 + ps.setString(6, kvEntity.getId().getEventKey());
  77 + ps.setString(7, kvEntity.getId().getEventType().name());
  78 + ps.setObject(8, kvEntity.getId().getEntityId());
  79 + ps.setString(9, kvEntity.getId().getEventIdentifier());
  80 + ps.setLong(10, kvEntity.getId().getEventTime());
  81 + }
  82 +
  83 + @Override
  84 + public int getBatchSize() {
  85 + return entities.size();
  86 + }
  87 + });
  88 +
  89 + int updatedCount = 0;
  90 + for (int i = 0; i < result.length; i++) {
  91 + if (result[i] == 0) {
  92 + updatedCount++;
  93 + }
  94 + }
  95 +
  96 + List<TkEventKvEntity> insertEntities = new ArrayList<>(updatedCount);
  97 + for (int i = 0; i < result.length; i++) {
  98 + if (result[i] == 0) {
  99 + insertEntities.add(entities.get(i));
  100 + }
  101 + }
  102 +
  103 + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() {
  104 + @Override
  105 + public void setValues(PreparedStatement ps, int i) throws SQLException {
  106 + TkEventKvEntity kvEntity = insertEntities.get(i);
  107 + ps.setString(1, kvEntity.getId().getEventType().name());
  108 + ps.setObject(2, kvEntity.getId().getEntityId());
  109 + ps.setString(3, kvEntity.getId().getEventIdentifier());
  110 + ps.setLong(4, kvEntity.getId().getEventTime());
  111 +
  112 + ps.setString(5, replaceNullChars(kvEntity.getStrValue()));
  113 + ps.setString(11, replaceNullChars(kvEntity.getStrValue()));
  114 +
  115 + if (kvEntity.getLongValue() != null) {
  116 + ps.setLong(6, kvEntity.getLongValue());
  117 + ps.setLong(12, kvEntity.getLongValue());
  118 + } else {
  119 + ps.setNull(6, Types.BIGINT);
  120 + ps.setNull(12, Types.BIGINT);
  121 + }
  122 +
  123 + if (kvEntity.getDoubleValue() != null) {
  124 + ps.setDouble(7, kvEntity.getDoubleValue());
  125 + ps.setDouble(13, kvEntity.getDoubleValue());
  126 + } else {
  127 + ps.setNull(7, Types.DOUBLE);
  128 + ps.setNull(13, Types.DOUBLE);
  129 + }
  130 +
  131 + if (kvEntity.getBooleanValue() != null) {
  132 + ps.setBoolean(8, kvEntity.getBooleanValue());
  133 + ps.setBoolean(14, kvEntity.getBooleanValue());
  134 + } else {
  135 + ps.setNull(8, Types.BOOLEAN);
  136 + ps.setNull(14, Types.BOOLEAN);
  137 + }
  138 +
  139 + ps.setString(9, replaceNullChars(kvEntity.getJsonValue()));
  140 + ps.setString(15, replaceNullChars(kvEntity.getJsonValue()));
  141 +
  142 + ps.setString(10, kvEntity.getId().getEventKey());
  143 + ps.setString(16, kvEntity.getId().getEventKey());
  144 + }
  145 +
  146 + @Override
  147 + public int getBatchSize() {
  148 + return insertEntities.size();
  149 + }
  150 + });
  151 + }
  152 + });
  153 + }
  154 +
  155 + private String replaceNullChars(String strValue) {
  156 + if (removeNullChars && strValue != null) {
  157 + return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR);
  158 + }
  159 + return strValue;
  160 + }
  161 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.repository.event;
  17 +
  18 +import org.springframework.data.jpa.repository.Modifying;
  19 +import org.springframework.data.jpa.repository.Query;
  20 +import org.springframework.data.repository.CrudRepository;
  21 +import org.springframework.data.repository.query.Param;
  22 +import org.springframework.transaction.annotation.Transactional;
  23 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  24 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey;
  25 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
  26 +
  27 +import java.util.List;
  28 +import java.util.UUID;
  29 +
  30 +public interface EventKvRepository extends CrudRepository<TkEventKvEntity, TkEventKvCompositeKey> {
  31 +
  32 + @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " +
  33 + "AND a.id.entityId = :entityId " )
  34 + List<TkEventKvEntity> findAllByEventTypeAndEntityId(@Param("eventType") TkEventType eventType,
  35 + @Param("entityId") UUID entityId);
  36 +
  37 + @Transactional
  38 + @Modifying
  39 + @Query("DELETE FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " +
  40 + "AND a.id.entityId = :entityId " +
  41 + "AND a.id.eventIdentifier = :eventIdentifier " +
  42 + "AND a.id.eventKey = :eventKey")
  43 + void delete(@Param("eventType") TkEventType eventType,
  44 + @Param("entityId") UUID entityId,
  45 + @Param("eventIdentifier") String eventIdentifier,
  46 + @Param("eventKey") String eventKey);
  47 +
  48 + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
  49 + " in (SELECT id FROM device WHERE tenant_id = :tenantId and device_profile_id = :deviceProfileId limit 100) ORDER BY event_key", nativeQuery = true)
  50 + List<String> findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId,
  51 + @Param("deviceProfileId") UUID deviceProfileId);
  52 +
  53 + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
  54 + " in (SELECT id FROM device WHERE tenant_id = :tenantId limit 100) ORDER BY event_key", nativeQuery = true)
  55 + List<String> findAllKeysByTenantId(@Param("tenantId") UUID tenantId);
  56 +
  57 + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
  58 + " in :entityIds ORDER BY event_key", nativeQuery = true)
  59 + List<String> findAllKeysByEntityIds(@Param("entityIds") List<UUID> entityIds);
  60 +}
  61 +
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.repository.event;
  17 +
  18 +import org.springframework.stereotype.Repository;
  19 +import org.springframework.transaction.annotation.Transactional;
  20 +import org.thingsboard.server.dao.util.HsqlDao;
  21 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
  22 +
  23 +import java.sql.Types;
  24 +import java.util.List;
  25 +
  26 +@HsqlDao
  27 +@Repository
  28 +@Transactional
  29 +public class HsqlEventsInsertRepository extends EventKvInsertRepository {
  30 +
  31 + private static final String INSERT_OR_UPDATE =
  32 + "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
  33 + "A (event_type, entity_id, event_identifier, event_key, str_v, long_v, dbl_v, bool_v, json_v, event_key) " +
  34 + "ON (tk_event_kv.event_type=A.event_type " +
  35 + "AND tk_event_kv.entity_id=A.entity_id " +
  36 + "AND tk_event_kv.event_identifier=A.event_identifier " +
  37 + "AND tk_event_kv.event_time=A.event_time) " +
  38 + "AND tk_event_kv.event_key=A.event_key) " +
  39 + "WHEN MATCHED THEN UPDATE SET tk_event_kv.str_v = A.str_v, tk_event_kv.long_v = A.long_v, tk_event_kv.dbl_v = A.dbl_v, tk_event_kv.bool_v = A.bool_v, tk_event_kv.json_v = A.json_v " +
  40 + "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " +
  41 + "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.json_v, A.event_key)";
  42 +
  43 + @Override
  44 + protected void saveOrUpdate(List<TkEventKvEntity> entities) {
  45 + entities.forEach(entity -> {
  46 + jdbcTemplate.update(INSERT_OR_UPDATE, ps -> {
  47 + ps.setString(1, entity.getId().getEventType().name());
  48 + ps.setObject(2, entity.getId().getEntityId());
  49 + ps.setString(3, entity.getId().getEventIdentifier());
  50 + ps.setLong(4, entity.getId().getEventTime());
  51 + ps.setString(5, entity.getStrValue());
  52 +
  53 + if (entity.getLongValue() != null) {
  54 + ps.setLong(6, entity.getLongValue());
  55 + } else {
  56 + ps.setNull(6, Types.BIGINT);
  57 + }
  58 +
  59 + if (entity.getDoubleValue() != null) {
  60 + ps.setDouble(7, entity.getDoubleValue());
  61 + } else {
  62 + ps.setNull(7, Types.DOUBLE);
  63 + }
  64 +
  65 + if (entity.getBooleanValue() != null) {
  66 + ps.setBoolean(8, entity.getBooleanValue());
  67 + } else {
  68 + ps.setNull(8, Types.BOOLEAN);
  69 + }
  70 +
  71 + ps.setString(9, entity.getJsonValue());
  72 +
  73 + ps.setString(10, entity.getId().getEventKey());
  74 + });
  75 + });
  76 + }
  77 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.repository.event;
  17 +
  18 +import com.google.common.collect.Lists;
  19 +import com.google.common.util.concurrent.Futures;
  20 +import com.google.common.util.concurrent.ListenableFuture;
  21 +import lombok.extern.slf4j.Slf4j;
  22 +import org.springframework.beans.factory.annotation.Autowired;
  23 +import org.springframework.beans.factory.annotation.Value;
  24 +import org.springframework.stereotype.Component;
  25 +import org.thingsboard.server.common.data.EntityType;
  26 +import org.thingsboard.server.common.data.id.DeviceProfileId;
  27 +import org.thingsboard.server.common.data.id.EntityId;
  28 +import org.thingsboard.server.common.data.id.TenantId;
  29 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  30 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  31 +import org.thingsboard.server.common.stats.StatsFactory;
  32 +import org.thingsboard.server.dao.DaoUtil;
  33 +import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
  34 +import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
  35 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
  36 +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
  37 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey;
  38 +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
  39 +import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao;
  40 +
  41 +import javax.annotation.PostConstruct;
  42 +import javax.annotation.PreDestroy;
  43 +import java.util.Collection;
  44 +import java.util.Comparator;
  45 +import java.util.List;
  46 +import java.util.Optional;
  47 +import java.util.function.Function;
  48 +import java.util.stream.Collectors;
  49 +
  50 +@Component
  51 +@Slf4j
  52 +public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implements EventsDao {
  53 +
  54 + @Autowired
  55 + ScheduledLogExecutorComponent logExecutor;
  56 +
  57 + @Autowired
  58 + private EventKvRepository eventKvRepository;
  59 +
  60 + @Autowired
  61 + private EventKvInsertRepository eventKvInsertRepository;
  62 +
  63 + @Autowired
  64 + private StatsFactory statsFactory;
  65 +
  66 + @Value("${sql.attributes.batch_size:1000}")
  67 + private int batchSize;
  68 +
  69 + @Value("${sql.attributes.batch_max_delay:100}")
  70 + private long maxDelay;
  71 +
  72 + @Value("${sql.attributes.stats_print_interval_ms:1000}")
  73 + private long statsPrintIntervalMs;
  74 +
  75 + @Value("${sql.attributes.batch_threads:4}")
  76 + private int batchThreads;
  77 +
  78 + @Value("${sql.batch_sort:false}")
  79 + private boolean batchSortEnabled;
  80 +
  81 + private TbSqlBlockingQueueWrapper<TkEventKvEntity> queue;
  82 +
  83 + @PostConstruct
  84 + private void init() {
  85 + TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder()
  86 + .logName("Events")
  87 + .batchSize(batchSize)
  88 + .maxDelay(maxDelay)
  89 + .statsPrintIntervalMs(statsPrintIntervalMs)
  90 + .statsNamePrefix("events")
  91 + .batchSortEnabled(batchSortEnabled)
  92 + .build();
  93 +
  94 + Function<TkEventKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode();
  95 + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory);
  96 + queue.init(logExecutor, v -> eventKvInsertRepository.saveOrUpdate(v),
  97 + Comparator.comparing((TkEventKvEntity attributeKvEntity) -> attributeKvEntity.getId().getEntityId())
  98 + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventType().name())
  99 + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventTime())
  100 + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventIdentifier())
  101 + );
  102 + }
  103 +
  104 + @PreDestroy
  105 + private void destroy() {
  106 + if (queue != null) {
  107 + queue.destroy();
  108 + }
  109 + }
  110 +
  111 + @Override
  112 + public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, String attributeKey) {
  113 + TkEventKvCompositeKey compositeKey =
  114 + getEventKvCompositeKey(eventType,entityId,null, attributeKey,null);
  115 + return Futures.immediateFuture(
  116 + Optional.ofNullable(DaoUtil.getData(eventKvRepository.findById(compositeKey))));
  117 + }
  118 +
  119 + @Override
  120 + public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, Collection<String> attributeKeys) {
  121 + List<TkEventKvCompositeKey> compositeKeys =
  122 + attributeKeys
  123 + .stream()
  124 + .map(attributeKey ->
  125 + getEventKvCompositeKey(eventType,entityId, null,attributeKey,null))
  126 + .collect(Collectors.toList());
  127 + return Futures.immediateFuture(
  128 + DaoUtil.convertDataList(Lists.newArrayList(eventKvRepository.findAllById(compositeKeys))));
  129 + }
  130 +
  131 + @Override
  132 + public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType eventType) {
  133 + return Futures.immediateFuture(
  134 + DaoUtil.convertDataList(Lists.newArrayList(
  135 + eventKvRepository.findAllByEventTypeAndEntityId(
  136 + eventType,
  137 + entityId.getId()))));
  138 + }
  139 +
  140 + @Override
  141 + public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
  142 + if (deviceProfileId != null) {
  143 + return eventKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId());
  144 + } else {
  145 + return eventKvRepository.findAllKeysByTenantId(tenantId.getId());
  146 + }
  147 + }
  148 +
  149 + @Override
  150 + public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) {
  151 + return eventKvRepository
  152 + .findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
  153 + }
  154 +
  155 + @Override
  156 + public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId,TkEventType eventType, String eventIdentifier, TkEventKvEntry attribute) {
  157 + TkEventKvEntity entity = new TkEventKvEntity();
  158 + entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, attribute.getEventTime(),attribute.getKey()));
  159 +
  160 +// entity.setDeviceProfileId();
  161 + entity.setStrValue(attribute.getStrValue().orElse(null));
  162 + entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
  163 + entity.setLongValue(attribute.getLongValue().orElse(null));
  164 + entity.setBooleanValue(attribute.getBooleanValue().orElse(null));
  165 + entity.setJsonValue(attribute.getJsonValue().orElse(null));
  166 + return addToQueue(entity);
  167 + }
  168 +
  169 + private ListenableFuture<Void> addToQueue(TkEventKvEntity entity) {
  170 + return queue.add(entity);
  171 + }
  172 +
  173 + @Override
  174 + public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys) {
  175 + return service.submit(() -> {
  176 + keys.forEach(key ->
  177 + eventKvRepository.delete(eventType,entityId.getId(), eventId,key)
  178 + );
  179 + return null;
  180 + });
  181 + }
  182 +
  183 + private TkEventKvCompositeKey getEventKvCompositeKey(TkEventType eventType,EntityId entityId, String eventIdentifier,String eventKey, Long eventTime) {
  184 + return new TkEventKvCompositeKey(
  185 + eventType,
  186 + entityId.getId(),
  187 + eventIdentifier,
  188 + eventTime,
  189 + eventKey);
  190 + }
  191 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.server.dao.yunteng.jpa.repository.event;
  17 +
  18 +import org.springframework.stereotype.Repository;
  19 +import org.springframework.transaction.annotation.Transactional;
  20 +import org.thingsboard.server.dao.util.PsqlDao;
  21 +
  22 +@PsqlDao
  23 +@Repository
  24 +@Transactional
  25 +public class PsqlEventsInsertRepository extends EventKvInsertRepository {
  26 +
  27 +}
@@ -22,6 +22,8 @@ import org.thingsboard.server.common.data.id.TenantId; @@ -22,6 +22,8 @@ import org.thingsboard.server.common.data.id.TenantId;
22 import org.thingsboard.server.common.data.kv.AttributeKvEntry; 22 import org.thingsboard.server.common.data.kv.AttributeKvEntry;
23 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; 23 import org.thingsboard.server.common.data.kv.DeleteTsKvQuery;
24 import org.thingsboard.server.common.data.kv.TsKvEntry; 24 import org.thingsboard.server.common.data.kv.TsKvEntry;
  25 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  26 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
25 27
26 import java.util.Collection; 28 import java.util.Collection;
27 import java.util.List; 29 import java.util.List;
@@ -41,6 +43,9 @@ public interface RuleEngineTelemetryService { @@ -41,6 +43,9 @@ public interface RuleEngineTelemetryService {
41 43
42 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); 44 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
43 45
  46 + //Thingskit function
  47 + void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback);
  48 +
44 void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); 49 void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
45 50
46 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback); 51 void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback);
@@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType;
33 type = ComponentType.FILTER, 33 type = ComponentType.FILTER,
34 name = "message type switch", 34 name = "message type switch",
35 configClazz = EmptyNodeConfiguration.class, 35 configClazz = EmptyNodeConfiguration.class,
36 - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted", 36 + relationTypes = {"Post event","Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted",
37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", 37 "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned",
38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", 38 "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant",
39 "Timeseries Updated", "Timeseries Deleted"}, 39 "Timeseries Updated", "Timeseries Deleted"},
@@ -57,7 +57,14 @@ public class TbMsgTypeSwitchNode implements TbNode { @@ -57,7 +57,14 @@ public class TbMsgTypeSwitchNode implements TbNode {
57 relationType = "Post attributes"; 57 relationType = "Post attributes";
58 } else if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { 58 } else if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) {
59 relationType = "Post telemetry"; 59 relationType = "Post telemetry";
60 - } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) { 60 + }
  61 +
  62 + //Thingskit function
  63 + else if (msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())) {
  64 + relationType = "Post event";
  65 + }
  66 +
  67 + else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) {
61 relationType = "RPC Request from Device"; 68 relationType = "RPC Request from Device";
62 } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) { 69 } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) {
63 relationType = "Activity Event"; 70 relationType = "Activity Event";
@@ -26,6 +26,7 @@ import javax.annotation.Nullable; @@ -26,6 +26,7 @@ import javax.annotation.Nullable;
26 * Created by ashvayka on 02.04.18. 26 * Created by ashvayka on 02.04.18.
27 */ 27 */
28 @Data 28 @Data
  29 +public
29 class TelemetryNodeCallback implements FutureCallback<Void> { 30 class TelemetryNodeCallback implements FutureCallback<Void> {
30 private final TbContext ctx; 31 private final TbContext ctx;
31 private final TbMsg msg; 32 private final TbMsg msg;
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.yunteng.event;
  17 +
  18 +import com.google.gson.JsonParser;
  19 +import lombok.extern.slf4j.Slf4j;
  20 +import org.apache.commons.lang3.StringUtils;
  21 +import org.thingsboard.rule.engine.api.*;
  22 +import org.thingsboard.rule.engine.api.util.TbNodeUtils;
  23 +import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback;
  24 +import org.thingsboard.server.common.data.kv.AttributeKvEntry;
  25 +import org.thingsboard.server.common.data.plugin.ComponentType;
  26 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  27 +import org.thingsboard.server.common.data.yunteng.enums.TkEventType;
  28 +import org.thingsboard.server.common.msg.TbMsg;
  29 +import org.thingsboard.server.common.msg.session.SessionMsgType;
  30 +import org.thingsboard.server.common.transport.adaptor.JsonConverter;
  31 +
  32 +import java.util.ArrayList;
  33 +import java.util.Set;
  34 +
  35 +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_IDENTIFIER_COLUMN;
  36 +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_COLUMN;
  37 +
  38 +@Slf4j
  39 +@RuleNode(
  40 + type = ComponentType.ACTION,
  41 + name = "save event",
  42 + configClazz = TkMsgEventNodeConfiguration.class,
  43 + nodeDescription = "Saves device event data",
  44 + nodeDetails = "Saves entity event . Expects messages with 'POST_EVENT_REQUEST' message type",
  45 + uiResources = {"static/rulenode/rulenode-core-config.js"},
  46 + configDirective = "tkMsgEventNodeConfiguration",
  47 + icon = "file_upload"
  48 +)
  49 +public class TkMsgEventNode implements TbNode {
  50 +
  51 + private TkMsgEventNodeConfiguration config;
  52 +
  53 + @Override
  54 + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
  55 + this.config = TbNodeUtils.convert(configuration, TkMsgEventNodeConfiguration.class);
  56 + }
  57 +
  58 + @Override
  59 + public void onMsg(TbContext ctx, TbMsg msg) {
  60 + if (!msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())) {
  61 + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
  62 + return;
  63 + }
  64 + String src = msg.getData();
  65 + Set<TkEventKvEntry> events = JsonConverter.convertToEvents(new JsonParser().parse(src));
  66 + String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN);
  67 + TkEventType eventType = TkEventType.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN));
  68 + ctx.getTelemetryService().saveAndNotify(
  69 + ctx.getTenantId(),
  70 + msg.getOriginator(),
  71 + eventIdentifier,
  72 + eventType,
  73 + new ArrayList<>(events),
  74 + new TelemetryNodeCallback(ctx, msg)
  75 + );
  76 + }
  77 +
  78 + @Override
  79 + public void destroy() {
  80 + }
  81 +
  82 +}
  1 +/**
  2 + * Copyright © 2016-2022 The Thingsboard Authors
  3 + *
  4 + * Licensed under the Apache License, Version 2.0 (the "License");
  5 + * you may not use this file except in compliance with the License.
  6 + * You may obtain a copy of the License at
  7 + *
  8 + * http://www.apache.org/licenses/LICENSE-2.0
  9 + *
  10 + * Unless required by applicable law or agreed to in writing, software
  11 + * distributed under the License is distributed on an "AS IS" BASIS,
  12 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  13 + * See the License for the specific language governing permissions and
  14 + * limitations under the License.
  15 + */
  16 +package org.thingsboard.rule.engine.yunteng.event;
  17 +
  18 +import lombok.Data;
  19 +import org.thingsboard.rule.engine.api.NodeConfiguration;
  20 +import org.thingsboard.server.common.data.DataConstants;
  21 +
  22 +@Data
  23 +public class TkMsgEventNodeConfiguration implements NodeConfiguration<TkMsgEventNodeConfiguration> {
  24 +
  25 +
  26 + private boolean useServerTs;
  27 + @Override
  28 + public TkMsgEventNodeConfiguration defaultConfiguration() {
  29 + TkMsgEventNodeConfiguration configuration = new TkMsgEventNodeConfiguration();
  30 + configuration.setUseServerTs(true);
  31 + return configuration;
  32 + }
  33 +}