diff --git a/hbt-vehicle-management-biz/Dockerfile b/hbt-vehicle-management-biz/Dockerfile
index 965d09a..d1dd9e9 100644
--- a/hbt-vehicle-management-biz/Dockerfile
+++ b/hbt-vehicle-management-biz/Dockerfile
@@ -1,4 +1,4 @@
FROM 119.45.158.12:5000/mini-java8:latest
-COPY target/hbt-template-1.0-SNAPSHOT.jar /app/app.jar
+COPY target/hbt-vehicle-management-biz-1.0-SNAPSHOT.jar /app/app.jar
ENV NACOSIP=""
ENTRYPOINT ["sh","-c","java -Dnacos.ip=$NACOSIP -jar /app/app.jar"]
\ No newline at end of file
diff --git a/hbt-vehicle-management-biz/pom.xml b/hbt-vehicle-management-biz/pom.xml
index 960125f..bb4f190 100644
--- a/hbt-vehicle-management-biz/pom.xml
+++ b/hbt-vehicle-management-biz/pom.xml
@@ -12,12 +12,22 @@
hbt-vehicle-management-biz
+
+ io.netty
+ netty-all
+
+
+
+ com.hbt.onreal
+ hbt-onreal-common-rabbit
+
+
com.alibaba.cloud
spring-cloud-starter-alibaba-nacos-discovery
-
+
com.alibaba.cloud
spring-cloud-starter-alibaba-nacos-config
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java
new file mode 100644
index 0000000..0ced179
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClient.java
@@ -0,0 +1,61 @@
+package com.hbt.vehicle.management.netty.client;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NettyClient {
+
+ private final String host;
+
+ private final Integer port;
+
+ private ChannelFuture future;
+
+ private EventLoopGroup group;
+
+ public NettyClient(String host, Integer port) {
+ this.host = host;
+ this.port = port;
+ }
+
+ public void run() throws Exception {
+
+ this.group = new NioEventLoopGroup();//I/O线程池
+
+ Bootstrap bs = new Bootstrap();//客户端辅助启动类
+ bs.group(group)
+ .channel(NioSocketChannel.class)//实例化一个Channel
+ .option(ChannelOption.TCP_NODELAY, true)
+ .handler(new NettyClientChannelInitializer());
+
+ //连接到远程节点;等待连接完成
+ this.future = bs.connect(host, port).sync();
+ //连接建立后,都会自动创建一个管道pipeline,这个管道也被称为责任链,保证顺序执行,同时又可以灵活的配置各类Handler,这是一个很精妙的设计,既减少了线程切换带来的资源开销、避免好多麻烦事,同时性能又得到了极大增强
+
+ }
+
+ public void sendMessage(String message) {
+ //发送消息到服务器端,编码格式是utf-8
+ this.future.channel().writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
+ }
+
+ public void closeClient() throws InterruptedException {
+ //阻塞操作,closeFuture()开启了一个channel的监听器(这期间channel在进行各项工作),直到链路断开
+ try {
+ this.future.channel().closeFuture().sync();
+ } catch (InterruptedException e) {
+ log.error("close pipeline error!", e);
+ throw new RuntimeException(e);
+ } finally {
+ this.group.shutdownGracefully().sync();
+ }
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java
new file mode 100644
index 0000000..39b6622
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientChannelInitializer.java
@@ -0,0 +1,14 @@
+package com.hbt.vehicle.management.netty.client;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+
+public class NettyClientChannelInitializer extends ChannelInitializer {
+
+ @Override
+ protected void initChannel(SocketChannel socketChannel) {
+ ChannelPipeline pipeline = socketChannel.pipeline();
+ pipeline.addLast(new NettyClientHandler());
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java
new file mode 100644
index 0000000..12939d0
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/client/NettyClientHandler.java
@@ -0,0 +1,16 @@
+package com.hbt.vehicle.management.netty.client;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.SimpleChannelInboundHandler;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NettyClientHandler extends SimpleChannelInboundHandler {
+
+ @Override
+ protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
+ log.info("client accept message:{}", byteBuf.toString(CharsetUtil.UTF_8));
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java
new file mode 100644
index 0000000..4ae5b76
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/convert/NettyConvert.java
@@ -0,0 +1,94 @@
+package com.hbt.vehicle.management.netty.convert;
+
+import com.hbt.common.core.utils.DateUtils;
+import com.hbt.vehicle.management.netty.dto.*;
+
+import java.math.BigDecimal;
+import java.util.ArrayList;
+
+public class NettyConvert {
+
+ public static LocationDto string2Position(String[] content) {
+ int wifiNum = Integer.parseInt(content[11]);
+ LocationDto position = LocationDto
+ .builder()
+ .maker(content[0])
+ .number(content[1])
+ .dataType(content[2])
+ .time(content[3])
+ .dataStatus(content[4])
+ .latitude(new BigDecimal(content[5]))
+ .lat(content[6])
+ .longitude(new BigDecimal(content[7]))
+ .lon(content[8])
+ .speed(new BigDecimal(content[9]))
+ .direction(new BigDecimal(content[10]))
+ .wifiNum(wifiNum)
+ .wifiList(new ArrayList<>())
+ .mcc(content[12 + 2 * wifiNum])
+ .mnc(content[13 + 2 * wifiNum])
+ .ta(Integer.parseInt(content[14 + 2 * wifiNum]))
+ .num(Integer.parseInt(content[15 + 2 * wifiNum]))
+ .stationList(new ArrayList<>())
+ .date(content[content.length - 3])
+ .vehicleStatus(content[content.length - 2])
+ .battery(content[content.length - 1])
+ .build();
+ for (int i = 0; i < wifiNum; i++) {
+ int baseIndex = 11 + 2 * i;
+ Wifi wifi = Wifi.builder().macAddr(content[baseIndex + 1]).rxLev(content[baseIndex + 2]).build();
+ position.getWifiList().add(wifi);
+ }
+ for (int i = 0; i < position.getNum(); i++) {
+ int baseIndex = 15 + wifiNum * 2 + 3 * i;
+ Station station = Station.builder()
+ .lac(content[baseIndex + 1])
+ .cid(content[baseIndex + 2])
+ .rxLev(content[baseIndex + 3])
+ .build();
+ position.getStationList().add(station);
+ }
+ return position;
+ }
+
+ public static String serverAck2String(ServerAck positionAck) {
+ return "*" + positionAck.getMaker() + "," +
+ positionAck.getNumber() + "," +
+ positionAck.getDataType() + "," +
+ DateUtils.parseDateToStr("YYYY-MM-DD HH:MM:SS", positionAck.getDate()) + "," +
+ (positionAck.getInfo() == null ? "" : positionAck.getInfo()) +
+ "#";
+ }
+
+ public static LoginDto string2LoginDto(String[] content) {
+ return LoginDto.builder()
+ .maker(content[0])
+ .number(content[1])
+ .dataType(content[2])
+ .time(content[3])
+ .dataStatus(content[4])
+ .latitude(new BigDecimal(content[5]))
+ .lat(content[6])
+ .longitude(new BigDecimal(content[7]))
+ .lon(content[8])
+ .speed(new BigDecimal(content[9]))
+ .direction(new BigDecimal(content[10]))
+ .date(content[11])
+ .vehicleStatus(content[12])
+ .mcc(content[13])
+ .mnc(content[14])
+ .lac(content[15])
+ .cid(content[16])
+ .iccid(content[17])
+ .build();
+ }
+
+ public static HeartbeatDto convert2HeartbeatDto(String[] content) {
+ return HeartbeatDto.builder()
+ .maker(content[0])
+ .number(content[1])
+ .dataType(content[2])
+ .time(content[3])
+ .build();
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java
new file mode 100644
index 0000000..81f3a35
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/HeartbeatDto.java
@@ -0,0 +1,32 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.*;
+
+@Getter
+@Setter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class HeartbeatDto {
+
+ /**
+ * 制造商
+ */
+ private String maker;
+
+ /**
+ * 车载机序列号10位数
+ */
+ private String number;
+
+ /**
+ * 数据类型
+ */
+ private String dataType;
+
+ /**
+ * 时间:时/分/秒 HHMMSS 格林威治时间
+ */
+ private String time;
+
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java
new file mode 100644
index 0000000..927859e
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LocationDto.java
@@ -0,0 +1,120 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.*;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+@Builder
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class LocationDto {
+
+ /**
+ * 制造商
+ */
+ private String maker;
+
+ /**
+ * 车载机序列号10位数
+ */
+ private String number;
+
+ /**
+ * 数据类型wifi数据
+ */
+ private String dataType;
+
+ /**
+ * 时间:时/分/秒 HHMMSS 格林威治时间
+ */
+ private String time;
+
+ /**
+ * 数据有效位(A/V)
+ */
+ private String dataStatus;
+
+ /**
+ * 纬度
+ */
+ private BigDecimal latitude;
+
+ /**
+ * 纬度标志(N:北纬,S:南纬)
+ */
+ private String lat;
+
+ /**
+ * 经度
+ */
+ private BigDecimal longitude;
+
+ /**
+ * 经度标志(E:东经,W:西经)
+ */
+ private String lon;
+
+ /**
+ * 速度
+ */
+ private BigDecimal speed;
+
+ /**
+ * 方位角,正北为0度,分辨率1度,顺时针方向
+ */
+ private BigDecimal direction;
+
+ /**
+ * WIFI个数(最多16个以逗号间隔)
+ */
+ private Integer wifiNum;
+
+ /**
+ * wifi信息
+ */
+ private List wifiList;
+
+ /**
+ * 移动国家码
+ */
+ private String mcc;
+
+ /**
+ * 移动网络码
+ */
+ private String mnc;
+
+ /**
+ * 时间提前量
+ */
+ private Integer ta;
+
+ /**
+ * 基站个数
+ */
+ private Integer num;
+
+ /**
+ * 基站信息
+ */
+ private List stationList;
+
+ /**
+ * 时间:日/月/年 DDMMYY
+ */
+ private String date;
+
+ /**
+ * 设备状态
+ */
+ private String vehicleStatus;
+
+ /**
+ * 电池电压
+ */
+ private String battery;
+
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java
new file mode 100644
index 0000000..d82e327
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/LoginDto.java
@@ -0,0 +1,104 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.*;
+
+import java.math.BigDecimal;
+
+@Getter
+@Setter
+@Builder
+@AllArgsConstructor
+@NoArgsConstructor
+public class LoginDto {
+
+ /**
+ * 制造商
+ */
+ private String maker;
+
+ /**
+ * 车载机序列号10位数
+ */
+ private String number;
+
+ /**
+ * 数据类型
+ */
+ private String dataType;
+
+ /**
+ * 时间:时/分/秒 HHMMSS 格林威治时间
+ */
+ private String time;
+
+ /**
+ * 数据有效位(A/V)
+ */
+ private String dataStatus;
+
+ /**
+ * 纬度
+ */
+ private BigDecimal latitude;
+
+ /**
+ * 纬度标志(N:北纬,S:南纬)
+ */
+ private String lat;
+
+ /**
+ * 经度
+ */
+ private BigDecimal longitude;
+
+ /**
+ * 经度标志(E:东经,W:西经)
+ */
+ private String lon;
+
+ /**
+ * 速度
+ */
+ private BigDecimal speed;
+
+ /**
+ * 方位角,正北为0度,分辨率1度,顺时针方向
+ */
+ private BigDecimal direction;
+
+ /**
+ * 时间:日/月/年 DDMMYY
+ */
+ private String date;
+
+ /**
+ * 设备状态
+ */
+ private String vehicleStatus;
+
+ /**
+ * 移动国家码
+ */
+ private String mcc;
+
+ /**
+ * 移动网络码
+ */
+ private String mnc;
+
+ /**
+ * 基站位置区域码
+ */
+ private String lac;
+
+ /**
+ * 基站编号
+ */
+ private String cid;
+
+ /**
+ * SIM卡ICCID
+ */
+ private String iccid;
+
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java
new file mode 100644
index 0000000..b948367
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/ServerAck.java
@@ -0,0 +1,39 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.Setter;
+
+import java.util.Date;
+
+@Getter
+@Setter
+@Builder
+public class ServerAck {
+
+ /**
+ * 制造商
+ */
+ private String maker;
+
+ /**
+ * 编号
+ */
+ private String number;
+
+ /**
+ * 数据类型
+ */
+ private String dataType;
+
+ /**
+ * 日期
+ */
+ private Date date;
+
+ /**
+ * 应答主要信息
+ */
+ private String info;
+
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java
new file mode 100644
index 0000000..b3ffc4b
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Station.java
@@ -0,0 +1,35 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.*;
+
+@Builder
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class Station {
+
+ /**
+ * 基站位置区域码
+ */
+ private String lac;
+
+ /**
+ * 基站编号
+ */
+ private String cid;
+
+ /**
+ * 基站接收信号电平(信号强度)
+ */
+ private String rxLev;
+//
+// @Override
+// public String toString() {
+// return "Station{" +
+// "lac='" + lac + '\'' +
+// ", cid='" + cid + '\'' +
+// ", rxLev='" + rxLev + '\'' +
+// '}';
+// }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java
new file mode 100644
index 0000000..236f6c4
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/dto/Wifi.java
@@ -0,0 +1,29 @@
+package com.hbt.vehicle.management.netty.dto;
+
+import lombok.*;
+
+@Builder
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class Wifi {
+
+ /**
+ * wifi信号mac地址
+ */
+ private String macAddr;
+
+ /**
+ * wifi信号强度
+ */
+ private String rxLev;
+
+// @Override
+// public String toString() {
+// return "Wifi{" +
+// "macAddr='" + macAddr + '\'' +
+// ", rxLev='" + rxLev + '\'' +
+// '}';
+// }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java
new file mode 100644
index 0000000..17b2d91
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/HeartbeatHandler.java
@@ -0,0 +1,52 @@
+package com.hbt.vehicle.management.netty.server;
+
+import com.hbt.vehicle.management.netty.convert.NettyConvert;
+import com.hbt.vehicle.management.netty.dto.HeartbeatDto;
+import com.hbt.vehicle.management.netty.dto.ServerAck;
+import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Slf4j
+public class HeartbeatHandler extends LocateBaseHandler {
+
+ /**
+ * 有客户端发消息会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ String[] content = (String[]) msg;
+
+ // 检查数据类型
+ if (!"V7".equals(content[2])) {
+ // 非该处理器处理类型,交给下一个处理器
+ ctx.fireChannelRead(msg);
+ return;
+ }
+
+ // 数据转换
+ HeartbeatDto heartbeatDto = NettyConvert.convert2HeartbeatDto(content);
+ log.info("转换后的报文【{}】", heartbeatDto.toString());
+
+ putMessage2Queue(heartbeatDto);
+
+ // 构建应答消息
+ ServerAck ack = ServerAck.builder()
+ .maker(heartbeatDto.getMaker())
+ .number(heartbeatDto.getNumber())
+ .dataType("I7")
+ .date(new Date())
+ .build();
+ // 响应客户端
+ this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
+ }
+
+ @Override
+ public void putMessage2Queue(Object message) {
+ super.rabbitTemplate.convertAndSend(RabbitTopicConfig.HEARTBEAT_EXCHANGE, RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC,
+ message);
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java
new file mode 100644
index 0000000..64ed0cc
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateBaseHandler.java
@@ -0,0 +1,145 @@
+package com.hbt.vehicle.management.netty.server;
+
+import com.hbt.common.core.utils.SpringUtils;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelId;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.timeout.IdleState;
+import io.netty.handler.timeout.IdleStateEvent;
+import io.netty.util.CharsetUtil;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+
+import java.net.InetSocketAddress;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * 处理器链的抽象类
+ */
+@Slf4j
+public abstract class LocateBaseHandler extends ChannelInboundHandlerAdapter {
+
+ /**
+ * 管理一个全局map,保存连接进服务端的通道数量
+ */
+ static final ConcurrentHashMap CHANNEL_MAP = new ConcurrentHashMap<>();
+
+ final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class);
+
+ /**
+ * 有客户端连接服务器会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) {
+
+ InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+
+ String clientIp = inetSocket.getAddress().getHostAddress();
+ int clientPort = inetSocket.getPort();
+
+ //获取连接通道唯一标识
+ ChannelId channelId = ctx.channel().id();
+
+ //如果map中不包含此连接,就保存连接
+ if (CHANNEL_MAP.containsKey(channelId)) {
+ log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
+ } else {
+ //保存连接
+ CHANNEL_MAP.put(channelId, ctx);
+
+ log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
+ log.info("连接通道数量: " + CHANNEL_MAP.size());
+ }
+ }
+
+ /**
+ * 有客户端终止连接服务器会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) {
+
+ InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
+
+ String clientIp = inetSocket.getAddress().getHostAddress();
+
+ ChannelId channelId = ctx.channel().id();
+
+ //包含此客户端才去删除 // TODO 定时清理假死通道
+ if (CHANNEL_MAP.containsKey(channelId)) {
+ //删除连接
+ CHANNEL_MAP.remove(channelId);
+
+ log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + inetSocket.getPort() + "]");
+ log.info("连接通道数量: " + CHANNEL_MAP.size());
+ }
+ }
+
+ /**
+ * 服务端给客户端发送消息
+ * @param msg 需要发送的消息内容
+ * @param channelId 连接通道唯一id
+ */
+ public void channelWrite(ChannelId channelId, String msg) {
+
+ ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
+
+ if (ctx == null) {
+ log.info("通道【" + channelId + "】不存在");
+ return;
+ }
+
+ if (msg == null || msg.equals("")) {
+ log.info("服务端响应空的消息");
+ return;
+ }
+ ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
+ // TODO 是否需要在应答后关闭链接
+// ctx.close();
+ }
+
+ @Override
+ public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
+
+ String socketString = ctx.channel().remoteAddress().toString();
+
+ if (evt instanceof IdleStateEvent) {
+ IdleStateEvent event = (IdleStateEvent) evt;
+ if (event.state() == IdleState.READER_IDLE) {
+ log.info("Client: " + socketString + " READER_IDLE 读超时");
+ ctx.disconnect();
+ } else if (event.state() == IdleState.WRITER_IDLE) {
+ log.info("Client: " + socketString + " WRITER_IDLE 写超时");
+ ctx.disconnect();
+ } else if (event.state() == IdleState.ALL_IDLE) {
+ log.info("Client: " + socketString + " ALL_IDLE 总超时");
+ ctx.disconnect();
+ }
+ }
+ }
+
+ /**
+ * 发生异常会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
+ ctx.close();
+ log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
+ cause.printStackTrace();
+ }
+
+ public String[] readMessage(Object msg) {
+ ByteBuf byteBuf = (ByteBuf)msg;
+ byte[] bytes = new byte[byteBuf.readableBytes()];
+ byteBuf.readBytes(bytes);
+ String message = new String(bytes);
+ log.info("客户端报文【{}】", message);
+ return message.split(",");
+ }
+
+ public abstract void putMessage2Queue(Object message);
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java
new file mode 100644
index 0000000..0b4d963
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LocateHandler.java
@@ -0,0 +1,55 @@
+package com.hbt.vehicle.management.netty.server;
+
+import com.hbt.vehicle.management.netty.convert.NettyConvert;
+import com.hbt.vehicle.management.netty.dto.LocationDto;
+import com.hbt.vehicle.management.netty.dto.ServerAck;
+import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Slf4j
+public class LocateHandler extends LocateBaseHandler {
+
+ /**
+ * 有客户端发消息会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ String[] content = (String[]) msg;
+
+ // 检查数据类型
+ if (!"V5".equals(content[2])) {
+ throw new RuntimeException("无法识别的报文!");
+ }
+
+ // 数据转换
+ LocationDto locationDto = NettyConvert.string2Position(content);
+ log.info("转换后的报文【{}】", locationDto.toString());
+
+ putMessage2Queue(locationDto);
+
+ // 构建应答消息
+ ServerAck ack = ServerAck.builder()
+ .maker(locationDto.getMaker())
+ .number(locationDto.getNumber())
+ .dataType("J5")
+ .date(new Date())
+ .build();
+
+ // TODO 等待业务处理结果
+ ack.setInfo("1:100");
+
+ // 响应客户端
+ this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
+ }
+
+ @Override
+ public void putMessage2Queue(Object message) {
+ super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOCATION_EXCHANGE, RabbitTopicConfig.VEHICLE_LOCATION_TOPIC,
+ message);
+ }
+
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java
new file mode 100644
index 0000000..d6a6162
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/LoginHandler.java
@@ -0,0 +1,53 @@
+package com.hbt.vehicle.management.netty.server;
+
+import com.hbt.vehicle.management.netty.convert.NettyConvert;
+import com.hbt.vehicle.management.netty.dto.LoginDto;
+import com.hbt.vehicle.management.netty.dto.ServerAck;
+import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
+import io.netty.channel.ChannelHandlerContext;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.Date;
+
+@Slf4j
+public class LoginHandler extends LocateBaseHandler {
+
+ /**
+ * 有客户端发消息会触发此函数
+ * @param ctx ctx
+ */
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) {
+ String[] content = readMessage(msg);
+ log.info("通道【{}】,客户端报文【{}】", ctx.channel().id(), content);
+
+ // 检查数据类型
+ if (!"V6".equals(content[2])) {
+ // 非该处理器处理类型,交给下一个处理器
+ ctx.fireChannelRead(content);
+ return;
+ }
+
+ // 数据转换
+ LoginDto loginDto = NettyConvert.string2LoginDto(content);
+ log.info("转换后的报文【{}】", loginDto.toString());
+
+ putMessage2Queue(loginDto);
+
+ // 构建应答消息
+ ServerAck ack = ServerAck.builder()
+ .maker(loginDto.getMaker())
+ .number(loginDto.getNumber())
+ .dataType("J6")
+ .date(new Date())
+ .build();
+ // 响应客户端
+ this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
+ }
+
+ @Override
+ public void putMessage2Queue(Object message) {
+ super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOGIN_EXCHANGE, RabbitTopicConfig.VEHICLE_LOGIN_TOPIC,
+ message);
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java
new file mode 100644
index 0000000..1bf7140
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServer.java
@@ -0,0 +1,45 @@
+package com.hbt.vehicle.management.netty.server;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class NettyServer{
+
+ @Value("${netty-port}")
+ private Integer port;
+
+ public void start() {
+ //配置服务端的NIO线程组
+ EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+ EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+ try {
+ ServerBootstrap bootstrap = new ServerBootstrap()
+ .group(bossGroup, workerGroup) // 绑定线程池
+ .channel(NioServerSocketChannel.class)
+// .localAddress(address)
+ .childHandler(new NettyServerChannelInitializer())
+ .option(ChannelOption.SO_BACKLOG, 128) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
+ .childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接,2小时无数据激活心跳机制
+
+ // 绑定端口,开始接收进来的连接
+ ChannelFuture future = bootstrap.bind(port).sync();
+ log.info("netty服务器开始监听端口:" + port);
+ //关闭channel和块,直到它被关闭
+ future.channel().closeFuture().sync();
+ } catch (Exception e) {
+ e.printStackTrace();
+ bossGroup.shutdownGracefully();
+ workerGroup.shutdownGracefully();
+ }
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java
new file mode 100644
index 0000000..4c8993b
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/NettyServerChannelInitializer.java
@@ -0,0 +1,20 @@
+package com.hbt.vehicle.management.netty.server;
+
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelPipeline;
+import io.netty.channel.socket.SocketChannel;
+
+public class NettyServerChannelInitializer extends ChannelInitializer {
+
+ @Override
+ protected void initChannel(SocketChannel channel) {
+ //获取管道
+ ChannelPipeline pipeline = channel.pipeline();
+ pipeline.addLast(new UnpackingDecoder()); // 解码
+
+ pipeline.addLast(new LoginHandler());
+ pipeline.addLast(new HeartbeatHandler());
+ pipeline.addLast(new LocateHandler());
+
+ }
+}
diff --git a/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java
new file mode 100644
index 0000000..8c306e4
--- /dev/null
+++ b/hbt-vehicle-management-biz/src/main/java/com/hbt/vehicle/management/netty/server/UnpackingDecoder.java
@@ -0,0 +1,53 @@
+package com.hbt.vehicle.management.netty.server;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.ByteToMessageDecoder;
+
+import java.util.List;
+
+public class UnpackingDecoder extends ByteToMessageDecoder {
+
+ private static final ByteBuf beginDelimiter = Unpooled.wrappedBuffer("*".getBytes());
+ private static final ByteBuf endDelimiter = Unpooled.wrappedBuffer("#".getBytes());
+
+ @Override
+ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List