1
0
mirror of https://github.com/Rogiel/l2jserver2 synced 2025-12-05 23:22:47 +00:00

Implements task oriented ThreadService

This commit is contained in:
2011-12-17 18:25:49 -02:00
parent ccbc29d330
commit cea66c9363
16 changed files with 441 additions and 179 deletions

View File

@@ -0,0 +1,25 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.service.core.threading;
/**
* @author <a href="http://www.rogiel.com">Rogiel</a>
* @param <T>
* the task return type
*/
public abstract class AbstractTask<T> implements Task<T> {
}

View File

@@ -0,0 +1,27 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.service.core.threading;
import java.util.concurrent.Callable;
/**
* @author <a href="http://www.rogiel.com">Rogiel</a>
* @param <T>
* the task return type
*/
public interface Task<T> extends Callable<T> {
}

View File

@@ -16,7 +16,6 @@
*/
package com.l2jserver.service.core.threading;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
/**
@@ -31,18 +30,18 @@ public interface ThreadPool {
*
* @param <T>
* the task return type
* @param callable
* @param task
* the callable instance
* @return the {@link AsyncFuture} notified once the task has completed
*/
<T> AsyncFuture<T> async(Callable<T> callable);
<T> AsyncFuture<T> async(Task<T> task);
/**
* Executes an asynchronous tasks at an scheduled time.
*
* @param <T>
* the task return type
* @param callable
* @param task
* the callable instance
* @param delay
* the initial delay to wait before the task is executed
@@ -50,7 +49,7 @@ public interface ThreadPool {
* the time unit of delay
* @return the {@link AsyncFuture} notified once the task has completed
*/
<T> AsyncFuture<T> async(long delay, TimeUnit unit, Callable<T> callable);
<T> AsyncFuture<T> async(long delay, TimeUnit unit, Task<T> task);
/**
* Executes an asynchronous tasks at an scheduled time.
@@ -73,5 +72,4 @@ public interface ThreadPool {
* execute tasks.
*/
void dispose();
}

View File

@@ -16,7 +16,6 @@
*/
package com.l2jserver.service.core.threading;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import com.l2jserver.service.Service;
@@ -39,7 +38,7 @@ public interface ThreadService extends Service {
* the callable instance
* @return the {@link AsyncFuture} notified once the task has completed
*/
<T> AsyncFuture<T> async(Callable<T> callable);
<T> AsyncFuture<T> async(Task<T> callable);
/**
* Executes an asynchronous tasks at an scheduled time. <b>Please note that
@@ -58,7 +57,7 @@ public interface ThreadService extends Service {
* the time unit of delay
* @return the {@link AsyncFuture} notified once the task has completed
*/
<T> AsyncFuture<T> async(long delay, TimeUnit unit, Callable<T> callable);
<T> AsyncFuture<T> async(long delay, TimeUnit unit, Task<T> callable);
/**
* Executes an asynchronous tasks at an scheduled time. <b>Please note that

View File

@@ -19,7 +19,6 @@ package com.l2jserver.service.core.threading;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import java.util.concurrent.Delayed;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
@@ -79,7 +78,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
}
@Override
public <T> AsyncFuture<T> async(Callable<T> callable) {
public <T> AsyncFuture<T> async(Task<T> callable) {
Preconditions.checkNotNull(callable, "callable");
log.debug("Scheduling async task: {}", callable);
@@ -87,8 +86,7 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
}
@Override
public <T> AsyncFuture<T> async(long delay, TimeUnit unit,
Callable<T> callable) {
public <T> AsyncFuture<T> async(long delay, TimeUnit unit, Task<T> callable) {
Preconditions.checkArgument(delay >= 0, "delay < 0");
Preconditions.checkNotNull(unit, "unit");
Preconditions.checkNotNull(callable, "callable");
@@ -352,14 +350,14 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
}
@Override
public <T> AsyncFuture<T> async(Callable<T> callable) {
public <T> AsyncFuture<T> async(Task<T> callable) {
log.debug("Task {} submited to {}", callable, name);
return new AsyncFutureImpl<T>(executor.submit(callable));
}
@Override
public <T> AsyncFuture<T> async(long delay, TimeUnit unit,
Callable<T> callable) {
Task<T> callable) {
if (log.isDebugEnabled())
log.debug("Task {} scheduled in {} {} to {}", new Object[] {
callable, delay, unit, name });
@@ -371,8 +369,9 @@ public class ThreadServiceImpl extends AbstractService implements ThreadService
public ScheduledAsyncFuture async(long delay, TimeUnit unit,
long repeat, Runnable task) {
if (log.isDebugEnabled())
log.debug("Task {} scheduled every {} {} to {}", new Object[] {
task, repeat, unit, name });
log.debug(
"Task {} scheduled every {} {} to {}, starting in {}",
new Object[] { task, repeat, unit, name, delay });
return new ScheduledAsyncFutureImpl(executor.scheduleAtFixedRate(
task, delay, repeat, unit));
}

View File

@@ -17,13 +17,14 @@
package com.l2jserver.service.database;
import java.util.Iterator;
import java.util.concurrent.Callable;
import com.google.inject.Inject;
import com.l2jserver.model.Model;
import com.l2jserver.model.id.ID;
import com.l2jserver.service.core.threading.AbstractTask;
import com.l2jserver.service.core.threading.AsyncFuture;
import com.l2jserver.service.core.threading.ThreadService;
import com.l2jserver.service.database.DatabaseService.TransactionExecutor;
/**
* Abstract DAO implementations. Store an instance of {@link DatabaseService}.
@@ -61,7 +62,7 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
@Override
public AsyncFuture<T> selectAsync(final I id) {
return threadService.async(new Callable<T>() {
return threadService.async(new AbstractTask<T>() {
@Override
public T call() throws Exception {
return select(id);
@@ -92,18 +93,23 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
@Override
@SafeVarargs
public final int saveObjects(T... objects) {
int rows = 0;
for (final T object : objects) {
rows += save(object);
}
return rows;
public final int saveObjects(final T... objects) {
return database.transaction(new TransactionExecutor() {
@Override
public int perform() {
int rows = 0;
for (final T object : objects) {
rows += save(object);
}
return rows;
}
});
}
@Override
@SafeVarargs
public final AsyncFuture<Integer> saveObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return saveObjects(objects);
@@ -119,7 +125,7 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
@Override
@SafeVarargs
public final AsyncFuture<Integer> insertObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return insertObjects(objects);
@@ -135,7 +141,7 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
@Override
@SafeVarargs
public final AsyncFuture<Integer> updateObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return updateObjects(objects);
@@ -151,7 +157,7 @@ public abstract class AbstractDAO<T extends Model<?>, I extends ID<?>>
@Override
@SafeVarargs
public final AsyncFuture<Integer> deleteObjectsAsync(final T... objects) {
return threadService.async(new Callable<Integer>() {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return deleteObjects(objects);

View File

@@ -49,6 +49,8 @@ import com.l2jserver.service.cache.CacheService;
import com.l2jserver.service.configuration.ConfigurationService;
import com.l2jserver.service.configuration.ProxyConfigurationService.ConfigurationPropertyKey;
import com.l2jserver.service.configuration.XMLConfigurationService.ConfigurationXPath;
import com.l2jserver.service.core.threading.AbstractTask;
import com.l2jserver.service.core.threading.AsyncFuture;
import com.l2jserver.service.core.threading.ScheduledAsyncFuture;
import com.l2jserver.service.core.threading.ThreadService;
import com.l2jserver.util.ArrayIterator;
@@ -129,6 +131,11 @@ public abstract class AbstractJDBCDatabaseService extends AbstractService
*/
private ScheduledAsyncFuture autoSaveFuture;
/**
* The connection used inside a transaction from multiple DAOs.
*/
private ThreadLocal<Connection> transactionalConnection = new ThreadLocal<>();
/**
* Configuration interface for {@link AbstractJDBCDatabaseService}.
*
@@ -325,6 +332,46 @@ public abstract class AbstractJDBCDatabaseService extends AbstractService
});
}
@Override
public int transaction(TransactionExecutor executor) {
Preconditions.checkNotNull(executor, "executor");
try {
final Connection conn = dataSource.getConnection();
log.debug("Executing transaction {} with {}", executor, conn);
try {
conn.setAutoCommit(false);
transactionalConnection.set(conn);
final int rows = executor.perform();
conn.commit();
return rows;
} catch (Exception e) {
conn.rollback();
throw e;
} finally {
transactionalConnection.remove();
conn.setAutoCommit(true);
conn.close();
}
} catch (DatabaseException e) {
throw e;
} catch (Throwable e) {
throw new DatabaseException(e);
}
}
@Override
public AsyncFuture<Integer> transactionAsync(
final TransactionExecutor executor) {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return transaction(executor);
}
});
}
/**
* Executes an <tt>query</tt> in the database.
*
@@ -333,23 +380,47 @@ public abstract class AbstractJDBCDatabaseService extends AbstractService
* @param query
* the query
* @return an instance of <tt>T</tt>
* @throws DatabaseException
* if any exception occur (can have nested {@link SQLException})
*/
public <T> T query(Query<T> query) {
public <T> T query(Query<T> query) throws DatabaseException {
Preconditions.checkNotNull(query, "query");
try {
final Connection conn = dataSource.getConnection();
boolean inTransaction = false;
Connection conn = transactionalConnection.get();
if (conn == null) {
log.debug(
"Transactional connection for {} is not set, creating new connection",
query);
inTransaction = false;
conn = dataSource.getConnection();
}
log.debug("Executing query {} with {}", query, conn);
try {
return query.query(conn);
} catch (SQLException e) {
log.error("Error executing query", e);
return null;
if (!inTransaction) {
conn.setAutoCommit(false);
}
try {
return query.query(conn);
} finally {
if (!inTransaction) {
conn.commit();
}
}
} catch (Exception e) {
if (!inTransaction) {
conn.rollback();
}
throw e;
} finally {
conn.close();
if (!inTransaction) {
conn.setAutoCommit(true);
conn.close();
}
}
} catch (SQLException e) {
} catch (Throwable e) {
log.error("Could not open database connection", e);
return null;
throw new DatabaseException(e);
}
}
@@ -500,64 +571,52 @@ public abstract class AbstractJDBCDatabaseService extends AbstractService
Preconditions.checkNotNull(conn, "conn");
log.debug("Starting INSERT/UPDATE query execution");
final String queryString = query();
log.debug("Preparing statement for {}", queryString);
final PreparedStatement st = conn.prepareStatement(queryString,
Statement.RETURN_GENERATED_KEYS);
try {
conn.setAutoCommit(false);
int rows = 0;
while (iterator.hasNext()) {
final T object = iterator.next();
final String queryString = query();
log.debug("Parametizing statement {} with {}", st, object);
this.parametize(st, object);
log.debug("Preparing statement for {}", queryString);
final PreparedStatement st = conn.prepareStatement(queryString,
Statement.RETURN_GENERATED_KEYS);
try {
int rows = 0;
while (iterator.hasNext()) {
final T object = iterator.next();
log.debug("Sending query to database for {}", object);
rows += st.executeUpdate();
log.debug("Query inserted or updated {} rows for {}", rows,
object);
log.debug("Parametizing statement {} with {}", st,
object);
this.parametize(st, object);
// update object desire --it has been realized
if (object instanceof Model && rows > 0) {
log.debug("Updating Model ObjectDesire to NONE");
((Model<?>) object).setObjectDesire(ObjectDesire.NONE);
log.debug("Sending query to database for {}", object);
rows += st.executeUpdate();
log.debug("Query inserted or updated {} rows for {}",
rows, object);
// update object desire --it has been realized
if (object instanceof Model && rows > 0) {
log.debug("Updating Model ObjectDesire to NONE");
((Model<?>) object)
.setObjectDesire(ObjectDesire.NONE);
final Mapper<? extends ID<?>> mapper = keyMapper();
if (mapper == null)
continue;
final ResultSet rs = st.getGeneratedKeys();
try {
log.debug(
"Mapping generated keys with {} using {}",
mapper, rs);
while (rs.next()) {
final ID<?> generatedID = mapper.map(rs);
log.debug("Generated ID for {} is {}",
object, generatedID);
((Model<ID<?>>) object).setID(generatedID);
mapper.map(rs);
}
} finally {
rs.close();
final Mapper<? extends ID<?>> mapper = keyMapper();
if (mapper == null)
continue;
final ResultSet rs = st.getGeneratedKeys();
try {
log.debug(
"Mapping generated keys with {} using {}",
mapper, rs);
while (rs.next()) {
final ID<?> generatedID = mapper.map(rs);
log.debug("Generated ID for {} is {}", object,
generatedID);
((Model<ID<?>>) object).setID(generatedID);
mapper.map(rs);
}
} finally {
rs.close();
}
}
conn.commit();
return rows;
} finally {
st.close();
}
} catch (SQLException e) {
conn.rollback();
throw e;
return rows;
} finally {
conn.setAutoCommit(true);
st.close();
}
}
@@ -650,43 +709,32 @@ public abstract class AbstractJDBCDatabaseService extends AbstractService
Preconditions.checkNotNull(conn, "conn");
log.debug("Starting DELETE query execution");
final String queryString = query();
log.debug("Preparing statement for {}", queryString);
final PreparedStatement st = conn.prepareStatement(queryString);
try {
conn.setAutoCommit(false);
int rows = 0;
while (iterator.hasNext()) {
final T object = iterator.next();
final String queryString = query();
log.debug("Parametizing statement {} with {}", st, object);
this.parametize(st, object);
log.debug("Preparing statement for {}", queryString);
final PreparedStatement st = conn.prepareStatement(queryString);
log.debug("Sending query to database for {}", object);
rows = st.executeUpdate();
log.debug("Query deleted {} rows for {}", rows, object);
try {
int rows = 0;
while (iterator.hasNext()) {
final T object = iterator.next();
log.debug("Parametizing statement {} with {}", st,
object);
this.parametize(st, object);
log.debug("Sending query to database for {}", object);
rows = st.executeUpdate();
log.debug("Query deleted {} rows for {}", rows, object);
dispose(object);
if (object instanceof Model) {
database.removeCache(((Model<?>) object)
.getObjectDesire());
}
dispose(object);
if (object instanceof Model) {
database.removeCache(((Model<?>) object)
.getObjectDesire());
}
conn.commit();
return rows;
} finally {
st.close();
}
} catch (SQLException e) {
conn.rollback();
throw e;
return rows;
} finally {
conn.setAutoCommit(true);
st.close();
}
}

View File

@@ -39,6 +39,8 @@ import com.l2jserver.service.cache.CacheService;
import com.l2jserver.service.configuration.ConfigurationService;
import com.l2jserver.service.configuration.ProxyConfigurationService.ConfigurationPropertyKey;
import com.l2jserver.service.configuration.XMLConfigurationService.ConfigurationXPath;
import com.l2jserver.service.core.threading.AbstractTask;
import com.l2jserver.service.core.threading.AsyncFuture;
import com.l2jserver.service.core.threading.ScheduledAsyncFuture;
import com.l2jserver.service.core.threading.ThreadService;
import com.l2jserver.util.ArrayIterator;
@@ -221,6 +223,22 @@ public abstract class AbstractOrientDatabaseService extends AbstractService
});
}
@Override
public int transaction(TransactionExecutor executor) {
return executor.perform();
}
@Override
public AsyncFuture<Integer> transactionAsync(
final TransactionExecutor executor) {
return threadService.async(new AbstractTask<Integer>() {
@Override
public Integer call() throws Exception {
return transaction(executor);
}
});
}
/**
* Executes an <tt>query</tt> in the database.
*

View File

@@ -89,13 +89,8 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
int save(O object);
/**
* Save several instances to the database. This method will only save if the
* object has changed.
* <p>
* Note that differently from {@link #insertObjects(Model...)},
* {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)},
* this method does not uses an transaction and could have a bigger
* performance hit.
* Save several instances to the database using a transaction (if possible).
* This method will only save if the object has changed.
*
* @param objects
* the objects
@@ -104,13 +99,8 @@ public interface DataAccessObject<O extends Model<?>, I extends ID<?>> extends
int saveObjects(@SuppressWarnings("unchecked") O... objects);
/**
* Asynchronously save several instances to the database. This method will
* only save if the object has changed.
* <p>
* Note that differently from {@link #insertObjects(Model...)},
* {@link #updateObjects(Model...)} and {@link #deleteObjects(Model...)},
* this method does not uses an transaction and could have a bigger
* performance hit.
* Asynchronously save several instances to the database using a transaction
* (if possible). This method will only save if the object has changed.
*
* @param objects
* the objects

View File

@@ -0,0 +1,53 @@
/*
* This file is part of l2jserver2 <l2jserver2.com>.
*
* l2jserver2 is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* l2jserver2 is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with l2jserver2. If not, see <http://www.gnu.org/licenses/>.
*/
package com.l2jserver.service.database;
/**
* @author <a href="http://www.rogiel.com">Rogiel</a>
*/
public class DatabaseException extends RuntimeException {
private static final long serialVersionUID = 1L;
public DatabaseException() {
super();
}
/**
* @param message
* the message
* @param cause
* the root cause
*/
public DatabaseException(String message, Throwable cause) {
}
/**
* @param message
* the message
*/
public DatabaseException(String message) {
super(message);
}
/**
* @param cause
* the root cause
*/
public DatabaseException(Throwable cause) {
super(cause);
}
}

View File

@@ -20,6 +20,7 @@ import com.l2jserver.service.Service;
import com.l2jserver.service.ServiceConfiguration;
import com.l2jserver.service.configuration.Configuration;
import com.l2jserver.service.configuration.ProxyConfigurationService.ConfigurationName;
import com.l2jserver.service.core.threading.AsyncFuture;
/**
* This service provides access to an database implementation. Each
@@ -46,4 +47,57 @@ public interface DatabaseService extends Service {
@ConfigurationName("database")
public interface DatabaseConfiguration extends ServiceConfiguration {
}
/**
* Executes several operations inside a single database transaction.
* <p>
* Queries inside a transaction applies to an <i>all-or-none</i> model. If
* any of the queries executed fails, none of them will be persisted to the
* database and no changes will be performed. Transactions are useful in
* maintaining data consistency.
* <p>
* <b>Important</b>: You should <b>never</b> call any async
* {@link DataAccessObject} within a transaction. Doing so, will make it be
* executed in <b>another transaction</b> and might even cause data
* corruption due to queries being executed in different transactions.
* <p>
* If you wish to execute an transaction asynchronously, see
* {@link #transactionAsync(TransactionExecutor)}.
*
* @param executor
* the executor implementation (normally an anonymous class)
* @return the number of affected rows
* @see DatabaseService#transactionAsync(TransactionExecutor)
*/
int transaction(TransactionExecutor executor);
/**
* Asynchronously executes several operations inside a single database
* transaction.
* <p>
* Queries inside a transaction applies to an<i>all-or-none</i> model. If
* any of the queries executed fails, none of them will be persisted to the
* database and no changes will be performed. Transactions are useful in
* maintaining data consistency.
*
* @param executor
* the executor implementation (normally an anonymous class)
* @return the number of affected rows
*/
AsyncFuture<Integer> transactionAsync(TransactionExecutor executor);
/**
* This class executes DAO operations inside an transaction. It is
* recommended to implement it in an anonymous class.
*
* @author <a href="http://www.rogiel.com">Rogiel</a>
*/
public interface TransactionExecutor {
/**
* Perform operations inside the transaction.
*
* @return the number of affected rows
*/
int perform();
}
}