Commit 7caaa6d56e3411b4da89080298d374d52f811418

Authored by 芯火源
1 parent 292127b5

refactor: 设备事件分页查询

Showing 19 changed files with 166 additions and 472 deletions
... ... @@ -316,7 +316,7 @@ public abstract class BaseController {
316 316 }
317 317 }
318 318
319   - void checkParameter(String name, String param) throws ThingsboardException {
  319 + protected void checkParameter(String name, String param) throws ThingsboardException {
320 320 if (StringUtils.isEmpty(param)) {
321 321 throw new ThingsboardException("参数【 " + name + "】不能为空!", ThingsboardErrorCode.BAD_REQUEST_PARAMS);
322 322 }
... ...
  1 +package org.thingsboard.server.controller.yunteng;
  2 +
  3 +import io.swagger.annotations.Api;
  4 +import io.swagger.annotations.ApiOperation;
  5 +import lombok.RequiredArgsConstructor;
  6 +import org.springframework.web.bind.annotation.GetMapping;
  7 +import org.springframework.web.bind.annotation.RequestMapping;
  8 +import org.springframework.web.bind.annotation.RequestParam;
  9 +import org.springframework.web.bind.annotation.RestController;
  10 +import org.thingsboard.server.common.data.exception.ThingsboardException;
  11 +import org.thingsboard.server.common.data.id.DeviceId;
  12 +import org.thingsboard.server.common.data.page.PageData;
  13 +import org.thingsboard.server.common.data.page.TimePageLink;
  14 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
  15 +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
  16 +import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum;
  17 +import org.thingsboard.server.controller.BaseController;
  18 +import org.thingsboard.server.dao.model.ModelConstants;
  19 +import org.thingsboard.server.dao.yunteng.event.TkEventsService;
  20 +
  21 +import java.util.UUID;
  22 +
  23 +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*;
  24 +
  25 +/**
  26 + * @author Administrator
  27 + */
  28 +@RestController
  29 +@RequestMapping("api/yt/event")
  30 +@Api(tags = {"设备事件"})
  31 +@RequiredArgsConstructor
  32 +public class TkEventController extends BaseController {
  33 + private final TkEventsService eventsService;
  34 +
  35 + @GetMapping(params = {PAGE_SIZE, PAGE})
  36 + @ApiOperation("分页")
  37 + public PageData<TkEventKvDto> pageAlarmProfile(
  38 + @RequestParam(PAGE_SIZE) int pageSize,
  39 + @RequestParam(PAGE) int page,
  40 + @RequestParam(value = "eventIdentifier", required = false) String eventIdentifier,
  41 + @RequestParam(value = "eventType", required = false) DeviceEventTypeEnum eventType,
  42 + @RequestParam(value = "tbDeviceId", required = true) String tbDeviceId,
  43 + @RequestParam(required = false) Long startTime,
  44 + @RequestParam(required = false) Long endTime,
  45 + @RequestParam(value = ORDER_FILED, required = false) String orderBy,
  46 + @RequestParam(value = ORDER_TYPE, required = false) OrderTypeEnum orderType)
  47 + throws ThingsboardException {
  48 + checkParameter("tbDeviceId", tbDeviceId);
  49 +
  50 + if (orderBy != null && orderBy.isEmpty()) {
  51 + orderBy = ModelConstants.EVENT_TIME_COLUMN;
  52 + }
  53 + TimePageLink pageLink = createTimePageLink(pageSize, page, null, orderBy, orderType==null?"":orderType.name(), startTime, endTime);
  54 + return checkNotNull(eventsService.findEvents(new DeviceId(UUID.fromString(tbDeviceId)),eventIdentifier,eventType, pageLink));
  55 + }
  56 +
  57 +
  58 +}
... ...
... ... @@ -32,7 +32,6 @@ import org.thingsboard.server.common.data.id.DeviceProfileId;
32 32 import org.thingsboard.server.common.data.id.EntityId;
33 33 import org.thingsboard.server.common.data.id.TenantId;
34 34 import org.thingsboard.server.common.data.kv.*;
35   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
36 35 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
37 36 import org.thingsboard.server.common.msg.queue.ServiceType;
38 37 import org.thingsboard.server.common.msg.queue.TbCallback;
... ... @@ -236,13 +235,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer
236 235
237 236 //Thingskit function
238 237 @Override
239   - public void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) {
  238 + public void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData, Long eventTime, FutureCallback<Void> callback) {
240 239 checkInternalEntity(entityId);
241   - saveAndNotifyInternal(tenantId,profileId, entityId, eventId,eventType, attributes, callback);
  240 + saveAndNotifyInternal(tenantId,profileId, entityId, eventId,eventType, eventData,eventTime, callback);
242 241 }
243 242 @Override
244   - public void saveAndNotifyInternal(TenantId tenantId,DeviceProfileId profileId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) {
245   - ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId,profileId, entityId,eventId,eventType, attributes);
  243 + public void saveAndNotifyInternal(TenantId tenantId,DeviceProfileId profileId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback) {
  244 + ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId,profileId, entityId,eventId,eventType, eventData,eventTime);
246 245 addVoidCallback(saveFuture, callback);
247 246 // addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice));
248 247 }
... ...
... ... @@ -39,7 +39,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService {
39 39 void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
40 40
41 41 //Thingskit function
42   - void saveAndNotifyInternal(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback);
  42 + void saveAndNotifyInternal(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback);
43 43
44 44 void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
45 45
... ...
... ... @@ -16,34 +16,23 @@
16 16 package org.thingsboard.server.dao.yunteng.event;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19   -import org.thingsboard.server.common.data.EntityType;
20 19 import org.thingsboard.server.common.data.id.DeviceProfileId;
21 20 import org.thingsboard.server.common.data.id.EntityId;
22 21 import org.thingsboard.server.common.data.id.TenantId;
23   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  22 +import org.thingsboard.server.common.data.page.PageData;
  23 +import org.thingsboard.server.common.data.page.TimePageLink;
  24 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
24 25 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
25 26
26   -import java.util.Collection;
27 27 import java.util.List;
28   -import java.util.Optional;
29 28
30 29 /**
31 30 * @author Andrew Shvayka
32 31 */
33 32 public interface TkEventsService {
  33 + PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink);
  34 + ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime);
34 35
35   - ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope,String eventKey);
36 36
37   - ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> eventKeys);
38   -
39   - ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType);
40   -
41   - ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes);
42   -
43   - ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<String> eventKeys);
44   -
45   - List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
46   -
47   - List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds);
48 37
49 38 }
... ...
... ... @@ -15,107 +15,28 @@
15 15 */
16 16 package org.thingsboard.server.common.data.yunteng.dto;
17 17
18   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
19   -import org.thingsboard.server.common.data.kv.DataType;
20   -import org.thingsboard.server.common.data.kv.KvEntry;
  18 +import com.fasterxml.jackson.databind.JsonNode;
  19 +import io.swagger.annotations.ApiModel;
  20 +import lombok.Data;
  21 +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
21 22
22   -import javax.validation.Valid;
23   -import java.util.Optional;
  23 +import java.util.UUID;
24 24
25 25 /**
26 26 * @author Andrew Shvayka
27 27 */
28   -public class TkEventKvDto implements TkEventKvEntry {
  28 +@Data
  29 +@ApiModel
  30 +public class TkEventKvDto {
29 31
30 32 private static final long serialVersionUID = -6460767583563159407L;
31 33
32 34 private final long eventTime;
33   - @Valid
34   - private final KvEntry kv;
35   -
36   - public TkEventKvDto(KvEntry kv, long eventTime) {
37   - this.kv = kv;
38   - this.eventTime = eventTime;
39   - }
40   -
41   - public TkEventKvDto(long lastUpdateTs, KvEntry kv) {
42   - this(kv, lastUpdateTs);
43   - }
44   -
45   - @Override
46   - public long getEventTime() {
47   - return eventTime;
48   - }
49   -
50   - @Override
51   - public String getKey() {
52   - return kv.getKey();
53   - }
54   -
55   - @Override
56   - public DataType getDataType() {
57   - return kv.getDataType();
58   - }
59   -
60   - @Override
61   - public Optional<String> getStrValue() {
62   - return kv.getStrValue();
63   - }
64   -
65   - @Override
66   - public Optional<Long> getLongValue() {
67   - return kv.getLongValue();
68   - }
69   -
70   - @Override
71   - public Optional<Boolean> getBooleanValue() {
72   - return kv.getBooleanValue();
73   - }
74   -
75   - @Override
76   - public Optional<Double> getDoubleValue() {
77   - return kv.getDoubleValue();
78   - }
79   -
80   - @Override
81   - public Optional<String> getJsonValue() {
82   - return kv.getJsonValue();
83   - }
84   -
85   - @Override
86   - public String getValueAsString() {
87   - return kv.getValueAsString();
88   - }
89   -
90   - @Override
91   - public Object getValue() {
92   - return kv.getValue();
93   - }
94   -
95   - @Override
96   - public boolean equals(Object o) {
97   - if (this == o) return true;
98   - if (o == null || getClass() != o.getClass()) return false;
99   -
100   - TkEventKvDto that = (TkEventKvDto) o;
101   -
102   - if (eventTime != that.eventTime) return false;
103   - return kv.equals(that.kv);
104   -
105   - }
106   -
107   - @Override
108   - public int hashCode() {
109   - int result = (int) (eventTime ^ (eventTime >>> 32));
110   - result = 31 * result + kv.hashCode();
111   - return result;
112   - }
113   -
114   - @Override
115   - public String toString() {
116   - return "BaseAttributeKvEntry{" +
117   - "lastUpdateTs=" + eventTime +
118   - ", kv=" + kv +
119   - '}';
120   - }
  35 + private final DeviceEventTypeEnum eventType;
  36 + private final JsonNode eventValue;
  37 + private final UUID entityId;
  38 + private final String eventIdentifier;
  39 + private final String eventName;
  40 + private final UUID deviceProfileId;
  41 + private final String deviceProfileName;
121 42 }
... ...
... ... @@ -15,53 +15,18 @@
15 15 */
16 16 package org.thingsboard.server.common.transport.adaptor;
17 17
18   -import com.google.gson.Gson;
19   -import com.google.gson.JsonArray;
20   -import com.google.gson.JsonElement;
21   -import com.google.gson.JsonObject;
22   -import com.google.gson.JsonParser;
23   -import com.google.gson.JsonPrimitive;
24   -import com.google.gson.JsonSyntaxException;
  18 +import com.google.gson.*;
25 19 import org.apache.commons.lang3.math.NumberUtils;
26 20 import org.springframework.util.StringUtils;
27 21 import org.thingsboard.server.common.data.DataConstants;
28 22 import org.thingsboard.server.common.data.id.DeviceId;
29   -import org.thingsboard.server.common.data.kv.AttributeKvEntry;
30   -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry;
31   -import org.thingsboard.server.common.data.kv.BooleanDataEntry;
32   -import org.thingsboard.server.common.data.kv.DoubleDataEntry;
33   -import org.thingsboard.server.common.data.kv.JsonDataEntry;
34   -import org.thingsboard.server.common.data.kv.KvEntry;
35   -import org.thingsboard.server.common.data.kv.LongDataEntry;
36   -import org.thingsboard.server.common.data.kv.StringDataEntry;
37   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
38   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  23 +import org.thingsboard.server.common.data.kv.*;
39 24 import org.thingsboard.server.gen.transport.TransportProtos;
40   -import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg;
41   -import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg;
42   -import org.thingsboard.server.gen.transport.TransportProtos.CredentialsType;
43   -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg;
44   -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto;
45   -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType;
46   -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg;
47   -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg;
48   -import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg;
49   -import org.thingsboard.server.gen.transport.TransportProtos.ResponseStatus;
50   -import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto;
51   -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto;
52   -import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg;
53   -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg;
54   -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg;
  25 +import org.thingsboard.server.gen.transport.TransportProtos.*;
55 26
56 27 import java.math.BigDecimal;
57   -import java.util.ArrayList;
58   -import java.util.HashMap;
59   -import java.util.HashSet;
60   -import java.util.List;
61   -import java.util.Map;
  28 +import java.util.*;
62 29 import java.util.Map.Entry;
63   -import java.util.Set;
64   -import java.util.TreeMap;
65 30 import java.util.function.Consumer;
66 31 import java.util.stream.Collectors;
67 32
... ... @@ -518,12 +483,6 @@ public class JsonConverter {
518 483 return result;
519 484 }
520 485
521   - public static Set<TkEventKvEntry> convertToEvents(JsonElement element) {
522   - Set<TkEventKvEntry> result = new HashSet<>();
523   - long ts = System.currentTimeMillis();
524   - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new TkEventKvDto(kv, ts)).collect(Collectors.toList()));
525   - return result;
526   - }
527 486
528 487 private static List<KvEntry> parseValues(JsonObject valuesObject) {
529 488 List<KvEntry> result = new ArrayList<>();
... ...
... ... @@ -60,7 +60,7 @@ public class ModelConstants {
60 60 public static final String EVENT_IDENTIFIER_COLUMN = "event_identifier";
61 61 public static final String EVENT_TIME_COLUMN = "event_time";
62 62 public static final String EVENT_TYPE_COLUMN = "event_type";
63   - public static final String EVENT_KEY_COLUMN = "event_key";
  63 + public static final String EVENT_VALUE_COLUMN = "event_value";
64 64
65 65 /**
66 66 * Cassandra user constants.
... ...
... ... @@ -19,19 +19,16 @@ import com.google.common.util.concurrent.Futures;
19 19 import com.google.common.util.concurrent.ListenableFuture;
20 20 import lombok.extern.slf4j.Slf4j;
21 21 import org.springframework.stereotype.Service;
22   -import org.thingsboard.server.common.data.EntityType;
23 22 import org.thingsboard.server.common.data.id.DeviceProfileId;
24 23 import org.thingsboard.server.common.data.id.EntityId;
25 24 import org.thingsboard.server.common.data.id.TenantId;
26   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  25 +import org.thingsboard.server.common.data.page.PageData;
  26 +import org.thingsboard.server.common.data.page.TimePageLink;
  27 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
27 28 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
28   -import org.thingsboard.server.dao.service.Validator;
29 29 import org.thingsboard.server.dao.yunteng.event.TkEventsService;
30 30
31   -import java.util.Collection;
32 31 import java.util.List;
33   -import java.util.Optional;
34   -import java.util.stream.Collectors;
35 32
36 33 import static org.thingsboard.server.dao.yunteng.jpa.dao.event.EventUtils.validate;
37 34
... ... @@ -48,48 +45,19 @@ public class BaseEventsService implements TkEventsService {
48 45 this.eventsDao = eventsDao;
49 46 }
50 47
51   - @Override
52   - public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId,String attributeKey) {
53   - validate(entityId, eventId);
54   - Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey);
55   - return eventsDao.find(tenantId, entityId, null, attributeKey);
56   - }
57 48
58 49 @Override
59   - public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId, Collection<String> attributeKeys) {
60   - validate(entityId, eventId);
61   - attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey));
62   - return eventsDao.find(tenantId, entityId, null, attributeKeys);
  50 + public PageData<TkEventKvDto> findEvents(EntityId entityId, String eventId, DeviceEventTypeEnum eventType, TimePageLink pageLink) {
  51 + validate(entityId);
  52 + return eventsDao.findEvents( entityId, eventId,eventType,pageLink);
63 53 }
64 54
65 55 @Override
66   - public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType) {
67   - validate(entityId,eventId);
68   - return eventsDao.findAll(tenantId, entityId, eventType);
69   - }
70   -
71   - @Override
72   - public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
73   - return eventsDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId);
74   - }
75   -
76   - @Override
77   - public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) {
78   - return eventsDao.findAllKeysByEntityIds(tenantId, entityType, entityIds);
79   - }
80   -
81   - @Override
82   - public ListenableFuture<List<Void>> save(TenantId tenantId,DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes) {
  56 + public ListenableFuture<List<Void>> save(TenantId tenantId,DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime) {
83 57 validate(entityId, eventId,eventType);
84   - attributes.forEach(attribute -> validate(attribute));
85 58
86   - List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> eventsDao.save(tenantId, profileId,entityId, eventType,eventId, attribute)).collect(Collectors.toList());
  59 + ListenableFuture<Void> saveFutures = eventsDao.save(tenantId, profileId,entityId, eventType,eventId, eventData,eventTime);
87 60 return Futures.allAsList(saveFutures);
88 61 }
89 62
90   - @Override
91   - public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<String> eventKeys) {
92   - validate(entityId, eventId);
93   - return eventsDao.removeAll(tenantId, entityId, eventId,eventType, eventKeys);
94   - }
95 63 }
... ...
... ... @@ -22,9 +22,14 @@ import org.thingsboard.server.dao.exception.IncorrectParameterException;
22 22 import org.thingsboard.server.dao.service.Validator;
23 23
24 24 public class EventUtils {
  25 + public static void validate(EntityId id) {
  26 + Validator.validateId(id.getId(), "Incorrect id " + id);
  27 + }
  28 +
  29 +
25 30 public static void validate(EntityId id, String eventIdentifier) {
26 31 Validator.validateId(id.getId(), "Incorrect id " + id);
27   - Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier);
  32 + Validator.validateString(eventIdentifier, "Incorrect event identifier " + eventIdentifier);
28 33 }
29 34
30 35
... ...
... ... @@ -16,33 +16,21 @@
16 16 package org.thingsboard.server.dao.yunteng.jpa.dao.event;
17 17
18 18 import com.google.common.util.concurrent.ListenableFuture;
19   -import org.thingsboard.server.common.data.EntityType;
20 19 import org.thingsboard.server.common.data.id.DeviceProfileId;
21 20 import org.thingsboard.server.common.data.id.EntityId;
22 21 import org.thingsboard.server.common.data.id.TenantId;
23   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  22 +import org.thingsboard.server.common.data.page.PageData;
  23 +import org.thingsboard.server.common.data.page.TimePageLink;
  24 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
24 25 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
25 26
26   -import java.util.Collection;
27   -import java.util.List;
28   -import java.util.Optional;
29   -
30 27 /**
31 28 * @author Andrew Shvayka
32 29 */
33 30 public interface EventsDao {
34 31
35   - ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum attributeType, String attributeKey);
36   -
37   - ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum attributeType, Collection<String> attributeKey);
38   -
39   - ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum attributeType);
40   -
41   - ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, DeviceEventTypeEnum attributeType,String eventIdentifier, TkEventKvEntry attribute);
42   -
43   - ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,DeviceEventTypeEnum eventType,List<String> keys);
  32 + PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink);
44 33
45   - List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId);
  34 + ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, DeviceEventTypeEnum attributeType,String eventIdentifier, String eventData,Long eventTime);
46 35
47   - List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds);
48 36 }
... ...
... ... @@ -43,6 +43,4 @@ public class TkEventKvCompositeKey implements Serializable {
43 43 private String eventIdentifier;
44 44 @Column(name = EVENT_TIME_COLUMN)
45 45 private Long eventTime;
46   - @Column(name = EVENT_KEY_COLUMN)
47   - private String eventKey;
48 46 }
... ...
... ... @@ -16,9 +16,8 @@
16 16 package org.thingsboard.server.dao.yunteng.jpa.entity.events;
17 17
18 18 import lombok.Data;
19   -import org.thingsboard.server.common.data.kv.*;
  19 +import org.thingsboard.common.util.JacksonUtil;
20 20 import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
21   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
22 21 import org.thingsboard.server.dao.model.ToData;
23 22
24 23 import javax.persistence.Column;
... ... @@ -28,50 +27,26 @@ import javax.persistence.Table;
28 27 import java.io.Serializable;
29 28 import java.util.UUID;
30 29
31   -import static org.thingsboard.server.dao.model.ModelConstants.*;
  30 +import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_DEVICE_PROFILE_ID_PROPERTY;
  31 +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_VALUE_COLUMN;
32 32
33 33 @Data
34 34 @Entity
35 35 @Table(name = "tk_event_kv")
36   -public class TkEventKvEntity implements ToData<TkEventKvEntry>, Serializable {
  36 +public class TkEventKvEntity implements ToData<TkEventKvDto>, Serializable {
37 37
38 38 @EmbeddedId
39 39 private TkEventKvCompositeKey id;
40 40
41   - @Column(name = BOOLEAN_VALUE_COLUMN)
42   - private Boolean booleanValue;
43   -
44   - @Column(name = STRING_VALUE_COLUMN)
45   - private String strValue;
46   -
47   - @Column(name = LONG_VALUE_COLUMN)
48   - private Long longValue;
49   -
50   - @Column(name = DOUBLE_VALUE_COLUMN)
51   - private Double doubleValue;
52   -
53   - @Column(name = JSON_VALUE_COLUMN)
54   - private String jsonValue;
  41 + @Column(name = EVENT_VALUE_COLUMN)
  42 + private String eventValue;
55 43
56 44
57 45 @Column(name = DEVICE_DEVICE_PROFILE_ID_PROPERTY, columnDefinition = "uuid")
58 46 private UUID deviceProfileId;
59 47
60 48 @Override
61   - public TkEventKvEntry toData() {
62   - KvEntry kvEntry = null;
63   - if (strValue != null) {
64   - kvEntry = new StringDataEntry(id.getEventIdentifier(), strValue);
65   - } else if (booleanValue != null) {
66   - kvEntry = new BooleanDataEntry(id.getEventIdentifier(), booleanValue);
67   - } else if (doubleValue != null) {
68   - kvEntry = new DoubleDataEntry(id.getEventIdentifier(), doubleValue);
69   - } else if (longValue != null) {
70   - kvEntry = new LongDataEntry(id.getEventIdentifier(), longValue);
71   - } else if (jsonValue != null) {
72   - kvEntry = new JsonDataEntry(id.getEventIdentifier(), jsonValue);
73   - }
74   -
75   - return new TkEventKvDto(kvEntry, id.getEventTime());
  49 + public TkEventKvDto toData() {
  50 + return new TkEventKvDto(id.getEventTime(),id.getEventType(), JacksonUtil.toJsonNode(eventValue),id.getEntityId(),id.getEventIdentifier(),"",deviceProfileId,"");
76 51 }
77 52 }
... ...
... ... @@ -13,7 +13,6 @@ import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
13 13
14 14 import java.sql.PreparedStatement;
15 15 import java.sql.SQLException;
16   -import java.sql.Types;
17 16 import java.util.ArrayList;
18 17 import java.util.List;
19 18 import java.util.regex.Pattern;
... ... @@ -25,14 +24,14 @@ public abstract class EventKvInsertRepository {
25 24 private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE)));
26 25 private static final String EMPTY_STR = "";
27 26
28   - private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json),device_profile_id = ? " +
29   - "WHERE event_key = ? and event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;";
  27 + private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET event_value = ?, device_profile_id = ? " +
  28 + "WHERE event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;";
30 29
31 30 private static final String INSERT_OR_UPDATE =
32   - "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key,device_profile_id) " +
33   - "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?, ?) " +
34   - "ON CONFLICT (event_type, entity_id, event_identifier, event_time, event_key) " +
35   - "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), event_key = ?,device_profile_id = ?;";
  31 + "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, event_value,device_profile_id) " +
  32 + "VALUES(?, ?, ?, ?, ?, ?) " +
  33 + "ON CONFLICT (event_type, entity_id, event_identifier, event_time) " +
  34 + "DO UPDATE SET event_value = ?,device_profile_id = ?;";
36 35
37 36 @Autowired
38 37 protected JdbcTemplate jdbcTemplate;
... ... @@ -51,34 +50,12 @@ public abstract class EventKvInsertRepository {
51 50 @Override
52 51 public void setValues(PreparedStatement ps, int i) throws SQLException {
53 52 TkEventKvEntity kvEntity = entities.get(i);
54   - ps.setString(1, replaceNullChars(kvEntity.getStrValue()));
55   -
56   - if (kvEntity.getLongValue() != null) {
57   - ps.setLong(2, kvEntity.getLongValue());
58   - } else {
59   - ps.setNull(2, Types.BIGINT);
60   - }
61   -
62   - if (kvEntity.getDoubleValue() != null) {
63   - ps.setDouble(3, kvEntity.getDoubleValue());
64   - } else {
65   - ps.setNull(3, Types.DOUBLE);
66   - }
67   -
68   - if (kvEntity.getBooleanValue() != null) {
69   - ps.setBoolean(4, kvEntity.getBooleanValue());
70   - } else {
71   - ps.setNull(4, Types.BOOLEAN);
72   - }
73   -
74   - ps.setString(5, replaceNullChars(kvEntity.getJsonValue()));
75   - ps.setObject(6, kvEntity.getDeviceProfileId());
76   -
77   - ps.setString(7, kvEntity.getId().getEventKey());
78   - ps.setString(8, kvEntity.getId().getEventType().name());
79   - ps.setObject(9, kvEntity.getId().getEntityId());
80   - ps.setString(10, kvEntity.getId().getEventIdentifier());
81   - ps.setLong(11, kvEntity.getId().getEventTime());
  53 + ps.setString(1, replaceNullChars(kvEntity.getEventValue()));
  54 + ps.setObject(2, kvEntity.getDeviceProfileId());
  55 + ps.setString(3, kvEntity.getId().getEventType().name());
  56 + ps.setObject(4, kvEntity.getId().getEntityId());
  57 + ps.setString(5, kvEntity.getId().getEventIdentifier());
  58 + ps.setLong(6, kvEntity.getId().getEventTime());
82 59 }
83 60
84 61 @Override
... ... @@ -110,42 +87,12 @@ public abstract class EventKvInsertRepository {
110 87 ps.setString(3, kvEntity.getId().getEventIdentifier());
111 88 ps.setLong(4, kvEntity.getId().getEventTime());
112 89
113   - ps.setString(5, replaceNullChars(kvEntity.getStrValue()));
114   - ps.setString(12, replaceNullChars(kvEntity.getStrValue()));
115   -
116   - if (kvEntity.getLongValue() != null) {
117   - ps.setLong(6, kvEntity.getLongValue());
118   - ps.setLong(13, kvEntity.getLongValue());
119   - } else {
120   - ps.setNull(6, Types.BIGINT);
121   - ps.setNull(13, Types.BIGINT);
122   - }
123   -
124   - if (kvEntity.getDoubleValue() != null) {
125   - ps.setDouble(7, kvEntity.getDoubleValue());
126   - ps.setDouble(14, kvEntity.getDoubleValue());
127   - } else {
128   - ps.setNull(7, Types.DOUBLE);
129   - ps.setNull(14, Types.DOUBLE);
130   - }
131   -
132   - if (kvEntity.getBooleanValue() != null) {
133   - ps.setBoolean(8, kvEntity.getBooleanValue());
134   - ps.setBoolean(15, kvEntity.getBooleanValue());
135   - } else {
136   - ps.setNull(8, Types.BOOLEAN);
137   - ps.setNull(15, Types.BOOLEAN);
138   - }
139   -
140   - ps.setString(9, replaceNullChars(kvEntity.getJsonValue()));
141   - ps.setString(16, replaceNullChars(kvEntity.getJsonValue()));
142   -
143   - ps.setString(10, kvEntity.getId().getEventKey());
144   - ps.setString(17, kvEntity.getId().getEventKey());
145   -
146   -
147   - ps.setObject(11, kvEntity.getDeviceProfileId());
148   - ps.setObject(18, kvEntity.getDeviceProfileId());
  90 + ps.setString(5, replaceNullChars(kvEntity.getEventValue()));
  91 + ps.setString(7, replaceNullChars(kvEntity.getEventValue()));
  92 +
  93 +
  94 + ps.setObject(6, kvEntity.getDeviceProfileId());
  95 + ps.setObject(8, kvEntity.getDeviceProfileId());
149 96 }
150 97
151 98 @Override
... ...
... ... @@ -15,47 +15,22 @@
15 15 */
16 16 package org.thingsboard.server.dao.yunteng.jpa.repository.event;
17 17
18   -import org.springframework.data.jpa.repository.Modifying;
  18 +import org.springframework.data.domain.Page;
  19 +import org.springframework.data.domain.Pageable;
19 20 import org.springframework.data.jpa.repository.Query;
20   -import org.springframework.data.repository.CrudRepository;
  21 +import org.springframework.data.repository.PagingAndSortingRepository;
21 22 import org.springframework.data.repository.query.Param;
22   -import org.springframework.transaction.annotation.Transactional;
23   -import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
24 23 import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey;
25 24 import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
26 25
27   -import java.util.List;
28 26 import java.util.UUID;
29 27
30   -public interface EventKvRepository extends CrudRepository<TkEventKvEntity, TkEventKvCompositeKey> {
  28 +public interface EventKvRepository extends PagingAndSortingRepository<TkEventKvEntity, TkEventKvCompositeKey> {
31 29
32   - @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " +
33   - "AND a.id.entityId = :entityId " )
34   - List<TkEventKvEntity> findAllByEventTypeAndEntityId(@Param("eventType") DeviceEventTypeEnum eventType,
35   - @Param("entityId") UUID entityId);
36   -
37   - @Transactional
38   - @Modifying
39   - @Query("DELETE FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " +
40   - "AND a.id.entityId = :entityId " +
41   - "AND a.id.eventIdentifier = :eventIdentifier " +
42   - "AND a.id.eventKey = :eventKey")
43   - void delete(@Param("eventType") DeviceEventTypeEnum eventType,
44   - @Param("entityId") UUID entityId,
45   - @Param("eventIdentifier") String eventIdentifier,
46   - @Param("eventKey") String eventKey);
47   -
48   - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
49   - " in (SELECT id FROM device WHERE tenant_id = :tenantId and device_profile_id = :deviceProfileId limit 100) ORDER BY event_key", nativeQuery = true)
50   - List<String> findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId,
51   - @Param("deviceProfileId") UUID deviceProfileId);
52   -
53   - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
54   - " in (SELECT id FROM device WHERE tenant_id = :tenantId limit 100) ORDER BY event_key", nativeQuery = true)
55   - List<String> findAllKeysByTenantId(@Param("tenantId") UUID tenantId);
56   -
57   - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " +
58   - " in :entityIds ORDER BY event_key", nativeQuery = true)
59   - List<String> findAllKeysByEntityIds(@Param("entityIds") List<UUID> entityIds);
  30 + @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.entityId = :entityId "
  31 +// +"AND a.id.eventType = :eventType AND a.id.eventIdentifier = :eventIdentifier "
  32 + )
  33 + Page<TkEventKvEntity> findEvents(@Param("entityId") UUID entityId,
  34 + Pageable pageable);
60 35 }
61 36
... ...
... ... @@ -29,16 +29,15 @@ import java.util.List;
29 29 public class HsqlEventsInsertRepository extends EventKvInsertRepository {
30 30
31 31 private static final String INSERT_OR_UPDATE =
32   - "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " +
33   - "A (event_type, entity_id, event_identifier, event_key, str_v, long_v, dbl_v, bool_v, json_v, event_key) " +
  32 + "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?) " +
  33 + "A (event_type, entity_id, event_identifier, event_time, event_value,device_profile_id) " +
34 34 "ON (tk_event_kv.event_type=A.event_type " +
35 35 "AND tk_event_kv.entity_id=A.entity_id " +
36 36 "AND tk_event_kv.event_identifier=A.event_identifier " +
37 37 "AND tk_event_kv.event_time=A.event_time) " +
38   - "AND tk_event_kv.event_key=A.event_key) " +
39   - "WHEN MATCHED THEN UPDATE SET tk_event_kv.str_v = A.str_v, tk_event_kv.long_v = A.long_v, tk_event_kv.dbl_v = A.dbl_v, tk_event_kv.bool_v = A.bool_v, tk_event_kv.json_v = A.json_v " +
40   - "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " +
41   - "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.json_v, A.event_key)";
  38 + "WHEN MATCHED THEN UPDATE SET tk_event_kv.event_value = A.event_value " +
  39 + "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, event_value, device_profile_id) " +
  40 + "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.event_value, A.device_profile_id)";
42 41
43 42 @Override
44 43 protected void saveOrUpdate(List<TkEventKvEntity> entities) {
... ... @@ -48,29 +47,8 @@ public class HsqlEventsInsertRepository extends EventKvInsertRepository {
48 47 ps.setObject(2, entity.getId().getEntityId());
49 48 ps.setString(3, entity.getId().getEventIdentifier());
50 49 ps.setLong(4, entity.getId().getEventTime());
51   - ps.setString(5, entity.getStrValue());
52   -
53   - if (entity.getLongValue() != null) {
54   - ps.setLong(6, entity.getLongValue());
55   - } else {
56   - ps.setNull(6, Types.BIGINT);
57   - }
58   -
59   - if (entity.getDoubleValue() != null) {
60   - ps.setDouble(7, entity.getDoubleValue());
61   - } else {
62   - ps.setNull(7, Types.DOUBLE);
63   - }
64   -
65   - if (entity.getBooleanValue() != null) {
66   - ps.setBoolean(8, entity.getBooleanValue());
67   - } else {
68   - ps.setNull(8, Types.BOOLEAN);
69   - }
70   -
71   - ps.setString(9, entity.getJsonValue());
72   -
73   - ps.setString(10, entity.getId().getEventKey());
  50 + ps.setObject(5, entity.getEventValue());
  51 + ps.setObject(6, entity.getEventValue());
74 52 });
75 53 });
76 54 }
... ...
... ... @@ -15,18 +15,17 @@
15 15 */
16 16 package org.thingsboard.server.dao.yunteng.jpa.repository.event;
17 17
18   -import com.google.common.collect.Lists;
19   -import com.google.common.util.concurrent.Futures;
20 18 import com.google.common.util.concurrent.ListenableFuture;
21 19 import lombok.extern.slf4j.Slf4j;
22 20 import org.springframework.beans.factory.annotation.Autowired;
23 21 import org.springframework.beans.factory.annotation.Value;
24 22 import org.springframework.stereotype.Component;
25   -import org.thingsboard.server.common.data.EntityType;
26 23 import org.thingsboard.server.common.data.id.DeviceProfileId;
27 24 import org.thingsboard.server.common.data.id.EntityId;
28 25 import org.thingsboard.server.common.data.id.TenantId;
29   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
  26 +import org.thingsboard.server.common.data.page.PageData;
  27 +import org.thingsboard.server.common.data.page.TimePageLink;
  28 +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto;
30 29 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
31 30 import org.thingsboard.server.common.stats.StatsFactory;
32 31 import org.thingsboard.server.dao.DaoUtil;
... ... @@ -34,18 +33,14 @@ import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService;
34 33 import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent;
35 34 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams;
36 35 import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper;
  36 +import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao;
37 37 import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey;
38 38 import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity;
39   -import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao;
40 39
41 40 import javax.annotation.PostConstruct;
42 41 import javax.annotation.PreDestroy;
43   -import java.util.Collection;
44 42 import java.util.Comparator;
45   -import java.util.List;
46   -import java.util.Optional;
47 43 import java.util.function.Function;
48   -import java.util.stream.Collectors;
49 44
50 45 @Component
51 46 @Slf4j
... ... @@ -109,59 +104,18 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen
109 104 }
110 105
111 106 @Override
112   - public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum eventType, String attributeKey) {
113   - TkEventKvCompositeKey compositeKey =
114   - getEventKvCompositeKey(eventType,entityId,null, attributeKey,null);
115   - return Futures.immediateFuture(
116   - Optional.ofNullable(DaoUtil.getData(eventKvRepository.findById(compositeKey))));
117   - }
  107 + public PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink) {
118 108
119   - @Override
120   - public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum eventType, Collection<String> attributeKeys) {
121   - List<TkEventKvCompositeKey> compositeKeys =
122   - attributeKeys
123   - .stream()
124   - .map(attributeKey ->
125   - getEventKvCompositeKey(eventType,entityId, null,attributeKey,null))
126   - .collect(Collectors.toList());
127   - return Futures.immediateFuture(
128   - DaoUtil.convertDataList(Lists.newArrayList(eventKvRepository.findAllById(compositeKeys))));
  109 + return DaoUtil.toPageData(eventKvRepository.findEvents(entityId.getId(),DaoUtil.toPageable(pageLink)));
129 110 }
130 111
131   - @Override
132   - public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, DeviceEventTypeEnum eventType) {
133   - return Futures.immediateFuture(
134   - DaoUtil.convertDataList(Lists.newArrayList(
135   - eventKvRepository.findAllByEventTypeAndEntityId(
136   - eventType,
137   - entityId.getId()))));
138   - }
139 112
140 113 @Override
141   - public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) {
142   - if (deviceProfileId != null) {
143   - return eventKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId());
144   - } else {
145   - return eventKvRepository.findAllKeysByTenantId(tenantId.getId());
146   - }
147   - }
148   -
149   - @Override
150   - public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) {
151   - return eventKvRepository
152   - .findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList()));
153   - }
154   -
155   - @Override
156   - public ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId,DeviceEventTypeEnum eventType, String eventIdentifier, TkEventKvEntry attribute) {
  114 + public ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId,DeviceEventTypeEnum eventType, String eventIdentifier, String eventData,Long eventTime) {
157 115 TkEventKvEntity entity = new TkEventKvEntity();
158   - entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, attribute.getEventTime(),attribute.getKey()));
  116 + entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, eventTime));
159 117 entity.setDeviceProfileId(profileId.getId());
160   - entity.setStrValue(attribute.getStrValue().orElse(null));
161   - entity.setDoubleValue(attribute.getDoubleValue().orElse(null));
162   - entity.setLongValue(attribute.getLongValue().orElse(null));
163   - entity.setBooleanValue(attribute.getBooleanValue().orElse(null));
164   - entity.setJsonValue(attribute.getJsonValue().orElse(null));
  118 + entity.setEventValue(eventData);
165 119 return addToQueue(entity);
166 120 }
167 121
... ... @@ -169,22 +123,4 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen
169 123 return queue.add(entity);
170 124 }
171 125
172   - @Override
173   - public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,DeviceEventTypeEnum eventType,List<String> keys) {
174   - return service.submit(() -> {
175   - keys.forEach(key ->
176   - eventKvRepository.delete(eventType,entityId.getId(), eventId,key)
177   - );
178   - return null;
179   - });
180   - }
181   -
182   - private TkEventKvCompositeKey getEventKvCompositeKey(DeviceEventTypeEnum eventType,EntityId entityId, String eventIdentifier,String eventKey, Long eventTime) {
183   - return new TkEventKvCompositeKey(
184   - eventType,
185   - entityId.getId(),
186   - eventIdentifier,
187   - eventTime,
188   - eventKey);
189   - }
190 126 }
... ...
... ... @@ -45,7 +45,7 @@ public interface RuleEngineTelemetryService {
45 45 void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback);
46 46
47 47 //Thingskit function
48   - void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback);
  48 + void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback);
49 49
50 50 void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback);
51 51
... ...
... ... @@ -15,21 +15,16 @@
15 15 */
16 16 package org.thingsboard.rule.engine.yunteng.event;
17 17
18   -import com.google.gson.JsonParser;
19 18 import lombok.extern.slf4j.Slf4j;
20 19 import org.thingsboard.rule.engine.api.*;
21 20 import org.thingsboard.rule.engine.api.util.TbNodeUtils;
22 21 import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback;
23 22 import org.thingsboard.server.common.data.id.DeviceProfileId;
24 23 import org.thingsboard.server.common.data.plugin.ComponentType;
25   -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry;
26 24 import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum;
27 25 import org.thingsboard.server.common.msg.TbMsg;
28 26 import org.thingsboard.server.common.msg.session.SessionMsgType;
29   -import org.thingsboard.server.common.transport.adaptor.JsonConverter;
30 27
31   -import java.util.ArrayList;
32   -import java.util.Set;
33 28 import java.util.UUID;
34 29
35 30 import static org.thingsboard.server.dao.model.ModelConstants.*;
... ... @@ -48,10 +43,12 @@ import static org.thingsboard.server.dao.model.ModelConstants.*;
48 43 public class TkMsgEventNode implements TbNode {
49 44
50 45 private TkMsgEventNodeConfiguration config;
  46 + private RuleEngineDeviceProfileCache cache;
51 47
52 48 @Override
53 49 public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException {
54 50 this.config = TbNodeUtils.convert(configuration, TkMsgEventNodeConfiguration.class);
  51 + cache = ctx.getDeviceProfileCache();
55 52 }
56 53
57 54 @Override
... ... @@ -60,17 +57,18 @@ public class TkMsgEventNode implements TbNode {
60 57 ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType()));
61 58 return;
62 59 }
63   - String src = msg.getData();
64   - Set<TkEventKvEntry> events = JsonConverter.convertToEvents(new JsonParser().parse(src));
65 60 String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN);
66 61 String deviceProfileId = msg.getMetaData().getValue(DEVICE_DEVICE_PROFILE_ID_PROPERTY);
  62 + ////TODO: 验证事件类型、事件标识符和数据建是否与产品物模型中的事件匹配
  63 + long ts = System.currentTimeMillis();
  64 + String src = msg.getData();
67 65 DeviceEventTypeEnum eventType = DeviceEventTypeEnum.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN));
68 66 ctx.getTelemetryService().saveAndNotify(
69 67 ctx.getTenantId(),new DeviceProfileId(UUID.fromString(deviceProfileId)),
70 68 msg.getOriginator(),
71 69 eventIdentifier,
72 70 eventType,
73   - new ArrayList<>(events),
  71 + src,ts,
74 72 new TelemetryNodeCallback(ctx, msg)
75 73 );
76 74 }
... ...