1
0
mirror of https://github.com/Rogiel/l2jserver2 synced 2025-12-05 23:22:47 +00:00

Decouples CharacterService from NetworkService

This commit is contained in:
2011-12-18 18:07:59 -02:00
parent 87a1c0d3a5
commit b9e460b70c
17 changed files with 702 additions and 244 deletions

View File

@@ -19,8 +19,7 @@ package com.l2jserver.service.core.threading;
import java.util.concurrent.TimeUnit;
/**
* This is an ThreadPool you can use it to schedule tasks in the future or to
* repeat them many times.
* This is an ThreadPool that you can use to asynchronously execute tasks.
*
* @author <a href="http://www.rogiel.com">Rogiel</a>
*/
@@ -72,4 +71,9 @@ public interface ThreadPool {
* execute tasks.
*/
void dispose();
/**
* @return true if the thread pool is no longer usable (i.e. was disposed)
*/
boolean isDisposed();
}

View File

@@ -0,0 +1,55 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.service.core.threading;
/**
* The priority of the thread pool
*
* @author <a href="http://www.rogiel.com">Rogiel</a>
*/
public enum ThreadPoolPriority {
/**
* High priority.
* <p>
* Processor will block {@link ThreadPoolPriority#NORMAL} and
* {@link ThreadPoolPriority#LOW} priority threads in order to finish tasks
* in pools on this priority.
*/
HIGH(Thread.MAX_PRIORITY),
/**
* Normal priority.
* <p>
* Processor will block {@link ThreadPoolPriority#LOW} priority threads in
* order to finish tasks in pools on this priority.
*/
NORMAL(Thread.NORM_PRIORITY),
/**
* Low priority.
* <p>
* Processor will give very low priority for tasks in this level.
*/
LOW(Thread.MIN_PRIORITY);
/**
* The priority to be used on {@link Thread}
*/
public final int threadPriority;
ThreadPoolPriority(int threadPriority) {
this.threadPriority = threadPriority;
}
}

View File

@@ -79,16 +79,72 @@ public interface ThreadService extends Service {
ScheduledAsyncFuture async(long delay, TimeUnit unit, long repeat,
Runnable task);
/**
* Creates a new thread pool with {@link ThreadPoolPriority#NORMAL normal}
* priority and that can be increased up to {@link Integer#MAX_VALUE} active
* threads, if necessary.
* <p>
* Threads in this pool will never expire.
*
* @param name
* the pool name
* @param threads
* the maximum amount of active threads
* @return the new thread pool
*/
ThreadPool createThreadPool(String name, int threads);
/**
* Creates a new thread pool that can increase up to
* {@link Integer#MAX_VALUE} active threads, if necessary.
* <p>
* Threads in this pool will never expire.
*
* @param name
* the pool name
* @param threads
* the maximum amount of active threads
* @param priority
* the processor scheduling priority
* @return the new thread pool
*/
ThreadPool createThreadPool(String name, int threads,
ThreadPoolPriority priority);
/**
* Creates a new thread pool with {@link ThreadPoolPriority#NORMAL normal}
* priority.
*
* @param name
* the pool name
* @param threads
* the maximum amount of active threads
* @param threadTimeout
* the time it takes to expire an inactive thread
* @param threadTimeoutUnit
* the {@link TimeUnit} for <code>threadTimeout</code>
* @return the new thread pool
*/
ThreadPool createThreadPool(String name, int threads, long threadTimeout,
TimeUnit threadTimeoutUnit);
/**
* Creates a new thread pool.
*
* @param name
* the pool name
* @param maxThreads
* the maximum amount of threads.
* @param threads
* the maximum amount of active threads
* @param threadTimeout
* the time it takes to expire an inactive thread
* @param threadTimeoutUnit
* the {@link TimeUnit} for <code>threadTimeout</code>
* @param priority
* the processor scheduling priority
* @return the new thread pool
*/
ThreadPool createThreadPool(String name, int maxThreads);
ThreadPool createThreadPool(String name, int threads, long threadTimeout,
TimeUnit threadTimeoutUnit, ThreadPoolPriority priority);
/**
* Disposes an given thread pool. After disposing the thread pool will no

View File

@@ -22,12 +22,13 @@ import java.util.Map.Entry;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -62,7 +63,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
@Override
protected void doStart() throws ServiceStartException {
threadPools = CollectionFactory.newMap();
pool = createThreadPool("shared", 20);
pool = createThreadPool("shared", 1);
pool.async(50, TimeUnit.MILLISECONDS, 50, new Runnable() {
@Override
@@ -111,21 +112,61 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
}
@Override
public ThreadPool createThreadPool(String name, int maxThreads) {
log.debug("Creating new ThreadPool {} with maximum of {} threads",
name, maxThreads);
final ThreadPoolImpl pool = new ThreadPoolImpl(name,
Executors.newScheduledThreadPool(maxThreads));
public ThreadPool createThreadPool(final String name, final int threads,
final long threadTimeout, final TimeUnit threadTimeoutUnit,
final ThreadPoolPriority priority) {
log.debug(
"Creating new {} priority ThreadPool {}; threads: {}, timeout:{}",
new Object[] { priority, name, threads, threadTimeout });
final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(
threads);
if (threadTimeout >= 1) {
executor.setKeepAliveTime(threadTimeout, threadTimeoutUnit);
executor.allowCoreThreadTimeOut(true);
}
executor.setThreadFactory(new ThreadFactory() {
private final AtomicInteger threadNumber = new AtomicInteger(1);
@Override
public Thread newThread(Runnable r) {
final Thread thread = new Thread(r, name + "-"
+ threadNumber.getAndIncrement());
thread.setPriority(priority.threadPriority);
return thread;
}
});
final ThreadPoolImpl pool = new ThreadPoolImpl(name, executor);
threadPools.put(name, pool);
return pool;
}
@Override
public ThreadPool createThreadPool(String name, int threads) {
return createThreadPool(name, threads, -1, null,
ThreadPoolPriority.NORMAL);
}
@Override
public ThreadPool createThreadPool(String name, int threads,
ThreadPoolPriority priority) {
return createThreadPool(name, threads, -1, null, priority);
}
@Override
public ThreadPool createThreadPool(String name, int threads,
long threadTimeout, TimeUnit threadTimeoutUnit) {
return createThreadPool(name, threads, threadTimeout,
threadTimeoutUnit, ThreadPoolPriority.NORMAL);
}
@Override
public void dispose(ThreadPool pool) {
log.debug("Disposing ThreadPool {}", pool);
if (pool instanceof ThreadPoolImpl) {
((ThreadPoolImpl) pool).executor.shutdown();
threadPools.remove(((ThreadPoolImpl) pool).name);
return;
}
throw new UnsupportedOperationException(
"The given ThreadPool is not supported by this service");
@@ -331,7 +372,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
/**
* The backing executor
*/
private final ScheduledExecutorService executor;
private final ScheduledThreadPoolExecutor executor;
/**
* The list of active and pending futures
*/
@@ -342,9 +383,9 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
* @param name
* the pool name
* @param executor
* the backing {@link ScheduledExecutorService}
* the backing {@link ScheduledThreadPoolExecutor}
*/
public ThreadPoolImpl(String name, ScheduledExecutorService executor) {
public ThreadPoolImpl(String name, ScheduledThreadPoolExecutor executor) {
this.name = name;
this.executor = executor;
}
@@ -392,5 +433,10 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
}
}
}
@Override
public boolean isDisposed() {
return executor.isShutdown();
}
}
}

View File

@@ -0,0 +1,156 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.util;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.l2jserver.service.core.threading.Task;
import com.l2jserver.service.core.threading.ThreadPool;
/**
* @author <a href="http://www.rogiel.com">Rogiel</a>
*/
public class ThreadPoolUtils {
/**
* Wraps an {@link ThreadPool} into an {@link Executor}
*
* @param pool
* the pool the to be wrapped
* @return the wrapped {@link Executor}
*/
public static ExecutorService wrap(final ThreadPool pool) {
return new ExecutorService() {
@Override
public void execute(final Runnable command) {
pool.async(wrap(command));
}
@Override
public void shutdown() {
pool.dispose();
}
@Override
public List<Runnable> shutdownNow() {
pool.dispose();
return null;
}
@Override
public boolean isShutdown() {
return pool.isDisposed();
}
@Override
public boolean isTerminated() {
return pool.isDisposed();
}
@Override
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> Future<T> submit(Callable<T> task) {
return pool.async(wrap(task));
}
@Override
public <T> Future<T> submit(Runnable task, T result) {
return pool.async(wrap(Executors.callable(task, result)));
}
@Override
public Future<?> submit(Runnable task) {
return pool.async(wrap(task));
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks)
throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> List<Future<T>> invokeAll(
Collection<? extends Callable<T>> tasks, long timeout,
TimeUnit unit) throws InterruptedException {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
throw new UnsupportedOperationException();
}
@Override
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit) throws InterruptedException,
ExecutionException, TimeoutException {
throw new UnsupportedOperationException();
}
};
}
/**
* Wraps an {@link Runnable} into an {@link Task}
*
* @param command
* the {@link Runnable} to be wrapped
* @return the wrapped {@link Runnable}
*/
public static Task<Runnable> wrap(final Runnable command) {
return new Task<Runnable>() {
@Override
public Runnable call() throws Exception {
command.run();
return command;
}
};
}
/**
* Wraps an {@link Runnable} into an {@link Task}
*
* @param <T>
* the {@link Task} return type
* @param command
* the {@link Runnable} to be wrapped
* @return the wrapped {@link Runnable}
*/
public static <T> Task<T> wrap(final Callable<T> command) {
return new Task<T>() {
@Override
public T call() throws Exception {
return command.call();
}
};
}
}