Showing
23 changed files
with
1222 additions
and
20 deletions
... | ... | @@ -39,12 +39,15 @@ import org.thingsboard.server.common.data.kv.LongDataEntry; |
39 | 39 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
40 | 40 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
41 | 41 | import org.thingsboard.server.common.data.kv.TsKvLatestRemovingResult; |
42 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
43 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
42 | 44 | import org.thingsboard.server.common.msg.queue.ServiceType; |
43 | 45 | import org.thingsboard.server.common.msg.queue.TbCallback; |
44 | 46 | import org.thingsboard.server.common.msg.queue.TopicPartitionInfo; |
45 | 47 | import org.thingsboard.server.dao.attributes.AttributesService; |
46 | 48 | import org.thingsboard.server.dao.entityview.EntityViewService; |
47 | 49 | import org.thingsboard.server.dao.timeseries.TimeseriesService; |
50 | +import org.thingsboard.server.dao.yunteng.event.TkEventsService; | |
48 | 51 | import org.thingsboard.server.gen.transport.TransportProtos; |
49 | 52 | import org.thingsboard.server.queue.discovery.PartitionService; |
50 | 53 | import org.thingsboard.server.queue.usagestats.TbApiUsageClient; |
... | ... | @@ -74,6 +77,8 @@ import java.util.concurrent.Executors; |
74 | 77 | public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionService implements TelemetrySubscriptionService { |
75 | 78 | |
76 | 79 | private final AttributesService attrService; |
80 | + | |
81 | + private final TkEventsService eventsService; | |
77 | 82 | private final TimeseriesService tsService; |
78 | 83 | private final EntityViewService entityViewService; |
79 | 84 | private final TbApiUsageClient apiUsageClient; |
... | ... | @@ -82,6 +87,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer |
82 | 87 | private ExecutorService tsCallBackExecutor; |
83 | 88 | |
84 | 89 | public DefaultTelemetrySubscriptionService(AttributesService attrService, |
90 | + TkEventsService eventsService, | |
85 | 91 | TimeseriesService tsService, |
86 | 92 | EntityViewService entityViewService, |
87 | 93 | TbClusterService clusterService, |
... | ... | @@ -90,6 +96,7 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer |
90 | 96 | TbApiUsageStateService apiUsageStateService) { |
91 | 97 | super(clusterService, partitionService); |
92 | 98 | this.attrService = attrService; |
99 | + this.eventsService = eventsService; | |
93 | 100 | this.tsService = tsService; |
94 | 101 | this.entityViewService = entityViewService; |
95 | 102 | this.apiUsageClient = apiUsageClient; |
... | ... | @@ -241,6 +248,20 @@ public class DefaultTelemetrySubscriptionService extends AbstractSubscriptionSer |
241 | 248 | saveAndNotifyInternal(tenantId, entityId, scope, attributes, notifyDevice, callback); |
242 | 249 | } |
243 | 250 | |
251 | + | |
252 | + //Thingskit function | |
253 | + @Override | |
254 | + public void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) { | |
255 | + checkInternalEntity(entityId); | |
256 | + saveAndNotifyInternal(tenantId, entityId, eventId,eventType, attributes, callback); | |
257 | + } | |
258 | + @Override | |
259 | + public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback) { | |
260 | + ListenableFuture<List<Void>> saveFuture = eventsService.save(tenantId, entityId,eventId,eventType, attributes); | |
261 | + addVoidCallback(saveFuture, callback); | |
262 | +// addWsCallback(saveFuture, success -> onAttributesUpdate(tenantId, entityId, scope, attributes, notifyDevice)); | |
263 | + } | |
264 | + | |
244 | 265 | @Override |
245 | 266 | public void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback) { |
246 | 267 | ListenableFuture<List<Void>> saveFuture = attrService.save(tenantId, entityId, scope, attributes); | ... | ... |
... | ... | @@ -21,6 +21,8 @@ import org.thingsboard.server.common.data.id.EntityId; |
21 | 21 | import org.thingsboard.server.common.data.id.TenantId; |
22 | 22 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
23 | 23 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
24 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
25 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
24 | 26 | |
25 | 27 | import java.util.List; |
26 | 28 | |
... | ... | @@ -35,6 +37,9 @@ public interface InternalTelemetryService extends RuleEngineTelemetryService { |
35 | 37 | |
36 | 38 | void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); |
37 | 39 | |
40 | + //Thingskit function | |
41 | + void saveAndNotifyInternal(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback); | |
42 | + | |
38 | 43 | void saveLatestAndNotifyInternal(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); |
39 | 44 | |
40 | 45 | void deleteAndNotifyInternal(TenantId tenantId, EntityId entityId, String scope, List<String> keys, FutureCallback<Void> callback); | ... | ... |
common/dao-api/src/main/java/org/thingsboard/server/dao/yunteng/event/TkEventsService.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.event; | |
17 | + | |
18 | +import com.google.common.util.concurrent.ListenableFuture; | |
19 | +import org.thingsboard.server.common.data.EntityType; | |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
21 | +import org.thingsboard.server.common.data.id.EntityId; | |
22 | +import org.thingsboard.server.common.data.id.TenantId; | |
23 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
24 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
25 | + | |
26 | +import java.util.Collection; | |
27 | +import java.util.List; | |
28 | +import java.util.Optional; | |
29 | + | |
30 | +/** | |
31 | + * @author Andrew Shvayka | |
32 | + */ | |
33 | +public interface TkEventsService { | |
34 | + | |
35 | + ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope,String eventKey); | |
36 | + | |
37 | + ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String scope, Collection<String> eventKeys); | |
38 | + | |
39 | + ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType); | |
40 | + | |
41 | + ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes); | |
42 | + | |
43 | + ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys); | |
44 | + | |
45 | + List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); | |
46 | + | |
47 | + List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds); | |
48 | + | |
49 | +} | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/yunteng/dto/TkEventKvDto.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.yunteng.dto; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
19 | +import org.thingsboard.server.common.data.kv.DataType; | |
20 | +import org.thingsboard.server.common.data.kv.KvEntry; | |
21 | + | |
22 | +import javax.validation.Valid; | |
23 | +import java.util.Optional; | |
24 | + | |
25 | +/** | |
26 | + * @author Andrew Shvayka | |
27 | + */ | |
28 | +public class TkEventKvDto implements TkEventKvEntry { | |
29 | + | |
30 | + private static final long serialVersionUID = -6460767583563159407L; | |
31 | + | |
32 | + private final long eventTime; | |
33 | + @Valid | |
34 | + private final KvEntry kv; | |
35 | + | |
36 | + public TkEventKvDto(KvEntry kv, long eventTime) { | |
37 | + this.kv = kv; | |
38 | + this.eventTime = eventTime; | |
39 | + } | |
40 | + | |
41 | + public TkEventKvDto(long lastUpdateTs, KvEntry kv) { | |
42 | + this(kv, lastUpdateTs); | |
43 | + } | |
44 | + | |
45 | + @Override | |
46 | + public long getEventTime() { | |
47 | + return eventTime; | |
48 | + } | |
49 | + | |
50 | + @Override | |
51 | + public String getKey() { | |
52 | + return kv.getKey(); | |
53 | + } | |
54 | + | |
55 | + @Override | |
56 | + public DataType getDataType() { | |
57 | + return kv.getDataType(); | |
58 | + } | |
59 | + | |
60 | + @Override | |
61 | + public Optional<String> getStrValue() { | |
62 | + return kv.getStrValue(); | |
63 | + } | |
64 | + | |
65 | + @Override | |
66 | + public Optional<Long> getLongValue() { | |
67 | + return kv.getLongValue(); | |
68 | + } | |
69 | + | |
70 | + @Override | |
71 | + public Optional<Boolean> getBooleanValue() { | |
72 | + return kv.getBooleanValue(); | |
73 | + } | |
74 | + | |
75 | + @Override | |
76 | + public Optional<Double> getDoubleValue() { | |
77 | + return kv.getDoubleValue(); | |
78 | + } | |
79 | + | |
80 | + @Override | |
81 | + public Optional<String> getJsonValue() { | |
82 | + return kv.getJsonValue(); | |
83 | + } | |
84 | + | |
85 | + @Override | |
86 | + public String getValueAsString() { | |
87 | + return kv.getValueAsString(); | |
88 | + } | |
89 | + | |
90 | + @Override | |
91 | + public Object getValue() { | |
92 | + return kv.getValue(); | |
93 | + } | |
94 | + | |
95 | + @Override | |
96 | + public boolean equals(Object o) { | |
97 | + if (this == o) return true; | |
98 | + if (o == null || getClass() != o.getClass()) return false; | |
99 | + | |
100 | + TkEventKvDto that = (TkEventKvDto) o; | |
101 | + | |
102 | + if (eventTime != that.eventTime) return false; | |
103 | + return kv.equals(that.kv); | |
104 | + | |
105 | + } | |
106 | + | |
107 | + @Override | |
108 | + public int hashCode() { | |
109 | + int result = (int) (eventTime ^ (eventTime >>> 32)); | |
110 | + result = 31 * result + kv.hashCode(); | |
111 | + return result; | |
112 | + } | |
113 | + | |
114 | + @Override | |
115 | + public String toString() { | |
116 | + return "BaseAttributeKvEntry{" + | |
117 | + "lastUpdateTs=" + eventTime + | |
118 | + ", kv=" + kv + | |
119 | + '}'; | |
120 | + } | |
121 | +} | ... | ... |
common/data/src/main/java/org/thingsboard/server/common/data/yunteng/dto/TkEventKvEntry.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.yunteng.dto; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.kv.KvEntry; | |
19 | + | |
20 | +/** | |
21 | + * @author Andrew Shvayka | |
22 | + */ | |
23 | +public interface TkEventKvEntry extends KvEntry { | |
24 | + | |
25 | + long getEventTime(); | |
26 | + | |
27 | +} | ... | ... |
... | ... | @@ -34,6 +34,8 @@ import org.thingsboard.server.common.data.kv.JsonDataEntry; |
34 | 34 | import org.thingsboard.server.common.data.kv.KvEntry; |
35 | 35 | import org.thingsboard.server.common.data.kv.LongDataEntry; |
36 | 36 | import org.thingsboard.server.common.data.kv.StringDataEntry; |
37 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
38 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
37 | 39 | import org.thingsboard.server.gen.transport.TransportProtos; |
38 | 40 | import org.thingsboard.server.gen.transport.TransportProtos.AttributeUpdateNotificationMsg; |
39 | 41 | import org.thingsboard.server.gen.transport.TransportProtos.ClaimDeviceMsg; |
... | ... | @@ -516,6 +518,13 @@ public class JsonConverter { |
516 | 518 | return result; |
517 | 519 | } |
518 | 520 | |
521 | + public static Set<TkEventKvEntry> convertToEvents(JsonElement element) { | |
522 | + Set<TkEventKvEntry> result = new HashSet<>(); | |
523 | + long ts = System.currentTimeMillis(); | |
524 | + result.addAll(parseValues(element.getAsJsonObject()).stream().map(kv -> new TkEventKvDto(kv, ts)).collect(Collectors.toList())); | |
525 | + return result; | |
526 | + } | |
527 | + | |
519 | 528 | private static List<KvEntry> parseValues(JsonObject valuesObject) { |
520 | 529 | List<KvEntry> result = new ArrayList<>(); |
521 | 530 | for (Entry<String, JsonElement> valueEntry : valuesObject.entrySet()) { | ... | ... |
... | ... | @@ -293,8 +293,8 @@ public class DefaultTransportService implements TransportService { |
293 | 293 | |
294 | 294 | @Override |
295 | 295 | public TransportProtos.GetEntityProfileResponseMsg getEntityProfile(TransportProtos.GetEntityProfileRequestMsg msg) { |
296 | - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = | |
297 | - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); | |
296 | + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = | |
297 | + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setEntityProfileRequestMsg(msg).build()); | |
298 | 298 | try { |
299 | 299 | TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get(); |
300 | 300 | return response.getValue().getEntityProfileResponseMsg(); |
... | ... | @@ -305,8 +305,8 @@ public class DefaultTransportService implements TransportService { |
305 | 305 | |
306 | 306 | @Override |
307 | 307 | public TransportProtos.GetResourceResponseMsg getResource(TransportProtos.GetResourceRequestMsg msg) { |
308 | - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = | |
309 | - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); | |
308 | + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = | |
309 | + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setResourceRequestMsg(msg).build()); | |
310 | 310 | try { |
311 | 311 | TbProtoQueueMsg<TransportApiResponseMsg> response = transportApiRequestTemplate.send(protoMsg).get(); |
312 | 312 | return response.getValue().getResourceResponseMsg(); |
... | ... | @@ -317,8 +317,8 @@ public class DefaultTransportService implements TransportService { |
317 | 317 | |
318 | 318 | @Override |
319 | 319 | public TransportProtos.GetSnmpDevicesResponseMsg getSnmpDevicesIds(TransportProtos.GetSnmpDevicesRequestMsg requestMsg) { |
320 | - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( | |
321 | - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() | |
320 | + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( | |
321 | + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() | |
322 | 322 | .setSnmpDevicesRequestMsg(requestMsg) |
323 | 323 | .build() |
324 | 324 | ); |
... | ... | @@ -334,7 +334,7 @@ public class DefaultTransportService implements TransportService { |
334 | 334 | @Override |
335 | 335 | public TransportProtos.GetDeviceResponseMsg getDevice(TransportProtos.GetDeviceRequestMsg requestMsg) { |
336 | 336 | TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( |
337 | - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() | |
337 | + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() | |
338 | 338 | .setDeviceRequestMsg(requestMsg) |
339 | 339 | .build() |
340 | 340 | ); |
... | ... | @@ -354,7 +354,7 @@ public class DefaultTransportService implements TransportService { |
354 | 354 | @Override |
355 | 355 | public TransportProtos.GetDeviceCredentialsResponseMsg getDeviceCredentials(TransportProtos.GetDeviceCredentialsRequestMsg requestMsg) { |
356 | 356 | TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = new TbProtoQueueMsg<>( |
357 | - UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder() | |
357 | + UUID.randomUUID(), TransportApiRequestMsg.newBuilder() | |
358 | 358 | .setDeviceCredentialsRequestMsg(requestMsg) |
359 | 359 | .build() |
360 | 360 | ); |
... | ... | @@ -576,25 +576,26 @@ public class DefaultTransportService implements TransportService { |
576 | 576 | @Override |
577 | 577 | public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.PostEventMsg msg, |
578 | 578 | TransportServiceCallback<Void> callback,String topicName) { |
579 | - //Topic地址:v1/devices/event/{${identifier}}/{$eventType} | |
580 | - String[] topicInfo = topicName.split(MqttTopics.BASE_DEVICE_EVENT_TOPIC); | |
579 | + //Topic地址:v1/devices/event/${deviceId}/${identifier}/{$eventType} | |
580 | + String[] topicInfo = topicName.split(MqttTopics.BASE_DEVICE_EVENT_TOPIC + "/"); | |
581 | 581 | if(null == topicInfo || topicInfo.length !=2){ |
582 | 582 | throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage()); |
583 | 583 | } |
584 | 584 | String[] eventInfo = topicInfo[1].split("/"); |
585 | - if(null == eventInfo || eventInfo.length !=2){ | |
585 | + if(null == eventInfo || eventInfo.length !=3){ | |
586 | 586 | throw new TkDataValidationException(ErrorMessage.INVALID_TOPIC.getMessage()); |
587 | 587 | } |
588 | 588 | reportActivityInternal(sessionInfo); |
589 | 589 | JsonObject json = JsonUtils.getJsonObject(msg.getKvList()); |
590 | 590 | TbMsgMetaData metaData = new TbMsgMetaData(); |
591 | - metaData.putValue("deviceName", sessionInfo.getDeviceName()); | |
592 | - metaData.putValue("deviceType", sessionInfo.getDeviceType()); | |
593 | - metaData.putValue("event_identifier", eventInfo[0]); | |
594 | - metaData.putValue("event_type",eventInfo[1]); | |
595 | 591 | CustomerId customerId = getCustomerId(sessionInfo); |
596 | 592 | TenantId tenantId = getTenantId(sessionInfo); |
597 | 593 | DeviceId deviceId = new DeviceId(new UUID(sessionInfo.getDeviceIdMSB(), sessionInfo.getDeviceIdLSB())); |
594 | + metaData.putValue("deviceName", sessionInfo.getDeviceName()); | |
595 | + metaData.putValue("deviceType", sessionInfo.getDeviceType()); | |
596 | + metaData.putValue("deviceId",eventInfo[0]); | |
597 | + metaData.putValue("event_identifier", eventInfo[1]); | |
598 | + metaData.putValue("event_type",eventInfo[2]); | |
598 | 599 | sendToRuleEngine(tenantId, deviceId, customerId, sessionInfo, json, metaData, SessionMsgType.POST_EVENT_REQUEST, |
599 | 600 | new TransportTbQueueCallback(new ApiStatsProxyCallback<>(tenantId, customerId, msg.getKvList().size(), callback))); |
600 | 601 | } |
... | ... | @@ -726,8 +727,8 @@ public class DefaultTransportService implements TransportService { |
726 | 727 | @Override |
727 | 728 | public void process(TransportProtos.SessionInfoProto sessionInfo, TransportProtos.GetOtaPackageRequestMsg msg, TransportServiceCallback<TransportProtos.GetOtaPackageResponseMsg> callback) { |
728 | 729 | if (checkLimits(sessionInfo, msg, callback)) { |
729 | - TbProtoQueueMsg<TransportProtos.TransportApiRequestMsg> protoMsg = | |
730 | - new TbProtoQueueMsg<>(UUID.randomUUID(), TransportProtos.TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build()); | |
730 | + TbProtoQueueMsg<TransportApiRequestMsg> protoMsg = | |
731 | + new TbProtoQueueMsg<>(UUID.randomUUID(), TransportApiRequestMsg.newBuilder().setOtaPackageRequestMsg(msg).build()); | |
731 | 732 | |
732 | 733 | AsyncCallbackTemplate.withCallback(transportApiRequestTemplate.send(protoMsg), response -> { |
733 | 734 | callback.onSuccess(response.getValue().getOtaPackageResponseMsg()); |
... | ... | @@ -870,7 +871,7 @@ public class DefaultTransportService implements TransportService { |
870 | 871 | } |
871 | 872 | } |
872 | 873 | |
873 | - protected void processToTransportMsg(TransportProtos.ToTransportMsg toSessionMsg) { | |
874 | + protected void processToTransportMsg(ToTransportMsg toSessionMsg) { | |
874 | 875 | UUID sessionId = new UUID(toSessionMsg.getSessionIdMSB(), toSessionMsg.getSessionIdLSB()); |
875 | 876 | SessionMetaData md = sessions.get(sessionId); |
876 | 877 | if (md != null) { | ... | ... |
... | ... | @@ -56,6 +56,12 @@ public class ModelConstants { |
56 | 56 | public static final String ATTRIBUTE_KEY_COLUMN = "attribute_key"; |
57 | 57 | public static final String LAST_UPDATE_TS_COLUMN = "last_update_ts"; |
58 | 58 | |
59 | + //Thingskit function | |
60 | + public static final String EVENT_IDENTIFIER_COLUMN = "event_identifier"; | |
61 | + public static final String EVENT_TIME_COLUMN = "event_time"; | |
62 | + public static final String EVENT_TYPE_COLUMN = "event_type"; | |
63 | + public static final String EVENT_KEY_COLUMN = "event_key"; | |
64 | + | |
59 | 65 | /** |
60 | 66 | * Cassandra user constants. |
61 | 67 | */ | ... | ... |
dao/src/main/java/org/thingsboard/server/dao/yunteng/jpa/dao/event/BaseEventsService.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.dao.event; | |
17 | + | |
18 | +import com.google.common.util.concurrent.Futures; | |
19 | +import com.google.common.util.concurrent.ListenableFuture; | |
20 | +import lombok.extern.slf4j.Slf4j; | |
21 | +import org.springframework.stereotype.Service; | |
22 | +import org.thingsboard.server.common.data.EntityType; | |
23 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
24 | +import org.thingsboard.server.common.data.id.EntityId; | |
25 | +import org.thingsboard.server.common.data.id.TenantId; | |
26 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
27 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
28 | +import org.thingsboard.server.dao.service.Validator; | |
29 | +import org.thingsboard.server.dao.yunteng.event.TkEventsService; | |
30 | + | |
31 | +import java.util.Collection; | |
32 | +import java.util.List; | |
33 | +import java.util.Optional; | |
34 | +import java.util.stream.Collectors; | |
35 | + | |
36 | +import static org.thingsboard.server.dao.yunteng.jpa.dao.event.EventUtils.validate; | |
37 | + | |
38 | + | |
39 | +/** | |
40 | + * @author Andrew Shvayka | |
41 | + */ | |
42 | +@Service | |
43 | +@Slf4j | |
44 | +public class BaseEventsService implements TkEventsService { | |
45 | + private final EventsDao eventsDao; | |
46 | + | |
47 | + public BaseEventsService(EventsDao eventsDao) { | |
48 | + this.eventsDao = eventsDao; | |
49 | + } | |
50 | + | |
51 | + @Override | |
52 | + public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId,String attributeKey) { | |
53 | + validate(entityId, eventId); | |
54 | + Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey); | |
55 | + return eventsDao.find(tenantId, entityId, null, attributeKey); | |
56 | + } | |
57 | + | |
58 | + @Override | |
59 | + public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, String eventId, Collection<String> attributeKeys) { | |
60 | + validate(entityId, eventId); | |
61 | + attributeKeys.forEach(attributeKey -> Validator.validateString(attributeKey, "Incorrect attribute key " + attributeKey)); | |
62 | + return eventsDao.find(tenantId, entityId, null, attributeKeys); | |
63 | + } | |
64 | + | |
65 | + @Override | |
66 | + public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId,String eventId, TkEventType eventType) { | |
67 | + validate(entityId,eventId); | |
68 | + return eventsDao.findAll(tenantId, entityId, eventType); | |
69 | + } | |
70 | + | |
71 | + @Override | |
72 | + public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { | |
73 | + return eventsDao.findAllKeysByDeviceProfileId(tenantId, deviceProfileId); | |
74 | + } | |
75 | + | |
76 | + @Override | |
77 | + public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) { | |
78 | + return eventsDao.findAllKeysByEntityIds(tenantId, entityType, entityIds); | |
79 | + } | |
80 | + | |
81 | + @Override | |
82 | + public ListenableFuture<List<Void>> save(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes) { | |
83 | + validate(entityId, eventId,eventType); | |
84 | + attributes.forEach(attribute -> validate(attribute)); | |
85 | + | |
86 | + List<ListenableFuture<Void>> saveFutures = attributes.stream().map(attribute -> eventsDao.save(tenantId, entityId, eventType,eventId, attribute)).collect(Collectors.toList()); | |
87 | + return Futures.allAsList(saveFutures); | |
88 | + } | |
89 | + | |
90 | + @Override | |
91 | + public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<String> eventKeys) { | |
92 | + validate(entityId, eventId); | |
93 | + return eventsDao.removeAll(tenantId, entityId, eventId,eventType, eventKeys); | |
94 | + } | |
95 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.dao.event; | |
17 | + | |
18 | +import org.thingsboard.server.common.data.id.EntityId; | |
19 | +import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
20 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
21 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
22 | +import org.thingsboard.server.dao.exception.IncorrectParameterException; | |
23 | +import org.thingsboard.server.dao.service.Validator; | |
24 | + | |
25 | +public class EventUtils { | |
26 | + public static void validate(EntityId id, String eventIdentifier) { | |
27 | + Validator.validateId(id.getId(), "Incorrect id " + id); | |
28 | + Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier); | |
29 | + } | |
30 | + | |
31 | + | |
32 | + public static void validate(EntityId id, String eventIdentifier, TkEventType eventType) { | |
33 | + Validator.validateId(id.getId(), "Incorrect id " + id); | |
34 | + Validator.validateString(eventIdentifier, "Incorrect scope " + eventIdentifier); | |
35 | + if(eventType == null){ | |
36 | + throw new IncorrectParameterException("eventType entry can't be null"); | |
37 | + } | |
38 | + } | |
39 | + | |
40 | + public static void validate(TkEventKvEntry kvEntry) { | |
41 | + if (kvEntry == null) { | |
42 | + throw new IncorrectParameterException("Key value entry can't be null"); | |
43 | + } else if (kvEntry.getDataType() == null) { | |
44 | + throw new IncorrectParameterException("Incorrect kvEntry. Data type can't be null"); | |
45 | + } else { | |
46 | + Validator.validateString(kvEntry.getKey(), "Incorrect kvEntry. Key can't be empty"); | |
47 | + Validator.validatePositiveNumber(kvEntry.getEventTime(), "Incorrect last update ts. Ts should be positive"); | |
48 | + } | |
49 | + } | |
50 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.dao.event; | |
17 | + | |
18 | +import com.google.common.util.concurrent.ListenableFuture; | |
19 | +import org.thingsboard.server.common.data.EntityType; | |
20 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
21 | +import org.thingsboard.server.common.data.id.EntityId; | |
22 | +import org.thingsboard.server.common.data.id.TenantId; | |
23 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
24 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
25 | + | |
26 | +import java.util.Collection; | |
27 | +import java.util.List; | |
28 | +import java.util.Optional; | |
29 | + | |
30 | +/** | |
31 | + * @author Andrew Shvayka | |
32 | + */ | |
33 | +public interface EventsDao { | |
34 | + | |
35 | + ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, String attributeKey); | |
36 | + | |
37 | + ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType attributeType, Collection<String> attributeKey); | |
38 | + | |
39 | + ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType attributeType); | |
40 | + | |
41 | + ListenableFuture<Void> save(TenantId tenantId, EntityId entityId, TkEventType attributeType,String eventIdentifier, TkEventKvEntry attribute); | |
42 | + | |
43 | + ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys); | |
44 | + | |
45 | + List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId); | |
46 | + | |
47 | + List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds); | |
48 | +} | ... | ... |
dao/src/main/java/org/thingsboard/server/dao/yunteng/jpa/entity/events/TkEventKvCompositeKey.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.entity.events; | |
17 | + | |
18 | +import lombok.AllArgsConstructor; | |
19 | +import lombok.Data; | |
20 | +import lombok.NoArgsConstructor; | |
21 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
22 | + | |
23 | +import javax.persistence.Column; | |
24 | +import javax.persistence.Embeddable; | |
25 | +import javax.persistence.EnumType; | |
26 | +import javax.persistence.Enumerated; | |
27 | +import java.io.Serializable; | |
28 | +import java.util.UUID; | |
29 | + | |
30 | +import static org.thingsboard.server.dao.model.ModelConstants.*; | |
31 | + | |
32 | +@Data | |
33 | +@AllArgsConstructor | |
34 | +@NoArgsConstructor | |
35 | +@Embeddable | |
36 | +public class TkEventKvCompositeKey implements Serializable { | |
37 | + @Enumerated(EnumType.STRING) | |
38 | + @Column(name = EVENT_TYPE_COLUMN) | |
39 | + private TkEventType eventType; | |
40 | + @Column(name = ENTITY_ID_COLUMN, columnDefinition = "uuid") | |
41 | + private UUID entityId; | |
42 | + @Column(name = EVENT_IDENTIFIER_COLUMN) | |
43 | + private String eventIdentifier; | |
44 | + @Column(name = EVENT_TIME_COLUMN) | |
45 | + private Long eventTime; | |
46 | + @Column(name = EVENT_KEY_COLUMN) | |
47 | + private String eventKey; | |
48 | +} | ... | ... |
dao/src/main/java/org/thingsboard/server/dao/yunteng/jpa/entity/events/TkEventKvEntity.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.entity.events; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.server.common.data.kv.*; | |
20 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvDto; | |
21 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
22 | +import org.thingsboard.server.dao.model.ToData; | |
23 | + | |
24 | +import javax.persistence.Column; | |
25 | +import javax.persistence.EmbeddedId; | |
26 | +import javax.persistence.Entity; | |
27 | +import javax.persistence.Table; | |
28 | +import java.io.Serializable; | |
29 | +import java.util.UUID; | |
30 | + | |
31 | +import static org.thingsboard.server.dao.model.ModelConstants.*; | |
32 | + | |
33 | +@Data | |
34 | +@Entity | |
35 | +@Table(name = "tk_event_kv") | |
36 | +public class TkEventKvEntity implements ToData<TkEventKvEntry>, Serializable { | |
37 | + | |
38 | + @EmbeddedId | |
39 | + private TkEventKvCompositeKey id; | |
40 | + | |
41 | + @Column(name = BOOLEAN_VALUE_COLUMN) | |
42 | + private Boolean booleanValue; | |
43 | + | |
44 | + @Column(name = STRING_VALUE_COLUMN) | |
45 | + private String strValue; | |
46 | + | |
47 | + @Column(name = LONG_VALUE_COLUMN) | |
48 | + private Long longValue; | |
49 | + | |
50 | + @Column(name = DOUBLE_VALUE_COLUMN) | |
51 | + private Double doubleValue; | |
52 | + | |
53 | + @Column(name = JSON_VALUE_COLUMN) | |
54 | + private String jsonValue; | |
55 | + | |
56 | + | |
57 | + @Column(name = DEVICE_DEVICE_PROFILE_ID_PROPERTY, columnDefinition = "uuid") | |
58 | + private UUID deviceProfileId; | |
59 | + | |
60 | + @Override | |
61 | + public TkEventKvEntry toData() { | |
62 | + KvEntry kvEntry = null; | |
63 | + if (strValue != null) { | |
64 | + kvEntry = new StringDataEntry(id.getEventIdentifier(), strValue); | |
65 | + } else if (booleanValue != null) { | |
66 | + kvEntry = new BooleanDataEntry(id.getEventIdentifier(), booleanValue); | |
67 | + } else if (doubleValue != null) { | |
68 | + kvEntry = new DoubleDataEntry(id.getEventIdentifier(), doubleValue); | |
69 | + } else if (longValue != null) { | |
70 | + kvEntry = new LongDataEntry(id.getEventIdentifier(), longValue); | |
71 | + } else if (jsonValue != null) { | |
72 | + kvEntry = new JsonDataEntry(id.getEventIdentifier(), jsonValue); | |
73 | + } | |
74 | + | |
75 | + return new TkEventKvDto(kvEntry, id.getEventTime()); | |
76 | + } | |
77 | +} | ... | ... |
1 | +package org.thingsboard.server.dao.yunteng.jpa.repository.event; | |
2 | + | |
3 | +import lombok.extern.slf4j.Slf4j; | |
4 | +import org.springframework.beans.factory.annotation.Autowired; | |
5 | +import org.springframework.beans.factory.annotation.Value; | |
6 | +import org.springframework.jdbc.core.BatchPreparedStatementSetter; | |
7 | +import org.springframework.jdbc.core.JdbcTemplate; | |
8 | +import org.springframework.stereotype.Repository; | |
9 | +import org.springframework.transaction.TransactionStatus; | |
10 | +import org.springframework.transaction.support.TransactionCallbackWithoutResult; | |
11 | +import org.springframework.transaction.support.TransactionTemplate; | |
12 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; | |
13 | + | |
14 | +import java.sql.PreparedStatement; | |
15 | +import java.sql.SQLException; | |
16 | +import java.sql.Types; | |
17 | +import java.util.ArrayList; | |
18 | +import java.util.List; | |
19 | +import java.util.regex.Pattern; | |
20 | + | |
21 | +@Repository | |
22 | +@Slf4j | |
23 | +public abstract class EventKvInsertRepository { | |
24 | + | |
25 | + private static final ThreadLocal<Pattern> PATTERN_THREAD_LOCAL = ThreadLocal.withInitial(() -> Pattern.compile(String.valueOf(Character.MIN_VALUE))); | |
26 | + private static final String EMPTY_STR = ""; | |
27 | + | |
28 | + private static final String BATCH_UPDATE = "UPDATE tk_event_kv SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json) " + | |
29 | + "WHERE event_key = ? and event_type = ? and entity_id = ? and event_identifier =? and event_time = ?;"; | |
30 | + | |
31 | + private static final String INSERT_OR_UPDATE = | |
32 | + "INSERT INTO tk_event_kv (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
33 | + "VALUES(?, ?, ?, ?, ?, ?, ?, ?, cast(? AS json), ?) " + | |
34 | + "ON CONFLICT (event_type, entity_id, event_identifier, event_time, event_key) " + | |
35 | + "DO UPDATE SET str_v = ?, long_v = ?, dbl_v = ?, bool_v = ?, json_v = cast(? AS json), event_key = ?;"; | |
36 | + | |
37 | + @Autowired | |
38 | + protected JdbcTemplate jdbcTemplate; | |
39 | + | |
40 | + @Autowired | |
41 | + private TransactionTemplate transactionTemplate; | |
42 | + | |
43 | + @Value("${sql.remove_null_chars:true}") | |
44 | + private boolean removeNullChars; | |
45 | + | |
46 | + protected void saveOrUpdate(List<TkEventKvEntity> entities) { | |
47 | + transactionTemplate.execute(new TransactionCallbackWithoutResult() { | |
48 | + @Override | |
49 | + protected void doInTransactionWithoutResult(TransactionStatus status) { | |
50 | + int[] result = jdbcTemplate.batchUpdate(BATCH_UPDATE, new BatchPreparedStatementSetter() { | |
51 | + @Override | |
52 | + public void setValues(PreparedStatement ps, int i) throws SQLException { | |
53 | + TkEventKvEntity kvEntity = entities.get(i); | |
54 | + ps.setString(1, replaceNullChars(kvEntity.getStrValue())); | |
55 | + | |
56 | + if (kvEntity.getLongValue() != null) { | |
57 | + ps.setLong(2, kvEntity.getLongValue()); | |
58 | + } else { | |
59 | + ps.setNull(2, Types.BIGINT); | |
60 | + } | |
61 | + | |
62 | + if (kvEntity.getDoubleValue() != null) { | |
63 | + ps.setDouble(3, kvEntity.getDoubleValue()); | |
64 | + } else { | |
65 | + ps.setNull(3, Types.DOUBLE); | |
66 | + } | |
67 | + | |
68 | + if (kvEntity.getBooleanValue() != null) { | |
69 | + ps.setBoolean(4, kvEntity.getBooleanValue()); | |
70 | + } else { | |
71 | + ps.setNull(4, Types.BOOLEAN); | |
72 | + } | |
73 | + | |
74 | + ps.setString(5, replaceNullChars(kvEntity.getJsonValue())); | |
75 | + | |
76 | + ps.setString(6, kvEntity.getId().getEventKey()); | |
77 | + ps.setString(7, kvEntity.getId().getEventType().name()); | |
78 | + ps.setObject(8, kvEntity.getId().getEntityId()); | |
79 | + ps.setString(9, kvEntity.getId().getEventIdentifier()); | |
80 | + ps.setLong(10, kvEntity.getId().getEventTime()); | |
81 | + } | |
82 | + | |
83 | + @Override | |
84 | + public int getBatchSize() { | |
85 | + return entities.size(); | |
86 | + } | |
87 | + }); | |
88 | + | |
89 | + int updatedCount = 0; | |
90 | + for (int i = 0; i < result.length; i++) { | |
91 | + if (result[i] == 0) { | |
92 | + updatedCount++; | |
93 | + } | |
94 | + } | |
95 | + | |
96 | + List<TkEventKvEntity> insertEntities = new ArrayList<>(updatedCount); | |
97 | + for (int i = 0; i < result.length; i++) { | |
98 | + if (result[i] == 0) { | |
99 | + insertEntities.add(entities.get(i)); | |
100 | + } | |
101 | + } | |
102 | + | |
103 | + jdbcTemplate.batchUpdate(INSERT_OR_UPDATE, new BatchPreparedStatementSetter() { | |
104 | + @Override | |
105 | + public void setValues(PreparedStatement ps, int i) throws SQLException { | |
106 | + TkEventKvEntity kvEntity = insertEntities.get(i); | |
107 | + ps.setString(1, kvEntity.getId().getEventType().name()); | |
108 | + ps.setObject(2, kvEntity.getId().getEntityId()); | |
109 | + ps.setString(3, kvEntity.getId().getEventIdentifier()); | |
110 | + ps.setLong(4, kvEntity.getId().getEventTime()); | |
111 | + | |
112 | + ps.setString(5, replaceNullChars(kvEntity.getStrValue())); | |
113 | + ps.setString(11, replaceNullChars(kvEntity.getStrValue())); | |
114 | + | |
115 | + if (kvEntity.getLongValue() != null) { | |
116 | + ps.setLong(6, kvEntity.getLongValue()); | |
117 | + ps.setLong(12, kvEntity.getLongValue()); | |
118 | + } else { | |
119 | + ps.setNull(6, Types.BIGINT); | |
120 | + ps.setNull(12, Types.BIGINT); | |
121 | + } | |
122 | + | |
123 | + if (kvEntity.getDoubleValue() != null) { | |
124 | + ps.setDouble(7, kvEntity.getDoubleValue()); | |
125 | + ps.setDouble(13, kvEntity.getDoubleValue()); | |
126 | + } else { | |
127 | + ps.setNull(7, Types.DOUBLE); | |
128 | + ps.setNull(13, Types.DOUBLE); | |
129 | + } | |
130 | + | |
131 | + if (kvEntity.getBooleanValue() != null) { | |
132 | + ps.setBoolean(8, kvEntity.getBooleanValue()); | |
133 | + ps.setBoolean(14, kvEntity.getBooleanValue()); | |
134 | + } else { | |
135 | + ps.setNull(8, Types.BOOLEAN); | |
136 | + ps.setNull(14, Types.BOOLEAN); | |
137 | + } | |
138 | + | |
139 | + ps.setString(9, replaceNullChars(kvEntity.getJsonValue())); | |
140 | + ps.setString(15, replaceNullChars(kvEntity.getJsonValue())); | |
141 | + | |
142 | + ps.setString(10, kvEntity.getId().getEventKey()); | |
143 | + ps.setString(16, kvEntity.getId().getEventKey()); | |
144 | + } | |
145 | + | |
146 | + @Override | |
147 | + public int getBatchSize() { | |
148 | + return insertEntities.size(); | |
149 | + } | |
150 | + }); | |
151 | + } | |
152 | + }); | |
153 | + } | |
154 | + | |
155 | + private String replaceNullChars(String strValue) { | |
156 | + if (removeNullChars && strValue != null) { | |
157 | + return PATTERN_THREAD_LOCAL.get().matcher(strValue).replaceAll(EMPTY_STR); | |
158 | + } | |
159 | + return strValue; | |
160 | + } | |
161 | +} | ... | ... |
dao/src/main/java/org/thingsboard/server/dao/yunteng/jpa/repository/event/EventKvRepository.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.repository.event; | |
17 | + | |
18 | +import org.springframework.data.jpa.repository.Modifying; | |
19 | +import org.springframework.data.jpa.repository.Query; | |
20 | +import org.springframework.data.repository.CrudRepository; | |
21 | +import org.springframework.data.repository.query.Param; | |
22 | +import org.springframework.transaction.annotation.Transactional; | |
23 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
24 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey; | |
25 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; | |
26 | + | |
27 | +import java.util.List; | |
28 | +import java.util.UUID; | |
29 | + | |
30 | +public interface EventKvRepository extends CrudRepository<TkEventKvEntity, TkEventKvCompositeKey> { | |
31 | + | |
32 | + @Query("SELECT a FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " + | |
33 | + "AND a.id.entityId = :entityId " ) | |
34 | + List<TkEventKvEntity> findAllByEventTypeAndEntityId(@Param("eventType") TkEventType eventType, | |
35 | + @Param("entityId") UUID entityId); | |
36 | + | |
37 | + @Transactional | |
38 | + @Modifying | |
39 | + @Query("DELETE FROM TkEventKvEntity a WHERE a.id.eventType = :eventType " + | |
40 | + "AND a.id.entityId = :entityId " + | |
41 | + "AND a.id.eventIdentifier = :eventIdentifier " + | |
42 | + "AND a.id.eventKey = :eventKey") | |
43 | + void delete(@Param("eventType") TkEventType eventType, | |
44 | + @Param("entityId") UUID entityId, | |
45 | + @Param("eventIdentifier") String eventIdentifier, | |
46 | + @Param("eventKey") String eventKey); | |
47 | + | |
48 | + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
49 | + " in (SELECT id FROM device WHERE tenant_id = :tenantId and device_profile_id = :deviceProfileId limit 100) ORDER BY event_key", nativeQuery = true) | |
50 | + List<String> findAllKeysByDeviceProfileId(@Param("tenantId") UUID tenantId, | |
51 | + @Param("deviceProfileId") UUID deviceProfileId); | |
52 | + | |
53 | + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
54 | + " in (SELECT id FROM device WHERE tenant_id = :tenantId limit 100) ORDER BY event_key", nativeQuery = true) | |
55 | + List<String> findAllKeysByTenantId(@Param("tenantId") UUID tenantId); | |
56 | + | |
57 | + @Query(value = "SELECT DISTINCT event_key FROM tk_event_kv WHERE entity_id " + | |
58 | + " in :entityIds ORDER BY event_key", nativeQuery = true) | |
59 | + List<String> findAllKeysByEntityIds(@Param("entityIds") List<UUID> entityIds); | |
60 | +} | |
61 | + | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.repository.event; | |
17 | + | |
18 | +import org.springframework.stereotype.Repository; | |
19 | +import org.springframework.transaction.annotation.Transactional; | |
20 | +import org.thingsboard.server.dao.util.HsqlDao; | |
21 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; | |
22 | + | |
23 | +import java.sql.Types; | |
24 | +import java.util.List; | |
25 | + | |
26 | +@HsqlDao | |
27 | +@Repository | |
28 | +@Transactional | |
29 | +public class HsqlEventsInsertRepository extends EventKvInsertRepository { | |
30 | + | |
31 | + private static final String INSERT_OR_UPDATE = | |
32 | + "MERGE INTO tk_event_kv USING(VALUES ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) " + | |
33 | + "A (event_type, entity_id, event_identifier, event_key, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
34 | + "ON (tk_event_kv.event_type=A.event_type " + | |
35 | + "AND tk_event_kv.entity_id=A.entity_id " + | |
36 | + "AND tk_event_kv.event_identifier=A.event_identifier " + | |
37 | + "AND tk_event_kv.event_time=A.event_time) " + | |
38 | + "AND tk_event_kv.event_key=A.event_key) " + | |
39 | + "WHEN MATCHED THEN UPDATE SET tk_event_kv.str_v = A.str_v, tk_event_kv.long_v = A.long_v, tk_event_kv.dbl_v = A.dbl_v, tk_event_kv.bool_v = A.bool_v, tk_event_kv.json_v = A.json_v " + | |
40 | + "WHEN NOT MATCHED THEN INSERT (event_type, entity_id, event_identifier, event_time, str_v, long_v, dbl_v, bool_v, json_v, event_key) " + | |
41 | + "VALUES (A.event_type, A.entity_id, A.event_identifier, A.event_time, A.str_v, A.long_v, A.dbl_v, A.bool_v, A.json_v, A.event_key)"; | |
42 | + | |
43 | + @Override | |
44 | + protected void saveOrUpdate(List<TkEventKvEntity> entities) { | |
45 | + entities.forEach(entity -> { | |
46 | + jdbcTemplate.update(INSERT_OR_UPDATE, ps -> { | |
47 | + ps.setString(1, entity.getId().getEventType().name()); | |
48 | + ps.setObject(2, entity.getId().getEntityId()); | |
49 | + ps.setString(3, entity.getId().getEventIdentifier()); | |
50 | + ps.setLong(4, entity.getId().getEventTime()); | |
51 | + ps.setString(5, entity.getStrValue()); | |
52 | + | |
53 | + if (entity.getLongValue() != null) { | |
54 | + ps.setLong(6, entity.getLongValue()); | |
55 | + } else { | |
56 | + ps.setNull(6, Types.BIGINT); | |
57 | + } | |
58 | + | |
59 | + if (entity.getDoubleValue() != null) { | |
60 | + ps.setDouble(7, entity.getDoubleValue()); | |
61 | + } else { | |
62 | + ps.setNull(7, Types.DOUBLE); | |
63 | + } | |
64 | + | |
65 | + if (entity.getBooleanValue() != null) { | |
66 | + ps.setBoolean(8, entity.getBooleanValue()); | |
67 | + } else { | |
68 | + ps.setNull(8, Types.BOOLEAN); | |
69 | + } | |
70 | + | |
71 | + ps.setString(9, entity.getJsonValue()); | |
72 | + | |
73 | + ps.setString(10, entity.getId().getEventKey()); | |
74 | + }); | |
75 | + }); | |
76 | + } | |
77 | +} | ... | ... |
dao/src/main/java/org/thingsboard/server/dao/yunteng/jpa/repository/event/JpaEventDao.java
0 → 100644
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.repository.event; | |
17 | + | |
18 | +import com.google.common.collect.Lists; | |
19 | +import com.google.common.util.concurrent.Futures; | |
20 | +import com.google.common.util.concurrent.ListenableFuture; | |
21 | +import lombok.extern.slf4j.Slf4j; | |
22 | +import org.springframework.beans.factory.annotation.Autowired; | |
23 | +import org.springframework.beans.factory.annotation.Value; | |
24 | +import org.springframework.stereotype.Component; | |
25 | +import org.thingsboard.server.common.data.EntityType; | |
26 | +import org.thingsboard.server.common.data.id.DeviceProfileId; | |
27 | +import org.thingsboard.server.common.data.id.EntityId; | |
28 | +import org.thingsboard.server.common.data.id.TenantId; | |
29 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
30 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
31 | +import org.thingsboard.server.common.stats.StatsFactory; | |
32 | +import org.thingsboard.server.dao.DaoUtil; | |
33 | +import org.thingsboard.server.dao.sql.JpaAbstractDaoListeningExecutorService; | |
34 | +import org.thingsboard.server.dao.sql.ScheduledLogExecutorComponent; | |
35 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueParams; | |
36 | +import org.thingsboard.server.dao.sql.TbSqlBlockingQueueWrapper; | |
37 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvCompositeKey; | |
38 | +import org.thingsboard.server.dao.yunteng.jpa.entity.events.TkEventKvEntity; | |
39 | +import org.thingsboard.server.dao.yunteng.jpa.dao.event.EventsDao; | |
40 | + | |
41 | +import javax.annotation.PostConstruct; | |
42 | +import javax.annotation.PreDestroy; | |
43 | +import java.util.Collection; | |
44 | +import java.util.Comparator; | |
45 | +import java.util.List; | |
46 | +import java.util.Optional; | |
47 | +import java.util.function.Function; | |
48 | +import java.util.stream.Collectors; | |
49 | + | |
50 | +@Component | |
51 | +@Slf4j | |
52 | +public class JpaEventDao extends JpaAbstractDaoListeningExecutorService implements EventsDao { | |
53 | + | |
54 | + @Autowired | |
55 | + ScheduledLogExecutorComponent logExecutor; | |
56 | + | |
57 | + @Autowired | |
58 | + private EventKvRepository eventKvRepository; | |
59 | + | |
60 | + @Autowired | |
61 | + private EventKvInsertRepository eventKvInsertRepository; | |
62 | + | |
63 | + @Autowired | |
64 | + private StatsFactory statsFactory; | |
65 | + | |
66 | + @Value("${sql.attributes.batch_size:1000}") | |
67 | + private int batchSize; | |
68 | + | |
69 | + @Value("${sql.attributes.batch_max_delay:100}") | |
70 | + private long maxDelay; | |
71 | + | |
72 | + @Value("${sql.attributes.stats_print_interval_ms:1000}") | |
73 | + private long statsPrintIntervalMs; | |
74 | + | |
75 | + @Value("${sql.attributes.batch_threads:4}") | |
76 | + private int batchThreads; | |
77 | + | |
78 | + @Value("${sql.batch_sort:false}") | |
79 | + private boolean batchSortEnabled; | |
80 | + | |
81 | + private TbSqlBlockingQueueWrapper<TkEventKvEntity> queue; | |
82 | + | |
83 | + @PostConstruct | |
84 | + private void init() { | |
85 | + TbSqlBlockingQueueParams params = TbSqlBlockingQueueParams.builder() | |
86 | + .logName("Events") | |
87 | + .batchSize(batchSize) | |
88 | + .maxDelay(maxDelay) | |
89 | + .statsPrintIntervalMs(statsPrintIntervalMs) | |
90 | + .statsNamePrefix("events") | |
91 | + .batchSortEnabled(batchSortEnabled) | |
92 | + .build(); | |
93 | + | |
94 | + Function<TkEventKvEntity, Integer> hashcodeFunction = entity -> entity.getId().getEntityId().hashCode(); | |
95 | + queue = new TbSqlBlockingQueueWrapper<>(params, hashcodeFunction, batchThreads, statsFactory); | |
96 | + queue.init(logExecutor, v -> eventKvInsertRepository.saveOrUpdate(v), | |
97 | + Comparator.comparing((TkEventKvEntity attributeKvEntity) -> attributeKvEntity.getId().getEntityId()) | |
98 | + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventType().name()) | |
99 | + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventTime()) | |
100 | + .thenComparing(attributeKvEntity -> attributeKvEntity.getId().getEventIdentifier()) | |
101 | + ); | |
102 | + } | |
103 | + | |
104 | + @PreDestroy | |
105 | + private void destroy() { | |
106 | + if (queue != null) { | |
107 | + queue.destroy(); | |
108 | + } | |
109 | + } | |
110 | + | |
111 | + @Override | |
112 | + public ListenableFuture<Optional<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, String attributeKey) { | |
113 | + TkEventKvCompositeKey compositeKey = | |
114 | + getEventKvCompositeKey(eventType,entityId,null, attributeKey,null); | |
115 | + return Futures.immediateFuture( | |
116 | + Optional.ofNullable(DaoUtil.getData(eventKvRepository.findById(compositeKey)))); | |
117 | + } | |
118 | + | |
119 | + @Override | |
120 | + public ListenableFuture<List<TkEventKvEntry>> find(TenantId tenantId, EntityId entityId, TkEventType eventType, Collection<String> attributeKeys) { | |
121 | + List<TkEventKvCompositeKey> compositeKeys = | |
122 | + attributeKeys | |
123 | + .stream() | |
124 | + .map(attributeKey -> | |
125 | + getEventKvCompositeKey(eventType,entityId, null,attributeKey,null)) | |
126 | + .collect(Collectors.toList()); | |
127 | + return Futures.immediateFuture( | |
128 | + DaoUtil.convertDataList(Lists.newArrayList(eventKvRepository.findAllById(compositeKeys)))); | |
129 | + } | |
130 | + | |
131 | + @Override | |
132 | + public ListenableFuture<List<TkEventKvEntry>> findAll(TenantId tenantId, EntityId entityId, TkEventType eventType) { | |
133 | + return Futures.immediateFuture( | |
134 | + DaoUtil.convertDataList(Lists.newArrayList( | |
135 | + eventKvRepository.findAllByEventTypeAndEntityId( | |
136 | + eventType, | |
137 | + entityId.getId())))); | |
138 | + } | |
139 | + | |
140 | + @Override | |
141 | + public List<String> findAllKeysByDeviceProfileId(TenantId tenantId, DeviceProfileId deviceProfileId) { | |
142 | + if (deviceProfileId != null) { | |
143 | + return eventKvRepository.findAllKeysByDeviceProfileId(tenantId.getId(), deviceProfileId.getId()); | |
144 | + } else { | |
145 | + return eventKvRepository.findAllKeysByTenantId(tenantId.getId()); | |
146 | + } | |
147 | + } | |
148 | + | |
149 | + @Override | |
150 | + public List<String> findAllKeysByEntityIds(TenantId tenantId, EntityType entityType, List<EntityId> entityIds) { | |
151 | + return eventKvRepository | |
152 | + .findAllKeysByEntityIds(entityIds.stream().map(EntityId::getId).collect(Collectors.toList())); | |
153 | + } | |
154 | + | |
155 | + @Override | |
156 | + public ListenableFuture<Void> save(TenantId tenantId, EntityId entityId,TkEventType eventType, String eventIdentifier, TkEventKvEntry attribute) { | |
157 | + TkEventKvEntity entity = new TkEventKvEntity(); | |
158 | + entity.setId(new TkEventKvCompositeKey(eventType, entityId.getId(), eventIdentifier, attribute.getEventTime(),attribute.getKey())); | |
159 | + | |
160 | +// entity.setDeviceProfileId(); | |
161 | + entity.setStrValue(attribute.getStrValue().orElse(null)); | |
162 | + entity.setDoubleValue(attribute.getDoubleValue().orElse(null)); | |
163 | + entity.setLongValue(attribute.getLongValue().orElse(null)); | |
164 | + entity.setBooleanValue(attribute.getBooleanValue().orElse(null)); | |
165 | + entity.setJsonValue(attribute.getJsonValue().orElse(null)); | |
166 | + return addToQueue(entity); | |
167 | + } | |
168 | + | |
169 | + private ListenableFuture<Void> addToQueue(TkEventKvEntity entity) { | |
170 | + return queue.add(entity); | |
171 | + } | |
172 | + | |
173 | + @Override | |
174 | + public ListenableFuture<List<Void>> removeAll(TenantId tenantId, EntityId entityId, String eventId,TkEventType eventType,List<String> keys) { | |
175 | + return service.submit(() -> { | |
176 | + keys.forEach(key -> | |
177 | + eventKvRepository.delete(eventType,entityId.getId(), eventId,key) | |
178 | + ); | |
179 | + return null; | |
180 | + }); | |
181 | + } | |
182 | + | |
183 | + private TkEventKvCompositeKey getEventKvCompositeKey(TkEventType eventType,EntityId entityId, String eventIdentifier,String eventKey, Long eventTime) { | |
184 | + return new TkEventKvCompositeKey( | |
185 | + eventType, | |
186 | + entityId.getId(), | |
187 | + eventIdentifier, | |
188 | + eventTime, | |
189 | + eventKey); | |
190 | + } | |
191 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.dao.yunteng.jpa.repository.event; | |
17 | + | |
18 | +import org.springframework.stereotype.Repository; | |
19 | +import org.springframework.transaction.annotation.Transactional; | |
20 | +import org.thingsboard.server.dao.util.PsqlDao; | |
21 | + | |
22 | +@PsqlDao | |
23 | +@Repository | |
24 | +@Transactional | |
25 | +public class PsqlEventsInsertRepository extends EventKvInsertRepository { | |
26 | + | |
27 | +} | ... | ... |
... | ... | @@ -22,6 +22,8 @@ import org.thingsboard.server.common.data.id.TenantId; |
22 | 22 | import org.thingsboard.server.common.data.kv.AttributeKvEntry; |
23 | 23 | import org.thingsboard.server.common.data.kv.DeleteTsKvQuery; |
24 | 24 | import org.thingsboard.server.common.data.kv.TsKvEntry; |
25 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
26 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
25 | 27 | |
26 | 28 | import java.util.Collection; |
27 | 29 | import java.util.List; |
... | ... | @@ -41,6 +43,9 @@ public interface RuleEngineTelemetryService { |
41 | 43 | |
42 | 44 | void saveAndNotify(TenantId tenantId, EntityId entityId, String scope, List<AttributeKvEntry> attributes, boolean notifyDevice, FutureCallback<Void> callback); |
43 | 45 | |
46 | + //Thingskit function | |
47 | + void saveAndNotify(TenantId tenantId, EntityId entityId, String eventId, TkEventType eventType, List<TkEventKvEntry> attributes, FutureCallback<Void> callback); | |
48 | + | |
44 | 49 | void saveLatestAndNotify(TenantId tenantId, EntityId entityId, List<TsKvEntry> ts, FutureCallback<Void> callback); |
45 | 50 | |
46 | 51 | void saveAttrAndNotify(TenantId tenantId, EntityId entityId, String scope, String key, long value, FutureCallback<Void> callback); | ... | ... |
... | ... | @@ -33,7 +33,7 @@ import org.thingsboard.server.common.msg.session.SessionMsgType; |
33 | 33 | type = ComponentType.FILTER, |
34 | 34 | name = "message type switch", |
35 | 35 | configClazz = EmptyNodeConfiguration.class, |
36 | - relationTypes = {"Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted", | |
36 | + relationTypes = {"Post event","Post attributes", "Post telemetry", "RPC Request from Device", "RPC Request to Device", "RPC Queued", "RPC Sent", "RPC Delivered", "RPC Successful", "RPC Timeout", "RPC Expired", "RPC Failed", "RPC Deleted", | |
37 | 37 | "Activity Event", "Inactivity Event", "Connect Event", "Disconnect Event", "Entity Created", "Entity Updated", "Entity Deleted", "Entity Assigned", |
38 | 38 | "Entity Unassigned", "Attributes Updated", "Attributes Deleted", "Alarm Acknowledged", "Alarm Cleared", "Other", "Entity Assigned From Tenant", "Entity Assigned To Tenant", |
39 | 39 | "Timeseries Updated", "Timeseries Deleted"}, |
... | ... | @@ -57,7 +57,14 @@ public class TbMsgTypeSwitchNode implements TbNode { |
57 | 57 | relationType = "Post attributes"; |
58 | 58 | } else if (msg.getType().equals(SessionMsgType.POST_TELEMETRY_REQUEST.name())) { |
59 | 59 | relationType = "Post telemetry"; |
60 | - } else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) { | |
60 | + } | |
61 | + | |
62 | + //Thingskit function | |
63 | + else if (msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())) { | |
64 | + relationType = "Post event"; | |
65 | + } | |
66 | + | |
67 | + else if (msg.getType().equals(SessionMsgType.TO_SERVER_RPC_REQUEST.name())) { | |
61 | 68 | relationType = "RPC Request from Device"; |
62 | 69 | } else if (msg.getType().equals(DataConstants.ACTIVITY_EVENT)) { |
63 | 70 | relationType = "Activity Event"; | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.rule.engine.yunteng.event; | |
17 | + | |
18 | +import com.google.gson.JsonParser; | |
19 | +import lombok.extern.slf4j.Slf4j; | |
20 | +import org.apache.commons.lang3.StringUtils; | |
21 | +import org.thingsboard.rule.engine.api.*; | |
22 | +import org.thingsboard.rule.engine.api.util.TbNodeUtils; | |
23 | +import org.thingsboard.rule.engine.telemetry.TelemetryNodeCallback; | |
24 | +import org.thingsboard.server.common.data.kv.AttributeKvEntry; | |
25 | +import org.thingsboard.server.common.data.plugin.ComponentType; | |
26 | +import org.thingsboard.server.common.data.yunteng.dto.TkEventKvEntry; | |
27 | +import org.thingsboard.server.common.data.yunteng.enums.TkEventType; | |
28 | +import org.thingsboard.server.common.msg.TbMsg; | |
29 | +import org.thingsboard.server.common.msg.session.SessionMsgType; | |
30 | +import org.thingsboard.server.common.transport.adaptor.JsonConverter; | |
31 | + | |
32 | +import java.util.ArrayList; | |
33 | +import java.util.Set; | |
34 | + | |
35 | +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_IDENTIFIER_COLUMN; | |
36 | +import static org.thingsboard.server.dao.model.ModelConstants.EVENT_TYPE_COLUMN; | |
37 | + | |
38 | +@Slf4j | |
39 | +@RuleNode( | |
40 | + type = ComponentType.ACTION, | |
41 | + name = "save event", | |
42 | + configClazz = TkMsgEventNodeConfiguration.class, | |
43 | + nodeDescription = "Saves device event data", | |
44 | + nodeDetails = "Saves entity event . Expects messages with 'POST_EVENT_REQUEST' message type", | |
45 | + uiResources = {"static/rulenode/rulenode-core-config.js"}, | |
46 | + configDirective = "tkMsgEventNodeConfiguration", | |
47 | + icon = "file_upload" | |
48 | +) | |
49 | +public class TkMsgEventNode implements TbNode { | |
50 | + | |
51 | + private TkMsgEventNodeConfiguration config; | |
52 | + | |
53 | + @Override | |
54 | + public void init(TbContext ctx, TbNodeConfiguration configuration) throws TbNodeException { | |
55 | + this.config = TbNodeUtils.convert(configuration, TkMsgEventNodeConfiguration.class); | |
56 | + } | |
57 | + | |
58 | + @Override | |
59 | + public void onMsg(TbContext ctx, TbMsg msg) { | |
60 | + if (!msg.getType().equals(SessionMsgType.POST_EVENT_REQUEST.name())) { | |
61 | + ctx.tellFailure(msg, new IllegalArgumentException("Unsupported msg type: " + msg.getType())); | |
62 | + return; | |
63 | + } | |
64 | + String src = msg.getData(); | |
65 | + Set<TkEventKvEntry> events = JsonConverter.convertToEvents(new JsonParser().parse(src)); | |
66 | + String eventIdentifier = msg.getMetaData().getValue(EVENT_IDENTIFIER_COLUMN); | |
67 | + TkEventType eventType = TkEventType.valueOf(msg.getMetaData().getValue(EVENT_TYPE_COLUMN)); | |
68 | + ctx.getTelemetryService().saveAndNotify( | |
69 | + ctx.getTenantId(), | |
70 | + msg.getOriginator(), | |
71 | + eventIdentifier, | |
72 | + eventType, | |
73 | + new ArrayList<>(events), | |
74 | + new TelemetryNodeCallback(ctx, msg) | |
75 | + ); | |
76 | + } | |
77 | + | |
78 | + @Override | |
79 | + public void destroy() { | |
80 | + } | |
81 | + | |
82 | +} | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.rule.engine.yunteng.event; | |
17 | + | |
18 | +import lombok.Data; | |
19 | +import org.thingsboard.rule.engine.api.NodeConfiguration; | |
20 | +import org.thingsboard.server.common.data.DataConstants; | |
21 | + | |
22 | +@Data | |
23 | +public class TkMsgEventNodeConfiguration implements NodeConfiguration<TkMsgEventNodeConfiguration> { | |
24 | + | |
25 | + | |
26 | + private boolean useServerTs; | |
27 | + @Override | |
28 | + public TkMsgEventNodeConfiguration defaultConfiguration() { | |
29 | + TkMsgEventNodeConfiguration configuration = new TkMsgEventNodeConfiguration(); | |
30 | + configuration.setUseServerTs(true); | |
31 | + return configuration; | |
32 | + } | |
33 | +} | ... | ... |