diff --git a/dbsyncer-biz/pom.xml b/dbsyncer-biz/pom.xml index eb260ca0234a99d231dbdab35fee5ff413761c57..957995ce029c099c6c68fd3567e92dbd5f5e3c86 100644 --- a/dbsyncer-biz/pom.xml +++ b/dbsyncer-biz/pom.xml @@ -3,10 +3,10 @@ - dbsyncer - org.ghi - 1.1.6-Beta - + dbsyncer + org.ghi + 1.1.7-Beta + 4.0.0 dbsyncer-biz diff --git a/dbsyncer-cache/pom.xml b/dbsyncer-cache/pom.xml index bccbd83c868bc4652043cc6be3d98f80c974505d..2f00a4ab35506338da59179552f590a12d161149 100644 --- a/dbsyncer-cache/pom.xml +++ b/dbsyncer-cache/pom.xml @@ -2,10 +2,10 @@ - dbsyncer - org.ghi - 1.1.6-Beta - + dbsyncer + org.ghi + 1.1.7-Beta + 4.0.0 dbsyncer-cache diff --git a/dbsyncer-cluster/pom.xml b/dbsyncer-cluster/pom.xml index 69c5771a7a11165ee3cea7787b82878f5752b8e0..91b5d52632beb02954caba04add7b7522130b3c7 100644 --- a/dbsyncer-cluster/pom.xml +++ b/dbsyncer-cluster/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-cluster diff --git a/dbsyncer-common/pom.xml b/dbsyncer-common/pom.xml index 0a91360ef94c3592f8596db289036be5f7e1cdfb..2a84e6101c7f3a0935d4b703ea9879c7372f67fb 100644 --- a/dbsyncer-common/pom.xml +++ b/dbsyncer-common/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-common diff --git a/dbsyncer-connector/pom.xml b/dbsyncer-connector/pom.xml index 524b693524a6eb424866969162a39b8a5641c895..696cd31955d033e284090f2919c6c832fb92250f 100644 --- a/dbsyncer-connector/pom.xml +++ b/dbsyncer-connector/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-connector @@ -66,6 +66,11 @@ kafka-clients + + org.springframework.boot + spring-boot-starter-log4j2 + + junit junit diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java index 2753b0f985231519b5750b8ff24a55a9b7a40fbe..797b0535653a1795bad335157b7d367e7aaf8384 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java @@ -31,8 +31,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector private final Logger logger = LoggerFactory.getLogger(getClass()); - protected abstract String getTableSql(DatabaseConfig config); - @Override public ConnectorMapper connect(DatabaseConfig config) { try { @@ -59,16 +57,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return String.format("%s-%s", config.getUrl(), config.getUsername()); } - @Override - public List getTable(DatabaseConnectorMapper connectorMapper) { - String sql = getTableSql(connectorMapper.getConfig()); - List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class)); - if (!CollectionUtils.isEmpty(tableNames)) { - return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList()); - } - return Collections.EMPTY_LIST; - } - @Override public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) { String quotation = buildSqlWithQuotation(); @@ -222,6 +210,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return map; } + /** + * 健康检查 + * + * @return + */ + protected String getValidationQuery() { + return "select 1"; + } + /** * 查询语句表名和字段带上引号(默认不加) * @@ -231,6 +228,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return ""; } + /** + * 获取表列表 + * + * @param connectorMapper + * @param sql + * @return + */ + protected List
getTable(DatabaseConnectorMapper connectorMapper, String sql) { + List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class)); + if (!CollectionUtils.isEmpty(tableNames)) { + return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList()); + } + return Collections.EMPTY_LIST; + } + /** * 获取查询条件SQL * @@ -267,6 +279,37 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return sql.toString(); } + /** + * 根据过滤条件获取查询SQL + * + * @param queryOperator and/or + * @param filter + * @return + */ + private String getFilterSql(String queryOperator, List filter) { + List list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(list)) { + return ""; + } + + int size = list.size(); + int end = size - 1; + StringBuilder sql = new StringBuilder(); + sql.append("("); + Filter c = null; + String quotation = buildSqlWithQuotation(); + for (int i = 0; i < size; i++) { + c = list.get(i); + // "USER" = 'zhangsan' + sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'"); + if (i < end) { + sql.append(" ").append(queryOperator).append(" "); + } + } + sql.append(")"); + return sql.toString(); + } + /** * 获取查询SQL * @@ -319,15 +362,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return SqlBuilderEnum.getSqlBuilder(type).buildSql(config); } - /** - * 健康检查 - * - * @return - */ - protected String getValidationQuery() { - return "select 1"; - } - /** * 获取数据库表元数据信息 * @@ -399,34 +433,26 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector } /** - * 根据过滤条件获取查询SQL + * 返回表主键 * - * @param queryOperator and/or - * @param filter + * @param md + * @param tableName * @return + * @throws SQLException */ - private String getFilterSql(String queryOperator, List filter) { - List list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList()); - if (CollectionUtils.isEmpty(list)) { - return ""; - } - - int size = list.size(); - int end = size - 1; - StringBuilder sql = new StringBuilder(); - sql.append("("); - Filter c = null; - String quotation = buildSqlWithQuotation(); - for (int i = 0; i < size; i++) { - c = list.get(i); - // "USER" = 'zhangsan' - sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'"); - if (i < end) { - sql.append(" ").append(queryOperator).append(" "); + private List findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException { + //根据表名获得主键结果集 + ResultSet rs = null; + List primaryKeys = new ArrayList<>(); + try { + rs = md.getPrimaryKeys(null, null, tableName); + while (rs.next()) { + primaryKeys.add(rs.getString("COLUMN_NAME")); } + } finally { + DatabaseUtil.close(rs); } - sql.append(")"); - return sql.toString(); + return primaryKeys; } /** @@ -485,9 +511,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector try { // 2、设置参数 int execute = connectorMapper.execute(databaseTemplate -> - databaseTemplate.update(sql, (ps) -> - batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row) - ) + databaseTemplate.update(sql, (ps) -> batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)) ); if (execute == 0) { throw new ConnectorException(String.format("尝试执行[%s]失败", event)); @@ -517,19 +541,4 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return !CollectionUtils.isEmpty(pk) && pk.contains(name); } - private List findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException { - //根据表名获得主键结果集 - ResultSet rs = null; - List primaryKeys = new ArrayList<>(); - try { - rs = md.getPrimaryKeys(null, null, tableName); - while (rs.next()) { - primaryKeys.add(rs.getString("COLUMN_NAME")); - } - } finally { - DatabaseUtil.close(rs); - } - return primaryKeys; - } - } \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java index dd6d6e2a29f4141e41666338c2a87f91ccd2931b..38d704f3df59c4bbbbe53d0f5380bd619e2d18f1 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java @@ -301,12 +301,15 @@ public class SimpleConnection implements Connection { @Override public T unwrap(Class iface) throws SQLException { - return null; + if (iface.isAssignableFrom(connection.getClass())) { + return iface.cast(connection); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); } @Override public boolean isWrapperFor(Class iface) throws SQLException { - return false; + return iface.isAssignableFrom(connection.getClass()); } public Connection getConnection() { diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java index ab51da66b446e3fccfe9eaddf3640b4cc0fd0ef7..14d5162af3951205a58b23bd561ae344ee043ec5 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java @@ -28,6 +28,11 @@ public class BigintSetter extends AbstractSetter { ps.setLong(i, bitInt.longValue()); return; } + if (val instanceof Integer) { + Integer integer = (Integer) val; + ps.setLong(i, integer); + return; + } throw new ConnectorException(String.format("BigintSetter can not find type [%s], val [%s]", type, val)); } } \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java index 647d132671b07dcdb1e612f93354c7b143ea37a2..e41070f1d7196e94db2720c8360b8da567a3efed 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java @@ -1,16 +1,14 @@ package org.dbsyncer.connector.mysql; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; +import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.DatabaseConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; -public final class MysqlConnector extends AbstractDatabaseConnector { +import java.util.List; - @Override - protected String getTableSql(DatabaseConfig config) { - return "show tables"; - } +public final class MysqlConnector extends AbstractDatabaseConnector { @Override public String getPageSql(PageSqlConfig config) { @@ -22,4 +20,8 @@ public final class MysqlConnector extends AbstractDatabaseConnector { return new Object[]{(pageIndex - 1) * pageSize, pageSize}; } + @Override + public List
getTable(DatabaseConnectorMapper connectorMapper) { + return super.getTable(connectorMapper, "show tables"); + } } \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java index 0389edd86c7e04e6228fb052ed308d7325ca04d1..8ea5382d7a5b1d18a7524dcb6ee4b8bbd8c293f3 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java @@ -1,7 +1,6 @@ package org.dbsyncer.connector.oracle; import org.dbsyncer.common.util.CollectionUtils; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.DatabaseConstant; @@ -15,14 +14,9 @@ import java.util.stream.Collectors; public final class OracleConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS"; - } - @Override public List
getTable(DatabaseConnectorMapper connectorMapper) { - String sql = getTableSql(connectorMapper.getConfig()); + final String sql = "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS"; List> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql)); if (!CollectionUtils.isEmpty(list)) { return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList()); diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java index 6c64cee1503329b3947c589782cbf21794029c7c..3687e5b021149b15c40571f722e75b4b462f2366 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java @@ -14,20 +14,18 @@ import java.util.List; public final class PostgreSQLConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - return String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema()); - } - @Override public List
getTable(DatabaseConnectorMapper connectorMapper) { List
list = new LinkedList<>(); DatabaseConfig config = connectorMapper.getConfig(); - List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableSql(config), String.class)); + final String queryTableSql = String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema()); + List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableSql, String.class)); if (!CollectionUtils.isEmpty(tableNames)) { tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode()))); } - List tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableViewSql(config), String.class)); + + final String queryTableViewSql = String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema()); + List tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableViewSql, String.class)); if (!CollectionUtils.isEmpty(tableViewNames)) { tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode()))); } @@ -48,8 +46,4 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector { protected String buildSqlWithQuotation() { return "\""; } - - private String getTableViewSql(DatabaseConfig config) { - return String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema()); - } } diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java index 90de8499255e279f81c16078f029997d37647505..409aeafc0a39a9ce47a1e3e13031e9c8bc3ed721 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java @@ -1,7 +1,6 @@ package org.dbsyncer.connector.sql; import org.dbsyncer.common.util.StringUtil; -import org.dbsyncer.connector.ConnectorException; import org.dbsyncer.connector.config.*; import org.dbsyncer.connector.constant.ConnectorConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; @@ -20,11 +19,6 @@ import java.util.Map; */ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - throw new ConnectorException("Unsupported method."); - } - @Override public List
getTable(DatabaseConnectorMapper config) { DatabaseConfig cfg = config.getConfig(); @@ -50,10 +44,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { * 获取DQL源配置 * * @param commandConfig - * @param appendGroupByPK + * @param groupByPK * @return */ - protected Map getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) { + protected Map getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) { // 获取过滤SQL List filter = commandConfig.getFilter(); String queryFilterSql = getQueryFilterSql(filter); @@ -73,12 +67,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { // 获取查询总数SQL StringBuilder queryCount = new StringBuilder(); - queryCount.append("SELECT COUNT(1) FROM (").append(table.getName()); - if (StringUtil.isNotBlank(queryFilterSql)) { - queryCount.append(queryFilterSql); - } + queryCount.append("SELECT COUNT(1) FROM (").append(querySql); + // Mysql - if (appendGroupByPK) { + if (groupByPK) { queryCount.append(" GROUP BY ").append(pk); } queryCount.append(") DBSYNCER_T"); diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java index 3d2a7610cd23b192367fd106c13dec66b2fc5d49..916587d249b59cc79cee1c8b2a6f47d34be5a8be 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java @@ -2,11 +2,8 @@ package org.dbsyncer.connector.sql; import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.ConnectorMapper; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; import org.dbsyncer.connector.constant.DatabaseConstant; -import org.dbsyncer.connector.sqlserver.SqlServerConnectorMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,16 +11,6 @@ public final class DQLSqlServerConnector extends AbstractDQLConnector { private final Logger logger = LoggerFactory.getLogger(getClass()); - @Override - public ConnectorMapper connect(DatabaseConfig config) { - try { - return new SqlServerConnectorMapper(config); - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } - } - @Override public String getPageSql(PageSqlConfig config) { if (StringUtil.isBlank(config.getPk())) { diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java index e23c525873f2ab432a75d904f6cb9254f8406265..8eb979c6a531d7471547d8870a91d360c4065230 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java @@ -2,7 +2,6 @@ package org.dbsyncer.connector.sqlserver; import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.ConnectorMapper; import org.dbsyncer.connector.config.CommandConfig; import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; @@ -10,10 +9,12 @@ import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.ConnectorConstant; import org.dbsyncer.connector.constant.DatabaseConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; public final class SqlServerConnector extends AbstractDatabaseConnector { @@ -21,18 +22,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector { private final Logger logger = LoggerFactory.getLogger(getClass()); @Override - public ConnectorMapper connect(DatabaseConfig config) { - try { - return new SqlServerConnectorMapper(config); - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } - } - - @Override - protected String getTableSql(DatabaseConfig config) { - return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema()); + public List
getTable(DatabaseConnectorMapper connectorMapper) { + DatabaseConfig config = connectorMapper.getConfig(); + return super.getTable(connectorMapper, String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema())); } @Override diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java deleted file mode 100644 index 5cf2bdcd5db3ec5f08f61290c8e7fbea6d549db0..0000000000000000000000000000000000000000 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.dbsyncer.connector.sqlserver; - -import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.config.DatabaseConfig; -import org.dbsyncer.connector.database.DatabaseConnectorMapper; -import org.dbsyncer.connector.database.DatabaseTemplate; -import org.dbsyncer.connector.database.HandleCallback; -import org.dbsyncer.connector.util.DatabaseUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.dao.EmptyResultDataAccessException; - -import java.sql.Connection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public final class SqlServerConnectorMapper extends DatabaseConnectorMapper { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Lock lock = new ReentrantLock(true); - - public SqlServerConnectorMapper(DatabaseConfig config) { - super(config); - } - - /** - * 使用连接时加锁(SqlServer 2008以下版本连接未释放问题) - * - * @param callback - * @return - */ - @Override - public T execute(HandleCallback callback) { - final Lock connectionLock = lock; - boolean locked = false; - Object apply = null; - Connection connection = null; - try { - locked = connectionLock.tryLock(60, TimeUnit.SECONDS); - if (locked) { - connection = getConnection(); - apply = callback.apply(new DatabaseTemplate(connection)); - } - } catch (EmptyResultDataAccessException e) { - throw e; - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } finally { - if (locked) { - DatabaseUtil.close(connection); - connectionLock.unlock(); - } - } - return (T) apply; - } - -} \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java index 488c817f9f0a13b08f3d0459a1774aeee8218e49..2bf40ddc6d004193cb0120b390df94f18ed28513 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java @@ -1,5 +1,6 @@ package org.dbsyncer.connector.util; +import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; import java.sql.Connection; @@ -12,10 +13,12 @@ public abstract class DatabaseUtil { } public static Connection getConnection(String driverClassName, String url, String username, String password) throws SQLException { - try { - Class.forName(driverClassName); - } catch (ClassNotFoundException e) { - throw new ConnectorException(e.getCause()); + if (StringUtil.isNotBlank(driverClassName)) { + try { + Class.forName(driverClassName); + } catch (ClassNotFoundException e) { + throw new ConnectorException(e.getCause()); + } } return DriverManager.getConnection(url, username, password); } diff --git a/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java b/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java new file mode 100644 index 0000000000000000000000000000000000000000..14a2e1dc9153867c0dd3ea5c2c00e2744f9e5c1a --- /dev/null +++ b/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java @@ -0,0 +1,68 @@ +import org.dbsyncer.connector.config.DatabaseConfig; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.util.concurrent.*; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/11 20:19 + */ +public class SqlServerConnectionTest { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Test + public void testConnection() throws InterruptedException { + DatabaseConfig config = new DatabaseConfig(); + config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test"); + config.setUsername("sa"); + config.setPassword("123"); + config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config); + + // 模拟并发 + final int threadSize = 100; + final ExecutorService pool = Executors.newFixedThreadPool(threadSize); + final CyclicBarrier barrier = new CyclicBarrier(threadSize); + final CountDownLatch latch = new CountDownLatch(threadSize); + for (int i = 0; i < threadSize; i++) { + final int k = i + 3; + pool.submit(() -> { + try { + barrier.await(); + + // 模拟操作 + System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k)); + + Object execute = connectorMapper.execute(tem -> tem.queryForObject("select 1", Integer.class)); + System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute)); + + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } catch (BrokenBarrierException e) { + logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + latch.countDown(); + } + }); + } + + try { + latch.await(); + logger.info("try to shutdown"); + pool.shutdown(); + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } + + TimeUnit.SECONDS.sleep(3); + logger.info("test end"); + } +} diff --git a/dbsyncer-listener/pom.xml b/dbsyncer-listener/pom.xml index 9c4c0e1934642175726122a1a9b2af9b8718d4e8..596cb97068a8b2d5ee7a53812c2f0e94b8eaf4d8 100644 --- a/dbsyncer-listener/pom.xml +++ b/dbsyncer-listener/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-listener diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java index 28b23258926d681fc846863c14584cd20d61db96..95fd133cc7989c7924e48629d40fd60a3ac81929 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java @@ -31,6 +31,10 @@ public enum ListenerEnum { * log_SqlServer */ LOG_SQL_SERVER(ListenerTypeEnum.LOG.getType() + ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class), + /** + * log_PostgreSQL + */ +// LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class), /** * log_Kafka */ @@ -47,6 +51,10 @@ public enum ListenerEnum { * timing_SqlServer */ TIMING_SQL_SERVER(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.SQL_SERVER.getType(), DatabaseQuartzExtractor.class), + /** + * timing_PostgreSQL + */ + TIMING_POSTGRE_SQL(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class), /** * timing_Elasticsearch */ diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java new file mode 100644 index 0000000000000000000000000000000000000000..b5a0771ba65173328813d77175383e2bca6ab466 --- /dev/null +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java @@ -0,0 +1,115 @@ +package org.dbsyncer.listener.postgresql; + +import org.dbsyncer.connector.config.DatabaseConfig; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; +import org.dbsyncer.connector.util.DatabaseUtil; +import org.dbsyncer.listener.AbstractExtractor; +import org.dbsyncer.listener.ListenerException; +import org.postgresql.PGConnection; +import org.postgresql.replication.PGReplicationStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/10 22:36 + */ +public class PostgreSQLExtractor extends AbstractExtractor { + + private static final String GET_VALIDATION = "SELECT 1"; + private static final String GET_ROLE = "SELECT r.rolcanlogin AS rolcanlogin, r.rolreplication AS rolreplication, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rds_superuser') AS BOOL) IS TRUE AS aws_superuser, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsadmin') AS BOOL) IS TRUE AS aws_admin, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsrepladmin') AS BOOL) IS TRUE AS aws_repladmin FROM pg_roles r WHERE r.rolname = current_user"; + private static final String GET_WAL_LEVEL = "SHOW WAL_LEVEL"; + private static final String DEFAULT_WAL_LEVEL = "logical"; + private static final String DEFAULT_SLOT_NAME = "DBSYNCER_SLOT"; + private static final String DEFAULT_PLUGIN_NAME = "wal2json"; + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Lock connectLock = new ReentrantLock(); + private volatile boolean connected; + private Connection connection; + private PGReplicationStream stream; + private DatabaseConfig config; + private DatabaseConnectorMapper connectorMapper; + + @Override + public void start() { + try { + connectLock.lock(); + if (connected) { + logger.error("PostgreSQLExtractor is already started"); + return; + } + + connect(); + + connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_VALIDATION, Integer.class)); + logger.info("Successfully tested connection for {} with user '{}'", config.getUrl(), config.getUsername()); + + final String walLevel = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_WAL_LEVEL, String.class)); + if (!DEFAULT_WAL_LEVEL.equals(walLevel)) { + throw new ListenerException(String.format("Postgres server wal_level property must be \"%s\" but is: %s", DEFAULT_WAL_LEVEL, walLevel)); + } + + final boolean hasAuth = connectorMapper.execute(databaseTemplate -> { + Map rs = databaseTemplate.queryForObject(GET_ROLE, Map.class); + Boolean login = (Boolean) rs.getOrDefault("rolcanlogin", false); + Boolean replication = (Boolean) rs.getOrDefault("rolreplication", false); + Boolean superuser = (Boolean) rs.getOrDefault("aws_superuser", false); + Boolean admin = (Boolean) rs.getOrDefault("aws_admin", false); + Boolean replicationAdmin = (Boolean) rs.getOrDefault("aws_repladmin", false); + return login && (replication || superuser || admin || replicationAdmin); + }); + if (!hasAuth) { + throw new ListenerException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername())); + } + connected = true; + } catch (Exception e) { + logger.error("启动失败:{}", e.getMessage()); + throw new ListenerException(e); + } finally { + connectLock.unlock(); + close(); + } + } + + @Override + public void close() { + try { + connectLock.lock(); + connected = false; + DatabaseUtil.close(stream); + DatabaseUtil.close(connection); + } catch (Exception e) { + logger.error("关闭失败:{}", e.getMessage()); + } finally { + connectLock.unlock(); + } + } + + private void connect() throws SQLException { + if (connectorFactory.isAlive(connectorConfig)) { + config = (DatabaseConfig) connectorConfig; + connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(config); + + connection = DatabaseUtil.getConnection(config.getDriverClassName(), config.getUrl(), config.getUsername(), config.getPassword()); + PGConnection replConnection = connection.unwrap(PGConnection.class); + replConnection.getReplicationAPI() + .createReplicationSlot() + .logical() + .withSlotName(DEFAULT_SLOT_NAME) + .withOutputPlugin(DEFAULT_PLUGIN_NAME) + .make(); + stream = replConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(DEFAULT_SLOT_NAME) + .start(); + } + } +} diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java index 1daada142d3a70ab9c0d109ee8d9cf1dab647dd1..f9a6e7facfbd14a1d96d6c356c0ec8df9af8e381 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java @@ -18,7 +18,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -48,12 +47,9 @@ public class SqlServerExtractor extends AbstractExtractor { private static final String LSN_POSITION = "position"; private static final long DEFAULT_POLL_INTERVAL_MILLIS = 300; - private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500; private static final int OFFSET_COLUMNS = 4; - private final Map preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY); private final Lock connectLock = new ReentrantLock(); private volatile boolean connected; - private volatile boolean connectionClosed; private static Set tables; private static Set changeTables; private DatabaseConnectorMapper connectorMapper; @@ -103,8 +99,6 @@ public class SqlServerExtractor extends AbstractExtractor { worker.interrupt(); worker = null; } - preparedStatementCache.values().forEach(this::close); - preparedStatementCache.clear(); connected = false; } } @@ -125,7 +119,6 @@ public class SqlServerExtractor extends AbstractExtractor { connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg); serverName = cfg.getUrl(); schema = cfg.getSchema(); - connectionClosed = false; } } @@ -296,35 +289,26 @@ public class SqlServerExtractor extends AbstractExtractor { } private T query(String preparedQuerySql, StatementPreparer statementPreparer, ResultSetMapper mapper) { - if (connectionClosed) { - connect(); - return null; - } Object execute = connectorMapper.execute(databaseTemplate -> { - if (!preparedStatementCache.containsKey(preparedQuerySql)) { - preparedStatementCache.putIfAbsent(preparedQuerySql, databaseTemplate.getConnection().prepareStatement(preparedQuerySql)); - } - PreparedStatement ps = preparedStatementCache.get(preparedQuerySql); - if (ps.getConnection().isClosed() || ps.isClosed()) { - preparedStatementCache.clear(); - connectionClosed = true; - return null; - } - if (null != statementPreparer) { - statementPreparer.accept(ps); - } + PreparedStatement ps = null; ResultSet rs = null; + T apply = null; try { + ps = databaseTemplate.getConnection().prepareStatement(preparedQuerySql); + if (null != statementPreparer) { + statementPreparer.accept(ps); + } rs = ps.executeQuery(); - return mapper.apply(rs); + apply = mapper.apply(rs); } catch (SQLServerException e) { // 为过程或函数 cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。 } catch (Exception e) { logger.error(e.getMessage()); } finally { close(rs); + close(ps); } - return null; + return apply; }); return (T) execute; } @@ -345,8 +329,13 @@ public class SqlServerExtractor extends AbstractExtractor { lastLsn = stopLsn; snapshot.put(LSN_POSITION, lastLsn.toString()); - } catch (InterruptedException e) { - break; + } catch (Exception e) { + logger.error(e.getMessage()); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ex) { + logger.error(ex.getMessage()); + } } } } diff --git a/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java b/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java index 5bd346bcd05b3a7a54b3b395e8541087d007f47a..d1094cb048e748c0cc736f3dfbf672f063af0655 100644 --- a/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java +++ b/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java @@ -199,7 +199,7 @@ public class ChangeDataCaptureTest { public void start() throws SQLException { String username = "sa"; String password = "123"; - String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test"; + String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test"; schema = "dbo"; connection = DriverManager.getConnection(url, username, password); if (connection != null) { diff --git a/dbsyncer-listener/src/main/test/PGReplicationTest.java b/dbsyncer-listener/src/main/test/PGReplicationTest.java new file mode 100644 index 0000000000000000000000000000000000000000..e6eaeefd40a50bffc93f3de79c34dc97263b5d6a --- /dev/null +++ b/dbsyncer-listener/src/main/test/PGReplicationTest.java @@ -0,0 +1,93 @@ +import org.dbsyncer.connector.util.DatabaseUtil; +import org.junit.Test; +import org.postgresql.PGConnection; +import org.postgresql.replication.LogSequenceNumber; +import org.postgresql.replication.PGReplicationStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.concurrent.TimeUnit; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/8 23:06 + */ +public class PGReplicationTest { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + private Connection connection; + + @Test + public void testPG() throws SQLException, InterruptedException { + String url = "jdbc:postgresql://127.0.0.1:5432/postgres"; + String driverClassNam = "org.postgresql.Driver"; + String username = "postgres"; + String password = "123456"; + connection = DatabaseUtil.getConnection(driverClassNam, url, username, password); + + LogSequenceNumber currentLSN = query("SELECT pg_current_wal_lsn()", rs -> LogSequenceNumber.valueOf(rs.getString(1))); + + PGConnection replConnection = connection.unwrap(PGConnection.class); + PGReplicationStream stream = replConnection + .getReplicationAPI() + .replicationStream() + .logical() + .withSlotName("test_slot") + .withStartPosition(currentLSN) + .start(); + while (true) { + //non blocking receive message + ByteBuffer msg = stream.readPending(); + + if (msg == null) { + TimeUnit.MILLISECONDS.sleep(10L); + continue; + } + int offset = msg.arrayOffset(); + byte[] source = msg.array(); + int length = source.length - offset; + System.out.println(new String(source, offset, length)); + } + + } + + public T query(String sql, ResultSetMapper mapper) { + PreparedStatement ps = null; + ResultSet rs = null; + T apply = null; + try { + ps = connection.prepareStatement(sql); + rs = ps.executeQuery(); + if (rs.next()) { + apply = (T) mapper.apply(rs); + } + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + close(rs); + close(ps); + } + return apply; + } + + private void close(AutoCloseable closeable) { + if (null != closeable) { + try { + closeable.close(); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } + + public interface ResultSetMapper { + T apply(ResultSet rs) throws SQLException; + } + +} diff --git a/dbsyncer-manager/pom.xml b/dbsyncer-manager/pom.xml index 2c4761b731c07b7ce3da773b63797c84b10ec893..9df1ed6b5878b20149560a452ab2a18f2055abde 100644 --- a/dbsyncer-manager/pom.xml +++ b/dbsyncer-manager/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-manager diff --git a/dbsyncer-monitor/pom.xml b/dbsyncer-monitor/pom.xml index fc439f820be62115bff02f70f2c42acce09b79d8..a8d2c1cecd2f556cae4d2b64f6f16aafb0d8a11f 100644 --- a/dbsyncer-monitor/pom.xml +++ b/dbsyncer-monitor/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-monitor diff --git a/dbsyncer-parser/pom.xml b/dbsyncer-parser/pom.xml index a81d16c84f8eb5e9433e5a3b5802ac16d5421f87..e87cbc4fd8355fdcfb3fe9badcb4df357f69be70 100644 --- a/dbsyncer-parser/pom.xml +++ b/dbsyncer-parser/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-parser diff --git a/dbsyncer-plugin/pom.xml b/dbsyncer-plugin/pom.xml index 91e18dc99b1b2f74eae12649920f890ae223bd41..6c664f1bdd2c6c37a76b34b2333c1e91f3dfe657 100644 --- a/dbsyncer-plugin/pom.xml +++ b/dbsyncer-plugin/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-plugin diff --git a/dbsyncer-storage/pom.xml b/dbsyncer-storage/pom.xml index 2946b6ce352e45e8ecc489dd6019f76f7c586ead..5dd6576b063bfd4da7865e37d8c0589b1919300f 100644 --- a/dbsyncer-storage/pom.xml +++ b/dbsyncer-storage/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-storage diff --git a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java index 5f50e93fca2c4e591d920430a8a87fab4240b410..fa01bc8625103b4b1187fe25e9aa4fb256ba9f2b 100644 --- a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java +++ b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java @@ -194,6 +194,8 @@ public class Shard { private void execute(Object value, Callback callback) throws IOException { if (null != value && indexWriter.isOpen()) { callback.execute(); + indexWriter.flush(); + indexWriter.commit(); return; } logger.error(value.toString()); diff --git a/dbsyncer-storage/src/main/test/LuceneFactoryTest.java b/dbsyncer-storage/src/main/test/LuceneFactoryTest.java index 85247a2058ffefedbafb1ccea10250edee4365a3..06f7dcdb523a13fc6dfc93748ed5d39d9b5b26a0 100644 --- a/dbsyncer-storage/src/main/test/LuceneFactoryTest.java +++ b/dbsyncer-storage/src/main/test/LuceneFactoryTest.java @@ -49,7 +49,6 @@ public class LuceneFactoryTest { @After public void tearDown() throws IOException { - shard.close(); shard.deleteAll(); } @@ -69,9 +68,10 @@ public class LuceneFactoryTest { // 模拟操作 System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k)); - Document update = ParamsUtil.convertData2Doc(createMap(k)); - IndexableField field = update.getField(ConfigConstant.CONFIG_MODEL_ID); - shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), update); + Document data = ParamsUtil.convertData2Doc(createMap(k)); + //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID); + //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data); + shard.insert(data); } catch (InterruptedException e) { logger.error(e.getMessage()); @@ -114,7 +114,7 @@ public class LuceneFactoryTest { params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE); params.put(ConfigConstant.DATA_ERROR, ""); Map row = new HashMap<>(); - row.put("id", "1"); + row.put("id", i); row.put("name", "中文"); row.put("tel", "15800001234"); row.put("update_time", System.currentTimeMillis()); diff --git a/dbsyncer-web/pom.xml b/dbsyncer-web/pom.xml index 44722d8e97e755bde772ae0d2b9b47451aa139be..a28e391d7ece1aaea958a03b4f38e10db07dcbfc 100644 --- a/dbsyncer-web/pom.xml +++ b/dbsyncer-web/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-web diff --git a/dbsyncer-web/src/main/resources/application.properties b/dbsyncer-web/src/main/resources/application.properties index eb3ee691e999efd48cbb1fc2117e920a745dd2af..cf92fd161d7d76aa9cbf1aaee913add5e34b6777 100644 --- a/dbsyncer-web/src/main/resources/application.properties +++ b/dbsyncer-web/src/main/resources/application.properties @@ -25,7 +25,7 @@ management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always management.health.elasticsearch.enabled=false info.app.name=DBSyncer -info.app.version=1.1.6-Beta +info.app.version=1.1.7-Beta info.app.copyright=©2021 ${info.app.name}(${info.app.version}) #All < Trace < Debug < Info < Warn < Error < Fatal < OFF diff --git a/pom.xml b/pom.xml index afab1078b128dbe8c532c2eb7b112975dfaa8f23..10588c537e5c5daa235cb18c010d0b6df92e355a 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.ghi dbsyncer - 1.1.6-Beta + 1.1.7-Beta pom dbsyncer https://gitee.com/ghi/dbsyncer @@ -44,7 +44,7 @@ 11.2.0.4.0-atlassian-hosted 5.1.40 0.21.0 - 8.2.0.jre8 + 7.4.1.jre8 42.3.3 0.9.0.0 20090211