Compare commits

...

1 Commits

Author SHA1 Message Date
yinxing 0a40b18f5c feat:
1、数据请求和处理
2023-11-22 17:59:22 +08:00
25 changed files with 1176 additions and 4 deletions

View File

@ -1,4 +1,4 @@
FROM 119.45.158.12:5000/mini-java8:latest
COPY target/hbt-template-1.0-SNAPSHOT.jar /app/app.jar
COPY target/hbt-vehicle-management-biz-1.0-SNAPSHOT.jar /app/app.jar
ENV NACOSIP=""
ENTRYPOINT ["sh","-c","java -Dnacos.ip=$NACOSIP -jar /app/app.jar"]

View File

@ -12,12 +12,22 @@
<artifactId>hbt-vehicle-management-biz</artifactId>
<dependencies>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>com.hbt.onreal</groupId>
<artifactId>hbt-onreal-common-rabbit</artifactId>
</dependency>
<!-- SpringCloud Ailibaba Nacos -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-discovery</artifactId>
</dependency>
<!-- SpringCloud Ailibaba Nacos Config -->
<!-- SpringCloud Ailibaba Nacos NettyTest -->
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-nacos-config</artifactId>

View File

@ -0,0 +1,61 @@
package com.hbt.vehicle.management.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClient {
private final String host;
private final Integer port;
private ChannelFuture future;
private EventLoopGroup group;
public NettyClient(String host, Integer port) {
this.host = host;
this.port = port;
}
public void run() throws Exception {
this.group = new NioEventLoopGroup();//I/O线程池
Bootstrap bs = new Bootstrap();//客户端辅助启动类
bs.group(group)
.channel(NioSocketChannel.class)//实例化一个Channel
.option(ChannelOption.TCP_NODELAY, true)
.handler(new NettyClientChannelInitializer());
//连接到远程节点;等待连接完成
this.future = bs.connect(host, port).sync();
//连接建立后都会自动创建一个管道pipeline这个管道也被称为责任链保证顺序执行同时又可以灵活的配置各类Handler这是一个很精妙的设计既减少了线程切换带来的资源开销、避免好多麻烦事同时性能又得到了极大增强
}
public void sendMessage(String message) {
//发送消息到服务器端编码格式是utf-8
this.future.channel().writeAndFlush(Unpooled.copiedBuffer(message, CharsetUtil.UTF_8));
}
public void closeClient() throws InterruptedException {
//阻塞操作closeFuture()开启了一个channel的监听器这期间channel在进行各项工作直到链路断开
try {
this.future.channel().closeFuture().sync();
} catch (InterruptedException e) {
log.error("close pipeline error!", e);
throw new RuntimeException(e);
} finally {
this.group.shutdownGracefully().sync();
}
}
}

View File

@ -0,0 +1,14 @@
package com.hbt.vehicle.management.netty.client;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel socketChannel) {
ChannelPipeline pipeline = socketChannel.pipeline();
pipeline.addLast(new NettyClientHandler());
}
}

View File

@ -0,0 +1,16 @@
package com.hbt.vehicle.management.netty.client;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class NettyClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf) {
log.info("client accept message:{}", byteBuf.toString(CharsetUtil.UTF_8));
}
}

View File

@ -0,0 +1,94 @@
package com.hbt.vehicle.management.netty.convert;
import com.hbt.common.core.utils.DateUtils;
import com.hbt.vehicle.management.netty.dto.*;
import java.math.BigDecimal;
import java.util.ArrayList;
public class NettyConvert {
public static LocationDto string2Position(String[] content) {
int wifiNum = Integer.parseInt(content[11]);
LocationDto position = LocationDto
.builder()
.maker(content[0])
.number(content[1])
.dataType(content[2])
.time(content[3])
.dataStatus(content[4])
.latitude(new BigDecimal(content[5]))
.lat(content[6])
.longitude(new BigDecimal(content[7]))
.lon(content[8])
.speed(new BigDecimal(content[9]))
.direction(new BigDecimal(content[10]))
.wifiNum(wifiNum)
.wifiList(new ArrayList<>())
.mcc(content[12 + 2 * wifiNum])
.mnc(content[13 + 2 * wifiNum])
.ta(Integer.parseInt(content[14 + 2 * wifiNum]))
.num(Integer.parseInt(content[15 + 2 * wifiNum]))
.stationList(new ArrayList<>())
.date(content[content.length - 3])
.vehicleStatus(content[content.length - 2])
.battery(content[content.length - 1])
.build();
for (int i = 0; i < wifiNum; i++) {
int baseIndex = 11 + 2 * i;
Wifi wifi = Wifi.builder().macAddr(content[baseIndex + 1]).rxLev(content[baseIndex + 2]).build();
position.getWifiList().add(wifi);
}
for (int i = 0; i < position.getNum(); i++) {
int baseIndex = 15 + wifiNum * 2 + 3 * i;
Station station = Station.builder()
.lac(content[baseIndex + 1])
.cid(content[baseIndex + 2])
.rxLev(content[baseIndex + 3])
.build();
position.getStationList().add(station);
}
return position;
}
public static String serverAck2String(ServerAck positionAck) {
return "*" + positionAck.getMaker() + "," +
positionAck.getNumber() + "," +
positionAck.getDataType() + "," +
DateUtils.parseDateToStr("YYYY-MM-DD HH:MM:SS", positionAck.getDate()) + "," +
(positionAck.getInfo() == null ? "" : positionAck.getInfo()) +
"#";
}
public static LoginDto string2LoginDto(String[] content) {
return LoginDto.builder()
.maker(content[0])
.number(content[1])
.dataType(content[2])
.time(content[3])
.dataStatus(content[4])
.latitude(new BigDecimal(content[5]))
.lat(content[6])
.longitude(new BigDecimal(content[7]))
.lon(content[8])
.speed(new BigDecimal(content[9]))
.direction(new BigDecimal(content[10]))
.date(content[11])
.vehicleStatus(content[12])
.mcc(content[13])
.mnc(content[14])
.lac(content[15])
.cid(content[16])
.iccid(content[17])
.build();
}
public static HeartbeatDto convert2HeartbeatDto(String[] content) {
return HeartbeatDto.builder()
.maker(content[0])
.number(content[1])
.dataType(content[2])
.time(content[3])
.build();
}
}

View File

@ -0,0 +1,32 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.*;
@Getter
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class HeartbeatDto {
/**
*
*/
private String maker;
/**
* 10
*/
private String number;
/**
*
*/
private String dataType;
/**
* // HHMMSS
*/
private String time;
}

View File

@ -0,0 +1,120 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.*;
import java.math.BigDecimal;
import java.util.List;
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class LocationDto {
/**
*
*/
private String maker;
/**
* 10
*/
private String number;
/**
* wifi
*/
private String dataType;
/**
* // HHMMSS
*/
private String time;
/**
* A/V
*/
private String dataStatus;
/**
*
*/
private BigDecimal latitude;
/**
* NS
*/
private String lat;
/**
*
*/
private BigDecimal longitude;
/**
* EW西
*/
private String lon;
/**
*
*/
private BigDecimal speed;
/**
* 01
*/
private BigDecimal direction;
/**
* WIFI16
*/
private Integer wifiNum;
/**
* wifi
*/
private List<Wifi> wifiList;
/**
*
*/
private String mcc;
/**
*
*/
private String mnc;
/**
*
*/
private Integer ta;
/**
*
*/
private Integer num;
/**
*
*/
private List<Station> stationList;
/**
* // DDMMYY
*/
private String date;
/**
*
*/
private String vehicleStatus;
/**
*
*/
private String battery;
}

View File

@ -0,0 +1,104 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.*;
import java.math.BigDecimal;
@Getter
@Setter
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class LoginDto {
/**
*
*/
private String maker;
/**
* 10
*/
private String number;
/**
*
*/
private String dataType;
/**
* // HHMMSS
*/
private String time;
/**
* A/V
*/
private String dataStatus;
/**
*
*/
private BigDecimal latitude;
/**
* NS
*/
private String lat;
/**
*
*/
private BigDecimal longitude;
/**
* EW西
*/
private String lon;
/**
*
*/
private BigDecimal speed;
/**
* 01
*/
private BigDecimal direction;
/**
* // DDMMYY
*/
private String date;
/**
*
*/
private String vehicleStatus;
/**
*
*/
private String mcc;
/**
*
*/
private String mnc;
/**
*
*/
private String lac;
/**
*
*/
private String cid;
/**
* SIMICCID
*/
private String iccid;
}

View File

@ -0,0 +1,39 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.Builder;
import lombok.Getter;
import lombok.Setter;
import java.util.Date;
@Getter
@Setter
@Builder
public class ServerAck {
/**
*
*/
private String maker;
/**
*
*/
private String number;
/**
*
*/
private String dataType;
/**
*
*/
private Date date;
/**
*
*/
private String info;
}

View File

@ -0,0 +1,35 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.*;
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Station {
/**
*
*/
private String lac;
/**
*
*/
private String cid;
/**
* ()
*/
private String rxLev;
//
// @Override
// public String toString() {
// return "Station{" +
// "lac='" + lac + '\'' +
// ", cid='" + cid + '\'' +
// ", rxLev='" + rxLev + '\'' +
// '}';
// }
}

View File

@ -0,0 +1,29 @@
package com.hbt.vehicle.management.netty.dto;
import lombok.*;
@Builder
@Getter
@Setter
@NoArgsConstructor
@AllArgsConstructor
public class Wifi {
/**
* wifimac
*/
private String macAddr;
/**
* wifi
*/
private String rxLev;
// @Override
// public String toString() {
// return "Wifi{" +
// "macAddr='" + macAddr + '\'' +
// ", rxLev='" + rxLev + '\'' +
// '}';
// }
}

View File

@ -0,0 +1,52 @@
package com.hbt.vehicle.management.netty.server;
import com.hbt.vehicle.management.netty.convert.NettyConvert;
import com.hbt.vehicle.management.netty.dto.HeartbeatDto;
import com.hbt.vehicle.management.netty.dto.ServerAck;
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
@Slf4j
public class HeartbeatHandler extends LocateBaseHandler {
/**
*
* @param ctx ctx
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String[] content = (String[]) msg;
// 检查数据类型
if (!"V7".equals(content[2])) {
// 非该处理器处理类型,交给下一个处理器
ctx.fireChannelRead(msg);
return;
}
// 数据转换
HeartbeatDto heartbeatDto = NettyConvert.convert2HeartbeatDto(content);
log.info("转换后的报文【{}】", heartbeatDto.toString());
putMessage2Queue(heartbeatDto);
// 构建应答消息
ServerAck ack = ServerAck.builder()
.maker(heartbeatDto.getMaker())
.number(heartbeatDto.getNumber())
.dataType("I7")
.date(new Date())
.build();
// 响应客户端
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
}
@Override
public void putMessage2Queue(Object message) {
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.HEARTBEAT_EXCHANGE, RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC,
message);
}
}

View File

@ -0,0 +1,145 @@
package com.hbt.vehicle.management.netty.server;
import com.hbt.common.core.utils.SpringUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelId;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.CharsetUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import java.net.InetSocketAddress;
import java.util.concurrent.ConcurrentHashMap;
/**
*
*/
@Slf4j
public abstract class LocateBaseHandler extends ChannelInboundHandlerAdapter {
/**
* map
*/
static final ConcurrentHashMap<ChannelId, ChannelHandlerContext> CHANNEL_MAP = new ConcurrentHashMap<>();
final RabbitTemplate rabbitTemplate = SpringUtils.getBean(RabbitTemplate.class);
/**
*
* @param ctx ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inetSocket.getAddress().getHostAddress();
int clientPort = inetSocket.getPort();
//获取连接通道唯一标识
ChannelId channelId = ctx.channel().id();
//如果map中不包含此连接就保存连接
if (CHANNEL_MAP.containsKey(channelId)) {
log.info("客户端【" + channelId + "】是连接状态,连接通道数量: " + CHANNEL_MAP.size());
} else {
//保存连接
CHANNEL_MAP.put(channelId, ctx);
log.info("客户端【" + channelId + "】连接netty服务器[IP:" + clientIp + "--->PORT:" + clientPort + "]");
log.info("连接通道数量: " + CHANNEL_MAP.size());
}
}
/**
*
* @param ctx ctx
*/
@Override
public void channelInactive(ChannelHandlerContext ctx) {
InetSocketAddress inetSocket = (InetSocketAddress) ctx.channel().remoteAddress();
String clientIp = inetSocket.getAddress().getHostAddress();
ChannelId channelId = ctx.channel().id();
//包含此客户端才去删除 // TODO 定时清理假死通道
if (CHANNEL_MAP.containsKey(channelId)) {
//删除连接
CHANNEL_MAP.remove(channelId);
log.info("客户端【" + channelId + "】退出netty服务器[IP:" + clientIp + "--->PORT:" + inetSocket.getPort() + "]");
log.info("连接通道数量: " + CHANNEL_MAP.size());
}
}
/**
*
* @param msg
* @param channelId id
*/
public void channelWrite(ChannelId channelId, String msg) {
ChannelHandlerContext ctx = CHANNEL_MAP.get(channelId);
if (ctx == null) {
log.info("通道【" + channelId + "】不存在");
return;
}
if (msg == null || msg.equals("")) {
log.info("服务端响应空的消息");
return;
}
ctx.writeAndFlush(Unpooled.copiedBuffer(msg, CharsetUtil.UTF_8));
// TODO 是否需要在应答后关闭链接
// ctx.close();
}
@Override
public void userEventTriggered(ChannelHandlerContext ctx, Object evt) {
String socketString = ctx.channel().remoteAddress().toString();
if (evt instanceof IdleStateEvent) {
IdleStateEvent event = (IdleStateEvent) evt;
if (event.state() == IdleState.READER_IDLE) {
log.info("Client: " + socketString + " READER_IDLE 读超时");
ctx.disconnect();
} else if (event.state() == IdleState.WRITER_IDLE) {
log.info("Client: " + socketString + " WRITER_IDLE 写超时");
ctx.disconnect();
} else if (event.state() == IdleState.ALL_IDLE) {
log.info("Client: " + socketString + " ALL_IDLE 总超时");
ctx.disconnect();
}
}
}
/**
*
* @param ctx ctx
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
ctx.close();
log.info(ctx.channel().id() + " 发生了错误,此连接被关闭" + "此时连通数量: " + CHANNEL_MAP.size());
cause.printStackTrace();
}
public String[] readMessage(Object msg) {
ByteBuf byteBuf = (ByteBuf)msg;
byte[] bytes = new byte[byteBuf.readableBytes()];
byteBuf.readBytes(bytes);
String message = new String(bytes);
log.info("客户端报文【{}】", message);
return message.split(",");
}
public abstract void putMessage2Queue(Object message);
}

View File

@ -0,0 +1,55 @@
package com.hbt.vehicle.management.netty.server;
import com.hbt.vehicle.management.netty.convert.NettyConvert;
import com.hbt.vehicle.management.netty.dto.LocationDto;
import com.hbt.vehicle.management.netty.dto.ServerAck;
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
@Slf4j
public class LocateHandler extends LocateBaseHandler {
/**
*
* @param ctx ctx
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String[] content = (String[]) msg;
// 检查数据类型
if (!"V5".equals(content[2])) {
throw new RuntimeException("无法识别的报文!");
}
// 数据转换
LocationDto locationDto = NettyConvert.string2Position(content);
log.info("转换后的报文【{}】", locationDto.toString());
putMessage2Queue(locationDto);
// 构建应答消息
ServerAck ack = ServerAck.builder()
.maker(locationDto.getMaker())
.number(locationDto.getNumber())
.dataType("J5")
.date(new Date())
.build();
// TODO 等待业务处理结果
ack.setInfo("1:100");
// 响应客户端
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
}
@Override
public void putMessage2Queue(Object message) {
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOCATION_EXCHANGE, RabbitTopicConfig.VEHICLE_LOCATION_TOPIC,
message);
}
}

View File

@ -0,0 +1,53 @@
package com.hbt.vehicle.management.netty.server;
import com.hbt.vehicle.management.netty.convert.NettyConvert;
import com.hbt.vehicle.management.netty.dto.LoginDto;
import com.hbt.vehicle.management.netty.dto.ServerAck;
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
import io.netty.channel.ChannelHandlerContext;
import lombok.extern.slf4j.Slf4j;
import java.util.Date;
@Slf4j
public class LoginHandler extends LocateBaseHandler {
/**
*
* @param ctx ctx
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
String[] content = readMessage(msg);
log.info("通道【{}】,客户端报文【{}】", ctx.channel().id(), content);
// 检查数据类型
if (!"V6".equals(content[2])) {
// 非该处理器处理类型,交给下一个处理器
ctx.fireChannelRead(content);
return;
}
// 数据转换
LoginDto loginDto = NettyConvert.string2LoginDto(content);
log.info("转换后的报文【{}】", loginDto.toString());
putMessage2Queue(loginDto);
// 构建应答消息
ServerAck ack = ServerAck.builder()
.maker(loginDto.getMaker())
.number(loginDto.getNumber())
.dataType("J6")
.date(new Date())
.build();
// 响应客户端
this.channelWrite(ctx.channel().id(), NettyConvert.serverAck2String(ack));
}
@Override
public void putMessage2Queue(Object message) {
super.rabbitTemplate.convertAndSend(RabbitTopicConfig.LOGIN_EXCHANGE, RabbitTopicConfig.VEHICLE_LOGIN_TOPIC,
message);
}
}

View File

@ -0,0 +1,45 @@
package com.hbt.vehicle.management.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class NettyServer{
@Value("${netty-port}")
private Integer port;
public void start() {
//配置服务端的NIO线程组
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup) // 绑定线程池
.channel(NioServerSocketChannel.class)
// .localAddress(address)
.childHandler(new NettyServerChannelInitializer())
.option(ChannelOption.SO_BACKLOG, 128) //服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝
.childOption(ChannelOption.SO_KEEPALIVE, true); //保持长连接2小时无数据激活心跳机制
// 绑定端口,开始接收进来的连接
ChannelFuture future = bootstrap.bind(port).sync();
log.info("netty服务器开始监听端口" + port);
//关闭channel和块直到它被关闭
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}

View File

@ -0,0 +1,20 @@
package com.hbt.vehicle.management.netty.server;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel channel) {
//获取管道
ChannelPipeline pipeline = channel.pipeline();
pipeline.addLast(new UnpackingDecoder()); // 解码
pipeline.addLast(new LoginHandler());
pipeline.addLast(new HeartbeatHandler());
pipeline.addLast(new LocateHandler());
}
}

View File

@ -0,0 +1,53 @@
package com.hbt.vehicle.management.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import java.util.List;
public class UnpackingDecoder extends ByteToMessageDecoder {
private static final ByteBuf beginDelimiter = Unpooled.wrappedBuffer("*".getBytes());
private static final ByteBuf endDelimiter = Unpooled.wrappedBuffer("#".getBytes());
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) {
int beginDelimiterLength = beginDelimiter.readableBytes(); // 开始标识字节长度
int endDelimiterLength = endDelimiter.readableBytes(); // 结束标识字节长度
int beginIdx = indexOf(byteBuf, beginDelimiter); // 开始标识下标
int endIdx = indexOf(byteBuf, endDelimiter); // 结束标识下标
// 如果缓存区找不到开始标识,跳过该段字节
if (beginIdx == -1) {
byteBuf.readerIndex(byteBuf.writerIndex());
return;
}
// 如果缓冲区找不到结束下标,或者下标小于开始标识,则从开始标识开始读取
if (endIdx == -1 || endIdx < beginIdx) {
byteBuf.readerIndex(beginIdx);
return;
}
ByteBuf frame = byteBuf.retainedSlice(beginIdx + beginDelimiterLength, endIdx - beginIdx - beginDelimiterLength); // 将标识之间的数据提取出来,给后面的解码器使用
list.add(frame);
byteBuf.readerIndex(endIdx + endDelimiterLength); // 更新读取下标
}
private static int indexOf(ByteBuf haystack, ByteBuf needle) {
for (int i = haystack.readerIndex(); i < haystack.writerIndex(); i++) {
int haystackIndex = i;
int needleIndex;
for (needleIndex = 0; needleIndex < needle.capacity(); needleIndex++) {
if (haystack.getByte(haystackIndex) != needle.getByte(needleIndex)) break;
haystackIndex++;
if (haystackIndex == haystack.writerIndex() && needleIndex != needle.capacity() - 1) return -1;
}
if (needleIndex == needle.capacity()) return i - haystack.readerIndex();
}
return -1;
}
}

View File

@ -0,0 +1,96 @@
package com.hbt.vehicle.management.rabbit.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.boot.autoconfigure.amqp.SimpleRabbitListenerContainerFactoryConfigurer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* MQ
*
* @author yx
* @date 2023/11/21 17:54
*/
@Configuration
public class RabbitTopicConfig {
// 交换机
public static final String LOGIN_EXCHANGE = "vehicleLoginTopicExchange";
public static final String HEARTBEAT_EXCHANGE = "vehicleHeartbeatTopicExchange";
public static final String LOCATION_EXCHANGE = "vehicleLocationTopicExchange";
// 队列
public static final String SERVICE_ID = "hbt.vehicle";
//并发数量
public static final int DEFAULT_CONCURRENT = 10;
// 定位信息topic
public static final String VEHICLE_LOCATION_TOPIC = "vehicle.location";
public static final String VEHICLE_LOGIN_TOPIC = "vehicle.login";
public static final String VEHICLE_HEARTBEAT_TOPIC = "vehicle.heartbeat";
@Bean
public TopicExchange loginExchange() {
return new TopicExchange(LOGIN_EXCHANGE);
}
@Bean
public TopicExchange heartbeatExchange() {
return new TopicExchange(HEARTBEAT_EXCHANGE);
}
@Bean
public TopicExchange locationExchange() {
return new TopicExchange(LOCATION_EXCHANGE);
}
@Bean
public Queue loginMessage() {
return new Queue(SERVICE_ID + "." + VEHICLE_LOGIN_TOPIC);
}
@Bean
public Queue heartbeatMessage() {
return new Queue(SERVICE_ID + "." + VEHICLE_HEARTBEAT_TOPIC);
}
@Bean
public Queue locationMessage() {
return new Queue(SERVICE_ID + "." + VEHICLE_LOCATION_TOPIC);
}
@Bean
public Binding bindingExchangeLoginMessage(Queue loginMessage, TopicExchange loginExchange) {
return BindingBuilder.bind(loginMessage).to(loginExchange).with(RabbitTopicConfig.VEHICLE_LOGIN_TOPIC);
}
@Bean
public Binding bindingExchangeHeartbeatMessage(Queue heartbeatMessage, TopicExchange heartbeatExchange) {
return BindingBuilder.bind(heartbeatMessage).to(heartbeatExchange).with(RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC);
}
@Bean
public Binding bindingExchangeLocationMessage(Queue locationMessage, TopicExchange locationExchange) {
return BindingBuilder.bind(locationMessage).to(locationExchange).with(RabbitTopicConfig.VEHICLE_LOCATION_TOPIC);
}
@Bean("customContainerFactory")
public SimpleRabbitListenerContainerFactory containerFactory(SimpleRabbitListenerContainerFactoryConfigurer configurer,
ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConcurrentConsumers(DEFAULT_CONCURRENT);
factory.setMaxConcurrentConsumers(DEFAULT_CONCURRENT);
configurer.configure(factory, connectionFactory);
return factory;
}
}

View File

@ -0,0 +1,46 @@
package com.hbt.vehicle.management.rabbit.consumer;
import com.alibaba.fastjson.JSON;
import com.hbt.vehicle.management.netty.dto.HeartbeatDto;
import com.hbt.vehicle.management.netty.dto.LocationDto;
import com.hbt.vehicle.management.netty.dto.LoginDto;
import com.hbt.vehicle.management.rabbit.config.RabbitTopicConfig;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Slf4j
@Component
public class LocationConsumer {
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOGIN_TOPIC,
containerFactory = "customContainerFactory")
public void locationMessage(LoginDto message) {
try {
log.info("收到登录MQ数据{}", JSON.toJSONString(message));
} catch (Exception e) {
log.error("登录数据处理失败:{}", JSON.toJSONString(message), e);
}
}
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_HEARTBEAT_TOPIC,
containerFactory = "customContainerFactory")
public void heartbeatMessage(HeartbeatDto message) {
try {
log.info("收到心跳MQ数据{}", JSON.toJSONString(message));
} catch (Exception e) {
log.error("心跳数据处理失败:{}", JSON.toJSONString(message), e);
}
}
@RabbitListener(queues = RabbitTopicConfig.SERVICE_ID + "." + RabbitTopicConfig.VEHICLE_LOCATION_TOPIC,
containerFactory = "customContainerFactory")
public void positionMessage(LocationDto message) {
try {
log.info("收到车辆定位MQ数据{}", JSON.toJSONString(message));
} catch (Exception e) {
log.error("车辆定位数据处理失败:{}", JSON.toJSONString(message), e);
}
}
}

View File

@ -0,0 +1,22 @@
package com.hbt.vehicle.management.rabbit.task;
import org.springframework.stereotype.Component;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@Component
public class LocationThreadPool {
private final ThreadPoolExecutor executor = new ThreadPoolExecutor(
10,
10,
10,
TimeUnit.HOURS,
new LinkedBlockingQueue<>(),
new ThreadPoolExecutor.CallerRunsPolicy());
public ThreadPoolExecutor getExecutor = this.executor;
}

View File

@ -3,10 +3,14 @@ package com.hbt.vehicle.management;
import com.hbt.common.security.annotation.EnableCustomConfig;
import com.hbt.common.security.annotation.EnableRyFeignClients;
import com.hbt.common.swagger.annotation.EnableCustomSwagger2;
import com.hbt.vehicle.management.netty.server.NettyServer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.web.servlet.ServletComponentScan;
import org.springframework.cloud.client.SpringCloudApplication;
import javax.annotation.Resource;
/**
* ,
*/
@ -15,11 +19,19 @@ import org.springframework.cloud.client.SpringCloudApplication;
@EnableCustomSwagger2
@EnableRyFeignClients
@ServletComponentScan(basePackages = "com.hbt.vehicle.management")
public class vehicleManagementApplication {
public class vehicleManagementApplication implements CommandLineRunner{
@Resource
private NettyServer nettyServer;
public static void main(String[] args)
{
SpringApplication.run(vehicleManagementApplication.class, args);
System.out.println("(♥◠‿◠)ノ゙ 车辆管理系统启动成功 ლ(´ڡ`ლ)゙ ");
}
@Override
public void run(String... args) {
nettyServer.start();
}
}

View File

@ -11,4 +11,4 @@ spring:
active: dev
nacos:
ip: 119.45.158.12
ip: 192.168.1.211

View File

@ -0,0 +1,19 @@
import com.hbt.vehicle.management.netty.client.NettyClient;
import java.util.Scanner;
public class NettyTest {
public static void main(String[] args) throws Exception {
NettyClient nettyClient = new NettyClient("127.0.0.1", 8886);
nettyClient.run();
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String s = scanner.next();
if (s.equals("exit")) {
break;
}
nettyClient.sendMessage(s);
}
}
}