InfoQServerless 实战:3 分钟实现文本敏感词过滤( 二 )


def filter(self, message, repl="*"):message = message.lower()ret = []start = 0while start
return ''.join(ret)
gfw = DFAFilter()gfw.parse( "./sensitive_words")content = " 这是一个关键词替换的例子 , 这里涉及到了关键词 1 还有关键词 2 , 最后还会有关键词 3 。 " * 1000startTime = time.time()result = gfw.filter(content)print(time.time()-startTime)
这里的字典库是:
with open("./sensitive_words", 'w') as f:f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))
执行结果:
0.06450581550598145
从中 , 我们可以看到性能又进一步得到了提升 。
AC 自动机过滤敏感词算法
什么是 AC 自动机?简单来说 , AC 自动机就是字典树 +kmp 算法 + 失配指针 , 一个常见的例子就是给出 n 个单词 , 再给出一段包含 m 个字符的文章 , 让你找出有多少个单词在文章里出现过 。
代码实现:
import timeclass Node(object):def __init__(self):self.next = {}self.fail = Noneself.isWord = Falseself.word = ""
class AcAutomation(object):
def __init__(self):self.root = Node()
# 查找敏感词函数def search(self, content):p = self.rootresult = []currentposition = 0
while currentposition
if word in p.next:p = p.next[word]else:p = self.root
if p.isWord:result.append(p.word)p = self.rootcurrentposition += 1return result
# 加载敏感词库函数def parse(self, path):with open(path, encoding='utf-8') as f:for keyword in f:temp_root = self.rootfor char in str(keyword).strip():if char not in temp_root.next:temp_root.next[char] = Node()temp_root = temp_root.next[char]temp_root.isWord = Truetemp_root.word = str(keyword).strip()
# 敏感词替换函数def wordsFilter(self, text):""":param ah: AC 自动机:param text: 文本:return: 过滤敏感词之后的文本"""result = list(set(self.search(text)))for x in result:m = text.replace(x, '*' * len(x))text = mreturn text
acAutomation = AcAutomation()acAutomation.parse('./sensitive_words')startTime = time.time()print(acAutomation.wordsFilter(" 这是一个关键词替换的例子 , 这里涉及到了关键词 1 还有关键词 2 , 最后还会有关键词 3 。 "*1000))print(time.time()-startTime)
词库同样是:
with open("./sensitive_words", 'w') as f:f.write("\n".join( [ " 关键词 " + str(i) for i in range(0,10000)]))
使用上面的方法 , 测试结果为 0.017391204833984375 。
敏感词过滤方法小结
根据上文的测试对比 , 我们可以发现在所有算法中 , DFA 过滤敏感词性能最高 , 但是在实际应用中 , DFA 过滤和 AC 自动机过滤各自有自己的适用场景 , 可以根据具体业务来选择 。
实现敏感词过滤 API
想要实现敏感词过滤 API , 就需要将代码部署到 Serverless 架构上 , 选择 API 网关与函数计算进行结合 。 以 AC 自动机过滤敏感词算法为例:我们只需要增加是几行代码就好:
# -*- coding:utf-8 -*-
import json, uuid
class Node(object):def __init__(self):self.next = {}self.fail = Noneself.isWord = Falseself.word = ""
class AcAutomation(object):
def __init__(self):self.root = Node()
# 查找敏感词函数def search(self, content):p = self.rootresult = []currentposition = 0
while currentposition
if word in p.next:p = p.next[word]else:p = self.root


推荐阅读