First of all, let's make a stub of a server:
public class CLIServer
{
private ChannelFuture future;
private NioEventLoopGroup masterGroup;
private NioEventLoopGroup workerGroup;
// the dynamic (private) port range runs from 49152 to 65535; a port from it will not clash with registered services
public CLIServer(int networkPort)
{
// these groups handle all I/O and events
masterGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try
{
// the bootstrap is used to simplify server setup
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(masterGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
// The "backlog" argument is a maximum requested number of pending connections on the socket.
// Its exact effects are implementation specific.
// In particular, an implementation may impose a maximum length or may choose to ignore the parameter altogether.
// The value should be greater than 0, otherwise an implementation specific default will be used.
serverBootstrap.option(ChannelOption.SO_BACKLOG,128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE,true);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch)
{
//TODO
}
}).validate();
// bind the server socket and block until the bind operation completes (sync)
future = serverBootstrap.bind(networkPort).sync();
}
catch (Exception e)
{
e.printStackTrace();
}
}
}
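The port comment in the stub can be turned into an explicit check. This is a minimal sketch, assuming a hypothetical helper class `PortRange` (it is not part of Netty or of the server code above) that tests whether a port lies in the dynamic range 49152 to 65535:

```java
// Hypothetical helper, not part of Netty or the original server code.
final class PortRange
{
    // Bounds of the dynamic (private) port range, inclusive.
    static final int DYNAMIC_MIN = 49152;
    static final int DYNAMIC_MAX = 65535;

    private PortRange() {}

    // Returns true when the port lies inside the dynamic range.
    static boolean isDynamic(int port)
    {
        return port >= DYNAMIC_MIN && port <= DYNAMIC_MAX;
    }

    // Returns the port unchanged, or throws if it is outside the dynamic range.
    static int requireDynamic(int port)
    {
        if (!isDynamic(port))
        {
            throw new IllegalArgumentException(
                "Port " + port + " is outside " + DYNAMIC_MIN + "-" + DYNAMIC_MAX);
        }
        return port;
    }

    public static void main(String[] args)
    {
        System.out.println(isDynamic(59433)); // prints true
        System.out.println(isDynamic(8080));  // prints false
    }
}
```

One possible use is to call `PortRange.requireDynamic(networkPort)` at the top of the CLIServer constructor, before any Netty objects are created.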
Next, create a main class that starts the server. Logging is done with Apache Log4j 2 (group: 'org.apache.logging.log4j', name: 'log4j-core', version: '2.12.1').
public class Starter
{
private static CLIServer SERVER_INSTANCE;
private static Logger logger;
public static void main(String[] strings)
{
logger = LogManager.getLogger(CLIServer.class.getSimpleName(), new ParameterizedMessageFactory());
logger.info(logger.getName() + " initialized");
logger.info("Type \"exit\" to stop the program");
SERVER_INSTANCE = new CLIServer(59433);
// read input from the console and stop the program when "exit" is typed
BufferedReader reader = new BufferedReader(new InputStreamReader(System.in));
String s;
try
{
while ((s = reader.readLine()) != null)
{
if (s.equals("exit"))
{
//TODO
break;
}
}
reader.close();
}
catch (IOException e)
{
e.printStackTrace();
}
}
static CLIServer getServerInstance()
{
return SERVER_INSTANCE;
}
public static Logger getLogger()
{
return logger;
}
}
Here is a simple Log4j 2 configuration for the logger; it writes to both the console and a file:
<Configuration status="INFO">
<Properties>
<Property name="messagePart">%msg</Property>
<Property name="recordPart">%d{MM/dd HH:mm:ss} %logger{36} -> %-5level:</Property>
</Properties>
<Appenders>
<Console name="Console" target="SYSTEM_OUT">
<PatternLayout>
<Pattern>${recordPart} %highlight{${messagePart}}{INFO=cyan,WARN=yellow,ERROR=red,FATAL=red}%n</Pattern>
</PatternLayout>
</Console>
<File name="FileAppend" fileName="server.log" append="false">
<PatternLayout>
<Pattern>${recordPart} ${messagePart}%n</Pattern>
</PatternLayout>
</File>
</Appenders>
<Loggers>
<Root level="info">
<AppenderRef ref="Console"/>
<AppenderRef ref="FileAppend"/>
</Root>
</Loggers>
</Configuration>
Place it into the resources directory and name it "log4j2.xml".
Then we add a shutdown method to the server:
void shutdown()
{
Starter.getLogger().info("Stopping server");
// run the shutdown on a separate thread so it can also be triggered from a handler: an event loop thread must not block on sync()
Thread thread=new Thread(()->{
try
{
Starter.getLogger().debug(3);
// shutdown order matters: stop the master (accepting) group first, then the worker group
masterGroup.shutdownGracefully().sync();
Starter.getLogger().debug(2);
workerGroup.shutdownGracefully().sync();
Starter.getLogger().debug(1);
future.channel().closeFuture().sync();
Starter.getLogger().info("Server stopped");
System.exit(0);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
},"Server down-shutter");
thread.start();
}
Call this method in the Starter's loop:
while ((s = reader.readLine()) != null)
{
if (s.equals("exit"))
{
SERVER_INSTANCE.shutdown();
break;
}
}
Now we should be able to start the server via Starter, and stop it by typing "exit" in the console.
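The console loop is easier to try out when it is separated from System.in and from the server. This is a minimal sketch, assuming a hypothetical helper class `ConsoleLoop` (not part of the tutorial code) that takes any BufferedReader and a callback standing in for `SERVER_INSTANCE.shutdown()`:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;

// Hypothetical helper that isolates the console loop used in Starter.
final class ConsoleLoop
{
    private ConsoleLoop() {}

    // Reads lines until "exit" is seen or the input ends.
    // Runs onExit once and returns true when "exit" is read;
    // returns false if the input ends without it.
    static boolean run(BufferedReader reader, Runnable onExit)
    {
        try
        {
            String line;
            while ((line = reader.readLine()) != null)
            {
                if (line.equals("exit"))
                {
                    onExit.run();
                    return true;
                }
            }
            return false;
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args)
    {
        // Simulated console input; the real Starter wraps System.in instead.
        BufferedReader fake = new BufferedReader(new StringReader("status\nexit\nignored\n"));
        boolean stopped = run(fake, () -> System.out.println("shutdown requested"));
        System.out.println(stopped); // prints true
    }
}
```

In Starter the call would look like `ConsoleLoop.run(reader, SERVER_INSTANCE::shutdown)`, under the assumption that this helper is added to the project.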
Next, we can start adding inbound channel handlers to our server. The first handler will be called "CommandHandler" and will simulate predefined command execution:
public class CommandHandler extends ChannelInboundHandlerAdapter
{
// The "msg" parameter is usually an instance of a ByteBuf subclass.
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf) msg;
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
cause.printStackTrace();
}
}
ByteBuf has many read* methods. Covering them all is beyond the scope of this tutorial, so we will use only readCharSequence. We'll create two commands, "quit" and "stop"; any other input is sent back to the client:
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
if(msg instanceof ByteBuf)
{
ByteBuf byteBuf= (ByteBuf) msg;
// read UTF-8 char-sequence
final CharSequence charSequence = byteBuf.readCharSequence(byteBuf.readableBytes(), StandardCharsets.UTF_8);
// It is more convenient to work with strings than char-sequences
final String string=charSequence.toString();
//get rid of whitespace, including line breaks
String trimmed=string.trim();
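if(trimmed.equals("quit"))
{
// this command just terminates the session between server and client
byteBuf.release(); // the message is consumed here, so release the buffer to avoid a leak
ctx.close();
}
else if(trimmed.equals("stop"))
{
// this command stops the server (and with it the whole program)
byteBuf.release(); // the message is consumed here, so release the buffer to avoid a leak
Starter.getServerInstance().shutdown();
ctx.close();
}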
else
{
// everything else is written into the buffer...
byteBuf.writeCharSequence("Unknown command: "+charSequence,StandardCharsets.UTF_8);
//...and sent back to the client
ctx.writeAndFlush(byteBuf);
}
}
}
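The branching in channelRead can be checked without a running server by pulling it into a pure function. This is a minimal sketch, assuming a hypothetical helper class `Commands` with an enum `Kind`; neither name appears in the tutorial code:

```java
// Hypothetical helper mirroring the branching in CommandHandler.channelRead.
final class Commands
{
    enum Kind { QUIT, STOP, UNKNOWN }

    private Commands() {}

    // Trims the raw input (clients such as nc and telnet append a line break)
    // and maps it to one of the known commands.
    static Kind classify(String raw)
    {
        switch (raw.trim())
        {
            case "quit":
                return Kind.QUIT;
            case "stop":
                return Kind.STOP;
            default:
                return Kind.UNKNOWN;
        }
    }

    // Builds the reply written back for unrecognized input.
    // Like the handler, it keeps the raw text, including its line break.
    static String unknownReply(String raw)
    {
        return "Unknown command: " + raw;
    }

    public static void main(String[] args)
    {
        System.out.println(classify("quit\r\n")); // prints QUIT
        System.out.println(classify("hello\n"));  // prints UNKNOWN
    }
}
```

Keeping the decision separate from the I/O means the handler only has to act on the returned `Kind`.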
Next, add this handler to the server channel:
...
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch)
{
ch.pipeline().addLast(new CommandHandler());
}
}).validate();
...
Now start the program and connect to the server with a plain-text TCP client, for example "nc" on Unix or "telnet" on Windows:
nc localhost 59433
telnet localhost 59433
Test the commands. If everything was done correctly, "quit" closes the connection, "stop" shuts the server down, and anything else is sent back prefixed with "Unknown command: ".
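The same check can be scripted instead of typed into nc or telnet. This is a minimal sketch using only java.net.Socket, assuming a hypothetical class `CliClient` (not part of the tutorial code) that sends one line and returns the first line of the reply:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Hypothetical test client; the class and method names are illustrative.
final class CliClient
{
    private CliClient() {}

    // Sends one line to the server and returns the first line of the reply.
    // Returns null if the server closes the connection without replying.
    static String sendLine(String host, int port, String line)
    {
        try (Socket socket = new Socket(host, port))
        {
            // Give up if no reply arrives within 5 seconds.
            socket.setSoTimeout(5000);
            OutputStream out = socket.getOutputStream();
            out.write((line + "\n").getBytes(StandardCharsets.UTF_8));
            out.flush();
            BufferedReader in = new BufferedReader(
                new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8));
            return in.readLine();
        }
        catch (IOException e)
        {
            throw new UncheckedIOException(e);
        }
    }
}
```

With the server from this tutorial running, `CliClient.sendLine("localhost", 59433, "hello")` should return the "Unknown command" reply, and sending "quit" should return null because the server closes the connection.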
Now let's add a second channel handler that generates a random integer or a random float depending on the input:
public class RandomNumbers extends ChannelInboundHandlerAdapter
{
private Random random = new Random();
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg)
{
if (msg instanceof ByteBuf)
{
ByteBuf byteBuf = (ByteBuf) msg;
final String string = byteBuf.readCharSequence(byteBuf.readableBytes(), StandardCharsets.UTF_8).toString();
String trimmed = string.trim();
// generate random integer
if (trimmed.equals("random i"))
{
int ri = random.nextInt();
byteBuf.writeCharSequence(String.valueOf(ri) + '\n', StandardCharsets.UTF_8);
ctx.writeAndFlush(byteBuf);
}
// generate random float
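else if (trimmed.equals("random f"))
{
float rf = random.nextFloat();
byteBuf.writeCharSequence(String.valueOf(rf) + '\n', StandardCharsets.UTF_8);
ctx.writeAndFlush(byteBuf);
}
else
{
// unrecognized input: this is the last handler, so release the buffer here to avoid a leak
byteBuf.release();
}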
}
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
{
cause.printStackTrace();
}
}
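The reply logic of this handler can also be exercised on its own. This is a minimal sketch, assuming a hypothetical class `RandomReplies` (not part of the tutorial code) that takes its Random from outside so that a fixed seed gives repeatable values:

```java
import java.util.Optional;
import java.util.Random;

// Hypothetical helper mirroring the branching in RandomNumbers.channelRead.
final class RandomReplies
{
    private final Random random;

    // The Random is injected so that callers can pass a seeded instance.
    RandomReplies(Random random)
    {
        this.random = random;
    }

    // Returns the reply line for a recognized command,
    // or an empty Optional for any other input.
    Optional<String> replyFor(String raw)
    {
        String trimmed = raw.trim();
        if (trimmed.equals("random i"))
        {
            return Optional.of(random.nextInt() + "\n");
        }
        if (trimmed.equals("random f"))
        {
            return Optional.of(random.nextFloat() + "\n");
        }
        return Optional.empty();
    }

    public static void main(String[] args)
    {
        RandomReplies replies = new RandomReplies(new Random());
        System.out.print(replies.replyFor("random i\n").orElse("no reply\n"));
        System.out.print(replies.replyFor("random f\n").orElse("no reply\n"));
        System.out.print(replies.replyFor("hello\n").orElse("no reply\n"));
    }
}
```

An empty result corresponds to input that the handler does not recognize.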
Add RandomNumbers to the pipeline after the CommandHandler:
@Override
protected void initChannel(SocketChannel ch)
{
ch.pipeline().addLast(new CommandHandler(), new RandomNumbers());
}
Although the handler was added, it won't work because CommandHandler doesn't pass the message down the pipeline. To fix it, replace the last else block (where it prints "unknown command") with this block:
else
{
try
{
// reset the reader index (to the marked index, 0 by default) so the message can be read again
byteBuf.resetReaderIndex();
// super method passes the message to the next handler
super.channelRead(ctx, msg);
}
catch (Exception e)
{
e.printStackTrace();
}
}
That will make the second handler receive the same ByteBuf with the message available again.
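The effect of passing a message on can be modelled without Netty. The sketch below is a simplified stand-in, not Netty's API: the class `MiniPipeline` and its `Handler` interface are hypothetical, and returning false from a handler plays the role that calling `super.channelRead(ctx, msg)` plays in the real handler.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Simplified model of a handler chain; this is not Netty's API.
final class MiniPipeline
{
    // A handler returns true when it consumed the message
    // and false to let the next handler see it.
    interface Handler
    {
        boolean handle(String message, List<String> output);
    }

    private final List<Handler> handlers;

    MiniPipeline(Handler... handlers)
    {
        this.handlers = Arrays.asList(handlers);
    }

    // Offers the message to each handler in order until one consumes it
    // and returns the replies collected along the way.
    List<String> fire(String message)
    {
        List<String> output = new ArrayList<>();
        for (Handler handler : handlers)
        {
            if (handler.handle(message, output))
            {
                break;
            }
        }
        return output;
    }

    public static void main(String[] args)
    {
        Handler commands = (msg, out) -> {
            if (msg.trim().equals("quit"))
            {
                out.add("closing");
                return true;
            }
            return false; // not a known command: pass it on
        };
        Handler numbers = (msg, out) -> {
            if (msg.trim().equals("random i"))
            {
                out.add("42"); // fixed value keeps the demo deterministic
                return true;
            }
            return false;
        };
        MiniPipeline pipeline = new MiniPipeline(commands, numbers);
        System.out.println(pipeline.fire("quit"));     // prints [closing]
        System.out.println(pipeline.fire("random i")); // prints [42]
    }
}
```

If the first handler returned true for every message, the second one would never run, which is the situation the original CommandHandler was in.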
Hopefully, this article helps you get started with Netty and master its basics. Just in case, here is the final code of the server:
class CLIServer
{
private ChannelFuture future;
private NioEventLoopGroup masterGroup;
private NioEventLoopGroup workerGroup;
CLIServer(int networkPort)
{
masterGroup = new NioEventLoopGroup();
workerGroup = new NioEventLoopGroup();
try
{
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(masterGroup, workerGroup);
serverBootstrap.channel(NioServerSocketChannel.class);
//The value provided should be greater than 0, otherwise an implementation specific default will be used.
serverBootstrap.option(ChannelOption.SO_BACKLOG, 128);
serverBootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>()
{
@Override
protected void initChannel(SocketChannel ch)
{
ch.pipeline().addLast(new CommandHandler(), new RandomNumbers());
}
}).validate();
future = serverBootstrap.bind(networkPort).sync();
Starter.getLogger().info("Started server on port {}", networkPort);
}
catch (Exception e)
{
e.printStackTrace();
shutdown();
}
}
void shutdown()
{
Starter.getLogger().info("Stopping server");
// run the shutdown on a separate thread so it can also be triggered from a handler: an event loop thread must not block on sync()
Thread thread = new Thread(() -> {
try
{
Starter.getLogger().debug(3);
// shutdown order matters: stop the master (accepting) group first, then the worker group
masterGroup.shutdownGracefully().sync();
Starter.getLogger().debug(2);
workerGroup.shutdownGracefully().sync();
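Starter.getLogger().debug(1);
// future is still null when bind() failed in the constructor, so guard against it
if (future != null)
{
future.channel().closeFuture().sync();
}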
Starter.getLogger().info("Server stopped");
System.exit(0);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}, "Server down-shutter");
thread.start();
}
}