1
0
mirror of https://github.com/Rogiel/torrent4j synced 2025-12-05 23:22:49 +00:00

New protocol codecs

Change-Id: I9cec611568032760eebb1325d16d8676e6dff097
This commit is contained in:
rogiel
2011-05-01 23:47:37 -03:00
parent e9c8b7247d
commit 0ddb7c5947
39 changed files with 992 additions and 98 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/target
/target
/store.bin

View File

@@ -27,6 +27,8 @@ import net.torrent.protocol.peerwire.PeerWireManager;
import net.torrent.protocol.peerwire.manager.TorrentManager;
import net.torrent.torrent.Torrent;
import net.torrent.torrent.context.TorrentContext;
import net.torrent.torrent.piece.PieceSelector;
import net.torrent.torrent.piece.RandomPieceSelector;
/**
* Factory class for {@link BitTorrentClient}.
@@ -55,6 +57,10 @@ public class BitTorrentClientFactory {
* The torrent algorithm
*/
private TorrentAlgorithm algorithm;
/**
* The piece selector
*/
private PieceSelector selector;
/**
* Creates a new standard {@link BitTorrentClient BitTorrent client}
@@ -78,10 +84,12 @@ public class BitTorrentClientFactory {
* the torrent
*/
public BitTorrentClientFactory(final Torrent torrent) {
context = new TorrentContext(torrent);
datastore = new PlainTorrentDatastore(new File("store.bin"));
manager = new TorrentManager(context, datastore);
algorithm = new TorrentStdAlgorithm(manager);
this.context = new TorrentContext(torrent);
this.datastore = new PlainTorrentDatastore(new File("store.bin"));
this.manager = new TorrentManager(context, datastore);
if (this.selector == null)
this.selector = new RandomPieceSelector(manager);
this.algorithm = new TorrentStdAlgorithm(manager, selector);
}
/**
@@ -110,10 +118,12 @@ public class BitTorrentClientFactory {
*/
public BitTorrentClientFactory(final Torrent torrent,
TorrentDatastore datastore) {
context = new TorrentContext(torrent);
this.context = new TorrentContext(torrent);
this.datastore = datastore;
manager = new TorrentManager(context, datastore);
algorithm = new TorrentStdAlgorithm(manager);
this.manager = new TorrentManager(context, datastore);
if (this.selector == null)
this.selector = new RandomPieceSelector(manager);
this.algorithm = new TorrentStdAlgorithm(manager, selector);
}
/**
@@ -128,9 +138,9 @@ public class BitTorrentClientFactory {
*/
public BitTorrentClientFactory(final Torrent torrent,
TorrentDatastore datastore, final TorrentAlgorithm algorithm) {
context = new TorrentContext(torrent);
this.context = new TorrentContext(torrent);
this.datastore = datastore;
manager = new TorrentManager(context, datastore);
this.manager = new TorrentManager(context, datastore);
this.algorithm = algorithm;
}

View File

@@ -64,6 +64,50 @@ public interface TorrentPieceDownloadAlgorithm {
*/
TorrentPart allowedFast(TorrentPeer peer, TorrentPiece piece);
/**
* Issued when the peer has rejected our request.
*
* @param peer
* the rejecting peer
* @param part
* the rejected part
* @see RejectAction
*/
RejectAction rejected(TorrentPeer peer, TorrentPart part);
public enum RejectAction {
/**
* Only disconnects the peer, does not initiate a new connection with
* anyone.
*/
DISCONNECT,
/**
* Disconnects the current peer and connects a new one
*/
CONNECT_NEW_PEER,
/**
* Choke this peer
*/
NOT_INTERESTED,
/**
* Request again the same part
*/
RETRY,
/**
* Try to download another piece
*/
TRY_ANOTHER_PIECE,
/**
* Do nothing, ignore. Might cause peer to become idle.
*/
IGNORE;
}
/**
* Test if an certain piece has all its parts already download. If true, a
* checksum will be performed and a message informing we have this piece

View File

@@ -21,6 +21,7 @@ import net.torrent.protocol.algorithm.TorrentPeerAlgorithm;
import net.torrent.protocol.algorithm.TorrentPieceDownloadAlgorithm;
import net.torrent.protocol.algorithm.TorrentPieceUploadAlgorithm;
import net.torrent.protocol.peerwire.manager.TorrentManager;
import net.torrent.torrent.piece.PieceSelector;
/**
* Standard torrent algorithm
@@ -33,10 +34,12 @@ public class TorrentStdAlgorithm implements TorrentAlgorithm {
private final TorrentPieceDownloadAlgorithm downloadAlgorithm;
private final TorrentPieceUploadAlgorithm uploadAlgorithm;
public TorrentStdAlgorithm(final TorrentManager manager) {
public TorrentStdAlgorithm(final TorrentManager manager,
final PieceSelector selector) {
peerAlgorithm = new TorrentStdPeerAlgorithm(manager);
interestAlgorithm = new TorrentStdInterestAlgorithm(manager);
downloadAlgorithm = new TorrentStdPieceDownloadAlgorithm(manager);
interestAlgorithm = new TorrentStdInterestAlgorithm(manager, selector);
downloadAlgorithm = new TorrentStdPieceDownloadAlgorithm(manager,
selector);
uploadAlgorithm = new TorrentStdPieceUploadAlgorithm(manager);
}

View File

@@ -20,6 +20,7 @@ import net.torrent.protocol.peerwire.manager.TorrentManager;
import net.torrent.torrent.context.TorrentPeer;
import net.torrent.torrent.context.TorrentPeer.ChokingState;
import net.torrent.torrent.context.TorrentPeer.InterestState;
import net.torrent.torrent.piece.PieceSelector;
/**
* Standard torrent interest algorithm
@@ -27,18 +28,35 @@ import net.torrent.torrent.context.TorrentPeer.InterestState;
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class TorrentStdInterestAlgorithm implements TorrentInterestAlgorithm {
@SuppressWarnings("unused")
/**
* The torrent manager
*/
private final TorrentManager manager;
public TorrentStdInterestAlgorithm(TorrentManager manager) {
/**
* This selector is used to find the next piece to be downloaded. Parts are
* managed inside this algorithm.
*/
private final PieceSelector selector;
/**
* Creates a new instance
*
* @param manager
* the manager
* @param selector
* the piece selector
*/
public TorrentStdInterestAlgorithm(TorrentManager manager,
PieceSelector selector) {
this.manager = manager;
this.selector = selector;
}
@Override
public InterestState interested(TorrentPeer peer) {
// if(peer.getPort() == 25944)
// return InterestState.UNINTERESTED;
return InterestState.INTERESTED;
return (selector.select(peer) == null ? InterestState.UNINTERESTED
: InterestState.INTERESTED);
}
@Override

View File

@@ -33,6 +33,7 @@ import net.torrent.torrent.piece.RandomPieceSelector;
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO separate standard algorithm from extension ones
public class TorrentStdPieceDownloadAlgorithm implements
TorrentPieceDownloadAlgorithm {
/**
@@ -61,12 +62,13 @@ public class TorrentStdPieceDownloadAlgorithm implements
* @param manager
* the torrent manager instance. With this object is possible to
* retrieve current downloads/uploads and connections.
* @param selector
* the piece selector
*/
public TorrentStdPieceDownloadAlgorithm(TorrentManager manager) {
public TorrentStdPieceDownloadAlgorithm(TorrentManager manager,
PieceSelector selector) {
this.manager = manager;
// this.context = this.manager.getContext();
// this.torrent = this.manager.getTorrent();
selector = new RandomPieceSelector(manager);
this.selector = selector;
}
@Override
@@ -96,6 +98,11 @@ public class TorrentStdPieceDownloadAlgorithm implements
return piece.getFirstPart();
}
@Override
public RejectAction rejected(TorrentPeer peer, TorrentPart part) {
return RejectAction.TRY_ANOTHER_PIECE;
}
@Override
public boolean isComplete(TorrentPeer peer, TorrentPiece piece) {
if (manager.getContext().getBitfield().hasPiece(piece))

View File

@@ -16,8 +16,11 @@
package net.torrent.protocol.peerwire;
import static org.jboss.netty.channel.Channels.pipeline;
import net.torrent.protocol.peerwire.codec.PeerWireDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireEncoder;
import net.torrent.protocol.peerwire.codec.PeerWireFrameDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireFrameEncoder;
import net.torrent.protocol.peerwire.codec.PeerWireMessageDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireMessageEncoder;
import net.torrent.protocol.peerwire.handler.PeerWireCodecHandler;
import net.torrent.protocol.peerwire.handler.PeerWireManagerHeadHandler;
import net.torrent.protocol.peerwire.handler.PeerWireManagerTailHandler;
import net.torrent.protocol.peerwire.manager.TorrentManager;
@@ -38,7 +41,8 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory {
* The logging handler
*/
private final LoggingHandler loggingHandler = new LoggingHandler(
InternalLogLevel.WARN);
InternalLogLevel.INFO);
/**
* The algorithm handler
*/
@@ -70,15 +74,28 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
final ChannelPipeline pipeline = pipeline();
final PeerWireState state = new PeerWireState();
// TODO create traffic shape handler
// TODO create firewall handler - block connections from unwanted peers
// TODO create traffic shape handler once Netty 4.0 is released
// TODO create firewall handler - block connections from unwanted peers.
pipeline.addLast("decoder", new PeerWireDecoder());
pipeline.addLast("encoder", new PeerWireEncoder());
// pipeline.addLast("old.decoder", new PeerWireOldDecoder(state));
// pipeline.addLast("old.encoder", new PeerWireOldEncoder());
// frame (or header) codec
pipeline.addLast("frame.decoder", new PeerWireFrameDecoder(state));
pipeline.addLast("frame.encoder", new PeerWireFrameEncoder(state));
// message codec
pipeline.addLast("message.decoder", new PeerWireMessageDecoder(state));
pipeline.addLast("message.encoder", new PeerWireMessageEncoder());
pipeline.addLast("codec.handler", new PeerWireCodecHandler(state));
// logging handler (before any other handler can take action)
pipeline.addLast("logging", loggingHandler);
// handlers
pipeline.addLast("head-handler", headManagerHandler);
pipeline.addLast("algorithm", algorithmHandler);
pipeline.addLast("tail-handler", tailManagerHandler);

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire;
import org.jboss.netty.channel.Channel;
/**
* Maintain an per {@link Channel} information about the connection state.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireState {
/**
* The peer handshake state
*/
private boolean peerHandshaked;
/**
* The handshake state
*/
private boolean handshaked;
/**
* Returns true if this connection has already been handshaked by the peer
* end.
*
* @return true if connection was handshaked
*/
public boolean hasPeerHandshaked() {
return peerHandshaked;
}
/**
* Set the handshake state of this connection by the peer end.
*
* @param peerHandshaked
* the handshake state
*/
public void setPeerHandshaked(boolean peerHandshaked) {
this.peerHandshaked = peerHandshaked;
}
/**
* Returns true if this connection has already been handshaked by the
* library end.
*
* @return true if connection was handshaked
*/
public boolean hasHandshaked() {
return handshaked;
}
/**
* Set the handshake state of this connection by the library end.
*
* @param handshaked
* the handshake state
*/
public void setHandshaked(boolean handshaked) {
this.handshaked = handshaked;
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.PeerWireState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
* BitTorrent has two types of message headers:
* <p>
* <h1>Handshake message</h1> Handshake messages are composed of 1 byte, which
* indicates the size of protocol string ("BitTorrent protocol").
* <p>
* <h1>Messages</h1> All other messages are 4-byte integer containing the
* message length, an 1 byte opcode followed by the message content.
* <p>
* <p>
* Instances of this class keep channel state content and must not be shared nor
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireFrameDecoder#state
*/
public class PeerWireFrameDecoder extends FrameDecoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireFrameDecoder(final PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
final int index = buffer.readerIndex();
if (state.hasPeerHandshaked()) {
if (buffer.readableBytes() <= 4)
return null;
int len = buffer.readInt();
if (len == 0) {
// keep-alive message
return ChannelBuffers.EMPTY_BUFFER;
} else if (buffer.readableBytes() < len) {
buffer.resetReaderIndex();
return null;
}
buffer.skipBytes(len);
return buffer.slice(index + 4, len);
} else {
if (buffer.readableBytes() <= 1) // at least 1 byte for header
return null;
final int pstrlen = buffer.readByte();
if (pstrlen != 19)
throw new CorruptedFrameException(
"Handshake frame is corrupted. pstrlen != 19");
buffer.resetReaderIndex();
if (buffer.readableBytes() < pstrlen + 49) {
return null;
}
buffer.skipBytes(pstrlen + 49);
return buffer.slice(index, pstrlen + 49);
}
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.PeerWireState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
/**
* BitTorrent has two types of message headers:
* <p>
* <h1>Handshake message</h1> Handshake messages are composed of 1 byte, which
* indicates the size of protocol string ("BitTorrent protocol").
* <p>
* <h1>Messages</h1> All other messages are 4-byte integer containing the
* message length, an 1 byte opcode followed by the message content.
* <p>
* <p>
* Instances of this class keep channel state content and must not be shared nor
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireFrameDecoder#state
*/
public class PeerWireFrameEncoder extends OneToOneEncoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance of this encoder
*
* @param state
* the connection state
*/
public PeerWireFrameEncoder(PeerWireState state) {
this.state = state;
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer))
return msg;
if (!state.hasHandshaked())
// on handshake message there is no visible "frame"
return msg;
ChannelBuffer buffer = (ChannelBuffer) msg;
if (buffer.readableBytes() < 4)
throw new CorruptedFrameException(
"A frame must have at least 4 bytes!");
buffer.setInt(0, buffer.readableBytes() - 4);
return buffer;
}
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import java.util.Arrays;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import net.torrent.protocol.peerwire.message.KeepAliveMessage;
import net.torrent.protocol.peerwire.message.header.PeerWireFastPeersMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
/**
* This decoder decodes a {@link ChannelBuffer} into an
* {@link PeerWireReadableMessage}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireMessageDecoder#state
*/
public class PeerWireMessageDecoder extends OneToOneDecoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* The is an list of header managers that will create message instances for
* each message id passed as argument.
* <p>
* {@link PeerWireSpecificationMessageHeaderManager} and
* {@link PeerWireFastPeersMessageHeaderManager} are already in the list.
*/
private PeerWireMessageHeaderManager[] headerManager = new PeerWireMessageHeaderManager[] {
PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE,
PeerWireFastPeersMessageHeaderManager.SHARED_INSTANCE };
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireMessageDecoder(PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer))
return msg;
final ChannelBuffer buffer = (ChannelBuffer) msg;
if (buffer.readableBytes() == 0)
return new KeepAliveMessage();
final PeerWireReadableMessage message;
if (state.hasPeerHandshaked()) {
buffer.markReaderIndex();
final byte id = buffer.readByte();
message = getMessage(id);
} else {
message = new HandshakeMessage();
}
if (message == null) { // unknown message
buffer.resetReaderIndex();
return msg;
}
message.read(buffer);
return message;
}
/**
* Adds a new message header manager to this decoder
*
* @param newHeaderManager
* the new header manager
*/
public void addMessageHeader(PeerWireMessageHeaderManager newHeaderManager) {
headerManager = Arrays
.copyOf(headerManager, (headerManager.length + 1));
headerManager[(headerManager.length - 1)] = newHeaderManager;
}
/**
* Return the message represented by <tt>id</tt>.
* <p>
* Iterate over all <tt>handlers</tt> and try to locate the message. Will
* return null if message id is unknown.
*
* @param id
* the id of the message
* @return the message
*/
private PeerWireReadableMessage getMessage(byte id) {
PeerWireReadableMessage message = null;
for (final PeerWireMessageHeaderManager handler : headerManager) {
if (handler == null)
continue;
message = handler.getMessage(id);
if (message != null)
break;
}
return message;
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
/**
* This encoder encodes {@link PeerWireWritableMessage} to a
* {@link ChannelBuffer}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireMessageEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof PeerWireWritableMessage))
return msg;
final PeerWireWritableMessage message = (PeerWireWritableMessage) msg;
final ChannelBuffer buffer = ChannelBuffers
.buffer(message.length() + 4);
if (!(message instanceof HandshakeMessage)) {
buffer.writeInt(0x00);
}
message.write(buffer);
return buffer;
}
}

View File

@@ -15,18 +15,13 @@
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.message.BitfieldMessage;
import net.torrent.protocol.peerwire.message.CancelMessage;
import net.torrent.protocol.peerwire.message.ChokeMessage;
import java.util.Arrays;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import net.torrent.protocol.peerwire.message.HaveMessage;
import net.torrent.protocol.peerwire.message.InterestedMessage;
import net.torrent.protocol.peerwire.message.KeepAliveMessage;
import net.torrent.protocol.peerwire.message.NotInterestedMessage;
import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.PortMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
@@ -48,16 +43,39 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireOldDecoder#state
*/
public class PeerWireDecoder extends FrameDecoder {
private boolean handshaked = false;
@Deprecated
public class PeerWireOldDecoder extends FrameDecoder {
/**
* This connection state. This need to be shared with other encoders or
* decoders. But more importantly <b>NEVER</b> share the same instance
* across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* The is an list of handlers that will create message instances for each
* message id passed as argument.
*/
private PeerWireMessageHeaderManager[] handlers = new PeerWireMessageHeaderManager[] { PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE };
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireOldDecoder(final PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
if (!handshaked) {
if (!state.hasHandshaked()) {
if (buffer.readableBytes() <= 47) // at least 47 bytes
return null;
@@ -70,7 +88,7 @@ public class PeerWireDecoder extends FrameDecoder {
final HandshakeMessage message = new HandshakeMessage();
message.read(buffer);
handshaked = true;
state.setHandshaked(true);
return message;
} else {
@@ -98,8 +116,21 @@ public class PeerWireDecoder extends FrameDecoder {
}
/**
* Return the message represented by <tt>id</tt>. Will return null if
* message id is unknown.
* Adds a new message handler to this decoder
*
* @param handler
* the handler
*/
public void addMessageHandler(PeerWireMessageHeaderManager handler) {
Arrays.copyOf(this.handlers, (this.handlers.length + 1));
this.handlers[(this.handlers.length - 1)] = handler;
}
/**
* Return the message represented by <tt>id</tt>.
* <p>
* Iterate over all <tt>handlers</tt> and try to locate the message. Will
* return null if message id is unknown.
*
* @param id
* the id of the message
@@ -107,37 +138,12 @@ public class PeerWireDecoder extends FrameDecoder {
*/
private PeerWireReadableMessage getMessage(byte id) {
PeerWireReadableMessage message = null;
switch (id) {
case BitfieldMessage.MESSAGE_ID:
message = new BitfieldMessage();
break;
case CancelMessage.MESSAGE_ID:
message = new CancelMessage();
break;
case ChokeMessage.MESSAGE_ID:
message = new ChokeMessage();
break;
case HaveMessage.MESSAGE_ID:
message = new HaveMessage();
break;
case InterestedMessage.MESSAGE_ID:
message = new InterestedMessage();
break;
case NotInterestedMessage.MESSAGE_ID:
message = new NotInterestedMessage();
break;
case PieceMessage.MESSAGE_ID:
message = new PieceMessage();
break;
case PortMessage.MESSAGE_ID:
message = new PortMessage();
break;
case RequestMessage.MESSAGE_ID:
message = new RequestMessage();
break;
case UnchokeMessage.MESSAGE_ID:
message = new UnchokeMessage();
break;
for (final PeerWireMessageHeaderManager handler : handlers) {
if (handler == null)
continue;
message = handler.getMessage(id);
if (message != null)
break;
}
return message;
}

View File

@@ -29,7 +29,8 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireEncoder extends OneToOneEncoder {
@Deprecated
public class PeerWireOldEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {

View File

@@ -33,4 +33,9 @@ public interface PeerWireWritableMessage {
* @throws IOException
*/
void write(ChannelBuffer buffer) throws IOException;
/**
* @return the length of the message
*/
int length();
}

View File

@@ -37,6 +37,7 @@ import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
import net.torrent.protocol.peerwire.message.fast.AllowedFastMessage;
import net.torrent.protocol.peerwire.message.fast.RejectMessage;
import net.torrent.protocol.peerwire.message.fast.SuggestPieceMessage;
import net.torrent.torrent.Torrent;
import net.torrent.torrent.TorrentPart;
@@ -66,6 +67,7 @@ import org.jboss.netty.handler.timeout.IdleStateEvent;
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO separate extensions handler from algorithm handler
public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
/**
* The torrent manager
@@ -120,7 +122,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
peer.handshake(manager.getTorrent().getInfoHash().toByteArray(),
"-TR2050-mcm14ye4h2mq".getBytes(), manager.getContext()
.getCapabilites().toBitSet());
peer.port((short) 1541);
// peer.port((short) 1541);
super.channelConnected(ctx, e);
}
@@ -184,6 +186,12 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
final Torrent torrent = manager.getTorrent();
final TorrentPiece piece = torrent.getPiece(suggest.getPiece());
suggested(peer, piece);
} else if (msg instanceof RejectMessage) {
final RejectMessage request = (RejectMessage) msg;
final Torrent torrent = manager.getTorrent();
final TorrentPart part = torrent.getPart(request.getIndex(),
request.getStart(), request.getLength());
rejected(peer, part);
}
super.messageReceived(ctx, e);
}
@@ -218,7 +226,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
peer.interested();
return;
case UNINTERESTED:
testChoke(peer);
peer.uninterested();
return;
}
}
@@ -483,4 +491,27 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
return;
download(peer, part);
}
private void rejected(PeerWirePeer peer, TorrentPart part) {
switch (downloadAlgorithm.rejected(peer.getTorrentPeer(), part)) {
case DISCONNECT:
peer.disconnect();
break;
case CONNECT_NEW_PEER:
peer.disconnect();
connect(peerAlgorithm.connect());
break;
case NOT_INTERESTED:
peer.uninterested();
break;
case RETRY:
download(peer, part);
break;
case TRY_ANOTHER_PIECE:
final TorrentPart nextPart = downloadAlgorithm.getNextPart(
peer.getTorrentPeer(), part);
download(peer, nextPart);
break;
}
}
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.handler;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* Handler used to manage the codecs' state
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireCodecHandler extends SimpleChannelHandler {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance
*
* @param state
* the connection state
*/
public PeerWireCodecHandler(final PeerWireState state) {
this.state = state;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
if (e.getMessage() instanceof HandshakeMessage) {
if (state.hasPeerHandshaked())
throw new IllegalStateException(
"Peer is trying to handshaked twice");
state.setPeerHandshaked(true);
}
super.messageReceived(ctx, e);
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
if (e.getMessage() instanceof HandshakeMessage) {
if (state.hasHandshaked())
throw new IllegalStateException("Handshake has already been sent");
e.getFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
state.setHandshaked(true);
}
});
}
super.writeRequested(ctx, e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
state.setHandshaked(false);
state.setPeerHandshaked(false);
}
}

View File

@@ -104,6 +104,11 @@ public class BitfieldMessage implements PeerWireWritableMessage,
}
}
@Override
public int length() {
return bitfield.size() / 8 + 1;
}
public BitSet getBitfield() {
return bitfield;
}

View File

@@ -74,6 +74,11 @@ public class CancelMessage implements PeerWireWritableMessage,
buffer.writeInt(start);
buffer.writeInt(length);
}
@Override
public int length() {
return 3 * 4 + 1;
}
public int getIndex() {
return index;

View File

@@ -44,6 +44,11 @@ public class ChokeMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -74,16 +74,7 @@ public class HandshakeMessage implements PeerWireWritableMessage,
}
}
}
System.out.println(reserved);
// int bit = 0;
// for (int i = 0; i < 8; i++) {
// byte b = buffer.readByte();
// for (int j = 128; j > 0; j >>= 1) {
// reserved.set(bit++, (b & j) != 0);
// }
// }
// this.reserved = buffer.readBytes(8).array();
this.infohash = buffer.readBytes(20).array();
this.peerId = buffer.readBytes(20).array();
}
@@ -108,6 +99,11 @@ public class HandshakeMessage implements PeerWireWritableMessage,
buffer.writeBytes(this.peerId);
}
@Override
public int length() {
return 1 + pstrlen + pstr.length() + 8 + infohash.length + peerId.length;
}
public int getPstrlen() {
return pstrlen;
}

View File

@@ -71,6 +71,11 @@ public class HaveMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
buffer.writeInt(piece);
}
@Override
public int length() {
return 5;
}
public int getPiece() {
return piece;

View File

@@ -44,6 +44,11 @@ public class InterestedMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -46,6 +46,11 @@ public class KeepAliveMessage implements PeerWireWritableMessage,
@Override
public void write(ChannelBuffer buffer) throws IOException {
}
@Override
public int length() {
return 0;
}
@Override
public String toString() {

View File

@@ -44,6 +44,11 @@ public class NotInterestedMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -90,6 +90,11 @@ public class PieceMessage implements PeerWireWritableMessage,
buffer.writeInt(start);
buffer.writeBytes(block);
}
@Override
public int length() {
return 9 + block.capacity();
}
public int getIndex() {
return index;

View File

@@ -61,6 +61,11 @@ public class PortMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
buffer.writeShort(port);
}
@Override
public int length() {
return 3;
}
public short getPort() {
return port;

View File

@@ -112,6 +112,11 @@ public class RequestMessage implements PeerWireWritableMessage,
buffer.writeInt(start);
buffer.writeInt(length);
}
@Override
public int length() {
return 13;
}
public int getIndex() {
return index;

View File

@@ -44,6 +44,11 @@ public class UnchokeMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -68,7 +68,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
*/
public class AllowedFastMessage implements PeerWireWritableMessage,
PeerWireReadableMessage {
public static final byte MESSAGE_ID = 0x0D;
public static final byte MESSAGE_ID = 0x11;
/**
* The pieces indexes
@@ -97,6 +97,11 @@ public class AllowedFastMessage implements PeerWireWritableMessage,
buffer.writeInt(piece);
}
}
@Override
public int length() {
return 1 + pieces.length * 4;
}
public int[] getPieces() {
return pieces;

View File

@@ -52,6 +52,11 @@ public class HaveAllMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -52,6 +52,11 @@ public class HaveNoneMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {

View File

@@ -94,6 +94,11 @@ public class RejectMessage implements PeerWireWritableMessage,
buffer.writeInt(start);
buffer.writeInt(length);
}
@Override
public int length() {
return 13;
}
public int getIndex() {
return index;

View File

@@ -69,6 +69,11 @@ public class SuggestPieceMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
buffer.writeInt(piece);
}
@Override
public int length() {
return 5;
}
public int getPiece() {
return piece;

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
import net.torrent.protocol.peerwire.message.fast.AllowedFastMessage;
import net.torrent.protocol.peerwire.message.fast.HaveAllMessage;
import net.torrent.protocol.peerwire.message.fast.HaveNoneMessage;
import net.torrent.protocol.peerwire.message.fast.RejectMessage;
import net.torrent.protocol.peerwire.message.fast.SuggestPieceMessage;
import net.torrent.torrent.context.TorrentPeerCapabilities.TorrentPeerCapability;
/**
* The message header manager for {@link TorrentPeerCapability#FAST_PEERS Fast
* Peers extension}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public final class PeerWireFastPeersMessageHeaderManager implements
PeerWireMessageHeaderManager {
public static final PeerWireFastPeersMessageHeaderManager SHARED_INSTANCE = new PeerWireFastPeersMessageHeaderManager();
@Override
public PeerWireReadableMessage getMessage(byte id) {
switch (id) {
case AllowedFastMessage.MESSAGE_ID:
return new AllowedFastMessage();
case HaveAllMessage.MESSAGE_ID:
return new HaveAllMessage();
case HaveNoneMessage.MESSAGE_ID:
return new HaveNoneMessage();
case RejectMessage.MESSAGE_ID:
return new RejectMessage();
case SuggestPieceMessage.MESSAGE_ID:
return new SuggestPieceMessage();
}
return null;
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
/**
* The message header manager return the message to each given message id (aka
* opcode).
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public interface PeerWireMessageHeaderManager {
/**
* Return the message for an given <tt>id</tt>. If this handler cannot
* handle the message id, <b>null</b> must be returned.
*
* @param id
* the message id
* @return the message, <tt>null</tt> if <tt>id</tt> is unknown.
*/
PeerWireReadableMessage getMessage(byte id);
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
import net.torrent.protocol.peerwire.message.BitfieldMessage;
import net.torrent.protocol.peerwire.message.CancelMessage;
import net.torrent.protocol.peerwire.message.ChokeMessage;
import net.torrent.protocol.peerwire.message.HaveMessage;
import net.torrent.protocol.peerwire.message.InterestedMessage;
import net.torrent.protocol.peerwire.message.NotInterestedMessage;
import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.PortMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
/**
* The default PeerWire message handler
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public final class PeerWireSpecificationMessageHeaderManager implements
PeerWireMessageHeaderManager {
public static final PeerWireSpecificationMessageHeaderManager SHARED_INSTANCE = new PeerWireSpecificationMessageHeaderManager();
@Override
public PeerWireReadableMessage getMessage(byte id) {
switch (id) {
case BitfieldMessage.MESSAGE_ID:
return new BitfieldMessage();
case CancelMessage.MESSAGE_ID:
return new CancelMessage();
case ChokeMessage.MESSAGE_ID:
return new ChokeMessage();
case HaveMessage.MESSAGE_ID:
return new HaveMessage();
case InterestedMessage.MESSAGE_ID:
return new InterestedMessage();
case NotInterestedMessage.MESSAGE_ID:
return new NotInterestedMessage();
case PieceMessage.MESSAGE_ID:
return new PieceMessage();
case PortMessage.MESSAGE_ID:
return new PortMessage();
case RequestMessage.MESSAGE_ID:
return new RequestMessage();
case UnchokeMessage.MESSAGE_ID:
return new UnchokeMessage();
}
return null;
}
}

View File

@@ -35,10 +35,10 @@ public class TorrentContext {
private final TorrentBitfield bitfield = new TorrentBitfield(this);
/**
* The capabilities
* The capabilities supported in this context
*/
private final TorrentPeerCapabilities capabilites = new TorrentPeerCapabilities(
TorrentPeerCapability.DHT, TorrentPeerCapability.FAST_PEERS);
TorrentPeerCapability.DHT);
private final Set<TorrentPeer> peers = new HashSet<TorrentPeer>();
/**

View File

@@ -19,6 +19,9 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import net.torrent.protocol.peerwire.message.header.PeerWireFastPeersMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
/**
* Object containing peers support for certain capabilities.
*
@@ -66,7 +69,7 @@ public class TorrentPeerCapabilities {
* @return true if capability is supported, false otherwise.
*/
public boolean supports(TorrentPeerCapability capability) {
return capabilities.get(capability.getBit());
return capabilities.get(capability.bit);
}
/**
@@ -121,7 +124,7 @@ public class TorrentPeerCapabilities {
/**
* Fast peers support
*/
FAST_PEERS(62),
FAST_PEERS(62, PeerWireFastPeersMessageHeaderManager.SHARED_INSTANCE),
/**
* DHT Support
*/
@@ -130,25 +133,34 @@ public class TorrentPeerCapabilities {
/**
* The bit index for this capability
*/
private final int bit;
public final int bit;
/**
* The header manager for this extension
*/
public final PeerWireMessageHeaderManager headerManager;
/**
* Creates a new capability
*
* @param bit
* the bit marking this capability
* @param headerManager
* the header manager for this extension
*/
TorrentPeerCapability(int bit) {
TorrentPeerCapability(int bit,
PeerWireMessageHeaderManager headerManager) {
this.bit = bit;
this.headerManager = headerManager;
}
/**
* Get the bit marking this capability
* Creates a new capability will a <tt>null</tt> <tt>handlerManager</tt>
*
* @return the bit marking the capability
* @param bit
* the bit marking this capability
*/
public int getBit() {
return bit;
TorrentPeerCapability(int bit) {
this(bit, null);
}
}
}