From 2462f705bde4443570549bfcae1187e1bf318e35 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Thu, 7 Apr 2022 00:17:24 +0800 Subject: [PATCH 01/12] fix --- .../database/AbstractDatabaseConnector.java | 133 ++++++++++-------- .../connector/mysql/MysqlConnector.java | 14 +- .../connector/oracle/OracleConnector.java | 8 +- .../postgresql/PostgreSQLConnector.java | 16 +-- .../connector/sql/AbstractDQLConnector.java | 6 - .../sqlserver/SqlServerConnector.java | 7 +- 6 files changed, 90 insertions(+), 94 deletions(-) diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java index 2753b0f9..797b0535 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/AbstractDatabaseConnector.java @@ -31,8 +31,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector private final Logger logger = LoggerFactory.getLogger(getClass()); - protected abstract String getTableSql(DatabaseConfig config); - @Override public ConnectorMapper connect(DatabaseConfig config) { try { @@ -59,16 +57,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return String.format("%s-%s", config.getUrl(), config.getUsername()); } - @Override - public List getTable(DatabaseConnectorMapper connectorMapper) { - String sql = getTableSql(connectorMapper.getConfig()); - List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class)); - if (!CollectionUtils.isEmpty(tableNames)) { - return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList()); - } - return Collections.EMPTY_LIST; - } - @Override public MetaInfo getMetaInfo(DatabaseConnectorMapper connectorMapper, String tableName) { String quotation = buildSqlWithQuotation(); @@ -222,6 +210,15 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return map; } + /** + * 健康检查 + * + * @return + */ + protected String getValidationQuery() { + return "select 1"; + } + /** * 查询语句表名和字段带上引号(默认不加) * @@ -231,6 +228,21 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return ""; } + /** + * 获取表列表 + * + * @param connectorMapper + * @param sql + * @return + */ + protected List
getTable(DatabaseConnectorMapper connectorMapper, String sql) { + List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql, String.class)); + if (!CollectionUtils.isEmpty(tableNames)) { + return tableNames.stream().map(name -> new Table(name)).collect(Collectors.toList()); + } + return Collections.EMPTY_LIST; + } + /** * 获取查询条件SQL * @@ -267,6 +279,37 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return sql.toString(); } + /** + * 根据过滤条件获取查询SQL + * + * @param queryOperator and/or + * @param filter + * @return + */ + private String getFilterSql(String queryOperator, List filter) { + List list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList()); + if (CollectionUtils.isEmpty(list)) { + return ""; + } + + int size = list.size(); + int end = size - 1; + StringBuilder sql = new StringBuilder(); + sql.append("("); + Filter c = null; + String quotation = buildSqlWithQuotation(); + for (int i = 0; i < size; i++) { + c = list.get(i); + // "USER" = 'zhangsan' + sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'"); + if (i < end) { + sql.append(" ").append(queryOperator).append(" "); + } + } + sql.append(")"); + return sql.toString(); + } + /** * 获取查询SQL * @@ -319,15 +362,6 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return SqlBuilderEnum.getSqlBuilder(type).buildSql(config); } - /** - * 健康检查 - * - * @return - */ - protected String getValidationQuery() { - return "select 1"; - } - /** * 获取数据库表元数据信息 * @@ -399,34 +433,26 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector } /** - * 根据过滤条件获取查询SQL + * 返回表主键 * - * @param queryOperator and/or - * @param filter + * @param md + * @param tableName * @return + * @throws SQLException */ - private String getFilterSql(String queryOperator, List filter) { - List list = filter.stream().filter(f -> StringUtil.equals(f.getOperation(), queryOperator)).collect(Collectors.toList()); - if (CollectionUtils.isEmpty(list)) { - return ""; - } - - int size = list.size(); - int end = size - 1; - StringBuilder sql = new StringBuilder(); - sql.append("("); - Filter c = null; - String quotation = buildSqlWithQuotation(); - for (int i = 0; i < size; i++) { - c = list.get(i); - // "USER" = 'zhangsan' - sql.append(quotation).append(c.getName()).append(quotation).append(c.getFilter()).append("'").append(c.getValue()).append("'"); - if (i < end) { - sql.append(" ").append(queryOperator).append(" "); + private List findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException { + //根据表名获得主键结果集 + ResultSet rs = null; + List primaryKeys = new ArrayList<>(); + try { + rs = md.getPrimaryKeys(null, null, tableName); + while (rs.next()) { + primaryKeys.add(rs.getString("COLUMN_NAME")); } + } finally { + DatabaseUtil.close(rs); } - sql.append(")"); - return sql.toString(); + return primaryKeys; } /** @@ -485,9 +511,7 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector try { // 2、设置参数 int execute = connectorMapper.execute(databaseTemplate -> - databaseTemplate.update(sql, (ps) -> - batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row) - ) + databaseTemplate.update(sql, (ps) -> batchRowsSetter(databaseTemplate.getConnection(), ps, fields, row)) ); if (execute == 0) { throw new ConnectorException(String.format("尝试执行[%s]失败", event)); @@ -517,19 +541,4 @@ public abstract class AbstractDatabaseConnector extends AbstractConnector return !CollectionUtils.isEmpty(pk) && pk.contains(name); } - private List findTablePrimaryKeys(DatabaseMetaData md, String tableName) throws SQLException { - //根据表名获得主键结果集 - ResultSet rs = null; - List primaryKeys = new ArrayList<>(); - try { - rs = md.getPrimaryKeys(null, null, tableName); - while (rs.next()) { - primaryKeys.add(rs.getString("COLUMN_NAME")); - } - } finally { - DatabaseUtil.close(rs); - } - return primaryKeys; - } - } \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java index 647d1326..e41070f1 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/mysql/MysqlConnector.java @@ -1,16 +1,14 @@ package org.dbsyncer.connector.mysql; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; +import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.DatabaseConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; -public final class MysqlConnector extends AbstractDatabaseConnector { +import java.util.List; - @Override - protected String getTableSql(DatabaseConfig config) { - return "show tables"; - } +public final class MysqlConnector extends AbstractDatabaseConnector { @Override public String getPageSql(PageSqlConfig config) { @@ -22,4 +20,8 @@ public final class MysqlConnector extends AbstractDatabaseConnector { return new Object[]{(pageIndex - 1) * pageSize, pageSize}; } + @Override + public List
getTable(DatabaseConnectorMapper connectorMapper) { + return super.getTable(connectorMapper, "show tables"); + } } \ No newline at end of file diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java index 0389edd8..8ea5382d 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/oracle/OracleConnector.java @@ -1,7 +1,6 @@ package org.dbsyncer.connector.oracle; import org.dbsyncer.common.util.CollectionUtils; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.DatabaseConstant; @@ -15,14 +14,9 @@ import java.util.stream.Collectors; public final class OracleConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - return "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS"; - } - @Override public List
getTable(DatabaseConnectorMapper connectorMapper) { - String sql = getTableSql(connectorMapper.getConfig()); + final String sql = "SELECT TABLE_NAME,TABLE_TYPE FROM USER_TAB_COMMENTS"; List> list = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(sql)); if (!CollectionUtils.isEmpty(list)) { return list.stream().map(r -> new Table(r.get("TABLE_NAME").toString(), r.get("TABLE_TYPE").toString())).collect(Collectors.toList()); diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java index 6c64cee1..3687e5b0 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/postgresql/PostgreSQLConnector.java @@ -14,20 +14,18 @@ import java.util.List; public final class PostgreSQLConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - return String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema()); - } - @Override public List
getTable(DatabaseConnectorMapper connectorMapper) { List
list = new LinkedList<>(); DatabaseConfig config = connectorMapper.getConfig(); - List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableSql(config), String.class)); + final String queryTableSql = String.format("SELECT TABLENAME FROM PG_TABLES WHERE SCHEMANAME ='%s'", config.getSchema()); + List tableNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableSql, String.class)); if (!CollectionUtils.isEmpty(tableNames)) { tableNames.forEach(name -> list.add(new Table(name, TableTypeEnum.TABLE.getCode()))); } - List tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(getTableViewSql(config), String.class)); + + final String queryTableViewSql = String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema()); + List tableViewNames = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForList(queryTableViewSql, String.class)); if (!CollectionUtils.isEmpty(tableViewNames)) { tableViewNames.forEach(name -> list.add(new Table(name, TableTypeEnum.VIEW.getCode()))); } @@ -48,8 +46,4 @@ public final class PostgreSQLConnector extends AbstractDatabaseConnector { protected String buildSqlWithQuotation() { return "\""; } - - private String getTableViewSql(DatabaseConfig config) { - return String.format("SELECT VIEWNAME FROM PG_VIEWS WHERE SCHEMANAME ='%s'", config.getSchema()); - } } diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java index 90de8499..26ab5173 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java @@ -1,7 +1,6 @@ package org.dbsyncer.connector.sql; import org.dbsyncer.common.util.StringUtil; -import org.dbsyncer.connector.ConnectorException; import org.dbsyncer.connector.config.*; import org.dbsyncer.connector.constant.ConnectorConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; @@ -20,11 +19,6 @@ import java.util.Map; */ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { - @Override - protected String getTableSql(DatabaseConfig config) { - throw new ConnectorException("Unsupported method."); - } - @Override public List
getTable(DatabaseConnectorMapper config) { DatabaseConfig cfg = config.getConfig(); diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java index e23c5258..206d4410 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java @@ -10,10 +10,12 @@ import org.dbsyncer.connector.config.Table; import org.dbsyncer.connector.constant.ConnectorConstant; import org.dbsyncer.connector.constant.DatabaseConstant; import org.dbsyncer.connector.database.AbstractDatabaseConnector; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.HashMap; +import java.util.List; import java.util.Map; public final class SqlServerConnector extends AbstractDatabaseConnector { @@ -31,8 +33,9 @@ public final class SqlServerConnector extends AbstractDatabaseConnector { } @Override - protected String getTableSql(DatabaseConfig config) { - return String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema()); + public List
getTable(DatabaseConnectorMapper connectorMapper) { + DatabaseConfig config = connectorMapper.getConfig(); + return super.getTable(connectorMapper, String.format("SELECT NAME FROM SYS.TABLES WHERE SCHEMA_ID = SCHEMA_ID('%s') AND IS_MS_SHIPPED = 0", config.getSchema())); } @Override -- Gitee From c9c8d5081e55e460609ec888b678e69f24db60d0 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Fri, 8 Apr 2022 21:15:24 +0800 Subject: [PATCH 02/12] fix --- .../dbsyncer/connector/sql/AbstractDQLConnector.java | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java index 26ab5173..409aeafc 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/AbstractDQLConnector.java @@ -44,10 +44,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { * 获取DQL源配置 * * @param commandConfig - * @param appendGroupByPK + * @param groupByPK * @return */ - protected Map getDqlSourceCommand(CommandConfig commandConfig, boolean appendGroupByPK) { + protected Map getDqlSourceCommand(CommandConfig commandConfig, boolean groupByPK) { // 获取过滤SQL List filter = commandConfig.getFilter(); String queryFilterSql = getQueryFilterSql(filter); @@ -67,12 +67,10 @@ public abstract class AbstractDQLConnector extends AbstractDatabaseConnector { // 获取查询总数SQL StringBuilder queryCount = new StringBuilder(); - queryCount.append("SELECT COUNT(1) FROM (").append(table.getName()); - if (StringUtil.isNotBlank(queryFilterSql)) { - queryCount.append(queryFilterSql); - } + queryCount.append("SELECT COUNT(1) FROM (").append(querySql); + // Mysql - if (appendGroupByPK) { + if (groupByPK) { queryCount.append(" GROUP BY ").append(pk); } queryCount.append(") DBSYNCER_T"); -- Gitee From b9c2bd8226522428da83fd53e1434db9d524844a Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Sat, 9 Apr 2022 00:21:48 +0800 Subject: [PATCH 03/12] fix bug --- .../org/dbsyncer/connector/util/DatabaseUtil.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java index 488c817f..2bf40ddc 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/util/DatabaseUtil.java @@ -1,5 +1,6 @@ package org.dbsyncer.connector.util; +import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; import java.sql.Connection; @@ -12,10 +13,12 @@ public abstract class DatabaseUtil { } public static Connection getConnection(String driverClassName, String url, String username, String password) throws SQLException { - try { - Class.forName(driverClassName); - } catch (ClassNotFoundException e) { - throw new ConnectorException(e.getCause()); + if (StringUtil.isNotBlank(driverClassName)) { + try { + Class.forName(driverClassName); + } catch (ClassNotFoundException e) { + throw new ConnectorException(e.getCause()); + } } return DriverManager.getConnection(url, username, password); } -- Gitee From 7351f4f1789030cf9af31576ecb7ac1b8a893056 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Sat, 9 Apr 2022 00:27:05 +0800 Subject: [PATCH 04/12] add test --- .../database/ds/SimpleConnection.java | 7 ++- .../src/main/test/PGReplicationTest.java | 57 +++++++++++++++++++ 2 files changed, 62 insertions(+), 2 deletions(-) create mode 100644 dbsyncer-listener/src/main/test/PGReplicationTest.java diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java index dd6d6e2a..38d704f3 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/ds/SimpleConnection.java @@ -301,12 +301,15 @@ public class SimpleConnection implements Connection { @Override public T unwrap(Class iface) throws SQLException { - return null; + if (iface.isAssignableFrom(connection.getClass())) { + return iface.cast(connection); + } + throw new SQLException("Cannot unwrap to " + iface.getName()); } @Override public boolean isWrapperFor(Class iface) throws SQLException { - return false; + return iface.isAssignableFrom(connection.getClass()); } public Connection getConnection() { diff --git a/dbsyncer-listener/src/main/test/PGReplicationTest.java b/dbsyncer-listener/src/main/test/PGReplicationTest.java new file mode 100644 index 00000000..c3b9bb53 --- /dev/null +++ b/dbsyncer-listener/src/main/test/PGReplicationTest.java @@ -0,0 +1,57 @@ +import org.postgresql.PGConnection; +import org.postgresql.PGProperty; +import org.postgresql.replication.PGReplicationStream; + +import java.nio.ByteBuffer; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; +import java.util.concurrent.TimeUnit; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/8 23:06 + */ +public class PGReplicationTest { + + public static void main(String[] args) throws SQLException, InterruptedException { + String url = "jdbc:postgresql://localhost:5432/postgres"; + Properties props = new Properties(); + PGProperty.USER.set(props, "postgres"); + PGProperty.PASSWORD.set(props, "123456"); + PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "14.2"); + PGProperty.REPLICATION.set(props, "postgres"); + PGProperty.PREFER_QUERY_MODE.set(props, "simple"); + Connection con = DriverManager.getConnection(url, props); + PGConnection replConnection = con.unwrap(PGConnection.class); + replConnection.getReplicationAPI() + .createReplicationSlot() + .logical() + .withSlotName("test_slot") + .withOutputPlugin("wal2json") + .make(); + PGReplicationStream stream = + replConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName("test_slot") + .start(); + while (true) { + //non blocking receive message + ByteBuffer msg = stream.readPending(); + + if (msg == null) { + TimeUnit.MILLISECONDS.sleep(10L); + continue; + } + int offset = msg.arrayOffset(); + byte[] source = msg.array(); + int length = source.length - offset; + System.out.println(new String(source, offset, length)); + } + + } + +} -- Gitee From c3c7559336490dd0029d7c733cf87b39df4d5d9a Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Sun, 10 Apr 2022 22:38:45 +0800 Subject: [PATCH 05/12] add pg listener --- .../dbsyncer/listener/enums/ListenerEnum.java | 9 ++++++ .../postgresql/PostgreSQLExtractor.java | 20 ++++++++++++ .../src/main/test/PGReplicationTest.java | 31 ++++++++----------- 3 files changed, 42 insertions(+), 18 deletions(-) create mode 100644 dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java index 28b23258..974a34ed 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java @@ -6,6 +6,7 @@ import org.dbsyncer.listener.ListenerException; import org.dbsyncer.listener.kafka.KafkaExtractor; import org.dbsyncer.listener.mysql.MysqlExtractor; import org.dbsyncer.listener.oracle.OracleExtractor; +import org.dbsyncer.listener.postgresql.PostgreSQLExtractor; import org.dbsyncer.listener.quartz.DatabaseQuartzExtractor; import org.dbsyncer.listener.quartz.ESQuartzExtractor; import org.dbsyncer.listener.sqlserver.SqlServerExtractor; @@ -31,6 +32,10 @@ public enum ListenerEnum { * log_SqlServer */ LOG_SQL_SERVER(ListenerTypeEnum.LOG.getType() + ConnectorEnum.SQL_SERVER.getType(), SqlServerExtractor.class), + /** + * log_PostgreSQL + */ + LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class), /** * log_Kafka */ @@ -47,6 +52,10 @@ public enum ListenerEnum { * timing_SqlServer */ TIMING_SQL_SERVER(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.SQL_SERVER.getType(), DatabaseQuartzExtractor.class), + /** + * timing_PostgreSQL + */ + TIMING_POSTGRE_SQL(ListenerTypeEnum.TIMING.getType() + ConnectorEnum.POSTGRE_SQL.getType(), DatabaseQuartzExtractor.class), /** * timing_Elasticsearch */ diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java new file mode 100644 index 00000000..15f76ba0 --- /dev/null +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java @@ -0,0 +1,20 @@ +package org.dbsyncer.listener.postgresql; + +import org.dbsyncer.listener.AbstractExtractor; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/10 22:36 + */ +public class PostgreSQLExtractor extends AbstractExtractor { + @Override + public void start() { + + } + + @Override + public void close() { + + } +} diff --git a/dbsyncer-listener/src/main/test/PGReplicationTest.java b/dbsyncer-listener/src/main/test/PGReplicationTest.java index c3b9bb53..10dfeb2a 100644 --- a/dbsyncer-listener/src/main/test/PGReplicationTest.java +++ b/dbsyncer-listener/src/main/test/PGReplicationTest.java @@ -1,12 +1,10 @@ +import org.dbsyncer.connector.util.DatabaseUtil; import org.postgresql.PGConnection; -import org.postgresql.PGProperty; import org.postgresql.replication.PGReplicationStream; import java.nio.ByteBuffer; import java.sql.Connection; -import java.sql.DriverManager; import java.sql.SQLException; -import java.util.Properties; import java.util.concurrent.TimeUnit; /** @@ -17,27 +15,24 @@ import java.util.concurrent.TimeUnit; public class PGReplicationTest { public static void main(String[] args) throws SQLException, InterruptedException { - String url = "jdbc:postgresql://localhost:5432/postgres"; - Properties props = new Properties(); - PGProperty.USER.set(props, "postgres"); - PGProperty.PASSWORD.set(props, "123456"); - PGProperty.ASSUME_MIN_SERVER_VERSION.set(props, "14.2"); - PGProperty.REPLICATION.set(props, "postgres"); - PGProperty.PREFER_QUERY_MODE.set(props, "simple"); - Connection con = DriverManager.getConnection(url, props); + String url = "jdbc:postgresql://127.0.0.1:5432/postgres"; + String driverClassNam = "org.postgresql.Driver"; + String username = "postgres"; + String password = "123456"; + String slotName = "test_slot"; + Connection con = DatabaseUtil.getConnection(driverClassNam, url, username, password); PGConnection replConnection = con.unwrap(PGConnection.class); replConnection.getReplicationAPI() .createReplicationSlot() .logical() - .withSlotName("test_slot") + .withSlotName(slotName) .withOutputPlugin("wal2json") .make(); - PGReplicationStream stream = - replConnection.getReplicationAPI() - .replicationStream() - .logical() - .withSlotName("test_slot") - .start(); + PGReplicationStream stream = replConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(slotName) + .start(); while (true) { //non blocking receive message ByteBuffer msg = stream.readPending(); -- Gitee From 33288592c7a200faf689abf0eef5b16b590e0cbf Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Sun, 10 Apr 2022 23:38:47 +0800 Subject: [PATCH 06/12] add PostgreSQLExtractor --- .../postgresql/PostgreSQLExtractor.java | 95 +++++++++++++++++++ 1 file changed, 95 insertions(+) diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java index 15f76ba0..b5a0771b 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/postgresql/PostgreSQLExtractor.java @@ -1,6 +1,20 @@ package org.dbsyncer.listener.postgresql; +import org.dbsyncer.connector.config.DatabaseConfig; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; +import org.dbsyncer.connector.util.DatabaseUtil; import org.dbsyncer.listener.AbstractExtractor; +import org.dbsyncer.listener.ListenerException; +import org.postgresql.PGConnection; +import org.postgresql.replication.PGReplicationStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.Map; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; /** * @author AE86 @@ -8,13 +22,94 @@ import org.dbsyncer.listener.AbstractExtractor; * @date 2022/4/10 22:36 */ public class PostgreSQLExtractor extends AbstractExtractor { + + private static final String GET_VALIDATION = "SELECT 1"; + private static final String GET_ROLE = "SELECT r.rolcanlogin AS rolcanlogin, r.rolreplication AS rolreplication, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rds_superuser') AS BOOL) IS TRUE AS aws_superuser, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsadmin') AS BOOL) IS TRUE AS aws_admin, CAST(array_position(ARRAY(SELECT b.rolname FROM pg_catalog.pg_auth_members m JOIN pg_catalog.pg_roles b ON (m.roleid = b.oid) WHERE m.member = r.oid), 'rdsrepladmin') AS BOOL) IS TRUE AS aws_repladmin FROM pg_roles r WHERE r.rolname = current_user"; + private static final String GET_WAL_LEVEL = "SHOW WAL_LEVEL"; + private static final String DEFAULT_WAL_LEVEL = "logical"; + private static final String DEFAULT_SLOT_NAME = "DBSYNCER_SLOT"; + private static final String DEFAULT_PLUGIN_NAME = "wal2json"; + private final Logger logger = LoggerFactory.getLogger(getClass()); + private final Lock connectLock = new ReentrantLock(); + private volatile boolean connected; + private Connection connection; + private PGReplicationStream stream; + private DatabaseConfig config; + private DatabaseConnectorMapper connectorMapper; + @Override public void start() { + try { + connectLock.lock(); + if (connected) { + logger.error("PostgreSQLExtractor is already started"); + return; + } + + connect(); + connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_VALIDATION, Integer.class)); + logger.info("Successfully tested connection for {} with user '{}'", config.getUrl(), config.getUsername()); + + final String walLevel = connectorMapper.execute(databaseTemplate -> databaseTemplate.queryForObject(GET_WAL_LEVEL, String.class)); + if (!DEFAULT_WAL_LEVEL.equals(walLevel)) { + throw new ListenerException(String.format("Postgres server wal_level property must be \"%s\" but is: %s", DEFAULT_WAL_LEVEL, walLevel)); + } + + final boolean hasAuth = connectorMapper.execute(databaseTemplate -> { + Map rs = databaseTemplate.queryForObject(GET_ROLE, Map.class); + Boolean login = (Boolean) rs.getOrDefault("rolcanlogin", false); + Boolean replication = (Boolean) rs.getOrDefault("rolreplication", false); + Boolean superuser = (Boolean) rs.getOrDefault("aws_superuser", false); + Boolean admin = (Boolean) rs.getOrDefault("aws_admin", false); + Boolean replicationAdmin = (Boolean) rs.getOrDefault("aws_repladmin", false); + return login && (replication || superuser || admin || replicationAdmin); + }); + if (!hasAuth) { + throw new ListenerException(String.format("Postgres roles LOGIN and REPLICATION are not assigned to user: %s", config.getUsername())); + } + connected = true; + } catch (Exception e) { + logger.error("启动失败:{}", e.getMessage()); + throw new ListenerException(e); + } finally { + connectLock.unlock(); + close(); + } } @Override public void close() { + try { + connectLock.lock(); + connected = false; + DatabaseUtil.close(stream); + DatabaseUtil.close(connection); + } catch (Exception e) { + logger.error("关闭失败:{}", e.getMessage()); + } finally { + connectLock.unlock(); + } + } + + private void connect() throws SQLException { + if (connectorFactory.isAlive(connectorConfig)) { + config = (DatabaseConfig) connectorConfig; + connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(config); + connection = DatabaseUtil.getConnection(config.getDriverClassName(), config.getUrl(), config.getUsername(), config.getPassword()); + PGConnection replConnection = connection.unwrap(PGConnection.class); + replConnection.getReplicationAPI() + .createReplicationSlot() + .logical() + .withSlotName(DEFAULT_SLOT_NAME) + .withOutputPlugin(DEFAULT_PLUGIN_NAME) + .make(); + stream = replConnection.getReplicationAPI() + .replicationStream() + .logical() + .withSlotName(DEFAULT_SLOT_NAME) + .start(); + } } } -- Gitee From cf55394a7c8ed3fd59695a744c06e922663ac2d4 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Mon, 11 Apr 2022 20:41:49 +0800 Subject: [PATCH 07/12] rm SqlServer connection mapper --- dbsyncer-connector/pom.xml | 5 ++ .../connector/sql/DQLSqlServerConnector.java | 13 ---- .../sqlserver/SqlServerConnector.java | 11 --- .../sqlserver/SqlServerConnectorMapper.java | 59 ---------------- .../main/test/SqlServerConnectionTest.java | 68 +++++++++++++++++++ 5 files changed, 73 insertions(+), 83 deletions(-) delete mode 100644 dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java create mode 100644 dbsyncer-connector/src/main/test/SqlServerConnectionTest.java diff --git a/dbsyncer-connector/pom.xml b/dbsyncer-connector/pom.xml index 524b6935..117abd93 100644 --- a/dbsyncer-connector/pom.xml +++ b/dbsyncer-connector/pom.xml @@ -66,6 +66,11 @@ kafka-clients + + org.springframework.boot + spring-boot-starter-log4j2 + + junit junit diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java index 3d2a7610..916587d2 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sql/DQLSqlServerConnector.java @@ -2,11 +2,8 @@ package org.dbsyncer.connector.sql; import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.ConnectorMapper; -import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; import org.dbsyncer.connector.constant.DatabaseConstant; -import org.dbsyncer.connector.sqlserver.SqlServerConnectorMapper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -14,16 +11,6 @@ public final class DQLSqlServerConnector extends AbstractDQLConnector { private final Logger logger = LoggerFactory.getLogger(getClass()); - @Override - public ConnectorMapper connect(DatabaseConfig config) { - try { - return new SqlServerConnectorMapper(config); - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } - } - @Override public String getPageSql(PageSqlConfig config) { if (StringUtil.isBlank(config.getPk())) { diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java index 206d4410..8eb979c6 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnector.java @@ -2,7 +2,6 @@ package org.dbsyncer.connector.sqlserver; import org.dbsyncer.common.util.StringUtil; import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.ConnectorMapper; import org.dbsyncer.connector.config.CommandConfig; import org.dbsyncer.connector.config.DatabaseConfig; import org.dbsyncer.connector.config.PageSqlConfig; @@ -22,16 +21,6 @@ public final class SqlServerConnector extends AbstractDatabaseConnector { private final Logger logger = LoggerFactory.getLogger(getClass()); - @Override - public ConnectorMapper connect(DatabaseConfig config) { - try { - return new SqlServerConnectorMapper(config); - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } - } - @Override public List
getTable(DatabaseConnectorMapper connectorMapper) { DatabaseConfig config = connectorMapper.getConfig(); diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java deleted file mode 100644 index 5cf2bdcd..00000000 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/sqlserver/SqlServerConnectorMapper.java +++ /dev/null @@ -1,59 +0,0 @@ -package org.dbsyncer.connector.sqlserver; - -import org.dbsyncer.connector.ConnectorException; -import org.dbsyncer.connector.config.DatabaseConfig; -import org.dbsyncer.connector.database.DatabaseConnectorMapper; -import org.dbsyncer.connector.database.DatabaseTemplate; -import org.dbsyncer.connector.database.HandleCallback; -import org.dbsyncer.connector.util.DatabaseUtil; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import org.springframework.dao.EmptyResultDataAccessException; - -import java.sql.Connection; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -public final class SqlServerConnectorMapper extends DatabaseConnectorMapper { - - private final Logger logger = LoggerFactory.getLogger(getClass()); - private final Lock lock = new ReentrantLock(true); - - public SqlServerConnectorMapper(DatabaseConfig config) { - super(config); - } - - /** - * 使用连接时加锁(SqlServer 2008以下版本连接未释放问题) - * - * @param callback - * @return - */ - @Override - public T execute(HandleCallback callback) { - final Lock connectionLock = lock; - boolean locked = false; - Object apply = null; - Connection connection = null; - try { - locked = connectionLock.tryLock(60, TimeUnit.SECONDS); - if (locked) { - connection = getConnection(); - apply = callback.apply(new DatabaseTemplate(connection)); - } - } catch (EmptyResultDataAccessException e) { - throw e; - } catch (Exception e) { - logger.error(e.getMessage()); - throw new ConnectorException(e.getMessage()); - } finally { - if (locked) { - DatabaseUtil.close(connection); - connectionLock.unlock(); - } - } - return (T) apply; - } - -} \ No newline at end of file diff --git a/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java b/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java new file mode 100644 index 00000000..14a2e1dc --- /dev/null +++ b/dbsyncer-connector/src/main/test/SqlServerConnectionTest.java @@ -0,0 +1,68 @@ +import org.dbsyncer.connector.config.DatabaseConfig; +import org.dbsyncer.connector.database.DatabaseConnectorMapper; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.LocalDateTime; +import java.util.concurrent.*; + +/** + * @author AE86 + * @version 1.0.0 + * @date 2022/4/11 20:19 + */ +public class SqlServerConnectionTest { + + private final Logger logger = LoggerFactory.getLogger(getClass()); + + @Test + public void testConnection() throws InterruptedException { + DatabaseConfig config = new DatabaseConfig(); + config.setUrl("jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test"); + config.setUsername("sa"); + config.setPassword("123"); + config.setDriverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver"); + final DatabaseConnectorMapper connectorMapper = new DatabaseConnectorMapper(config); + + // 模拟并发 + final int threadSize = 100; + final ExecutorService pool = Executors.newFixedThreadPool(threadSize); + final CyclicBarrier barrier = new CyclicBarrier(threadSize); + final CountDownLatch latch = new CountDownLatch(threadSize); + for (int i = 0; i < threadSize; i++) { + final int k = i + 3; + pool.submit(() -> { + try { + barrier.await(); + + // 模拟操作 + System.out.println(String.format("%s %s:%s", LocalDateTime.now(), Thread.currentThread().getName(), k)); + + Object execute = connectorMapper.execute(tem -> tem.queryForObject("select 1", Integer.class)); + System.out.println(String.format("%s %s:%s execute=>%s", LocalDateTime.now(), Thread.currentThread().getName(), k, execute)); + + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } catch (BrokenBarrierException e) { + logger.error(e.getMessage()); + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + latch.countDown(); + } + }); + } + + try { + latch.await(); + logger.info("try to shutdown"); + pool.shutdown(); + } catch (InterruptedException e) { + logger.error(e.getMessage()); + } + + TimeUnit.SECONDS.sleep(3); + logger.info("test end"); + } +} -- Gitee From 96ec05cb05fe3ef5cd6b50e8835e2a1477c553bb Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Wed, 13 Apr 2022 04:08:15 +0000 Subject: [PATCH 08/12] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E8=B6=85=E6=97=B6?= =?UTF-8?q?=E6=96=AD=E8=BF=9E?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../database/setter/BigintSetter.java | 5 +++ .../sqlserver/SqlServerExtractor.java | 43 +++++++------------ .../org/dbsyncer/storage/lucene/Shard.java | 3 ++ 3 files changed, 24 insertions(+), 27 deletions(-) diff --git a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java index ab51da66..14d5162a 100644 --- a/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java +++ b/dbsyncer-connector/src/main/java/org/dbsyncer/connector/database/setter/BigintSetter.java @@ -28,6 +28,11 @@ public class BigintSetter extends AbstractSetter { ps.setLong(i, bitInt.longValue()); return; } + if (val instanceof Integer) { + Integer integer = (Integer) val; + ps.setLong(i, integer); + return; + } throw new ConnectorException(String.format("BigintSetter can not find type [%s], val [%s]", type, val)); } } \ No newline at end of file diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java index 1daada14..f9a6e7fa 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/sqlserver/SqlServerExtractor.java @@ -18,7 +18,6 @@ import java.sql.PreparedStatement; import java.sql.ResultSet; import java.sql.SQLException; import java.util.*; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -48,12 +47,9 @@ public class SqlServerExtractor extends AbstractExtractor { private static final String LSN_POSITION = "position"; private static final long DEFAULT_POLL_INTERVAL_MILLIS = 300; - private static final int PREPARED_STATEMENT_CACHE_CAPACITY = 500; private static final int OFFSET_COLUMNS = 4; - private final Map preparedStatementCache = new ConcurrentHashMap<>(PREPARED_STATEMENT_CACHE_CAPACITY); private final Lock connectLock = new ReentrantLock(); private volatile boolean connected; - private volatile boolean connectionClosed; private static Set tables; private static Set changeTables; private DatabaseConnectorMapper connectorMapper; @@ -103,8 +99,6 @@ public class SqlServerExtractor extends AbstractExtractor { worker.interrupt(); worker = null; } - preparedStatementCache.values().forEach(this::close); - preparedStatementCache.clear(); connected = false; } } @@ -125,7 +119,6 @@ public class SqlServerExtractor extends AbstractExtractor { connectorMapper = (DatabaseConnectorMapper) connectorFactory.connect(cfg); serverName = cfg.getUrl(); schema = cfg.getSchema(); - connectionClosed = false; } } @@ -296,35 +289,26 @@ public class SqlServerExtractor extends AbstractExtractor { } private T query(String preparedQuerySql, StatementPreparer statementPreparer, ResultSetMapper mapper) { - if (connectionClosed) { - connect(); - return null; - } Object execute = connectorMapper.execute(databaseTemplate -> { - if (!preparedStatementCache.containsKey(preparedQuerySql)) { - preparedStatementCache.putIfAbsent(preparedQuerySql, databaseTemplate.getConnection().prepareStatement(preparedQuerySql)); - } - PreparedStatement ps = preparedStatementCache.get(preparedQuerySql); - if (ps.getConnection().isClosed() || ps.isClosed()) { - preparedStatementCache.clear(); - connectionClosed = true; - return null; - } - if (null != statementPreparer) { - statementPreparer.accept(ps); - } + PreparedStatement ps = null; ResultSet rs = null; + T apply = null; try { + ps = databaseTemplate.getConnection().prepareStatement(preparedQuerySql); + if (null != statementPreparer) { + statementPreparer.accept(ps); + } rs = ps.executeQuery(); - return mapper.apply(rs); + apply = mapper.apply(rs); } catch (SQLServerException e) { // 为过程或函数 cdc.fn_cdc_get_all_changes_ ... 提供的参数数目不足。 } catch (Exception e) { logger.error(e.getMessage()); } finally { close(rs); + close(ps); } - return null; + return apply; }); return (T) execute; } @@ -345,8 +329,13 @@ public class SqlServerExtractor extends AbstractExtractor { lastLsn = stopLsn; snapshot.put(LSN_POSITION, lastLsn.toString()); - } catch (InterruptedException e) { - break; + } catch (Exception e) { + logger.error(e.getMessage()); + try { + TimeUnit.SECONDS.sleep(1); + } catch (InterruptedException ex) { + logger.error(ex.getMessage()); + } } } } diff --git a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java index 5f50e93f..d5502b06 100644 --- a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java +++ b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java @@ -91,6 +91,7 @@ public class Shard { } public void close() throws IOException { + indexWriter.flush(); indexWriter.commit(); indexReader.close(); indexWriter.close(); @@ -194,6 +195,8 @@ public class Shard { private void execute(Object value, Callback callback) throws IOException { if (null != value && indexWriter.isOpen()) { callback.execute(); + indexWriter.flush(); + indexWriter.commit(); return; } logger.error(value.toString()); -- Gitee From e7df5f1dc7e984dfda1b82613b0ae77249fcb356 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Wed, 13 Apr 2022 06:25:10 +0000 Subject: [PATCH 09/12] =?UTF-8?q?=E6=9B=BF=E6=8D=A2=E9=A9=B1=E5=8A=A8mssql?= =?UTF-8?q?-jdbc=E4=B8=BAsqljdbc4?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbsyncer-connector/pom.xml | 4 ++-- pom.xml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbsyncer-connector/pom.xml b/dbsyncer-connector/pom.xml index 117abd93..691a271d 100644 --- a/dbsyncer-connector/pom.xml +++ b/dbsyncer-connector/pom.xml @@ -38,8 +38,8 @@ - com.microsoft.sqlserver - mssql-jdbc + com.microsoft + sqljdbc4 diff --git a/pom.xml b/pom.xml index afab1078..e4c47e6a 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ 11.2.0.4.0-atlassian-hosted 5.1.40 0.21.0 - 8.2.0.jre8 + 3.0 42.3.3 0.9.0.0 20090211 @@ -146,9 +146,9 @@ - com.microsoft.sqlserver - mssql-jdbc - ${mssql-jdbc.version} + com.microsoft + sqljdbc4 + ${sqljdbc4.version} -- Gitee From e04098ef04ea046d206dc3573881f55a2bc01591 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Wed, 13 Apr 2022 06:53:45 +0000 Subject: [PATCH 10/12] =?UTF-8?q?=E9=99=8D=E4=BD=8Emssql-jdbc=E7=89=88?= =?UTF-8?q?=E6=9C=AC?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- dbsyncer-connector/pom.xml | 4 ++-- pom.xml | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/dbsyncer-connector/pom.xml b/dbsyncer-connector/pom.xml index 691a271d..117abd93 100644 --- a/dbsyncer-connector/pom.xml +++ b/dbsyncer-connector/pom.xml @@ -38,8 +38,8 @@ - com.microsoft - sqljdbc4 + com.microsoft.sqlserver + mssql-jdbc diff --git a/pom.xml b/pom.xml index e4c47e6a..6346b1ac 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ 11.2.0.4.0-atlassian-hosted 5.1.40 0.21.0 - 3.0 + 7.4.1.jre8 42.3.3 0.9.0.0 20090211 @@ -146,9 +146,9 @@ - com.microsoft - sqljdbc4 - ${sqljdbc4.version} + com.microsoft.sqlserver + mssql-jdbc + ${mssql-jdbc.version} -- Gitee From 382bdbcdb13ba2812472dfc8ac72ac009812aad1 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Wed, 13 Apr 2022 07:42:33 +0000 Subject: [PATCH 11/12] fix IW --- .../main/java/org/dbsyncer/storage/lucene/Shard.java | 1 - dbsyncer-storage/src/main/test/LuceneFactoryTest.java | 10 +++++----- 2 files changed, 5 insertions(+), 6 deletions(-) diff --git a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java index d5502b06..fa01bc86 100644 --- a/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java +++ b/dbsyncer-storage/src/main/java/org/dbsyncer/storage/lucene/Shard.java @@ -91,7 +91,6 @@ public class Shard { } public void close() throws IOException { - indexWriter.flush(); indexWriter.commit(); indexReader.close(); indexWriter.close(); diff --git a/dbsyncer-storage/src/main/test/LuceneFactoryTest.java b/dbsyncer-storage/src/main/test/LuceneFactoryTest.java index 85247a20..06f7dcdb 100644 --- a/dbsyncer-storage/src/main/test/LuceneFactoryTest.java +++ b/dbsyncer-storage/src/main/test/LuceneFactoryTest.java @@ -49,7 +49,6 @@ public class LuceneFactoryTest { @After public void tearDown() throws IOException { - shard.close(); shard.deleteAll(); } @@ -69,9 +68,10 @@ public class LuceneFactoryTest { // 模拟操作 System.out.println(String.format("%s:%s", Thread.currentThread().getName(), k)); - Document update = ParamsUtil.convertData2Doc(createMap(k)); - IndexableField field = update.getField(ConfigConstant.CONFIG_MODEL_ID); - shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), update); + Document data = ParamsUtil.convertData2Doc(createMap(k)); + //IndexableField field = data.getField(ConfigConstant.CONFIG_MODEL_ID); + //shard.update(new Term(ConfigConstant.CONFIG_MODEL_ID, field.stringValue()), data); + shard.insert(data); } catch (InterruptedException e) { logger.error(e.getMessage()); @@ -114,7 +114,7 @@ public class LuceneFactoryTest { params.put(ConfigConstant.DATA_EVENT, ConnectorConstant.OPERTION_UPDATE); params.put(ConfigConstant.DATA_ERROR, ""); Map row = new HashMap<>(); - row.put("id", "1"); + row.put("id", i); row.put("name", "中文"); row.put("tel", "15800001234"); row.put("update_time", System.currentTimeMillis()); -- Gitee From 2e4f045d3cbba320bbf7239f09dade2ff4f589c3 Mon Sep 17 00:00:00 2001 From: AE86 <836391306@qq.com> Date: Wed, 13 Apr 2022 19:25:04 +0800 Subject: [PATCH 12/12] upgrade version --- dbsyncer-biz/pom.xml | 8 +-- dbsyncer-cache/pom.xml | 8 +-- dbsyncer-cluster/pom.xml | 2 +- dbsyncer-common/pom.xml | 2 +- dbsyncer-connector/pom.xml | 2 +- dbsyncer-listener/pom.xml | 2 +- .../dbsyncer/listener/enums/ListenerEnum.java | 3 +- .../src/main/test/ChangeDataCaptureTest.java | 2 +- .../src/main/test/PGReplicationTest.java | 65 +++++++++++++++---- dbsyncer-manager/pom.xml | 2 +- dbsyncer-monitor/pom.xml | 2 +- dbsyncer-parser/pom.xml | 2 +- dbsyncer-plugin/pom.xml | 2 +- dbsyncer-storage/pom.xml | 2 +- dbsyncer-web/pom.xml | 2 +- .../src/main/resources/application.properties | 2 +- pom.xml | 2 +- 17 files changed, 75 insertions(+), 35 deletions(-) diff --git a/dbsyncer-biz/pom.xml b/dbsyncer-biz/pom.xml index eb260ca0..957995ce 100644 --- a/dbsyncer-biz/pom.xml +++ b/dbsyncer-biz/pom.xml @@ -3,10 +3,10 @@ - dbsyncer - org.ghi - 1.1.6-Beta - + dbsyncer + org.ghi + 1.1.7-Beta + 4.0.0 dbsyncer-biz diff --git a/dbsyncer-cache/pom.xml b/dbsyncer-cache/pom.xml index bccbd83c..2f00a4ab 100644 --- a/dbsyncer-cache/pom.xml +++ b/dbsyncer-cache/pom.xml @@ -2,10 +2,10 @@ - dbsyncer - org.ghi - 1.1.6-Beta - + dbsyncer + org.ghi + 1.1.7-Beta + 4.0.0 dbsyncer-cache diff --git a/dbsyncer-cluster/pom.xml b/dbsyncer-cluster/pom.xml index 69c5771a..91b5d526 100644 --- a/dbsyncer-cluster/pom.xml +++ b/dbsyncer-cluster/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-cluster diff --git a/dbsyncer-common/pom.xml b/dbsyncer-common/pom.xml index 0a91360e..2a84e610 100644 --- a/dbsyncer-common/pom.xml +++ b/dbsyncer-common/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-common diff --git a/dbsyncer-connector/pom.xml b/dbsyncer-connector/pom.xml index 117abd93..696cd319 100644 --- a/dbsyncer-connector/pom.xml +++ b/dbsyncer-connector/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-connector diff --git a/dbsyncer-listener/pom.xml b/dbsyncer-listener/pom.xml index 9c4c0e19..596cb970 100644 --- a/dbsyncer-listener/pom.xml +++ b/dbsyncer-listener/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-listener diff --git a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java index 974a34ed..95fd133c 100644 --- a/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java +++ b/dbsyncer-listener/src/main/java/org/dbsyncer/listener/enums/ListenerEnum.java @@ -6,7 +6,6 @@ import org.dbsyncer.listener.ListenerException; import org.dbsyncer.listener.kafka.KafkaExtractor; import org.dbsyncer.listener.mysql.MysqlExtractor; import org.dbsyncer.listener.oracle.OracleExtractor; -import org.dbsyncer.listener.postgresql.PostgreSQLExtractor; import org.dbsyncer.listener.quartz.DatabaseQuartzExtractor; import org.dbsyncer.listener.quartz.ESQuartzExtractor; import org.dbsyncer.listener.sqlserver.SqlServerExtractor; @@ -35,7 +34,7 @@ public enum ListenerEnum { /** * log_PostgreSQL */ - LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class), +// LOG_POSTGRE_SQL(ListenerTypeEnum.LOG.getType() + ConnectorEnum.POSTGRE_SQL.getType(), PostgreSQLExtractor.class), /** * log_Kafka */ diff --git a/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java b/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java index 5bd346bc..d1094cb0 100644 --- a/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java +++ b/dbsyncer-listener/src/main/test/ChangeDataCaptureTest.java @@ -199,7 +199,7 @@ public class ChangeDataCaptureTest { public void start() throws SQLException { String username = "sa"; String password = "123"; - String url = "jdbc:sqlserver://127.0.0.1:1434;DatabaseName=test"; + String url = "jdbc:sqlserver://127.0.0.1:1433;DatabaseName=test"; schema = "dbo"; connection = DriverManager.getConnection(url, username, password); if (connection != null) { diff --git a/dbsyncer-listener/src/main/test/PGReplicationTest.java b/dbsyncer-listener/src/main/test/PGReplicationTest.java index 10dfeb2a..e6eaeefd 100644 --- a/dbsyncer-listener/src/main/test/PGReplicationTest.java +++ b/dbsyncer-listener/src/main/test/PGReplicationTest.java @@ -1,9 +1,15 @@ import org.dbsyncer.connector.util.DatabaseUtil; +import org.junit.Test; import org.postgresql.PGConnection; +import org.postgresql.replication.LogSequenceNumber; import org.postgresql.replication.PGReplicationStream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.ResultSet; import java.sql.SQLException; import java.util.concurrent.TimeUnit; @@ -14,24 +20,26 @@ import java.util.concurrent.TimeUnit; */ public class PGReplicationTest { - public static void main(String[] args) throws SQLException, InterruptedException { + private final Logger logger = LoggerFactory.getLogger(getClass()); + private Connection connection; + + @Test + public void testPG() throws SQLException, InterruptedException { String url = "jdbc:postgresql://127.0.0.1:5432/postgres"; String driverClassNam = "org.postgresql.Driver"; String username = "postgres"; String password = "123456"; - String slotName = "test_slot"; - Connection con = DatabaseUtil.getConnection(driverClassNam, url, username, password); - PGConnection replConnection = con.unwrap(PGConnection.class); - replConnection.getReplicationAPI() - .createReplicationSlot() - .logical() - .withSlotName(slotName) - .withOutputPlugin("wal2json") - .make(); - PGReplicationStream stream = replConnection.getReplicationAPI() + connection = DatabaseUtil.getConnection(driverClassNam, url, username, password); + + LogSequenceNumber currentLSN = query("SELECT pg_current_wal_lsn()", rs -> LogSequenceNumber.valueOf(rs.getString(1))); + + PGConnection replConnection = connection.unwrap(PGConnection.class); + PGReplicationStream stream = replConnection + .getReplicationAPI() .replicationStream() .logical() - .withSlotName(slotName) + .withSlotName("test_slot") + .withStartPosition(currentLSN) .start(); while (true) { //non blocking receive message @@ -49,4 +57,37 @@ public class PGReplicationTest { } + public T query(String sql, ResultSetMapper mapper) { + PreparedStatement ps = null; + ResultSet rs = null; + T apply = null; + try { + ps = connection.prepareStatement(sql); + rs = ps.executeQuery(); + if (rs.next()) { + apply = (T) mapper.apply(rs); + } + } catch (Exception e) { + logger.error(e.getMessage()); + } finally { + close(rs); + close(ps); + } + return apply; + } + + private void close(AutoCloseable closeable) { + if (null != closeable) { + try { + closeable.close(); + } catch (Exception e) { + logger.error(e.getMessage()); + } + } + } + + public interface ResultSetMapper { + T apply(ResultSet rs) throws SQLException; + } + } diff --git a/dbsyncer-manager/pom.xml b/dbsyncer-manager/pom.xml index 2c4761b7..9df1ed6b 100644 --- a/dbsyncer-manager/pom.xml +++ b/dbsyncer-manager/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-manager diff --git a/dbsyncer-monitor/pom.xml b/dbsyncer-monitor/pom.xml index fc439f82..a8d2c1ce 100644 --- a/dbsyncer-monitor/pom.xml +++ b/dbsyncer-monitor/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-monitor diff --git a/dbsyncer-parser/pom.xml b/dbsyncer-parser/pom.xml index a81d16c8..e87cbc4f 100644 --- a/dbsyncer-parser/pom.xml +++ b/dbsyncer-parser/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-parser diff --git a/dbsyncer-plugin/pom.xml b/dbsyncer-plugin/pom.xml index 91e18dc9..6c664f1b 100644 --- a/dbsyncer-plugin/pom.xml +++ b/dbsyncer-plugin/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-plugin diff --git a/dbsyncer-storage/pom.xml b/dbsyncer-storage/pom.xml index 2946b6ce..5dd6576b 100644 --- a/dbsyncer-storage/pom.xml +++ b/dbsyncer-storage/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-storage diff --git a/dbsyncer-web/pom.xml b/dbsyncer-web/pom.xml index 44722d8e..a28e391d 100644 --- a/dbsyncer-web/pom.xml +++ b/dbsyncer-web/pom.xml @@ -5,7 +5,7 @@ dbsyncer org.ghi - 1.1.6-Beta + 1.1.7-Beta 4.0.0 dbsyncer-web diff --git a/dbsyncer-web/src/main/resources/application.properties b/dbsyncer-web/src/main/resources/application.properties index eb3ee691..cf92fd16 100644 --- a/dbsyncer-web/src/main/resources/application.properties +++ b/dbsyncer-web/src/main/resources/application.properties @@ -25,7 +25,7 @@ management.endpoints.web.exposure.include=* management.endpoint.health.show-details=always management.health.elasticsearch.enabled=false info.app.name=DBSyncer -info.app.version=1.1.6-Beta +info.app.version=1.1.7-Beta info.app.copyright=©2021 ${info.app.name}(${info.app.version}) #All < Trace < Debug < Info < Warn < Error < Fatal < OFF diff --git a/pom.xml b/pom.xml index 6346b1ac..10588c53 100644 --- a/pom.xml +++ b/pom.xml @@ -6,7 +6,7 @@ org.ghi dbsyncer - 1.1.6-Beta + 1.1.7-Beta pom dbsyncer https://gitee.com/ghi/dbsyncer -- Gitee