diff --git a/.gitignore b/.gitignore index 19f2e00..b461b83 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,3 @@ /target /target +/store.bin diff --git a/src/main/java/net/torrent/BitTorrentClientFactory.java b/src/main/java/net/torrent/BitTorrentClientFactory.java index ee0f979..578cf2c 100644 --- a/src/main/java/net/torrent/BitTorrentClientFactory.java +++ b/src/main/java/net/torrent/BitTorrentClientFactory.java @@ -27,6 +27,8 @@ import net.torrent.protocol.peerwire.PeerWireManager; import net.torrent.protocol.peerwire.manager.TorrentManager; import net.torrent.torrent.Torrent; import net.torrent.torrent.context.TorrentContext; +import net.torrent.torrent.piece.PieceSelector; +import net.torrent.torrent.piece.RandomPieceSelector; /** * Factory class for {@link BitTorrentClient}. @@ -55,6 +57,10 @@ public class BitTorrentClientFactory { * The torrent algorithm */ private TorrentAlgorithm algorithm; + /** + * The piece selector + */ + private PieceSelector selector; /** * Creates a new standard {@link BitTorrentClient BitTorrent client} @@ -78,10 +84,12 @@ public class BitTorrentClientFactory { * the torrent */ public BitTorrentClientFactory(final Torrent torrent) { - context = new TorrentContext(torrent); - datastore = new PlainTorrentDatastore(new File("store.bin")); - manager = new TorrentManager(context, datastore); - algorithm = new TorrentStdAlgorithm(manager); + this.context = new TorrentContext(torrent); + this.datastore = new PlainTorrentDatastore(new File("store.bin")); + this.manager = new TorrentManager(context, datastore); + if (this.selector == null) + this.selector = new RandomPieceSelector(manager); + this.algorithm = new TorrentStdAlgorithm(manager, selector); } /** @@ -110,10 +118,12 @@ public class BitTorrentClientFactory { */ public BitTorrentClientFactory(final Torrent torrent, TorrentDatastore datastore) { - context = new TorrentContext(torrent); + this.context = new TorrentContext(torrent); this.datastore = datastore; - manager = new TorrentManager(context, datastore); - algorithm = new TorrentStdAlgorithm(manager); + this.manager = new TorrentManager(context, datastore); + if (this.selector == null) + this.selector = new RandomPieceSelector(manager); + this.algorithm = new TorrentStdAlgorithm(manager, selector); } /** @@ -128,9 +138,9 @@ public class BitTorrentClientFactory { */ public BitTorrentClientFactory(final Torrent torrent, TorrentDatastore datastore, final TorrentAlgorithm algorithm) { - context = new TorrentContext(torrent); + this.context = new TorrentContext(torrent); this.datastore = datastore; - manager = new TorrentManager(context, datastore); + this.manager = new TorrentManager(context, datastore); this.algorithm = algorithm; } diff --git a/src/main/java/net/torrent/protocol/algorithm/TorrentPieceDownloadAlgorithm.java b/src/main/java/net/torrent/protocol/algorithm/TorrentPieceDownloadAlgorithm.java index f30ac0c..6512c57 100644 --- a/src/main/java/net/torrent/protocol/algorithm/TorrentPieceDownloadAlgorithm.java +++ b/src/main/java/net/torrent/protocol/algorithm/TorrentPieceDownloadAlgorithm.java @@ -64,6 +64,50 @@ public interface TorrentPieceDownloadAlgorithm { */ TorrentPart allowedFast(TorrentPeer peer, TorrentPiece piece); + /** + * Issued when the peer has rejected our request. + * + * @param peer + * the rejecting peer + * @param part + * the rejected part + * @see RejectAction + */ + RejectAction rejected(TorrentPeer peer, TorrentPart part); + + public enum RejectAction { + /** + * Only disconnects the peer, does not initiate a new connection with + * anyone. + */ + DISCONNECT, + + /** + * Disconnects the current peer and connects a new one + */ + CONNECT_NEW_PEER, + + /** + * Choke this peer + */ + NOT_INTERESTED, + + /** + * Request again the same part + */ + RETRY, + + /** + * Try to download another piece + */ + TRY_ANOTHER_PIECE, + + /** + * Do nothing, ignore. Might cause peer to become idle. + */ + IGNORE; + } + /** * Test if an certain piece has all its parts already download. If true, a * checksum will be performed and a message informing we have this piece diff --git a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdAlgorithm.java b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdAlgorithm.java index 507eefe..b6c5184 100644 --- a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdAlgorithm.java +++ b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdAlgorithm.java @@ -37,14 +37,14 @@ public class TorrentStdAlgorithm implements TorrentAlgorithm { private final TorrentPieceDownloadAlgorithm downloadAlgorithm; private final TorrentPieceUploadAlgorithm uploadAlgorithm; - public TorrentStdAlgorithm(final TorrentManager manager) { + public TorrentStdAlgorithm(final TorrentManager manager, + final PieceSelector selector) { pieceSelector = new ScoredPieceSelector(manager); - + peerAlgorithm = new TorrentStdPeerAlgorithm(manager); - interestAlgorithm = new TorrentStdInterestAlgorithm(manager, - pieceSelector); + interestAlgorithm = new TorrentStdInterestAlgorithm(manager, selector); downloadAlgorithm = new TorrentStdPieceDownloadAlgorithm(manager, - pieceSelector); + selector); uploadAlgorithm = new TorrentStdPieceUploadAlgorithm(manager); } diff --git a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdInterestAlgorithm.java b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdInterestAlgorithm.java index d185f8b..1b86b46 100644 --- a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdInterestAlgorithm.java +++ b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdInterestAlgorithm.java @@ -28,8 +28,16 @@ import net.torrent.torrent.piece.PieceSelector; * @author Rogiel Josias Sulzbach */ public class TorrentStdInterestAlgorithm implements TorrentInterestAlgorithm { + /** + * The torrent manager + */ @SuppressWarnings("unused") private final TorrentManager manager; + + /** + * This selector is used to find the next piece to be downloaded. Parts are + * managed inside this algorithm. + */ private final PieceSelector selector; /** @@ -37,22 +45,20 @@ public class TorrentStdInterestAlgorithm implements TorrentInterestAlgorithm { * * @param manager * the manager - * @param pieceSelector + * @param selector * the piece selector */ public TorrentStdInterestAlgorithm(TorrentManager manager, - PieceSelector pieceSelector) { + PieceSelector selector) { this.manager = manager; - this.selector = pieceSelector; + this.selector = selector; } @Override public InterestState interested(TorrentPeer peer) { int pieces = selector.countPieces(peer); - if(pieces >= 5) + if (pieces >= 5) return InterestState.INTERESTED; - - return InterestState.INTERESTED; } diff --git a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdPieceDownloadAlgorithm.java b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdPieceDownloadAlgorithm.java index be50b7e..138d60e 100644 --- a/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdPieceDownloadAlgorithm.java +++ b/src/main/java/net/torrent/protocol/algorithm/impl/TorrentStdPieceDownloadAlgorithm.java @@ -32,6 +32,7 @@ import net.torrent.torrent.piece.PieceSelector; * * @author Rogiel Josias Sulzbach */ +// TODO separate standard algorithm from extension ones public class TorrentStdPieceDownloadAlgorithm implements TorrentPieceDownloadAlgorithm { /** @@ -60,11 +61,13 @@ public class TorrentStdPieceDownloadAlgorithm implements * retrieve current downloads/uploads and connections. * @param pieceSelector * the piece selector + * @param selector + * the piece selector */ public TorrentStdPieceDownloadAlgorithm(TorrentManager manager, - PieceSelector pieceSelector) { + PieceSelector selector) { this.manager = manager; - this.selector = pieceSelector; + this.selector = selector; } @Override @@ -94,6 +97,11 @@ public class TorrentStdPieceDownloadAlgorithm implements return piece.getFirstPart(); } + @Override + public RejectAction rejected(TorrentPeer peer, TorrentPart part) { + return RejectAction.TRY_ANOTHER_PIECE; + } + @Override public boolean isComplete(TorrentPeer peer, TorrentPiece piece) { if (manager.getContext().getBitfield().hasPiece(piece)) diff --git a/src/main/java/net/torrent/protocol/peerwire/PeerWirePipelineFactory.java b/src/main/java/net/torrent/protocol/peerwire/PeerWirePipelineFactory.java index 154d36d..eac9667 100644 --- a/src/main/java/net/torrent/protocol/peerwire/PeerWirePipelineFactory.java +++ b/src/main/java/net/torrent/protocol/peerwire/PeerWirePipelineFactory.java @@ -16,8 +16,11 @@ package net.torrent.protocol.peerwire; import static org.jboss.netty.channel.Channels.pipeline; -import net.torrent.protocol.peerwire.codec.PeerWireDecoder; -import net.torrent.protocol.peerwire.codec.PeerWireEncoder; +import net.torrent.protocol.peerwire.codec.PeerWireFrameDecoder; +import net.torrent.protocol.peerwire.codec.PeerWireFrameEncoder; +import net.torrent.protocol.peerwire.codec.PeerWireMessageDecoder; +import net.torrent.protocol.peerwire.codec.PeerWireMessageEncoder; +import net.torrent.protocol.peerwire.handler.PeerWireCodecHandler; import net.torrent.protocol.peerwire.handler.PeerWireManagerHeadHandler; import net.torrent.protocol.peerwire.handler.PeerWireManagerTailHandler; import net.torrent.protocol.peerwire.manager.TorrentManager; @@ -38,7 +41,8 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory { * The logging handler */ private final LoggingHandler loggingHandler = new LoggingHandler( - InternalLogLevel.WARN); + InternalLogLevel.INFO); + /** * The algorithm handler */ @@ -70,15 +74,28 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory { @Override public ChannelPipeline getPipeline() throws Exception { final ChannelPipeline pipeline = pipeline(); + final PeerWireState state = new PeerWireState(); - // TODO create traffic shape handler - // TODO create firewall handler - block connections from unwanted peers + // TODO create traffic shape handler once Netty 4.0 is released + // TODO create firewall handler - block connections from unwanted peers. - pipeline.addLast("decoder", new PeerWireDecoder()); - pipeline.addLast("encoder", new PeerWireEncoder()); + // pipeline.addLast("old.decoder", new PeerWireOldDecoder(state)); + // pipeline.addLast("old.encoder", new PeerWireOldEncoder()); + // frame (or header) codec + pipeline.addLast("frame.decoder", new PeerWireFrameDecoder(state)); + pipeline.addLast("frame.encoder", new PeerWireFrameEncoder(state)); + + // message codec + pipeline.addLast("message.decoder", new PeerWireMessageDecoder(state)); + pipeline.addLast("message.encoder", new PeerWireMessageEncoder()); + + pipeline.addLast("codec.handler", new PeerWireCodecHandler(state)); + + // logging handler (before any other handler can take action) pipeline.addLast("logging", loggingHandler); + // handlers pipeline.addLast("head-handler", headManagerHandler); pipeline.addLast("algorithm", algorithmHandler); pipeline.addLast("tail-handler", tailManagerHandler); diff --git a/src/main/java/net/torrent/protocol/peerwire/PeerWireState.java b/src/main/java/net/torrent/protocol/peerwire/PeerWireState.java new file mode 100644 index 0000000..13352a8 --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/PeerWireState.java @@ -0,0 +1,75 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire; + +import org.jboss.netty.channel.Channel; + +/** + * Maintain an per {@link Channel} information about the connection state. + * + * @author Rogiel Josias Sulzbach + */ +public class PeerWireState { + /** + * The peer handshake state + */ + private boolean peerHandshaked; + + /** + * The handshake state + */ + private boolean handshaked; + + /** + * Returns true if this connection has already been handshaked by the peer + * end. + * + * @return true if connection was handshaked + */ + public boolean hasPeerHandshaked() { + return peerHandshaked; + } + + /** + * Set the handshake state of this connection by the peer end. + * + * @param peerHandshaked + * the handshake state + */ + public void setPeerHandshaked(boolean peerHandshaked) { + this.peerHandshaked = peerHandshaked; + } + + /** + * Returns true if this connection has already been handshaked by the + * library end. + * + * @return true if connection was handshaked + */ + public boolean hasHandshaked() { + return handshaked; + } + + /** + * Set the handshake state of this connection by the library end. + * + * @param handshaked + * the handshake state + */ + public void setHandshaked(boolean handshaked) { + this.handshaked = handshaked; + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameDecoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameDecoder.java new file mode 100644 index 0000000..250cd04 --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameDecoder.java @@ -0,0 +1,96 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire.codec; + +import net.torrent.protocol.peerwire.PeerWireState; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.CorruptedFrameException; +import org.jboss.netty.handler.codec.frame.FrameDecoder; + +/** + * BitTorrent has two types of message headers: + *
+ *
+ *
+ *
+ * Instances of this class keep channel state content and must not be shared nor + * cached. + * + * @author Rogiel Josias Sulzbach + * @see PeerWireFrameDecoder#state + */ +public class PeerWireFrameDecoder extends FrameDecoder { + /** + * This connection state. This need to be shared with other encoders, + * decoders or handlers. But more importantly NEVER share the same + * instance across more than one {@link Channel}. + */ + private final PeerWireState state; + + /** + * Creates a new instance of this decoder + * + * @param state + * the connection state + */ + public PeerWireFrameDecoder(final PeerWireState state) { + this.state = state; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + ChannelBuffer buffer) throws Exception { + buffer.markReaderIndex(); + final int index = buffer.readerIndex(); + + if (state.hasPeerHandshaked()) { + if (buffer.readableBytes() <= 4) + return null; + int len = buffer.readInt(); + if (len == 0) { + // keep-alive message + return ChannelBuffers.EMPTY_BUFFER; + } else if (buffer.readableBytes() < len) { + buffer.resetReaderIndex(); + return null; + } + buffer.skipBytes(len); + return buffer.slice(index + 4, len); + } else { + if (buffer.readableBytes() <= 1) // at least 1 byte for header + return null; + + final int pstrlen = buffer.readByte(); + if (pstrlen != 19) + throw new CorruptedFrameException( + "Handshake frame is corrupted. pstrlen != 19"); + buffer.resetReaderIndex(); + if (buffer.readableBytes() < pstrlen + 49) { + return null; + } + buffer.skipBytes(pstrlen + 49); + return buffer.slice(index, pstrlen + 49); + } + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameEncoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameEncoder.java new file mode 100644 index 0000000..51640bd --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireFrameEncoder.java @@ -0,0 +1,75 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire.codec; + +import net.torrent.protocol.peerwire.PeerWireState; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.frame.CorruptedFrameException; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + +/** + * BitTorrent has two types of message headers: + *
+ *
+ *
+ *
+ * Instances of this class keep channel state content and must not be shared nor + * cached. + * + * @author Rogiel Josias Sulzbach + * @see PeerWireFrameDecoder#state + */ +public class PeerWireFrameEncoder extends OneToOneEncoder { + /** + * This connection state. This need to be shared with other encoders, + * decoders or handlers. But more importantly NEVER share the same + * instance across more than one {@link Channel}. + */ + private final PeerWireState state; + + /** + * Creates a new instance of this encoder + * + * @param state + * the connection state + */ + public PeerWireFrameEncoder(PeerWireState state) { + this.state = state; + } + + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, + Object msg) throws Exception { + if (!(msg instanceof ChannelBuffer)) + return msg; + if (!state.hasHandshaked()) + // on handshake message there is no visible "frame" + return msg; + ChannelBuffer buffer = (ChannelBuffer) msg; + if (buffer.readableBytes() < 4) + throw new CorruptedFrameException( + "A frame must have at least 4 bytes!"); + buffer.setInt(0, buffer.readableBytes() - 4); + return buffer; + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageDecoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageDecoder.java new file mode 100644 index 0000000..eadd25a --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageDecoder.java @@ -0,0 +1,128 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire.codec; + +import java.util.Arrays; + +import net.torrent.protocol.peerwire.PeerWireState; +import net.torrent.protocol.peerwire.message.HandshakeMessage; +import net.torrent.protocol.peerwire.message.KeepAliveMessage; +import net.torrent.protocol.peerwire.message.header.PeerWireFastPeersMessageHeaderManager; +import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager; +import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.oneone.OneToOneDecoder; + +/** + * This decoder decodes a {@link ChannelBuffer} into an + * {@link PeerWireReadableMessage}. + * + * @author Rogiel Josias Sulzbach + * @see PeerWireMessageDecoder#state + */ +public class PeerWireMessageDecoder extends OneToOneDecoder { + /** + * This connection state. This need to be shared with other encoders, + * decoders or handlers. But more importantly NEVER share the same + * instance across more than one {@link Channel}. + */ + private final PeerWireState state; + + /** + * The is an list of header managers that will create message instances for + * each message id passed as argument. + *
+ * {@link PeerWireSpecificationMessageHeaderManager} and + * {@link PeerWireFastPeersMessageHeaderManager} are already in the list. + */ + private PeerWireMessageHeaderManager[] headerManager = new PeerWireMessageHeaderManager[] { + PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE, + PeerWireFastPeersMessageHeaderManager.SHARED_INSTANCE }; + + /** + * Creates a new instance of this decoder + * + * @param state + * the connection state + */ + public PeerWireMessageDecoder(PeerWireState state) { + this.state = state; + } + + @Override + protected Object decode(ChannelHandlerContext ctx, Channel channel, + Object msg) throws Exception { + if (!(msg instanceof ChannelBuffer)) + return msg; + final ChannelBuffer buffer = (ChannelBuffer) msg; + if (buffer.readableBytes() == 0) + return new KeepAliveMessage(); + + final PeerWireReadableMessage message; + if (state.hasPeerHandshaked()) { + buffer.markReaderIndex(); + + final byte id = buffer.readByte(); + message = getMessage(id); + } else { + message = new HandshakeMessage(); + } + + if (message == null) { // unknown message + buffer.resetReaderIndex(); + return msg; + } + message.read(buffer); + return message; + } + + /** + * Adds a new message header manager to this decoder + * + * @param newHeaderManager + * the new header manager + */ + public void addMessageHeader(PeerWireMessageHeaderManager newHeaderManager) { + headerManager = Arrays + .copyOf(headerManager, (headerManager.length + 1)); + headerManager[(headerManager.length - 1)] = newHeaderManager; + } + + /** + * Return the message represented by id. + *
+ * Iterate over all handlers and try to locate the message. Will + * return null if message id is unknown. + * + * @param id + * the id of the message + * @return the message + */ + private PeerWireReadableMessage getMessage(byte id) { + PeerWireReadableMessage message = null; + for (final PeerWireMessageHeaderManager handler : headerManager) { + if (handler == null) + continue; + message = handler.getMessage(id); + if (message != null) + break; + } + return message; + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageEncoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageEncoder.java new file mode 100644 index 0000000..7b3c387 --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireMessageEncoder.java @@ -0,0 +1,49 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire.codec; + +import net.torrent.protocol.peerwire.message.HandshakeMessage; + +import org.jboss.netty.buffer.ChannelBuffer; +import org.jboss.netty.buffer.ChannelBuffers; +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; + +/** + * This encoder encodes {@link PeerWireWritableMessage} to a + * {@link ChannelBuffer}. + * + * @author Rogiel Josias Sulzbach + */ +public class PeerWireMessageEncoder extends OneToOneEncoder { + @Override + protected Object encode(ChannelHandlerContext ctx, Channel channel, + Object msg) throws Exception { + if (!(msg instanceof PeerWireWritableMessage)) + return msg; + final PeerWireWritableMessage message = (PeerWireWritableMessage) msg; + final ChannelBuffer buffer = ChannelBuffers + .buffer(message.length() + 4); + + if (!(message instanceof HandshakeMessage)) { + buffer.writeInt(0x00); + } + message.write(buffer); + + return buffer; + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireDecoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldDecoder.java similarity index 61% rename from src/main/java/net/torrent/protocol/peerwire/codec/PeerWireDecoder.java rename to src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldDecoder.java index 09d82eb..8e6f3ab 100644 --- a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireDecoder.java +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldDecoder.java @@ -15,18 +15,13 @@ */ package net.torrent.protocol.peerwire.codec; -import net.torrent.protocol.peerwire.message.BitfieldMessage; -import net.torrent.protocol.peerwire.message.CancelMessage; -import net.torrent.protocol.peerwire.message.ChokeMessage; +import java.util.Arrays; + +import net.torrent.protocol.peerwire.PeerWireState; import net.torrent.protocol.peerwire.message.HandshakeMessage; -import net.torrent.protocol.peerwire.message.HaveMessage; -import net.torrent.protocol.peerwire.message.InterestedMessage; import net.torrent.protocol.peerwire.message.KeepAliveMessage; -import net.torrent.protocol.peerwire.message.NotInterestedMessage; -import net.torrent.protocol.peerwire.message.PieceMessage; -import net.torrent.protocol.peerwire.message.PortMessage; -import net.torrent.protocol.peerwire.message.RequestMessage; -import net.torrent.protocol.peerwire.message.UnchokeMessage; +import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager; +import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; @@ -48,16 +43,39 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder; * cached. * * @author Rogiel Josias Sulzbach + * @see PeerWireOldDecoder#state */ -public class PeerWireDecoder extends FrameDecoder { - private boolean handshaked = false; +@Deprecated +public class PeerWireOldDecoder extends FrameDecoder { + /** + * This connection state. This need to be shared with other encoders or + * decoders. But more importantly NEVER share the same instance + * across more than one {@link Channel}. + */ + private final PeerWireState state; + + /** + * The is an list of handlers that will create message instances for each + * message id passed as argument. + */ + private PeerWireMessageHeaderManager[] handlers = new PeerWireMessageHeaderManager[] { PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE }; + + /** + * Creates a new instance of this decoder + * + * @param state + * the connection state + */ + public PeerWireOldDecoder(final PeerWireState state) { + this.state = state; + } @Override protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { buffer.markReaderIndex(); - if (!handshaked) { + if (!state.hasHandshaked()) { if (buffer.readableBytes() <= 47) // at least 47 bytes return null; @@ -70,7 +88,7 @@ public class PeerWireDecoder extends FrameDecoder { final HandshakeMessage message = new HandshakeMessage(); message.read(buffer); - handshaked = true; + state.setHandshaked(true); return message; } else { @@ -98,8 +116,21 @@ public class PeerWireDecoder extends FrameDecoder { } /** - * Return the message represented by id. Will return null if - * message id is unknown. + * Adds a new message handler to this decoder + * + * @param handler + * the handler + */ + public void addMessageHandler(PeerWireMessageHeaderManager handler) { + Arrays.copyOf(this.handlers, (this.handlers.length + 1)); + this.handlers[(this.handlers.length - 1)] = handler; + } + + /** + * Return the message represented by id. + *
+ * Iterate over all handlers and try to locate the message. Will + * return null if message id is unknown. * * @param id * the id of the message @@ -107,37 +138,12 @@ public class PeerWireDecoder extends FrameDecoder { */ private PeerWireReadableMessage getMessage(byte id) { PeerWireReadableMessage message = null; - switch (id) { - case BitfieldMessage.MESSAGE_ID: - message = new BitfieldMessage(); - break; - case CancelMessage.MESSAGE_ID: - message = new CancelMessage(); - break; - case ChokeMessage.MESSAGE_ID: - message = new ChokeMessage(); - break; - case HaveMessage.MESSAGE_ID: - message = new HaveMessage(); - break; - case InterestedMessage.MESSAGE_ID: - message = new InterestedMessage(); - break; - case NotInterestedMessage.MESSAGE_ID: - message = new NotInterestedMessage(); - break; - case PieceMessage.MESSAGE_ID: - message = new PieceMessage(); - break; - case PortMessage.MESSAGE_ID: - message = new PortMessage(); - break; - case RequestMessage.MESSAGE_ID: - message = new RequestMessage(); - break; - case UnchokeMessage.MESSAGE_ID: - message = new UnchokeMessage(); - break; + for (final PeerWireMessageHeaderManager handler : handlers) { + if (handler == null) + continue; + message = handler.getMessage(id); + if (message != null) + break; } return message; } diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireEncoder.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldEncoder.java similarity index 96% rename from src/main/java/net/torrent/protocol/peerwire/codec/PeerWireEncoder.java rename to src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldEncoder.java index d4534ad..1fccb44 100644 --- a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireEncoder.java +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireOldEncoder.java @@ -29,7 +29,8 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; * * @author Rogiel Josias Sulzbach */ -public class PeerWireEncoder extends OneToOneEncoder { +@Deprecated +public class PeerWireOldEncoder extends OneToOneEncoder { @Override protected Object encode(ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { diff --git a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireWritableMessage.java b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireWritableMessage.java index cb70ae8..6dd5a18 100644 --- a/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireWritableMessage.java +++ b/src/main/java/net/torrent/protocol/peerwire/codec/PeerWireWritableMessage.java @@ -33,4 +33,9 @@ public interface PeerWireWritableMessage { * @throws IOException */ void write(ChannelBuffer buffer) throws IOException; + + /** + * @return the length of the message + */ + int length(); } diff --git a/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireAlgorithmHandler.java b/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireAlgorithmHandler.java index 7016526..df0305c 100644 --- a/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireAlgorithmHandler.java +++ b/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireAlgorithmHandler.java @@ -37,6 +37,7 @@ import net.torrent.protocol.peerwire.message.PieceMessage; import net.torrent.protocol.peerwire.message.RequestMessage; import net.torrent.protocol.peerwire.message.UnchokeMessage; import net.torrent.protocol.peerwire.message.fast.AllowedFastMessage; +import net.torrent.protocol.peerwire.message.fast.RejectMessage; import net.torrent.protocol.peerwire.message.fast.SuggestPieceMessage; import net.torrent.torrent.Torrent; import net.torrent.torrent.TorrentPart; @@ -58,9 +59,15 @@ import org.jboss.netty.handler.timeout.IdleStateEvent; /** * Standard handler responsible for forwarding calls to {@link TorrentAlgorithm} * methods. This class handles low-level protocol specific behavior. + *
+ * The general guide line for this handler is to abstract ALL protocol + * specific models and use an abstracted algorithm for download. This will + * obviously limit the complexity of algorithm implementations. If that is the + * case, you are free to implement a new handler. * * @author Rogiel Josias Sulzbach */ +// TODO separate extensions handler from algorithm handler public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler { /** * The torrent manager @@ -115,7 +122,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler { peer.handshake(manager.getTorrent().getInfoHash().toByteArray(), "-TR2050-mcm14ye4h2mq".getBytes(), manager.getContext() .getCapabilites().toBitSet()); - peer.port((short) 1541); + // peer.port((short) 1541); super.channelConnected(ctx, e); } @@ -179,6 +186,12 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler { final Torrent torrent = manager.getTorrent(); final TorrentPiece piece = torrent.getPiece(suggest.getPiece()); suggested(peer, piece); + } else if (msg instanceof RejectMessage) { + final RejectMessage request = (RejectMessage) msg; + final Torrent torrent = manager.getTorrent(); + final TorrentPart part = torrent.getPart(request.getIndex(), + request.getStart(), request.getLength()); + rejected(peer, part); } super.messageReceived(ctx, e); } @@ -213,7 +226,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler { peer.interested(); return; case UNINTERESTED: - testChoke(peer); + peer.uninterested(); return; } } @@ -478,4 +491,27 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler { return; download(peer, part); } -} \ No newline at end of file + + private void rejected(PeerWirePeer peer, TorrentPart part) { + switch (downloadAlgorithm.rejected(peer.getTorrentPeer(), part)) { + case DISCONNECT: + peer.disconnect(); + break; + case CONNECT_NEW_PEER: + peer.disconnect(); + connect(peerAlgorithm.connect()); + break; + case NOT_INTERESTED: + peer.uninterested(); + break; + case RETRY: + download(peer, part); + break; + case TRY_ANOTHER_PIECE: + final TorrentPart nextPart = downloadAlgorithm.getNextPart( + peer.getTorrentPeer(), part); + download(peer, nextPart); + break; + } + } +} diff --git a/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireCodecHandler.java b/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireCodecHandler.java new file mode 100644 index 0000000..4390bea --- /dev/null +++ b/src/main/java/net/torrent/protocol/peerwire/handler/PeerWireCodecHandler.java @@ -0,0 +1,87 @@ +/* + * Copyright 2011 Rogiel Josias Sulzbach + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package net.torrent.protocol.peerwire.handler; + +import net.torrent.protocol.peerwire.PeerWireState; +import net.torrent.protocol.peerwire.message.HandshakeMessage; + +import org.jboss.netty.channel.Channel; +import org.jboss.netty.channel.ChannelFuture; +import org.jboss.netty.channel.ChannelFutureListener; +import org.jboss.netty.channel.ChannelHandlerContext; +import org.jboss.netty.channel.ChannelStateEvent; +import org.jboss.netty.channel.MessageEvent; +import org.jboss.netty.channel.SimpleChannelHandler; + +/** + * Handler used to manage the codecs' state + * + * @author Rogiel Josias Sulzbach + */ +public class PeerWireCodecHandler extends SimpleChannelHandler { + /** + * This connection state. This need to be shared with other encoders, + * decoders or handlers. But more importantly NEVER share the same + * instance across more than one {@link Channel}. + */ + private final PeerWireState state; + + /** + * Creates a new instance + * + * @param state + * the connection state + */ + public PeerWireCodecHandler(final PeerWireState state) { + this.state = state; + } + + @Override + public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + if (e.getMessage() instanceof HandshakeMessage) { + if (state.hasPeerHandshaked()) + throw new IllegalStateException( + "Peer is trying to handshaked twice"); + state.setPeerHandshaked(true); + } + super.messageReceived(ctx, e); + } + + @Override + public void writeRequested(ChannelHandlerContext ctx, MessageEvent e) + throws Exception { + if (e.getMessage() instanceof HandshakeMessage) { + if (state.hasHandshaked()) + throw new IllegalStateException("Handshake has already been sent"); + e.getFuture().addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture future) + throws Exception { + state.setHandshaked(true); + } + }); + } + super.writeRequested(ctx, e); + } + + @Override + public void channelDisconnected(ChannelHandlerContext ctx, + ChannelStateEvent e) throws Exception { + state.setHandshaked(false); + state.setPeerHandshaked(false); + } +} \ No newline at end of file diff --git a/src/main/java/net/torrent/protocol/peerwire/manager/ConnectionManager.java b/src/main/java/net/torrent/protocol/peerwire/manager/ConnectionManager.java index 42ee6f7..a2c444c 100644 --- a/src/main/java/net/torrent/protocol/peerwire/manager/ConnectionManager.java +++ b/src/main/java/net/torrent/protocol/peerwire/manager/ConnectionManager.java @@ -25,7 +25,14 @@ import net.torrent.torrent.context.TorrentContext; import org.jboss.netty.channel.Channel; /** - * Connection manager: manages active and inactive connections. + * Connection manager: keep control over active and inactive {@link Channel + * channel} connections. + *
+ * Please note that the manager actually does not make any decision nor create + * or block an connection. + *
+ * You can {@link Iterable iterate} over this manager to get active + * {@link Channel} instances. * * @author Rogiel Josias Sulzbach */ diff --git a/src/main/java/net/torrent/protocol/peerwire/manager/DownloadManager.java b/src/main/java/net/torrent/protocol/peerwire/manager/DownloadManager.java index 9eecd4f..7ef9277 100644 --- a/src/main/java/net/torrent/protocol/peerwire/manager/DownloadManager.java +++ b/src/main/java/net/torrent/protocol/peerwire/manager/DownloadManager.java @@ -28,19 +28,60 @@ import net.torrent.torrent.TorrentPiece; import net.torrent.torrent.context.TorrentContext; import net.torrent.torrent.context.TorrentPeer; +/** + * The download manager keep an track over current parts being downloaded at the + * moment. + *
+ * Please note that the manager actually does not make any decision nor block an + * requested piece. + *
+ * You can {@link Iterable iterate} over this manager to get current
+ * downloading {@link TorrentPart parts}.
+ *
+ * @author Rogiel Josias Sulzbach
+ */
+// TODO track pieces which have been requested for some time but never got an
+// REJECT (fast peer extensions) nor PIECE message (standard torrent
+// implementation does nothing when the peer is not uploading the piece)
+// TODO allow more then one peer per piece request
public class DownloadManager implements Iterable
+ * Note that since this implementation is decoupled from any protocol
+ * implementation, it will not cancel the request! This piece might arrive
+ * some time later.
+ *
+ * @param peer
+ * the peer
+ * @return a set containing pieces removed
+ */
public Set
+ * Please note that the manager actually does not make any decision nor block an
+ * requested piece.
+ *
+ * You can {@link Iterable iterate} over this manager to get active
+ * {@link PeerWirePeer peers}.
+ */
public class PeerManager implements Iterable
+ * Please note that the manager actually does not make any decision nor declines
+ * an requested piece.
+ *
+ * You can {@link Iterable iterate} over this manager to get current
+ * uploading {@link TorrentPart parts}.
+ *
+ * FIXME BUG NOTICE: Note that this manager has an huge issue. When two
+ * or more peers request the very same part, only the latter will be maintained
+ * under control by this manager. This should be fixed soon and changes to the
+ * manager signature can occur. The affected methods are:
+ * {@link UploadManager#getPeer(TorrentPart)},
+ * {@link UploadManager#remove(TorrentPart)} and
+ * {@link UploadManager#getActiveUploads()}.
+ *
+ * @author Rogiel Josias Sulzbach
+ */
+// TODO track pieces which have been requested but the upload is not interesting
+// right now, but might be later.
+// FIXME this manager has an issue: if two peers request the same piece, only
+// the latter one will be managed! UploadManager#getPeer(TorrentPart part) needs
+// a fix.
public class UploadManager implements Iterable
+ * Note that this method has an huge issue: when two or more peers request
+ * the very same part (whichever piece), only the latter will be maintained
+ * under control by this manager. This need to be fixed soon and this method
+ * deprecated. You should avoid relying on it!
+ *
+ * @param part
+ * @return
+ */
+ public TorrentPeer getPeer(TorrentPart part) {
+ return activeParts.get(part);
}
+ /**
+ * Get the pieces being uploaded to the given peer
+ *
+ * @param peer
+ * the peer
+ * @return an {@link Set} of all parts being uploaded to this peer.
+ */
public Set
+ * Note that this method has an huge issue: when two or more peers request
+ * the very same part (whichever piece), only the latter will be maintained
+ * under control by this manager. This need to be fixed soon and this method
+ * deprecated. You should avoid relying on it!
+ *
+ * @param part
+ * the part
+ * @return the downloading peer for the part
+ */
+ public TorrentPeer remove(TorrentPart part) {
+ return activeParts.remove(part);
}
+ /**
+ * Removes all uploads for the given peer
+ *
+ * @param peer
+ * the peer
+ * @return the set of {@link TorrentPart parts} being uploaded to that peer
+ */
public Set
+ * Note that this method has an huge issue: when two or more peers request
+ * the very same part (whichever piece), only the latter will be maintained
+ * under control by this manager. This need to be fixed soon and this method
+ * deprecated. You should avoid relying on it!
+ *
+ * @return an {@link Map} of active uploads
+ */
public Map