Compare commits
1 Commits
Author | SHA1 | Date |
---|---|---|
|
0a40b18f5c |
|
@ -1,4 +1,4 @@
|
|||
FROM 119.45.158.12:5000/mini-java8:latest
|
||||
COPY target/hbt-template-1.0-SNAPSHOT.jar /app/app.jar
|
||||
COPY target/hbt-vehicle-management-biz-1.0-SNAPSHOT.jar /app/app.jar
|
||||
ENV NACOSIP=""
|
||||
ENTRYPOINT ["sh","-c","java -Dnacos.ip=$NACOSIP -jar /app/app.jar"]
|
|
@ -12,12 +12,22 @@
|
|||
<artifactId>hbt-vehicle-management-biz</artifactId>
|
||||
|
||||
<dependencies>
|
||||
<dependency>
|
||||
<groupId>io.netty</groupId>
|
||||
<artifactId>netty-all</artifactId>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>com.hbt.onreal</groupId>
|
||||
<artifactId>hbt-onreal-common-rabbit</artifactId>
|
||||
</dependency>
|
||||
|
||||
<!-- SpringCloud Ailibaba Nacos -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
|
||||
</dependency>
|
||||
<!-- SpringCloud Ailibaba Nacos Config -->
|
||||
<!-- SpringCloud Ailibaba Nacos NettyTest -->
|
||||
<dependency>
|
||||
<groupId>com.alibaba.cloud</groupId>
|
||||
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
package com.hbt.vehicle.management.netty.client;
|
||||
|
||||
import io.netty.bootstrap.Bootstrap;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioSocketChannel;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class NettyClient {
|
||||
|
||||
private final String host;
|
||||
|
||||
private final Integer port;
|
||||
|
||||
private ChannelFuture future;
|
||||
|
||||
private EventLoopGroup group;
|
||||
|
||||
public NettyClient(String host, Integer port) {
|
||||
this.host = host;
|
||||
this.port = port;
|
||||
}
|
||||
|
||||
public void run() throws Exception {
|
||||
|
||||
this.group = new NioEventLoopGroup();//I/O线程池
|
||||
|
||||
Bootstrap bs = new Bootstrap();//客户端辅助启动类
|
||||
bs.group(group)
|
||||
.channel(NioSocketChannel.class)//实例化一个Channel
|
||||
.option(ChannelOption.TCP_NODELAY, true)
|
||||
.handler(new NettyClientChannelInitializer());
|
||||
|
||||
//连接到远程节点;等待连接完成
|
||||
this.future = bs.connect(host, port).sync();
|
||||
//连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强
|
||||
|
||||
}
|
||||
|
||||
public void sendMessage(String message) {
|
||||
//发送消息到服务器端,编码格式是utf-8
|
||||
this.future.channel().writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
|
||||
}
|
||||
|
||||
public void closeClient() throws InterruptedException {
|
||||
//阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
|
||||
try {
|
||||
this.future.channel().closeFuture().sync();
|
||||
} catch (InterruptedException e) {
|
||||
log.error("close pipeline error!", e);
|
||||
throw new RuntimeException(e);
|
||||
} finally {
|
||||
this.group.shutdownGracefully().sync();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,14 @@
|
|||
package com.hbt.vehicle.management.netty.client;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel socketChannel) {
|
||||
ChannelPipeline pipeline = socketChannel.pipeline();
|
||||
pipeline.addLast(new NettyClientHandler());
|
||||
}
|
||||
}
|
|
@ -0,0 +1,16 @@
|
|||
package com.hbt.vehicle.management.netty.client;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.SimpleChannelInboundHandler;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@Slf4j
|
||||
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
|
||||
|
||||
@Override
|
||||
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
|
||||
log.info("client accept message:{}", byteBuf.toString(CharsetUtil.UTF_8));
|
||||
}
|
||||
}
|
|
@ -0,0 +1,94 @@
|
|||
package com.hbt.vehicle.management.netty.convert;
|
||||
|
||||
import com.hbt.common.core.utils.DateUtils;
|
||||
import com.hbt.vehicle.management.netty.dto.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.ArrayList;
|
||||
|
||||
public class NettyConvert {
|
||||
|
||||
public static LocationDto string2Position(String[] content) {
|
||||
int wifiNum = Integer.parseInt(content[11]);
|
||||
LocationDto position = LocationDto
|
||||
.builder()
|
||||
.maker(content[0])
|
||||
.number(content[1])
|
||||
.dataType(content[2])
|
||||
.time(content[3])
|
||||
.dataStatus(content[4])
|
||||
.latitude(new BigDecimal(content[5]))
|
||||
.lat(content[6])
|
||||
.longitude(new BigDecimal(content[7]))
|
||||
.lon(content[8])
|
||||
.speed(new BigDecimal(content[9]))
|
||||
.direction(new BigDecimal(content[10]))
|
||||
.wifiNum(wifiNum)
|
||||
.wifiList(new ArrayList<>())
|
||||
.mcc(content[12 + 2 * wifiNum])
|
||||
.mnc(content[13 + 2 * wifiNum])
|
||||
.ta(Integer.parseInt(content[14 + 2 * wifiNum]))
|
||||
.num(Integer.parseInt(content[15 + 2 * wifiNum]))
|
||||
.stationList(new ArrayList<>())
|
||||
.date(content[content.length - 3])
|
||||
.vehicleStatus(content[content.length - 2])
|
||||
.battery(content[content.length - 1])
|
||||
.build();
|
||||
for (int i = 0; i < wifiNum; i++) {
|
||||
int baseIndex = 11 + 2 * i;
|
||||
Wifi wifi = Wifi.builder().macAddr(content[baseIndex + 1]).rxLev(content[baseIndex + 2]).build();
|
||||
position.getWifiList().add(wifi);
|
||||
}
|
||||
for (int i = 0; i < position.getNum(); i++) {
|
||||
int baseIndex = 15 + wifiNum * 2 + 3 * i;
|
||||
Station station = Station.builder()
|
||||
.lac(content[baseIndex + 1])
|
||||
.cid(content[baseIndex + 2])
|
||||
.rxLev(content[baseIndex + 3])
|
||||
.build();
|
||||
position.getStationList().add(station);
|
||||
}
|
||||
return position;
|
||||
}
|
||||
|
||||
public static String serverAck2String(ServerAck positionAck) {
|
||||
return "*" + positionAck.getMaker() + "," +
|
||||
positionAck.getNumber() + "," +
|
||||
positionAck.getDataType() + "," +
|
||||
DateUtils.parseDateToStr("YYYY-MM-DD HH:MM:SS", positionAck.getDate()) + "," +
|
||||
(positionAck.getInfo() == null ? "" : positionAck.getInfo()) +
|
||||
"#";
|
||||
}
|
||||
|
||||
public static LoginDto string2LoginDto(String[] content) {
|
||||
return LoginDto.builder()
|
||||
.maker(content[0])
|
||||
.number(content[1])
|
||||
.dataType(content[2])
|
||||
.time(content[3])
|
||||
.dataStatus(content[4])
|
||||
.latitude(new BigDecimal(content[5]))
|
||||
.lat(content[6])
|
||||
.longitude(new BigDecimal(content[7]))
|
||||
.lon(content[8])
|
||||
.speed(new BigDecimal(content[9]))
|
||||
.direction(new BigDecimal(content[10]))
|
||||
.date(content[11])
|
||||
.vehicleStatus(content[12])
|
||||
.mcc(content[13])
|
||||
.mnc(content[14])
|
||||
.lac(content[15])
|
||||
.cid(content[16])
|
||||
.iccid(content[17])
|
||||
.build();
|
||||
}
|
||||
|
||||
public static HeartbeatDto convert2HeartbeatDto(String[] content) {
|
||||
return HeartbeatDto.builder()
|
||||
.maker(content[0])
|
||||
.number(content[1])
|
||||
.dataType(content[2])
|
||||
.time(content[3])
|
||||
.build();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,32 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class HeartbeatDto {
|
||||
|
||||
/**
|
||||
* 制造商
|
||||
*/
|
||||
private String maker;
|
||||
|
||||
/**
|
||||
* 车载机序列号10位数
|
||||
*/
|
||||
private String number;
|
||||
|
||||
/**
|
||||
* 数据类型
|
||||
*/
|
||||
private String dataType;
|
||||
|
||||
/**
|
||||
* 时间:时/分/秒 HHMMSS 格林威治时间
|
||||
*/
|
||||
private String time;
|
||||
|
||||
}
|
|
@ -0,0 +1,120 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
import java.util.List;
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class LocationDto {
|
||||
|
||||
/**
|
||||
* 制造商
|
||||
*/
|
||||
private String maker;
|
||||
|
||||
/**
|
||||
* 车载机序列号10位数
|
||||
*/
|
||||
private String number;
|
||||
|
||||
/**
|
||||
* 数据类型wifi数据
|
||||
*/
|
||||
private String dataType;
|
||||
|
||||
/**
|
||||
* 时间:时/分/秒 HHMMSS 格林威治时间
|
||||
*/
|
||||
private String time;
|
||||
|
||||
/**
|
||||
* 数据有效位(A/V)
|
||||
*/
|
||||
private String dataStatus;
|
||||
|
||||
/**
|
||||
* 纬度
|
||||
*/
|
||||
private BigDecimal latitude;
|
||||
|
||||
/**
|
||||
* 纬度标志(N:北纬,S:南纬)
|
||||
*/
|
||||
private String lat;
|
||||
|
||||
/**
|
||||
* 经度
|
||||
*/
|
||||
private BigDecimal longitude;
|
||||
|
||||
/**
|
||||
* 经度标志(E:东经,W:西经)
|
||||
*/
|
||||
private String lon;
|
||||
|
||||
/**
|
||||
* 速度
|
||||
*/
|
||||
private BigDecimal speed;
|
||||
|
||||
/**
|
||||
* 方位角,正北为0度,分辨率1度,顺时针方向
|
||||
*/
|
||||
private BigDecimal direction;
|
||||
|
||||
/**
|
||||
* WIFI个数(最多16个以逗号间隔)
|
||||
*/
|
||||
private Integer wifiNum;
|
||||
|
||||
/**
|
||||
* wifi信息
|
||||
*/
|
||||
private List<Wifi> wifiList;
|
||||
|
||||
/**
|
||||
* 移动国家码
|
||||
*/
|
||||
private String mcc;
|
||||
|
||||
/**
|
||||
* 移动网络码
|
||||
*/
|
||||
private String mnc;
|
||||
|
||||
/**
|
||||
* 时间提前量
|
||||
*/
|
||||
private Integer ta;
|
||||
|
||||
/**
|
||||
* 基站个数
|
||||
*/
|
||||
private Integer num;
|
||||
|
||||
/**
|
||||
* 基站信息
|
||||
*/
|
||||
private List<Station> stationList;
|
||||
|
||||
/**
|
||||
* 时间:日/月/年 DDMMYY
|
||||
*/
|
||||
private String date;
|
||||
|
||||
/**
|
||||
* 设备状态
|
||||
*/
|
||||
private String vehicleStatus;
|
||||
|
||||
/**
|
||||
* 电池电压
|
||||
*/
|
||||
private String battery;
|
||||
|
||||
}
|
|
@ -0,0 +1,104 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
import java.math.BigDecimal;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
@AllArgsConstructor
|
||||
@NoArgsConstructor
|
||||
public class LoginDto {
|
||||
|
||||
/**
|
||||
* 制造商
|
||||
*/
|
||||
private String maker;
|
||||
|
||||
/**
|
||||
* 车载机序列号10位数
|
||||
*/
|
||||
private String number;
|
||||
|
||||
/**
|
||||
* 数据类型
|
||||
*/
|
||||
private String dataType;
|
||||
|
||||
/**
|
||||
* 时间:时/分/秒 HHMMSS 格林威治时间
|
||||
*/
|
||||
private String time;
|
||||
|
||||
/**
|
||||
* 数据有效位(A/V)
|
||||
*/
|
||||
private String dataStatus;
|
||||
|
||||
/**
|
||||
* 纬度
|
||||
*/
|
||||
private BigDecimal latitude;
|
||||
|
||||
/**
|
||||
* 纬度标志(N:北纬,S:南纬)
|
||||
*/
|
||||
private String lat;
|
||||
|
||||
/**
|
||||
* 经度
|
||||
*/
|
||||
private BigDecimal longitude;
|
||||
|
||||
/**
|
||||
* 经度标志(E:东经,W:西经)
|
||||
*/
|
||||
private String lon;
|
||||
|
||||
/**
|
||||
* 速度
|
||||
*/
|
||||
private BigDecimal speed;
|
||||
|
||||
/**
|
||||
* 方位角,正北为0度,分辨率1度,顺时针方向
|
||||
*/
|
||||
private BigDecimal direction;
|
||||
|
||||
/**
|
||||
* 时间:日/月/年 DDMMYY
|
||||
*/
|
||||
private String date;
|
||||
|
||||
/**
|
||||
* 设备状态
|
||||
*/
|
||||
private String vehicleStatus;
|
||||
|
||||
/**
|
||||
* 移动国家码
|
||||
*/
|
||||
private String mcc;
|
||||
|
||||
/**
|
||||
* 移动网络码
|
||||
*/
|
||||
private String mnc;
|
||||
|
||||
/**
|
||||
* 基站位置区域码
|
||||
*/
|
||||
private String lac;
|
||||
|
||||
/**
|
||||
* 基站编号
|
||||
*/
|
||||
private String cid;
|
||||
|
||||
/**
|
||||
* SIM卡ICCID
|
||||
*/
|
||||
private String iccid;
|
||||
|
||||
}
|
|
@ -0,0 +1,39 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.Builder;
|
||||
import lombok.Getter;
|
||||
import lombok.Setter;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Getter
|
||||
@Setter
|
||||
@Builder
|
||||
public class ServerAck {
|
||||
|
||||
/**
|
||||
* 制造商
|
||||
*/
|
||||
private String maker;
|
||||
|
||||
/**
|
||||
* 编号
|
||||
*/
|
||||
private String number;
|
||||
|
||||
/**
|
||||
* 数据类型
|
||||
*/
|
||||
private String dataType;
|
||||
|
||||
/**
|
||||
* 日期
|
||||
*/
|
||||
private Date date;
|
||||
|
||||
/**
|
||||
* 应答主要信息
|
||||
*/
|
||||
private String info;
|
||||
|
||||
}
|
|
@ -0,0 +1,35 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Station {
|
||||
|
||||
/**
|
||||
* 基站位置区域码
|
||||
*/
|
||||
private String lac;
|
||||
|
||||
/**
|
||||
* 基站编号
|
||||
*/
|
||||
private String cid;
|
||||
|
||||
/**
|
||||
* 基站接收信号电平(信号强度)
|
||||
*/
|
||||
private String rxLev;
|
||||
//
|
||||
// @Override
|
||||
// public String toString() {
|
||||
// return "Station{" +
|
||||
// "lac='" + lac + '\'' +
|
||||
// ", cid='" + cid + '\'' +
|
||||
// ", rxLev='" + rxLev + '\'' +
|
||||
// '}';
|
||||
// }
|
||||
}
|
|
@ -0,0 +1,29 @@
|
|||
package com.hbt.vehicle.management.netty.dto;
|
||||
|
||||
import lombok.*;
|
||||
|
||||
@Builder
|
||||
@Getter
|
||||
@Setter
|
||||
@NoArgsConstructor
|
||||
@AllArgsConstructor
|
||||
public class Wifi {
|
||||
|
||||
/**
|
||||
* wifi信号mac地址
|
||||
*/
|
||||
private String macAddr;
|
||||
|
||||
/**
|
||||
* wifi信号强度
|
||||
*/
|
||||
private String rxLev;
|
||||
|
||||
// @Override
|
||||
// public String toString() {
|
||||
// return "Wifi{" +
|
||||
// "macAddr='" + macAddr + '\'' +
|
||||
// ", rxLev='" + rxLev + '\'' +
|
||||
// '}';
|
||||
// }
|
||||
}
|
|
@ -0,0 +1,52 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import com.hbt.vehicle.management.netty.convert.NettyConvert;
|
||||
import com.hbt.vehicle.management.netty.dto.HeartbeatDto;
|
||||
import com.hbt.vehicle.management.netty.dto.ServerAck;
|
||||
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Slf4j
|
||||
public class HeartbeatHandler extends LocateBaseHandler {
|
||||
|
||||
/**
|
||||
* 有客户端发消息会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
String[] content = (String[]) msg;
|
||||
|
||||
// 检查数据类型
|
||||
if (!"V7".equals(content[2])) {
|
||||
// 非该处理器处理类型,交给下一个处理器
|
||||
ctx.fireChannelRead(msg);
|
||||
return;
|
||||
}
|
||||
|
||||
// 数据转换
|
||||
HeartbeatDto heartbeatDto = NettyConvert.convert2HeartbeatDto(content);
|
||||
log.info("转换后的报文【{}】", heartbeatDto.toString());
|
||||
|
||||
putMessage2Queue(heartbeatDto);
|
||||
|
||||
// 构建应答消息
|
||||
ServerAck ack = ServerAck.builder()
|
||||
.maker(heartbeatDto.getMaker())
|
||||
.number(heartbeatDto.getNumber())
|
||||
.dataType("I7")
|
||||
.date(new Date())
|
||||
.build();
|
||||
// 响应客户端
|
||||
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMessage2Queue(Object message) {
|
||||
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.HEARTBEAT_EXCHANGE, RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC,
|
||||
message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,145 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import com.hbt.common.core.utils.SpringUtils;
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.channel.ChannelId;
|
||||
import io.netty.channel.ChannelInboundHandlerAdapter;
|
||||
import io.netty.handler.timeout.IdleState;
|
||||
import io.netty.handler.timeout.IdleStateEvent;
|
||||
import io.netty.util.CharsetUtil;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.core.RabbitTemplate;
|
||||
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
/**
|
||||
* 处理器链的抽象类
|
||||
*/
|
||||
@Slf4j
|
||||
public abstract class LocateBaseHandler extends ChannelInboundHandlerAdapter {
|
||||
|
||||
/**
|
||||
* 管理一个全局map,保存连接进服务端的通道数量
|
||||
*/
|
||||
static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
|
||||
|
||||
final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class);
|
||||
|
||||
/**
|
||||
* 有客户端连接服务器会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void channelActive(ChannelHandlerContext ctx) {
|
||||
|
||||
InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
|
||||
String clientIp = inetSocket.getAddress().getHostAddress();
|
||||
int clientPort = inetSocket.getPort();
|
||||
|
||||
//获取连接通道唯一标识
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
|
||||
//如果map中不包含此连接,就保存连接
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
|
||||
} else {
|
||||
//保存连接
|
||||
CHANNEL_MAP.put(channelId, ctx);
|
||||
|
||||
log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
|
||||
log.info("连接通道数量: " + CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 有客户端终止连接服务器会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void channelInactive(ChannelHandlerContext ctx) {
|
||||
|
||||
InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
|
||||
|
||||
String clientIp = inetSocket.getAddress().getHostAddress();
|
||||
|
||||
ChannelId channelId = ctx.channel().id();
|
||||
|
||||
//包含此客户端才去删除 // TODO 定时清理假死通道
|
||||
if (CHANNEL_MAP.containsKey(channelId)) {
|
||||
//删除连接
|
||||
CHANNEL_MAP.remove(channelId);
|
||||
|
||||
log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + inetSocket.getPort() + "]");
|
||||
log.info("连接通道数量: " + CHANNEL_MAP.size());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 服务端给客户端发送消息
|
||||
* @param msg 需要发送的消息内容
|
||||
* @param channelId 连接通道唯一id
|
||||
*/
|
||||
public void channelWrite(ChannelId channelId, String msg) {
|
||||
|
||||
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
|
||||
|
||||
if (ctx == null) {
|
||||
log.info("通道【" + channelId + "】不存在");
|
||||
return;
|
||||
}
|
||||
|
||||
if (msg == null || msg.equals("")) {
|
||||
log.info("服务端响应空的消息");
|
||||
return;
|
||||
}
|
||||
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
|
||||
// TODO 是否需要在应答后关闭链接
|
||||
// ctx.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
|
||||
|
||||
String socketString = ctx.channel().remoteAddress().toString();
|
||||
|
||||
if (evt instanceof IdleStateEvent) {
|
||||
IdleStateEvent event = (IdleStateEvent) evt;
|
||||
if (event.state() == IdleState.READER_IDLE) {
|
||||
log.info("Client: " + socketString + " READER_IDLE 读超时");
|
||||
ctx.disconnect();
|
||||
} else if (event.state() == IdleState.WRITER_IDLE) {
|
||||
log.info("Client: " + socketString + " WRITER_IDLE 写超时");
|
||||
ctx.disconnect();
|
||||
} else if (event.state() == IdleState.ALL_IDLE) {
|
||||
log.info("Client: " + socketString + " ALL_IDLE 总超时");
|
||||
ctx.disconnect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 发生异常会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
|
||||
ctx.close();
|
||||
log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
|
||||
cause.printStackTrace();
|
||||
}
|
||||
|
||||
public String[] readMessage(Object msg) {
|
||||
ByteBuf byteBuf = (ByteBuf)msg;
|
||||
byte[] bytes = new byte[byteBuf.readableBytes()];
|
||||
byteBuf.readBytes(bytes);
|
||||
String message = new String(bytes);
|
||||
log.info("客户端报文【{}】", message);
|
||||
return message.split(",");
|
||||
}
|
||||
|
||||
public abstract void putMessage2Queue(Object message);
|
||||
}
|
|
@ -0,0 +1,55 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import com.hbt.vehicle.management.netty.convert.NettyConvert;
|
||||
import com.hbt.vehicle.management.netty.dto.LocationDto;
|
||||
import com.hbt.vehicle.management.netty.dto.ServerAck;
|
||||
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Slf4j
|
||||
public class LocateHandler extends LocateBaseHandler {
|
||||
|
||||
/**
|
||||
* 有客户端发消息会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
String[] content = (String[]) msg;
|
||||
|
||||
// 检查数据类型
|
||||
if (!"V5".equals(content[2])) {
|
||||
throw new RuntimeException("无法识别的报文!");
|
||||
}
|
||||
|
||||
// 数据转换
|
||||
LocationDto locationDto = NettyConvert.string2Position(content);
|
||||
log.info("转换后的报文【{}】", locationDto.toString());
|
||||
|
||||
putMessage2Queue(locationDto);
|
||||
|
||||
// 构建应答消息
|
||||
ServerAck ack = ServerAck.builder()
|
||||
.maker(locationDto.getMaker())
|
||||
.number(locationDto.getNumber())
|
||||
.dataType("J5")
|
||||
.date(new Date())
|
||||
.build();
|
||||
|
||||
// TODO 等待业务处理结果
|
||||
ack.setInfo("1:100");
|
||||
|
||||
// 响应客户端
|
||||
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMessage2Queue(Object message) {
|
||||
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOCATION_EXCHANGE, RabbitTopicConfig.VEHICLE_LOCATION_TOPIC,
|
||||
message);
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import com.hbt.vehicle.management.netty.convert.NettyConvert;
|
||||
import com.hbt.vehicle.management.netty.dto.LoginDto;
|
||||
import com.hbt.vehicle.management.netty.dto.ServerAck;
|
||||
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
import java.util.Date;
|
||||
|
||||
@Slf4j
|
||||
public class LoginHandler extends LocateBaseHandler {
|
||||
|
||||
/**
|
||||
* 有客户端发消息会触发此函数
|
||||
* @param ctx ctx
|
||||
*/
|
||||
@Override
|
||||
public void channelRead(ChannelHandlerContext ctx, Object msg) {
|
||||
String[] content = readMessage(msg);
|
||||
log.info("通道【{}】,客户端报文【{}】", ctx.channel().id(), content);
|
||||
|
||||
// 检查数据类型
|
||||
if (!"V6".equals(content[2])) {
|
||||
// 非该处理器处理类型,交给下一个处理器
|
||||
ctx.fireChannelRead(content);
|
||||
return;
|
||||
}
|
||||
|
||||
// 数据转换
|
||||
LoginDto loginDto = NettyConvert.string2LoginDto(content);
|
||||
log.info("转换后的报文【{}】", loginDto.toString());
|
||||
|
||||
putMessage2Queue(loginDto);
|
||||
|
||||
// 构建应答消息
|
||||
ServerAck ack = ServerAck.builder()
|
||||
.maker(loginDto.getMaker())
|
||||
.number(loginDto.getNumber())
|
||||
.dataType("J6")
|
||||
.date(new Date())
|
||||
.build();
|
||||
// 响应客户端
|
||||
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
|
||||
}
|
||||
|
||||
@Override
|
||||
public void putMessage2Queue(Object message) {
|
||||
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOGIN_EXCHANGE, RabbitTopicConfig.VEHICLE_LOGIN_TOPIC,
|
||||
message);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,45 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import io.netty.bootstrap.ServerBootstrap;
|
||||
import io.netty.channel.ChannelFuture;
|
||||
import io.netty.channel.ChannelOption;
|
||||
import io.netty.channel.EventLoopGroup;
|
||||
import io.netty.channel.nio.NioEventLoopGroup;
|
||||
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class NettyServer{
|
||||
|
||||
@Value("${netty-port}")
|
||||
private Integer port;
|
||||
|
||||
public void start() {
|
||||
//配置服务端的NIO线程组
|
||||
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
|
||||
EventLoopGroup workerGroup = new NioEventLoopGroup();
|
||||
|
||||
try {
|
||||
ServerBootstrap bootstrap = new ServerBootstrap()
|
||||
.group(bossGroup, workerGroup) // 绑定线程池
|
||||
.channel(NioServerSocketChannel.class)
|
||||
// .localAddress(address)
|
||||
.childHandler(new NettyServerChannelInitializer())
|
||||
.option(ChannelOption.SO_BACKLOG, 128) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
|
||||
.childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
|
||||
|
||||
// 绑定端口,开始接收进来的连接
|
||||
ChannelFuture future = bootstrap.bind(port).sync();
|
||||
log.info("netty服务器开始监听端口:" + port);
|
||||
//关闭channel和块,直到它被关闭
|
||||
future.channel().closeFuture().sync();
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
bossGroup.shutdownGracefully();
|
||||
workerGroup.shutdownGracefully();
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,20 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import io.netty.channel.ChannelInitializer;
|
||||
import io.netty.channel.ChannelPipeline;
|
||||
import io.netty.channel.socket.SocketChannel;
|
||||
|
||||
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
|
||||
|
||||
@Override
|
||||
protected void initChannel(SocketChannel channel) {
|
||||
//获取管道
|
||||
ChannelPipeline pipeline = channel.pipeline();
|
||||
pipeline.addLast(new UnpackingDecoder()); // 解码
|
||||
|
||||
pipeline.addLast(new LoginHandler());
|
||||
pipeline.addLast(new HeartbeatHandler());
|
||||
pipeline.addLast(new LocateHandler());
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,53 @@
|
|||
package com.hbt.vehicle.management.netty.server;
|
||||
|
||||
import io.netty.buffer.ByteBuf;
|
||||
import io.netty.buffer.Unpooled;
|
||||
import io.netty.channel.ChannelHandlerContext;
|
||||
import io.netty.handler.codec.ByteToMessageDecoder;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class UnpackingDecoder extends ByteToMessageDecoder {
|
||||
|
||||
private static final ByteBuf beginDelimiter = Unpooled.wrappedBuffer("*".getBytes());
|
||||
private static final ByteBuf endDelimiter = Unpooled.wrappedBuffer("#".getBytes());
|
||||
|
||||
@Override
|
||||
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
|
||||
int beginDelimiterLength = beginDelimiter.readableBytes(); // 开始标识字节长度
|
||||
int endDelimiterLength = endDelimiter.readableBytes(); // 结束标识字节长度
|
||||
|
||||
int beginIdx = indexOf(byteBuf, beginDelimiter); // 开始标识下标
|
||||
int endIdx = indexOf(byteBuf, endDelimiter); // 结束标识下标
|
||||
|
||||
// 如果缓存区找不到开始标识,跳过该段字节
|
||||
if (beginIdx == -1) {
|
||||
byteBuf.readerIndex(byteBuf.writerIndex());
|
||||
return;
|
||||
}
|
||||
|
||||
// 如果缓冲区找不到结束下标,或者下标小于开始标识,则从开始标识开始读取
|
||||
if (endIdx == -1 || endIdx < beginIdx) {
|
||||
byteBuf.readerIndex(beginIdx);
|
||||
return;
|
||||
}
|
||||
|
||||
ByteBuf frame = byteBuf.retainedSlice(beginIdx + beginDelimiterLength, endIdx - beginIdx - beginDelimiterLength); // 将标识之间的数据提取出来,给后面的解码器使用
|
||||
list.add(frame);
|
||||
byteBuf.readerIndex(endIdx + endDelimiterLength); // 更新读取下标
|
||||
}
|
||||
|
||||
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
|
||||
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
|
||||
int haystackIndex = i;
|
||||
int needleIndex;
|
||||
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
|
||||
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) break;
|
||||
haystackIndex++;
|
||||
if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) return -1;
|
||||
}
|
||||
if (needleIndex == needle.capacity()) return i - haystack.readerIndex();
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,96 @@
|
|||
package com.hbt.vehicle.management.rabbit.config;
|
||||
|
||||
import org.springframework.amqp.core.Binding;
|
||||
import org.springframework.amqp.core.BindingBuilder;
|
||||
import org.springframework.amqp.core.Queue;
|
||||
import org.springframework.amqp.core.TopicExchange;
|
||||
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
|
||||
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
|
||||
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
|
||||
import org.springframework.context.annotation.Bean;
|
||||
import org.springframework.context.annotation.Configuration;
|
||||
|
||||
/**
|
||||
* MQ 队列配置类
|
||||
*
|
||||
* @author yx
|
||||
* @date 2023/11/21 17:54
|
||||
*/
|
||||
@Configuration
|
||||
public class RabbitTopicConfig {
|
||||
|
||||
// 交换机
|
||||
public static final String LOGIN_EXCHANGE = "vehicleLoginTopicExchange";
|
||||
|
||||
public static final String HEARTBEAT_EXCHANGE = "vehicleHeartbeatTopicExchange";
|
||||
|
||||
public static final String LOCATION_EXCHANGE = "vehicleLocationTopicExchange";
|
||||
|
||||
// 队列
|
||||
public static final String SERVICE_ID = "hbt.vehicle";
|
||||
|
||||
//并发数量
|
||||
public static final int DEFAULT_CONCURRENT = 10;
|
||||
|
||||
// 定位信息topic
|
||||
public static final String VEHICLE_LOCATION_TOPIC = "vehicle.location";
|
||||
|
||||
public static final String VEHICLE_LOGIN_TOPIC = "vehicle.login";
|
||||
|
||||
public static final String VEHICLE_HEARTBEAT_TOPIC = "vehicle.heartbeat";
|
||||
|
||||
@Bean
|
||||
public TopicExchange loginExchange() {
|
||||
return new TopicExchange(LOGIN_EXCHANGE);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicExchange heartbeatExchange() {
|
||||
return new TopicExchange(HEARTBEAT_EXCHANGE);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public TopicExchange locationExchange() {
|
||||
return new TopicExchange(LOCATION_EXCHANGE);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue loginMessage() {
|
||||
return new Queue(SERVICE_ID + "." + VEHICLE_LOGIN_TOPIC);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue heartbeatMessage() {
|
||||
return new Queue(SERVICE_ID + "." + VEHICLE_HEARTBEAT_TOPIC);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Queue locationMessage() {
|
||||
return new Queue(SERVICE_ID + "." + VEHICLE_LOCATION_TOPIC);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindingExchangeLoginMessage(Queue loginMessage, TopicExchange loginExchange) {
|
||||
return BindingBuilder.bind(loginMessage).to(loginExchange).with(RabbitTopicConfig.VEHICLE_LOGIN_TOPIC);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindingExchangeHeartbeatMessage(Queue heartbeatMessage, TopicExchange heartbeatExchange) {
|
||||
return BindingBuilder.bind(heartbeatMessage).to(heartbeatExchange).with(RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC);
|
||||
}
|
||||
|
||||
@Bean
|
||||
public Binding bindingExchangeLocationMessage(Queue locationMessage, TopicExchange locationExchange) {
|
||||
return BindingBuilder.bind(locationMessage).to(locationExchange).with(RabbitTopicConfig.VEHICLE_LOCATION_TOPIC);
|
||||
}
|
||||
|
||||
@Bean("customContainerFactory")
|
||||
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
|
||||
ConnectionFactory connectionFactory) {
|
||||
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
|
||||
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
|
||||
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
|
||||
configurer.configure(factory, connectionFactory);
|
||||
return factory;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,46 @@
|
|||
package com.hbt.vehicle.management.rabbit.consumer;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.hbt.vehicle.management.netty.dto.HeartbeatDto;
|
||||
import com.hbt.vehicle.management.netty.dto.LocationDto;
|
||||
import com.hbt.vehicle.management.netty.dto.LoginDto;
|
||||
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.springframework.amqp.rabbit.annotation.RabbitListener;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
@Slf4j
|
||||
@Component
|
||||
public class LocationConsumer {
|
||||
|
||||
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOGIN_TOPIC,
|
||||
containerFactory = "customContainerFactory")
|
||||
public void locationMessage(LoginDto message) {
|
||||
try {
|
||||
log.info("收到登录MQ数据:{}", JSON.toJSONString(message));
|
||||
} catch (Exception e) {
|
||||
log.error("登录数据处理失败:{}", JSON.toJSONString(message), e);
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC,
|
||||
containerFactory = "customContainerFactory")
|
||||
public void heartbeatMessage(HeartbeatDto message) {
|
||||
try {
|
||||
log.info("收到心跳MQ数据:{}", JSON.toJSONString(message));
|
||||
} catch (Exception e) {
|
||||
log.error("心跳数据处理失败:{}", JSON.toJSONString(message), e);
|
||||
}
|
||||
}
|
||||
|
||||
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOCATION_TOPIC,
|
||||
containerFactory = "customContainerFactory")
|
||||
public void positionMessage(LocationDto message) {
|
||||
try {
|
||||
log.info("收到车辆定位MQ数据:{}", JSON.toJSONString(message));
|
||||
} catch (Exception e) {
|
||||
log.error("车辆定位数据处理失败:{}", JSON.toJSONString(message), e);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -0,0 +1,22 @@
|
|||
package com.hbt.vehicle.management.rabbit.task;
|
||||
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
@Component
|
||||
public class LocationThreadPool {
|
||||
|
||||
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
|
||||
10,
|
||||
10,
|
||||
10,
|
||||
TimeUnit.HOURS,
|
||||
new LinkedBlockingQueue<>(),
|
||||
new ThreadPoolExecutor.CallerRunsPolicy());
|
||||
|
||||
public ThreadPoolExecutor getExecutor = this.executor;
|
||||
|
||||
}
|
|
@ -3,10 +3,14 @@ package com.hbt.vehicle.management;
|
|||
import com.hbt.common.security.annotation.EnableCustomConfig;
|
||||
import com.hbt.common.security.annotation.EnableRyFeignClients;
|
||||
import com.hbt.common.swagger.annotation.EnableCustomSwagger2;
|
||||
import com.hbt.vehicle.management.netty.server.NettyServer;
|
||||
import org.springframework.boot.CommandLineRunner;
|
||||
import org.springframework.boot.SpringApplication;
|
||||
import org.springframework.boot.web.servlet.ServletComponentScan;
|
||||
import org.springframework.cloud.client.SpringCloudApplication;
|
||||
|
||||
import javax.annotation.Resource;
|
||||
|
||||
/**
|
||||
* 入口类, 扫描并注入其他配置类和服务
|
||||
*/
|
||||
|
@ -15,11 +19,19 @@ import org.springframework.cloud.client.SpringCloudApplication;
|
|||
@EnableCustomSwagger2
|
||||
@EnableRyFeignClients
|
||||
@ServletComponentScan(basePackages = "com.hbt.vehicle.management")
|
||||
public class vehicleManagementApplication {
|
||||
public class vehicleManagementApplication implements CommandLineRunner{
|
||||
|
||||
@Resource
|
||||
private NettyServer nettyServer;
|
||||
|
||||
public static void main(String[] args)
|
||||
{
|
||||
SpringApplication.run(vehicleManagementApplication.class, args);
|
||||
System.out.println("(♥◠‿◠)ノ゙ 车辆管理系统启动成功 ლ(´ڡ`ლ)゙ ");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run(String... args) {
|
||||
nettyServer.start();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -11,4 +11,4 @@ spring:
|
|||
active: dev
|
||||
|
||||
nacos:
|
||||
ip: 119.45.158.12
|
||||
ip: 192.168.1.211
|
||||
|
|
|
@ -0,0 +1,19 @@
|
|||
import com.hbt.vehicle.management.netty.client.NettyClient;
|
||||
|
||||
import java.util.Scanner;
|
||||
|
||||
public class NettyTest {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
NettyClient nettyClient = new NettyClient("127.0.0.1", 8886);
|
||||
nettyClient.run();
|
||||
Scanner scanner = new Scanner(System.in);
|
||||
while (scanner.hasNext()) {
|
||||
String s = scanner.next();
|
||||
if (s.equals("exit")) {
|
||||
break;
|
||||
}
|
||||
nettyClient.sendMessage(s);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue