1
0
mirror of https://github.com/Rogiel/l2jserver2 synced 2025-12-05 23:22:47 +00:00

Adds support for asynchronous database operations

This commit is contained in:
2011-12-16 12:30:18 -02:00
parent 82f6bcbc1c
commit dea1196859
11 changed files with 269 additions and 17 deletions

View File

@@ -73,4 +73,21 @@ public interface AsyncFuture<T> extends Future<T> {
* note that false will be returned if the timeout has expired too!
*/
boolean awaitUninterruptibly(long timeout, TimeUnit unit);
/**
* Adds an listener that will be notified once the executing has been
* completed.
*
* @param listener
* the listener to be added
*/
void addListener(AsyncListener<T> listener);
/**
* Removes an listener
*
* @param listener
* the listener to be removed
*/
void removeListener(AsyncListener<T> listener);
}

View File

@@ -0,0 +1,34 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.service.core.threading;
/**
* @author <a href="http://www.rogiel.com">Rogiel</a>
* @param <T>
* the return type of the {@link AsyncFuture}
*/
public interface AsyncListener<T> {
/**
* Called once the {@link AsyncFuture} has finished executing the task
*
* @param future
* the future
* @param object
* the object returned. If any.
*/
void onComplete(AsyncFuture<T> future, T object);
}

View File

@@ -16,6 +16,9 @@
*/
package com.l2jserver.service.core.threading;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
@@ -34,6 +37,7 @@ import com.google.common.base.Preconditions;
import com.l2jserver.service.AbstractService;
import com.l2jserver.service.ServiceStartException;
import com.l2jserver.service.ServiceStopException;
import com.l2jserver.util.factory.CollectionFactory;
/**
* The default implementation for {@link ThreadService}
@@ -51,9 +55,25 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
*/
private ThreadPool pool;
/**
* The list of active thread pools
*/
private Map<String, ThreadPoolImpl> threadPools;
@Override
protected void doStart() throws ServiceStartException {
pool = createThreadPool("shared", 20);
threadPools = CollectionFactory.newMap();
pool.async(50, TimeUnit.MILLISECONDS, 50, new Runnable() {
@Override
public void run() {
for (final Entry<String, ThreadPoolImpl> entry : threadPools
.entrySet()) {
entry.getValue().notifyListeners();
}
}
});
// scheduler = Executors.newScheduledThreadPool(10);
// async = Executors.newCachedThreadPool();
}
@@ -96,15 +116,19 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
public ThreadPool createThreadPool(String name, int maxThreads) {
log.debug("Creating new ThreadPool {} with maximum of {} threads",
name, maxThreads);
return new ThreadPoolImpl(name,
final ThreadPoolImpl pool = new ThreadPoolImpl(name,
Executors.newScheduledThreadPool(maxThreads));
threadPools.put(name, pool);
return pool;
}
@Override
public void dispose(ThreadPool pool) {
log.debug("Disposing ThreadPool {}", pool);
if (pool instanceof ThreadPoolImpl)
if (pool instanceof ThreadPoolImpl) {
((ThreadPoolImpl) pool).executor.shutdown();
threadPools.remove(((ThreadPoolImpl) pool).name);
}
throw new UnsupportedOperationException(
"The given ThreadPool is not supported by this service");
}
@@ -113,6 +137,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
protected void doStop() throws ServiceStopException {
dispose(pool);
pool = null;
threadPools = null;
}
/**
@@ -127,6 +152,10 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
* The future that is delegated in this implementation
*/
private final Future<T> future;
/**
* List of all active listeners
*/
private List<AsyncListener<T>> listeners = CollectionFactory.newList();
/**
* Creates a new instance
@@ -206,6 +235,31 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
return false;
}
}
@Override
public void addListener(AsyncListener<T> listener) {
listeners.add(listener);
}
@Override
public void removeListener(AsyncListener<T> listener) {
listeners.remove(listener);
}
/**
* Notify all listeners that the task has been completed
*/
private void notifyListeners() {
for (final AsyncListener<T> listener : listeners) {
T object = null;
try {
object = this.get(0, TimeUnit.MILLISECONDS);
} catch (InterruptedException | ExecutionException
| TimeoutException e) {
}
listener.onComplete(this, object);
}
}
}
/**
@@ -280,6 +334,11 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
* The backing executor
*/
private final ScheduledExecutorService executor;
/**
* The list of active and pending futures
*/
private final List<AsyncFutureImpl<?>> activeFutures = CollectionFactory
.newList();
/**
* @param name
@@ -322,5 +381,17 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
public void dispose() {
ThreadServiceImpl.this.dispose(this);
}
/**
* Notify all future listeners when the task is complete.
*/
private void notifyListeners() {
for (final AsyncFutureImpl<?> future : activeFutures) {
if (future.isDone()) {
future.notifyListeners();
activeFutures.remove(future);
}
}
}
}
}

View File

@@ -17,10 +17,13 @@
package com.l2jserver.service.database;
import java.util.Iterator;
import java.util.concurrent.Callable;
import com.google.inject.Inject;
import com.l2jserver.model.Model;
import com.l2jserver.model.id.ID;
import com.l2jserver.service.core.threading.AsyncFuture;
import com.l2jserver.service.core.threading.ThreadService;
/**
* Abstract DAO implementations. Store an instance of {@link DatabaseService}.
@@ -41,6 +44,12 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
*/
protected final DatabaseService database;
@Inject
/**
* The ThreadService used to execute operations asynchronously.
*/
protected ThreadService threadService;
/**
* @param database
* the database service
@@ -71,21 +80,75 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
}
}
@Override
@SafeVarargs
public final int saveObjects(T... objects) {
int rows = 0;
for (final T object : objects) {
rows += save(object);
}
return rows;
}
@Override
@SafeVarargs
public final AsyncFuture<Integer> saveObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return saveObjects(objects);
}
});
}
@Override
public void insert(T object) {
insertObjects(wrap(object));
}
@Override
@SafeVarargs
public final AsyncFuture<Integer> insertObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return insertObjects(objects);
}
});
}
@Override
public void update(T object) {
updateObjects(wrap(object));
}
@Override
@SafeVarargs
public final AsyncFuture<Integer> updateObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return updateObjects(objects);
}
});
}
@Override
public void delete(T object) {
deleteObjects(wrap(object));
}
@Override
@SafeVarargs
public final AsyncFuture<Integer> deleteObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
return deleteObjects(objects);
}
});
}
@Override
public Iterator<T> iterator() {
return new Iterator<T>() {

View File

@@ -21,6 +21,7 @@ import java.util.Iterator;
import com.l2jserver.model.Model;
import com.l2jserver.model.id.ID;
import com.l2jserver.service.core.threading.AsyncFuture;
/**
* The Data Access Object interface used used to retrieve, save and remove
@@ -65,8 +66,8 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
Collection<I> selectIDs();
/**
* Save the instance to the database. If a new database entry was created
* returns true. This method will only save if the object has changed.
* Save the instance to the database. This method will only save if the
* object has changed.
*
* @param object
* the object
@@ -75,6 +76,38 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
*/
int save(O object);
/**
* Save several instances to the database. This method will only save if the
* object has changed.
* <p>
* Note that differently from {@link #insertObjects(Model...)},
* {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)},
* this method does not uses an transaction and could have a bigger
* performance hit.
*
* @param objects
* the objects
* @return the number of affected rows
*/
int saveObjects(@SuppressWarnings("unchecked") O... objects);
/**
* Asynchronously save several instances to the database. This method will
* only save if the object has changed.
* <p>
* Note that differently from {@link #insertObjects(Model...)},
* {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)},
* this method does not uses an transaction and could have a bigger
* performance hit.
*
* @param objects
* the objects
* @return the task future. The future returns an Integer with the number of
* affected rows.
*/
AsyncFuture<Integer> saveObjectsAsync(
@SuppressWarnings("unchecked") O... objects);
/**
* Save the instance to the database. If a new database entry was created
* returns true.
@@ -86,7 +119,7 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
* @return the number of affected rows
*/
int save(O object, boolean force);
/**
* Inserts the instance in the database.
*
@@ -104,7 +137,19 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
* @return the number of inserted rows
*/
int insertObjects(@SuppressWarnings("unchecked") O... objects);
/**
* Asynchronously insert several instances in the database using a
* transaction (if possible).
*
* @param objects
* the objects
* @return the task future. The future returns an Integer with the number of
* inserted rows.
*/
AsyncFuture<Integer> insertObjectsAsync(
@SuppressWarnings("unchecked") O... objects);
/**
* Updates the instance in the database.
*
@@ -122,7 +167,19 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
* @return the number of updated rows
*/
int updateObjects(@SuppressWarnings("unchecked") O... objects);
/**
* Asynchronously update several instances in the database using a
* transaction (if possible).
*
* @param objects
* the objects
* @return the task future. The future returns an Integer with the number of
* updated rows.
*/
AsyncFuture<Integer> updateObjectsAsync(
@SuppressWarnings("unchecked") O... objects);
/**
* Deletes the instance in the database.
*
@@ -140,4 +197,16 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
* @return the number of deleted rows
*/
int deleteObjects(@SuppressWarnings("unchecked") O... objects);
/**
* Asynchronously delete several instances in the database using a
* transaction (if possible).
*
* @param objects
* the objects
* @return the task future. The future returns an Integer with the number of
* deleted rows.
*/
AsyncFuture<Integer> deleteObjectsAsync(
@SuppressWarnings("unchecked") O... objects);
}

View File

@@ -45,9 +45,6 @@ public class ArrayUtilsTest extends ArrayUtils {
private static class TestClass {
private String name;
/**
* @param string
*/
public TestClass(String string) {
this.name = string;
}