diff --git a/pom.xml b/pom.xml index 98dddff..fe97e01 100644 --- a/pom.xml +++ b/pom.xml @@ -264,7 +264,7 @@ io.netty netty-all - 4.0.14.Beta1 + 4.0.9.Final compile diff --git a/src/main/java/com/torrent4j/TestMain.java b/src/main/java/com/torrent4j/TestMain.java index 33a6f99..3a146c1 100644 --- a/src/main/java/com/torrent4j/TestMain.java +++ b/src/main/java/com/torrent4j/TestMain.java @@ -21,7 +21,7 @@ public class TestMain { new PeerWireProtocol(), new InMemoryTorrentStorage()); controller.start(1234); - final Torrent torrent = Torrent.load(Paths.get("test.torrent")); + final Torrent torrent = Torrent.load(Paths.get("Black Lab - This Night.mp3.torrent")); System.out.println("Torrent hash is " + torrent.getHash().getString()); // controller.checkExistingData(torrent); @@ -29,7 +29,7 @@ public class TestMain { controller.registerTorrent(torrent); final TorrentPeer peer = new TorrentPeer(torrent); peer.setAddress(new InetSocketAddress(Inet4Address - .getByName("127.0.0.1"), 34096)); + .getByName("127.0.0.1"), 63098)); torrent.getSwarm().addPeer(peer); while(true) { diff --git a/src/main/java/com/torrent4j/model/Torrent.java b/src/main/java/com/torrent4j/model/Torrent.java index ffe0b1c..f52a1a8 100644 --- a/src/main/java/com/torrent4j/model/Torrent.java +++ b/src/main/java/com/torrent4j/model/Torrent.java @@ -296,9 +296,11 @@ public class Torrent { * if any error occur while reading the torrent file */ public static Torrent load(InputStream in) throws IOException { - final Object node = new BDecoder(in).readElement(); - final TorrentMetadata metadata = new TorrentMetadata((BMap) node); - return new Torrent(metadata); + try(BDecoder bin = new BDecoder(in)) { + final Object node = bin.readElement(); + final TorrentMetadata metadata = new TorrentMetadata((BMap) node); + return new Torrent(metadata); + } } /** diff --git a/src/main/java/com/torrent4j/model/peer/TorrentPeer.java b/src/main/java/com/torrent4j/model/peer/TorrentPeer.java index 31d1920..6af5a9f 100644 --- a/src/main/java/com/torrent4j/model/peer/TorrentPeer.java +++ b/src/main/java/com/torrent4j/model/peer/TorrentPeer.java @@ -187,7 +187,7 @@ public class TorrentPeer { // NETWORK RELATED THINGS! public void handshake() { - // FIXME sent another peer id + // FIXME send another peer id protocolPeer.handshake(torrent.getHash().getHash(), torrent .getController().getConfig().getPeerID()); } diff --git a/src/main/java/com/torrent4j/net/peerwire/AbstractPeerWireMessage.java b/src/main/java/com/torrent4j/net/peerwire/AbstractPeerWireMessage.java index 0b5cff3..f41ea1c 100644 --- a/src/main/java/com/torrent4j/net/peerwire/AbstractPeerWireMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/AbstractPeerWireMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; public abstract class AbstractPeerWireMessage implements PeerWireMessage { public final int messageID; @@ -10,19 +10,19 @@ public abstract class AbstractPeerWireMessage implements PeerWireMessage { } @Override - public final void write(ChannelBuffer buffer) { + public final void write(ByteBuf buffer) { buffer.writeByte(messageID); writeImpl(buffer); } - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { } @Override - public final void read(ChannelBuffer buffer) { + public final void read(ByteBuf buffer) { readImpl(buffer); } - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { } } diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWirePipelineFactory.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireChannelInitializer.java similarity index 55% rename from src/main/java/com/torrent4j/net/peerwire/PeerWirePipelineFactory.java rename to src/main/java/com/torrent4j/net/peerwire/PeerWireChannelInitializer.java index 9a9982e..5a817c5 100644 --- a/src/main/java/com/torrent4j/net/peerwire/PeerWirePipelineFactory.java +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireChannelInitializer.java @@ -1,12 +1,12 @@ package com.torrent4j.net.peerwire; -import static io.netty.channel.Channels.pipeline; +import io.netty.channel.Channel; +import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; -import io.netty.channel.ChannelPipelineFactory; +import io.netty.handler.logging.LogLevel; import io.netty.handler.logging.LoggingHandler; -import io.netty.logging.InternalLogLevel; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import com.torrent4j.TorrentController; import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder; @@ -16,22 +16,22 @@ import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder; import com.torrent4j.net.peerwire.traffic.PeerTrafficShapingHandler; import com.torrent4j.net.peerwire.traffic.TorrentTrafficShapingHandler; -public class PeerWirePipelineFactory implements ChannelPipelineFactory { +public class PeerWireChannelInitializer extends ChannelInitializer { private final TorrentController controller; - private final Executor executor; + private final ScheduledExecutorService executor; - public PeerWirePipelineFactory(TorrentController controller, - Executor executor) { + public PeerWireChannelInitializer(TorrentController controller, + ScheduledExecutorService executor) { this.controller = controller; this.executor = executor; } @Override - public ChannelPipeline getPipeline() throws Exception { - final ChannelPipeline p = pipeline(); - + protected void initChannel(Channel ch) throws Exception { + final ChannelPipeline p = ch.pipeline(); + p.addLast("torrent-shaper", new TorrentTrafficShapingHandler(executor)); - p.addLast("traffic-shaper", new PeerTrafficShapingHandler(executor)); + p.addLast("traffic-shaper", new PeerTrafficShapingHandler()); p.addLast("frame-decoder", new PeerWireFrameDecoder()); p.addLast("frame-encoder", new PeerWireFrameEncoder()); @@ -39,10 +39,9 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory { p.addLast("message-decoder", new PeerWireMessageDecoder()); p.addLast("message-encoder", new PeerWireMessageEncoder()); - p.addLast("logging", new LoggingHandler(InternalLogLevel.WARN)); + p.addLast("logging", new LoggingHandler(LogLevel.WARN)); - p.addLast("handler", new PeerWireHandler(controller)); - - return p; + p.addLast("in-handler", new PeerWireInboundHandler(controller)); + p.addLast("out-handler", new PeerWireOutboundHandler(controller)); } } diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWireHandler.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireInboundHandler.java similarity index 63% rename from src/main/java/com/torrent4j/net/peerwire/PeerWireHandler.java rename to src/main/java/com/torrent4j/net/peerwire/PeerWireInboundHandler.java index a4d7105..de6a15f 100644 --- a/src/main/java/com/torrent4j/net/peerwire/PeerWireHandler.java +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireInboundHandler.java @@ -1,25 +1,20 @@ package com.torrent4j.net.peerwire; -import io.netty.channel.ChannelFuture; -import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; -import io.netty.channel.MessageEvent; -import io.netty.channel.SimpleChannelHandler; +import io.netty.channel.ChannelInboundHandlerAdapter; import java.net.InetSocketAddress; import java.util.Date; import com.torrent4j.TorrentController; import com.torrent4j.model.Torrent; +import com.torrent4j.model.TorrentPiece; +import com.torrent4j.model.TorrentPieceBlock; import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeerChoking; import com.torrent4j.model.peer.TorrentPeerInterest; -import com.torrent4j.model.TorrentPiece; -import com.torrent4j.model.TorrentPieceBlock; import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder; -import com.torrent4j.net.peerwire.codec.PeerWireFrameEncoder; import com.torrent4j.net.peerwire.codec.PeerWireMessageDecoder; -import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder; import com.torrent4j.net.peerwire.messages.BitFieldMessage; import com.torrent4j.net.peerwire.messages.BlockMessage; import com.torrent4j.net.peerwire.messages.CancelMessage; @@ -35,52 +30,51 @@ import com.torrent4j.net.peerwire.traffic.TorrentTrafficShapingHandler; import com.torrent4j.util.Hash; import com.torrent4j.util.HashType; -public class PeerWireHandler extends SimpleChannelHandler { +public class PeerWireInboundHandler extends ChannelInboundHandlerAdapter { private final TorrentController controller; private PeerWireProtocolPeer peer; - public PeerWireHandler(TorrentController controller) { + public PeerWireInboundHandler(TorrentController controller) { this.controller = controller; } @Override - public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { try { - final Object msg = e.getMessage(); if (!(msg instanceof PeerWireMessage)) return; if (msg instanceof HandshakeMessage) { final HandshakeMessage message = (HandshakeMessage) msg; - ((PeerWireFrameDecoder) e.getChannel().getPipeline() + ((PeerWireFrameDecoder) ctx.channel().pipeline() .get("frame-decoder")).setHandshaked(true); - ((PeerWireMessageDecoder) e.getChannel().getPipeline() + ((PeerWireMessageDecoder) ctx.channel().pipeline() .get("message-decoder")).setHandshaked(true); final Hash hash = new Hash(HashType.SHA1, message.torrentHash); final Torrent torrent = controller.findTorrent(hash); if (torrent == null) { - e.getChannel().disconnect(); + ctx.channel().disconnect(); return; } TorrentPeer peer = torrent.getSwarm().findPeer( - (InetSocketAddress) e.getChannel().getRemoteAddress(), + (InetSocketAddress) ctx.channel().remoteAddress(), message.peerID); if (peer == null) { peer = new TorrentPeer(torrent); - peer.setAddress((InetSocketAddress) e.getChannel() - .getRemoteAddress()); + peer.setAddress((InetSocketAddress) ctx.channel() + .remoteAddress()); } peer.setPeerID(message.peerID); this.peer = (PeerWireProtocolPeer) peer.getProtocolPeer(); - e.getChannel().getPipeline() + ctx.channel().pipeline() .get(PeerTrafficShapingHandler.class).setPeer(peer); - e.getChannel().getPipeline() + ctx.channel().pipeline() .get(TorrentTrafficShapingHandler.class) .setTorrent(torrent); @@ -209,93 +203,7 @@ public class PeerWireHandler extends SimpleChannelHandler { System.out.println(msg); } } finally { - ctx.sendUpstream(e); + super.channelRead(ctx, msg); } } - - @Override - public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) - throws Exception { - try { - final Object msg = e.getMessage(); - if (!(msg instanceof PeerWireMessage)) - return; - if (msg instanceof HandshakeMessage) { - e.getFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - ((PeerWireFrameEncoder) future.getChannel() - .getPipeline().get("frame-encoder")) - .setHandshaked(true); - ((PeerWireMessageEncoder) future.getChannel() - .getPipeline().get("message-encoder")) - .setHandshaked(true); - } - }); - } else if (msg instanceof BlockMessage) { - final BlockMessage message = (BlockMessage) msg; - - final TorrentPiece piece = peer.getTorrent().getPiece( - message.pieceIndex); - final TorrentPieceBlock block = piece.getBlock(message.begin, - message.data.remaining()); - - peer.getTorrentPeer().getState().setLastUploadedBlock(block); - peer.getTorrentPeer().getState() - .setLastUploadedBlockDate(new Date()); - - e.getFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - if (!future.isSuccess()) - return; - peer.getTorrentPeer().getState() - .setUploadRequestedBlock(null); - peer.getTorrentPeer().getState() - .setUploadRequestedDate(null); - } - }); - } else if (msg instanceof RequestMessage) { - final RequestMessage message = (RequestMessage) msg; - - final TorrentPiece piece = peer.getTorrent().getPiece( - message.pieceIndex); - final TorrentPieceBlock block = piece.getBlock(message.begin, - message.length); - - peer.getTorrentPeer().getState() - .setDownloadRequestedBlock(block); - peer.getTorrentPeer().getState() - .setDownloadRequestedDate(new Date()); - } else if (msg instanceof CancelMessage) { - e.getFuture().addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture future) - throws Exception { - if (!future.isSuccess()) - return; - peer.getTorrentPeer().getState() - .setDownloadRequestedBlock(null); - peer.getTorrentPeer().getState() - .setDownloadRequestedDate(null); - } - }); - } - } finally { - ctx.sendDownstream(e); - } - } - - // @Override - // public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent - // e) - // throws Exception { - // try { - // peer = new PeerWireProtocolPeer(e.getChannel()); - // } finally { - // ctx.sendUpstream(e); - // } - // } } diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWireMessage.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireMessage.java index bcb98df..388a567 100644 --- a/src/main/java/com/torrent4j/net/peerwire/PeerWireMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireMessage.java @@ -1,9 +1,9 @@ package com.torrent4j.net.peerwire; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; public interface PeerWireMessage { - void write(ChannelBuffer buffer); + void write(ByteBuf buffer); - void read(ChannelBuffer buffer); + void read(ByteBuf buffer); } diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWireOutboundHandler.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireOutboundHandler.java new file mode 100644 index 0000000..51cd6b1 --- /dev/null +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireOutboundHandler.java @@ -0,0 +1,103 @@ +package com.torrent4j.net.peerwire; + +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +import java.util.Date; + +import com.torrent4j.TorrentController; +import com.torrent4j.model.TorrentPiece; +import com.torrent4j.model.TorrentPieceBlock; +import com.torrent4j.net.peerwire.codec.PeerWireFrameEncoder; +import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder; +import com.torrent4j.net.peerwire.messages.BlockMessage; +import com.torrent4j.net.peerwire.messages.CancelMessage; +import com.torrent4j.net.peerwire.messages.HandshakeMessage; +import com.torrent4j.net.peerwire.messages.RequestMessage; + +public class PeerWireOutboundHandler extends ChannelOutboundHandlerAdapter { + @SuppressWarnings("unused") + private final TorrentController controller; + private PeerWireProtocolPeer peer; + + public PeerWireOutboundHandler(TorrentController controller) { + this.controller = controller; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, + ChannelPromise promise) throws Exception { + try { + if (!(msg instanceof PeerWireMessage)) + return; + if (msg instanceof HandshakeMessage) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + ((PeerWireFrameEncoder) future.channel() + .pipeline().get("frame-encoder")) + .setHandshaked(true); + ((PeerWireMessageEncoder) future.channel() + .pipeline().get("message-encoder")) + .setHandshaked(true); + } + }); + } else if (msg instanceof BlockMessage) { + final BlockMessage message = (BlockMessage) msg; + + final TorrentPiece piece = peer.getTorrent().getPiece( + message.pieceIndex); + final TorrentPieceBlock block = piece.getBlock(message.begin, + message.data.remaining()); + + peer.getTorrentPeer().getState().setLastUploadedBlock(block); + peer.getTorrentPeer().getState() + .setLastUploadedBlockDate(new Date()); + + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + if (!future.isSuccess()) + return; + peer.getTorrentPeer().getState() + .setUploadRequestedBlock(null); + peer.getTorrentPeer().getState() + .setUploadRequestedDate(null); + } + }); + } else if (msg instanceof RequestMessage) { + final RequestMessage message = (RequestMessage) msg; + + final TorrentPiece piece = peer.getTorrent().getPiece( + message.pieceIndex); + final TorrentPieceBlock block = piece.getBlock(message.begin, + message.length); + + peer.getTorrentPeer().getState() + .setDownloadRequestedBlock(block); + peer.getTorrentPeer().getState() + .setDownloadRequestedDate(new Date()); + } else if (msg instanceof CancelMessage) { + promise.addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + if (!future.isSuccess()) + return; + peer.getTorrentPeer().getState() + .setDownloadRequestedBlock(null); + peer.getTorrentPeer().getState() + .setDownloadRequestedDate(null); + } + }); + } + } finally { + super.write(ctx, msg, promise); + } + } +} diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocol.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocol.java index ae8f93c..ecb644c 100644 --- a/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocol.java +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocol.java @@ -1,51 +1,63 @@ package com.torrent4j.net.peerwire; -import io.netty.bootstrap.ClientBootstrap; +import io.netty.bootstrap.Bootstrap; import io.netty.bootstrap.ServerBootstrap; -import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; -import io.netty.channel.socket.nio.NioClientSocketChannelFactory; -import io.netty.channel.socket.nio.NioServerSocketChannelFactory; +import io.netty.channel.ChannelOption; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.channel.socket.nio.NioServerSocketChannel; +import io.netty.channel.socket.nio.NioSocketChannel; import java.net.Inet4Address; import java.net.InetSocketAddress; import java.net.UnknownHostException; -import java.util.concurrent.Executor; import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import com.torrent4j.TorrentController; import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.net.TorrentProtocol; public class PeerWireProtocol implements TorrentProtocol { - private final Executor threadPool = Executors.newCachedThreadPool(); + private final ScheduledExecutorService threadPool = Executors + .newScheduledThreadPool(1); - private final ServerBootstrap serverBootstrap = new ServerBootstrap( - new NioServerSocketChannelFactory(threadPool, threadPool)); - private final ClientBootstrap clientBootstrap = new ClientBootstrap( - new NioClientSocketChannelFactory(threadPool, threadPool)); + private final EventLoopGroup bossGroup = new NioEventLoopGroup(); + private final EventLoopGroup workerGroup = new NioEventLoopGroup(); - private Channel serverChannel; + private final ServerBootstrap serverBootstrap = new ServerBootstrap(); + private final Bootstrap clientBootstrap = new Bootstrap(); + + private NioServerSocketChannel serverChannel; @Override public void start(TorrentController controller, int listenPort) { - serverBootstrap.setPipelineFactory(new PeerWirePipelineFactory( - controller, threadPool)); - clientBootstrap.setPipelineFactory(new PeerWirePipelineFactory( - controller, threadPool)); + serverBootstrap + .group(bossGroup, workerGroup) + .channel(NioServerSocketChannel.class) + .option(ChannelOption.SO_BACKLOG, 100) + .localAddress(listenPort) + .childOption(ChannelOption.TCP_NODELAY, true) + .childHandler( + new PeerWireChannelInitializer(controller, threadPool)).validate(); + clientBootstrap + .group(workerGroup) + .channel(NioSocketChannel.class) + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100) + //.option(ChannelOption.SO_BACKLOG, 100) + .handler(new PeerWireChannelInitializer(controller, threadPool)).validate(); try { - serverChannel = serverBootstrap.bind(new InetSocketAddress( - Inet4Address.getByName("0.0.0.0"), listenPort)); + serverChannel = (NioServerSocketChannel) serverBootstrap.bind(new InetSocketAddress( + Inet4Address.getByName("0.0.0.0"), listenPort)).channel(); } catch (UnknownHostException e) { } } @Override public void stop() { - clientBootstrap.releaseExternalResources(); serverChannel.close(); - serverBootstrap.releaseExternalResources(); } @Override @@ -54,7 +66,7 @@ public class PeerWireProtocol implements TorrentProtocol { .awaitUninterruptibly(); if (future.isSuccess()) { final PeerWireProtocolPeer protocolPeer = new PeerWireProtocolPeer( - future.getChannel()); + future.channel()); protocolPeer.setTorrentPeer(peer); peer.setProtocolPeer(protocolPeer); return true; diff --git a/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocolPeer.java b/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocolPeer.java index c672af4..66eb69c 100644 --- a/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocolPeer.java +++ b/src/main/java/com/torrent4j/net/peerwire/PeerWireProtocolPeer.java @@ -33,13 +33,13 @@ public class PeerWireProtocolPeer implements TorrentProtocolPeer { @Override public boolean connect() { - return channel.connect(channel.getRemoteAddress()) + return channel.connect(channel.remoteAddress()) .awaitUninterruptibly().isSuccess(); } @Override public boolean isConnected() { - return channel.isConnected(); + return channel.isActive(); } @Override diff --git a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameDecoder.java b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameDecoder.java index d31b46b..dcd9e43 100644 --- a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameDecoder.java +++ b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameDecoder.java @@ -1,36 +1,37 @@ package com.torrent4j.net.peerwire.codec; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.frame.FrameDecoder; +import io.netty.handler.codec.ByteToMessageDecoder; -public class PeerWireFrameDecoder extends FrameDecoder { +import java.util.List; + +public class PeerWireFrameDecoder extends ByteToMessageDecoder { private boolean handshaked = false; @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - ChannelBuffer buffer) throws Exception { + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, + List out) throws Exception { if (!handshaked) { - int pos = buffer.readerIndex(); - buffer.skipBytes(68); - return buffer.copy(pos, 68); + out.add(buffer); + return; } if (buffer.readableBytes() < 4) - return null; + return; int pos = buffer.readerIndex(); final int len = buffer.readInt(); if (buffer.readableBytes() >= len) { try { - return buffer.slice(buffer.readerIndex(), len); + out.add(buffer.slice(buffer.readerIndex(), len)); + return; } finally { buffer.skipBytes(len); } } else { buffer.readerIndex(pos); - return null; + return; } } diff --git a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameEncoder.java b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameEncoder.java index 61a2abd..fa98947 100644 --- a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameEncoder.java +++ b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireFrameEncoder.java @@ -1,27 +1,28 @@ package com.torrent4j.net.peerwire.codec; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.handler.codec.MessageToByteEncoder; -public class PeerWireFrameEncoder extends OneToOneEncoder { +public class PeerWireFrameEncoder extends MessageToByteEncoder { private boolean handshaked = false; @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer) || !handshaked) - return msg; - final ChannelBuffer buffer = (ChannelBuffer) msg; - - buffer.readerIndex(0); - if (handshaked) { - final int len = buffer.readableBytes() - 4; - buffer.setInt(0, len); + protected void encode(ChannelHandlerContext ctx, ByteBuf inputBuffer, ByteBuf outputBuffer) + throws Exception { + if (!handshaked) { + outputBuffer.writeBytes(inputBuffer); + } else { + inputBuffer.readerIndex(0); + outputBuffer.writeInt(0x00); + outputBuffer.writeBytes(inputBuffer); + if (handshaked) { + final int len = inputBuffer.readableBytes(); + outputBuffer.setInt(0, len); + } } - - return buffer; + System.out.println(ByteBufUtil.hexDump(outputBuffer)); } public void setHandshaked(boolean handshaked) { diff --git a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageDecoder.java b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageDecoder.java index 75f9b2b..6b1aa15 100644 --- a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageDecoder.java +++ b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageDecoder.java @@ -1,9 +1,10 @@ package com.torrent4j.net.peerwire.codec; -import io.netty.buffer.ChannelBuffer; -import io.netty.channel.Channel; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.oneone.OneToOneDecoder; +import io.netty.handler.codec.ReplayingDecoder; + +import java.util.List; import com.torrent4j.net.peerwire.PeerWireMessage; import com.torrent4j.net.peerwire.messages.BitFieldMessage; @@ -19,64 +20,63 @@ import com.torrent4j.net.peerwire.messages.PortMessage; import com.torrent4j.net.peerwire.messages.RequestMessage; import com.torrent4j.net.peerwire.messages.UnchokeMessage; -public class PeerWireMessageDecoder extends OneToOneDecoder { +public class PeerWireMessageDecoder extends ReplayingDecoder { private boolean handshaked = false; - + @Override - protected Object decode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof ChannelBuffer)) - return msg; - final ChannelBuffer buffer = (ChannelBuffer) msg; - - if (!handshaked) { - final HandshakeMessage message = new HandshakeMessage(); - message.read(buffer); - - return message; - } else { - if(buffer.readableBytes() == 0) - return new KeepAliveMessage(); - - final byte opcode = buffer.readByte(); - final PeerWireMessage message; - switch (opcode) { - case CancelMessage.MESSAGE_ID: - message = new CancelMessage(); - break; - case BitFieldMessage.MESSAGE_ID: - message = new BitFieldMessage(); - break; - case ChokeMessage.MESSAGE_ID: - message = new ChokeMessage(); - break; - case HaveMessage.MESSAGE_ID: - message = new HaveMessage(); - break; - case InterestedMessage.MESSAGE_ID: - message = new InterestedMessage(); - break; - case NotInterestedMessage.MESSAGE_ID: - message = new NotInterestedMessage(); - break; - case BlockMessage.MESSAGE_ID: - message = new BlockMessage(); - break; - case PortMessage.MESSAGE_ID: - message = new PortMessage(); - break; - case RequestMessage.MESSAGE_ID: - message = new RequestMessage(); - break; - case UnchokeMessage.MESSAGE_ID: - message = new UnchokeMessage(); - break; - default: - return null; + protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, + List out) throws Exception { + if (!handshaked) { + final HandshakeMessage message = new HandshakeMessage(); + message.read(buffer); + + out.add(message); + return; + } else { + if(buffer.readableBytes() == 0) { + out.add(new KeepAliveMessage()); + return; + } + + final byte opcode = buffer.readByte(); + final PeerWireMessage message; + switch (opcode) { + case CancelMessage.MESSAGE_ID: + message = new CancelMessage(); + break; + case BitFieldMessage.MESSAGE_ID: + message = new BitFieldMessage(); + break; + case ChokeMessage.MESSAGE_ID: + message = new ChokeMessage(); + break; + case HaveMessage.MESSAGE_ID: + message = new HaveMessage(); + break; + case InterestedMessage.MESSAGE_ID: + message = new InterestedMessage(); + break; + case NotInterestedMessage.MESSAGE_ID: + message = new NotInterestedMessage(); + break; + case BlockMessage.MESSAGE_ID: + message = new BlockMessage(); + break; + case PortMessage.MESSAGE_ID: + message = new PortMessage(); + break; + case RequestMessage.MESSAGE_ID: + message = new RequestMessage(); + break; + case UnchokeMessage.MESSAGE_ID: + message = new UnchokeMessage(); + break; + default: + return; + } + message.read(buffer); + out.add(message); } - message.read(buffer); - return message; - } } public void setHandshaked(boolean handshaked) { diff --git a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageEncoder.java b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageEncoder.java index 9863ed8..cf18b25 100644 --- a/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageEncoder.java +++ b/src/main/java/com/torrent4j/net/peerwire/codec/PeerWireMessageEncoder.java @@ -1,30 +1,21 @@ package com.torrent4j.net.peerwire.codec; -import io.netty.buffer.ChannelBuffer; -import io.netty.buffer.ChannelBuffers; -import io.netty.channel.Channel; +import io.netty.buffer.ByteBuf; import io.netty.channel.ChannelHandlerContext; -import io.netty.handler.codec.oneone.OneToOneEncoder; +import io.netty.handler.codec.MessageToByteEncoder; import com.torrent4j.net.peerwire.PeerWireMessage; import com.torrent4j.net.peerwire.messages.KeepAliveMessage; -public class PeerWireMessageEncoder extends OneToOneEncoder { +public class PeerWireMessageEncoder extends MessageToByteEncoder { private boolean handshaked = false; @Override - protected Object encode(ChannelHandlerContext ctx, Channel channel, - Object msg) throws Exception { - if (!(msg instanceof PeerWireMessage)) - return msg; - final PeerWireMessage message = (PeerWireMessage) msg; - final ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(); - + protected void encode(ChannelHandlerContext ctx, PeerWireMessage message, + ByteBuf buffer) throws Exception { if (handshaked && !(message instanceof KeepAliveMessage)) buffer.writeInt(0x00); message.write(buffer); - - return buffer; } public void setHandshaked(boolean handshaked) { diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/BitFieldMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/BitFieldMessage.java index 565cd33..fe71a4a 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/BitFieldMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/BitFieldMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import java.util.BitSet; @@ -21,7 +21,7 @@ public class BitFieldMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { for (int i = 0; i < bitSet.size();) { byte data = 0; for (int j = 128; i < bitSet.size() && j > 0; j >>= 1, i++) { @@ -34,10 +34,10 @@ public class BitFieldMessage extends AbstractPeerWireMessage { } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { bitSet = new BitSet(); int i = 0; - while (buffer.readable()) { + while (buffer.isReadable()) { byte b = buffer.readByte(); for (int j = 128; j > 0; j >>= 1) { bitSet.set(i++, (b & j) != 0); diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/BlockMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/BlockMessage.java index a0f1fba..2e19246 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/BlockMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/BlockMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import java.nio.ByteBuffer; @@ -25,17 +25,17 @@ public class BlockMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { buffer.writeInt(pieceIndex); buffer.writeInt(begin); buffer.writeBytes(data); } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { pieceIndex = buffer.readInt(); begin = buffer.readInt(); - data = buffer.readBytes(buffer.readableBytes()).toByteBuffer(); + data = buffer.readBytes(buffer.readableBytes()).nioBuffer(); } /* diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/CancelMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/CancelMessage.java index 86bd7e0..a1ae0d2 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/CancelMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/CancelMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import com.torrent4j.net.peerwire.AbstractPeerWireMessage; @@ -23,14 +23,14 @@ public class CancelMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { buffer.writeInt(pieceIndex); buffer.writeInt(begin); buffer.writeInt(length); } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { pieceIndex = buffer.readInt(); begin = buffer.readInt(); length = buffer.readInt(); diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/HandshakeMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/HandshakeMessage.java index 51ae041..20d51bc 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/HandshakeMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/HandshakeMessage.java @@ -1,6 +1,7 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufUtil; import java.nio.charset.Charset; import java.util.Arrays; @@ -23,7 +24,7 @@ public class HandshakeMessage implements PeerWireMessage { } @Override - public void write(ChannelBuffer buffer) { + public void write(ByteBuf buffer) { buffer.writeByte(protocolStringLength); buffer.writeBytes(protocolString.getBytes()); buffer.writeLong(reserved); @@ -32,9 +33,12 @@ public class HandshakeMessage implements PeerWireMessage { } @Override - public void read(ChannelBuffer buffer) { + public void read(ByteBuf buffer) { protocolStringLength = buffer.readByte(); - protocolString = buffer.readBytes(protocolStringLength).toString(); + byte[] str = new byte[protocolStringLength]; + buffer.readBytes(str); + protocolString = new String(str); + //protocolString = buffer.readBytes reserved = buffer.readLong(); torrentHash = buffer.readBytes(20).array(); peerID = buffer.readBytes(20).toString(Charset.forName("UTF-8")); diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/HaveMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/HaveMessage.java index ad369b3..f1bcb8f 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/HaveMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/HaveMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import com.torrent4j.net.peerwire.AbstractPeerWireMessage; @@ -19,12 +19,12 @@ public class HaveMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { buffer.writeInt(pieceIndex); } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { pieceIndex = buffer.readInt(); } diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/KeepAliveMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/KeepAliveMessage.java index 9e278b0..a3c840f 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/KeepAliveMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/KeepAliveMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import com.torrent4j.net.peerwire.PeerWireMessage; @@ -9,11 +9,11 @@ public class KeepAliveMessage implements PeerWireMessage { } @Override - public void write(ChannelBuffer buffer) { + public void write(ByteBuf buffer) { } @Override - public void read(ChannelBuffer buffer) { + public void read(ByteBuf buffer) { } /* diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/PortMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/PortMessage.java index 1560cd3..1391a25 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/PortMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/PortMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import com.torrent4j.net.peerwire.AbstractPeerWireMessage; @@ -19,12 +19,12 @@ public class PortMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { buffer.writeInt(listenPort); } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { listenPort = buffer.readInt(); } diff --git a/src/main/java/com/torrent4j/net/peerwire/messages/RequestMessage.java b/src/main/java/com/torrent4j/net/peerwire/messages/RequestMessage.java index 144954d..155a2a3 100644 --- a/src/main/java/com/torrent4j/net/peerwire/messages/RequestMessage.java +++ b/src/main/java/com/torrent4j/net/peerwire/messages/RequestMessage.java @@ -1,6 +1,6 @@ package com.torrent4j.net.peerwire.messages; -import io.netty.buffer.ChannelBuffer; +import io.netty.buffer.ByteBuf; import com.torrent4j.net.peerwire.AbstractPeerWireMessage; @@ -23,14 +23,14 @@ public class RequestMessage extends AbstractPeerWireMessage { } @Override - public void writeImpl(ChannelBuffer buffer) { + public void writeImpl(ByteBuf buffer) { buffer.writeInt(pieceIndex); buffer.writeInt(begin); buffer.writeInt(length); } @Override - public void readImpl(ChannelBuffer buffer) { + public void readImpl(ByteBuf buffer) { pieceIndex = buffer.readInt(); begin = buffer.readInt(); length = buffer.readInt(); diff --git a/src/main/java/com/torrent4j/net/peerwire/traffic/PeerTrafficShapingHandler.java b/src/main/java/com/torrent4j/net/peerwire/traffic/PeerTrafficShapingHandler.java index 049c1f3..ef5134f 100644 --- a/src/main/java/com/torrent4j/net/peerwire/traffic/PeerTrafficShapingHandler.java +++ b/src/main/java/com/torrent4j/net/peerwire/traffic/PeerTrafficShapingHandler.java @@ -3,8 +3,6 @@ package com.torrent4j.net.peerwire.traffic; import io.netty.handler.traffic.ChannelTrafficShapingHandler; import io.netty.handler.traffic.TrafficCounter; -import java.util.concurrent.Executor; - import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeerTrafficControl; @@ -14,8 +12,8 @@ public class PeerTrafficShapingHandler extends ChannelTrafficShapingHandler { private TorrentPeer peer; - public PeerTrafficShapingHandler(Executor executor) { - super(executor, 0, 0); + public PeerTrafficShapingHandler() { + super(0, 0); } private void reconfigure() { @@ -37,9 +35,9 @@ public class PeerTrafficShapingHandler extends ChannelTrafficShapingHandler { return; reconfigure(); peer.getTrafficControl().setCurrentDownloadSpeed( - counter.getLastReadThroughput()); + counter.lastReadThroughput()); peer.getTrafficControl().setCurrentUploadSpeed( - counter.getLastWriteThroughput()); + counter.lastWriteThroughput()); } /** diff --git a/src/main/java/com/torrent4j/net/peerwire/traffic/TorrentTrafficShapingHandler.java b/src/main/java/com/torrent4j/net/peerwire/traffic/TorrentTrafficShapingHandler.java index 4baec2c..feeae52 100644 --- a/src/main/java/com/torrent4j/net/peerwire/traffic/TorrentTrafficShapingHandler.java +++ b/src/main/java/com/torrent4j/net/peerwire/traffic/TorrentTrafficShapingHandler.java @@ -3,7 +3,7 @@ package com.torrent4j.net.peerwire.traffic; import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.handler.traffic.TrafficCounter; -import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import com.torrent4j.model.Torrent; import com.torrent4j.model.TorrentTrafficControl; @@ -14,7 +14,7 @@ public class TorrentTrafficShapingHandler extends GlobalTrafficShapingHandler { private long writeLimit; private long readLimit; - public TorrentTrafficShapingHandler(Executor executor) { + public TorrentTrafficShapingHandler(ScheduledExecutorService executor) { super(executor, 0, 0); } @@ -37,9 +37,9 @@ public class TorrentTrafficShapingHandler extends GlobalTrafficShapingHandler { return; reconfigure(); torrent.getTrafficControl().setCurrentDownloadSpeed( - counter.getLastReadThroughput()); + counter.lastReadThroughput()); torrent.getTrafficControl().setCurrentUploadSpeed( - counter.getLastWriteThroughput()); + counter.lastWriteThroughput()); } /** diff --git a/src/main/java/com/torrent4j/strategy/standard/StandardTorrentPeerStrategy.java b/src/main/java/com/torrent4j/strategy/standard/StandardTorrentPeerStrategy.java index 8c0063a..5e61926 100644 --- a/src/main/java/com/torrent4j/strategy/standard/StandardTorrentPeerStrategy.java +++ b/src/main/java/com/torrent4j/strategy/standard/StandardTorrentPeerStrategy.java @@ -1,5 +1,7 @@ package com.torrent4j.strategy.standard; +import java.io.IOException; + import com.torrent4j.model.Torrent; import com.torrent4j.model.TorrentPiece; import com.torrent4j.model.peer.TorrentPeer; @@ -14,8 +16,11 @@ public class StandardTorrentPeerStrategy implements TorrentPeerStrategy { @Override public void peerDiscovered(Torrent torrent, TorrentPeer peer) { - peer.connect(); - peer.handshake(); + if(peer.connect()) { + peer.handshake(); + } else { + throw new RuntimeException("Could not connect to peer"); + } } @Override diff --git a/src/main/java/com/torrent4j/util/bencoding/BDecoder.java b/src/main/java/com/torrent4j/util/bencoding/BDecoder.java index 62bec1d..de673c8 100644 --- a/src/main/java/com/torrent4j/util/bencoding/BDecoder.java +++ b/src/main/java/com/torrent4j/util/bencoding/BDecoder.java @@ -34,8 +34,7 @@ public class BDecoder extends InputStream { * @throws IOException */ public static Object bdecode(byte[] bencode) throws BDecodingException { - BDecoder in = new BDecoder(new ByteArrayInputStream(bencode)); - try { + try(BDecoder in = new BDecoder(new ByteArrayInputStream(bencode))) { return in.readElement(); } catch (IOException e) { throw new RuntimeException(e); diff --git a/src/main/java/com/torrent4j/util/bencoding/BEncoder.java b/src/main/java/com/torrent4j/util/bencoding/BEncoder.java index 5aa2800..fbc0410 100644 --- a/src/main/java/com/torrent4j/util/bencoding/BEncoder.java +++ b/src/main/java/com/torrent4j/util/bencoding/BEncoder.java @@ -166,7 +166,9 @@ public final class BEncoder extends OutputStream { public static final String encode(Object object) throws IOException { final ByteArrayOutputStream out = new ByteArrayOutputStream(2048); - new BEncoder(out).writeElement(object); + try(BEncoder bout = new BEncoder(out)) { + bout.writeElement(object); + } return new String(out.toByteArray()); }