Commit 39b75022616ab3299e3ef25d2c910b9f9a96ee88
1 parent
3de0b2c6
fix(DEFECT-1212): TCP脚本刷新问题
1、TCP脚本服务启动时缓存到脚本引擎。 2、脚本启用禁用时通知设备接入服务刷新脚本引擎。 3、设备接入服务使用事件方式刷新缓存 4、脚本初始化成功后才处理消息
Showing
16 changed files
with
273 additions
and
131 deletions
... | ... | @@ -95,16 +95,11 @@ public class TkDeviceScriptController extends BaseController { |
95 | 95 | return ResponseEntity.ok(false); |
96 | 96 | } |
97 | 97 | |
98 | - /**脚本引擎启用/禁用时,刷新规则引擎中缓存的脚本ID对应的脚本内容,禁用时为默认脚本*/ | |
99 | -// List<DeviceProfileDTO> usedProfiles = ytDeviceProfileService.findDeviceProfile(tenantId, id,null); | |
100 | -// for (DeviceProfileDTO profile : usedProfiles) { | |
101 | -// DeviceProfile tbDeviceProfile = | |
102 | -// buildTbDeviceProfileFromDeviceProfileDTO( | |
103 | -// profile, id,scriptDTO.getScriptType()); | |
104 | -// updateTbDeviceProfile(tbDeviceProfile); | |
105 | -// } | |
106 | - | |
98 | + /**脚本引擎启用/禁用时,刷新设备接入服务中的脚本,禁用时使用默认脚本*/ | |
107 | 99 | scriptService.updateScriptStatus(tenantId, id, status); |
100 | + scriptService.getDeviceScript(tenantId, id).ifPresent(i->{ | |
101 | + tbClusterService.onJsScriptChange(i,null); | |
102 | + }); | |
108 | 103 | return ResponseEntity.ok(true); |
109 | 104 | } |
110 | 105 | ... | ... |
... | ... | @@ -27,6 +27,8 @@ import org.thingsboard.server.cluster.TbClusterService; |
27 | 27 | import org.thingsboard.server.common.data.EdgeUtils; |
28 | 28 | import org.thingsboard.server.common.data.edge.EdgeEventActionType; |
29 | 29 | import org.thingsboard.server.common.data.edge.EdgeEventType; |
30 | +import org.thingsboard.server.common.data.yunteng.dto.TkDeviceScriptDTO; | |
31 | +import org.thingsboard.server.common.data.yunteng.id.ScriptId; | |
30 | 32 | import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; |
31 | 33 | import org.thingsboard.server.common.data.ApiUsageState; |
32 | 34 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -473,4 +475,12 @@ public class DefaultTbClusterService implements TbClusterService { |
473 | 475 | break; |
474 | 476 | } |
475 | 477 | } |
478 | + | |
479 | + | |
480 | + //Thingskit function | |
481 | + @Override | |
482 | + public void onJsScriptChange(TkDeviceScriptDTO script, TbQueueCallback callback) { | |
483 | + broadcastEntityChangeToTransport( | |
484 | + new TenantId(UUID.fromString(script.getTenantId())), new ScriptId(UUID.fromString(script.getId())), script, callback); | |
485 | + } | |
476 | 486 | } | ... | ... |
... | ... | @@ -679,30 +679,22 @@ public class DefaultTransportApiService implements TransportApiService { |
679 | 679 | |
680 | 680 | //Thingskit function |
681 | 681 | private ListenableFuture<TransportApiResponseMsg> handle(TransportProtos.ScriptProto requestMsg) { |
682 | - | |
683 | - int type = requestMsg.getFunctionType(); | |
684 | - UUID scriptId = getUUID(requestMsg.getScriptIdLSB(),requestMsg.getScriptIdMSB()); | |
685 | - | |
686 | - List<TkDeviceScriptDTO> allScriptes = scriptService.getScriptes(Arrays.stream(TkScriptFunctionType.values()).filter(f->f.ordinal()==type).findFirst().get(),null,null,scriptId); | |
682 | + List<TkDeviceScriptDTO> allScriptes = scriptService.getScriptes(); | |
687 | 683 | TransportApiResponseMsg.Builder responseBuilder = TransportApiResponseMsg.newBuilder(); |
688 | - allScriptes.forEach(item->{ | |
689 | - UUID tenantId = UUID.fromString(item.getTenantId()); | |
690 | - UUID id = UUID.fromString(item.getId()); | |
691 | - responseBuilder.addScriptsResponseMsg(TransportProtos.ScriptProto.newBuilder() | |
692 | - .setConvertJs(item.getConvertJs()) | |
693 | - .setTenantIdLSB(tenantId.getLeastSignificantBits()) | |
694 | - .setTenantIdMSB(tenantId.getMostSignificantBits()) | |
695 | - .setScriptIdLSB(id.getLeastSignificantBits()) | |
696 | - .setScriptIdMSB(id.getMostSignificantBits()) | |
697 | - .setFunctionType(item.getScriptType().ordinal()) | |
698 | - ); | |
684 | + allScriptes.forEach( | |
685 | + item -> { | |
686 | + UUID tenantId = UUID.fromString(item.getTenantId()); | |
687 | + UUID id = UUID.fromString(item.getId()); | |
688 | + responseBuilder.addScriptsResponseMsg( | |
689 | + TransportProtos.ScriptProto.newBuilder() | |
690 | + .setConvertJs(item.getConvertJs()) | |
691 | + .setTenantIdLSB(tenantId.getLeastSignificantBits()) | |
692 | + .setTenantIdMSB(tenantId.getMostSignificantBits()) | |
693 | + .setScriptIdLSB(id.getLeastSignificantBits()) | |
694 | + .setScriptIdMSB(id.getMostSignificantBits()) | |
695 | + .setFunctionType(item.getScriptType().name()) | |
696 | + .setStatus(item.getStatus())); | |
699 | 697 | }); |
700 | 698 | return Futures.immediateFuture(responseBuilder.build()); |
701 | 699 | } |
702 | - private UUID getUUID(long lsb,long msb){ | |
703 | - if(lsb==0 && msb==0){ | |
704 | - return null; | |
705 | - } | |
706 | - return new UUID(msb,lsb); | |
707 | - } | |
708 | 700 | } | ... | ... |
... | ... | @@ -17,6 +17,7 @@ package org.thingsboard.server.cluster; |
17 | 17 | |
18 | 18 | import org.thingsboard.server.common.data.edge.EdgeEventActionType; |
19 | 19 | import org.thingsboard.server.common.data.edge.EdgeEventType; |
20 | +import org.thingsboard.server.common.data.yunteng.dto.TkDeviceScriptDTO; | |
20 | 21 | import org.thingsboard.server.common.msg.ToDeviceActorNotificationMsg; |
21 | 22 | import org.thingsboard.server.common.data.ApiUsageState; |
22 | 23 | import org.thingsboard.server.common.data.Device; |
... | ... | @@ -86,4 +87,8 @@ public interface TbClusterService { |
86 | 87 | void onEdgeEventUpdate(TenantId tenantId, EdgeId edgeId); |
87 | 88 | |
88 | 89 | void sendNotificationMsgToEdgeService(TenantId tenantId, EdgeId edgeId, EntityId entityId, String body, EdgeEventType type, EdgeEventActionType action); |
90 | + | |
91 | + | |
92 | + //Thingskit function | |
93 | + void onJsScriptChange(TkDeviceScriptDTO script, TbQueueCallback callback); | |
89 | 94 | } | ... | ... |
... | ... | @@ -789,8 +789,9 @@ message ScriptProto{ |
789 | 789 | int64 tenantIdLSB = 2; |
790 | 790 | // int64 projectIdMSB = 3; |
791 | 791 | // int64 projectIdLSB = 4; |
792 | - int64 scriptIdMSB = 5; | |
793 | - int64 scriptIdLSB = 6; | |
794 | - int32 functionType = 7; | |
795 | - string convertJs = 8; | |
792 | + int64 scriptIdMSB = 3; | |
793 | + int64 scriptIdLSB = 4; | |
794 | + string functionType = 5; | |
795 | + string convertJs = 6; | |
796 | + int32 status = 7; | |
796 | 797 | } | ... | ... |
... | ... | @@ -21,5 +21,5 @@ package org.thingsboard.server.common.data; |
21 | 21 | public enum EntityType { |
22 | 22 | TENANT, CUSTOMER, USER, DASHBOARD, ASSET, DEVICE, ALARM, RULE_CHAIN, RULE_NODE, ENTITY_VIEW, WIDGETS_BUNDLE, WIDGET_TYPE, TENANT_PROFILE, DEVICE_PROFILE, API_USAGE_STATE, TB_RESOURCE, OTA_PACKAGE, EDGE, RPC |
23 | 23 | //Thingskit function |
24 | - , RUNNING_EXCEPTION,SCENE_ACT; | |
24 | + , RUNNING_EXCEPTION,SCENE_ACT,JS_SCRIPT; | |
25 | 25 | } | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.data.yunteng.id; | |
17 | + | |
18 | +import com.fasterxml.jackson.annotation.JsonCreator; | |
19 | +import com.fasterxml.jackson.annotation.JsonProperty; | |
20 | +import io.swagger.annotations.ApiModel; | |
21 | +import io.swagger.annotations.ApiModelProperty; | |
22 | +import java.util.UUID; | |
23 | +import org.thingsboard.server.common.data.EntityType; | |
24 | +import org.thingsboard.server.common.data.id.EntityId; | |
25 | +import org.thingsboard.server.common.data.id.UUIDBased; | |
26 | + | |
27 | +@ApiModel | |
28 | +public class ScriptId extends UUIDBased implements EntityId { | |
29 | + | |
30 | + private static final long serialVersionUID = 1L; | |
31 | + | |
32 | + @JsonCreator | |
33 | + public ScriptId(@JsonProperty("id") UUID id) { | |
34 | + super(id); | |
35 | + } | |
36 | + | |
37 | + public static ScriptId fromString(String sceneId) { | |
38 | + return new ScriptId(UUID.fromString(sceneId)); | |
39 | + } | |
40 | + | |
41 | + @Override | |
42 | + @ApiModelProperty(position = 2, required = true, value = "string", example = "JS_SCRIPT", allowableValues = "JS_SCRIPT") | |
43 | + public EntityType getEntityType() { | |
44 | + return EntityType.JS_SCRIPT; | |
45 | + } | |
46 | +} | ... | ... |
... | ... | @@ -22,13 +22,25 @@ import lombok.extern.slf4j.Slf4j; |
22 | 22 | import org.springframework.beans.factory.annotation.Autowired; |
23 | 23 | import org.springframework.beans.factory.annotation.Value; |
24 | 24 | import org.springframework.boot.autoconfigure.condition.ConditionalOnExpression; |
25 | +import org.springframework.boot.context.event.ApplicationReadyEvent; | |
26 | +import org.springframework.context.event.EventListener; | |
27 | +import org.springframework.core.annotation.Order; | |
25 | 28 | import org.springframework.stereotype.Component; |
29 | +import org.thingsboard.common.util.ThingsBoardThreadFactory; | |
30 | +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; | |
31 | +import org.thingsboard.server.common.data.yunteng.enums.TkScriptFunctionType; | |
26 | 32 | import org.thingsboard.server.common.transport.TransportContext; |
33 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
27 | 34 | import org.thingsboard.server.transport.tcp.adaptors.JsonTcpAdaptor; |
28 | 35 | import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService; |
29 | 36 | |
30 | 37 | import javax.annotation.PostConstruct; |
31 | 38 | import java.net.InetSocketAddress; |
39 | +import java.util.List; | |
40 | +import java.util.UUID; | |
41 | +import java.util.concurrent.Executors; | |
42 | +import java.util.concurrent.ScheduledExecutorService; | |
43 | +import java.util.concurrent.TimeUnit; | |
32 | 44 | import java.util.concurrent.atomic.AtomicInteger; |
33 | 45 | |
34 | 46 | /** |
... | ... | @@ -77,11 +89,14 @@ public class TcpTransportContext extends TransportContext { |
77 | 89 | private boolean proxyEnabled; |
78 | 90 | |
79 | 91 | private final AtomicInteger connectionsCounter = new AtomicInteger(); |
80 | - | |
92 | + protected ScheduledExecutorService schedulerExecutor; | |
93 | + @Getter | |
94 | + private boolean ready=false; | |
81 | 95 | @PostConstruct |
82 | 96 | public void init() { |
83 | 97 | super.init(); |
84 | 98 | transportService.createGaugeStats("openConnections", connectionsCounter); |
99 | + schedulerExecutor = Executors.newSingleThreadScheduledExecutor(ThingsBoardThreadFactory.forName("tcp-init")); | |
85 | 100 | } |
86 | 101 | |
87 | 102 | public void channelRegistered() { |
... | ... | @@ -104,4 +119,19 @@ public class TcpTransportContext extends TransportContext { |
104 | 119 | rateLimitService.onAuthFailure(address); |
105 | 120 | } |
106 | 121 | |
122 | + | |
123 | + @EventListener(ApplicationReadyEvent.class) | |
124 | + @Order(value = 2) | |
125 | + public void onApplicationEvent(ApplicationReadyEvent event) { | |
126 | + List<TransportProtos.ScriptProto> all =transportService.getScripts(TransportProtos.ScriptProto.newBuilder().setStatus(StatusEnum.ENABLE.getIndex()).build()); | |
127 | + if(all ==null){ | |
128 | + return; | |
129 | + } | |
130 | + all.forEach(i->{ | |
131 | + UUID scriptId = new UUID(i.getScriptIdMSB(), i.getScriptIdLSB()); | |
132 | + TkScriptFunctionType scriptType = TkScriptFunctionType.valueOf(i.getFunctionType()); | |
133 | + jsEngine.cacheScript(scriptId, scriptType, i.getConvertJs(), StatusEnum.ENABLE.getIndex()); | |
134 | + }); | |
135 | + schedulerExecutor.schedule(()->ready=true,timeout, TimeUnit.MILLISECONDS); | |
136 | + } | |
107 | 137 | } | ... | ... |
... | ... | @@ -15,6 +15,13 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.tcp; |
17 | 17 | |
18 | +import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; | |
19 | +import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; | |
20 | +import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; | |
21 | +import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; | |
22 | +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; | |
23 | +import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; | |
24 | + | |
18 | 25 | import com.fasterxml.jackson.databind.JsonNode; |
19 | 26 | import com.google.common.util.concurrent.FutureCallback; |
20 | 27 | import com.google.common.util.concurrent.Futures; |
... | ... | @@ -30,18 +37,28 @@ import io.netty.handler.ssl.SslHandler; |
30 | 37 | import io.netty.util.ReferenceCountUtil; |
31 | 38 | import io.netty.util.concurrent.Future; |
32 | 39 | import io.netty.util.concurrent.GenericFutureListener; |
40 | +import java.io.IOException; | |
41 | +import java.net.InetSocketAddress; | |
42 | +import java.util.List; | |
43 | +import java.util.Map; | |
44 | +import java.util.Optional; | |
45 | +import java.util.UUID; | |
46 | +import java.util.concurrent.atomic.AtomicInteger; | |
47 | +import java.util.regex.Pattern; | |
33 | 48 | import lombok.extern.slf4j.Slf4j; |
34 | 49 | import org.apache.commons.lang3.StringUtils; |
35 | 50 | import org.checkerframework.checker.nullness.qual.Nullable; |
51 | +import org.springframework.boot.context.event.ApplicationReadyEvent; | |
52 | +import org.springframework.context.event.EventListener; | |
36 | 53 | import org.thingsboard.common.util.JacksonUtil; |
37 | 54 | import org.thingsboard.server.common.data.DataConstants; |
38 | 55 | import org.thingsboard.server.common.data.Device; |
39 | 56 | import org.thingsboard.server.common.data.DeviceProfile; |
40 | 57 | import org.thingsboard.server.common.data.DeviceTransportType; |
41 | -import org.thingsboard.server.common.data.device.profile.MqttTopics; | |
42 | 58 | import org.thingsboard.server.common.data.device.profile.TkTcpDeviceProfileTransportConfiguration; |
43 | 59 | import org.thingsboard.server.common.data.id.DeviceId; |
44 | 60 | import org.thingsboard.server.common.data.rpc.RpcStatus; |
61 | +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; | |
45 | 62 | import org.thingsboard.server.common.data.yunteng.enums.TkScriptFunctionType; |
46 | 63 | import org.thingsboard.server.common.data.yunteng.utils.ByteUtils; |
47 | 64 | import org.thingsboard.server.common.msg.tools.TbRateLimitsException; |
... | ... | @@ -55,35 +72,15 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes |
55 | 72 | import org.thingsboard.server.common.transport.service.DefaultTransportService; |
56 | 73 | import org.thingsboard.server.common.transport.service.SessionMetaData; |
57 | 74 | import org.thingsboard.server.gen.transport.TransportProtos; |
58 | -import org.thingsboard.server.gen.transport.TransportProtos.ScriptProto; | |
59 | 75 | import org.thingsboard.server.transport.tcp.adaptors.TcpAuthEntry; |
60 | -import org.thingsboard.server.transport.tcp.adaptors.TcpDownEntry; | |
61 | 76 | import org.thingsboard.server.transport.tcp.adaptors.TcpTransportAdaptor; |
62 | 77 | import org.thingsboard.server.transport.tcp.adaptors.TcpUpEntry; |
63 | 78 | import org.thingsboard.server.transport.tcp.script.TkScriptFactory; |
79 | +import org.thingsboard.server.transport.tcp.script.TkScriptInvokeService; | |
64 | 80 | import org.thingsboard.server.transport.tcp.session.TcpDeviceSessionCtx; |
65 | 81 | import org.thingsboard.server.transport.tcp.session.TcpGatewaySessionHandler; |
66 | 82 | import org.thingsboard.server.transport.tcp.util.ByteBufUtils; |
67 | 83 | |
68 | -import java.io.IOException; | |
69 | -import java.net.InetSocketAddress; | |
70 | -import java.util.List; | |
71 | -import java.util.Map; | |
72 | -import java.util.Optional; | |
73 | -import java.util.UUID; | |
74 | -import java.util.concurrent.ConcurrentHashMap; | |
75 | -import java.util.concurrent.ConcurrentMap; | |
76 | -import java.util.concurrent.TimeUnit; | |
77 | -import java.util.concurrent.atomic.AtomicInteger; | |
78 | -import java.util.regex.Pattern; | |
79 | - | |
80 | -import static io.netty.handler.codec.mqtt.MqttConnectReturnCode.CONNECTION_ACCEPTED; | |
81 | -import static io.netty.handler.codec.mqtt.MqttMessageType.PUBACK; | |
82 | -import static io.netty.handler.codec.mqtt.MqttMessageType.SUBACK; | |
83 | -import static io.netty.handler.codec.mqtt.MqttQoS.AT_MOST_ONCE; | |
84 | -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_CLOSED; | |
85 | -import static org.thingsboard.server.common.transport.service.DefaultTransportService.SESSION_EVENT_MSG_OPEN; | |
86 | - | |
87 | 84 | /** |
88 | 85 | * @author Andrew Shvayka |
89 | 86 | */ |
... | ... | @@ -104,9 +101,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements |
104 | 101 | volatile InetSocketAddress address; |
105 | 102 | |
106 | 103 | volatile TcpGatewaySessionHandler gatewaySessionHandler; |
107 | - | |
108 | - | |
109 | - private final ConcurrentMap<UUID, String> authScripts; | |
110 | 104 | private final AtomicInteger authedCounter = new AtomicInteger(); |
111 | 105 | |
112 | 106 | |
... | ... | @@ -116,22 +110,6 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements |
116 | 110 | this.transportService = context.getTransportService(); |
117 | 111 | this.sslHandler = sslHandler; |
118 | 112 | this.deviceSessionCtx = new TcpDeviceSessionCtx(sessionId, context); |
119 | - this.authScripts = new ConcurrentHashMap<>(); | |
120 | - List<ScriptProto> authScripts = this.transportService.getScripts(ScriptProto.newBuilder().setFunctionType(TkScriptFunctionType.TRANSPORT_TCP_AUTH.ordinal()).build()); | |
121 | - authScripts.stream().forEach(i -> { | |
122 | - UUID scriptId = new UUID(i.getScriptIdMSB(), i.getScriptIdLSB()); | |
123 | - deviceSessionCtx.cacheScript(scriptId, TkScriptFunctionType.TRANSPORT_TCP_AUTH, i.getConvertJs(), new FutureCallback<UUID>() { | |
124 | - @Override | |
125 | - public void onSuccess(@Nullable UUID result) { | |
126 | - TcpTransportHandler.this.authScripts.put(scriptId, result.toString()); | |
127 | - } | |
128 | - | |
129 | - @Override | |
130 | - public void onFailure(Throwable t) { | |
131 | - } | |
132 | - }); | |
133 | - | |
134 | - }); | |
135 | 113 | } |
136 | 114 | |
137 | 115 | @Override |
... | ... | @@ -149,10 +127,14 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements |
149 | 127 | @Override |
150 | 128 | public void channelRead(ChannelHandlerContext ctx, Object msg) { |
151 | 129 | log.trace("【{}】 Processing msg: 【{}】", sessionId, msg); |
152 | - if (address == null) { | |
153 | - address = getAddress(ctx); | |
154 | - } | |
155 | 130 | try { |
131 | + if(!context.isReady()){ | |
132 | + ctx.close(); | |
133 | + return; | |
134 | + } | |
135 | + if (address == null) { | |
136 | + address = getAddress(ctx); | |
137 | + } | |
156 | 138 | if (msg instanceof ByteBuf) { |
157 | 139 | ByteBuf message = (ByteBuf) msg; |
158 | 140 | byte[] byteMsg = ByteBufUtils.buf2Bytes(message); |
... | ... | @@ -316,7 +298,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements |
316 | 298 | deviceSessionCtx.setProvisionOnly(true); |
317 | 299 | ctx.writeAndFlush(createTcpConnAckMsg(CONNECTION_ACCEPTED.name())); |
318 | 300 | } else { |
319 | - authScripts.forEach((id, idStr) -> { | |
301 | + TkScriptInvokeService.authScripts.forEach(id -> { | |
320 | 302 | ListenableFuture item = context.getJsEngine().invokeFunction(id, accessToken); |
321 | 303 | Futures.addCallback(item, new FutureCallback<String>() { |
322 | 304 | @Override |
... | ... | @@ -494,7 +476,7 @@ public class TcpTransportHandler extends ChannelInboundHandlerAdapter implements |
494 | 476 | |
495 | 477 | private void onValidateFailed(ChannelHandlerContext ctx, MqttConnectReturnCode msg) { |
496 | 478 | authedCounter.incrementAndGet(); |
497 | - if (authScripts.size() == authedCounter.incrementAndGet()) { | |
479 | + if (TkScriptInvokeService.authScripts.size() == authedCounter.incrementAndGet()) { | |
498 | 480 | ctx.writeAndFlush(createTcpConnAckMsg(msg.name())); |
499 | 481 | ctx.close(); |
500 | 482 | } | ... | ... |
... | ... | @@ -15,15 +15,29 @@ |
15 | 15 | */ |
16 | 16 | package org.thingsboard.server.transport.tcp.script; |
17 | 17 | |
18 | +import com.google.common.util.concurrent.FutureCallback; | |
18 | 19 | import com.google.common.util.concurrent.Futures; |
19 | 20 | import com.google.common.util.concurrent.ListenableFuture; |
21 | +import com.google.common.util.concurrent.MoreExecutors; | |
22 | +import lombok.Getter; | |
20 | 23 | import lombok.extern.slf4j.Slf4j; |
24 | +import org.jetbrains.annotations.NotNull; | |
25 | +import org.jetbrains.annotations.Nullable; | |
26 | +import org.springframework.boot.context.event.ApplicationReadyEvent; | |
27 | +import org.springframework.context.event.EventListener; | |
28 | +import org.springframework.core.annotation.Order; | |
21 | 29 | import org.thingsboard.common.util.ThingsBoardThreadFactory; |
30 | +import org.thingsboard.server.common.data.yunteng.dto.TkDeviceScriptDTO; | |
31 | +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; | |
22 | 32 | import org.thingsboard.server.common.data.yunteng.enums.TkScriptFunctionType; |
33 | +import org.thingsboard.server.common.transport.DeviceUpdatedEvent; | |
34 | +import org.thingsboard.server.common.transport.yunteng.ScriptUpdatedEvent; | |
35 | +import org.thingsboard.server.gen.transport.TransportProtos; | |
23 | 36 | |
24 | 37 | import java.util.Map; |
25 | 38 | import java.util.UUID; |
26 | 39 | import java.util.concurrent.ConcurrentHashMap; |
40 | +import java.util.concurrent.ConcurrentSkipListSet; | |
27 | 41 | import java.util.concurrent.Executors; |
28 | 42 | import java.util.concurrent.ScheduledExecutorService; |
29 | 43 | import java.util.concurrent.atomic.AtomicInteger; |
... | ... | @@ -57,11 +71,20 @@ public abstract class AbstractTkScriptInvokeService implements TkScriptInvokeSer |
57 | 71 | @Override |
58 | 72 | public ListenableFuture<UUID> eval(UUID id,TkScriptFunctionType scriptType, String scriptBody) { |
59 | 73 | UUID scriptId = id == null?UUID.randomUUID():id; |
60 | - String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); | |
61 | - String jsScript = generateJsScript(scriptType, functionName, scriptBody); | |
74 | + String functionName = getFunctionName(scriptId); | |
75 | + String jsScript = generateJsScript(scriptType, functionName, scriptBody); | |
62 | 76 | return doEval(scriptId, functionName, jsScript); |
63 | 77 | } |
64 | 78 | |
79 | + @NotNull | |
80 | + /** | |
81 | + * 构建脚本函数名称 | |
82 | + */ | |
83 | + private static String getFunctionName(UUID scriptId) { | |
84 | + String functionName = "invokeInternal_" + scriptId.toString().replace('-', '_'); | |
85 | + return functionName; | |
86 | + } | |
87 | + | |
65 | 88 | @Override |
66 | 89 | public ListenableFuture<Object> invokeFunction(UUID scriptId, Object... args) { |
67 | 90 | String functionName = scriptIdToNameMap.get(scriptId); |
... | ... | @@ -153,4 +176,46 @@ public abstract class AbstractTkScriptInvokeService implements TkScriptInvokeSer |
153 | 176 | return expirationTime; |
154 | 177 | } |
155 | 178 | } |
179 | + | |
180 | + @EventListener(ScriptUpdatedEvent.class) | |
181 | + public void onDeviceUpdatedOrCreated(ScriptUpdatedEvent scriptUpdatedEvent) throws Exception { | |
182 | + TkDeviceScriptDTO scriptDTO = scriptUpdatedEvent.getScript(); | |
183 | + UUID scriptId = UUID.fromString(scriptDTO.getId()); | |
184 | + cacheScript(scriptId,scriptDTO.getScriptType(),scriptDTO.getConvertJs(), scriptDTO.getStatus()); | |
185 | + } | |
186 | + | |
187 | + /** | |
188 | + * 编译并缓存脚本引擎 | |
189 | + * | |
190 | + * @param scriptId 脚本引擎ID | |
191 | + * @param scriptType 脚本类型 | |
192 | + * @param convertStr 脚本内容 | |
193 | + * @param stats 脚本状态 | |
194 | + */ | |
195 | + public void cacheScript(UUID scriptId, TkScriptFunctionType scriptType, String convertStr, Integer stats) { | |
196 | + if(stats == StatusEnum.DISABLE.getIndex()){ | |
197 | + authScripts.remove(scriptId); | |
198 | + try { | |
199 | + doRelease(scriptId,getFunctionName(scriptId)); | |
200 | + } catch (Exception e) { | |
201 | + log.error(String.format("脚本【%s】内存释放失败:【%s】", convertStr, e)); | |
202 | + } | |
203 | + } else if (stats == StatusEnum.ENABLE.getIndex()) { | |
204 | + ListenableFuture<UUID> result = eval(scriptId, scriptType, convertStr); | |
205 | + | |
206 | + Futures.addCallback(result, new FutureCallback() { | |
207 | + @Override | |
208 | + public void onSuccess(@Nullable Object result) { | |
209 | + if(TkScriptFunctionType.TRANSPORT_TCP_AUTH == scriptType){ | |
210 | + authScripts.add(scriptId); | |
211 | + } | |
212 | + } | |
213 | + | |
214 | + @Override | |
215 | + public void onFailure(Throwable t) { | |
216 | + log.error(String.format("脚本【%s】解析时出现异常:【%s】", convertStr, t)); | |
217 | + } | |
218 | + }, MoreExecutors.directExecutor()); | |
219 | + } | |
220 | + } | |
156 | 221 | } | ... | ... |
... | ... | @@ -16,9 +16,11 @@ |
16 | 16 | package org.thingsboard.server.transport.tcp.script; |
17 | 17 | |
18 | 18 | import com.google.common.util.concurrent.ListenableFuture; |
19 | +import lombok.Getter; | |
19 | 20 | import org.thingsboard.server.common.data.yunteng.enums.TkScriptFunctionType; |
20 | 21 | |
21 | 22 | import java.util.UUID; |
23 | +import java.util.concurrent.ConcurrentSkipListSet; | |
22 | 24 | |
23 | 25 | public interface TkScriptInvokeService { |
24 | 26 | |
... | ... | @@ -46,4 +48,16 @@ public interface TkScriptInvokeService { |
46 | 48 | */ |
47 | 49 | ListenableFuture<Void> release(UUID scriptId); |
48 | 50 | |
51 | + | |
52 | + /**TCP协议的鉴权脚本*/ | |
53 | + ConcurrentSkipListSet<UUID> authScripts = new ConcurrentSkipListSet<>(); | |
54 | + /** | |
55 | + * 编译并缓存脚本引擎 | |
56 | + * | |
57 | + * @param scriptId 脚本引擎ID | |
58 | + * @param scriptType 脚本类型 | |
59 | + * @param convertStr 脚本内容 | |
60 | + * @param stats 脚本状态 | |
61 | + */ | |
62 | + public void cacheScript(UUID scriptId, TkScriptFunctionType scriptType, String convertStr, Integer stats); | |
49 | 63 | } | ... | ... |
... | ... | @@ -87,36 +87,10 @@ public abstract class TcpDeviceWareSessionContext extends DeviceAwareSessionCont |
87 | 87 | this.authScriptId = UUID.fromString(tcpConfiguration.getAuthScriptId()); |
88 | 88 | } |
89 | 89 | this.telemetryScriptId = UUID.fromString(tcpConfiguration.getUpScriptId()); |
90 | - TransportProtos.ScriptProto upScript = context.getTransportService().getScripts(TransportProtos.ScriptProto.newBuilder().setScriptIdLSB(telemetryScriptId.getLeastSignificantBits()).setScriptIdMSB(telemetryScriptId.getMostSignificantBits()).build()).get(0); | |
91 | - cacheScript(telemetryScriptId, TkScriptFunctionType.TRANSPORT_TCP_UP, upScript.getConvertJs(), null); | |
92 | 90 | |
93 | 91 | } |
94 | 92 | |
95 | - /** | |
96 | - * 编译并缓存脚本引擎 | |
97 | - * | |
98 | - * @param scriptId 脚本引擎ID | |
99 | - * @param scriptType 脚本类型 | |
100 | - * @param convertStr 脚本内容 | |
101 | - * @param callback 脚本引擎执行成功的回调函数 | |
102 | - */ | |
103 | - public void cacheScript(UUID scriptId, TkScriptFunctionType scriptType, String convertStr, FutureCallback callback) { | |
104 | - ListenableFuture<UUID> result = context.getJsEngine().eval(scriptId, scriptType, convertStr); | |
105 | - if (callback == null) { | |
106 | - callback = new FutureCallback() { | |
107 | - @Override | |
108 | - public void onSuccess(@Nullable Object result) { | |
109 | - | |
110 | - } | |
111 | 93 | |
112 | - @Override | |
113 | - public void onFailure(Throwable t) { | |
114 | - log.error(String.format("脚本【%s】解析时出现异常:【%s】", convertStr, t)); | |
115 | - } | |
116 | - }; | |
117 | - } | |
118 | - Futures.addCallback(result, callback, MoreExecutors.directExecutor()); | |
119 | - } | |
120 | 94 | |
121 | 95 | /** |
122 | 96 | * 执行上行数据的解析脚本 | ... | ... |
... | ... | @@ -21,6 +21,7 @@ import com.google.common.util.concurrent.MoreExecutors; |
21 | 21 | import com.google.gson.Gson; |
22 | 22 | import com.google.gson.JsonObject; |
23 | 23 | import com.google.protobuf.ByteString; |
24 | +import lombok.Getter; | |
24 | 25 | import lombok.extern.slf4j.Slf4j; |
25 | 26 | import org.springframework.beans.factory.annotation.Value; |
26 | 27 | import org.springframework.context.ApplicationEventPublisher; |
... | ... | @@ -49,6 +50,7 @@ import org.thingsboard.server.common.data.id.TenantProfileId; |
49 | 50 | import org.thingsboard.server.common.data.rpc.RpcStatus; |
50 | 51 | import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException; |
51 | 52 | import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage; |
53 | +import org.thingsboard.server.common.data.yunteng.dto.TkDeviceScriptDTO; | |
52 | 54 | import org.thingsboard.server.common.msg.TbMsg; |
53 | 55 | import org.thingsboard.server.common.msg.TbMsgMetaData; |
54 | 56 | import org.thingsboard.server.common.msg.queue.ServiceQueue; |
... | ... | @@ -72,6 +74,7 @@ import org.thingsboard.server.common.transport.auth.ValidateDeviceCredentialsRes |
72 | 74 | import org.thingsboard.server.common.transport.limits.TransportRateLimitService; |
73 | 75 | import org.thingsboard.server.common.transport.util.DataDecodingEncodingService; |
74 | 76 | import org.thingsboard.server.common.transport.util.JsonUtils; |
77 | +import org.thingsboard.server.common.transport.yunteng.ScriptUpdatedEvent; | |
75 | 78 | import org.thingsboard.server.gen.transport.TransportProtos; |
76 | 79 | import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceRequestMsg; |
77 | 80 | import org.thingsboard.server.gen.transport.TransportProtos.ProvisionDeviceResponseMsg; |
... | ... | @@ -109,13 +112,7 @@ import java.util.Optional; |
109 | 112 | import java.util.Random; |
110 | 113 | import java.util.Set; |
111 | 114 | import java.util.UUID; |
112 | -import java.util.concurrent.ConcurrentHashMap; | |
113 | -import java.util.concurrent.ConcurrentMap; | |
114 | -import java.util.concurrent.ExecutionException; | |
115 | -import java.util.concurrent.ExecutorService; | |
116 | -import java.util.concurrent.Executors; | |
117 | -import java.util.concurrent.ScheduledFuture; | |
118 | -import java.util.concurrent.TimeUnit; | |
115 | +import java.util.concurrent.*; | |
119 | 116 | import java.util.concurrent.atomic.AtomicInteger; |
120 | 117 | import java.util.stream.Collectors; |
121 | 118 | |
... | ... | @@ -188,6 +185,9 @@ public class DefaultTransportService implements TransportService { |
188 | 185 | private final ConcurrentMap<UUID, SessionActivityData> sessionsActivity = new ConcurrentHashMap<>(); |
189 | 186 | private final Map<String, RpcRequestMetadata> toServerRpcPendingMap = new ConcurrentHashMap<>(); |
190 | 187 | |
188 | + @Getter | |
189 | + /**TCP协议的鉴权脚本*/ | |
190 | + private final ConcurrentSkipListSet<UUID> authScripts = new ConcurrentSkipListSet<>(); | |
191 | 191 | private volatile boolean stopped = false; |
192 | 192 | |
193 | 193 | public DefaultTransportService(TbServiceInfoProvider serviceInfoProvider, |
... | ... | @@ -939,6 +939,13 @@ public class DefaultTransportService implements TransportService { |
939 | 939 | onDeviceUpdate(device); |
940 | 940 | eventPublisher.publishEvent(new DeviceUpdatedEvent(device)); |
941 | 941 | }); |
942 | + | |
943 | + //Thingskit function | |
944 | + } else if (EntityType.JS_SCRIPT.equals(entityType)) { | |
945 | + Optional<TkDeviceScriptDTO> deviceOpt = dataDecodingEncodingService.decode(msg.getData().toByteArray()); | |
946 | + deviceOpt.ifPresent(device -> { | |
947 | + eventPublisher.publishEvent(new ScriptUpdatedEvent(device)); | |
948 | + }); | |
942 | 949 | } |
943 | 950 | } else if (toSessionMsg.hasEntityDeleteMsg()) { |
944 | 951 | TransportProtos.EntityDeleteMsg msg = toSessionMsg.getEntityDeleteMsg(); | ... | ... |
1 | +/** | |
2 | + * Copyright © 2016-2022 The Thingsboard Authors | |
3 | + * | |
4 | + * Licensed under the Apache License, Version 2.0 (the "License"); | |
5 | + * you may not use this file except in compliance with the License. | |
6 | + * You may obtain a copy of the License at | |
7 | + * | |
8 | + * http://www.apache.org/licenses/LICENSE-2.0 | |
9 | + * | |
10 | + * Unless required by applicable law or agreed to in writing, software | |
11 | + * distributed under the License is distributed on an "AS IS" BASIS, | |
12 | + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
13 | + * See the License for the specific language governing permissions and | |
14 | + * limitations under the License. | |
15 | + */ | |
16 | +package org.thingsboard.server.common.transport.yunteng; | |
17 | + | |
18 | +import lombok.Getter; | |
19 | +import org.thingsboard.server.common.data.Device; | |
20 | +import org.thingsboard.server.common.data.yunteng.dto.TkDeviceScriptDTO; | |
21 | + | |
22 | +@Getter | |
23 | +public class ScriptUpdatedEvent { | |
24 | + private final TkDeviceScriptDTO script; | |
25 | + | |
26 | + public ScriptUpdatedEvent(TkDeviceScriptDTO script) { | |
27 | + this.script = script; | |
28 | + } | |
29 | +} | ... | ... |
... | ... | @@ -16,6 +16,7 @@ import org.thingsboard.server.common.data.yunteng.constant.QueryConstant; |
16 | 16 | import org.thingsboard.server.common.data.yunteng.core.exception.TkDataValidationException; |
17 | 17 | import org.thingsboard.server.common.data.yunteng.core.message.ErrorMessage; |
18 | 18 | import org.thingsboard.server.common.data.yunteng.dto.*; |
19 | +import org.thingsboard.server.common.data.yunteng.enums.StatusEnum; | |
19 | 20 | import org.thingsboard.server.common.data.yunteng.enums.TkScriptFunctionType; |
20 | 21 | import org.thingsboard.server.common.data.yunteng.enums.TransportTypeEnum; |
21 | 22 | import org.thingsboard.server.common.data.yunteng.utils.ByteUtils; |
... | ... | @@ -101,17 +102,10 @@ public class TkDeviceScriptServiceImpl |
101 | 102 | } |
102 | 103 | |
103 | 104 | @Override |
104 | - public List<TkDeviceScriptDTO> getScriptes(TkScriptFunctionType type, UUID tenantId, UUID projectId, UUID scriptId){ | |
105 | + public List<TkDeviceScriptDTO> getScriptes(){ | |
105 | 106 | LambdaQueryWrapper<TkDeviceScriptEntity> queryWrapper = |
106 | 107 | new QueryWrapper<TkDeviceScriptEntity>().lambda(); |
107 | - if(scriptId == null){ | |
108 | - queryWrapper.eq(TkDeviceScriptEntity::getScriptType, type); | |
109 | - }else{ | |
110 | - queryWrapper.eq(TkDeviceScriptEntity::getId, scriptId.toString()); | |
111 | - } | |
112 | - if(tenantId != null){ | |
113 | - queryWrapper.eq(TkDeviceScriptEntity::getTenantId, tenantId.toString()); | |
114 | - } | |
108 | + queryWrapper.eq(TkDeviceScriptEntity::getStatus, StatusEnum.ENABLE.getIndex()); | |
115 | 109 | List<TkDeviceScriptEntity> result = baseMapper.selectList(queryWrapper); |
116 | 110 | return result.stream() |
117 | 111 | .map(item -> item.getDTO(TkDeviceScriptDTO.class)) |
... | ... | @@ -250,7 +244,7 @@ public class TkDeviceScriptServiceImpl |
250 | 244 | LambdaQueryWrapper<TkDeviceScriptEntity> queryWrapper = |
251 | 245 | new QueryWrapper<TkDeviceScriptEntity>() |
252 | 246 | .lambda() |
253 | - .eq(TkDeviceScriptEntity::getStatus,1) | |
247 | + .eq(TkDeviceScriptEntity::getStatus,StatusEnum.ENABLE.getIndex()) | |
254 | 248 | .eq(scriptType!=null,TkDeviceScriptEntity::getScriptType, scriptType) |
255 | 249 | .and(s->s.eq(TkDeviceScriptEntity::getTenantId, tenantId).or(r->r.eq(TkDeviceScriptEntity::getTenantId, EntityId.NULL_UUID.toString()))); |
256 | 250 | return baseMapper.selectList(queryWrapper).stream() | ... | ... |
... | ... | @@ -13,11 +13,9 @@ public interface TkDeviceScriptService extends BaseService<TkDeviceScriptEntity> |
13 | 13 | |
14 | 14 | /** |
15 | 15 | * 查找脚本解析的脚本内容 |
16 | - * @param tenantId | |
17 | - * @param scriptId | |
18 | 16 | * @return |
19 | 17 | */ |
20 | - List<TkDeviceScriptDTO> getScriptes(TkScriptFunctionType type, UUID tenantId, UUID projectId, UUID scriptId); | |
18 | + List<TkDeviceScriptDTO> getScriptes(); | |
21 | 19 | |
22 | 20 | TkDeviceScriptDTO insertOrUpdate(TkDeviceScriptDTO deviceDTO); |
23 | 21 | ... | ... |