数据实时同步之MongoDB( 三 )

4.2获取表对象
/*** 获取表对象* @param mongoDatabase* @param testCollection* @return*/public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) {MongoCollection<Document> collection = null;try {//获取数据库dataBase下的集合collecTion,如果没有将自动创建collection = mongoDatabase.getCollection(testCollection);} catch (Exception e) {throw new RuntimeException("获取" + mongoDatabase.getName() + "数据库下的" + testCollection + "集合 failed !" + e);}return collection;}4.3获取数据流处理
/*** 解析操作类型* @param op* @return*/private static String getEventType(String op) {switch (op) {case "i":return "insert";case "u":return "update";case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;}case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;    }4.4数据流标准化
/*** 解析操作类型* @param op* @return*/private static String getEventType(String op) {switch (op) {case "i":return "insert";case "u":return "update";case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;}case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;    }5、结果
5.1新增

数据实时同步之MongoDB

文章插图
 
5.2更新
数据实时同步之MongoDB

文章插图
 
5.3删除
数据实时同步之MongoDB

文章插图
 
实践目前普元数据服务共享平台DSP(Data Service Platform),已经集成离线开发和在线开发实现单表和多表同步到HBASE的实践,做到了这一步,并且对客户的需求完成交付 。
总之,对于当前企业数据库MongoDB,无论是使用Change Streams,还是Oplog增量同步,实现数据汇聚、搭建数据服务共享平台,提取价值、长久规划,都是必不可少的 。
关于作者: 雨声,现任普元高级开发工程师,熟悉软件开发的大数据、Java、常用消息组件等主流技术,有数据采集、消息推送、数据清洗、实时计算、数据可视化的完整开发经验 。


推荐阅读