JDBC连接池封装MaxCompute/Hive/Oracle/Mysql( 三 )

>}**/private List> buildListMap(ResultSet resultSet) throws SQLException {if (resultSet == null) {return Lists.newArrayList();}List> resultList = new ArrayList<>();// 获取元数据ResultSetMetaData metaData = resultSet.getMetaData();while (resultSet.next()) {// 获取列数int columnCount = metaData.getColumnCount();LinkedHashMap map = new LinkedHashMap<>();for (int i = 0; i < columnCount; i++) {String columnName = metaData.getColumnName(i + 1);// 过滤掉查询的结果包含序号的if("mm.row_num_01".equalsIgnoreCase(columnName)|| "row_num_01".equalsIgnoreCase(columnName)){continue;}// 去除hive查询结果的mm.别名前缀if (columnName.startsWith("mm.")){columnName = columnName.substring(columnName.indexOf(".") + 1);}Object object = resultSet.getObject(columnName);// maxCompute里面的空返回的是使用nif ("\N".equalsIgnoreCase(String.valueOf(object))) {map.put(columnName, "");} else {map.put(columnName, object);}}resultList.add(map);}return resultList;}}MaxCompute JDBC连接池封装
MaxCompute 已经有了JDBC连接方式 也就是 odbc-jdbc, 最终能够获取一个Connection. 官方文档:
https://help.aliyun.com/document_detail/161246.html
封装MaxCompute JDBC连接参数
/** * @author itdl * @description maxCompute使用JDBC的连接参数 * @date 2022/08/08 10:07 */@Datapublic class MaxComputeJdbcConnParam extends BaseJdbcConnParam{/**阿里云accessId 相当于用户名 */private String aliyunAccessId;/**阿里云accessKey 相当于密码 */private String aliyunAccessKey;/** maxcompute_endpoint */private String endpoint;/**项目名称*/private String projectName;}封装MaxCompute JDBC连接实现类
就是实现父类AbstractConnUtil,实现抽象方法buildConnection
/** * @Description maxCompute JDBC连接实现 * @Author itdl * @Date 2022/08/08 14:26 */@Slf4jpublic class MaxComputeJdbcUtil extends AbstractConnUtil<MaxComputeJdbcConnParam>{/**JDBC 驱动名称*/private static final String DRIVER_NAME = "com.aliyun.odps.jdbc.OdpsDriver";/*** 构造函数,构造工具类对象** @param connParam 连接参数*/public MaxComputeJdbcUtil(MaxComputeJdbcConnParam connParam) {super(connParam);}@Overrideprotected Connection buildConnection() {return buildConn();}/*** 创建连接* @return 数据库连接*/private Connection buildConn() {try {Class.forName(DRIVER_NAME);} catch (ClassNotFoundException e) {e.printStackTrace();throw new BizException(ResultCode.MAX_COMPUTE_DRIVE_LOAD_ERR);}try {Properties dbProperties = new Properties();dbProperties.put("user", connParam.getAliyunAccessId());dbProperties.put("password", connParam.getAliyunAccessKey());dbProperties.put("remarks", "true");// JDBCURL连接模板String jdbcUrlTemplate = "jdbc:odps:%s?project=%s&useProjectTimeZone=true";// 使用驱动管理器连接获取连接return DriverManager.getConnection(String.format(jdbcUrlTemplate, connParam.getEndpoint(), connParam.getProjectName()), dbProperties);} catch (SQLException e) {e.printStackTrace();throw new BizException(ResultCode.CONN_USER_PWD_ERR);}}}连接测试代码一起放在结尾,将会开启多个线程获取连接,然后去获取表名,表注释,字段名,字段注释,传入page, size和普通sql就可以实现分页查询的封装方法
Hive JDBC连接池封装
Hive JDBC连接参数
【JDBC连接池封装MaxCompute/Hive/Oracle/Mysql】Hive连接参数封装,除了基础的JDBC所需字段,还需要kerberos相关字段,因为hive开启kerberos认证后,需要使用kertab密钥文件和kbr5.conf配置文件去认证 。将会在参数和测试代码中得到重复的体现 。
/** * @Description Hive JDBC connection params * @Author itdl * @Date 2022/08/10 16:40 */@Data@EqualsAndHashCode(callSuper = false)public class HiveJdbcConnParam extends BaseJdbcConnParam {/*** enable kerberos authentication*/private boolean enableKerberos;/*** principal*/private String principal;/*** kbr5 file path in dick*/private String kbr5FilePath;/*** keytab file path in dick*/private String keytabFilePath;}Hive JDBC获取连接实现
Hive获取JDBC连接之后,本来可以从Connection的元数据中获取表的注释,但是获取的中文注释居然是乱码,但是我们Hue上查看表注释又是正常,暂时没找到这种方式如何解决,从而退而求其次,通过表名去获取建表语句,从建表语句中通过正则表达式提取表的注释 。
/** * @Description hive connection util * @Author itdl * @Date 2022/08/10 16:52 */@Slf4jpublic class HiveConnUtil extends AbstractConnUtil<HiveJdbcConnParam>{public HiveConnUtil(HiveJdbcConnParam connParam) {super(connParam);}/*** 获取连接* @return 连接*/public Connection getConnection() {return connection;}@Overrideprotected Connection buildConnection(){try {//Class.forName("org.Apache.hive.jdbc.HiveDriver");Class.forName(connParam.getDriverName());} catch (ClassNotFoundException e) {e.printStackTrace();throw new BizException(ResultCode.HIVE_DRIVE_LOAD_ERR);}// 开启kerberos后需要私钥// 拼接jdbcUrlString jdbcUrl = "jdbc:hive2://%s:%s/%s";String ip = connParam.getIp();String port = connParam.getPort() + "";String dbName = connParam.getDbName();final String username = connParam.getUsername();final String password = connParam.getPassword();// is enable kerberos authenticationfinal boolean enableKerberos = connParam.isEnableKerberos();// 格式化Connection connection;// 获取连接try {Properties dbProperties = new Properties();dbProperties.put("user", username);dbProperties.put("password", password);// 加上remark后,能够获取到标注释 但是会出现中文乱码dbProperties.put("remarks", "true");if (!enableKerberos) {jdbcUrl = String.format(jdbcUrl, ip, port, dbName);connection = DriverManager.getConnection(jdbcUrl, dbProperties);} else {final String principal = connParam.getPrincipal();final String kbr5FilePath = connParam.getKbr5FilePath();final String secretFilePath = connParam.getKeytabFilePath();String format = "jdbc:hive2://%s:%s/%s;principal=%s";jdbcUrl = String.format(format, ip, port, dbName, principal);// 使用hadoop安全认证System.setProperty("JAVA.security.krb5.conf", kbr5FilePath);System.setProperty("javax.security.auth.useSubjectCredsOnly", "false");// 解决windows中执行可能出现找不到HADOOP_HOME或hadoop.home.dir问题// Kerberos认证org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();conf.set("hadoop.security.authentication", "Kerberos");conf.set("keytab.file", secretFilePath);conf.set("kerberos.principal", principal);UserGroupInformation.setConfiguration(conf);try {UserGroupInformation.loginUserFromKeytab(username, secretFilePath);} catch (IOException e) {e.printStackTrace();throw new BizException(ResultCode.KERBEROS_AUTH_FAIL_ERR);}try {connection = DriverManager.getConnection(jdbcUrl, dbProperties);} catch (SQLException e) {e.printStackTrace();throw new BizException(ResultCode.KERBEROS_AUTH_SUCCESS_GET_CONN_FAIL_ERR);}}log.info("=====>>>获取hive连接成功:username:{},jdbcUrl: {}", username, jdbcUrl);return connection;} catch (SQLException e) {e.printStackTrace();throw new BizException(ResultCode.HIVE_CONN_USER_PWD_ERR);} catch (BizException e){throw e;}catch (Exception e) {e.printStackTrace();throw new BizException(ResultCode.HIVE_CONN_ERR);}}}Oracle JDBC连接参数封装
只需要继承父类即可
/** * @Description Oracle连接的JDBC参数 * @Author itdl * @Date 2022/08/15 09:50 */public class OracleJdbcConnParam extends BaseJdbcConnParam{}Oracle JDBC连接实现类
包括了普通用户的认证和dba用户的认证
/** * @Description Oracle获取jdbc连接工具类 * @Author itdl * @Date 2022/08/15 09:52 */@Slf4jpublic class OracleConnUtil extends AbstractConnUtil<OracleJdbcConnParam> {/*** 构造函数,构造工具类对象** @param connParam 连接参数*/public OracleConnUtil(OracleJdbcConnParam connParam) {super(connParam);}@Overrideprotected Connection buildConnection() {try {Class.forName("oracle.jdbc.driver.OracleDriver");} catch (ClassNotFoundException e) {e.printStackTrace();throw new BizException(ResultCode.ORACLE_DRIVE_LOAD_ERR);}// 拼接jdbcUrlString jdbcUrl = "jdbc:oracle:thin:@//%s:%s/%s";final String ip = connParam.getIp();final String port = connParam.getPort() + "";final String dbName = connParam.getDbName();final String username = connParam.getUsername();final String password = connParam.getPassword();// 格式化jdbcUrl = String.format(jdbcUrl, ip, port, dbName);// 获取连接Connection connection;try {Properties dbProperties = new Properties();// 用户名 如果是dba,则后面跟了as sysdbaString dba = "as sysdba";dbProperties.put("password", password);dbProperties.put("remarks", "true");if (username.trim().endsWith(dba)) {dbProperties.put("user", username.trim().substring(0, username.trim().indexOf(dba) - 1));dbProperties.put("defaultRowPrefetch", "15");dbProperties.put("internal_logon", "sysdba");connection = DriverManager.getConnection(jdbcUrl, dbProperties);} else {dbProperties.put("user", username);connection = DriverManager.getConnection(jdbcUrl, dbProperties);}log.info("=====>>>获取oracle连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);return connection;} catch (SQLException e) {e.printStackTrace();if (e.getMessage().contains("TNS:listener")) {throw new BizException(ResultCode.CONN_LISTENER_UNKNOWN_ERR);}if (e.getMessage().contains("ORA-01017")) {throw new BizException(ResultCode.CONN_USER_PWD_ERR);}if (e.getMessage().contains("IO 错误: Got minus one from a read call")) {throw new BizException(ResultCode.CONN_CONN_TOO_MANY_ERR);}throw new BizException(ResultCode.CONN_UNKNOWN_ERR);} catch (Exception e) {throw new BizException(ResultCode.CONN_UNKNOWN_ERR);}}}Mysql JDBC连接池封装
Mysql JDBC连接参数封装
只需要继承父类即可
/** * @Description Mysql连接的JDBC参数 * @Author itdl * @Date 2022/08/15 09:50 */public class MysqlJdbcConnParam extends BaseJdbcConnParam{}Mysql JDBC连接实现
需要注意的是连接的属性里面配置useInformationSchema=true,表示可以直接从Connection中获取表和字段的注释 。
/** * @Description Mysql获取jdbc连接工具类 * @Author itdl * @Date 2022/08/15 09:52 */@Slf4jpublic class MysqlConnUtil extends AbstractConnUtil<MysqlJdbcConnParam> {/*** 构造函数,构造工具类对象** @param connParam 连接参数*/public MysqlConnUtil(MysqlJdbcConnParam connParam) {super(connParam);}@Overrideprotected Connection buildConnection() {try {Class.forName("com.mysql.cj.jdbc.Driver");} catch (ClassNotFoundException e) {e.printStackTrace();throw new BizException(ResultCode.MYSQL_DRIVE_LOAD_ERR);}// 拼接jdbcUrlString jdbcUrl = "jdbc:mysql://%s:%s/%s?serverTimezone=Asia/Shanghai&useUnicode=true&characterEncoding=UTF-8";final String ip = connParam.getIp();final String port = connParam.getPort() + "";final String dbName = connParam.getDbName();final String username = connParam.getUsername();final String password = connParam.getPassword();// 格式化jdbcUrl = String.format(jdbcUrl, ip, port, dbName);// 获取连接try {Properties dbProperties = new Properties();dbProperties.put("user", username);dbProperties.put("password", password);dbProperties.put("remarks", "true");// 设置可以获取tables remarks信息dbProperties.setProperty("useInformationSchema", "true");Connection connection = DriverManager.getConnection(jdbcUrl,dbProperties);log.info("=====>>>获取mysql连接成功:username:{}, jdbcUrl: {}", username, jdbcUrl);return connection;} catch (SQLException e) {e.printStackTrace();if (e.getMessage().contains("Unknown database")){throw new BizException(ResultCode.CONN_UNKNOWN_DB_ERR);}throw new BizException(ResultCode.CONN_USER_PWD_ERR);} catch (Exception e) {throw new BizException(ResultCode.CONN_UNKNOWN_ERR);}}}测试代码连接各自数据库
@SpringBootTest(classes = DbConnectionDemoApplication.class)@RunWith(value = https://www.isolves.com/it/cxkf/yy/JAVA/2022-08-17/SpringRunner.class)@Slf4jclass DbConnectionDemoApplicationTests {private DbConnPool connPool = null;@Testpublic void testMysqlConn() throws InterruptedException {// 创建连接参数final MysqlJdbcConnParam connParam = new MysqlJdbcConnParam();final String ip = "localhost";final Integer port = 3306;final String username = "root";final String password = "root";final String dbname = "test_db";// 设置参数connParam.setDriverName(Driver.class.getName());connParam.setIp(ip);connParam.setPort(port);connParam.setUsername(username);connParam.setPassword(password);connParam.setDbName(dbname);// 创建连接池connPool = new DbConnPool<>(connParam, 2);handler01(dbname, DbDialectEnum.MYSQL);new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();new Thread(() -> handler01(dbname, DbDialectEnum.MYSQL)).start();Thread.sleep(60 * 1000);}@Testpublic void testOracleConn() throws InterruptedException {// 创建连接参数final OracleJdbcConnParam connParam = new OracleJdbcConnParam();final String ip = "你的Oracle的IP地址";final Integer port = 1521;// 如果是admin账号 用户后面+ as sysdbafinal String username = "用户名";final String password = "密码";final String dbname = "实例/服务名";// 设置参数connParam.setDriverName(Driver.class.getName());connParam.setIp(ip);connParam.setPort(port);connParam.setUsername(username);connParam.setPassword(password);connParam.setDbName(dbname);// 创建连接池connPool = new DbConnPool<>(connParam, 2);final DbDialectEnum dbDialectEnum = DbDialectEnum.ORACLE;// 处理操作(oracle的schemaName就是用户名)handler01(username, dbDialectEnum);// 新建两个线程获取连接new Thread(() -> handler01(username, dbDialectEnum)).start();new Thread(() -> handler01(username, dbDialectEnum)).start();Thread.sleep(60 * 1000);}@Testpublic void testHiveConn() throws InterruptedException {// 创建连接参数final HiveJdbcConnParam connParam = new HiveJdbcConnParam();final String ip = "连接的域名";final Integer port = 10000;// 如果是admin账号 用户后面+ as sysdbafinal String username = "账号@域名";final String password = "";final String dbname = "数据库名";final String principal = "hive/_HOST@域名";final String kbr5FilePath = "C:\workspace\krb5.conf";final String keytabFilePath = "C:\workspace\zhouyu.keytab";// 设置参数connParam.setDriverName(Driver.class.getName());connParam.setIp(ip);connParam.setPort(port);connParam.setUsername(username);connParam.setPassword(password);connParam.setDbName(dbname);connParam.setEnableKerberos(true);connParam.setPrincipal(principal);connParam.setKbr5FilePath(kbr5FilePath);connParam.setKeytabFilePath(keytabFilePath);// 创建连接池connPool = new DbConnPool<>(connParam, 2);final DbDialectEnum dbDialectEnum = DbDialectEnum.HIVE;// 处理操作(oracle的schemaName就是用户名)handler01(username, dbDialectEnum);// 新建两个线程获取连接new Thread(() -> handler01(username, dbDialectEnum)).start();new Thread(() -> handler01(username, dbDialectEnum)).start();Thread.sleep(10 * 60 * 1000);}@Testpublic void testMaxComputeConn() throws InterruptedException {// 创建连接参数final MaxComputeJdbcConnParam connParam = new MaxComputeJdbcConnParam();String accessId = "你的阿里云accessId";String accessKey = "你的阿里云accessKey";String endpoint = "http://service.cn-chengdu.maxcompute.aliyun.com/api";String projectName = "项目名=数据库名";// 设置参数connParam.setDriverName(Driver.class.getName());connParam.setAliyunAccessId(accessId);connParam.setAliyunAccessKey(accessKey);connParam.setEndpoint(endpoint);connParam.setProjectName(projectName);// 创建连接池connPool = new DbConnPool<>(connParam, 2);final DbDialectEnum dbDialectEnum = DbDialectEnum.MAX_COMPUTE;// 处理操作(oracle的schemaName就是用户名)handler01(projectName, dbDialectEnum);// 新建两个线程获取连接new Thread(() -> handler01(projectName, dbDialectEnum)).start();new Thread(() -> handler01(projectName, dbDialectEnum)).start();Thread.sleep(60 * 1000);}private void handler01(String schemaName, DbDialectEnum dbDialectEnum) {final Connection connection = connPool.getConnection();// 构建工具类final SqlUtil sqlUtil = new SqlUtil(connection, dbDialectEnum.getCode());// 获取表和注释final List tables = sqlUtil.getTables(schemaName);log.info("===============获取所有表和注释开始===================");log.info(tables.toString());log.info("===============获取所有表和注释结束===================");// 获取字段和注释final String tableName = tables.get(0).getTableName();final List columns = sqlUtil.getColumnsByTableName(tableName);log.info("===============获取第一个表的字段和注释开始===================");log.info(columns.toString());log.info("===============获取第一个表的字段和注释结束===================");final PageResult> pageResult = sqlUtil.pageQueryMap("select * from " + tableName, 1, 10);log.info("===============SQL分页查询开始===================");log.info("总数:{}", pageResult.getTotal());log.info("记录数:{}", JSONObject.toJSONString(pageResult.getRows()));log.info("===============SQL分页查询结束===================");connPool.freeConnection(connection);}@Afterpublic void close(){if (connPool != null){connPool.close();log.info("==================连接池成功关闭================");}}}小结就是为了方便整合第三方数据源做数据源管理时比较重要(若有所需,私信“封装数据源源码”获取源码) 。


推荐阅读