/** * Echoes back any received data from a client. */ publicfinalclassEchoServer{
publicstaticvoidmain(String[] args)throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); final EchoServerHandler serverHandler = new EchoServerHandler(); try { ServerBootstrap b = new ServerBootstrap() // 主从 Reactor 模式 .group(bossGroup, workerGroup) // ServerSocketChannel IO 模式 .channel(NioServerSocketChannel.class) // 最大等待数量:当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度 .option(ChannelOption.SO_BACKLOG, 100) // ServerSocketChannelHandler .handler(newLoggingHandler(LogLevel.INFO)) // SocketChannelHandler .childHandler(newChannelInitializer<SocketChannel>() { @Override publicvoidinitChannel(SocketChannel ch){ ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(serverHandler); } });
// Start the server. ChannelFuture f = b.bind(8007).sync();
// Wait until the server socket is closed. f.channel().closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
/** * Handler implementation for the echo server. */ @Sharable publicclassEchoServerHandlerextendsChannelInboundHandlerAdapter{
/** * 发生异常时触发 */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause){ // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
/** * Sends one message when a connection is open and echoes back any received * data to the server. Simply put, the echo client initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ publicfinalclassEchoClient{
publicstaticvoidmain(String[] args)throws Exception { // Configure the client. EventLoopGroup group = new NioEventLoopGroup(); try { Bootstrap b = new Bootstrap() .group(group) // SocketChannel IO 模式 .channel(NioSocketChannel.class) // 是否启用 Nagle 算法(通过将小的碎片数据连接成更大的报文来提高发送效率) // 如果需要发送一些较小的报文,则需要禁用该算法 .option(ChannelOption.TCP_NODELAY, true) // SocketChannelHandler .handler(newChannelInitializer<SocketChannel>() { @Override publicvoidinitChannel(SocketChannel ch){ ChannelPipeline p = ch.pipeline(); p.addLast(new LoggingHandler(LogLevel.INFO)); p.addLast(new EchoClientHandler()); } });
// Start the client. ChannelFuture f = b.connect("127.0.0.1", 8007).sync();
// Wait until the connection is closed. f.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }
/** * Handler implementation for the echo client. It initiates the ping-pong * traffic between the echo client and server by sending the first message to * the server. */ publicclassEchoClientHandlerextendsChannelInboundHandlerAdapter{
privatefinal ByteBuf firstMessage;
/** * Creates a client-side handler. */ publicEchoClientHandler(){ firstMessage = Unpooled.wrappedBuffer("I am echo message".getBytes()); }
/** * 发生异常时触发 */ @Override publicvoidexceptionCaught(ChannelHandlerContext ctx, Throwable cause){ // Close the connection when an exception is raised. cause.printStackTrace(); ctx.close(); } }
/** * An HTTP server that sends back the content of the received HTTP request * in a pretty plaintext form. */ publicfinalclassHttpHelloWorldServer{
publicstaticvoidmain(String[] args)throws Exception { // Configure the server. EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap() // 主从 Reactor 模式 .group(bossGroup, workerGroup) // ServerSocketChannel IO 模式 .channel(NioServerSocketChannel.class) // 最大等待数量:当服务器请求处理线程全满时,用于临时存放已完成三次握手的请求的队列的最大长度 .option(ChannelOption.SO_BACKLOG, 1024) // ServerSocketChannelHandler .handler(newLoggingHandler(LogLevel.INFO)) // SocketChannelHandler .childHandler(newHttpHelloWorldServerInitializer());
// Start the server. Channel ch = b.bind(8080).sync().channel();
System.err.println("Open your web browser and navigate to http://127.0.0.1:8080/");
// Wait until the server socket is closed. ch.closeFuture().sync(); } finally { // Shut down all event loops to terminate all threads. bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }
if (keepAlive) { if (!req.protocolVersion().isKeepAliveDefault()) { response.headers().set(CONNECTION, KEEP_ALIVE); } } else { // Tell the client we're going to close the connection. response.headers().set(CONNECTION, CLOSE); }
ChannelFuture f = ctx.write(response);
if (!keepAlive) { f.addListener(ChannelFutureListener.CLOSE); } } }
// 发送消息 long streamId = IdUtil.nextId(); RequestMessage requestMessage = new RequestMessage(streamId, new OrderOperation(1001, "土豆")); OperationResultFuture future = new OperationResultFuture(); // 记录 streamId 与 future 的对应关系 requestPendingCenter.add(streamId, future);
// 做了认证处理,要求第一个操作是认证操作 channelFuture.channel().writeAndFlush(new RequestMessage(IdUtil.nextId(), new AuthOperation("admin", "password"))); channelFuture.channel().writeAndFlush(requestMessage);
// 阻塞,等待获取结果 BaseOperationResult result = future.get(); log.info("Result: {}", result);
// 等待直到 Server Socket 关闭 channelFuture.channel().closeFuture().sync(); } finally { // Shut down the event loop to terminate all threads. group.shutdownGracefully(); } } }