/ 详情

如何开发自定义数据处理插件

待办的
创建于  
2019-01-21 13:33

假设我们要将mysql表T_USER同步到目标端Oracle T_USER_2,源端表T_USER表结构与目标端表T_USER_2一致。我们的需求是只保留FLAG字段等于0的用户数据。

需求有了,接下来我们就要实现EventProcessor接口做自定义数据过滤

	package cn.vbill.middleware.porter.plugin;
	public class UserFilter implements cn.vbill.middleware.porter.core.event.s.EventProcessor {
    @Override
    public void process(ETLBucket etlBucket) {
        List<ETLRow> rows = etlBucket.getRows().stream().filter(r -> {
            //第一步 找到表名为T_USER的记录
            boolean tableMatch = r.getFinalTable().equalsIgnoreCase("T_USER");
            if (!tableMatch) return tableMatch;
            //第二步 找到字段FLAG的值不等于0的记录
            boolean columnMatch = r.getColumns().stream().filter(c -> c.getFinalName().equalsIgnoreCase("FLAG")
            && (null == c.getFinalValue() || !c.getFinalValue().equals("0"))).count() > 0;
            return tableMatch && columnMatch;
        }).collect(Collectors.toList());
        //第三步 清除不符合条件的集合
        etlBucket.getRows().removeAll(rows);
    }
}

在任务中指定自定义数据处理插件:

以下配置文件格式适用配置管理后台"同步管理->本地任务->新增"
如果是本地任务配置文件需要增加前缀"porter.task[任务下标,从0开始]"

taskId=任务ID
nodeId=节点1,节点2,节点3
consumer.consumerName=CanalFetch
consumer.converter=canalRow
consumer.source.sourceType=CANAL
consumer.source.slaveId=0
consumer.source.address=127.0.0.1:3306
consumer.source.database=数据库
consumer.source.username=账号
consumer.source.password=密码
consumer.source.filter=*.\.t_user
consumer.eventProcessor.className=cn.vbill.middleware.porter.plugin.UserFilter
consumer.eventProcessor.content=/path/UserFilter.class(xxx.jar包,xxx.java类)

loader.loaderName=JdbcBatch #目标端插件
loader.source.sourceType=JDBC
loader.source.dbType=ORACLE
loader.source.url=jdbc:oracle:thin:@//127.0.0.1:1521/oracledb
loader.source.userName=demo
loader.source.password=demo

mapper[0].auto=false
mapper[0].table=T_USER,T_USER_2

评论 (0)

zkevin 创建了任务 6年前
zkevin 添加了
 
question
标签
6年前
展开全部操作日志

登录 后才可以发表评论

状态
负责人
里程碑
Pull Requests
关联的 Pull Requests 被合并后可能会关闭此 issue
分支
开始日期   -   截止日期
-
置顶选项
优先级
参与者(1)
zkevin-zhuoluo