From dea1196859f3793fc70f12b7c0a60112350d0ea8 Mon Sep 17 00:00:00 2001 From: Rogiel Date: Fri, 16 Dec 2011 12:30:18 -0200 Subject: [PATCH] Adds support for asynchronous database operations --- .../service/core/threading/AsyncFuture.java | 17 ++++ .../service/core/threading/AsyncListener.java | 34 ++++++++ .../core/threading/ThreadServiceImpl.java | 75 +++++++++++++++++- .../service/database/AbstractDAO.java | 63 +++++++++++++++ .../service/database/DataAccessObject.java | 79 +++++++++++++++++-- .../com/l2jserver/util/ArrayUtilsTest.java | 3 - .../distribution/sql/sql/character.sql | 2 +- .../template/character/CharacterClass.java | 2 +- .../game/chat/DatabaseChatLoggingService.java | 2 +- .../game/world/filter/WorldFilters.java | 2 +- .../filter/impl/KnownListUpdateFilter.java | 7 +- 11 files changed, 269 insertions(+), 17 deletions(-) create mode 100644 l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncListener.java diff --git a/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncFuture.java b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncFuture.java index 4ce629b15..0ed2edea4 100644 --- a/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncFuture.java +++ b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncFuture.java @@ -73,4 +73,21 @@ public interface AsyncFuture extends Future { * note that false will be returned if the timeout has expired too! */ boolean awaitUninterruptibly(long timeout, TimeUnit unit); + + /** + * Adds an listener that will be notified once the executing has been + * completed. + * + * @param listener + * the listener to be added + */ + void addListener(AsyncListener listener); + + /** + * Removes an listener + * + * @param listener + * the listener to be removed + */ + void removeListener(AsyncListener listener); } diff --git a/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncListener.java b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncListener.java new file mode 100644 index 000000000..7135d1809 --- /dev/null +++ b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/AsyncListener.java @@ -0,0 +1,34 @@ +/* + * This file is part of l2jserver2 . + * + * l2jserver2 is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * l2jserver2 is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with l2jserver2. If not, see . + */ +package com.l2jserver.service.core.threading; + +/** + * @author Rogiel + * @param + * the return type of the {@link AsyncFuture} + */ +public interface AsyncListener { + /** + * Called once the {@link AsyncFuture} has finished executing the task + * + * @param future + * the future + * @param object + * the object returned. If any. + */ + void onComplete(AsyncFuture future, T object); +} diff --git a/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/ThreadServiceImpl.java b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/ThreadServiceImpl.java index 67b99c56f..64f9bc0a0 100644 --- a/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/ThreadServiceImpl.java +++ b/l2jserver2-common/src/main/java/com/l2jserver/service/core/threading/ThreadServiceImpl.java @@ -16,6 +16,9 @@ */ package com.l2jserver.service.core.threading; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.Delayed; import java.util.concurrent.ExecutionException; @@ -34,6 +37,7 @@ import com.google.common.base.Preconditions; import com.l2jserver.service.AbstractService; import com.l2jserver.service.ServiceStartException; import com.l2jserver.service.ServiceStopException; +import com.l2jserver.util.factory.CollectionFactory; /** * The default implementation for {@link ThreadService} @@ -51,9 +55,25 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService */ private ThreadPool pool; + /** + * The list of active thread pools + */ + private Map threadPools; + @Override protected void doStart() throws ServiceStartException { pool = createThreadPool("shared", 20); + threadPools = CollectionFactory.newMap(); + + pool.async(50, TimeUnit.MILLISECONDS, 50, new Runnable() { + @Override + public void run() { + for (final Entry entry : threadPools + .entrySet()) { + entry.getValue().notifyListeners(); + } + } + }); // scheduler = Executors.newScheduledThreadPool(10); // async = Executors.newCachedThreadPool(); } @@ -96,15 +116,19 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService public ThreadPool createThreadPool(String name, int maxThreads) { log.debug("Creating new ThreadPool {} with maximum of {} threads", name, maxThreads); - return new ThreadPoolImpl(name, + final ThreadPoolImpl pool = new ThreadPoolImpl(name, Executors.newScheduledThreadPool(maxThreads)); + threadPools.put(name, pool); + return pool; } @Override public void dispose(ThreadPool pool) { log.debug("Disposing ThreadPool {}", pool); - if (pool instanceof ThreadPoolImpl) + if (pool instanceof ThreadPoolImpl) { ((ThreadPoolImpl) pool).executor.shutdown(); + threadPools.remove(((ThreadPoolImpl) pool).name); + } throw new UnsupportedOperationException( "The given ThreadPool is not supported by this service"); } @@ -113,6 +137,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService protected void doStop() throws ServiceStopException { dispose(pool); pool = null; + threadPools = null; } /** @@ -127,6 +152,10 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService * The future that is delegated in this implementation */ private final Future future; + /** + * List of all active listeners + */ + private List> listeners = CollectionFactory.newList(); /** * Creates a new instance @@ -206,6 +235,31 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService return false; } } + + @Override + public void addListener(AsyncListener listener) { + listeners.add(listener); + } + + @Override + public void removeListener(AsyncListener listener) { + listeners.remove(listener); + } + + /** + * Notify all listeners that the task has been completed + */ + private void notifyListeners() { + for (final AsyncListener listener : listeners) { + T object = null; + try { + object = this.get(0, TimeUnit.MILLISECONDS); + } catch (InterruptedException | ExecutionException + | TimeoutException e) { + } + listener.onComplete(this, object); + } + } } /** @@ -280,6 +334,11 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService * The backing executor */ private final ScheduledExecutorService executor; + /** + * The list of active and pending futures + */ + private final List> activeFutures = CollectionFactory + .newList(); /** * @param name @@ -322,5 +381,17 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService public void dispose() { ThreadServiceImpl.this.dispose(this); } + + /** + * Notify all future listeners when the task is complete. + */ + private void notifyListeners() { + for (final AsyncFutureImpl future : activeFutures) { + if (future.isDone()) { + future.notifyListeners(); + activeFutures.remove(future); + } + } + } } } diff --git a/l2jserver2-common/src/main/java/com/l2jserver/service/database/AbstractDAO.java b/l2jserver2-common/src/main/java/com/l2jserver/service/database/AbstractDAO.java index 8e9ba967a..ba6aa9751 100644 --- a/l2jserver2-common/src/main/java/com/l2jserver/service/database/AbstractDAO.java +++ b/l2jserver2-common/src/main/java/com/l2jserver/service/database/AbstractDAO.java @@ -17,10 +17,13 @@ package com.l2jserver.service.database; import java.util.Iterator; +import java.util.concurrent.Callable; import com.google.inject.Inject; import com.l2jserver.model.Model; import com.l2jserver.model.id.ID; +import com.l2jserver.service.core.threading.AsyncFuture; +import com.l2jserver.service.core.threading.ThreadService; /** * Abstract DAO implementations. Store an instance of {@link DatabaseService}. @@ -41,6 +44,12 @@ public abstract class AbstractDAO, I extends ID> */ protected final DatabaseService database; + @Inject + /** + * The ThreadService used to execute operations asynchronously. + */ + protected ThreadService threadService; + /** * @param database * the database service @@ -71,21 +80,75 @@ public abstract class AbstractDAO, I extends ID> } } + @Override + @SafeVarargs + public final int saveObjects(T... objects) { + int rows = 0; + for (final T object : objects) { + rows += save(object); + } + return rows; + } + + @Override + @SafeVarargs + public final AsyncFuture saveObjectsAsync(final T... objects) { + return threadService.async(new Callable() { + @Override + public Integer call() throws Exception { + return saveObjects(objects); + } + }); + } + @Override public void insert(T object) { insertObjects(wrap(object)); } + @Override + @SafeVarargs + public final AsyncFuture insertObjectsAsync(final T... objects) { + return threadService.async(new Callable() { + @Override + public Integer call() throws Exception { + return insertObjects(objects); + } + }); + } + @Override public void update(T object) { updateObjects(wrap(object)); } + @Override + @SafeVarargs + public final AsyncFuture updateObjectsAsync(final T... objects) { + return threadService.async(new Callable() { + @Override + public Integer call() throws Exception { + return updateObjects(objects); + } + }); + } + @Override public void delete(T object) { deleteObjects(wrap(object)); } + @Override + @SafeVarargs + public final AsyncFuture deleteObjectsAsync(final T... objects) { + return threadService.async(new Callable() { + @Override + public Integer call() throws Exception { + return deleteObjects(objects); + } + }); + } + @Override public Iterator iterator() { return new Iterator() { diff --git a/l2jserver2-common/src/main/java/com/l2jserver/service/database/DataAccessObject.java b/l2jserver2-common/src/main/java/com/l2jserver/service/database/DataAccessObject.java index 6bf6655a0..3cee4013c 100644 --- a/l2jserver2-common/src/main/java/com/l2jserver/service/database/DataAccessObject.java +++ b/l2jserver2-common/src/main/java/com/l2jserver/service/database/DataAccessObject.java @@ -21,6 +21,7 @@ import java.util.Iterator; import com.l2jserver.model.Model; import com.l2jserver.model.id.ID; +import com.l2jserver.service.core.threading.AsyncFuture; /** * The Data Access Object interface used used to retrieve, save and remove @@ -65,8 +66,8 @@ public interface DataAccessObject, I extends ID> extends Collection selectIDs(); /** - * Save the instance to the database. If a new database entry was created - * returns true. This method will only save if the object has changed. + * Save the instance to the database. This method will only save if the + * object has changed. * * @param object * the object @@ -75,6 +76,38 @@ public interface DataAccessObject, I extends ID> extends */ int save(O object); + /** + * Save several instances to the database. This method will only save if the + * object has changed. + *

+ * Note that differently from {@link #insertObjects(Model...)}, + * {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)}, + * this method does not uses an transaction and could have a bigger + * performance hit. + * + * @param objects + * the objects + * @return the number of affected rows + */ + int saveObjects(@SuppressWarnings("unchecked") O... objects); + + /** + * Asynchronously save several instances to the database. This method will + * only save if the object has changed. + *

+ * Note that differently from {@link #insertObjects(Model...)}, + * {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)}, + * this method does not uses an transaction and could have a bigger + * performance hit. + * + * @param objects + * the objects + * @return the task future. The future returns an Integer with the number of + * affected rows. + */ + AsyncFuture saveObjectsAsync( + @SuppressWarnings("unchecked") O... objects); + /** * Save the instance to the database. If a new database entry was created * returns true. @@ -86,7 +119,7 @@ public interface DataAccessObject, I extends ID> extends * @return the number of affected rows */ int save(O object, boolean force); - + /** * Inserts the instance in the database. * @@ -104,7 +137,19 @@ public interface DataAccessObject, I extends ID> extends * @return the number of inserted rows */ int insertObjects(@SuppressWarnings("unchecked") O... objects); - + + /** + * Asynchronously insert several instances in the database using a + * transaction (if possible). + * + * @param objects + * the objects + * @return the task future. The future returns an Integer with the number of + * inserted rows. + */ + AsyncFuture insertObjectsAsync( + @SuppressWarnings("unchecked") O... objects); + /** * Updates the instance in the database. * @@ -122,7 +167,19 @@ public interface DataAccessObject, I extends ID> extends * @return the number of updated rows */ int updateObjects(@SuppressWarnings("unchecked") O... objects); - + + /** + * Asynchronously update several instances in the database using a + * transaction (if possible). + * + * @param objects + * the objects + * @return the task future. The future returns an Integer with the number of + * updated rows. + */ + AsyncFuture updateObjectsAsync( + @SuppressWarnings("unchecked") O... objects); + /** * Deletes the instance in the database. * @@ -140,4 +197,16 @@ public interface DataAccessObject, I extends ID> extends * @return the number of deleted rows */ int deleteObjects(@SuppressWarnings("unchecked") O... objects); + + /** + * Asynchronously delete several instances in the database using a + * transaction (if possible). + * + * @param objects + * the objects + * @return the task future. The future returns an Integer with the number of + * deleted rows. + */ + AsyncFuture deleteObjectsAsync( + @SuppressWarnings("unchecked") O... objects); } diff --git a/l2jserver2-common/src/test/java/com/l2jserver/util/ArrayUtilsTest.java b/l2jserver2-common/src/test/java/com/l2jserver/util/ArrayUtilsTest.java index 8c2fdbc65..e4e863ec9 100644 --- a/l2jserver2-common/src/test/java/com/l2jserver/util/ArrayUtilsTest.java +++ b/l2jserver2-common/src/test/java/com/l2jserver/util/ArrayUtilsTest.java @@ -45,9 +45,6 @@ public class ArrayUtilsTest extends ArrayUtils { private static class TestClass { private String name; - /** - * @param string - */ public TestClass(String string) { this.name = string; } diff --git a/l2jserver2-gameserver/distribution/sql/sql/character.sql b/l2jserver2-gameserver/distribution/sql/sql/character.sql index 5298c7448..6454f1328 100644 --- a/l2jserver2-gameserver/distribution/sql/sql/character.sql +++ b/l2jserver2-gameserver/distribution/sql/sql/character.sql @@ -4,7 +4,7 @@ CREATE TABLE IF NOT EXISTS `character` ( `clan_id` int(10) DEFAULT NULL, `name` varchar(50) NOT NULL, `race` enum('HUMAN','ELF','DARK_ELF','ORC','DWARF','KAMAEL') NOT NULL, - `class` enum('HUMAN_FIGHTER','WARRIOR','GLADIATOR','WARLORD','KNIGHT','PALADIN','DARK_AVENGER','ROGUE','TREASURE_HUNTER','HAWKEYE','DUELIST','DREADNOUGHT','phoenixKnight','hellKnight','sagittarius','adventurer','HUMAN_MYSTIC','WIZARD','SORCEROR','NECROMANCER','WARLOCK','CLERIC','BISHOP','PROPHET','ARCHMAGE','SOULTAKER','ARCANA_LORD','CARDINAL','HIEROPHANT','ELVEN_FIGHTER','ELVEN_KNIGHT','TEMPLE_KNIGHT','SWORD_SINGER','ELVEN_SCOUT','PLAINS_WALKER','SILVER_RANGER','EVA_TEMPLAR','SWORD_MUSE','WIND_RIDER','MOONLIGHT_SENTINEL','ELVEN_MYSTIC','ELVEN_WIZARD','SPELLSINGER','ELEMENTAL_SUMMONER','ORACLE','ELDER','MYSTIC_MUSE','ELEMENTAL_MASTER','EVA_SAINT','DARK_FIGHTER','PALUS_KNIGHT','SHILLIEN_KNIGHT','BLADEDANCER','ASSASSIN','ABYSS_WALKER','PHANTOM_RANGER','SHILLIEN_TEMPLAR','spectralDancer','ghostHunter','ghostSentinel','DARK_MYSTIC','DARK_WIZARD','SPELLHOWLER','PHANTOM_SUMMONER','SHILLIEN_ORACLE','SHILLIEN_ELDER','STORM_SCREAMER','SPECTRAL_MASTER','SHILLIEAN_SAINT','ORC_FIGHTER','ORC_RAIDER','DESTROYER','ORC_MONK','TYRANT','TITAN','GRAND_KHAUATARI','ORC_MYSTIC','ORC_SHAMAN','OVERLORD','WARCRYER','DOMINATOR','DOOMCRYER','DWARVEN_FIGHTER','SCAVENGER','BOUNTY_HUNTER','ARTISAN','WARSMITH','FORTUNE_SEEKER','MAESTRO','MALE_SOLDIER','TROOPER','BERSEKER','MALE_SOULBREAKER','DOOMBRINGER','MALE_SOULDHOUND','FEMALE_SOLDIER','WARDER','FEMALE_SOULBREAKER','ARBALESTER','FEMALE_SOULDHOUND','TRICKSTER','INSPECTOR','JUDICATOR') NOT NULL DEFAULT 'HUMAN_FIGHTER', + `class` enum('HUMAN_FIGHTER','WARRIOR','GLADIATOR','WARLORD','KNIGHT','PALADIN','DARK_AVENGER','ROGUE','TREASURE_HUNTER','HAWKEYE','DUELIST','DREADNOUGHT','PHOENIX_KNIGHT','HELL_KNIGHT','SAGITTARIUS','ADVENTURER','HUMAN_MYSTIC','WIZARD','SORCEROR','NECROMANCER','WARLOCK','CLERIC','BISHOP','PROPHET','ARCHMAGE','SOULTAKER','ARCANA_LORD','CARDINAL','HIEROPHANT','ELVEN_FIGHTER','ELVEN_KNIGHT','TEMPLE_KNIGHT','SWORD_SINGER','ELVEN_SCOUT','PLAINS_WALKER','SILVER_RANGER','EVA_TEMPLAR','SWORD_MUSE','WIND_RIDER','MOONLIGHT_SENTINEL','ELVEN_MYSTIC','ELVEN_WIZARD','SPELLSINGER','ELEMENTAL_SUMMONER','ORACLE','ELDER','MYSTIC_MUSE','ELEMENTAL_MASTER','EVA_SAINT','DARK_FIGHTER','PALUS_KNIGHT','SHILLIEN_KNIGHT','BLADEDANCER','ASSASSIN','ABYSS_WALKER','PHANTOM_RANGER','SHILLIEN_TEMPLAR','SPECTRAL_DANCER','GHOST_HUNTER','GHOST_SENTINEL','DARK_MYSTIC','DARK_WIZARD','SPELLHOWLER','PHANTOM_SUMMONER','SHILLIEN_ORACLE','SHILLIEN_ELDER','STORM_SCREAMER','SPECTRAL_MASTER','SHILLIEAN_SAINT','ORC_FIGHTER','ORC_RAIDER','DESTROYER','ORC_MONK','TYRANT','TITAN','GRAND_KHAUATARI','ORC_MYSTIC','ORC_SHAMAN','OVERLORD','WARCRYER','DOMINATOR','DOOMCRYER','DWARVEN_FIGHTER','SCAVENGER','BOUNTY_HUNTER','ARTISAN','WARSMITH','FORTUNE_SEEKER','MAESTRO','MALE_SOLDIER','TROOPER','BERSEKER','MALE_SOULBREAKER','DOOMBRINGER','MALE_SOULDHOUND','FEMALE_SOLDIER','WARDER','FEMALE_SOULBREAKER','ARBALESTER','FEMALE_SOULDHOUND','TRICKSTER','INSPECTOR','JUDICATOR') NOT NULL DEFAULT 'HUMAN_FIGHTER', `sex` enum('MALE','FEMALE') NOT NULL, `level` int(3) NOT NULL, `experience` int(15) NOT NULL, diff --git a/l2jserver2-gameserver/src/main/java/com/l2jserver/model/template/character/CharacterClass.java b/l2jserver2-gameserver/src/main/java/com/l2jserver/model/template/character/CharacterClass.java index 04bbae610..188a773ed 100644 --- a/l2jserver2-gameserver/src/main/java/com/l2jserver/model/template/character/CharacterClass.java +++ b/l2jserver2-gameserver/src/main/java/com/l2jserver/model/template/character/CharacterClass.java @@ -80,7 +80,7 @@ public enum CharacterClass { DARK_FIGHTER(0x1f, FIGHTER, DARK_ELF), PALUS_KNIGHT(0x20, DARK_FIGHTER), SHILLIEN_KNIGHT( 0x21, PALUS_KNIGHT), BLADEDANCER(0x22, PALUS_KNIGHT), ASSASSIN( 0x23, DARK_FIGHTER), ABYSS_WALKER(0x24, ASSASSIN), PHANTOM_RANGER( - 0x25, ASSASSIN), SHILLIEN_TEMPLAR(0x6a, SHILLIEN_KNIGHT), spectralDancer( + 0x25, ASSASSIN), SHILLIEN_TEMPLAR(0x6a, SHILLIEN_KNIGHT), SPECTRAL_DANCER( 0x6b, BLADEDANCER), GHOST_HUNTER(0x6c, ABYSS_WALKER), GHOST_SENTINEL( 0x6d, PHANTOM_RANGER), diff --git a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/chat/DatabaseChatLoggingService.java b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/chat/DatabaseChatLoggingService.java index 02c6dba9e..b86880018 100644 --- a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/chat/DatabaseChatLoggingService.java +++ b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/chat/DatabaseChatLoggingService.java @@ -80,7 +80,7 @@ public class DatabaseChatLoggingService extends AbstractService implements message.setMessage(messageText); // save in database - chatMessageDao.save(message, true); + chatMessageDao.insert(message); return message; } diff --git a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/WorldFilters.java b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/WorldFilters.java index 95732525d..fae217d6d 100644 --- a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/WorldFilters.java +++ b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/WorldFilters.java @@ -55,7 +55,7 @@ public final class WorldFilters { } /** - * Performs an NOTA operation + * Performs an NOT operation * * @param * the object type diff --git a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/impl/KnownListUpdateFilter.java b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/impl/KnownListUpdateFilter.java index b8ce280e8..afaaef4d8 100644 --- a/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/impl/KnownListUpdateFilter.java +++ b/l2jserver2-gameserver/src/main/java/com/l2jserver/service/game/world/filter/impl/KnownListUpdateFilter.java @@ -16,10 +16,11 @@ */ package com.l2jserver.service.game.world.filter.impl; +import static com.l2jserver.service.game.world.filter.WorldFilters.not; + import com.l2jserver.model.world.PositionableObject; import com.l2jserver.model.world.WorldObject; import com.l2jserver.service.game.world.filter.AndFilter; -import com.l2jserver.service.game.world.filter.NotFilter; import com.l2jserver.util.geometry.Point3D; /** @@ -42,7 +43,7 @@ public class KnownListUpdateFilter extends AndFilter { * the old position */ public KnownListUpdateFilter(PositionableObject object, Point3D old) { - super(new KnownListFilter(object), new NotFilter( - new RangePointFilter(old, KnownListFilter.KNOWNLIST_RANGE))); + super(new KnownListFilter(object), not(new RangePointFilter(old, + KnownListFilter.KNOWNLIST_RANGE))); } }