From 0a40b18f5c2418526d85dcbef1f642ccea5d427b Mon Sep 17 00:00:00 2001 From: yinxing Date: Wed, 22 Nov 2023 17:59:22 +0800 Subject: [PATCH] =?UTF-8?q?feat:=201=E3=80=81=E6=95=B0=E6=8D=AE=E8=AF=B7?= =?UTF-8?q?=E6=B1=82=E5=92=8C=E5=A4=84=E7=90=86?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hbt-vehicle-management-biz/Dockerfile | 2 +- hbt-vehicle-management-biz/pom.xml | 12 +- .../management/netty/client/NettyClient.java | 61 ++++++++ .../client/NettyClientChannelInitializer.java | 14 ++ .../netty/client/NettyClientHandler.java | 16 ++ .../netty/convert/NettyConvert.java | 94 ++++++++++++ .../management/netty/dto/HeartbeatDto.java | 32 ++++ .../management/netty/dto/LocationDto.java | 120 +++++++++++++++ .../management/netty/dto/LoginDto.java | 104 +++++++++++++ .../management/netty/dto/ServerAck.java | 39 +++++ .../vehicle/management/netty/dto/Station.java | 35 +++++ .../vehicle/management/netty/dto/Wifi.java | 29 ++++ .../netty/server/HeartbeatHandler.java | 52 +++++++ .../netty/server/LocateBaseHandler.java | 145 ++++++++++++++++++ .../netty/server/LocateHandler.java | 55 +++++++ .../management/netty/server/LoginHandler.java | 53 +++++++ .../management/netty/server/NettyServer.java | 45 ++++++ .../server/NettyServerChannelInitializer.java | 20 +++ .../netty/server/UnpackingDecoder.java | 53 +++++++ .../rabbit/config/RabbitTopicConfig.java | 96 ++++++++++++ .../rabbit/consumer/LocationConsumer.java | 46 ++++++ .../rabbit/task/LocationThreadPool.java | 22 +++ .../vehicleManagementApplication.java | 14 +- .../src/main/resources/bootstrap.yml | 2 +- .../src/test/java/NettyTest.java | 19 +++ 25 files changed, 1176 insertions(+), 4 deletions(-) create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/config/RabbitTopicConfig.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/consumer/LocationConsumer.java create mode 100644 hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/task/LocationThreadPool.java create mode 100644 hbt-vehicle-management-biz/src/test/java/NettyTest.java diff --git a/hbt-vehicle-management-biz/Dockerfile b/hbt-vehicle-management-biz/Dockerfile index 965d09a..d1dd9e9 100644 --- a/hbt-vehicle-management-biz/Dockerfile +++ b/hbt-vehicle-management-biz/Dockerfile @@ -1,4 +1,4 @@ FROM 119.45.158.12:5000/mini-java8:latest -COPY target/hbt-template-1.0-SNAPSHOT.jar /app/app.jar +COPY target/hbt-vehicle-management-biz-1.0-SNAPSHOT.jar /app/app.jar ENV NACOSIP="" ENTRYPOINT ["sh","-c","java -Dnacos.ip=$NACOSIP -jar /app/app.jar"] \ No newline at end of file diff --git a/hbt-vehicle-management-biz/pom.xml b/hbt-vehicle-management-biz/pom.xml index 960125f..bb4f190 100644 --- a/hbt-vehicle-management-biz/pom.xml +++ b/hbt-vehicle-management-biz/pom.xml @@ -12,12 +12,22 @@ hbt-vehicle-management-biz + + io.netty + netty-all + + + + com.hbt.onreal + hbt-onreal-common-rabbit + + com.alibaba.cloud spring-cloud-starter-alibaba-nacos-discovery - + com.alibaba.cloud spring-cloud-starter-alibaba-nacos-config diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java new file mode 100644 index 0000000..0ced179 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java @@ -0,0 +1,61 @@ +package com.hbt.vehicle.management.netty.client; + +import io.netty.bootstrap.Bootstrap; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioSocketChannel; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NettyClient { + + private final String host; + + private final Integer port; + + private ChannelFuture future; + + private EventLoopGroup group; + + public NettyClient(String host, Integer port) { + this.host = host; + this.port = port; + } + + public void run() throws Exception { + + this.group = new NioEventLoopGroup();//I/O线程池 + + Bootstrap bs = new Bootstrap();//客户端辅助启动类 + bs.group(group) + .channel(NioSocketChannel.class)//实例化一个Channel + .option(ChannelOption.TCP_NODELAY, true) + .handler(new NettyClientChannelInitializer()); + + //连接到远程节点;等待连接完成 + this.future = bs.connect(host, port).sync(); + //连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强 + + } + + public void sendMessage(String message) { + //发送消息到服务器端,编码格式是utf-8 + this.future.channel().writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8)); + } + + public void closeClient() throws InterruptedException { + //阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开 + try { + this.future.channel().closeFuture().sync(); + } catch (InterruptedException e) { + log.error("close pipeline error!", e); + throw new RuntimeException(e); + } finally { + this.group.shutdownGracefully().sync(); + } + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java new file mode 100644 index 0000000..39b6622 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java @@ -0,0 +1,14 @@ +package com.hbt.vehicle.management.netty.client; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class NettyClientChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel socketChannel) { + ChannelPipeline pipeline = socketChannel.pipeline(); + pipeline.addLast(new NettyClientHandler()); + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java new file mode 100644 index 0000000..12939d0 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java @@ -0,0 +1,16 @@ +package com.hbt.vehicle.management.netty.client; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.SimpleChannelInboundHandler; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class NettyClientHandler extends SimpleChannelInboundHandler { + + @Override + protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) { + log.info("client accept message:{}", byteBuf.toString(CharsetUtil.UTF_8)); + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java new file mode 100644 index 0000000..4ae5b76 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java @@ -0,0 +1,94 @@ +package com.hbt.vehicle.management.netty.convert; + +import com.hbt.common.core.utils.DateUtils; +import com.hbt.vehicle.management.netty.dto.*; + +import java.math.BigDecimal; +import java.util.ArrayList; + +public class NettyConvert { + + public static LocationDto string2Position(String[] content) { + int wifiNum = Integer.parseInt(content[11]); + LocationDto position = LocationDto + .builder() + .maker(content[0]) + .number(content[1]) + .dataType(content[2]) + .time(content[3]) + .dataStatus(content[4]) + .latitude(new BigDecimal(content[5])) + .lat(content[6]) + .longitude(new BigDecimal(content[7])) + .lon(content[8]) + .speed(new BigDecimal(content[9])) + .direction(new BigDecimal(content[10])) + .wifiNum(wifiNum) + .wifiList(new ArrayList<>()) + .mcc(content[12 + 2 * wifiNum]) + .mnc(content[13 + 2 * wifiNum]) + .ta(Integer.parseInt(content[14 + 2 * wifiNum])) + .num(Integer.parseInt(content[15 + 2 * wifiNum])) + .stationList(new ArrayList<>()) + .date(content[content.length - 3]) + .vehicleStatus(content[content.length - 2]) + .battery(content[content.length - 1]) + .build(); + for (int i = 0; i < wifiNum; i++) { + int baseIndex = 11 + 2 * i; + Wifi wifi = Wifi.builder().macAddr(content[baseIndex + 1]).rxLev(content[baseIndex + 2]).build(); + position.getWifiList().add(wifi); + } + for (int i = 0; i < position.getNum(); i++) { + int baseIndex = 15 + wifiNum * 2 + 3 * i; + Station station = Station.builder() + .lac(content[baseIndex + 1]) + .cid(content[baseIndex + 2]) + .rxLev(content[baseIndex + 3]) + .build(); + position.getStationList().add(station); + } + return position; + } + + public static String serverAck2String(ServerAck positionAck) { + return "*" + positionAck.getMaker() + "," + + positionAck.getNumber() + "," + + positionAck.getDataType() + "," + + DateUtils.parseDateToStr("YYYY-MM-DD HH:MM:SS", positionAck.getDate()) + "," + + (positionAck.getInfo() == null ? "" : positionAck.getInfo()) + + "#"; + } + + public static LoginDto string2LoginDto(String[] content) { + return LoginDto.builder() + .maker(content[0]) + .number(content[1]) + .dataType(content[2]) + .time(content[3]) + .dataStatus(content[4]) + .latitude(new BigDecimal(content[5])) + .lat(content[6]) + .longitude(new BigDecimal(content[7])) + .lon(content[8]) + .speed(new BigDecimal(content[9])) + .direction(new BigDecimal(content[10])) + .date(content[11]) + .vehicleStatus(content[12]) + .mcc(content[13]) + .mnc(content[14]) + .lac(content[15]) + .cid(content[16]) + .iccid(content[17]) + .build(); + } + + public static HeartbeatDto convert2HeartbeatDto(String[] content) { + return HeartbeatDto.builder() + .maker(content[0]) + .number(content[1]) + .dataType(content[2]) + .time(content[3]) + .build(); + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java new file mode 100644 index 0000000..81f3a35 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java @@ -0,0 +1,32 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.*; + +@Getter +@Setter +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class HeartbeatDto { + + /** + * 制造商 + */ + private String maker; + + /** + * 车载机序列号10位数 + */ + private String number; + + /** + * 数据类型 + */ + private String dataType; + + /** + * 时间:时/分/秒 HHMMSS 格林威治时间 + */ + private String time; + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java new file mode 100644 index 0000000..927859e --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java @@ -0,0 +1,120 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.*; + +import java.math.BigDecimal; +import java.util.List; + +@Builder +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class LocationDto { + + /** + * 制造商 + */ + private String maker; + + /** + * 车载机序列号10位数 + */ + private String number; + + /** + * 数据类型wifi数据 + */ + private String dataType; + + /** + * 时间:时/分/秒 HHMMSS 格林威治时间 + */ + private String time; + + /** + * 数据有效位(A/V) + */ + private String dataStatus; + + /** + * 纬度 + */ + private BigDecimal latitude; + + /** + * 纬度标志(N:北纬,S:南纬) + */ + private String lat; + + /** + * 经度 + */ + private BigDecimal longitude; + + /** + * 经度标志(E:东经,W:西经) + */ + private String lon; + + /** + * 速度 + */ + private BigDecimal speed; + + /** + * 方位角,正北为0度,分辨率1度,顺时针方向 + */ + private BigDecimal direction; + + /** + * WIFI个数(最多16个以逗号间隔) + */ + private Integer wifiNum; + + /** + * wifi信息 + */ + private List wifiList; + + /** + * 移动国家码 + */ + private String mcc; + + /** + * 移动网络码 + */ + private String mnc; + + /** + * 时间提前量 + */ + private Integer ta; + + /** + * 基站个数 + */ + private Integer num; + + /** + * 基站信息 + */ + private List stationList; + + /** + * 时间:日/月/年 DDMMYY + */ + private String date; + + /** + * 设备状态 + */ + private String vehicleStatus; + + /** + * 电池电压 + */ + private String battery; + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java new file mode 100644 index 0000000..d82e327 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java @@ -0,0 +1,104 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.*; + +import java.math.BigDecimal; + +@Getter +@Setter +@Builder +@AllArgsConstructor +@NoArgsConstructor +public class LoginDto { + + /** + * 制造商 + */ + private String maker; + + /** + * 车载机序列号10位数 + */ + private String number; + + /** + * 数据类型 + */ + private String dataType; + + /** + * 时间:时/分/秒 HHMMSS 格林威治时间 + */ + private String time; + + /** + * 数据有效位(A/V) + */ + private String dataStatus; + + /** + * 纬度 + */ + private BigDecimal latitude; + + /** + * 纬度标志(N:北纬,S:南纬) + */ + private String lat; + + /** + * 经度 + */ + private BigDecimal longitude; + + /** + * 经度标志(E:东经,W:西经) + */ + private String lon; + + /** + * 速度 + */ + private BigDecimal speed; + + /** + * 方位角,正北为0度,分辨率1度,顺时针方向 + */ + private BigDecimal direction; + + /** + * 时间:日/月/年 DDMMYY + */ + private String date; + + /** + * 设备状态 + */ + private String vehicleStatus; + + /** + * 移动国家码 + */ + private String mcc; + + /** + * 移动网络码 + */ + private String mnc; + + /** + * 基站位置区域码 + */ + private String lac; + + /** + * 基站编号 + */ + private String cid; + + /** + * SIM卡ICCID + */ + private String iccid; + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java new file mode 100644 index 0000000..b948367 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java @@ -0,0 +1,39 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +import java.util.Date; + +@Getter +@Setter +@Builder +public class ServerAck { + + /** + * 制造商 + */ + private String maker; + + /** + * 编号 + */ + private String number; + + /** + * 数据类型 + */ + private String dataType; + + /** + * 日期 + */ + private Date date; + + /** + * 应答主要信息 + */ + private String info; + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java new file mode 100644 index 0000000..b3ffc4b --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java @@ -0,0 +1,35 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.*; + +@Builder +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class Station { + + /** + * 基站位置区域码 + */ + private String lac; + + /** + * 基站编号 + */ + private String cid; + + /** + * 基站接收信号电平(信号强度) + */ + private String rxLev; +// +// @Override +// public String toString() { +// return "Station{" + +// "lac='" + lac + '\'' + +// ", cid='" + cid + '\'' + +// ", rxLev='" + rxLev + '\'' + +// '}'; +// } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java new file mode 100644 index 0000000..236f6c4 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java @@ -0,0 +1,29 @@ +package com.hbt.vehicle.management.netty.dto; + +import lombok.*; + +@Builder +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class Wifi { + + /** + * wifi信号mac地址 + */ + private String macAddr; + + /** + * wifi信号强度 + */ + private String rxLev; + +// @Override +// public String toString() { +// return "Wifi{" + +// "macAddr='" + macAddr + '\'' + +// ", rxLev='" + rxLev + '\'' + +// '}'; +// } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java new file mode 100644 index 0000000..17b2d91 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java @@ -0,0 +1,52 @@ +package com.hbt.vehicle.management.netty.server; + +import com.hbt.vehicle.management.netty.convert.NettyConvert; +import com.hbt.vehicle.management.netty.dto.HeartbeatDto; +import com.hbt.vehicle.management.netty.dto.ServerAck; +import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; + +@Slf4j +public class HeartbeatHandler extends LocateBaseHandler { + + /** + * 有客户端发消息会触发此函数 + * @param ctx ctx + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + String[] content = (String[]) msg; + + // 检查数据类型 + if (!"V7".equals(content[2])) { + // 非该处理器处理类型,交给下一个处理器 + ctx.fireChannelRead(msg); + return; + } + + // 数据转换 + HeartbeatDto heartbeatDto = NettyConvert.convert2HeartbeatDto(content); + log.info("转换后的报文【{}】", heartbeatDto.toString()); + + putMessage2Queue(heartbeatDto); + + // 构建应答消息 + ServerAck ack = ServerAck.builder() + .maker(heartbeatDto.getMaker()) + .number(heartbeatDto.getNumber()) + .dataType("I7") + .date(new Date()) + .build(); + // 响应客户端 + this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack)); + } + + @Override + public void putMessage2Queue(Object message) { + super.rabbitTemplate.convertAndSend(RabbitTopicConfig.HEARTBEAT_EXCHANGE, RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC, + message); + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java new file mode 100644 index 0000000..64ed0cc --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java @@ -0,0 +1,145 @@ +package com.hbt.vehicle.management.netty.server; + +import com.hbt.common.core.utils.SpringUtils; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelId; +import io.netty.channel.ChannelInboundHandlerAdapter; +import io.netty.handler.timeout.IdleState; +import io.netty.handler.timeout.IdleStateEvent; +import io.netty.util.CharsetUtil; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.core.RabbitTemplate; + +import java.net.InetSocketAddress; +import java.util.concurrent.ConcurrentHashMap; + +/** + * 处理器链的抽象类 + */ +@Slf4j +public abstract class LocateBaseHandler extends ChannelInboundHandlerAdapter { + + /** + * 管理一个全局map,保存连接进服务端的通道数量 + */ + static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>(); + + final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class); + + /** + * 有客户端连接服务器会触发此函数 + * @param ctx ctx + */ + @Override + public void channelActive(ChannelHandlerContext ctx) { + + InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + + String clientIp = inetSocket.getAddress().getHostAddress(); + int clientPort = inetSocket.getPort(); + + //获取连接通道唯一标识 + ChannelId channelId = ctx.channel().id(); + + //如果map中不包含此连接,就保存连接 + if (CHANNEL_MAP.containsKey(channelId)) { + log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size()); + } else { + //保存连接 + CHANNEL_MAP.put(channelId, ctx); + + log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]"); + log.info("连接通道数量: " + CHANNEL_MAP.size()); + } + } + + /** + * 有客户端终止连接服务器会触发此函数 + * @param ctx ctx + */ + @Override + public void channelInactive(ChannelHandlerContext ctx) { + + InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress(); + + String clientIp = inetSocket.getAddress().getHostAddress(); + + ChannelId channelId = ctx.channel().id(); + + //包含此客户端才去删除 // TODO 定时清理假死通道 + if (CHANNEL_MAP.containsKey(channelId)) { + //删除连接 + CHANNEL_MAP.remove(channelId); + + log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + inetSocket.getPort() + "]"); + log.info("连接通道数量: " + CHANNEL_MAP.size()); + } + } + + /** + * 服务端给客户端发送消息 + * @param msg 需要发送的消息内容 + * @param channelId 连接通道唯一id + */ + public void channelWrite(ChannelId channelId, String msg) { + + ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId); + + if (ctx == null) { + log.info("通道【" + channelId + "】不存在"); + return; + } + + if (msg == null || msg.equals("")) { + log.info("服务端响应空的消息"); + return; + } + ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8)); + // TODO 是否需要在应答后关闭链接 +// ctx.close(); + } + + @Override + public void userEventTriggered(ChannelHandlerContext ctx, Object evt) { + + String socketString = ctx.channel().remoteAddress().toString(); + + if (evt instanceof IdleStateEvent) { + IdleStateEvent event = (IdleStateEvent) evt; + if (event.state() == IdleState.READER_IDLE) { + log.info("Client: " + socketString + " READER_IDLE 读超时"); + ctx.disconnect(); + } else if (event.state() == IdleState.WRITER_IDLE) { + log.info("Client: " + socketString + " WRITER_IDLE 写超时"); + ctx.disconnect(); + } else if (event.state() == IdleState.ALL_IDLE) { + log.info("Client: " + socketString + " ALL_IDLE 总超时"); + ctx.disconnect(); + } + } + } + + /** + * 发生异常会触发此函数 + * @param ctx ctx + */ + @Override + public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { + ctx.close(); + log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size()); + cause.printStackTrace(); + } + + public String[] readMessage(Object msg) { + ByteBuf byteBuf = (ByteBuf)msg; + byte[] bytes = new byte[byteBuf.readableBytes()]; + byteBuf.readBytes(bytes); + String message = new String(bytes); + log.info("客户端报文【{}】", message); + return message.split(","); + } + + public abstract void putMessage2Queue(Object message); +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java new file mode 100644 index 0000000..0b4d963 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java @@ -0,0 +1,55 @@ +package com.hbt.vehicle.management.netty.server; + +import com.hbt.vehicle.management.netty.convert.NettyConvert; +import com.hbt.vehicle.management.netty.dto.LocationDto; +import com.hbt.vehicle.management.netty.dto.ServerAck; +import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; + +@Slf4j +public class LocateHandler extends LocateBaseHandler { + + /** + * 有客户端发消息会触发此函数 + * @param ctx ctx + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + String[] content = (String[]) msg; + + // 检查数据类型 + if (!"V5".equals(content[2])) { + throw new RuntimeException("无法识别的报文!"); + } + + // 数据转换 + LocationDto locationDto = NettyConvert.string2Position(content); + log.info("转换后的报文【{}】", locationDto.toString()); + + putMessage2Queue(locationDto); + + // 构建应答消息 + ServerAck ack = ServerAck.builder() + .maker(locationDto.getMaker()) + .number(locationDto.getNumber()) + .dataType("J5") + .date(new Date()) + .build(); + + // TODO 等待业务处理结果 + ack.setInfo("1:100"); + + // 响应客户端 + this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack)); + } + + @Override + public void putMessage2Queue(Object message) { + super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOCATION_EXCHANGE, RabbitTopicConfig.VEHICLE_LOCATION_TOPIC, + message); + } + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java new file mode 100644 index 0000000..d6a6162 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java @@ -0,0 +1,53 @@ +package com.hbt.vehicle.management.netty.server; + +import com.hbt.vehicle.management.netty.convert.NettyConvert; +import com.hbt.vehicle.management.netty.dto.LoginDto; +import com.hbt.vehicle.management.netty.dto.ServerAck; +import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig; +import io.netty.channel.ChannelHandlerContext; +import lombok.extern.slf4j.Slf4j; + +import java.util.Date; + +@Slf4j +public class LoginHandler extends LocateBaseHandler { + + /** + * 有客户端发消息会触发此函数 + * @param ctx ctx + */ + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) { + String[] content = readMessage(msg); + log.info("通道【{}】,客户端报文【{}】", ctx.channel().id(), content); + + // 检查数据类型 + if (!"V6".equals(content[2])) { + // 非该处理器处理类型,交给下一个处理器 + ctx.fireChannelRead(content); + return; + } + + // 数据转换 + LoginDto loginDto = NettyConvert.string2LoginDto(content); + log.info("转换后的报文【{}】", loginDto.toString()); + + putMessage2Queue(loginDto); + + // 构建应答消息 + ServerAck ack = ServerAck.builder() + .maker(loginDto.getMaker()) + .number(loginDto.getNumber()) + .dataType("J6") + .date(new Date()) + .build(); + // 响应客户端 + this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack)); + } + + @Override + public void putMessage2Queue(Object message) { + super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOGIN_EXCHANGE, RabbitTopicConfig.VEHICLE_LOGIN_TOPIC, + message); + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java new file mode 100644 index 0000000..1bf7140 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java @@ -0,0 +1,45 @@ +package com.hbt.vehicle.management.netty.server; + +import io.netty.bootstrap.ServerBootstrap; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Value; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class NettyServer{ + + @Value("${netty-port}") + private Integer port; + + public void start() { + //配置服务端的NIO线程组 + EventLoopGroup bossGroup = new NioEventLoopGroup(1); + EventLoopGroup workerGroup = new NioEventLoopGroup(); + + try { + ServerBootstrap bootstrap = new ServerBootstrap() + .group(bossGroup, workerGroup) // 绑定线程池 + .channel(NioServerSocketChannel.class) +// .localAddress(address) + .childHandler(new NettyServerChannelInitializer()) + .option(ChannelOption.SO_BACKLOG, 128) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝 + .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制 + + // 绑定端口,开始接收进来的连接 + ChannelFuture future = bootstrap.bind(port).sync(); + log.info("netty服务器开始监听端口:" + port); + //关闭channel和块,直到它被关闭 + future.channel().closeFuture().sync(); + } catch (Exception e) { + e.printStackTrace(); + bossGroup.shutdownGracefully(); + workerGroup.shutdownGracefully(); + } + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java new file mode 100644 index 0000000..4c8993b --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java @@ -0,0 +1,20 @@ +package com.hbt.vehicle.management.netty.server; + +import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelPipeline; +import io.netty.channel.socket.SocketChannel; + +public class NettyServerChannelInitializer extends ChannelInitializer { + + @Override + protected void initChannel(SocketChannel channel) { + //获取管道 + ChannelPipeline pipeline = channel.pipeline(); + pipeline.addLast(new UnpackingDecoder()); // 解码 + + pipeline.addLast(new LoginHandler()); + pipeline.addLast(new HeartbeatHandler()); + pipeline.addLast(new LocateHandler()); + + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java new file mode 100644 index 0000000..8c306e4 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java @@ -0,0 +1,53 @@ +package com.hbt.vehicle.management.netty.server; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import io.netty.channel.ChannelHandlerContext; +import io.netty.handler.codec.ByteToMessageDecoder; + +import java.util.List; + +public class UnpackingDecoder extends ByteToMessageDecoder { + + private static final ByteBuf beginDelimiter = Unpooled.wrappedBuffer("*".getBytes()); + private static final ByteBuf endDelimiter = Unpooled.wrappedBuffer("#".getBytes()); + + @Override + protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List list) { + int beginDelimiterLength = beginDelimiter.readableBytes(); // 开始标识字节长度 + int endDelimiterLength = endDelimiter.readableBytes(); // 结束标识字节长度 + + int beginIdx = indexOf(byteBuf, beginDelimiter); // 开始标识下标 + int endIdx = indexOf(byteBuf, endDelimiter); // 结束标识下标 + + // 如果缓存区找不到开始标识,跳过该段字节 + if (beginIdx == -1) { + byteBuf.readerIndex(byteBuf.writerIndex()); + return; + } + + // 如果缓冲区找不到结束下标,或者下标小于开始标识,则从开始标识开始读取 + if (endIdx == -1 || endIdx < beginIdx) { + byteBuf.readerIndex(beginIdx); + return; + } + + ByteBuf frame = byteBuf.retainedSlice(beginIdx + beginDelimiterLength, endIdx - beginIdx - beginDelimiterLength); // 将标识之间的数据提取出来,给后面的解码器使用 + list.add(frame); + byteBuf.readerIndex(endIdx + endDelimiterLength); // 更新读取下标 + } + + private static int indexOf(ByteBuf haystack, ByteBuf needle) { + for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) { + int haystackIndex = i; + int needleIndex; + for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) { + if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) break; + haystackIndex++; + if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) return -1; + } + if (needleIndex == needle.capacity()) return i - haystack.readerIndex(); + } + return -1; + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/config/RabbitTopicConfig.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/config/RabbitTopicConfig.java new file mode 100644 index 0000000..58abf1d --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/config/RabbitTopicConfig.java @@ -0,0 +1,96 @@ +package com.hbt.vehicle.management.rabbit.config; + +import org.springframework.amqp.core.Binding; +import org.springframework.amqp.core.BindingBuilder; +import org.springframework.amqp.core.Queue; +import org.springframework.amqp.core.TopicExchange; +import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory; +import org.springframework.amqp.rabbit.connection.ConnectionFactory; +import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +/** + * MQ 队列配置类 + * + * @author yx + * @date 2023/11/21 17:54 + */ +@Configuration +public class RabbitTopicConfig { + + // 交换机 + public static final String LOGIN_EXCHANGE = "vehicleLoginTopicExchange"; + + public static final String HEARTBEAT_EXCHANGE = "vehicleHeartbeatTopicExchange"; + + public static final String LOCATION_EXCHANGE = "vehicleLocationTopicExchange"; + + // 队列 + public static final String SERVICE_ID = "hbt.vehicle"; + + //并发数量 + public static final int DEFAULT_CONCURRENT = 10; + + // 定位信息topic + public static final String VEHICLE_LOCATION_TOPIC = "vehicle.location"; + + public static final String VEHICLE_LOGIN_TOPIC = "vehicle.login"; + + public static final String VEHICLE_HEARTBEAT_TOPIC = "vehicle.heartbeat"; + + @Bean + public TopicExchange loginExchange() { + return new TopicExchange(LOGIN_EXCHANGE); + } + + @Bean + public TopicExchange heartbeatExchange() { + return new TopicExchange(HEARTBEAT_EXCHANGE); + } + + @Bean + public TopicExchange locationExchange() { + return new TopicExchange(LOCATION_EXCHANGE); + } + + @Bean + public Queue loginMessage() { + return new Queue(SERVICE_ID + "." + VEHICLE_LOGIN_TOPIC); + } + + @Bean + public Queue heartbeatMessage() { + return new Queue(SERVICE_ID + "." + VEHICLE_HEARTBEAT_TOPIC); + } + + @Bean + public Queue locationMessage() { + return new Queue(SERVICE_ID + "." + VEHICLE_LOCATION_TOPIC); + } + + @Bean + public Binding bindingExchangeLoginMessage(Queue loginMessage, TopicExchange loginExchange) { + return BindingBuilder.bind(loginMessage).to(loginExchange).with(RabbitTopicConfig.VEHICLE_LOGIN_TOPIC); + } + + @Bean + public Binding bindingExchangeHeartbeatMessage(Queue heartbeatMessage, TopicExchange heartbeatExchange) { + return BindingBuilder.bind(heartbeatMessage).to(heartbeatExchange).with(RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC); + } + + @Bean + public Binding bindingExchangeLocationMessage(Queue locationMessage, TopicExchange locationExchange) { + return BindingBuilder.bind(locationMessage).to(locationExchange).with(RabbitTopicConfig.VEHICLE_LOCATION_TOPIC); + } + + @Bean("customContainerFactory") + public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer, + ConnectionFactory connectionFactory) { + SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); + factory.setConcurrentConsumers(DEFAULT_CONCURRENT); + factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT); + configurer.configure(factory, connectionFactory); + return factory; + } +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/consumer/LocationConsumer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/consumer/LocationConsumer.java new file mode 100644 index 0000000..0f9877b --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/consumer/LocationConsumer.java @@ -0,0 +1,46 @@ +package com.hbt.vehicle.management.rabbit.consumer; + +import com.alibaba.fastjson.JSON; +import com.hbt.vehicle.management.netty.dto.HeartbeatDto; +import com.hbt.vehicle.management.netty.dto.LocationDto; +import com.hbt.vehicle.management.netty.dto.LoginDto; +import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig; +import lombok.extern.slf4j.Slf4j; +import org.springframework.amqp.rabbit.annotation.RabbitListener; +import org.springframework.stereotype.Component; + +@Slf4j +@Component +public class LocationConsumer { + + @RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOGIN_TOPIC, + containerFactory = "customContainerFactory") + public void locationMessage(LoginDto message) { + try { + log.info("收到登录MQ数据:{}", JSON.toJSONString(message)); + } catch (Exception e) { + log.error("登录数据处理失败:{}", JSON.toJSONString(message), e); + } + } + + @RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC, + containerFactory = "customContainerFactory") + public void heartbeatMessage(HeartbeatDto message) { + try { + log.info("收到心跳MQ数据:{}", JSON.toJSONString(message)); + } catch (Exception e) { + log.error("心跳数据处理失败:{}", JSON.toJSONString(message), e); + } + } + + @RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOCATION_TOPIC, + containerFactory = "customContainerFactory") + public void positionMessage(LocationDto message) { + try { + log.info("收到车辆定位MQ数据:{}", JSON.toJSONString(message)); + } catch (Exception e) { + log.error("车辆定位数据处理失败:{}", JSON.toJSONString(message), e); + } + } + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/task/LocationThreadPool.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/task/LocationThreadPool.java new file mode 100644 index 0000000..891a728 --- /dev/null +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/rabbit/task/LocationThreadPool.java @@ -0,0 +1,22 @@ +package com.hbt.vehicle.management.rabbit.task; + +import org.springframework.stereotype.Component; + +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +@Component +public class LocationThreadPool { + + private final ThreadPoolExecutor executor = new ThreadPoolExecutor( + 10, + 10, + 10, + TimeUnit.HOURS, + new LinkedBlockingQueue<>(), + new ThreadPoolExecutor.CallerRunsPolicy()); + + public ThreadPoolExecutor getExecutor = this.executor; + +} diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/vehicleManagementApplication.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/vehicleManagementApplication.java index 7327fbf..58cc020 100644 --- a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/vehicleManagementApplication.java +++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/vehicleManagementApplication.java @@ -3,10 +3,14 @@ package com.hbt.vehicle.management; import com.hbt.common.security.annotation.EnableCustomConfig; import com.hbt.common.security.annotation.EnableRyFeignClients; import com.hbt.common.swagger.annotation.EnableCustomSwagger2; +import com.hbt.vehicle.management.netty.server.NettyServer; +import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.web.servlet.ServletComponentScan; import org.springframework.cloud.client.SpringCloudApplication; +import javax.annotation.Resource; + /** * 入口类, 扫描并注入其他配置类和服务 */ @@ -15,11 +19,19 @@ import org.springframework.cloud.client.SpringCloudApplication; @EnableCustomSwagger2 @EnableRyFeignClients @ServletComponentScan(basePackages = "com.hbt.vehicle.management") -public class vehicleManagementApplication { +public class vehicleManagementApplication implements CommandLineRunner{ + + @Resource + private NettyServer nettyServer; public static void main(String[] args) { SpringApplication.run(vehicleManagementApplication.class, args); System.out.println("(♥◠‿◠)ノ゙ 车辆管理系统启动成功 ლ(´ڡ`ლ)゙ "); } + + @Override + public void run(String... args) { + nettyServer.start(); + } } diff --git a/hbt-vehicle-management-biz/src/main/resources/bootstrap.yml b/hbt-vehicle-management-biz/src/main/resources/bootstrap.yml index 7b642af..4c20675 100644 --- a/hbt-vehicle-management-biz/src/main/resources/bootstrap.yml +++ b/hbt-vehicle-management-biz/src/main/resources/bootstrap.yml @@ -11,4 +11,4 @@ spring: active: dev nacos: - ip: 119.45.158.12 + ip: 192.168.1.211 diff --git a/hbt-vehicle-management-biz/src/test/java/NettyTest.java b/hbt-vehicle-management-biz/src/test/java/NettyTest.java new file mode 100644 index 0000000..c64908b --- /dev/null +++ b/hbt-vehicle-management-biz/src/test/java/NettyTest.java @@ -0,0 +1,19 @@ +import com.hbt.vehicle.management.netty.client.NettyClient; + +import java.util.Scanner; + +public class NettyTest { + + public static void main(String[] args) throws Exception { + NettyClient nettyClient = new NettyClient("127.0.0.1", 8886); + nettyClient.run(); + Scanner scanner = new Scanner(System.in); + while (scanner.hasNext()) { + String s = scanner.next(); + if (s.equals("exit")) { + break; + } + nettyClient.sendMessage(s); + } + } +}