Commit 4a27d1b9 authored by Spencer Chang's avatar Spencer Chang

[fix] 线程池-执行上下文处理修改实现

parent dffcc5c8
...@@ -21,7 +21,7 @@ import com.hand.hls.dp.util.SqlCheckUtils; ...@@ -21,7 +21,7 @@ import com.hand.hls.dp.util.SqlCheckUtils;
import com.hand.hls.exception.HlsCusException; import com.hand.hls.exception.HlsCusException;
import hls.core.sys.mapper.SysCodeValueMapper; import hls.core.sys.mapper.SysCodeValueMapper;
import leaf.plugin.export.components.ExcelExportUtil; import leaf.plugin.export.components.ExcelExportUtil;
import leaf.util.ConfigUtils; import leaf.utils.ConfigUtils;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.ibatis.session.SqlSession; import org.apache.ibatis.session.SqlSession;
import org.apache.ibatis.session.SqlSessionFactory; import org.apache.ibatis.session.SqlSessionFactory;
...@@ -50,8 +50,7 @@ import java.util.Map; ...@@ -50,8 +50,7 @@ import java.util.Map;
import java.util.Objects; import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService; import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/** /**
...@@ -912,7 +911,7 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -912,7 +911,7 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
public void insertSysDpExecuteHistory(IRequest requestContext, SysDpExecuteHistory sysDpExecuteHistory) { public void insertSysDpExecuteHistory(IRequest requestContext, SysDpExecuteHistory sysDpExecuteHistory) {
self().insertSelective(requestContext, sysDpExecuteHistory); self().insertSelective(requestContext, sysDpExecuteHistory);
} }
private static final TransmittableThreadLocal<ExecSqlContext> context = new TransmittableThreadLocal<>();
@Override @Override
public ResponseData execute(IRequest requestContext, String ip, String dpContext) throws Exception { public ResponseData execute(IRequest requestContext, String ip, String dpContext) throws Exception {
ResponseData responseData = new ResponseData(true); ResponseData responseData = new ResponseData(true);
...@@ -925,10 +924,14 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -925,10 +924,14 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
throw new HlsCusException("没有可执行语句!"); throw new HlsCusException("没有可执行语句!");
} }
ConcurrentHashMap<Integer, String> retMsg = new ConcurrentHashMap<>(length,0.75f); ConcurrentHashMap<Integer, String> retMsg = new ConcurrentHashMap<>(length,0.75f);
List<String> result = new ArrayList<>(retMsg.size()); List<String> result = new ArrayList<>(length);
CountDownLatch downLatch = new CountDownLatch(length); CountDownLatch downLatch = new CountDownLatch(length);
ExecutorService service = Executors.newFixedThreadPool(10); Executor ttlTaskExecutor = TtlExecutors.getTtlExecutor(taskExecutor);
ExecutorService executorService = TtlExecutors.getTtlExecutorService(service); ExecSqlContext execSqlContext = new ExecSqlContext();
execSqlContext.setService(self());
execSqlContext.setRequestContext(requestContext);
execSqlContext.setIp(ip);
context.set(execSqlContext);
for (int i = 1; i <= length; i++) { for (int i = 1; i <= length; i++) {
String undo = undos[i - 1]; String undo = undos[i - 1];
if (StringUtils.isBlank(undo)) { if (StringUtils.isBlank(undo)) {
...@@ -952,12 +955,8 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -952,12 +955,8 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
if (StringUtils.isNotEmpty(msg.toString())) { if (StringUtils.isNotEmpty(msg.toString())) {
throw new HlsCusException(msg.toString()); throw new HlsCusException(msg.toString());
} }
ExecSqlContext subExecSqlContext = new ExecSqlContext(); execSqlContext.setSeq(i);
subExecSqlContext.setService(self()); execSqlContext.setSql(formatSql);
subExecSqlContext.setRequestContext(requestContext);
subExecSqlContext.setIp(ip);
subExecSqlContext.setSeq(i);
subExecSqlContext.setSql(formatSql);
// 判断语句开始关键字 // 判断语句开始关键字
if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_SELECT)) { if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_SELECT)) {
throw new HlsCusException("执行按钮不支持SELECT查询语句,请使用查询按钮"); throw new HlsCusException("执行按钮不支持SELECT查询语句,请使用查询按钮");
...@@ -967,30 +966,24 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -967,30 +966,24 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
if (StringUtils.isNotEmpty(msg.toString())) { if (StringUtils.isNotEmpty(msg.toString())) {
throw new HlsCusException(msg.toString()); throw new HlsCusException(msg.toString());
} }
subExecSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_INSERT); execSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_INSERT);
TransmittableThreadLocal<ExecSqlContext> ttl = new TransmittableThreadLocal<>(); ttlTaskExecutor.execute(new SubThreadTask(retMsg, downLatch));
ttl.set(subExecSqlContext);
executorService.execute(new SubThreadTask(ttl, retMsg, downLatch));
} else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_UPDATE)) { } else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_UPDATE)) {
String checkMsg = SqlParserCheckUtils.parserCheckUpdate(formatSql); String checkMsg = SqlParserCheckUtils.parserCheckUpdate(formatSql);
msg.append(checkMsg); msg.append(checkMsg);
if (StringUtils.isNotEmpty(msg.toString())) { if (StringUtils.isNotEmpty(msg.toString())) {
throw new HlsCusException(msg.toString()); throw new HlsCusException(msg.toString());
} }
subExecSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_UPDATE); execSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_UPDATE);
TransmittableThreadLocal<ExecSqlContext> ttl = new TransmittableThreadLocal<>(); ttlTaskExecutor.execute(new SubThreadTask(retMsg, downLatch));
ttl.set(subExecSqlContext);
executorService.execute(new SubThreadTask(ttl, retMsg, downLatch));
} else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_DELETE)) { } else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.SQL_EXEC_DELETE)) {
String checkMsg = SqlParserCheckUtils.parserCheckDelete(formatSql); String checkMsg = SqlParserCheckUtils.parserCheckDelete(formatSql);
msg.append(checkMsg); msg.append(checkMsg);
if (StringUtils.isNotEmpty(msg.toString())) { if (StringUtils.isNotEmpty(msg.toString())) {
throw new HlsCusException(msg.toString()); throw new HlsCusException(msg.toString());
} }
subExecSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_DELETE); execSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_DELETE);
TransmittableThreadLocal<ExecSqlContext> ttl = new TransmittableThreadLocal<>(); ttlTaskExecutor.execute(new SubThreadTask(retMsg, downLatch));
ttl.set(subExecSqlContext);
executorService.execute(new SubThreadTask(ttl, retMsg, downLatch));
} else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_CREATE_TABLE) || } else if (StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_CREATE_TABLE) ||
StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_ALTER_TABLE) || StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_ALTER_TABLE) ||
StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_COMMENT_TABLE) || StringUtils.startsWithIgnoreCase(formatSql, SqlConstantUtils.DDL_COMMENT_TABLE) ||
...@@ -1004,10 +997,8 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -1004,10 +997,8 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
if (StringUtils.isNotEmpty(msg.toString())) { if (StringUtils.isNotEmpty(msg.toString())) {
throw new HlsCusException(msg.toString()); throw new HlsCusException(msg.toString());
} }
subExecSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_DDL); execSqlContext.setExecType(SqlConstantUtils.SQL_EXEC_DDL);
TransmittableThreadLocal<ExecSqlContext> ttl = new TransmittableThreadLocal<>(); ttlTaskExecutor.execute(new SubThreadTask(retMsg, downLatch));
ttl.set(subExecSqlContext);
executorService.execute(new SubThreadTask(ttl, retMsg, downLatch));
} }
} }
try { try {
...@@ -1015,22 +1006,20 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -1015,22 +1006,20 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
} catch (InterruptedException ie) { } catch (InterruptedException ie) {
logger.info("==> 错误信息:[{}]", ie.getMessage()); logger.info("==> 错误信息:[{}]", ie.getMessage());
} }
// subExecContext.getMsg().forEach((k,v) -> result.add("第" + k + "行语句," + v));
retMsg.forEach((k, v) -> result.add("第" + k + "行语句," + v)); retMsg.forEach((k, v) -> result.add("第" + k + "行语句," + v));
if (!result.isEmpty()) { if (!result.isEmpty()) {
responseData.setRows(result); responseData.setRows(result);
} }
context.remove();
} }
return responseData; return responseData;
} }
private static class SubThreadTask implements Runnable { private static class SubThreadTask implements Runnable {
private final ReentrantLock mainLock = new ReentrantLock(); private final ReentrantLock mainLock = new ReentrantLock();
private final TransmittableThreadLocal<ExecSqlContext> ttl;
private final ConcurrentHashMap<Integer, String> retMsg; private final ConcurrentHashMap<Integer, String> retMsg;
private final CountDownLatch downLatch; private final CountDownLatch downLatch;
private SubThreadTask(TransmittableThreadLocal<ExecSqlContext> ttl, ConcurrentHashMap<Integer, String> retMsg, CountDownLatch downLatch) { private SubThreadTask(ConcurrentHashMap<Integer, String> retMsg, CountDownLatch downLatch) {
this.ttl = ttl;
this.retMsg = retMsg; this.retMsg = retMsg;
this.downLatch = downLatch; this.downLatch = downLatch;
} }
...@@ -1039,13 +1028,13 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute ...@@ -1039,13 +1028,13 @@ public class SysDpExecuteHistoryServiceImpl extends BaseServiceImpl<SysDpExecute
public void run() { public void run() {
final ReentrantLock mainLock = this.mainLock; final ReentrantLock mainLock = this.mainLock;
mainLock.lock(); mainLock.lock();
final Integer seq = ttl.get().getSeq(); final Integer seq = context.get().getSeq();
final String sql = ttl.get().getSql(); final String sql = context.get().getSql();
try { try {
final SysDpExecuteHistoryService service = ttl.get().getService(); final SysDpExecuteHistoryService service = context.get().getService();
final IRequest requestContext = ttl.get().getRequestContext(); final IRequest requestContext = context.get().getRequestContext();
final String ip = ttl.get().getIp(); final String ip = context.get().getIp();
final String execType = ttl.get().getExecType(); final String execType = context.get().getExecType();
ResponseData subResponseData = new ResponseData(); ResponseData subResponseData = new ResponseData();
if (StringUtils.equals(execType, SqlConstantUtils.SQL_EXEC_INSERT)) { if (StringUtils.equals(execType, SqlConstantUtils.SQL_EXEC_INSERT)) {
subResponseData = service.insert(requestContext, ip, sql); subResponseData = service.insert(requestContext, ip, sql);
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment