Commit 2133601441b6e4e26326b16f56e2e94d556a0633

Authored by 芯火源
1 parent 3884672e

refactor: 设备事件上报逻辑调整

1、从设备配置中取物模型中的事件
2、设备事件表的事件标识、事件名称、事件类型从物模型获取
3、设备配置增加物模型
... ... @@ -32,6 +32,7 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
32 32 import org.thingsboard.server.common.data.id.EntityId;
33 33 import org.thingsboard.server.common.data.id.TenantId;
34 34 import org.thingsboard.server.common.data.kv.*;
  35 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
35 36 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
36 37 import org.thingsboard.server.common.msg.queue.ServiceType;
37 38 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -235,13 +236,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
235 236
236 237 //Thingskit function
237 238 @Override
238   - public void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData, Long eventTime, FutureCallback<Void> callback) {
  239 + public void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, List<TkEventKvEntry> events, String eventData, Long eventTime, FutureCallback<Void> callback) {
239 240 checkInternalEntity(entityId);
240   - saveAndNotifyInternal(tenantId,profileId, entityId, eventId,eventType, eventData,eventTime, callback);
  241 + saveAndNotifyInternal(tenantId,profileId, entityId, events, eventData,eventTime, callback);
241 242 }
242 243 @Override
243   - public void saveAndNotifyInternal(TenantId tenantId,DeviceProfileId profileId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback) {
244   - ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId,profileId, entityId,eventId,eventType, eventData,eventTime);
  244 + public void saveAndNotifyInternal(TenantId tenantId,DeviceProfileId profileId, EntityId entityId,List<TkEventKvEntry> events, String eventData,Long eventTime, FutureCallback<Void> callback) {
  245 + ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId,profileId, entityId,events, eventData,eventTime);
245 246 addVoidCallback(saveFuture, callback);
246 247 // addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
247 248 }
... ...
... ... @@ -39,7 +39,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
39 39 void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
40 40
41 41 //Thingskit function
42   - void saveAndNotifyInternal(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback);
  42 + void saveAndNotifyInternal(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, List<TkEventKvEntry> eventKvEntry, String eventData, Long eventTime, FutureCallback<Void> callback);
43 43
44 44 void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
45 45
... ...
... ... @@ -22,6 +22,7 @@ import org.thingsboard.server.common.data.id.TenantId;
22 22 import org.thingsboard.server.common.data.page.PageData;
23 23 import org.thingsboard.server.common.data.page.TimePageLink;
24 24 import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  25 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
25 26 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
26 27
27 28 import java.util.List;
... ... @@ -31,7 +32,7 @@ import java.util.List;
31 32 */
32 33 public interface TkEventsService {
33 34 PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink);
34   - ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime);
  35 + ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, List<TkEventKvEntry> eventKvEntry, String eventData, Long eventTime);
35 36
36 37
37 38
... ...
... ... @@ -18,6 +18,7 @@ package org.thingsboard.server.common.data.device.profile;
18 18 import io.swagger.annotations.ApiModel;
19 19 import io.swagger.annotations.ApiModelProperty;
20 20 import lombok.Data;
  21 +import org.thingsboard.server.common.data.yunteng.dto.ThingsModelDTO;
21 22
22 23 import javax.validation.Valid;
23 24 import java.io.Serializable;
... ... @@ -38,4 +39,9 @@ public class DeviceProfileData implements Serializable {
38 39 @ApiModelProperty(position = 4, value = "JSON array of alarm rules configuration per device profile")
39 40 private List<DeviceProfileAlarm> alarms;
40 41
  42 + //Thingskit function
  43 + @Valid
  44 + @ApiModelProperty(position = 1, value = "JSON object of has published things model configuration")
  45 + private List<ThingsModelDTO> thingsModel;
  46 +
41 47 }
... ...
1   -/**
2   - * Copyright © 2016-2022 The Thingsboard Authors
3   - *
4   - * Licensed under the Apache License, Version 2.0 (the "License");
5   - * you may not use this file except in compliance with the License.
6   - * You may obtain a copy of the License at
7   - *
8   - * http://www.apache.org/licenses/LICENSE-2.0
9   - *
10   - * Unless required by applicable law or agreed to in writing, software
11   - * distributed under the License is distributed on an "AS IS" BASIS,
12   - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   - * See the License for the specific language governing permissions and
14   - * limitations under the License.
15   - */
16 1 package org.thingsboard.server.common.data.yunteng.dto;
17 2
18   -import org.thingsboard.server.common.data.kv.KvEntry;
  3 +import lombok.Data;
  4 +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
  5 +
  6 +import java.io.Serializable;
19 7
20 8 /**
21 9 * @author Andrew Shvayka
22 10 */
23   -public interface TkEventKvEntry extends KvEntry {
  11 +@Data
  12 +public class TkEventKvEntry implements Serializable {
  13 +
24 14
25   - long getEventTime();
  15 + private DeviceEventTypeEnum eventType;
  16 + private String eventName;
  17 + private String eventIdentifier;
26 18
27 19 }
... ...
... ... @@ -25,10 +25,13 @@ import org.thingsboard.server.common.data.id.TenantId;
25 25 import org.thingsboard.server.common.data.page.PageData;
26 26 import org.thingsboard.server.common.data.page.TimePageLink;
27 27 import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  28 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
28 29 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
  30 +import org.thingsboard.server.dao.attributes.AttributeUtils;
29 31 import org.thingsboard.server.dao.yunteng.event.TkEventsService;
30 32
31 33 import java.util.List;
  34 +import java.util.stream.Collectors;
32 35
33 36 import static org.thingsboard.server.dao.yunteng.jpa.dao.event.EventUtils.validate;
34 37
... ... @@ -53,10 +56,10 @@ public class BaseEventsService implements TkEventsService {
53 56 }
54 57
55 58 @Override
56   - public ListenableFuture<List<Void>> save(TenantId tenantId,DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime) {
57   - validate(entityId, eventId,eventType);
58   -
59   - ListenableFuture<Void> saveFutures = eventsDao.save(tenantId, profileId,entityId, eventType,eventId, eventData,eventTime);
  59 + public ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, List<TkEventKvEntry> events, String eventData, Long eventTime) {
  60 + validate(entityId);
  61 + events.forEach(event -> validate(event));
  62 + List<ListenableFuture<Void>> saveFutures = events.stream().map(event -> eventsDao.save(tenantId, profileId,entityId, event,eventData,eventTime)).collect(Collectors.toList());
60 63 return Futures.allAsList(saveFutures);
61 64 }
62 65
... ...
... ... @@ -44,11 +44,8 @@ public class EventUtils {
44 44 public static void validate(TkEventKvEntry kvEntry) {
45 45 if (kvEntry == null) {
46 46 throw new IncorrectParameterException("Key value entry can't be null");
47   - } else if (kvEntry.getDataType() == null) {
48   - throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null");
49   - } else {
50   - Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty");
51   - Validator.validatePositiveNumber(kvEntry.getEventTime(), "Incorrect last update ts. Ts should be positive");
  47 + } else if (kvEntry.getEventType() == null) {
  48 + throw new IncorrectParameterException("Incorrect event type. event type can't be null");
52 49 }
53 50 }
54 51 }
... ...
... ... @@ -22,8 +22,11 @@ import org.thingsboard.server.common.data.id.TenantId;
22 22 import org.thingsboard.server.common.data.page.PageData;
23 23 import org.thingsboard.server.common.data.page.TimePageLink;
24 24 import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  25 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
25 26 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
26 27
  28 +import java.util.List;
  29 +
27 30 /**
28 31 * @author Andrew Shvayka
29 32 */
... ... @@ -31,6 +34,6 @@ public interface EventsDao {
31 34
32 35 PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink);
33 36
34   - ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, DeviceEventTypeEnum attributeType,String eventIdentifier, String eventData,Long eventTime);
  37 + ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, TkEventKvEntry eventKvEntry, String eventData, Long eventTime);
35 38
36 39 }
... ...
... ... @@ -26,6 +26,7 @@ import org.thingsboard.server.common.data.id.TenantId;
26 26 import org.thingsboard.server.common.data.page.PageData;
27 27 import org.thingsboard.server.common.data.page.TimePageLink;
28 28 import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  29 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
29 30 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
30 31 import org.thingsboard.server.common.stats.StatsFactory;
31 32 import org.thingsboard.server.dao.DaoUtil;
... ... @@ -40,6 +41,7 @@ import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
40 41 import javax.annotation.PostConstruct;
41 42 import javax.annotation.PreDestroy;
42 43 import java.util.Comparator;
  44 +import java.util.List;
43 45 import java.util.function.Function;
44 46
45 47 @Component
... ... @@ -111,10 +113,10 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen
111 113
112 114
113 115 @Override
114   - public ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId,DeviceEventTypeEnum eventType, String eventIdentifier, String eventData,Long eventTime) {
  116 + public ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId, EntityId entityId,TkEventKvEntry eventKvEntry, String eventData, Long eventTime) {
115 117 TkEventKvEntity entity = new TkEventKvEntity();
116   - entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, eventTime));
117   - entity.setEventName("test");
  118 + entity.setId(new TkEventKvCompositeKey(eventKvEntry.getEventType(), entityId.getId(), eventKvEntry.getEventIdentifier(), eventTime));
  119 + entity.setEventName(eventKvEntry.getEventName());
118 120 entity.setDeviceProfileId(profileId.getId());
119 121 entity.setEventValue(eventData);
120 122 return addToQueue(entity);
... ...
... ... @@ -45,7 +45,7 @@ public interface RuleEngineTelemetryService {
45 45 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
46 46
47 47 //Thingskit function
48   - void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback);
  48 + void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, List<TkEventKvEntry> eventKvEntry, String eventData, Long eventTime, FutureCallback<Void> callback);
49 49
50 50 void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
51 51
... ...
... ... @@ -19,13 +19,21 @@ import lombok.extern.slf4j.Slf4j;
19 19 import org.thingsboard.rule.engine.api.*;
20 20 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
21 21 import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback;
  22 +import org.thingsboard.server.common.data.DeviceProfile;
22 23 import org.thingsboard.server.common.data.id.DeviceProfileId;
  24 +import org.thingsboard.server.common.data.id.TenantId;
23 25 import org.thingsboard.server.common.data.plugin.ComponentType;
  26 +import org.thingsboard.server.common.data.yunteng.dto.ThingsModelDTO;
  27 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
24 28 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
  29 +import org.thingsboard.server.common.data.yunteng.enums.FunctionTypeEnum;
25 30 import org.thingsboard.server.common.msg.TbMsg;
26 31 import org.thingsboard.server.common.msg.session.SessionMsgType;
27 32
  33 +import java.util.ArrayList;
  34 +import java.util.List;
28 35 import java.util.UUID;
  36 +import java.util.stream.Collectors;
29 37
30 38 import static org.thingsboard.server.dao.model.ModelConstants.*;
31 39
... ... @@ -57,17 +65,29 @@ public class TkMsgEventNode implements TbNode {
57 65 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
58 66 return;
59 67 }
60   - String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN);
61 68 String deviceProfileId = msg.getMetaData().getValue(DEVICE_DEVICE_PROFILE_ID_PROPERTY);
  69 + TenantId tenantId = ctx.getTenantId();
  70 + DeviceProfileId profileId = new DeviceProfileId(UUID.fromString(deviceProfileId));
  71 + DeviceProfile profile = cache.get(tenantId,profileId);
  72 + String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN);
  73 + List<ThingsModelDTO> eventList =profile.getProfileData().getThingsModel().stream()
  74 + .filter(f -> f.getIdentifier().equals(eventIdentifier) && FunctionTypeEnum.events.equals(f.getFunctionType()))
  75 + .collect(Collectors.toList());
  76 + List<TkEventKvEntry> entryList = new ArrayList<>();
  77 + eventList.stream().forEach(i ->{
  78 + TkEventKvEntry item = new TkEventKvEntry();
  79 + item.setEventIdentifier(i.getIdentifier());
  80 + item.setEventType(i.getEventType());
  81 + item.setEventName(i.getFunctionName());
  82 + });
62 83 ////TODO: 验证事件类型、事件标识符和数据建是否与产品物模型中的事件匹配
63 84 long ts = System.currentTimeMillis();
64 85 String src = msg.getData();
65 86 DeviceEventTypeEnum eventType = DeviceEventTypeEnum.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN));
66 87 ctx.getTelemetryService().saveAndNotify(
67   - ctx.getTenantId(),new DeviceProfileId(UUID.fromString(deviceProfileId)),
  88 + tenantId,profileId,
68 89 msg.getOriginator(),
69   - eventIdentifier,
70   - eventType,
  90 + entryList,
71 91 src,ts,
72 92 new TelemetryNodeCallback(ctx, msg)
73 93 );
... ...