基于PySpark SQL的媒体浏览日志ETL作业

pyspark除了官方的文档 , 网上的教程资料一直很少,但基于调度平台下 , 使用pyspark编写代码非常高效 , 程序本身是提交到spark集群中 , 性能上也是毫无问题的,在本文中,我们将深入探讨基于Spark的媒体浏览日志ETL(提取、转换、加载)流水线的详细实现,在展示如何使用PySpark SQL处理大规模的媒体浏览日志数据 , 包括IP地址转换、数据清洗、时间维度补充、码表关联等关键步骤 。

基于PySpark SQL的媒体浏览日志ETL作业

文章插图
一、环境配置首先,我们需要创建一个SparkSession并导入必要的库和设置默认参数,包括与IP-to-Location数据库的交互以及其他相关的配置 。
如果pyspark仅仅是本地运行而不是提交集群时,可以使用findspark库 , 它能够帮助我们快速初始化Spark环境 。在开始之前,确保您已经成功安装了findspark库,并已经下载并解压了Spark二进制文件 。将Spark的安装路径和Python/ target=_blank class=infotextkey>Python解释器路径指定为变量 。
【基于PySpark SQL的媒体浏览日志ETL作业】import findspark# 指定Spark的安装路径spark_home = "/usr/local/spark"# 指定用于Spark的Python解释器路径python_path = "/home/hadoop/.conda/envs/sparkbox/bin/python3.6"# 使用findspark.init方法初始化Spark环境findspark.init(spark_home, python_path)findspark.init方法将帮助设置PYSPARK_PYTHON和SPARK_HOME环境变量,确保正确的Spark库和配置文件被加载 。其简化了Spark环境的初始化过程,避免手动配置环境变量 。
二、数据处理接下来,我们定义了一个NewsEtl类,用于执行数据处理和转换的各个步骤 。这包括从HDFS中读取媒体浏览日志数据,进行IP地址转、换,清洗数据,添加时间维度,补充码表信息等 。
在spark_function中,我们详细说明了数据处理的逻辑 。这包括读取媒体浏览日志数据、进行IP地址转换、添加时间维度、补充码表信息、数据清洗和最终写入HDFS等步骤 。
1.数据读取首先,我们使用PySpark的read方法从HDFS中读取媒体浏览日志数据 。我们指定了数据的schema , 以确保正确地解析每一列 。
df = spark.read.schema(schema).parquet("hdfs://xxx:8020/user/hive/warehouse/xxx.db/ods_media_browse_log").filter("dt in ({})".format(",".join(["'{}'".format(partition) for partition in latest_partitions])))2.IP地址转换接下来,我们通过iptranslate函数将IP地址转换为地理位置信息 。这使用了XdbSearcher类,该类负责读取xdb文件并执行IP地址的二分查找 。
# 根据IP地址获取地点信息from_ip_get_place_udf = udf(action.iptranslate, struct_schema)df = df.withColumn('country', from_ip_get_place_udf(col('ip'), lit('country')))df = df.withColumn("place", from_ip_get_place_udf(col('ip')))df = df.withColumn("country", df["place"]["country"])df = df.withColumn("city", df["place"]["city"])df = df.withColumn("province", df["place"]["province"])df = df.drop('place')3.时间维度添加我们生成当前时间的时间戳,并添加各种时间格式的列,包括年、季度、月、日、小时等 。
# 生成当前时间的时间戳df = df.withColumn("current_timestamp", from_unixtime(df["operation_time"] / 1000))# 添加各种时间格式的列df = df.withColumn("year", date_format("current_timestamp", "yyyy"))df = df.withColumn("quarter", date_format("current_timestamp", "yyyy-MM"))df = df.withColumn("month", date_format("current_timestamp", "yyyy-MM"))df = df.withColumn("day", date_format("current_timestamp", "dd"))df = df.withColumn("hour_time", date_format("current_timestamp", "yyyy-MM-dd HH"))df = df.withColumn("dt", date_format("current_timestamp", "yyyy-MM-dd"))df = df.withColumn("hour", date_format("current_timestamp", "HH"))df = df.drop('current_timestamp')4.数据清洗最后,我们对数据进行清洗,包括将空值替换为默认值、字符串去除空格、数据类型转换等 。
# 数据清洗newdf = newdf.withColumn("media_type", when(col("media_type").isNull(), 0).otherwise(col("media_type")))newdf = newdf.withColumn("news_type", when(col("news_type").isNull(), 99).otherwise(col("news_type")))newdf = newdf.withColumn("original_type", when(col("original_type").isNull(), 99).otherwise(col("original_type")))# ...


推荐阅读