1
0
mirror of https://github.com/Rogiel/torrent4j synced 2025-12-06 07:32:47 +00:00

Ports torrent4j for netty 4.0 final

This commit is contained in:
2014-04-20 13:10:47 -03:00
parent 8485942691
commit 1e837589e8
28 changed files with 327 additions and 302 deletions

View File

@@ -264,7 +264,7 @@
<dependency> <dependency>
<groupId>io.netty</groupId> <groupId>io.netty</groupId>
<artifactId>netty-all</artifactId> <artifactId>netty-all</artifactId>
<version>4.0.14.Beta1</version> <version>4.0.9.Final</version>
<scope>compile</scope> <scope>compile</scope>
</dependency> </dependency>
<dependency> <dependency>

View File

@@ -21,7 +21,7 @@ public class TestMain {
new PeerWireProtocol(), new InMemoryTorrentStorage()); new PeerWireProtocol(), new InMemoryTorrentStorage());
controller.start(1234); controller.start(1234);
final Torrent torrent = Torrent.load(Paths.get("test.torrent")); final Torrent torrent = Torrent.load(Paths.get("Black Lab - This Night.mp3.torrent"));
System.out.println("Torrent hash is " + torrent.getHash().getString()); System.out.println("Torrent hash is " + torrent.getHash().getString());
// controller.checkExistingData(torrent); // controller.checkExistingData(torrent);
@@ -29,7 +29,7 @@ public class TestMain {
controller.registerTorrent(torrent); controller.registerTorrent(torrent);
final TorrentPeer peer = new TorrentPeer(torrent); final TorrentPeer peer = new TorrentPeer(torrent);
peer.setAddress(new InetSocketAddress(Inet4Address peer.setAddress(new InetSocketAddress(Inet4Address
.getByName("127.0.0.1"), 34096)); .getByName("127.0.0.1"), 63098));
torrent.getSwarm().addPeer(peer); torrent.getSwarm().addPeer(peer);
while(true) { while(true) {

View File

@@ -296,10 +296,12 @@ public class Torrent {
* if any error occur while reading the torrent file * if any error occur while reading the torrent file
*/ */
public static Torrent load(InputStream in) throws IOException { public static Torrent load(InputStream in) throws IOException {
final Object node = new BDecoder(in).readElement(); try(BDecoder bin = new BDecoder(in)) {
final Object node = bin.readElement();
final TorrentMetadata metadata = new TorrentMetadata((BMap) node); final TorrentMetadata metadata = new TorrentMetadata((BMap) node);
return new Torrent(metadata); return new Torrent(metadata);
} }
}
/** /**
* Load an torrent from an {@link InputStream} * Load an torrent from an {@link InputStream}

View File

@@ -187,7 +187,7 @@ public class TorrentPeer {
// NETWORK RELATED THINGS! // NETWORK RELATED THINGS!
public void handshake() { public void handshake() {
// FIXME sent another peer id // FIXME send another peer id
protocolPeer.handshake(torrent.getHash().getHash(), torrent protocolPeer.handshake(torrent.getHash().getHash(), torrent
.getController().getConfig().getPeerID()); .getController().getConfig().getPeerID());
} }

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire; package com.torrent4j.net.peerwire;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
public abstract class AbstractPeerWireMessage implements PeerWireMessage { public abstract class AbstractPeerWireMessage implements PeerWireMessage {
public final int messageID; public final int messageID;
@@ -10,19 +10,19 @@ public abstract class AbstractPeerWireMessage implements PeerWireMessage {
} }
@Override @Override
public final void write(ChannelBuffer buffer) { public final void write(ByteBuf buffer) {
buffer.writeByte(messageID); buffer.writeByte(messageID);
writeImpl(buffer); writeImpl(buffer);
} }
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
} }
@Override @Override
public final void read(ChannelBuffer buffer) { public final void read(ByteBuf buffer) {
readImpl(buffer); readImpl(buffer);
} }
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
} }
} }

View File

@@ -1,12 +1,12 @@
package com.torrent4j.net.peerwire; package com.torrent4j.net.peerwire;
import static io.netty.channel.Channels.pipeline; import io.netty.channel.Channel;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline; import io.netty.channel.ChannelPipeline;
import io.netty.channel.ChannelPipelineFactory; import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler; import io.netty.handler.logging.LoggingHandler;
import io.netty.logging.InternalLogLevel;
import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService;
import com.torrent4j.TorrentController; import com.torrent4j.TorrentController;
import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder; import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder;
@@ -16,22 +16,22 @@ import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder;
import com.torrent4j.net.peerwire.traffic.PeerTrafficShapingHandler; import com.torrent4j.net.peerwire.traffic.PeerTrafficShapingHandler;
import com.torrent4j.net.peerwire.traffic.TorrentTrafficShapingHandler; import com.torrent4j.net.peerwire.traffic.TorrentTrafficShapingHandler;
public class PeerWirePipelineFactory implements ChannelPipelineFactory { public class PeerWireChannelInitializer extends ChannelInitializer<Channel> {
private final TorrentController controller; private final TorrentController controller;
private final Executor executor; private final ScheduledExecutorService executor;
public PeerWirePipelineFactory(TorrentController controller, public PeerWireChannelInitializer(TorrentController controller,
Executor executor) { ScheduledExecutorService executor) {
this.controller = controller; this.controller = controller;
this.executor = executor; this.executor = executor;
} }
@Override @Override
public ChannelPipeline getPipeline() throws Exception { protected void initChannel(Channel ch) throws Exception {
final ChannelPipeline p = pipeline(); final ChannelPipeline p = ch.pipeline();
p.addLast("torrent-shaper", new TorrentTrafficShapingHandler(executor)); p.addLast("torrent-shaper", new TorrentTrafficShapingHandler(executor));
p.addLast("traffic-shaper", new PeerTrafficShapingHandler(executor)); p.addLast("traffic-shaper", new PeerTrafficShapingHandler());
p.addLast("frame-decoder", new PeerWireFrameDecoder()); p.addLast("frame-decoder", new PeerWireFrameDecoder());
p.addLast("frame-encoder", new PeerWireFrameEncoder()); p.addLast("frame-encoder", new PeerWireFrameEncoder());
@@ -39,10 +39,9 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory {
p.addLast("message-decoder", new PeerWireMessageDecoder()); p.addLast("message-decoder", new PeerWireMessageDecoder());
p.addLast("message-encoder", new PeerWireMessageEncoder()); p.addLast("message-encoder", new PeerWireMessageEncoder());
p.addLast("logging", new LoggingHandler(InternalLogLevel.WARN)); p.addLast("logging", new LoggingHandler(LogLevel.WARN));
p.addLast("handler", new PeerWireHandler(controller)); p.addLast("in-handler", new PeerWireInboundHandler(controller));
p.addLast("out-handler", new PeerWireOutboundHandler(controller));
return p;
} }
} }

View File

@@ -1,25 +1,20 @@
package com.torrent4j.net.peerwire; package com.torrent4j.net.peerwire;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.MessageEvent; import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.SimpleChannelHandler;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.util.Date; import java.util.Date;
import com.torrent4j.TorrentController; import com.torrent4j.TorrentController;
import com.torrent4j.model.Torrent; import com.torrent4j.model.Torrent;
import com.torrent4j.model.TorrentPiece;
import com.torrent4j.model.TorrentPieceBlock;
import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeer;
import com.torrent4j.model.peer.TorrentPeerChoking; import com.torrent4j.model.peer.TorrentPeerChoking;
import com.torrent4j.model.peer.TorrentPeerInterest; import com.torrent4j.model.peer.TorrentPeerInterest;
import com.torrent4j.model.TorrentPiece;
import com.torrent4j.model.TorrentPieceBlock;
import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder; import com.torrent4j.net.peerwire.codec.PeerWireFrameDecoder;
import com.torrent4j.net.peerwire.codec.PeerWireFrameEncoder;
import com.torrent4j.net.peerwire.codec.PeerWireMessageDecoder; import com.torrent4j.net.peerwire.codec.PeerWireMessageDecoder;
import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder;
import com.torrent4j.net.peerwire.messages.BitFieldMessage; import com.torrent4j.net.peerwire.messages.BitFieldMessage;
import com.torrent4j.net.peerwire.messages.BlockMessage; import com.torrent4j.net.peerwire.messages.BlockMessage;
import com.torrent4j.net.peerwire.messages.CancelMessage; import com.torrent4j.net.peerwire.messages.CancelMessage;
@@ -35,52 +30,51 @@ import com.torrent4j.net.peerwire.traffic.TorrentTrafficShapingHandler;
import com.torrent4j.util.Hash; import com.torrent4j.util.Hash;
import com.torrent4j.util.HashType; import com.torrent4j.util.HashType;
public class PeerWireHandler extends SimpleChannelHandler { public class PeerWireInboundHandler extends ChannelInboundHandlerAdapter {
private final TorrentController controller; private final TorrentController controller;
private PeerWireProtocolPeer peer; private PeerWireProtocolPeer peer;
public PeerWireHandler(TorrentController controller) { public PeerWireInboundHandler(TorrentController controller) {
this.controller = controller; this.controller = controller;
} }
@Override @Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) public void channelRead(ChannelHandlerContext ctx, Object msg)
throws Exception { throws Exception {
try { try {
final Object msg = e.getMessage();
if (!(msg instanceof PeerWireMessage)) if (!(msg instanceof PeerWireMessage))
return; return;
if (msg instanceof HandshakeMessage) { if (msg instanceof HandshakeMessage) {
final HandshakeMessage message = (HandshakeMessage) msg; final HandshakeMessage message = (HandshakeMessage) msg;
((PeerWireFrameDecoder) e.getChannel().getPipeline() ((PeerWireFrameDecoder) ctx.channel().pipeline()
.get("frame-decoder")).setHandshaked(true); .get("frame-decoder")).setHandshaked(true);
((PeerWireMessageDecoder) e.getChannel().getPipeline() ((PeerWireMessageDecoder) ctx.channel().pipeline()
.get("message-decoder")).setHandshaked(true); .get("message-decoder")).setHandshaked(true);
final Hash hash = new Hash(HashType.SHA1, message.torrentHash); final Hash hash = new Hash(HashType.SHA1, message.torrentHash);
final Torrent torrent = controller.findTorrent(hash); final Torrent torrent = controller.findTorrent(hash);
if (torrent == null) { if (torrent == null) {
e.getChannel().disconnect(); ctx.channel().disconnect();
return; return;
} }
TorrentPeer peer = torrent.getSwarm().findPeer( TorrentPeer peer = torrent.getSwarm().findPeer(
(InetSocketAddress) e.getChannel().getRemoteAddress(), (InetSocketAddress) ctx.channel().remoteAddress(),
message.peerID); message.peerID);
if (peer == null) { if (peer == null) {
peer = new TorrentPeer(torrent); peer = new TorrentPeer(torrent);
peer.setAddress((InetSocketAddress) e.getChannel() peer.setAddress((InetSocketAddress) ctx.channel()
.getRemoteAddress()); .remoteAddress());
} }
peer.setPeerID(message.peerID); peer.setPeerID(message.peerID);
this.peer = (PeerWireProtocolPeer) peer.getProtocolPeer(); this.peer = (PeerWireProtocolPeer) peer.getProtocolPeer();
e.getChannel().getPipeline() ctx.channel().pipeline()
.get(PeerTrafficShapingHandler.class).setPeer(peer); .get(PeerTrafficShapingHandler.class).setPeer(peer);
e.getChannel().getPipeline() ctx.channel().pipeline()
.get(TorrentTrafficShapingHandler.class) .get(TorrentTrafficShapingHandler.class)
.setTorrent(torrent); .setTorrent(torrent);
@@ -209,93 +203,7 @@ public class PeerWireHandler extends SimpleChannelHandler {
System.out.println(msg); System.out.println(msg);
} }
} finally { } finally {
ctx.sendUpstream(e); super.channelRead(ctx, msg);
} }
} }
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
try {
final Object msg = e.getMessage();
if (!(msg instanceof PeerWireMessage))
return;
if (msg instanceof HandshakeMessage) {
e.getFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
((PeerWireFrameEncoder) future.getChannel()
.getPipeline().get("frame-encoder"))
.setHandshaked(true);
((PeerWireMessageEncoder) future.getChannel()
.getPipeline().get("message-encoder"))
.setHandshaked(true);
}
});
} else if (msg instanceof BlockMessage) {
final BlockMessage message = (BlockMessage) msg;
final TorrentPiece piece = peer.getTorrent().getPiece(
message.pieceIndex);
final TorrentPieceBlock block = piece.getBlock(message.begin,
message.data.remaining());
peer.getTorrentPeer().getState().setLastUploadedBlock(block);
peer.getTorrentPeer().getState()
.setLastUploadedBlockDate(new Date());
e.getFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess())
return;
peer.getTorrentPeer().getState()
.setUploadRequestedBlock(null);
peer.getTorrentPeer().getState()
.setUploadRequestedDate(null);
}
});
} else if (msg instanceof RequestMessage) {
final RequestMessage message = (RequestMessage) msg;
final TorrentPiece piece = peer.getTorrent().getPiece(
message.pieceIndex);
final TorrentPieceBlock block = piece.getBlock(message.begin,
message.length);
peer.getTorrentPeer().getState()
.setDownloadRequestedBlock(block);
peer.getTorrentPeer().getState()
.setDownloadRequestedDate(new Date());
} else if (msg instanceof CancelMessage) {
e.getFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess())
return;
peer.getTorrentPeer().getState()
.setDownloadRequestedBlock(null);
peer.getTorrentPeer().getState()
.setDownloadRequestedDate(null);
}
});
}
} finally {
ctx.sendDownstream(e);
}
}
// @Override
// public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent
// e)
// throws Exception {
// try {
// peer = new PeerWireProtocolPeer(e.getChannel());
// } finally {
// ctx.sendUpstream(e);
// }
// }
} }

View File

@@ -1,9 +1,9 @@
package com.torrent4j.net.peerwire; package com.torrent4j.net.peerwire;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
public interface PeerWireMessage { public interface PeerWireMessage {
void write(ChannelBuffer buffer); void write(ByteBuf buffer);
void read(ChannelBuffer buffer); void read(ByteBuf buffer);
} }

View File

@@ -0,0 +1,103 @@
package com.torrent4j.net.peerwire;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.util.Date;
import com.torrent4j.TorrentController;
import com.torrent4j.model.TorrentPiece;
import com.torrent4j.model.TorrentPieceBlock;
import com.torrent4j.net.peerwire.codec.PeerWireFrameEncoder;
import com.torrent4j.net.peerwire.codec.PeerWireMessageEncoder;
import com.torrent4j.net.peerwire.messages.BlockMessage;
import com.torrent4j.net.peerwire.messages.CancelMessage;
import com.torrent4j.net.peerwire.messages.HandshakeMessage;
import com.torrent4j.net.peerwire.messages.RequestMessage;
public class PeerWireOutboundHandler extends ChannelOutboundHandlerAdapter {
@SuppressWarnings("unused")
private final TorrentController controller;
private PeerWireProtocolPeer peer;
public PeerWireOutboundHandler(TorrentController controller) {
this.controller = controller;
}
@Override
public void write(ChannelHandlerContext ctx, Object msg,
ChannelPromise promise) throws Exception {
try {
if (!(msg instanceof PeerWireMessage))
return;
if (msg instanceof HandshakeMessage) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
((PeerWireFrameEncoder) future.channel()
.pipeline().get("frame-encoder"))
.setHandshaked(true);
((PeerWireMessageEncoder) future.channel()
.pipeline().get("message-encoder"))
.setHandshaked(true);
}
});
} else if (msg instanceof BlockMessage) {
final BlockMessage message = (BlockMessage) msg;
final TorrentPiece piece = peer.getTorrent().getPiece(
message.pieceIndex);
final TorrentPieceBlock block = piece.getBlock(message.begin,
message.data.remaining());
peer.getTorrentPeer().getState().setLastUploadedBlock(block);
peer.getTorrentPeer().getState()
.setLastUploadedBlockDate(new Date());
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess())
return;
peer.getTorrentPeer().getState()
.setUploadRequestedBlock(null);
peer.getTorrentPeer().getState()
.setUploadRequestedDate(null);
}
});
} else if (msg instanceof RequestMessage) {
final RequestMessage message = (RequestMessage) msg;
final TorrentPiece piece = peer.getTorrent().getPiece(
message.pieceIndex);
final TorrentPieceBlock block = piece.getBlock(message.begin,
message.length);
peer.getTorrentPeer().getState()
.setDownloadRequestedBlock(block);
peer.getTorrentPeer().getState()
.setDownloadRequestedDate(new Date());
} else if (msg instanceof CancelMessage) {
promise.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
if (!future.isSuccess())
return;
peer.getTorrentPeer().getState()
.setDownloadRequestedBlock(null);
peer.getTorrentPeer().getState()
.setDownloadRequestedDate(null);
}
});
}
} finally {
super.write(ctx, msg, promise);
}
}
}

View File

@@ -1,51 +1,63 @@
package com.torrent4j.net.peerwire; package com.torrent4j.net.peerwire;
import io.netty.bootstrap.ClientBootstrap; import io.netty.bootstrap.Bootstrap;
import io.netty.bootstrap.ServerBootstrap; import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFuture;
import io.netty.channel.socket.nio.NioClientSocketChannelFactory; import io.netty.channel.ChannelOption;
import io.netty.channel.socket.nio.NioServerSocketChannelFactory; import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.Inet4Address; import java.net.Inet4Address;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.UnknownHostException; import java.net.UnknownHostException;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import com.torrent4j.TorrentController; import com.torrent4j.TorrentController;
import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeer;
import com.torrent4j.net.TorrentProtocol; import com.torrent4j.net.TorrentProtocol;
public class PeerWireProtocol implements TorrentProtocol { public class PeerWireProtocol implements TorrentProtocol {
private final Executor threadPool = Executors.newCachedThreadPool(); private final ScheduledExecutorService threadPool = Executors
.newScheduledThreadPool(1);
private final ServerBootstrap serverBootstrap = new ServerBootstrap( private final EventLoopGroup bossGroup = new NioEventLoopGroup();
new NioServerSocketChannelFactory(threadPool, threadPool)); private final EventLoopGroup workerGroup = new NioEventLoopGroup();
private final ClientBootstrap clientBootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(threadPool, threadPool));
private Channel serverChannel; private final ServerBootstrap serverBootstrap = new ServerBootstrap();
private final Bootstrap clientBootstrap = new Bootstrap();
private NioServerSocketChannel serverChannel;
@Override @Override
public void start(TorrentController controller, int listenPort) { public void start(TorrentController controller, int listenPort) {
serverBootstrap.setPipelineFactory(new PeerWirePipelineFactory( serverBootstrap
controller, threadPool)); .group(bossGroup, workerGroup)
clientBootstrap.setPipelineFactory(new PeerWirePipelineFactory( .channel(NioServerSocketChannel.class)
controller, threadPool)); .option(ChannelOption.SO_BACKLOG, 100)
.localAddress(listenPort)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(
new PeerWireChannelInitializer(controller, threadPool)).validate();
clientBootstrap
.group(workerGroup)
.channel(NioSocketChannel.class)
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 100)
//.option(ChannelOption.SO_BACKLOG, 100)
.handler(new PeerWireChannelInitializer(controller, threadPool)).validate();
try { try {
serverChannel = serverBootstrap.bind(new InetSocketAddress( serverChannel = (NioServerSocketChannel) serverBootstrap.bind(new InetSocketAddress(
Inet4Address.getByName("0.0.0.0"), listenPort)); Inet4Address.getByName("0.0.0.0"), listenPort)).channel();
} catch (UnknownHostException e) { } catch (UnknownHostException e) {
} }
} }
@Override @Override
public void stop() { public void stop() {
clientBootstrap.releaseExternalResources();
serverChannel.close(); serverChannel.close();
serverBootstrap.releaseExternalResources();
} }
@Override @Override
@@ -54,7 +66,7 @@ public class PeerWireProtocol implements TorrentProtocol {
.awaitUninterruptibly(); .awaitUninterruptibly();
if (future.isSuccess()) { if (future.isSuccess()) {
final PeerWireProtocolPeer protocolPeer = new PeerWireProtocolPeer( final PeerWireProtocolPeer protocolPeer = new PeerWireProtocolPeer(
future.getChannel()); future.channel());
protocolPeer.setTorrentPeer(peer); protocolPeer.setTorrentPeer(peer);
peer.setProtocolPeer(protocolPeer); peer.setProtocolPeer(protocolPeer);
return true; return true;

View File

@@ -33,13 +33,13 @@ public class PeerWireProtocolPeer implements TorrentProtocolPeer {
@Override @Override
public boolean connect() { public boolean connect() {
return channel.connect(channel.getRemoteAddress()) return channel.connect(channel.remoteAddress())
.awaitUninterruptibly().isSuccess(); .awaitUninterruptibly().isSuccess();
} }
@Override @Override
public boolean isConnected() { public boolean isConnected() {
return channel.isConnected(); return channel.isActive();
} }
@Override @Override

View File

@@ -1,36 +1,37 @@
package com.torrent4j.net.peerwire.codec; package com.torrent4j.net.peerwire.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.frame.FrameDecoder; import io.netty.handler.codec.ByteToMessageDecoder;
public class PeerWireFrameDecoder extends FrameDecoder { import java.util.List;
public class PeerWireFrameDecoder extends ByteToMessageDecoder {
private boolean handshaked = false; private boolean handshaked = false;
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
ChannelBuffer buffer) throws Exception { List<Object> out) throws Exception {
if (!handshaked) { if (!handshaked) {
int pos = buffer.readerIndex(); out.add(buffer);
buffer.skipBytes(68); return;
return buffer.copy(pos, 68);
} }
if (buffer.readableBytes() < 4) if (buffer.readableBytes() < 4)
return null; return;
int pos = buffer.readerIndex(); int pos = buffer.readerIndex();
final int len = buffer.readInt(); final int len = buffer.readInt();
if (buffer.readableBytes() >= len) { if (buffer.readableBytes() >= len) {
try { try {
return buffer.slice(buffer.readerIndex(), len); out.add(buffer.slice(buffer.readerIndex(), len));
return;
} finally { } finally {
buffer.skipBytes(len); buffer.skipBytes(len);
} }
} else { } else {
buffer.readerIndex(pos); buffer.readerIndex(pos);
return null; return;
} }
} }

View File

@@ -1,27 +1,28 @@
package com.torrent4j.net.peerwire.codec; package com.torrent4j.net.peerwire.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel; import io.netty.buffer.ByteBufUtil;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.oneone.OneToOneEncoder; import io.netty.handler.codec.MessageToByteEncoder;
public class PeerWireFrameEncoder extends OneToOneEncoder { public class PeerWireFrameEncoder extends MessageToByteEncoder<ByteBuf> {
private boolean handshaked = false; private boolean handshaked = false;
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, protected void encode(ChannelHandlerContext ctx, ByteBuf inputBuffer, ByteBuf outputBuffer)
Object msg) throws Exception { throws Exception {
if (!(msg instanceof ChannelBuffer) || !handshaked) if (!handshaked) {
return msg; outputBuffer.writeBytes(inputBuffer);
final ChannelBuffer buffer = (ChannelBuffer) msg; } else {
inputBuffer.readerIndex(0);
buffer.readerIndex(0); outputBuffer.writeInt(0x00);
outputBuffer.writeBytes(inputBuffer);
if (handshaked) { if (handshaked) {
final int len = buffer.readableBytes() - 4; final int len = inputBuffer.readableBytes();
buffer.setInt(0, len); outputBuffer.setInt(0, len);
} }
}
return buffer; System.out.println(ByteBufUtil.hexDump(outputBuffer));
} }
public void setHandshaked(boolean handshaked) { public void setHandshaked(boolean handshaked) {

View File

@@ -1,9 +1,10 @@
package com.torrent4j.net.peerwire.codec; package com.torrent4j.net.peerwire.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.oneone.OneToOneDecoder; import io.netty.handler.codec.ReplayingDecoder;
import java.util.List;
import com.torrent4j.net.peerwire.PeerWireMessage; import com.torrent4j.net.peerwire.PeerWireMessage;
import com.torrent4j.net.peerwire.messages.BitFieldMessage; import com.torrent4j.net.peerwire.messages.BitFieldMessage;
@@ -19,24 +20,23 @@ import com.torrent4j.net.peerwire.messages.PortMessage;
import com.torrent4j.net.peerwire.messages.RequestMessage; import com.torrent4j.net.peerwire.messages.RequestMessage;
import com.torrent4j.net.peerwire.messages.UnchokeMessage; import com.torrent4j.net.peerwire.messages.UnchokeMessage;
public class PeerWireMessageDecoder extends OneToOneDecoder { public class PeerWireMessageDecoder extends ReplayingDecoder<ByteBuf> {
private boolean handshaked = false; private boolean handshaked = false;
@Override @Override
protected Object decode(ChannelHandlerContext ctx, Channel channel, protected void decode(ChannelHandlerContext ctx, ByteBuf buffer,
Object msg) throws Exception { List<Object> out) throws Exception {
if (!(msg instanceof ChannelBuffer))
return msg;
final ChannelBuffer buffer = (ChannelBuffer) msg;
if (!handshaked) { if (!handshaked) {
final HandshakeMessage message = new HandshakeMessage(); final HandshakeMessage message = new HandshakeMessage();
message.read(buffer); message.read(buffer);
return message; out.add(message);
return;
} else { } else {
if(buffer.readableBytes() == 0) if(buffer.readableBytes() == 0) {
return new KeepAliveMessage(); out.add(new KeepAliveMessage());
return;
}
final byte opcode = buffer.readByte(); final byte opcode = buffer.readByte();
final PeerWireMessage message; final PeerWireMessage message;
@@ -72,10 +72,10 @@ public class PeerWireMessageDecoder extends OneToOneDecoder {
message = new UnchokeMessage(); message = new UnchokeMessage();
break; break;
default: default:
return null; return;
} }
message.read(buffer); message.read(buffer);
return message; out.add(message);
} }
} }

View File

@@ -1,30 +1,21 @@
package com.torrent4j.net.peerwire.codec; package com.torrent4j.net.peerwire.codec;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ChannelBuffers;
import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.oneone.OneToOneEncoder; import io.netty.handler.codec.MessageToByteEncoder;
import com.torrent4j.net.peerwire.PeerWireMessage; import com.torrent4j.net.peerwire.PeerWireMessage;
import com.torrent4j.net.peerwire.messages.KeepAliveMessage; import com.torrent4j.net.peerwire.messages.KeepAliveMessage;
public class PeerWireMessageEncoder extends OneToOneEncoder { public class PeerWireMessageEncoder extends MessageToByteEncoder<PeerWireMessage> {
private boolean handshaked = false; private boolean handshaked = false;
@Override @Override
protected Object encode(ChannelHandlerContext ctx, Channel channel, protected void encode(ChannelHandlerContext ctx, PeerWireMessage message,
Object msg) throws Exception { ByteBuf buffer) throws Exception {
if (!(msg instanceof PeerWireMessage))
return msg;
final PeerWireMessage message = (PeerWireMessage) msg;
final ChannelBuffer buffer = ChannelBuffers.dynamicBuffer();
if (handshaked && !(message instanceof KeepAliveMessage)) if (handshaked && !(message instanceof KeepAliveMessage))
buffer.writeInt(0x00); buffer.writeInt(0x00);
message.write(buffer); message.write(buffer);
return buffer;
} }
public void setHandshaked(boolean handshaked) { public void setHandshaked(boolean handshaked) {

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import java.util.BitSet; import java.util.BitSet;
@@ -21,7 +21,7 @@ public class BitFieldMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
for (int i = 0; i < bitSet.size();) { for (int i = 0; i < bitSet.size();) {
byte data = 0; byte data = 0;
for (int j = 128; i < bitSet.size() && j > 0; j >>= 1, i++) { for (int j = 128; i < bitSet.size() && j > 0; j >>= 1, i++) {
@@ -34,10 +34,10 @@ public class BitFieldMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
bitSet = new BitSet(); bitSet = new BitSet();
int i = 0; int i = 0;
while (buffer.readable()) { while (buffer.isReadable()) {
byte b = buffer.readByte(); byte b = buffer.readByte();
for (int j = 128; j > 0; j >>= 1) { for (int j = 128; j > 0; j >>= 1) {
bitSet.set(i++, (b & j) != 0); bitSet.set(i++, (b & j) != 0);

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
@@ -25,17 +25,17 @@ public class BlockMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
buffer.writeInt(pieceIndex); buffer.writeInt(pieceIndex);
buffer.writeInt(begin); buffer.writeInt(begin);
buffer.writeBytes(data); buffer.writeBytes(data);
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
pieceIndex = buffer.readInt(); pieceIndex = buffer.readInt();
begin = buffer.readInt(); begin = buffer.readInt();
data = buffer.readBytes(buffer.readableBytes()).toByteBuffer(); data = buffer.readBytes(buffer.readableBytes()).nioBuffer();
} }
/* /*

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import com.torrent4j.net.peerwire.AbstractPeerWireMessage; import com.torrent4j.net.peerwire.AbstractPeerWireMessage;
@@ -23,14 +23,14 @@ public class CancelMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
buffer.writeInt(pieceIndex); buffer.writeInt(pieceIndex);
buffer.writeInt(begin); buffer.writeInt(begin);
buffer.writeInt(length); buffer.writeInt(length);
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
pieceIndex = buffer.readInt(); pieceIndex = buffer.readInt();
begin = buffer.readInt(); begin = buffer.readInt();
length = buffer.readInt(); length = buffer.readInt();

View File

@@ -1,6 +1,7 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import io.netty.buffer.ByteBufUtil;
import java.nio.charset.Charset; import java.nio.charset.Charset;
import java.util.Arrays; import java.util.Arrays;
@@ -23,7 +24,7 @@ public class HandshakeMessage implements PeerWireMessage {
} }
@Override @Override
public void write(ChannelBuffer buffer) { public void write(ByteBuf buffer) {
buffer.writeByte(protocolStringLength); buffer.writeByte(protocolStringLength);
buffer.writeBytes(protocolString.getBytes()); buffer.writeBytes(protocolString.getBytes());
buffer.writeLong(reserved); buffer.writeLong(reserved);
@@ -32,9 +33,12 @@ public class HandshakeMessage implements PeerWireMessage {
} }
@Override @Override
public void read(ChannelBuffer buffer) { public void read(ByteBuf buffer) {
protocolStringLength = buffer.readByte(); protocolStringLength = buffer.readByte();
protocolString = buffer.readBytes(protocolStringLength).toString(); byte[] str = new byte[protocolStringLength];
buffer.readBytes(str);
protocolString = new String(str);
//protocolString = buffer.readBytes
reserved = buffer.readLong(); reserved = buffer.readLong();
torrentHash = buffer.readBytes(20).array(); torrentHash = buffer.readBytes(20).array();
peerID = buffer.readBytes(20).toString(Charset.forName("UTF-8")); peerID = buffer.readBytes(20).toString(Charset.forName("UTF-8"));

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import com.torrent4j.net.peerwire.AbstractPeerWireMessage; import com.torrent4j.net.peerwire.AbstractPeerWireMessage;
@@ -19,12 +19,12 @@ public class HaveMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
buffer.writeInt(pieceIndex); buffer.writeInt(pieceIndex);
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
pieceIndex = buffer.readInt(); pieceIndex = buffer.readInt();
} }

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import com.torrent4j.net.peerwire.PeerWireMessage; import com.torrent4j.net.peerwire.PeerWireMessage;
@@ -9,11 +9,11 @@ public class KeepAliveMessage implements PeerWireMessage {
} }
@Override @Override
public void write(ChannelBuffer buffer) { public void write(ByteBuf buffer) {
} }
@Override @Override
public void read(ChannelBuffer buffer) { public void read(ByteBuf buffer) {
} }
/* /*

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import com.torrent4j.net.peerwire.AbstractPeerWireMessage; import com.torrent4j.net.peerwire.AbstractPeerWireMessage;
@@ -19,12 +19,12 @@ public class PortMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
buffer.writeInt(listenPort); buffer.writeInt(listenPort);
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
listenPort = buffer.readInt(); listenPort = buffer.readInt();
} }

View File

@@ -1,6 +1,6 @@
package com.torrent4j.net.peerwire.messages; package com.torrent4j.net.peerwire.messages;
import io.netty.buffer.ChannelBuffer; import io.netty.buffer.ByteBuf;
import com.torrent4j.net.peerwire.AbstractPeerWireMessage; import com.torrent4j.net.peerwire.AbstractPeerWireMessage;
@@ -23,14 +23,14 @@ public class RequestMessage extends AbstractPeerWireMessage {
} }
@Override @Override
public void writeImpl(ChannelBuffer buffer) { public void writeImpl(ByteBuf buffer) {
buffer.writeInt(pieceIndex); buffer.writeInt(pieceIndex);
buffer.writeInt(begin); buffer.writeInt(begin);
buffer.writeInt(length); buffer.writeInt(length);
} }
@Override @Override
public void readImpl(ChannelBuffer buffer) { public void readImpl(ByteBuf buffer) {
pieceIndex = buffer.readInt(); pieceIndex = buffer.readInt();
begin = buffer.readInt(); begin = buffer.readInt();
length = buffer.readInt(); length = buffer.readInt();

View File

@@ -3,8 +3,6 @@ package com.torrent4j.net.peerwire.traffic;
import io.netty.handler.traffic.ChannelTrafficShapingHandler; import io.netty.handler.traffic.ChannelTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter; import io.netty.handler.traffic.TrafficCounter;
import java.util.concurrent.Executor;
import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeer;
import com.torrent4j.model.peer.TorrentPeerTrafficControl; import com.torrent4j.model.peer.TorrentPeerTrafficControl;
@@ -14,8 +12,8 @@ public class PeerTrafficShapingHandler extends ChannelTrafficShapingHandler {
private TorrentPeer peer; private TorrentPeer peer;
public PeerTrafficShapingHandler(Executor executor) { public PeerTrafficShapingHandler() {
super(executor, 0, 0); super(0, 0);
} }
private void reconfigure() { private void reconfigure() {
@@ -37,9 +35,9 @@ public class PeerTrafficShapingHandler extends ChannelTrafficShapingHandler {
return; return;
reconfigure(); reconfigure();
peer.getTrafficControl().setCurrentDownloadSpeed( peer.getTrafficControl().setCurrentDownloadSpeed(
counter.getLastReadThroughput()); counter.lastReadThroughput());
peer.getTrafficControl().setCurrentUploadSpeed( peer.getTrafficControl().setCurrentUploadSpeed(
counter.getLastWriteThroughput()); counter.lastWriteThroughput());
} }
/** /**

View File

@@ -3,7 +3,7 @@ package com.torrent4j.net.peerwire.traffic;
import io.netty.handler.traffic.GlobalTrafficShapingHandler; import io.netty.handler.traffic.GlobalTrafficShapingHandler;
import io.netty.handler.traffic.TrafficCounter; import io.netty.handler.traffic.TrafficCounter;
import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService;
import com.torrent4j.model.Torrent; import com.torrent4j.model.Torrent;
import com.torrent4j.model.TorrentTrafficControl; import com.torrent4j.model.TorrentTrafficControl;
@@ -14,7 +14,7 @@ public class TorrentTrafficShapingHandler extends GlobalTrafficShapingHandler {
private long writeLimit; private long writeLimit;
private long readLimit; private long readLimit;
public TorrentTrafficShapingHandler(Executor executor) { public TorrentTrafficShapingHandler(ScheduledExecutorService executor) {
super(executor, 0, 0); super(executor, 0, 0);
} }
@@ -37,9 +37,9 @@ public class TorrentTrafficShapingHandler extends GlobalTrafficShapingHandler {
return; return;
reconfigure(); reconfigure();
torrent.getTrafficControl().setCurrentDownloadSpeed( torrent.getTrafficControl().setCurrentDownloadSpeed(
counter.getLastReadThroughput()); counter.lastReadThroughput());
torrent.getTrafficControl().setCurrentUploadSpeed( torrent.getTrafficControl().setCurrentUploadSpeed(
counter.getLastWriteThroughput()); counter.lastWriteThroughput());
} }
/** /**

View File

@@ -1,5 +1,7 @@
package com.torrent4j.strategy.standard; package com.torrent4j.strategy.standard;
import java.io.IOException;
import com.torrent4j.model.Torrent; import com.torrent4j.model.Torrent;
import com.torrent4j.model.TorrentPiece; import com.torrent4j.model.TorrentPiece;
import com.torrent4j.model.peer.TorrentPeer; import com.torrent4j.model.peer.TorrentPeer;
@@ -14,8 +16,11 @@ public class StandardTorrentPeerStrategy implements TorrentPeerStrategy {
@Override @Override
public void peerDiscovered(Torrent torrent, TorrentPeer peer) { public void peerDiscovered(Torrent torrent, TorrentPeer peer) {
peer.connect(); if(peer.connect()) {
peer.handshake(); peer.handshake();
} else {
throw new RuntimeException("Could not connect to peer");
}
} }
@Override @Override

View File

@@ -34,8 +34,7 @@ public class BDecoder extends InputStream {
* @throws IOException * @throws IOException
*/ */
public static Object bdecode(byte[] bencode) throws BDecodingException { public static Object bdecode(byte[] bencode) throws BDecodingException {
BDecoder in = new BDecoder(new ByteArrayInputStream(bencode)); try(BDecoder in = new BDecoder(new ByteArrayInputStream(bencode))) {
try {
return in.readElement(); return in.readElement();
} catch (IOException e) { } catch (IOException e) {
throw new RuntimeException(e); throw new RuntimeException(e);

View File

@@ -166,7 +166,9 @@ public final class BEncoder extends OutputStream {
public static final String encode(Object object) throws IOException { public static final String encode(Object object) throws IOException {
final ByteArrayOutputStream out = new ByteArrayOutputStream(2048); final ByteArrayOutputStream out = new ByteArrayOutputStream(2048);
new BEncoder(out).writeElement(object); try(BEncoder bout = new BEncoder(out)) {
bout.writeElement(object);
}
return new String(out.toByteArray()); return new String(out.toByteArray());
} }