Envío de archivos del servidor al cliente mediante Netty.io

Estoy tratando de enviar un archivo, un cliente solicitado del servidor, del servidor al cliente. El Cliente especifica el archivo en elFileRequestProtocol, lo envía al servidor, el servidor agrega el tamaño del archivo al archivoFileRequestProtocol y lo devuelve al cliente.

El cliente agrega un nuevoFileChunkReqWriteHandler con el tamaño de archivo correcto a su canalización.

El servidor crea un nuevoChunkedFileServerHandler con el contexto y el archivo deseado e intenta enviarlo, pero elFileChunkReqWriteHandler nunca lee byte del canal.

¿Qué estoy haciendo mal aquí?

Iniciar sesión

INFO  ProtocolHeadHandler:48 - Client send ProtocolHead [version=1, jobType=FILEREQUEST]
INFO  ProtocolHeadServerHandler:36 - Server receive ProtocolHead [version=1, jobType=FILEREQUEST]
INFO  ProtocolHeadHandler:57 - Client ProtocolHead equals, Send Protocol FileRequestProtocol [filePath=test.jpg, fileSize=0]
INFO  FileRequestServerHandler:42 - Server new FileRequest FileRequestProtocol [filePath=test.jpg, fileSize=0]
INFO  FileRequestHandler:41 - Client receives FileRequestProtocol [filePath=test.jpg, fileSize=174878]
INFO  ChunkedFileServerHandler:39 - New ChunkedFileServerHandler
INFO  FileChunkReqWriteHandler:20 - New ChunkedFile Handler FileRequestProtocol [filePath=test.jpg, fileSize=174878]

Cliente

FileRequestHandler.java

public class FileRequestHandler extends
    SimpleChannelInboundHandler<FileRequestProtocol> {

private Logger logger = Logger.getLogger(this.getClass());

public FileRequestHandler() {
}

@Override
public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol msg) {
    logger.info("Client receives " + msg);
    ReferenceCountUtil.release(msg);
    ctx.channel().pipeline().addLast(new FileChunkReqWriteHandler(msg));
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    logger.info("Client read complete");
    ctx.flush();
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

FileChunkReqWriteHandler.java

public class FileChunkReqWriteHandler extends SimpleChannelInboundHandler<ChunkedFile> {

FileRequestProtocol fileRequestProtocol;
private Logger logger = Logger.getLogger(this.getClass());


public FileChunkReqWriteHandler(FileRequestProtocol msg) {
    this.fileRequestProtocol = msg;
    logger.info("New ChunkedFile Handler " + msg);
}

@Override
public void channelActive(ChannelHandlerContext ctx) {
    logger.info("in channel active method");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();

    if (ctx.channel().isActive()) {
        ctx.writeAndFlush("ERR: " +
                cause.getClass().getSimpleName() + ": " +
                cause.getMessage() + '\n').addListener(ChannelFutureListener.CLOSE);
    }
}

@Override
protected void channelRead0(ChannelHandlerContext ctx, ChunkedFile msg)
        throws Exception {
    logger.info("in channelRead0");

}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    logger.info("channelRead");
    ByteBuf buf = (ByteBuf) msg;
    byte[] bytes = new byte[buf.readableBytes()];
    buf.readBytes(bytes);
    if(buf.readableBytes() >= this.fileRequestProtocol.getFileSize())
    {
        logger.info("received all data");
    }
}
}

Servidor

FileRequestServerHandler.java

public class FileRequestServerHandler extends
    SimpleChannelInboundHandler<FileRequestProtocol> {

private File f;
private Logger logger = Logger.getLogger(this.getClass());

@Override
public void channelRead0(ChannelHandlerContext ctx, FileRequestProtocol fileRequest) {
    logger.info("Server new FileRequest " + fileRequest);
    f = new File(fileRequest.getFilePath());
    fileRequest.setFileSize(f.length());
    ctx.writeAndFlush(fileRequest);

    new ChunkedFileServerHandler(ctx,f);
}

@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
    logger.info("Server read complete");

}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

ChunkedFileServerHandler.java

public class ChunkedFileServerHandler extends ChunkedWriteHandler {

private Logger logger = Logger.getLogger(this.getClass());

private File file;
public ChunkedFileServerHandler(ChannelHandlerContext ctx, File file) {
    this.file = file;

    logger.info("New ChunkedFileServerHandler");
    ChunkedFile chunkedFile;
    try {
        chunkedFile = new ChunkedFile(this.file);
        ctx.writeAndFlush(chunkedFile);
        ctx.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
}


@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
    super.channelActive(ctx);
    logger.info("FILE WRITE GETS ACTIVE");
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
    cause.printStackTrace();
    ctx.close();
}
}

Actualizar

public class ServerInitializer extends ChannelInitializer<SocketChannel> {

@Override
protected void initChannel(SocketChannel ch) throws Exception {
    ChannelPipeline p = ch.pipeline();

    p.addLast("encoder", new ObjectEncoder());
    p.addLast("decoder",
            new ObjectDecoder(ClassResolvers.cacheDisabled(null)));
    p.addLast("protocolhead", new ProtocolHeadServerHandler());
    p.addLast("filerequestserverhandler", new FileRequestServerHandler());
    p.addLast("chunkedfileserver", new ChunkedFileServerHandler());

}

}

Inicio del servidor

public void startUp()
{
        bossGroup = new NioEventLoopGroup(1);
        workerGroup = new NioEventLoopGroup();
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .handler(new LoggingHandler(LogLevel.INFO))
             .childHandler(new ServerInitializer());

            // Bind and start to accept incoming connections.
            b.bind(this.port).sync().channel().closeFuture().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
}

Respuestas a la pregunta(0)

Su respuesta a la pregunta