代码拉取完成,页面将自动刷新
假设我们要将mysql表T_USER同步到目标端Oracle T_USER_2,源端表T_USER表结构与目标端表T_USER_2一致。我们的需求是只保留FLAG字段等于0的用户数据。
需求有了,接下来我们就要实现EventProcessor接口做自定义数据过滤
package cn.vbill.middleware.porter.plugin;
public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
@Override
public void process(ETLBucket etlBucket) {
List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
//第一步 找到表名为T_USER的记录
boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
if (!tableMatch) return tableMatch;
//第二步 找到字段FLAG的值不等于0的记录
boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
&& (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
return tableMatch && columnMatch;
}).collect(Collectors.toList());
//第三步 清除不符合条件的集合
etlBucket.getRows().removeAll(rows);
}
}
在任务中指定自定义数据处理插件:
以下配置文件格式适用配置管理后台"同步管理->本地任务->新增"
如果是本地任务配置文件需要增加前缀"porter.task[任务下标,从0开始]"
taskId=任务ID
nodeId=节点1,节点2,节点3
consumer.consumerName=CanalFetch
consumer.converter=canalRow
consumer.source.sourceType=CANAL
consumer.source.slaveId=0
consumer.source.address=127.0.0.1:3306
consumer.source.database=数据库
consumer.source.username=账号
consumer.source.password=密码
consumer.source.filter=*.\.t_user
consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
consumer.eventProcessor.content=/path/UserFilter.class(xxx.jar包,xxx.java类)
loader.loaderName=JdbcBatch #目标端插件
loader.source.sourceType=JDBC
loader.source.dbType=ORACLE
loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
loader.source.userName=demo
loader.source.password=demo
mapper[0].auto=false
mapper[0].table=T_USER,T_USER_2