4.2获取表对象
/*** 获取表对象* @param mongoDatabase* @param testCollection* @return*/public static MongoCollection<Document> getCollection(MongoDatabase mongoDatabase, String testCollection) {MongoCollection<Document> collection = null;try {//获取数据库dataBase下的集合collecTion,如果没有将自动创建collection = mongoDatabase.getCollection(testCollection);} catch (Exception e) {throw new RuntimeException("获取" + mongoDatabase.getName() + "数据库下的" + testCollection + "集合 failed !" + e);}return collection;}
4.3获取数据流处理
/*** 解析操作类型* @param op* @return*/private static String getEventType(String op) {switch (op) {case "i":return "insert";case "u":return "update";case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;}case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result; }
4.4数据流标准化
/*** 解析操作类型* @param op* @return*/private static String getEventType(String op) {switch (op) {case "i":return "insert";case "u":return "update";case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result;}case "d":return "delete";default:return "other";}}/*** 数据解析、格式封装,返回所有insert、update新数据,delete的老数据,做输出为逻辑删除,condition字段为空* @return JSONObject*/private static JSONObject resultRow(Document document, JSONObject result, String eventType) {JSONObject columns = new JSONObject();// 存放变化后的字段result.put("columns", columns);result.put("condition", new JSONObject()); // 条件for (Map.Entry<String, Object> entry : document.entrySet()) {if (entry.getKey().equalsIgnoreCase("_id")) {columns.put(entry.getKey(), ((ObjectId) entry.getValue()).toString());continue;}columns.put(entry.getKey(), entry.getValue());}return result; }
5、结果
5.1新增
文章插图
5.2更新
文章插图
5.3删除
文章插图
实践目前普元数据服务共享平台DSP(Data Service Platform),已经集成离线开发和在线开发实现单表和多表同步到HBASE的实践,做到了这一步,并且对客户的需求完成交付 。
总之,对于当前企业数据库MongoDB,无论是使用Change Streams,还是Oplog增量同步,实现数据汇聚、搭建数据服务共享平台,提取价值、长久规划,都是必不可少的 。
关于作者: 雨声,现任普元高级开发工程师,熟悉软件开发的大数据、Java、常用消息组件等主流技术,有数据采集、消息推送、数据清洗、实时计算、数据可视化的完整开发经验 。
推荐阅读
- Nacos数据持久化到MySQL
- MySQL进阶之MySQL数据库整体架构设计
- 黑客入侵MongoDB数据库 被入侵数据占总数据库47%
- 如何在mysql 造1亿条记录的大容量数据表?
- 淘宝分析数据用什么工具 怎么学会分析淘宝店铺数据
- Dataphin-数据中台利器
- Facebook正在泄露信息 Facebook将数据秘密共享给其他16个app
- 数据存储层HBase进阶之写流程
- 数据持久化框架为什么放弃Hibernate、Mybatis选择JDBCTemplate
- 生意参谋竞店怎么删除 怎么看竞店的销售数据