Commit d5a41a3d22971a639ca7da37d5236cd74fd18375
Merge branch '20230222' into 'master'
feat: 租户的规则链更新 See merge request yunteng/thingskit!159
Showing
22 changed files
with
210 additions
and
511 deletions
... | ... | @@ -113,6 +113,19 @@ |
113 | 113 | "port": "8082", |
114 | 114 | "resetApi": "/api/v1/notice/alert" |
115 | 115 | } |
116 | + }, | |
117 | + { | |
118 | + "additionalInfo": { | |
119 | + "description": "", | |
120 | + "layoutX": 824, | |
121 | + "layoutY": 549 | |
122 | + }, | |
123 | + "type": "org.thingsboard.rule.engine.yunteng.event.TkMsgEventNode", | |
124 | + "name": "save events", | |
125 | + "debugMode": false, | |
126 | + "configuration": { | |
127 | + "useServerTs": true | |
128 | + } | |
116 | 129 | } |
117 | 130 | ], |
118 | 131 | "connections": [ |
... | ... | @@ -150,6 +163,11 @@ |
150 | 163 | "fromIndex": 2, |
151 | 164 | "toIndex": 5, |
152 | 165 | "type": "RPC Request to Device" |
166 | + }, | |
167 | + { | |
168 | + "fromIndex": 2, | |
169 | + "toIndex": 8, | |
170 | + "type": "Post event" | |
153 | 171 | } |
154 | 172 | ], |
155 | 173 | "ruleChainConnections": null | ... | ... |
... | ... | @@ -316,7 +316,7 @@ public abstract class BaseController { |
316 | 316 | } |
317 | 317 | } |
318 | 318 | |
319 | - void checkParameter(String name, String param) throws ThingsboardException { | |
319 | + protected void checkParameter(String name, String param) throws ThingsboardException { | |
320 | 320 | if (StringUtils.isEmpty(param)) { |
321 | 321 | throw new ThingsboardException("参数【 " + name + "】不能为空!", ThingsboardErrorCode.BAD_REQUEST_PARAMS); |
322 | 322 | } | ... | ... |
application/src/main/java/org/thingsboard/server/controller/yunteng/TkEventController.java
0 → 100644
1 | +package org.thingsboard.server.controller.yunteng; | |
2 | + | |
3 | +import io.swagger.annotations.Api; | |
4 | +import io.swagger.annotations.ApiOperation; | |
5 | +import lombok.RequiredArgsConstructor; | |
6 | +import org.springframework.web.bind.annotation.GetMapping; | |
7 | +import org.springframework.web.bind.annotation.RequestMapping; | |
8 | +import org.springframework.web.bind.annotation.RequestParam; | |
9 | +import org.springframework.web.bind.annotation.RestController; | |
10 | +import org.thingsboard.server.common.data.exception.ThingsboardException; | |
11 | +import org.thingsboard.server.common.data.id.DeviceId; | |
12 | +import org.thingsboard.server.common.data.page.PageData; | |
13 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
14 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
15 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
16 | +import org.thingsboard.server.common.data.yunteng.enums.OrderTypeEnum; | |
17 | +import org.thingsboard.server.controller.BaseController; | |
18 | +import org.thingsboard.server.dao.model.ModelConstants; | |
19 | +import org.thingsboard.server.dao.yunteng.event.TkEventsService; | |
20 | + | |
21 | +import java.util.UUID; | |
22 | + | |
23 | +import static org.thingsboard.server.common.data.yunteng.constant.QueryConstant.*; | |
24 | + | |
25 | +/** | |
26 | + * @author Administrator | |
27 | + */ | |
28 | +@RestController | |
29 | +@RequestMapping("api/yt/event") | |
30 | +@Api(tags = {"设备事件"}) | |
31 | +@RequiredArgsConstructor | |
32 | +public class TkEventController extends BaseController { | |
33 | + private final TkEventsService eventsService; | |
34 | + | |
35 | + @GetMapping(params = {PAGE_SIZE, PAGE}) | |
36 | + @ApiOperation("分页") | |
37 | + public PageData<TkEventKvDto> pageAlarmProfile( | |
38 | + @RequestParam(PAGE_SIZE) int pageSize, | |
39 | + @RequestParam(PAGE) int page, | |
40 | + @RequestParam(value = "eventIdentifier", required = false) String eventIdentifier, | |
41 | + @RequestParam(value = "eventType", required = false) DeviceEventTypeEnum eventType, | |
42 | + @RequestParam(value = "tbDeviceId", required = true) String tbDeviceId, | |
43 | + @RequestParam(required = false) Long startTime, | |
44 | + @RequestParam(required = false) Long endTime, | |
45 | + @RequestParam(value = ORDER_FILED, required = false) String orderBy, | |
46 | + @RequestParam(value = ORDER_TYPE, required = false) OrderTypeEnum orderType) | |
47 | + throws ThingsboardException { | |
48 | + checkParameter("tbDeviceId", tbDeviceId); | |
49 | + | |
50 | + if (orderBy != null && orderBy.isEmpty()) { | |
51 | + orderBy = ModelConstants.EVENT_TIME_COLUMN; | |
52 | + } | |
53 | + TimePageLink pageLink = createTimePageLink(pageSize, page, null, orderBy, orderType==null?"":orderType.name(), startTime, endTime); | |
54 | + return checkNotNull(eventsService.findEvents(new DeviceId(UUID.fromString(tbDeviceId)),eventIdentifier,eventType, pageLink)); | |
55 | + } | |
56 | + | |
57 | + | |
58 | +} | ... | ... |
... | ... | @@ -28,19 +28,11 @@ import org.thingsboard.server.common.data.ApiUsageRecordKey; |
28 | 28 | import org.thingsboard.server.common.data.EntityType; |
29 | 29 | import org.thingsboard.server.common.data.EntityView; |
30 | 30 | import org.thingsboard.server.common.data.id.CustomerId; |
31 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
31 | 32 | import org.thingsboard.server.common.data.id.EntityId; |
32 | 33 | import org.thingsboard.server.common.data.id.TenantId; |
33 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
34 | -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; | |
35 | -import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
36 | -import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; | |
37 | -import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
38 | -import org.thingsboard.server.common.data.kv.LongDataEntry; | |
39 | -import org.thingsboard.server.common.data.kv.StringDataEntry; | |
40 | -import org.thingsboard.server.common.data.kv.TsKvEntry; | |
41 | -import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; | |
42 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
43 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
34 | +import org.thingsboard.server.common.data.kv.*; | |
35 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
44 | 36 | import org.thingsboard.server.common.msg.queue.ServiceType; |
45 | 37 | import org.thingsboard.server.common.msg.queue.TbCallback; |
46 | 38 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
... | ... | @@ -57,15 +49,7 @@ import org.thingsboard.server.service.subscription.TbSubscriptionUtils; |
57 | 49 | import javax.annotation.Nullable; |
58 | 50 | import javax.annotation.PostConstruct; |
59 | 51 | import javax.annotation.PreDestroy; |
60 | -import java.util.ArrayList; | |
61 | -import java.util.Collection; | |
62 | -import java.util.Collections; | |
63 | -import java.util.Comparator; | |
64 | -import java.util.HashMap; | |
65 | -import java.util.List; | |
66 | -import java.util.Map; | |
67 | -import java.util.Objects; | |
68 | -import java.util.Optional; | |
52 | +import java.util.*; | |
69 | 53 | import java.util.concurrent.ExecutorService; |
70 | 54 | import java.util.concurrent.Executors; |
71 | 55 | |
... | ... | @@ -251,13 +235,13 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer |
251 | 235 | |
252 | 236 | //Thingskit function |
253 | 237 | @Override |
254 | - public void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) { | |
238 | + public void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData, Long eventTime, FutureCallback<Void> callback) { | |
255 | 239 | checkInternalEntity(entityId); |
256 | - saveAndNotifyInternal(tenantId, entityId, eventId,eventType, attributes, callback); | |
240 | + saveAndNotifyInternal(tenantId,profileId, entityId, eventId,eventType, eventData,eventTime, callback); | |
257 | 241 | } |
258 | 242 | @Override |
259 | - public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) { | |
260 | - ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId, entityId,eventId,eventType, attributes); | |
243 | + public void saveAndNotifyInternal(TenantId tenantId,DeviceProfileId profileId, EntityId entityId,String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback) { | |
244 | + ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId,profileId, entityId,eventId,eventType, eventData,eventTime); | |
261 | 245 | addVoidCallback(saveFuture, callback); |
262 | 246 | // addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice)); |
263 | 247 | } | ... | ... |
... | ... | @@ -17,12 +17,13 @@ package org.thingsboard.server.service.telemetry; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.FutureCallback; |
19 | 19 | import org.thingsboard.rule.engine.api.RuleEngineTelemetryService; |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
20 | 21 | import org.thingsboard.server.common.data.id.EntityId; |
21 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
22 | 23 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
23 | 24 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
24 | 25 | import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; |
25 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
26 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
26 | 27 | |
27 | 28 | import java.util.List; |
28 | 29 | |
... | ... | @@ -38,7 +39,7 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { |
38 | 39 | void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); |
39 | 40 | |
40 | 41 | //Thingskit function |
41 | - void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback); | |
42 | + void saveAndNotifyInternal(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback); | |
42 | 43 | |
43 | 44 | void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); |
44 | 45 | ... | ... |
... | ... | @@ -16,34 +16,23 @@ |
16 | 16 | package org.thingsboard.server.dao.yunteng.event; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
19 | -import org.thingsboard.server.common.data.EntityType; | |
20 | 19 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
21 | 20 | import org.thingsboard.server.common.data.id.EntityId; |
22 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
23 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
24 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
22 | +import org.thingsboard.server.common.data.page.PageData; | |
23 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
24 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
25 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
25 | 26 | |
26 | -import java.util.Collection; | |
27 | 27 | import java.util.List; |
28 | -import java.util.Optional; | |
29 | 28 | |
30 | 29 | /** |
31 | 30 | * @author Andrew Shvayka |
32 | 31 | */ |
33 | 32 | public interface TkEventsService { |
33 | + PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink); | |
34 | + ListenableFuture<List<Void>> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime); | |
34 | 35 | |
35 | - ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope,String eventKey); | |
36 | 36 | |
37 | - ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> eventKeys); | |
38 | - | |
39 | - ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType); | |
40 | - | |
41 | - ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes); | |
42 | - | |
43 | - ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys); | |
44 | - | |
45 | - List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); | |
46 | - | |
47 | - List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds); | |
48 | 37 | |
49 | 38 | } | ... | ... |
... | ... | @@ -15,107 +15,28 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.data.yunteng.dto; |
17 | 17 | |
18 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
19 | -import org.thingsboard.server.common.data.kv.DataType; | |
20 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
18 | +import com.fasterxml.jackson.databind.JsonNode; | |
19 | +import io.swagger.annotations.ApiModel; | |
20 | +import lombok.Data; | |
21 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
21 | 22 | |
22 | -import javax.validation.Valid; | |
23 | -import java.util.Optional; | |
23 | +import java.util.UUID; | |
24 | 24 | |
25 | 25 | /** |
26 | 26 | * @author Andrew Shvayka |
27 | 27 | */ |
28 | -public class TkEventKvDto implements TkEventKvEntry { | |
28 | +@Data | |
29 | +@ApiModel | |
30 | +public class TkEventKvDto { | |
29 | 31 | |
30 | 32 | private static final long serialVersionUID = -6460767583563159407L; |
31 | 33 | |
32 | 34 | private final long eventTime; |
33 | - @Valid | |
34 | - private final KvEntry kv; | |
35 | - | |
36 | - public TkEventKvDto(KvEntry kv, long eventTime) { | |
37 | - this.kv = kv; | |
38 | - this.eventTime = eventTime; | |
39 | - } | |
40 | - | |
41 | - public TkEventKvDto(long lastUpdateTs, KvEntry kv) { | |
42 | - this(kv, lastUpdateTs); | |
43 | - } | |
44 | - | |
45 | - @Override | |
46 | - public long getEventTime() { | |
47 | - return eventTime; | |
48 | - } | |
49 | - | |
50 | - @Override | |
51 | - public String getKey() { | |
52 | - return kv.getKey(); | |
53 | - } | |
54 | - | |
55 | - @Override | |
56 | - public DataType getDataType() { | |
57 | - return kv.getDataType(); | |
58 | - } | |
59 | - | |
60 | - @Override | |
61 | - public Optional<String> getStrValue() { | |
62 | - return kv.getStrValue(); | |
63 | - } | |
64 | - | |
65 | - @Override | |
66 | - public Optional<Long> getLongValue() { | |
67 | - return kv.getLongValue(); | |
68 | - } | |
69 | - | |
70 | - @Override | |
71 | - public Optional<Boolean> getBooleanValue() { | |
72 | - return kv.getBooleanValue(); | |
73 | - } | |
74 | - | |
75 | - @Override | |
76 | - public Optional<Double> getDoubleValue() { | |
77 | - return kv.getDoubleValue(); | |
78 | - } | |
79 | - | |
80 | - @Override | |
81 | - public Optional<String> getJsonValue() { | |
82 | - return kv.getJsonValue(); | |
83 | - } | |
84 | - | |
85 | - @Override | |
86 | - public String getValueAsString() { | |
87 | - return kv.getValueAsString(); | |
88 | - } | |
89 | - | |
90 | - @Override | |
91 | - public Object getValue() { | |
92 | - return kv.getValue(); | |
93 | - } | |
94 | - | |
95 | - @Override | |
96 | - public boolean equals(Object o) { | |
97 | - if (this == o) return true; | |
98 | - if (o == null || getClass() != o.getClass()) return false; | |
99 | - | |
100 | - TkEventKvDto that = (TkEventKvDto) o; | |
101 | - | |
102 | - if (eventTime != that.eventTime) return false; | |
103 | - return kv.equals(that.kv); | |
104 | - | |
105 | - } | |
106 | - | |
107 | - @Override | |
108 | - public int hashCode() { | |
109 | - int result = (int) (eventTime ^ (eventTime >>> 32)); | |
110 | - result = 31 * result + kv.hashCode(); | |
111 | - return result; | |
112 | - } | |
113 | - | |
114 | - @Override | |
115 | - public String toString() { | |
116 | - return "BaseAttributeKvEntry{" + | |
117 | - "lastUpdateTs=" + eventTime + | |
118 | - ", kv=" + kv + | |
119 | - '}'; | |
120 | - } | |
35 | + private final DeviceEventTypeEnum eventType; | |
36 | + private final JsonNode eventValue; | |
37 | + private final UUID entityId; | |
38 | + private final String eventIdentifier; | |
39 | + private final String eventName; | |
40 | + private final UUID deviceProfileId; | |
41 | + private final String deviceProfileName; | |
121 | 42 | } | ... | ... |
... | ... | @@ -15,53 +15,18 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.common.transport.adaptor; |
17 | 17 | |
18 | -import com.google.gson.Gson; | |
19 | -import com.google.gson.JsonArray; | |
20 | -import com.google.gson.JsonElement; | |
21 | -import com.google.gson.JsonObject; | |
22 | -import com.google.gson.JsonParser; | |
23 | -import com.google.gson.JsonPrimitive; | |
24 | -import com.google.gson.JsonSyntaxException; | |
18 | +import com.google.gson.*; | |
25 | 19 | import org.apache.commons.lang3.math.NumberUtils; |
26 | 20 | import org.springframework.util.StringUtils; |
27 | 21 | import org.thingsboard.server.common.data.DataConstants; |
28 | 22 | import org.thingsboard.server.common.data.id.DeviceId; |
29 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
30 | -import org.thingsboard.server.common.data.kv.BaseAttributeKvEntry; | |
31 | -import org.thingsboard.server.common.data.kv.BooleanDataEntry; | |
32 | -import org.thingsboard.server.common.data.kv.DoubleDataEntry; | |
33 | -import org.thingsboard.server.common.data.kv.JsonDataEntry; | |
34 | -import org.thingsboard.server.common.data.kv.KvEntry; | |
35 | -import org.thingsboard.server.common.data.kv.LongDataEntry; | |
36 | -import org.thingsboard.server.common.data.kv.StringDataEntry; | |
37 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
38 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
23 | +import org.thingsboard.server.common.data.kv.*; | |
39 | 24 | import org.thingsboard.server.gen.transport.TransportProtos; |
40 | -import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; | |
41 | -import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; | |
42 | -import org.thingsboard.server.gen.transport.TransportProtos.CredentialsType; | |
43 | -import org.thingsboard.server.gen.transport.TransportProtos.GetAttributeResponseMsg; | |
44 | -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueProto; | |
45 | -import org.thingsboard.server.gen.transport.TransportProtos.KeyValueType; | |
46 | -import org.thingsboard.server.gen.transport.TransportProtos.PostAttributeMsg; | |
47 | -import org.thingsboard.server.gen.transport.TransportProtos.PostTelemetryMsg; | |
48 | -import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; | |
49 | -import org.thingsboard.server.gen.transport.TransportProtos.ResponseStatus; | |
50 | -import org.thingsboard.server.gen.transport.TransportProtos.TsKvListProto; | |
51 | -import org.thingsboard.server.gen.transport.TransportProtos.TsKvProto; | |
52 | -import org.thingsboard.server.gen.transport.TransportProtos.ValidateBasicMqttCredRequestMsg; | |
53 | -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceTokenRequestMsg; | |
54 | -import org.thingsboard.server.gen.transport.TransportProtos.ValidateDeviceX509CertRequestMsg; | |
25 | +import org.thingsboard.server.gen.transport.TransportProtos.*; | |
55 | 26 | |
56 | 27 | import java.math.BigDecimal; |
57 | -import java.util.ArrayList; | |
58 | -import java.util.HashMap; | |
59 | -import java.util.HashSet; | |
60 | -import java.util.List; | |
61 | -import java.util.Map; | |
28 | +import java.util.*; | |
62 | 29 | import java.util.Map.Entry; |
63 | -import java.util.Set; | |
64 | -import java.util.TreeMap; | |
65 | 30 | import java.util.function.Consumer; |
66 | 31 | import java.util.stream.Collectors; |
67 | 32 | |
... | ... | @@ -518,12 +483,6 @@ public class JsonConverter { |
518 | 483 | return result; |
519 | 484 | } |
520 | 485 | |
521 | - public static Set<TkEventKvEntry> convertToEvents(JsonElement element) { | |
522 | - Set<TkEventKvEntry> result = new HashSet<>(); | |
523 | - long ts = System.currentTimeMillis(); | |
524 | - result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new TkEventKvDto(kv, ts)).collect(Collectors.toList())); | |
525 | - return result; | |
526 | - } | |
527 | 486 | |
528 | 487 | private static List<KvEntry> parseValues(JsonObject valuesObject) { |
529 | 488 | List<KvEntry> result = new ArrayList<>(); | ... | ... |
... | ... | @@ -593,6 +593,8 @@ public class DefaultTransportService implements TransportService { |
593 | 593 | DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); |
594 | 594 | metaData.putValue("deviceName", sessionInfo.getDeviceName()); |
595 | 595 | metaData.putValue("deviceType", sessionInfo.getDeviceType()); |
596 | + UUID deviceProfileId = new UUID(sessionInfo.getDeviceProfileIdMSB(),sessionInfo.getDeviceProfileIdMSB()); | |
597 | + metaData.putValue("device_profile_id", deviceProfileId.toString()); | |
596 | 598 | metaData.putValue("deviceId",eventInfo[0]); |
597 | 599 | metaData.putValue("event_identifier", eventInfo[1]); |
598 | 600 | metaData.putValue("event_type",eventInfo[2]); | ... | ... |
... | ... | @@ -60,7 +60,7 @@ public class ModelConstants { |
60 | 60 | public static final String EVENT_IDENTIFIER_COLUMN = "event_identifier"; |
61 | 61 | public static final String EVENT_TIME_COLUMN = "event_time"; |
62 | 62 | public static final String EVENT_TYPE_COLUMN = "event_type"; |
63 | - public static final String EVENT_KEY_COLUMN = "event_key"; | |
63 | + public static final String EVENT_VALUE_COLUMN = "event_value"; | |
64 | 64 | |
65 | 65 | /** |
66 | 66 | * Cassandra user constants. | ... | ... |
... | ... | @@ -19,19 +19,16 @@ import com.google.common.util.concurrent.Futures; |
19 | 19 | import com.google.common.util.concurrent.ListenableFuture; |
20 | 20 | import lombok.extern.slf4j.Slf4j; |
21 | 21 | import org.springframework.stereotype.Service; |
22 | -import org.thingsboard.server.common.data.EntityType; | |
23 | 22 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
24 | 23 | import org.thingsboard.server.common.data.id.EntityId; |
25 | 24 | import org.thingsboard.server.common.data.id.TenantId; |
26 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
27 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
28 | -import org.thingsboard.server.dao.service.Validator; | |
25 | +import org.thingsboard.server.common.data.page.PageData; | |
26 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
27 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
28 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
29 | 29 | import org.thingsboard.server.dao.yunteng.event.TkEventsService; |
30 | 30 | |
31 | -import java.util.Collection; | |
32 | 31 | import java.util.List; |
33 | -import java.util.Optional; | |
34 | -import java.util.stream.Collectors; | |
35 | 32 | |
36 | 33 | import static org.thingsboard.server.dao.yunteng.jpa.dao.event.EventUtils.validate; |
37 | 34 | |
... | ... | @@ -48,48 +45,19 @@ public class BaseEventsService implements TkEventsService { |
48 | 45 | this.eventsDao = eventsDao; |
49 | 46 | } |
50 | 47 | |
51 | - @Override | |
52 | - public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId,String attributeKey) { | |
53 | - validate(entityId, eventId); | |
54 | - Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); | |
55 | - return eventsDao.find(tenantId, entityId, null, attributeKey); | |
56 | - } | |
57 | 48 | |
58 | 49 | @Override |
59 | - public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId, Collection<String> attributeKeys) { | |
60 | - validate(entityId, eventId); | |
61 | - attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); | |
62 | - return eventsDao.find(tenantId, entityId, null, attributeKeys); | |
50 | + public PageData<TkEventKvDto> findEvents(EntityId entityId, String eventId, DeviceEventTypeEnum eventType, TimePageLink pageLink) { | |
51 | + validate(entityId); | |
52 | + return eventsDao.findEvents( entityId, eventId,eventType,pageLink); | |
63 | 53 | } |
64 | 54 | |
65 | 55 | @Override |
66 | - public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType) { | |
67 | - validate(entityId,eventId); | |
68 | - return eventsDao.findAll(tenantId, entityId, eventType); | |
69 | - } | |
70 | - | |
71 | - @Override | |
72 | - public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { | |
73 | - return eventsDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); | |
74 | - } | |
75 | - | |
76 | - @Override | |
77 | - public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) { | |
78 | - return eventsDao.findAllKeysByEntityIds(tenantId, entityType, entityIds); | |
79 | - } | |
80 | - | |
81 | - @Override | |
82 | - public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes) { | |
56 | + public ListenableFuture<List<Void>> save(TenantId tenantId,DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime) { | |
83 | 57 | validate(entityId, eventId,eventType); |
84 | - attributes.forEach(attribute -> validate(attribute)); | |
85 | 58 | |
86 | - List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> eventsDao.save(tenantId, entityId, eventType,eventId, attribute)).collect(Collectors.toList()); | |
59 | + ListenableFuture<Void> saveFutures = eventsDao.save(tenantId, profileId,entityId, eventType,eventId, eventData,eventTime); | |
87 | 60 | return Futures.allAsList(saveFutures); |
88 | 61 | } |
89 | 62 | |
90 | - @Override | |
91 | - public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys) { | |
92 | - validate(entityId, eventId); | |
93 | - return eventsDao.removeAll(tenantId, entityId, eventId,eventType, eventKeys); | |
94 | - } | |
95 | 63 | } | ... | ... |
... | ... | @@ -16,20 +16,24 @@ |
16 | 16 | package org.thingsboard.server.dao.yunteng.jpa.dao.event; |
17 | 17 | |
18 | 18 | import org.thingsboard.server.common.data.id.EntityId; |
19 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
20 | 19 | import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; |
21 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
20 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
22 | 21 | import org.thingsboard.server.dao.exception.IncorrectParameterException; |
23 | 22 | import org.thingsboard.server.dao.service.Validator; |
24 | 23 | |
25 | 24 | public class EventUtils { |
25 | + public static void validate(EntityId id) { | |
26 | + Validator.validateId(id.getId(), "Incorrect id " + id); | |
27 | + } | |
28 | + | |
29 | + | |
26 | 30 | public static void validate(EntityId id, String eventIdentifier) { |
27 | 31 | Validator.validateId(id.getId(), "Incorrect id " + id); |
28 | - Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier); | |
32 | + Validator.validateString(eventIdentifier, "Incorrect event identifier " + eventIdentifier); | |
29 | 33 | } |
30 | 34 | |
31 | 35 | |
32 | - public static void validate(EntityId id, String eventIdentifier, TkEventType eventType) { | |
36 | + public static void validate(EntityId id, String eventIdentifier, DeviceEventTypeEnum eventType) { | |
33 | 37 | Validator.validateId(id.getId(), "Incorrect id " + id); |
34 | 38 | Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier); |
35 | 39 | if(eventType == null){ | ... | ... |
... | ... | @@ -16,33 +16,21 @@ |
16 | 16 | package org.thingsboard.server.dao.yunteng.jpa.dao.event; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
19 | -import org.thingsboard.server.common.data.EntityType; | |
20 | 19 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
21 | 20 | import org.thingsboard.server.common.data.id.EntityId; |
22 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
23 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
24 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
25 | - | |
26 | -import java.util.Collection; | |
27 | -import java.util.List; | |
28 | -import java.util.Optional; | |
22 | +import org.thingsboard.server.common.data.page.PageData; | |
23 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
24 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
25 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
29 | 26 | |
30 | 27 | /** |
31 | 28 | * @author Andrew Shvayka |
32 | 29 | */ |
33 | 30 | public interface EventsDao { |
34 | 31 | |
35 | - ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, String attributeKey); | |
36 | - | |
37 | - ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, Collection<String> attributeKey); | |
38 | - | |
39 | - ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType attributeType); | |
40 | - | |
41 | - ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TkEventType attributeType,String eventIdentifier, TkEventKvEntry attribute); | |
42 | - | |
43 | - ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys); | |
32 | + PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink); | |
44 | 33 | |
45 | - List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); | |
34 | + ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId, DeviceEventTypeEnum attributeType,String eventIdentifier, String eventData,Long eventTime); | |
46 | 35 | |
47 | - List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds); | |
48 | 36 | } | ... | ... |
... | ... | @@ -18,7 +18,7 @@ package org.thingsboard.server.dao.yunteng.jpa.entity.events; |
18 | 18 | import lombok.AllArgsConstructor; |
19 | 19 | import lombok.Data; |
20 | 20 | import lombok.NoArgsConstructor; |
21 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
21 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
22 | 22 | |
23 | 23 | import javax.persistence.Column; |
24 | 24 | import javax.persistence.Embeddable; |
... | ... | @@ -36,13 +36,11 @@ import static org.thingsboard.server.dao.model.ModelConstants.*; |
36 | 36 | public class TkEventKvCompositeKey implements Serializable { |
37 | 37 | @Enumerated(EnumType.STRING) |
38 | 38 | @Column(name = EVENT_TYPE_COLUMN) |
39 | - private TkEventType eventType; | |
39 | + private DeviceEventTypeEnum eventType; | |
40 | 40 | @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid") |
41 | 41 | private UUID entityId; |
42 | 42 | @Column(name = EVENT_IDENTIFIER_COLUMN) |
43 | 43 | private String eventIdentifier; |
44 | 44 | @Column(name = EVENT_TIME_COLUMN) |
45 | 45 | private Long eventTime; |
46 | - @Column(name = EVENT_KEY_COLUMN) | |
47 | - private String eventKey; | |
48 | 46 | } | ... | ... |
... | ... | @@ -16,9 +16,8 @@ |
16 | 16 | package org.thingsboard.server.dao.yunteng.jpa.entity.events; |
17 | 17 | |
18 | 18 | import lombok.Data; |
19 | -import org.thingsboard.server.common.data.kv.*; | |
19 | +import org.thingsboard.common.util.JacksonUtil; | |
20 | 20 | import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; |
21 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
22 | 21 | import org.thingsboard.server.dao.model.ToData; |
23 | 22 | |
24 | 23 | import javax.persistence.Column; |
... | ... | @@ -28,50 +27,26 @@ import javax.persistence.Table; |
28 | 27 | import java.io.Serializable; |
29 | 28 | import java.util.UUID; |
30 | 29 | |
31 | -import static org.thingsboard.server.dao.model.ModelConstants.*; | |
30 | +import static org.thingsboard.server.dao.model.ModelConstants.DEVICE_DEVICE_PROFILE_ID_PROPERTY; | |
31 | +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_VALUE_COLUMN; | |
32 | 32 | |
33 | 33 | @Data |
34 | 34 | @Entity |
35 | 35 | @Table(name = "tk_event_kv") |
36 | -public class TkEventKvEntity implements ToData<TkEventKvEntry>, Serializable { | |
36 | +public class TkEventKvEntity implements ToData<TkEventKvDto>, Serializable { | |
37 | 37 | |
38 | 38 | @EmbeddedId |
39 | 39 | private TkEventKvCompositeKey id; |
40 | 40 | |
41 | - @Column(name = BOOLEAN_VALUE_COLUMN) | |
42 | - private Boolean booleanValue; | |
43 | - | |
44 | - @Column(name = STRING_VALUE_COLUMN) | |
45 | - private String strValue; | |
46 | - | |
47 | - @Column(name = LONG_VALUE_COLUMN) | |
48 | - private Long longValue; | |
49 | - | |
50 | - @Column(name = DOUBLE_VALUE_COLUMN) | |
51 | - private Double doubleValue; | |
52 | - | |
53 | - @Column(name = JSON_VALUE_COLUMN) | |
54 | - private String jsonValue; | |
41 | + @Column(name = EVENT_VALUE_COLUMN) | |
42 | + private String eventValue; | |
55 | 43 | |
56 | 44 | |
57 | 45 | @Column(name = DEVICE_DEVICE_PROFILE_ID_PROPERTY, columnDefinition = "uuid") |
58 | 46 | private UUID deviceProfileId; |
59 | 47 | |
60 | 48 | @Override |
61 | - public TkEventKvEntry toData() { | |
62 | - KvEntry kvEntry = null; | |
63 | - if (strValue != null) { | |
64 | - kvEntry = new StringDataEntry(id.getEventIdentifier(), strValue); | |
65 | - } else if (booleanValue != null) { | |
66 | - kvEntry = new BooleanDataEntry(id.getEventIdentifier(), booleanValue); | |
67 | - } else if (doubleValue != null) { | |
68 | - kvEntry = new DoubleDataEntry(id.getEventIdentifier(), doubleValue); | |
69 | - } else if (longValue != null) { | |
70 | - kvEntry = new LongDataEntry(id.getEventIdentifier(), longValue); | |
71 | - } else if (jsonValue != null) { | |
72 | - kvEntry = new JsonDataEntry(id.getEventIdentifier(), jsonValue); | |
73 | - } | |
74 | - | |
75 | - return new TkEventKvDto(kvEntry, id.getEventTime()); | |
49 | + public TkEventKvDto toData() { | |
50 | + return new TkEventKvDto(id.getEventTime(),id.getEventType(), JacksonUtil.toJsonNode(eventValue),id.getEntityId(),id.getEventIdentifier(),"",deviceProfileId,""); | |
76 | 51 | } |
77 | 52 | } | ... | ... |
... | ... | @@ -13,7 +13,6 @@ import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; |
13 | 13 | |
14 | 14 | import java.sql.PreparedStatement; |
15 | 15 | import java.sql.SQLException; |
16 | -import java.sql.Types; | |
17 | 16 | import java.util.ArrayList; |
18 | 17 | import java.util.List; |
19 | 18 | import java.util.regex.Pattern; |
... | ... | @@ -25,14 +24,14 @@ public abstract class EventKvInsertRepository { |
25 | 24 | private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); |
26 | 25 | private static final String EMPTY_STR = ""; |
27 | 26 | |
28 | - private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json) " + | |
29 | - "WHERE event_key = ? and event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;"; | |
27 | + private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET event_value = ?, device_profile_id = ? " + | |
28 | + "WHERE event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;"; | |
30 | 29 | |
31 | 30 | private static final String INSERT_OR_UPDATE = |
32 | - "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
33 | - "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + | |
34 | - "ON CONFLICT (event_type, entity_id, event_identifier, event_time, event_key) " + | |
35 | - "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), event_key = ?;"; | |
31 | + "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, event_value,device_profile_id) " + | |
32 | + "VALUES(?, ?, ?, ?, ?, ?) " + | |
33 | + "ON CONFLICT (event_type, entity_id, event_identifier, event_time) " + | |
34 | + "DO UPDATE SET event_value = ?,device_profile_id = ?;"; | |
36 | 35 | |
37 | 36 | @Autowired |
38 | 37 | protected JdbcTemplate jdbcTemplate; |
... | ... | @@ -51,33 +50,12 @@ public abstract class EventKvInsertRepository { |
51 | 50 | @Override |
52 | 51 | public void setValues(PreparedStatement ps, int i) throws SQLException { |
53 | 52 | TkEventKvEntity kvEntity = entities.get(i); |
54 | - ps.setString(1, replaceNullChars(kvEntity.getStrValue())); | |
55 | - | |
56 | - if (kvEntity.getLongValue() != null) { | |
57 | - ps.setLong(2, kvEntity.getLongValue()); | |
58 | - } else { | |
59 | - ps.setNull(2, Types.BIGINT); | |
60 | - } | |
61 | - | |
62 | - if (kvEntity.getDoubleValue() != null) { | |
63 | - ps.setDouble(3, kvEntity.getDoubleValue()); | |
64 | - } else { | |
65 | - ps.setNull(3, Types.DOUBLE); | |
66 | - } | |
67 | - | |
68 | - if (kvEntity.getBooleanValue() != null) { | |
69 | - ps.setBoolean(4, kvEntity.getBooleanValue()); | |
70 | - } else { | |
71 | - ps.setNull(4, Types.BOOLEAN); | |
72 | - } | |
73 | - | |
74 | - ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); | |
75 | - | |
76 | - ps.setString(6, kvEntity.getId().getEventKey()); | |
77 | - ps.setString(7, kvEntity.getId().getEventType().name()); | |
78 | - ps.setObject(8, kvEntity.getId().getEntityId()); | |
79 | - ps.setString(9, kvEntity.getId().getEventIdentifier()); | |
80 | - ps.setLong(10, kvEntity.getId().getEventTime()); | |
53 | + ps.setString(1, replaceNullChars(kvEntity.getEventValue())); | |
54 | + ps.setObject(2, kvEntity.getDeviceProfileId()); | |
55 | + ps.setString(3, kvEntity.getId().getEventType().name()); | |
56 | + ps.setObject(4, kvEntity.getId().getEntityId()); | |
57 | + ps.setString(5, kvEntity.getId().getEventIdentifier()); | |
58 | + ps.setLong(6, kvEntity.getId().getEventTime()); | |
81 | 59 | } |
82 | 60 | |
83 | 61 | @Override |
... | ... | @@ -109,38 +87,12 @@ public abstract class EventKvInsertRepository { |
109 | 87 | ps.setString(3, kvEntity.getId().getEventIdentifier()); |
110 | 88 | ps.setLong(4, kvEntity.getId().getEventTime()); |
111 | 89 | |
112 | - ps.setString(5, replaceNullChars(kvEntity.getStrValue())); | |
113 | - ps.setString(11, replaceNullChars(kvEntity.getStrValue())); | |
114 | - | |
115 | - if (kvEntity.getLongValue() != null) { | |
116 | - ps.setLong(6, kvEntity.getLongValue()); | |
117 | - ps.setLong(12, kvEntity.getLongValue()); | |
118 | - } else { | |
119 | - ps.setNull(6, Types.BIGINT); | |
120 | - ps.setNull(12, Types.BIGINT); | |
121 | - } | |
122 | - | |
123 | - if (kvEntity.getDoubleValue() != null) { | |
124 | - ps.setDouble(7, kvEntity.getDoubleValue()); | |
125 | - ps.setDouble(13, kvEntity.getDoubleValue()); | |
126 | - } else { | |
127 | - ps.setNull(7, Types.DOUBLE); | |
128 | - ps.setNull(13, Types.DOUBLE); | |
129 | - } | |
130 | - | |
131 | - if (kvEntity.getBooleanValue() != null) { | |
132 | - ps.setBoolean(8, kvEntity.getBooleanValue()); | |
133 | - ps.setBoolean(14, kvEntity.getBooleanValue()); | |
134 | - } else { | |
135 | - ps.setNull(8, Types.BOOLEAN); | |
136 | - ps.setNull(14, Types.BOOLEAN); | |
137 | - } | |
138 | - | |
139 | - ps.setString(9, replaceNullChars(kvEntity.getJsonValue())); | |
140 | - ps.setString(15, replaceNullChars(kvEntity.getJsonValue())); | |
141 | - | |
142 | - ps.setString(10, kvEntity.getId().getEventKey()); | |
143 | - ps.setString(16, kvEntity.getId().getEventKey()); | |
90 | + ps.setString(5, replaceNullChars(kvEntity.getEventValue())); | |
91 | + ps.setString(7, replaceNullChars(kvEntity.getEventValue())); | |
92 | + | |
93 | + | |
94 | + ps.setObject(6, kvEntity.getDeviceProfileId()); | |
95 | + ps.setObject(8, kvEntity.getDeviceProfileId()); | |
144 | 96 | } |
145 | 97 | |
146 | 98 | @Override | ... | ... |
... | ... | @@ -15,47 +15,22 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.yunteng.jpa.repository.event; |
17 | 17 | |
18 | -import org.springframework.data.jpa.repository.Modifying; | |
18 | +import org.springframework.data.domain.Page; | |
19 | +import org.springframework.data.domain.Pageable; | |
19 | 20 | import org.springframework.data.jpa.repository.Query; |
20 | -import org.springframework.data.repository.CrudRepository; | |
21 | +import org.springframework.data.repository.PagingAndSortingRepository; | |
21 | 22 | import org.springframework.data.repository.query.Param; |
22 | -import org.springframework.transaction.annotation.Transactional; | |
23 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
24 | 23 | import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey; |
25 | 24 | import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; |
26 | 25 | |
27 | -import java.util.List; | |
28 | 26 | import java.util.UUID; |
29 | 27 | |
30 | -public interface EventKvRepository extends CrudRepository<TkEventKvEntity, TkEventKvCompositeKey> { | |
28 | +public interface EventKvRepository extends PagingAndSortingRepository<TkEventKvEntity, TkEventKvCompositeKey> { | |
31 | 29 | |
32 | - @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " + | |
33 | - "AND a.id.entityId = :entityId " ) | |
34 | - List<TkEventKvEntity> findAllByEventTypeAndEntityId(@Param("eventType") TkEventType eventType, | |
35 | - @Param("entityId") UUID entityId); | |
36 | - | |
37 | - @Transactional | |
38 | - @Modifying | |
39 | - @Query("DELETE FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " + | |
40 | - "AND a.id.entityId = :entityId " + | |
41 | - "AND a.id.eventIdentifier = :eventIdentifier " + | |
42 | - "AND a.id.eventKey = :eventKey") | |
43 | - void delete(@Param("eventType") TkEventType eventType, | |
44 | - @Param("entityId") UUID entityId, | |
45 | - @Param("eventIdentifier") String eventIdentifier, | |
46 | - @Param("eventKey") String eventKey); | |
47 | - | |
48 | - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
49 | - " in (SELECT id FROM device WHERE tenant_id = :tenantId and device_profile_id = :deviceProfileId limit 100) ORDER BY event_key", nativeQuery = true) | |
50 | - List<String> findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId, | |
51 | - @Param("deviceProfileId") UUID deviceProfileId); | |
52 | - | |
53 | - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
54 | - " in (SELECT id FROM device WHERE tenant_id = :tenantId limit 100) ORDER BY event_key", nativeQuery = true) | |
55 | - List<String> findAllKeysByTenantId(@Param("tenantId") UUID tenantId); | |
56 | - | |
57 | - @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
58 | - " in :entityIds ORDER BY event_key", nativeQuery = true) | |
59 | - List<String> findAllKeysByEntityIds(@Param("entityIds") List<UUID> entityIds); | |
30 | + @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.entityId = :entityId " | |
31 | +// +"AND a.id.eventType = :eventType AND a.id.eventIdentifier = :eventIdentifier " | |
32 | + ) | |
33 | + Page<TkEventKvEntity> findEvents(@Param("entityId") UUID entityId, | |
34 | + Pageable pageable); | |
60 | 35 | } |
61 | 36 | ... | ... |
... | ... | @@ -29,16 +29,15 @@ import java.util.List; |
29 | 29 | public class HsqlEventsInsertRepository extends EventKvInsertRepository { |
30 | 30 | |
31 | 31 | private static final String INSERT_OR_UPDATE = |
32 | - "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + | |
33 | - "A (event_type, entity_id, event_identifier, event_key, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
32 | + "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?) " + | |
33 | + "A (event_type, entity_id, event_identifier, event_time, event_value,device_profile_id) " + | |
34 | 34 | "ON (tk_event_kv.event_type=A.event_type " + |
35 | 35 | "AND tk_event_kv.entity_id=A.entity_id " + |
36 | 36 | "AND tk_event_kv.event_identifier=A.event_identifier " + |
37 | 37 | "AND tk_event_kv.event_time=A.event_time) " + |
38 | - "AND tk_event_kv.event_key=A.event_key) " + | |
39 | - "WHEN MATCHED THEN UPDATE SET tk_event_kv.str_v = A.str_v, tk_event_kv.long_v = A.long_v, tk_event_kv.dbl_v = A.dbl_v, tk_event_kv.bool_v = A.bool_v, tk_event_kv.json_v = A.json_v " + | |
40 | - "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
41 | - "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.json_v, A.event_key)"; | |
38 | + "WHEN MATCHED THEN UPDATE SET tk_event_kv.event_value = A.event_value " + | |
39 | + "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, event_value, device_profile_id) " + | |
40 | + "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.event_value, A.device_profile_id)"; | |
42 | 41 | |
43 | 42 | @Override |
44 | 43 | protected void saveOrUpdate(List<TkEventKvEntity> entities) { |
... | ... | @@ -48,29 +47,8 @@ public class HsqlEventsInsertRepository extends EventKvInsertRepository { |
48 | 47 | ps.setObject(2, entity.getId().getEntityId()); |
49 | 48 | ps.setString(3, entity.getId().getEventIdentifier()); |
50 | 49 | ps.setLong(4, entity.getId().getEventTime()); |
51 | - ps.setString(5, entity.getStrValue()); | |
52 | - | |
53 | - if (entity.getLongValue() != null) { | |
54 | - ps.setLong(6, entity.getLongValue()); | |
55 | - } else { | |
56 | - ps.setNull(6, Types.BIGINT); | |
57 | - } | |
58 | - | |
59 | - if (entity.getDoubleValue() != null) { | |
60 | - ps.setDouble(7, entity.getDoubleValue()); | |
61 | - } else { | |
62 | - ps.setNull(7, Types.DOUBLE); | |
63 | - } | |
64 | - | |
65 | - if (entity.getBooleanValue() != null) { | |
66 | - ps.setBoolean(8, entity.getBooleanValue()); | |
67 | - } else { | |
68 | - ps.setNull(8, Types.BOOLEAN); | |
69 | - } | |
70 | - | |
71 | - ps.setString(9, entity.getJsonValue()); | |
72 | - | |
73 | - ps.setString(10, entity.getId().getEventKey()); | |
50 | + ps.setObject(5, entity.getEventValue()); | |
51 | + ps.setObject(6, entity.getEventValue()); | |
74 | 52 | }); |
75 | 53 | }); |
76 | 54 | } | ... | ... |
... | ... | @@ -15,37 +15,32 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.dao.yunteng.jpa.repository.event; |
17 | 17 | |
18 | -import com.google.common.collect.Lists; | |
19 | -import com.google.common.util.concurrent.Futures; | |
20 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
21 | 19 | import lombok.extern.slf4j.Slf4j; |
22 | 20 | import org.springframework.beans.factory.annotation.Autowired; |
23 | 21 | import org.springframework.beans.factory.annotation.Value; |
24 | 22 | import org.springframework.stereotype.Component; |
25 | -import org.thingsboard.server.common.data.EntityType; | |
26 | 23 | import org.thingsboard.server.common.data.id.DeviceProfileId; |
27 | 24 | import org.thingsboard.server.common.data.id.EntityId; |
28 | 25 | import org.thingsboard.server.common.data.id.TenantId; |
29 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
30 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
26 | +import org.thingsboard.server.common.data.page.PageData; | |
27 | +import org.thingsboard.server.common.data.page.TimePageLink; | |
28 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
29 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
31 | 30 | import org.thingsboard.server.common.stats.StatsFactory; |
32 | 31 | import org.thingsboard.server.dao.DaoUtil; |
33 | 32 | import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; |
34 | 33 | import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; |
35 | 34 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; |
36 | 35 | import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; |
36 | +import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao; | |
37 | 37 | import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey; |
38 | 38 | import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; |
39 | -import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao; | |
40 | 39 | |
41 | 40 | import javax.annotation.PostConstruct; |
42 | 41 | import javax.annotation.PreDestroy; |
43 | -import java.util.Collection; | |
44 | 42 | import java.util.Comparator; |
45 | -import java.util.List; | |
46 | -import java.util.Optional; | |
47 | 43 | import java.util.function.Function; |
48 | -import java.util.stream.Collectors; | |
49 | 44 | |
50 | 45 | @Component |
51 | 46 | @Slf4j |
... | ... | @@ -109,60 +104,18 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen |
109 | 104 | } |
110 | 105 | |
111 | 106 | @Override |
112 | - public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, String attributeKey) { | |
113 | - TkEventKvCompositeKey compositeKey = | |
114 | - getEventKvCompositeKey(eventType,entityId,null, attributeKey,null); | |
115 | - return Futures.immediateFuture( | |
116 | - Optional.ofNullable(DaoUtil.getData(eventKvRepository.findById(compositeKey)))); | |
117 | - } | |
107 | + public PageData<TkEventKvDto> findEvents(EntityId entityId,String eventIdentifier, DeviceEventTypeEnum eventType, TimePageLink pageLink) { | |
118 | 108 | |
119 | - @Override | |
120 | - public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, Collection<String> attributeKeys) { | |
121 | - List<TkEventKvCompositeKey> compositeKeys = | |
122 | - attributeKeys | |
123 | - .stream() | |
124 | - .map(attributeKey -> | |
125 | - getEventKvCompositeKey(eventType,entityId, null,attributeKey,null)) | |
126 | - .collect(Collectors.toList()); | |
127 | - return Futures.immediateFuture( | |
128 | - DaoUtil.convertDataList(Lists.newArrayList(eventKvRepository.findAllById(compositeKeys)))); | |
109 | + return DaoUtil.toPageData(eventKvRepository.findEvents(entityId.getId(),DaoUtil.toPageable(pageLink))); | |
129 | 110 | } |
130 | 111 | |
131 | - @Override | |
132 | - public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType eventType) { | |
133 | - return Futures.immediateFuture( | |
134 | - DaoUtil.convertDataList(Lists.newArrayList( | |
135 | - eventKvRepository.findAllByEventTypeAndEntityId( | |
136 | - eventType, | |
137 | - entityId.getId())))); | |
138 | - } | |
139 | 112 | |
140 | 113 | @Override |
141 | - public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { | |
142 | - if (deviceProfileId != null) { | |
143 | - return eventKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId()); | |
144 | - } else { | |
145 | - return eventKvRepository.findAllKeysByTenantId(tenantId.getId()); | |
146 | - } | |
147 | - } | |
148 | - | |
149 | - @Override | |
150 | - public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) { | |
151 | - return eventKvRepository | |
152 | - .findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); | |
153 | - } | |
154 | - | |
155 | - @Override | |
156 | - public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId,TkEventType eventType, String eventIdentifier, TkEventKvEntry attribute) { | |
114 | + public ListenableFuture<Void> save(TenantId tenantId, DeviceProfileId profileId,EntityId entityId,DeviceEventTypeEnum eventType, String eventIdentifier, String eventData,Long eventTime) { | |
157 | 115 | TkEventKvEntity entity = new TkEventKvEntity(); |
158 | - entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, attribute.getEventTime(),attribute.getKey())); | |
159 | - | |
160 | -// entity.setDeviceProfileId(); | |
161 | - entity.setStrValue(attribute.getStrValue().orElse(null)); | |
162 | - entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); | |
163 | - entity.setLongValue(attribute.getLongValue().orElse(null)); | |
164 | - entity.setBooleanValue(attribute.getBooleanValue().orElse(null)); | |
165 | - entity.setJsonValue(attribute.getJsonValue().orElse(null)); | |
116 | + entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, eventTime)); | |
117 | + entity.setDeviceProfileId(profileId.getId()); | |
118 | + entity.setEventValue(eventData); | |
166 | 119 | return addToQueue(entity); |
167 | 120 | } |
168 | 121 | |
... | ... | @@ -170,22 +123,4 @@ public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implemen |
170 | 123 | return queue.add(entity); |
171 | 124 | } |
172 | 125 | |
173 | - @Override | |
174 | - public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys) { | |
175 | - return service.submit(() -> { | |
176 | - keys.forEach(key -> | |
177 | - eventKvRepository.delete(eventType,entityId.getId(), eventId,key) | |
178 | - ); | |
179 | - return null; | |
180 | - }); | |
181 | - } | |
182 | - | |
183 | - private TkEventKvCompositeKey getEventKvCompositeKey(TkEventType eventType,EntityId entityId, String eventIdentifier,String eventKey, Long eventTime) { | |
184 | - return new TkEventKvCompositeKey( | |
185 | - eventType, | |
186 | - entityId.getId(), | |
187 | - eventIdentifier, | |
188 | - eventTime, | |
189 | - eventKey); | |
190 | - } | |
191 | 126 | } | ... | ... |
... | ... | @@ -17,13 +17,14 @@ package org.thingsboard.rule.engine.api; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.FutureCallback; |
19 | 19 | import org.thingsboard.server.common.data.id.CustomerId; |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
20 | 21 | import org.thingsboard.server.common.data.id.EntityId; |
21 | 22 | import org.thingsboard.server.common.data.id.TenantId; |
22 | 23 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
23 | 24 | import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; |
24 | 25 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
25 | 26 | import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; |
26 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
27 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
27 | 28 | |
28 | 29 | import java.util.Collection; |
29 | 30 | import java.util.List; |
... | ... | @@ -44,7 +45,7 @@ public interface RuleEngineTelemetryService { |
44 | 45 | void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); |
45 | 46 | |
46 | 47 | //Thingskit function |
47 | - void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback); | |
48 | + void saveAndNotify(TenantId tenantId, DeviceProfileId profileId, EntityId entityId, String eventId, DeviceEventTypeEnum eventType, String eventData,Long eventTime, FutureCallback<Void> callback); | |
48 | 49 | |
49 | 50 | void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); |
50 | 51 | ... | ... |
... | ... | @@ -15,25 +15,19 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.rule.engine.yunteng.event; |
17 | 17 | |
18 | -import com.google.gson.JsonParser; | |
19 | 18 | import lombok.extern.slf4j.Slf4j; |
20 | -import org.apache.commons.lang3.StringUtils; | |
21 | 19 | import org.thingsboard.rule.engine.api.*; |
22 | 20 | import org.thingsboard.rule.engine.api.util.TbNodeUtils; |
23 | 21 | import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback; |
24 | -import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
22 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
25 | 23 | import org.thingsboard.server.common.data.plugin.ComponentType; |
26 | -import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
27 | -import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
24 | +import org.thingsboard.server.common.data.yunteng.enums.DeviceEventTypeEnum; | |
28 | 25 | import org.thingsboard.server.common.msg.TbMsg; |
29 | 26 | import org.thingsboard.server.common.msg.session.SessionMsgType; |
30 | -import org.thingsboard.server.common.transport.adaptor.JsonConverter; | |
31 | 27 | |
32 | -import java.util.ArrayList; | |
33 | -import java.util.Set; | |
28 | +import java.util.UUID; | |
34 | 29 | |
35 | -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_IDENTIFIER_COLUMN; | |
36 | -import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_COLUMN; | |
30 | +import static org.thingsboard.server.dao.model.ModelConstants.*; | |
37 | 31 | |
38 | 32 | @Slf4j |
39 | 33 | @RuleNode( |
... | ... | @@ -49,10 +43,12 @@ import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_COLUMN; |
49 | 43 | public class TkMsgEventNode implements TbNode { |
50 | 44 | |
51 | 45 | private TkMsgEventNodeConfiguration config; |
46 | + private RuleEngineDeviceProfileCache cache; | |
52 | 47 | |
53 | 48 | @Override |
54 | 49 | public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { |
55 | 50 | this.config = TbNodeUtils.convert(configuration, TkMsgEventNodeConfiguration.class); |
51 | + cache = ctx.getDeviceProfileCache(); | |
56 | 52 | } |
57 | 53 | |
58 | 54 | @Override |
... | ... | @@ -61,16 +57,18 @@ public class TkMsgEventNode implements TbNode { |
61 | 57 | ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); |
62 | 58 | return; |
63 | 59 | } |
64 | - String src = msg.getData(); | |
65 | - Set<TkEventKvEntry> events = JsonConverter.convertToEvents(new JsonParser().parse(src)); | |
66 | 60 | String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN); |
67 | - TkEventType eventType = TkEventType.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN)); | |
61 | + String deviceProfileId = msg.getMetaData().getValue(DEVICE_DEVICE_PROFILE_ID_PROPERTY); | |
62 | + ////TODO: 验证事件类型、事件标识符和数据建是否与产品物模型中的事件匹配 | |
63 | + long ts = System.currentTimeMillis(); | |
64 | + String src = msg.getData(); | |
65 | + DeviceEventTypeEnum eventType = DeviceEventTypeEnum.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN)); | |
68 | 66 | ctx.getTelemetryService().saveAndNotify( |
69 | - ctx.getTenantId(), | |
67 | + ctx.getTenantId(),new DeviceProfileId(UUID.fromString(deviceProfileId)), | |
70 | 68 | msg.getOriginator(), |
71 | 69 | eventIdentifier, |
72 | 70 | eventType, |
73 | - new ArrayList<>(events), | |
71 | + src,ts, | |
74 | 72 | new TelemetryNodeCallback(ctx, msg) |
75 | 73 | ); |
76 | 74 | } | ... | ... |