1
0
mirror of https://github.com/Rogiel/torrent4j synced 2025-12-06 07:32:47 +00:00

merging refs/heads/master into HEAD

Change-Id: I0000000000000000000000000000000000000000
This commit is contained in:
rogiel
2011-05-01 23:59:49 -03:00
45 changed files with 1411 additions and 122 deletions

1
.gitignore vendored
View File

@@ -1,2 +1,3 @@
/target
/target
/store.bin

View File

@@ -27,6 +27,8 @@ import net.torrent.protocol.peerwire.PeerWireManager;
import net.torrent.protocol.peerwire.manager.TorrentManager;
import net.torrent.torrent.Torrent;
import net.torrent.torrent.context.TorrentContext;
import net.torrent.torrent.piece.PieceSelector;
import net.torrent.torrent.piece.RandomPieceSelector;
/**
* Factory class for {@link BitTorrentClient}.
@@ -55,6 +57,10 @@ public class BitTorrentClientFactory {
* The torrent algorithm
*/
private TorrentAlgorithm algorithm;
/**
* The piece selector
*/
private PieceSelector selector;
/**
* Creates a new standard {@link BitTorrentClient BitTorrent client}
@@ -78,10 +84,12 @@ public class BitTorrentClientFactory {
* the torrent
*/
public BitTorrentClientFactory(final Torrent torrent) {
context = new TorrentContext(torrent);
datastore = new PlainTorrentDatastore(new File("store.bin"));
manager = new TorrentManager(context, datastore);
algorithm = new TorrentStdAlgorithm(manager);
this.context = new TorrentContext(torrent);
this.datastore = new PlainTorrentDatastore(new File("store.bin"));
this.manager = new TorrentManager(context, datastore);
if (this.selector == null)
this.selector = new RandomPieceSelector(manager);
this.algorithm = new TorrentStdAlgorithm(manager, selector);
}
/**
@@ -110,10 +118,12 @@ public class BitTorrentClientFactory {
*/
public BitTorrentClientFactory(final Torrent torrent,
TorrentDatastore datastore) {
context = new TorrentContext(torrent);
this.context = new TorrentContext(torrent);
this.datastore = datastore;
manager = new TorrentManager(context, datastore);
algorithm = new TorrentStdAlgorithm(manager);
this.manager = new TorrentManager(context, datastore);
if (this.selector == null)
this.selector = new RandomPieceSelector(manager);
this.algorithm = new TorrentStdAlgorithm(manager, selector);
}
/**
@@ -128,9 +138,9 @@ public class BitTorrentClientFactory {
*/
public BitTorrentClientFactory(final Torrent torrent,
TorrentDatastore datastore, final TorrentAlgorithm algorithm) {
context = new TorrentContext(torrent);
this.context = new TorrentContext(torrent);
this.datastore = datastore;
manager = new TorrentManager(context, datastore);
this.manager = new TorrentManager(context, datastore);
this.algorithm = algorithm;
}

View File

@@ -64,6 +64,50 @@ public interface TorrentPieceDownloadAlgorithm {
*/
TorrentPart allowedFast(TorrentPeer peer, TorrentPiece piece);
/**
* Issued when the peer has rejected our request.
*
* @param peer
* the rejecting peer
* @param part
* the rejected part
* @see RejectAction
*/
RejectAction rejected(TorrentPeer peer, TorrentPart part);
public enum RejectAction {
/**
* Only disconnects the peer, does not initiate a new connection with
* anyone.
*/
DISCONNECT,
/**
* Disconnects the current peer and connects a new one
*/
CONNECT_NEW_PEER,
/**
* Choke this peer
*/
NOT_INTERESTED,
/**
* Request again the same part
*/
RETRY,
/**
* Try to download another piece
*/
TRY_ANOTHER_PIECE,
/**
* Do nothing, ignore. Might cause peer to become idle.
*/
IGNORE;
}
/**
* Test if an certain piece has all its parts already download. If true, a
* checksum will be performed and a message informing we have this piece

View File

@@ -37,14 +37,14 @@ public class TorrentStdAlgorithm implements TorrentAlgorithm {
private final TorrentPieceDownloadAlgorithm downloadAlgorithm;
private final TorrentPieceUploadAlgorithm uploadAlgorithm;
public TorrentStdAlgorithm(final TorrentManager manager) {
public TorrentStdAlgorithm(final TorrentManager manager,
final PieceSelector selector) {
pieceSelector = new ScoredPieceSelector(manager);
peerAlgorithm = new TorrentStdPeerAlgorithm(manager);
interestAlgorithm = new TorrentStdInterestAlgorithm(manager,
pieceSelector);
interestAlgorithm = new TorrentStdInterestAlgorithm(manager, selector);
downloadAlgorithm = new TorrentStdPieceDownloadAlgorithm(manager,
pieceSelector);
selector);
uploadAlgorithm = new TorrentStdPieceUploadAlgorithm(manager);
}

View File

@@ -28,8 +28,16 @@ import net.torrent.torrent.piece.PieceSelector;
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class TorrentStdInterestAlgorithm implements TorrentInterestAlgorithm {
/**
* The torrent manager
*/
@SuppressWarnings("unused")
private final TorrentManager manager;
/**
* This selector is used to find the next piece to be downloaded. Parts are
* managed inside this algorithm.
*/
private final PieceSelector selector;
/**
@@ -37,22 +45,20 @@ public class TorrentStdInterestAlgorithm implements TorrentInterestAlgorithm {
*
* @param manager
* the manager
* @param pieceSelector
* @param selector
* the piece selector
*/
public TorrentStdInterestAlgorithm(TorrentManager manager,
PieceSelector pieceSelector) {
PieceSelector selector) {
this.manager = manager;
this.selector = pieceSelector;
this.selector = selector;
}
@Override
public InterestState interested(TorrentPeer peer) {
int pieces = selector.countPieces(peer);
if(pieces >= 5)
if (pieces >= 5)
return InterestState.INTERESTED;
return InterestState.INTERESTED;
}

View File

@@ -32,6 +32,7 @@ import net.torrent.torrent.piece.PieceSelector;
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO separate standard algorithm from extension ones
public class TorrentStdPieceDownloadAlgorithm implements
TorrentPieceDownloadAlgorithm {
/**
@@ -60,11 +61,13 @@ public class TorrentStdPieceDownloadAlgorithm implements
* retrieve current downloads/uploads and connections.
* @param pieceSelector
* the piece selector
* @param selector
* the piece selector
*/
public TorrentStdPieceDownloadAlgorithm(TorrentManager manager,
PieceSelector pieceSelector) {
PieceSelector selector) {
this.manager = manager;
this.selector = pieceSelector;
this.selector = selector;
}
@Override
@@ -94,6 +97,11 @@ public class TorrentStdPieceDownloadAlgorithm implements
return piece.getFirstPart();
}
@Override
public RejectAction rejected(TorrentPeer peer, TorrentPart part) {
return RejectAction.TRY_ANOTHER_PIECE;
}
@Override
public boolean isComplete(TorrentPeer peer, TorrentPiece piece) {
if (manager.getContext().getBitfield().hasPiece(piece))

View File

@@ -16,8 +16,11 @@
package net.torrent.protocol.peerwire;
import static org.jboss.netty.channel.Channels.pipeline;
import net.torrent.protocol.peerwire.codec.PeerWireDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireEncoder;
import net.torrent.protocol.peerwire.codec.PeerWireFrameDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireFrameEncoder;
import net.torrent.protocol.peerwire.codec.PeerWireMessageDecoder;
import net.torrent.protocol.peerwire.codec.PeerWireMessageEncoder;
import net.torrent.protocol.peerwire.handler.PeerWireCodecHandler;
import net.torrent.protocol.peerwire.handler.PeerWireManagerHeadHandler;
import net.torrent.protocol.peerwire.handler.PeerWireManagerTailHandler;
import net.torrent.protocol.peerwire.manager.TorrentManager;
@@ -38,7 +41,8 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory {
* The logging handler
*/
private final LoggingHandler loggingHandler = new LoggingHandler(
InternalLogLevel.WARN);
InternalLogLevel.INFO);
/**
* The algorithm handler
*/
@@ -70,15 +74,28 @@ public class PeerWirePipelineFactory implements ChannelPipelineFactory {
@Override
public ChannelPipeline getPipeline() throws Exception {
final ChannelPipeline pipeline = pipeline();
final PeerWireState state = new PeerWireState();
// TODO create traffic shape handler
// TODO create firewall handler - block connections from unwanted peers
// TODO create traffic shape handler once Netty 4.0 is released
// TODO create firewall handler - block connections from unwanted peers.
pipeline.addLast("decoder", new PeerWireDecoder());
pipeline.addLast("encoder", new PeerWireEncoder());
// pipeline.addLast("old.decoder", new PeerWireOldDecoder(state));
// pipeline.addLast("old.encoder", new PeerWireOldEncoder());
// frame (or header) codec
pipeline.addLast("frame.decoder", new PeerWireFrameDecoder(state));
pipeline.addLast("frame.encoder", new PeerWireFrameEncoder(state));
// message codec
pipeline.addLast("message.decoder", new PeerWireMessageDecoder(state));
pipeline.addLast("message.encoder", new PeerWireMessageEncoder());
pipeline.addLast("codec.handler", new PeerWireCodecHandler(state));
// logging handler (before any other handler can take action)
pipeline.addLast("logging", loggingHandler);
// handlers
pipeline.addLast("head-handler", headManagerHandler);
pipeline.addLast("algorithm", algorithmHandler);
pipeline.addLast("tail-handler", tailManagerHandler);

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire;
import org.jboss.netty.channel.Channel;
/**
* Maintain an per {@link Channel} information about the connection state.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireState {
/**
* The peer handshake state
*/
private boolean peerHandshaked;
/**
* The handshake state
*/
private boolean handshaked;
/**
* Returns true if this connection has already been handshaked by the peer
* end.
*
* @return true if connection was handshaked
*/
public boolean hasPeerHandshaked() {
return peerHandshaked;
}
/**
* Set the handshake state of this connection by the peer end.
*
* @param peerHandshaked
* the handshake state
*/
public void setPeerHandshaked(boolean peerHandshaked) {
this.peerHandshaked = peerHandshaked;
}
/**
* Returns true if this connection has already been handshaked by the
* library end.
*
* @return true if connection was handshaked
*/
public boolean hasHandshaked() {
return handshaked;
}
/**
* Set the handshake state of this connection by the library end.
*
* @param handshaked
* the handshake state
*/
public void setHandshaked(boolean handshaked) {
this.handshaked = handshaked;
}
}

View File

@@ -0,0 +1,96 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.PeerWireState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
/**
* BitTorrent has two types of message headers:
* <p>
* <h1>Handshake message</h1> Handshake messages are composed of 1 byte, which
* indicates the size of protocol string ("BitTorrent protocol").
* <p>
* <h1>Messages</h1> All other messages are 4-byte integer containing the
* message length, an 1 byte opcode followed by the message content.
* <p>
* <p>
* Instances of this class keep channel state content and must not be shared nor
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireFrameDecoder#state
*/
public class PeerWireFrameDecoder extends FrameDecoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireFrameDecoder(final PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
final int index = buffer.readerIndex();
if (state.hasPeerHandshaked()) {
if (buffer.readableBytes() <= 4)
return null;
int len = buffer.readInt();
if (len == 0) {
// keep-alive message
return ChannelBuffers.EMPTY_BUFFER;
} else if (buffer.readableBytes() < len) {
buffer.resetReaderIndex();
return null;
}
buffer.skipBytes(len);
return buffer.slice(index + 4, len);
} else {
if (buffer.readableBytes() <= 1) // at least 1 byte for header
return null;
final int pstrlen = buffer.readByte();
if (pstrlen != 19)
throw new CorruptedFrameException(
"Handshake frame is corrupted. pstrlen != 19");
buffer.resetReaderIndex();
if (buffer.readableBytes() < pstrlen + 49) {
return null;
}
buffer.skipBytes(pstrlen + 49);
return buffer.slice(index, pstrlen + 49);
}
}
}

View File

@@ -0,0 +1,75 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.PeerWireState;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
/**
* BitTorrent has two types of message headers:
* <p>
* <h1>Handshake message</h1> Handshake messages are composed of 1 byte, which
* indicates the size of protocol string ("BitTorrent protocol").
* <p>
* <h1>Messages</h1> All other messages are 4-byte integer containing the
* message length, an 1 byte opcode followed by the message content.
* <p>
* <p>
* Instances of this class keep channel state content and must not be shared nor
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireFrameDecoder#state
*/
public class PeerWireFrameEncoder extends OneToOneEncoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance of this encoder
*
* @param state
* the connection state
*/
public PeerWireFrameEncoder(PeerWireState state) {
this.state = state;
}
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer))
return msg;
if (!state.hasHandshaked())
// on handshake message there is no visible "frame"
return msg;
ChannelBuffer buffer = (ChannelBuffer) msg;
if (buffer.readableBytes() < 4)
throw new CorruptedFrameException(
"A frame must have at least 4 bytes!");
buffer.setInt(0, buffer.readableBytes() - 4);
return buffer;
}
}

View File

@@ -0,0 +1,128 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import java.util.Arrays;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import net.torrent.protocol.peerwire.message.KeepAliveMessage;
import net.torrent.protocol.peerwire.message.header.PeerWireFastPeersMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneDecoder;
/**
* This decoder decodes a {@link ChannelBuffer} into an
* {@link PeerWireReadableMessage}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireMessageDecoder#state
*/
public class PeerWireMessageDecoder extends OneToOneDecoder {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* The is an list of header managers that will create message instances for
* each message id passed as argument.
* <p>
* {@link PeerWireSpecificationMessageHeaderManager} and
* {@link PeerWireFastPeersMessageHeaderManager} are already in the list.
*/
private PeerWireMessageHeaderManager[] headerManager = new PeerWireMessageHeaderManager[] {
PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE,
PeerWireFastPeersMessageHeaderManager.SHARED_INSTANCE };
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireMessageDecoder(PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof ChannelBuffer))
return msg;
final ChannelBuffer buffer = (ChannelBuffer) msg;
if (buffer.readableBytes() == 0)
return new KeepAliveMessage();
final PeerWireReadableMessage message;
if (state.hasPeerHandshaked()) {
buffer.markReaderIndex();
final byte id = buffer.readByte();
message = getMessage(id);
} else {
message = new HandshakeMessage();
}
if (message == null) { // unknown message
buffer.resetReaderIndex();
return msg;
}
message.read(buffer);
return message;
}
/**
* Adds a new message header manager to this decoder
*
* @param newHeaderManager
* the new header manager
*/
public void addMessageHeader(PeerWireMessageHeaderManager newHeaderManager) {
headerManager = Arrays
.copyOf(headerManager, (headerManager.length + 1));
headerManager[(headerManager.length - 1)] = newHeaderManager;
}
/**
* Return the message represented by <tt>id</tt>.
* <p>
* Iterate over all <tt>handlers</tt> and try to locate the message. Will
* return null if message id is unknown.
*
* @param id
* the id of the message
* @return the message
*/
private PeerWireReadableMessage getMessage(byte id) {
PeerWireReadableMessage message = null;
for (final PeerWireMessageHeaderManager handler : headerManager) {
if (handler == null)
continue;
message = handler.getMessage(id);
if (message != null)
break;
}
return message;
}
}

View File

@@ -0,0 +1,49 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
/**
* This encoder encodes {@link PeerWireWritableMessage} to a
* {@link ChannelBuffer}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireMessageEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
if (!(msg instanceof PeerWireWritableMessage))
return msg;
final PeerWireWritableMessage message = (PeerWireWritableMessage) msg;
final ChannelBuffer buffer = ChannelBuffers
.buffer(message.length() + 4);
if (!(message instanceof HandshakeMessage)) {
buffer.writeInt(0x00);
}
message.write(buffer);
return buffer;
}
}

View File

@@ -15,18 +15,13 @@
*/
package net.torrent.protocol.peerwire.codec;
import net.torrent.protocol.peerwire.message.BitfieldMessage;
import net.torrent.protocol.peerwire.message.CancelMessage;
import net.torrent.protocol.peerwire.message.ChokeMessage;
import java.util.Arrays;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import net.torrent.protocol.peerwire.message.HaveMessage;
import net.torrent.protocol.peerwire.message.InterestedMessage;
import net.torrent.protocol.peerwire.message.KeepAliveMessage;
import net.torrent.protocol.peerwire.message.NotInterestedMessage;
import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.PortMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireSpecificationMessageHeaderManager;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
@@ -48,16 +43,39 @@ import org.jboss.netty.handler.codec.frame.FrameDecoder;
* cached.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
* @see PeerWireOldDecoder#state
*/
public class PeerWireDecoder extends FrameDecoder {
private boolean handshaked = false;
@Deprecated
public class PeerWireOldDecoder extends FrameDecoder {
/**
* This connection state. This need to be shared with other encoders or
* decoders. But more importantly <b>NEVER</b> share the same instance
* across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* The is an list of handlers that will create message instances for each
* message id passed as argument.
*/
private PeerWireMessageHeaderManager[] handlers = new PeerWireMessageHeaderManager[] { PeerWireSpecificationMessageHeaderManager.SHARED_INSTANCE };
/**
* Creates a new instance of this decoder
*
* @param state
* the connection state
*/
public PeerWireOldDecoder(final PeerWireState state) {
this.state = state;
}
@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
buffer.markReaderIndex();
if (!handshaked) {
if (!state.hasHandshaked()) {
if (buffer.readableBytes() <= 47) // at least 47 bytes
return null;
@@ -70,7 +88,7 @@ public class PeerWireDecoder extends FrameDecoder {
final HandshakeMessage message = new HandshakeMessage();
message.read(buffer);
handshaked = true;
state.setHandshaked(true);
return message;
} else {
@@ -98,8 +116,21 @@ public class PeerWireDecoder extends FrameDecoder {
}
/**
* Return the message represented by <tt>id</tt>. Will return null if
* message id is unknown.
* Adds a new message handler to this decoder
*
* @param handler
* the handler
*/
public void addMessageHandler(PeerWireMessageHeaderManager handler) {
Arrays.copyOf(this.handlers, (this.handlers.length + 1));
this.handlers[(this.handlers.length - 1)] = handler;
}
/**
* Return the message represented by <tt>id</tt>.
* <p>
* Iterate over all <tt>handlers</tt> and try to locate the message. Will
* return null if message id is unknown.
*
* @param id
* the id of the message
@@ -107,36 +138,11 @@ public class PeerWireDecoder extends FrameDecoder {
*/
private PeerWireReadableMessage getMessage(byte id) {
PeerWireReadableMessage message = null;
switch (id) {
case BitfieldMessage.MESSAGE_ID:
message = new BitfieldMessage();
break;
case CancelMessage.MESSAGE_ID:
message = new CancelMessage();
break;
case ChokeMessage.MESSAGE_ID:
message = new ChokeMessage();
break;
case HaveMessage.MESSAGE_ID:
message = new HaveMessage();
break;
case InterestedMessage.MESSAGE_ID:
message = new InterestedMessage();
break;
case NotInterestedMessage.MESSAGE_ID:
message = new NotInterestedMessage();
break;
case PieceMessage.MESSAGE_ID:
message = new PieceMessage();
break;
case PortMessage.MESSAGE_ID:
message = new PortMessage();
break;
case RequestMessage.MESSAGE_ID:
message = new RequestMessage();
break;
case UnchokeMessage.MESSAGE_ID:
message = new UnchokeMessage();
for (final PeerWireMessageHeaderManager handler : handlers) {
if (handler == null)
continue;
message = handler.getMessage(id);
if (message != null)
break;
}
return message;

View File

@@ -29,7 +29,8 @@ import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireEncoder extends OneToOneEncoder {
@Deprecated
public class PeerWireOldEncoder extends OneToOneEncoder {
@Override
protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {

View File

@@ -33,4 +33,9 @@ public interface PeerWireWritableMessage {
* @throws IOException
*/
void write(ChannelBuffer buffer) throws IOException;
/**
* @return the length of the message
*/
int length();
}

View File

@@ -37,6 +37,7 @@ import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
import net.torrent.protocol.peerwire.message.fast.AllowedFastMessage;
import net.torrent.protocol.peerwire.message.fast.RejectMessage;
import net.torrent.protocol.peerwire.message.fast.SuggestPieceMessage;
import net.torrent.torrent.Torrent;
import net.torrent.torrent.TorrentPart;
@@ -58,9 +59,15 @@ import org.jboss.netty.handler.timeout.IdleStateEvent;
/**
* Standard handler responsible for forwarding calls to {@link TorrentAlgorithm}
* methods. This class handles low-level protocol specific behavior.
* <p>
* The general guide line for this handler is to abstract <b>ALL</b> protocol
* specific models and use an abstracted algorithm for download. This will
* obviously limit the complexity of algorithm implementations. If that is the
* case, you are free to implement a new handler.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO separate extensions handler from algorithm handler
public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
/**
* The torrent manager
@@ -115,7 +122,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
peer.handshake(manager.getTorrent().getInfoHash().toByteArray(),
"-TR2050-mcm14ye4h2mq".getBytes(), manager.getContext()
.getCapabilites().toBitSet());
peer.port((short) 1541);
// peer.port((short) 1541);
super.channelConnected(ctx, e);
}
@@ -179,6 +186,12 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
final Torrent torrent = manager.getTorrent();
final TorrentPiece piece = torrent.getPiece(suggest.getPiece());
suggested(peer, piece);
} else if (msg instanceof RejectMessage) {
final RejectMessage request = (RejectMessage) msg;
final Torrent torrent = manager.getTorrent();
final TorrentPart part = torrent.getPart(request.getIndex(),
request.getStart(), request.getLength());
rejected(peer, part);
}
super.messageReceived(ctx, e);
}
@@ -213,7 +226,7 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
peer.interested();
return;
case UNINTERESTED:
testChoke(peer);
peer.uninterested();
return;
}
}
@@ -478,4 +491,27 @@ public class PeerWireAlgorithmHandler extends IdleStateAwareChannelHandler {
return;
download(peer, part);
}
private void rejected(PeerWirePeer peer, TorrentPart part) {
switch (downloadAlgorithm.rejected(peer.getTorrentPeer(), part)) {
case DISCONNECT:
peer.disconnect();
break;
case CONNECT_NEW_PEER:
peer.disconnect();
connect(peerAlgorithm.connect());
break;
case NOT_INTERESTED:
peer.uninterested();
break;
case RETRY:
download(peer, part);
break;
case TRY_ANOTHER_PIECE:
final TorrentPart nextPart = downloadAlgorithm.getNextPart(
peer.getTorrentPeer(), part);
download(peer, nextPart);
break;
}
}
}

View File

@@ -0,0 +1,87 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.handler;
import net.torrent.protocol.peerwire.PeerWireState;
import net.torrent.protocol.peerwire.message.HandshakeMessage;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.ChannelFutureListener;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;
/**
* Handler used to manage the codecs' state
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class PeerWireCodecHandler extends SimpleChannelHandler {
/**
* This connection state. This need to be shared with other encoders,
* decoders or handlers. But more importantly <b>NEVER</b> share the same
* instance across more than one {@link Channel}.
*/
private final PeerWireState state;
/**
* Creates a new instance
*
* @param state
* the connection state
*/
public PeerWireCodecHandler(final PeerWireState state) {
this.state = state;
}
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
if (e.getMessage() instanceof HandshakeMessage) {
if (state.hasPeerHandshaked())
throw new IllegalStateException(
"Peer is trying to handshaked twice");
state.setPeerHandshaked(true);
}
super.messageReceived(ctx, e);
}
@Override
public void writeRequested(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
if (e.getMessage() instanceof HandshakeMessage) {
if (state.hasHandshaked())
throw new IllegalStateException("Handshake has already been sent");
e.getFuture().addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future)
throws Exception {
state.setHandshaked(true);
}
});
}
super.writeRequested(ctx, e);
}
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
state.setHandshaked(false);
state.setPeerHandshaked(false);
}
}

View File

@@ -25,7 +25,14 @@ import net.torrent.torrent.context.TorrentContext;
import org.jboss.netty.channel.Channel;
/**
* Connection manager: manages active and inactive connections.
* Connection manager: keep control over active and inactive {@link Channel
* channel} connections.
* <p>
* Please note that the manager actually does not make any decision nor create
* or block an connection.
* <p>
* You can {@link Iterable iterate} over this manager to get <b>active</b>
* {@link Channel} instances.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/

View File

@@ -28,19 +28,60 @@ import net.torrent.torrent.TorrentPiece;
import net.torrent.torrent.context.TorrentContext;
import net.torrent.torrent.context.TorrentPeer;
/**
* The download manager keep an track over current parts being downloaded at the
* moment.
* <p>
* Please note that the manager actually does not make any decision nor block an
* requested piece.
* <p>
* You can {@link Iterable iterate} over this manager to get <b>current</b>
* downloading {@link TorrentPart parts}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO track pieces which have been requested for some time but never got an
// REJECT (fast peer extensions) nor PIECE message (standard torrent
// implementation does nothing when the peer is not uploading the piece)
// TODO allow more then one peer per piece request
public class DownloadManager implements Iterable<TorrentPart> {
/**
* The torrent context
*/
private final TorrentContext context;
/**
* The requested pieces not yet attended.
*/
private final Map<TorrentPart, TorrentPeer> activeParts = new HashMap<TorrentPart, TorrentPeer>();
/**
* Creates a new instance
*
* @param context
* the torrent context
*/
public DownloadManager(TorrentContext context) {
this.context = context;
}
public boolean isDownloading(TorrentPart torrentPart) {
return activeParts.containsKey(torrentPart);
/**
* Check if the given <tt>part</tt> has been requested.
*
* @param part
* the torrent part
* @return true if request message have been sent
*/
public boolean isDownloading(TorrentPart part) {
return activeParts.containsKey(part);
}
/**
* Test if the given <tt>piece</tt> has been requested.
*
* @param piece
* the piece
* @return true if request message have been sent
*/
public boolean isDownloading(TorrentPiece piece) {
for (final TorrentPart part : activeParts.keySet()) {
if (part.getPiece().equals(piece))
@@ -49,14 +90,35 @@ public class DownloadManager implements Iterable<TorrentPart> {
return false;
}
/**
* Check if the given <tt>peer</tt> is uploading something.
*
* @param peer
* the peer
* @return true if downloading from this peer
*/
public boolean isDownloading(TorrentPeer peer) {
return activeParts.containsValue(peer);
}
public TorrentPeer getPeer(TorrentPart torrentPart) {
return activeParts.get(torrentPart);
/**
* Get the peer uploading <tt>part</tt>
*
* @param part
* the part
* @return the peer uploading the part
*/
public TorrentPeer getPeer(TorrentPart part) {
return activeParts.get(part);
}
/**
* Get all parts being downloaded from the given <tt>peer</tt>
*
* @param peer
* the peer
* @return a set of pieces
*/
public Set<TorrentPart> getTorrentParts(TorrentPeer peer) {
final Set<TorrentPart> parts = new HashSet<TorrentPart>();
for (final Entry<TorrentPart, TorrentPeer> entry : activeParts
@@ -67,18 +129,50 @@ public class DownloadManager implements Iterable<TorrentPart> {
return parts;
}
/**
* Test if there are no active pieces requests.
*
* @return true if no pending parts
*/
public boolean isInactive() {
return activeParts.isEmpty();
}
public TorrentPeer add(TorrentPart torrentPart, TorrentPeer peer) {
return activeParts.put(torrentPart, peer);
/**
* Add a new part request
*
* @param part
* the part
* @param peer
* the remote peer
* @return the peer (java collections thing)
*/
public TorrentPeer add(TorrentPart part, TorrentPeer peer) {
return activeParts.put(part, peer);
}
public TorrentPeer remove(TorrentPart torrentPart) {
return activeParts.remove(torrentPart);
/**
* Removes an part from this manager.
*
* @param part
* the part
* @return true if part was present before remove
*/
public TorrentPeer remove(TorrentPart part) {
return activeParts.remove(part);
}
/**
* Remove all pieces request from the given <tt>peer</tt>.
* <p>
* Note that since this implementation is decoupled from any protocol
* implementation, it will not cancel the request! This piece might arrive
* some time later.
*
* @param peer
* the peer
* @return a set containing pieces removed
*/
public Set<TorrentPart> remove(TorrentPeer peer) {
final Set<TorrentPart> parts = new HashSet<TorrentPart>();
for (TorrentPart part : getTorrentParts(peer)) {
@@ -88,10 +182,20 @@ public class DownloadManager implements Iterable<TorrentPart> {
return parts;
}
/**
* Get the current active requests
*
* @return the current active requests
*/
public int getActiveDownloadsCount() {
return activeParts.size();
}
/**
* Get an map containing each piece mapped to the peer uploading it.
*
* @return the map containing each piece mapped to the peer uploading it.
*/
public Map<TorrentPart, TorrentPeer> getActiveDownloads() {
return Collections.unmodifiableMap(activeParts);
}
@@ -101,6 +205,11 @@ public class DownloadManager implements Iterable<TorrentPart> {
return activeParts.keySet().iterator();
}
/**
* Get the torrent context
*
* @return the torrent context
*/
public TorrentContext getContext() {
return context;
}

View File

@@ -29,19 +29,56 @@ import net.torrent.util.PeerWirePeerCallback;
import org.jboss.netty.channel.Channel;
/**
* The peer manager is used to keep control over active peers and they
* {@link Channel Netty Channel} used to write or read messages.
* <p>
* Please note that the manager actually does not make any decision nor block an
* requested piece.
* <p>
* You can {@link Iterable iterate} over this manager to get <b>active</b>
* {@link PeerWirePeer peers}.
*/
public class PeerManager implements Iterable<PeerWirePeer> {
/**
* The torrent context
*/
private final TorrentContext context;
/**
* The {@link ConnectionManager} instance
*/
private final ConnectionManager connectionManager;
/**
* The map of active channel-peer mapping
*/
private final Map<Channel, PeerWirePeer> activePeers = new HashMap<Channel, PeerWirePeer>();
/**
* The map of inactive channel-peer mapping
*/
private final Map<Channel, PeerWirePeer> inactivePeers = new HashMap<Channel, PeerWirePeer>();
/**
* Creates a new instance
*
* @param context
* the torrent context
* @param connectionManager
* the connection manager instance
*/
public PeerManager(TorrentContext context,
ConnectionManager connectionManager) {
this.context = context;
this.connectionManager = connectionManager;
}
/**
* Tests if the given <tt>channel</tt> has an peer attached to it.
*
* @param channel
* the channel
* @return true if an peer is attached
*/
public boolean contains(Channel channel) {
if (activePeers.containsKey(channel))
return true;
@@ -50,6 +87,15 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return false;
}
/**
* Tests if the current {@link PeerWirePeer} is registered in this manager.
* You will normally not have access to {@link PeerWirePeer} object, thus
* whis method might not be very useful outside handlers.
*
* @param peer
* the {@link PeerWirePeer} peer
* @return true if this <tt>peer</tt> is registered in this manager
*/
public boolean contains(PeerWirePeer peer) {
if (activePeers.containsValue(peer))
return true;
@@ -58,6 +104,13 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return false;
}
/**
* Get the {@link PeerWirePeer} registered in the given <tt>channel</tt>.
*
* @param channel
* the channel
* @return the peer instance.
*/
public PeerWirePeer getPeer(Channel channel) {
PeerWirePeer peer = activePeers.get(channel);
if (peer == null)
@@ -65,6 +118,13 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return peer;
}
/**
* Lookup for the {@link Channel} in which the <tt>peer</tt> is attached to.
*
* @param peer
* the peer
* @return the {@link Channel} for the given <tt>peer</tt>
*/
public Channel getChannel(PeerWirePeer peer) {
for (final Entry<Channel, PeerWirePeer> entry : activePeers.entrySet()) {
if (entry.getValue().equals(peer))
@@ -78,10 +138,24 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return null;
}
/**
* Test if there are no active peers in this manager.
*
* @return true if no active peers
*/
public boolean isEmpty() {
return activePeers.isEmpty();
}
/**
* Adds a new peer to this manager.
*
* @param channel
* the channel
* @param peer
* the peer
* @return the {@link PeerWirePeer} created instance.
*/
public PeerWirePeer add(Channel channel, TorrentPeer peer) {
if (channel.isConnected()) {
return activePeers.put(channel, new PeerWirePeer(channel, peer));
@@ -90,6 +164,13 @@ public class PeerManager implements Iterable<PeerWirePeer> {
}
}
/**
* Removes an {@link Channel} and its {@link TorrentPeer} from this manager.
*
* @param channel
* the channel
* @return the, now removed, {@link PeerWirePeer} instance
*/
public PeerWirePeer remove(Channel channel) {
PeerWirePeer peer;
if ((peer = activePeers.remove(channel)) != null)
@@ -99,6 +180,13 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return null;
}
/**
* Removes an {@link PeerWirePeer} from this manager.
*
* @param peer
* the peer
* @return the, now removed, {@link PeerWirePeer} instance
*/
public PeerWirePeer remove(PeerWirePeer peer) {
final Channel channel = getChannel(peer);
PeerWirePeer peerRemoved;
@@ -109,6 +197,13 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return null;
}
/**
* Updates this {@link Channel} peer state (i.e. active or inactive)
*
* @param channel
* the channel
* @return the {@link PeerWirePeer} instance updated
*/
public PeerWirePeer update(Channel channel) {
PeerWirePeer peer;
if ((peer = remove(channel)) == null)
@@ -116,30 +211,66 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return add(channel, peer.getTorrentPeer());
}
/**
* Get the total active peers
*
* @return the active peers count
*/
public int getActivePeersCount() {
return activePeers.size();
}
/**
* Get the total inactive peers
*
* @return the inactive peers count
*/
public int getImactivePeersCount() {
return activePeers.size();
}
/**
* Get an {@link Map} of all active peers
*
* @return the {@link Map} of all active peers
*/
public Map<Channel, PeerWirePeer> getActivePeers() {
return Collections.unmodifiableMap(activePeers);
}
/**
* Get an {@link Map} of all inactive peers
*
* @return the {@link Map} of all inactive peers
*/
public Map<Channel, PeerWirePeer> getInactivePeers() {
return Collections.unmodifiableMap(inactivePeers);
}
/**
* Get an {@link Set} of all active {@link Channel channels}
*
* @return the {@link Set} of all active {@link Channel channels}
*/
public Set<Channel> getActiveChannels() {
return Collections.unmodifiableSet(activePeers.keySet());
}
/**
* Get an {@link Set} of all inactive {@link Channel channels}
*
* @return the {@link Set} of all inactive {@link Channel channels}
*/
public Set<Channel> getInactiveChannels() {
return Collections.unmodifiableSet(inactivePeers.keySet());
}
/**
* Executes the <tt>callback</tt> for each active peer in this manager.
*
* @param callback
* the callback
*/
public void executeActive(PeerWirePeerCallback callback) {
for (final Entry<Channel, PeerWirePeer> entry : this.activePeers
.entrySet()) {
@@ -147,6 +278,12 @@ public class PeerManager implements Iterable<PeerWirePeer> {
}
}
/**
* Executes the <tt>callback</tt> for each inactive peer in this manager.
*
* @param callback
* the callback
*/
public void executeInactive(PeerWirePeerCallback callback) {
for (final Entry<Channel, PeerWirePeer> entry : this.inactivePeers
.entrySet()) {
@@ -154,6 +291,14 @@ public class PeerManager implements Iterable<PeerWirePeer> {
}
}
/**
* Executes the <tt>callback</tt> for each active and inactive peer in this
* manager. This method call firstly {@link #executeActive(PeerCallback)}
* and later {@link #executeInactive(PeerCallback)}.
*
* @param callback
* the callback
*/
public void execute(PeerWirePeerCallback callback) {
executeActive(callback);
executeInactive(callback);
@@ -164,10 +309,20 @@ public class PeerManager implements Iterable<PeerWirePeer> {
return activePeers.values().iterator();
}
/**
* Get the torrent context
*
* @return the torrent context
*/
public TorrentContext getContext() {
return context;
}
/**
* Get the connection manager
*
* @return the connection manager
*/
public ConnectionManager getConnectionManager() {
return connectionManager;
}

View File

@@ -19,6 +19,13 @@ import net.torrent.protocol.datastore.TorrentDatastore;
import net.torrent.torrent.Torrent;
import net.torrent.torrent.context.TorrentContext;
/**
* This is an simple class used to group up all managers for a single torrent.
* It does not do nothing special is just used to avoid incompatibility with
* handlers once a new manager is created.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public class TorrentManager {
private final TorrentContext context;

View File

@@ -27,27 +27,99 @@ import net.torrent.torrent.TorrentPart;
import net.torrent.torrent.context.TorrentContext;
import net.torrent.torrent.context.TorrentPeer;
/**
* The upload manager keep an track over current parts being uploaded at the
* moment. Notice that the part is remove right after the last byte is written,
* this is not a guarantee that the peer has received data nor that it is still
* alive.
* <p>
* Please note that the manager actually does not make any decision nor declines
* an requested piece.
* <p>
* You can {@link Iterable iterate} over this manager to get <b>current</b>
* uploading {@link TorrentPart parts}.
* <p>
* FIXME <b>BUG NOTICE</b>: Note that this manager has an huge issue. When two
* or more peers request the very same part, only the latter will be maintained
* under control by this manager. This should be fixed soon and changes to the
* manager signature can occur. The affected methods are:
* {@link UploadManager#getPeer(TorrentPart)},
* {@link UploadManager#remove(TorrentPart)} and
* {@link UploadManager#getActiveUploads()}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
// TODO track pieces which have been requested but the upload is not interesting
// right now, but might be later.
// FIXME this manager has an issue: if two peers request the same piece, only
// the latter one will be managed! UploadManager#getPeer(TorrentPart part) needs
// a fix.
public class UploadManager implements Iterable<TorrentPart> {
/**
* The torrent context
*/
private final TorrentContext context;
/**
* An map of requested pieces. Only accepted pieces are registered.
*/
private final Map<TorrentPart, TorrentPeer> activeParts = new HashMap<TorrentPart, TorrentPeer>();
/**
* Creates a new instance
*
* @param context
* the torrent context
*/
public UploadManager(TorrentContext context) {
this.context = context;
}
public boolean isUploading(TorrentPart torrentPart) {
return activeParts.containsKey(torrentPart);
/**
* Test if the given <tt>part</tt> is being uploaded.
*
* @param part
* the part
* @return true if piece has been requested <b>AND</b> accepted.
*/
public boolean isUploading(TorrentPart part) {
return activeParts.containsKey(part);
}
/**
* Test if uploads are being made to the given <tt>peer</tt>
*
* @param peer
* the peer
* @return true if at least one piece has been requested an piece <b>AND</b>
* the request was accepted
*/
public boolean isUploading(TorrentPeer peer) {
return activeParts.containsValue(peer);
}
public TorrentPeer getPeer(TorrentPart torrentPart) {
return activeParts.get(torrentPart);
/**
* This method retrieve the peer which has requested the given
* <tt>piece</tt>.
* <p>
* Note that this method has an huge issue: when two or more peers request
* the very same part (whichever piece), only the latter will be maintained
* under control by this manager. This need to be fixed soon and this method
* deprecated. You should avoid relying on it!
*
* @param part
* @return
*/
public TorrentPeer getPeer(TorrentPart part) {
return activeParts.get(part);
}
/**
* Get the pieces being uploaded to the given <tt>peer</tt>
*
* @param peer
* the peer
* @return an {@link Set} of all parts being uploaded to this peer.
*/
public Set<TorrentPart> getTorrentParts(TorrentPeer peer) {
final Set<TorrentPart> parts = new HashSet<TorrentPart>();
for (final Entry<TorrentPart, TorrentPeer> entry : activeParts
@@ -58,18 +130,51 @@ public class UploadManager implements Iterable<TorrentPart> {
return parts;
}
/**
* Tests if this manager has no active uploads.
*
* @return true if no uploads are being done
*/
public boolean isInactive() {
return activeParts.isEmpty();
}
public TorrentPeer add(TorrentPart torrentPart, TorrentPeer peer) {
return activeParts.put(torrentPart, peer);
/**
* Adds a new part to this manager
*
* @param part
* the uploaded part
* @param peer
* the downloading peer
* @return the downloding {@link TorrentPeer}
*/
public TorrentPeer add(TorrentPart part, TorrentPeer peer) {
return activeParts.put(part, peer);
}
public TorrentPeer remove(TorrentPart torrentPart) {
return activeParts.remove(torrentPart);
/**
* Removes the given <tt>part</tt> from this manager
* <p>
* Note that this method has an huge issue: when two or more peers request
* the very same part (whichever piece), only the latter will be maintained
* under control by this manager. This need to be fixed soon and this method
* deprecated. You should avoid relying on it!
*
* @param part
* the part
* @return the downloading peer for the <tt>part</tt>
*/
public TorrentPeer remove(TorrentPart part) {
return activeParts.remove(part);
}
/**
* Removes all uploads for the given <tt>peer</tt>
*
* @param peer
* the peer
* @return the set of {@link TorrentPart parts} being uploaded to that peer
*/
public Set<TorrentPart> remove(TorrentPeer peer) {
final Set<TorrentPart> parts = new HashSet<TorrentPart>();
for (TorrentPart part : getTorrentParts(peer)) {
@@ -79,10 +184,25 @@ public class UploadManager implements Iterable<TorrentPart> {
return parts;
}
/**
* Get the amount of active uploads
*
* @return the count of active uploads
*/
public int getActiveUploadsCount() {
return activeParts.size();
}
/**
* Get an {@link Map} of active uploads
* <p>
* Note that this method has an huge issue: when two or more peers request
* the very same part (whichever piece), only the latter will be maintained
* under control by this manager. This need to be fixed soon and this method
* deprecated. You should avoid relying on it!
*
* @return an {@link Map} of active uploads
*/
public Map<TorrentPart, TorrentPeer> getActiveUploads() {
return Collections.unmodifiableMap(activeParts);
}
@@ -92,6 +212,11 @@ public class UploadManager implements Iterable<TorrentPart> {
return activeParts.keySet().iterator();
}
/**
* Get the torrent context
*
* @return the torrent context
*/
public TorrentContext getContext() {
return context;
}

View File

@@ -104,6 +104,11 @@ public class BitfieldMessage implements PeerWireWritableMessage,
}
}
@Override
public int length() {
return bitfield.size() / 8 + 1;
}
public BitSet getBitfield() {
return bitfield;
}

View File

@@ -75,6 +75,11 @@ public class CancelMessage implements PeerWireWritableMessage,
buffer.writeInt(length);
}
@Override
public int length() {
return 3 * 4 + 1;
}
public int getIndex() {
return index;
}

View File

@@ -45,6 +45,11 @@ public class ChokeMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "ChokeMessage []";

View File

@@ -74,16 +74,7 @@ public class HandshakeMessage implements PeerWireWritableMessage,
}
}
}
System.out.println(reserved);
// int bit = 0;
// for (int i = 0; i < 8; i++) {
// byte b = buffer.readByte();
// for (int j = 128; j > 0; j >>= 1) {
// reserved.set(bit++, (b & j) != 0);
// }
// }
// this.reserved = buffer.readBytes(8).array();
this.infohash = buffer.readBytes(20).array();
this.peerId = buffer.readBytes(20).array();
}
@@ -108,6 +99,11 @@ public class HandshakeMessage implements PeerWireWritableMessage,
buffer.writeBytes(this.peerId);
}
@Override
public int length() {
return 1 + pstrlen + pstr.length() + 8 + infohash.length + peerId.length;
}
public int getPstrlen() {
return pstrlen;
}

View File

@@ -72,6 +72,11 @@ public class HaveMessage implements PeerWireWritableMessage,
buffer.writeInt(piece);
}
@Override
public int length() {
return 5;
}
public int getPiece() {
return piece;
}

View File

@@ -45,6 +45,11 @@ public class InterestedMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "InterestedMessage []";

View File

@@ -47,6 +47,11 @@ public class KeepAliveMessage implements PeerWireWritableMessage,
public void write(ChannelBuffer buffer) throws IOException {
}
@Override
public int length() {
return 0;
}
@Override
public String toString() {
return "KeepAliveMessage []";

View File

@@ -45,6 +45,11 @@ public class NotInterestedMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "NotInterestedMessage []";

View File

@@ -91,6 +91,11 @@ public class PieceMessage implements PeerWireWritableMessage,
buffer.writeBytes(block);
}
@Override
public int length() {
return 9 + block.capacity();
}
public int getIndex() {
return index;
}

View File

@@ -62,6 +62,11 @@ public class PortMessage implements PeerWireWritableMessage,
buffer.writeShort(port);
}
@Override
public int length() {
return 3;
}
public short getPort() {
return port;
}

View File

@@ -113,6 +113,11 @@ public class RequestMessage implements PeerWireWritableMessage,
buffer.writeInt(length);
}
@Override
public int length() {
return 13;
}
public int getIndex() {
return index;
}

View File

@@ -45,6 +45,11 @@ public class UnchokeMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "UnchokeMessage []";

View File

@@ -68,7 +68,7 @@ import org.jboss.netty.buffer.ChannelBuffer;
*/
public class AllowedFastMessage implements PeerWireWritableMessage,
PeerWireReadableMessage {
public static final byte MESSAGE_ID = 0x0D;
public static final byte MESSAGE_ID = 0x11;
/**
* The pieces indexes
@@ -98,6 +98,11 @@ public class AllowedFastMessage implements PeerWireWritableMessage,
}
}
@Override
public int length() {
return 1 + pieces.length * 4;
}
public int[] getPieces() {
return pieces;
}

View File

@@ -53,6 +53,11 @@ public class HaveAllMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "HaveAllMessage []";

View File

@@ -53,6 +53,11 @@ public class HaveNoneMessage implements PeerWireWritableMessage,
buffer.writeByte(MESSAGE_ID);
}
@Override
public int length() {
return 1;
}
@Override
public String toString() {
return "HaveNoneMessage []";

View File

@@ -95,6 +95,11 @@ public class RejectMessage implements PeerWireWritableMessage,
buffer.writeInt(length);
}
@Override
public int length() {
return 13;
}
public int getIndex() {
return index;
}

View File

@@ -70,6 +70,11 @@ public class SuggestPieceMessage implements PeerWireWritableMessage,
buffer.writeInt(piece);
}
@Override
public int length() {
return 5;
}
public int getPiece() {
return piece;
}

View File

@@ -0,0 +1,52 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
import net.torrent.protocol.peerwire.message.fast.AllowedFastMessage;
import net.torrent.protocol.peerwire.message.fast.HaveAllMessage;
import net.torrent.protocol.peerwire.message.fast.HaveNoneMessage;
import net.torrent.protocol.peerwire.message.fast.RejectMessage;
import net.torrent.protocol.peerwire.message.fast.SuggestPieceMessage;
import net.torrent.torrent.context.TorrentPeerCapabilities.TorrentPeerCapability;
/**
* The message header manager for {@link TorrentPeerCapability#FAST_PEERS Fast
* Peers extension}.
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public final class PeerWireFastPeersMessageHeaderManager implements
PeerWireMessageHeaderManager {
public static final PeerWireFastPeersMessageHeaderManager SHARED_INSTANCE = new PeerWireFastPeersMessageHeaderManager();
@Override
public PeerWireReadableMessage getMessage(byte id) {
switch (id) {
case AllowedFastMessage.MESSAGE_ID:
return new AllowedFastMessage();
case HaveAllMessage.MESSAGE_ID:
return new HaveAllMessage();
case HaveNoneMessage.MESSAGE_ID:
return new HaveNoneMessage();
case RejectMessage.MESSAGE_ID:
return new RejectMessage();
case SuggestPieceMessage.MESSAGE_ID:
return new SuggestPieceMessage();
}
return null;
}
}

View File

@@ -0,0 +1,36 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
/**
* The message header manager return the message to each given message id (aka
* opcode).
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public interface PeerWireMessageHeaderManager {
/**
* Return the message for an given <tt>id</tt>. If this handler cannot
* handle the message id, <b>null</b> must be returned.
*
* @param id
* the message id
* @return the message, <tt>null</tt> if <tt>id</tt> is unknown.
*/
PeerWireReadableMessage getMessage(byte id);
}

View File

@@ -0,0 +1,65 @@
/*
* Copyright 2011 Rogiel Josias Sulzbach
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package net.torrent.protocol.peerwire.message.header;
import net.torrent.protocol.peerwire.codec.PeerWireReadableMessage;
import net.torrent.protocol.peerwire.message.BitfieldMessage;
import net.torrent.protocol.peerwire.message.CancelMessage;
import net.torrent.protocol.peerwire.message.ChokeMessage;
import net.torrent.protocol.peerwire.message.HaveMessage;
import net.torrent.protocol.peerwire.message.InterestedMessage;
import net.torrent.protocol.peerwire.message.NotInterestedMessage;
import net.torrent.protocol.peerwire.message.PieceMessage;
import net.torrent.protocol.peerwire.message.PortMessage;
import net.torrent.protocol.peerwire.message.RequestMessage;
import net.torrent.protocol.peerwire.message.UnchokeMessage;
/**
* The default PeerWire message handler
*
* @author <a href="http://www.rogiel.com/">Rogiel Josias Sulzbach</a>
*/
public final class PeerWireSpecificationMessageHeaderManager implements
PeerWireMessageHeaderManager {
public static final PeerWireSpecificationMessageHeaderManager SHARED_INSTANCE = new PeerWireSpecificationMessageHeaderManager();
@Override
public PeerWireReadableMessage getMessage(byte id) {
switch (id) {
case BitfieldMessage.MESSAGE_ID:
return new BitfieldMessage();
case CancelMessage.MESSAGE_ID:
return new CancelMessage();
case ChokeMessage.MESSAGE_ID:
return new ChokeMessage();
case HaveMessage.MESSAGE_ID:
return new HaveMessage();
case InterestedMessage.MESSAGE_ID:
return new InterestedMessage();
case NotInterestedMessage.MESSAGE_ID:
return new NotInterestedMessage();
case PieceMessage.MESSAGE_ID:
return new PieceMessage();
case PortMessage.MESSAGE_ID:
return new PortMessage();
case RequestMessage.MESSAGE_ID:
return new RequestMessage();
case UnchokeMessage.MESSAGE_ID:
return new UnchokeMessage();
}
return null;
}
}

View File

@@ -35,10 +35,10 @@ public class TorrentContext {
private final TorrentBitfield bitfield = new TorrentBitfield(this);
/**
* The capabilities
* The capabilities supported in this context
*/
private final TorrentPeerCapabilities capabilites = new TorrentPeerCapabilities(
TorrentPeerCapability.DHT, TorrentPeerCapability.FAST_PEERS);
TorrentPeerCapability.DHT);
private final TorrentSwarm swarm = new TorrentSwarm(this);
/**

View File

@@ -19,6 +19,9 @@ import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import net.torrent.protocol.peerwire.message.header.PeerWireFastPeersMessageHeaderManager;
import net.torrent.protocol.peerwire.message.header.PeerWireMessageHeaderManager;
/**
* Object containing peers support for certain capabilities.
*
@@ -66,7 +69,7 @@ public class TorrentPeerCapabilities {
* @return true if capability is supported, false otherwise.
*/
public boolean supports(TorrentPeerCapability capability) {
return capabilities.get(capability.getBit());
return capabilities.get(capability.bit);
}
/**
@@ -121,7 +124,7 @@ public class TorrentPeerCapabilities {
/**
* Fast peers support
*/
FAST_PEERS(62),
FAST_PEERS(62, PeerWireFastPeersMessageHeaderManager.SHARED_INSTANCE),
/**
* DHT Support
*/
@@ -130,25 +133,34 @@ public class TorrentPeerCapabilities {
/**
* The bit index for this capability
*/
private final int bit;
public final int bit;
/**
* The header manager for this extension
*/
public final PeerWireMessageHeaderManager headerManager;
/**
* Creates a new capability
*
* @param bit
* the bit marking this capability
* @param headerManager
* the header manager for this extension
*/
TorrentPeerCapability(int bit) {
TorrentPeerCapability(int bit,
PeerWireMessageHeaderManager headerManager) {
this.bit = bit;
this.headerManager = headerManager;
}
/**
* Get the bit marking this capability
* Creates a new capability will a <tt>null</tt> <tt>handlerManager</tt>
*
* @return the bit marking the capability
* @param bit
* the bit marking this capability
*/
public int getBit() {
return bit;
TorrentPeerCapability(int bit) {
this(bit, null);
}
}
}

View File

@@ -18,6 +18,7 @@ package net.torrent.torrent.piece;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import net.torrent.protocol.peerwire.manager.TorrentManager;
import net.torrent.torrent.TorrentPiece;
@@ -39,7 +40,7 @@ public class ScoredPieceSelector extends SortedListPieceSelector {
/**
* Call counter. Used to sort the list.
*/
private int calls = 0;
private AtomicInteger calls = new AtomicInteger(0);
/**
* The piece comparator
@@ -69,9 +70,9 @@ public class ScoredPieceSelector extends SortedListPieceSelector {
@Override
public synchronized TorrentPiece select(TorrentPeer peer) {
if (calls % SORT_INTERVAL == 0)
if (calls.get() % SORT_INTERVAL == 0)
this.sort(pieces);
calls++;
calls.incrementAndGet();
return super.select(peer);
}
}