InfoQServerless 实战:3 分钟实现文本敏感词过滤


InfoQServerless 实战:3 分钟实现文本敏感词过滤
本文插图

作者丨刘宇
策划丨田晓旭
敏感词过滤是随着互联网社区一起发展起来的一种阻止网络犯罪和网络暴力的技术手段 , 通过对可能存在犯罪或网络暴力的关键词进行有针对性的筛查和屏蔽 , 能够防患于未然 , 将后果严重的犯罪行为扼杀于萌芽之中 。
随着各种社交论坛的日益火爆 , 敏感词过滤逐渐成为了非常重要的功能 。 那么在 Serverless 架构下 , 利用 Python 语言 , 敏感词过滤又有那些新的实现呢?我们能否用最简单的方法实现一个敏感词过滤的 API 呢?
了解敏感过滤的几种方法
Replace 方法
敏感词过滤 , 其实在一定程度上是文本替换 , 以 Python 为例 , 我们可以通过 replace 来实现 , 首先准备一个敏感词库 , 然后通过 replace 进行敏感词替换:
def worldFilter(keywords, text):for eve in keywords:text = text.replace(eve, "***")return textkeywords = (" 关键词 1", " 关键词 2", " 关键词 3")content = " 这是一个关键词替换的例子 , 这里涉及到了关键词 1 还有关键词 2 , 最后还会有关键词 3 。 "print(worldFilter(keywords, content))
这种方法虽然操作简单 , 但是存在一个很大的问题:在文本和敏感词汇非常庞大的情况下 , 会出现很严重的性能问题 。
举个例子 , 我们先修改代码进行基本的性能测试:
import time
def worldFilter(keywords, text):for eve in keywords:text = text.replace(eve, "***")return textkeywords =[ " 关键词 " + str(i) for i in range(0,10000)]content = " 这是一个关键词替换的例子 , 这里涉及到了关键词 1 还有关键词 2 , 最后还会有关键词 3 。 " * 1000startTime = time.time()worldFilter(keywords, content)print(time.time()-startTime)
此时的输出结果是:0.12426114082336426 , 可以看到性能非常差 。
正则表达方法
相较于 replace , 使用正则表达 re.sub 实现可能更加快速 。
import timeimport redef worldFilter(keywords, text):return re.sub("|".join(keywords), "***", text)keywords =[ " 关键词 " + str(i) for i in range(0,10000)]content = " 这是一个关键词替换的例子 , 这里涉及到了关键词 1 还有关键词 2 , 最后还会有关键词 3 。 " * 1000startTime = time.time()worldFilter(keywords, content)print(time.time()-startTime)
增加性能测试之后 , 我们按照上面的方法进行改造测试 , 输出结果是 0.24773502349853516 。
对比这两个例子 , 我们会发现当前两种方法的性能差距不是很大 , 但是随着文本数量的增加 , 正则表达的优势会逐渐凸显 , 性能提升明显 。
正则表达方法相较于 replace , 使用正则表达 re.sub 实现可能更加快速 。
DFA 过滤敏感词
相对来说 , DFA 过滤敏感词的效率会更高一些 , 例如我们把坏人、坏孩子、坏蛋作为敏感词 , 那么它们的树关系可以这样表达:
InfoQServerless 实战:3 分钟实现文本敏感词过滤
本文插图

而 DFA 字典是这样表示的:
{'坏': {'蛋': {'\x00': 0},'人': {'\x00': 0},'孩': {'子': {'\x00': 0}}}}
使用这种树表示问题最大的好处就是可以降低检索次数、提高检索效率 。 其基本代码实现如下:
import time
class DFAFilter(object):def __init__(self):self.keyword_chains = {}# 关键词链表self.delimit = '\x00'# 限定
def parse(self, path):with open(path, encoding='utf-8') as f:for keyword in f:chars = str(keyword).strip().lower()# 关键词英文变为小写if not chars:# 如果关键词为空直接返回returnlevel = self.keyword_chainsfor i in range(len(chars)):if chars[i] in level:level = level[chars[i]]else:if not isinstance(level, dict):breakfor j in range(i, len(chars)):level[chars[j]] = {}last_level, last_char = level, chars[j]level = level[chars[j]]last_level[last_char] = {self.delimit: 0}breakif i == len(chars) - 1:level[self.delimit] = 0


推荐阅读