采坑websocket總結

一笑杯莫停 2022-07-24 00:24:33 阅读数:80

websocket

需求:

  1. 要Nginx代理(不向外暴露端口號)
  2. 創建namespace監聽(事件名稱:connectTask,namespace名稱:/remote)
  3. 可隨時終止服務邏輯,但不關閉客戶端(异步執行服務邏輯)
  4. 同一瀏覽器不同頁面打開建立新的客戶端(session要不同)

socketio采坑

版本

 <!--netty socketio-->
<dependency>
<groupId>com.corundumstudio.socketio</groupId>
<artifactId>netty-socketio</artifactId>
<version>1.7.18</version>
</dependency>

 properties

netty.io.host=0.0.0.0
netty.io.port=9092
# 設置最大每幀處理數據的長度,防止他人利用大數據來攻擊服務器
socketio.maxFramePayloadLength=1048576
# 設置http交互最大內容長度
socketio.maxHttpContentLength=1048576
# socket連接數大小(如只監聽一個端口boss線程組為1即可)
socketio.bossCount=1
socketio.workCount=100
socketio.allowCustomRequests=true
# 協議昇級超時時間(毫秒),默認10秒。HTTP握手昇級為ws協議超時時間
socketio.upgradeTimeout=10000
# Ping消息超時時間(毫秒),默認60秒,這個時間間隔內沒有接收到心跳消息就會發送超時事件
socketio.pingTimeout=60000
# Ping消息間隔(毫秒),默認10秒。客戶端向服務器發送一條心跳消息間隔
socketio.pingInterval=10000

配置類

同一瀏覽器不同頁面打開建立新的客戶端,所以每次請求需要創建一個隨機的session id。

這裏踩了個坑,之前的老版本比如1.7.14,要生成隨機的session id就必須自己重寫相關邏輯

1.LocalAuthorizeHandler extends AuthorizeHandler------->
然後重寫方法:
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
}
然後實現隨機UUID的生成
2.class LocalSocketIoChannelInitializer extends SocketIOChannelInitializer------->
重寫裏面需要設置AuthorizeHandler的方法,替換為LocalAuthorizeHandler
3.在config類中將LocalSocketIoChannelInitializer set到SocketIOServer 裏面,像這樣
SocketIOServer server = new SocketIOServer(config);
server.setPipelineFactory(new SksSocketIoChannelInitializer());

 簡直low上天了,最後用新版本發現只需要在config中setRandomSession(true)就搞定了

config.setRandomSession(true);

 config類

@org.springframework.context.annotation.Configuration
public class SocketIoConfig {
private final Logger LOGGER = LoggerFactory.getLogger(SocketIoConfig.class);
@Value("${netty.io.host}")
private String host;
@Value("${netty.io.port}")
private Integer port;
@Value("${socketio.bossCount}")
private int bossCount;
@Value("${socketio.workCount}")
private int workCount;
@Value("${socketio.upgradeTimeout}")
private int upgradeTimeout;
@Value("${socketio.pingTimeout}")
private int pingTimeout;
@Value("${socketio.pingInterval}")
private int pingInterval;
@Bean
public SocketIOServer socketIOServer() {
SocketConfig socketConfig = new SocketConfig();
socketConfig.setTcpNoDelay(true);
Configuration config = new Configuration();
config.setRandomSession(true);
config.setAllowCustomRequests(true);
config.setSocketConfig(socketConfig);
config.setPort(port);
config.setHostname(host);
config.setBossThreads(bossCount);
config.setWorkerThreads(workCount);
config.setPingTimeout(pingTimeout);
config.setPingInterval(pingInterval);
config.setUpgradeTimeout(upgradeTimeout);
config.setMaxHttpContentLength(maxHttpContentLength);
//如果要用到Nginx代理,需要添加一個路徑/localPath區分不同的websocket,否則默認路徑是/socket.io很難搞
config.setContext("/localPath" + config.getContext());
config.setMaxFramePayloadLength(maxFramePayloadLength);
//該處可以用來進行身份驗證
config.setAuthorizationListener(new AuthorizationListener() {
@Override
public boolean isAuthorized(HandshakeData data) {
........
});
return new SocketIOServer(config);
}
}

 WebSocketHandler 類

添加namespace,這兒沒有添加room。namespace就相當於給連接分類,方便細化管理和廣播。這裏不深入說namespace和room

@Component
public class WebSocketHandler {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final SocketIOServer socketIOServer;
private final ConnectService connectService;
public final static String DEV_CONNECT = "connectTask";
/**
* 關聯client和webSocket session
*/
public static final Map<UUID, SocketIOClient> CLIENT_MAP = new ConcurrentHashMap<>();
public WebSocketHandler(SocketIOServer socketIOServer,ConnectService connectService) {
this.socketIOServer = socketIOServer;
this.connectService= connectService;
}
@PostConstruct
public void autoStart() {
this.start();
}
@PreDestroy
private void onDestroy() {
if (socketIOServer != null) {
socketIOServer.stop();
}
}
public void start() {
socketIOServer.addConnectListener(client -> {
if (CLIENT_MAP.containsKey(client.getSessionId())) {
client.disconnect();
} else {
CLIENT_MAP.put(client.getSessionId(), client);
}
});
socketIOServer.addDisconnectListener(client -> {
remoteWebHandler.onDisconnect(client);
remoteWebHandler.cleanContext(client);
log.info("clean client:{}", client.getSessionId());
client.disconnect();
});
socketIOServer.start();
log.info("start finish");
addOnDataNamespace();
}
private void addOnDataNamespace() {
final SocketIONamespace namespace = socketIOServer.addNamespace("/remote");
namespace.addEventListener(WebSocketHandler.DEV_CONNECT, String.class, new DataListener<String>() {
/**
* Invokes when data object received from client
*
* @param client - receiver
* @param data - received object
* @param ackSender - ack request
* @throws Exception
*/
@Override
public void onData(SocketIOClient client, String data, AckRequest ackSender) {
try {
connectService.onData(client, data);
} catch (Exception e) {
log.error("onData error", e);
}
}
});
}
}

service類大致寫了些邏輯

connect類消息

  1. 使用線程池來實現异步執行相關邏輯
  2. 緩存Future等信息作為上下文
  3. 執行邏輯

disconnect類消息

  1. 1.設置上下文的中斷標志isStop為true()
  2. 2.future.cancel(true)取消任務
  • 參數true:如果任務已經被執行,則會嘗試中斷處理。(中斷處理會改變中斷標志比特,任務應該判斷isInterrupted()或者在任務過程中使用sleep,這樣任務才可以被中斷)
  • 參數false:如果任務已經被執行,則會等待任務執行完畢。如果是個無線循環任務,將會無法停止。

3.發送消息給客戶端,如果想關閉連接可以client.disconnect();還要清除上下文緩存

@Component
public class DeviceRemoteWebHandler {
private final Logger log = LoggerFactory.getLogger(this.getClass());
private final ThreadPoolTaskExecutor threadPool;
private Map<UUID, RemoteWebContext> remoteWebContexts = new ConcurrentHashMap<>();
public DeviceRemoteWebHandler(@Qualifier(BeanConsts.USER_DATA_EXECUTOR_SERVICE) ThreadPoolTaskExecutor threadPool) {
this.threadPool = threadPool;
}
public void onData(final SocketIOClient client, final String data) {
getContext(client);
ClientMessage message = JsonUtils.deserializeJson(JsonUtils.getObjectMapper(), data, ClientMessage.class);
if (message == null) {
log.info("client message is empty");
return;
}
//remote的connect信息
if (connect message) {
try {
connect(client, message.getData());
} catch (InterruptedException i) {
return;
} catch (Exception e) {
log.error("connect event failed", e);
sendMsgToClient(client, getFailResult(DeviceConnectResultType.SERVER_FAIL, client));
return;
}
}
//remote的disconnect信息
if (disconnect message) {
onDisconnect(client);
}
}
private RemoteWebContext getContext(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
return remoteWebContexts.get(client.getSessionId());
}
RemoteWebContext remoteWebContext = new RemoteWebContext(client.getSessionId());
remoteWebContexts.put(client.getSessionId(), remoteWebContext);
return remoteWebContext;
}
public void cleanContext(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
log.info("remove remoteWebContext by sessionId:{}", client.getSessionId());
remoteWebContexts.remove(client.getSessionId());
}
}
public void onDisconnect(final SocketIOClient client) {
if (remoteWebContexts.containsKey(client.getSessionId())) {
RemoteWebContext context = getContext(client);
context.setIsStop();
Future future = remoteWebContexts.get(client.getSessionId()).getFuture();
log.info("Try to Interrupt if running");
if (future == null) {
//判空處理
return;
}
try {
future.cancel(true);
} catch (Exception e) {
log.error("DisConnect Failed.", e);
return;
}
sendMsgToClient(client);
}
}
/**
* 連接建立後處理
*/
public void connect(final SocketIOClient client, final DeviceConnectModel connectData) throws InterruptedException {
log.info("接收到參數:{}", client.getHandshakeData().getUrlParams());
Future<?> deviceConnectFuture = threadPool.submit(() -> {
@Todo 需要异步執行的邏輯
});
//對Future進行緩存
remoteWebContexts.put(client.getSessionId(), new RemoteWebContext(deviceConnectFuture));
}
/**
* send msg
*
* @param client socket.io client
*/
private void sendMsgToClient(SocketIOClient client, DeviceConnectResult connectResult) {
MessageBase<DeviceConnectResult> result = new MessageBase(configInstance.getDeviceConnectType(), connectResult);
log.info("starting send connect message,sid={}\nvalue {}", client.getSessionId(), result);
try {
client.sendEvent(WebSocketHandler.DEV_CONNECT, result);
} catch (Exception e) {
log.error("send message {} to client failed", result, e);
throw new RuntimeException(e.getMessage(), e);
}
}
}

版权声明:本文为[一笑杯莫停]所创,转载请带上原文链接,感谢。 https://gsmany.com/2022/205/202207240024094053.html