PDF电子发票内容提取

摘要

本文介绍如何提取PDF版电子发票的内容。

1. 加载内容

首先使用Python的pdfplumber库读入内容。

FILE=r"data/test-2.pdf"
pdf=pb.open(FILE)
page=pdf.pages[0]

接着读取内容并提取线段。

words=page.extract_words(x_tolerance=5)
lines=page.lines # 获取线段(不包括边框线)
for word in words:
    print(word)
# 坐标换算
for index,word in enumerate(words):
    words[index]["y0"]=word["top"]
    words[index]["y1"]=word["bottom"]
for index,line in enumerate(lines):
    lines[index]["x1"]=line["x0"]+line["width"]
    lines[index]["y0"]=line["top"]
    lines[index]["y1"]=line["bottom"]

2. 还原表格

为了将内容划分到合理的位置,需要还原出表格。

首先,把线段分类为横线和竖线,并且剔除较短的两根。

hlines=[line for line in lines if line["width"]>0] # 筛选横线
hlines=sorted(hlines,key=lambda h:h["width"],reverse=True)[:-2] #剔除较短的两根

vlines=[line for line in lines if line["height"]>0] #筛选竖线
vlines=sorted(vlines,key=lambda v:v["y0"]) #按照坐标排列

将线段展示出来如下图。

初始表格

此时的线段是不闭合的,将缺少的线段补齐得到表格如下。

# 查找边框顶点
hx0=hlines[0]["x0"] # 左侧
hx1=hlines[0]["x1"] # 右侧
vy0=vlines[0]["y0"] # 顶部
vy1=vlines[-1]["y1"] # 底部

thline={"x0":hx0,"y0":vy0,"x1":hx1,"y1":vy0} # 顶部横线
bhline={"x0":hx0,"y0":vy1,"x1":hx1,"y1":vy1} # 底部横线
lvline={"x0":hx0,"y0":vy0,"x1":hx0,"y1":vy1} # 左侧竖线
rvline={"x0":hx1,"y0":vy0,"x1":hx1,"y1":vy1} # 右侧竖线

hlines.insert(0,thline)
hlines.append(bhline)

vlines.insert(0,lvline)
vlines.append(rvline)

补齐缺失线段

接下来,查找所有线段的交点:

# 查找所有交点
points=[]

delta=1
for vline in vlines:
    vx0=vline["x0"]
    vy0=vline["y0"]
    vx1=vline["x1"]
    vy1=vline["y1"]    
    for hline in hlines:
        hx0=hline["x0"]
        hy0=hline["y0"]
        hx1=hline["x1"]
        hy1=hline["y1"]        
        if (hx0-delta)<=vx0<=(hx1+delta) and (vy0-delta)<=hy0<=(vy1+delta):
            points.append((int(vx0),int(hy0)))
print('所有交点:',points)
print('交点总计:',len(points))

线段交点

最后,根据交点构建矩形块

# 构造矩阵
X=sorted(set([int(p[0]) for p in points]))
Y=sorted(set([int(p[1]) for p in points]))

df=pd.DataFrame(index=Y,columns=X)
for p in points:
    x,y=int(p[0]),int(p[1])
    df.loc[y,x]=1
df=df.fillna(0)

# 寻找矩形
rects=[]
COLS=len(df.columns)-1
ROWS=len(df.index)-1

for row in range(ROWS):
    for col in range(COLS):
        p0=df.iat[row,col] # 主点:必能构造一个矩阵
        cnt=col+1
        while cnt<=COLS:
            p1=df.iat[row,cnt]
            p2=df.iat[row+1,col]
            p3=df.iat[row+1,cnt]
            if p0 and p1 and p2 and p3:
                rects.append(((df.columns[col],df.index[row]),(df.columns[cnt],df.index[row]),(df.columns[col],df.index[row+1]),(df.columns[cnt],df.index[row+1])))
                break
            else:
                cnt+=1
print(len(rects))
for r in rects:
    print(r)

构造矩阵

3.将单词放入矩形框

首先,在表格中查看一下单词的位置

单词位置

接下来,将内容放入到矩形框中

# 判断点是否在矩形内
def inRect(point,rect):
    px,py=point
    p1,p2,p3,p4=rect
    if p1[0]<=px<=p2[0] and p1[1]<=py<=p3[1]:
        return True
    else:
        return False

# 将words按照坐标层级放入矩阵中
groups={}
delta=2
for word in words:
    p=(int(word["x0"]),int((word["y0"]+word["y1"])/2))
    flag=False
    for r in rects:
        if inRect(p,r):
            flag=True
            groups[("IN",r[0][1],r)]=groups.get(("IN",r[0][1],r),[])+[word]
            break
    if not flag:
        y_range=[p[1]+x for x in range(delta)]+[p[1]-x for x in range(delta)]
        out_ys=[k[1] for k in list(groups.keys()) if k[0]=="OUT"]
        flag=False
        for y in set(y_range):
            if y in out_ys:
                v=out_ys[out_ys.index(y)]
                groups[("OUT",v)].append(word)
                flag=True
                break
        if not flag:
            groups[("OUT",p[1])]=[word]

# 按照y坐标排序
keys=sorted(groups.keys(),key=lambda k:k[1])
for k in keys:
    g=groups[k]
    print(k,[w["text"] for w in g])
    print("*-*-"*20)

4. 结果及代码

最后,提取得到结果:

结果

上图原样本示例:

样本

最后,将代码封装整理为类:


class Extractor(object):
    def __init__(self, path):
        self.file = path if os.path.isfile else None

    def _load_data(self):
        if self.file and os.path.splitext(self.file)[1] == '.pdf':
            pdf = pb.open(self.file)
            page = pdf.pages[0]
            words = page.extract_words(x_tolerance=5)
            lines = page.lines
            # convert coordination
            for index, word in enumerate(words):
                words[index]['y0'] = word['top']
                words[index]['y1'] = word['bottom']
            for index, line in enumerate(lines):
                lines[index]['x1'] = line['x0']+line['width']
                lines[index]['y0'] = line['top']
                lines[index]['y1'] = line['bottom']
            return {'words': words, 'lines': lines}
        else:
            print("file %s cann't be opened." % self.file)
            return None

    def _fill_line(self, lines):
        hlines = [line for line in lines if line['width'] > 0]  # 筛选横线
        hlines = sorted(hlines, key=lambda h: h['width'], reverse=True)[:-2]  # 剔除较短的两根
        vlines = [line for line in lines if line['height'] > 0]  # 筛选竖线
        vlines = sorted(vlines, key=lambda v: v['y0'])  # 按照坐标排列
        # 查找边框顶点
        hx0 = hlines[0]['x0']  # 左侧
        hx1 = hlines[0]['x1']  # 右侧
        vy0 = vlines[0]['y0']  # 顶部
        vy1 = vlines[-1]['y1']  # 底部

        thline = {'x0': hx0, 'y0': vy0, 'x1': hx1, 'y1': vy0}  # 顶部横线
        bhline = {'x0': hx0, 'y0': vy1, 'x1': hx1, 'y1': vy1}  # 底部横线
        lvline = {'x0': hx0, 'y0': vy0, 'x1': hx0, 'y1': vy1}  # 左侧竖线
        rvline = {'x0': hx1, 'y0': vy0, 'x1': hx1, 'y1': vy1}  # 右侧竖线

        hlines.insert(0, thline)
        hlines.append(bhline)
        vlines.insert(0, lvline)
        vlines.append(rvline)
        return {'hlines': hlines, 'vlines': vlines}

    def _is_point_in_rect(self, point, rect):
        '''判断点是否在矩形内'''
        px, py = point
        p1, p2, p3, p4 = rect
        if p1[0] <= px <= p2[0] and p1[1] <= py <= p3[1]:
            return True
        else:
            return False

    def _find_cross_points(self, hlines, vlines):
        points = []
        delta = 1
        for vline in vlines:
            vx0 = vline['x0']
            vy0 = vline['y0']
            vy1 = vline['y1']
            for hline in hlines:
                hx0 = hline['x0']
                hy0 = hline['y0']
                hx1 = hline['x1']
                if (hx0-delta) <= vx0 <= (hx1+delta) and (vy0-delta) <= hy0 <= (vy1+delta):
                    points.append((int(vx0), int(hy0)))
        return points

    def _find_rects(self, cross_points):
        # 构造矩阵
        X = sorted(set([int(p[0]) for p in cross_points]))
        Y = sorted(set([int(p[1]) for p in cross_points]))
        df = pd.DataFrame(index=Y, columns=X)
        for p in cross_points:
            x, y = int(p[0]), int(p[1])
            df.loc[y, x] = 1
        df = df.fillna(0)
        # 寻找矩形
        rects = []
        COLS = len(df.columns)-1
        ROWS = len(df.index)-1
        for row in range(ROWS):
            for col in range(COLS):
                p0 = df.iat[row, col]  # 主点:必能构造一个矩阵
                cnt = col+1
                while cnt <= COLS:
                    p1 = df.iat[row, cnt]
                    p2 = df.iat[row+1, col]
                    p3 = df.iat[row+1, cnt]
                    if p0 and p1 and p2 and p3:
                        rects.append(((df.columns[col], df.index[row]), (df.columns[cnt], df.index[row]), (
                            df.columns[col], df.index[row+1]), (df.columns[cnt], df.index[row+1])))
                        break
                    else:
                        cnt += 1
        return rects

    def _put_words_into_rect(self, words, rects):
        # 将words按照坐标层级放入矩阵中
        groups = {}
        delta = 2
        for word in words:
            p = (int(word['x0']), int((word['y0']+word['y1'])/2))
            flag = False
            for r in rects:
                if self._is_point_in_rect(p, r):
                    flag = True
                    groups[('IN', r[0][1], r)] = groups.get(
                        ('IN', r[0][1], r), [])+[word]
                    break
            if not flag:
                y_range = [
                    p[1]+x for x in range(delta)]+[p[1]-x for x in range(delta)]
                out_ys = [k[1] for k in list(groups.keys()) if k[0] == 'OUT']
                flag = False
                for y in set(y_range):
                    if y in out_ys:
                        v = out_ys[out_ys.index(y)]
                        groups[('OUT', v)].append(word)
                        flag = True
                        break
                if not flag:
                    groups[('OUT', p[1])] = [word]
        return groups

    def _find_text_by_same_line(self, group, delta=1):
        words = {}
        group = sorted(group, key=lambda x: x['x0'])
        for w in group:
            bottom = int(w['bottom'])
            text = w['text']
            k1 = [bottom-i for i in range(delta)]
            k2 = [bottom+i for i in range(delta)]
            k = set(k1+k2)
            flag = False
            for kk in k:
                if kk in words:
                    words[kk] = words.get(kk, '')+text
                    flag = True
                    break
            if not flag:
                words[bottom] = words.get(bottom, '')+text
        return words

    def _split_words_into_diff_line(self, groups):
        groups2 = {}
        for k, g in groups.items():
            words = self._find_text_by_same_line(g, 3)
            groups2[k] = words
        return groups2

    def _index_of_y(self, x, rects):
        for index, r in enumerate(rects):
            if x == r[2][0][0]:
                return index+1 if index+1 < len(rects) else None
        return None

    def _find_outer(self, k, words):
        df = pd.DataFrame()
        for pos, text in words.items():
            if re.search(r'发票$', text):  # 发票名称
                df.loc[0, '发票名称'] = text
            elif re.search(r'发票代码', text):  # 发票代码
                num = ''.join(re.findall(r'[0-9]+', text))
                df.loc[0, '发票代码'] = num
            elif re.search(r'发票号码', text):  # 发票号码
                num = ''.join(re.findall(r'[0-9]+', text))
                df.loc[0, '发票号码'] = num
            elif re.search(r'开票日期', text):  # 开票日期
                date = ''.join(re.findall(
                    r'[0-9]{4}年[0-9]{1,2}月[0-9]{1,2}日', text))
                df.loc[0, '开票日期'] = date
            elif '机器编号' in text and '校验码' in text:  # 校验码
                text1 = re.search(r'校验码:\d+', text)[0]
                num = ''.join(re.findall(r'[0-9]+', text1))
                df.loc[0, '校验码'] = num
                text2 = re.search(r'机器编号:\d+', text)[0]
                num = ''.join(re.findall(r'[0-9]+', text2))
                df.loc[0, '机器编号'] = num
            elif '机器编号' in text:
                num = ''.join(re.findall(r'[0-9]+', text))
                df.loc[0, '机器编号'] = num
            elif '校验码' in text:
                num = ''.join(re.findall(r'[0-9]+', text))
                df.loc[0, '校验码'] = num
            elif re.search(r'收款人', text):
                items = re.split(r'收款人:|复核:|开票人:|销售方:', text)
                items = [item for item in items if re.sub(
                    r'\s+', '', item) != '']
                df.loc[0, '收款人'] = items[0] if items and len(items) > 0 else ''
                df.loc[0, '复核'] = items[1] if items and len(items) > 1 else ''
                df.loc[0, '开票人'] = items[2] if items and len(items) > 2 else ''
                df.loc[0, '销售方'] = items[3] if items and len(items) > 3 else ''
        return df

    def _find_and_sort_rect_in_same_line(self, y, groups):
        same_rects_k = [k for k, v in groups.items() if k[1] == y]
        return sorted(same_rects_k, key=lambda x: x[2][0][0])

    def _find_inner(self, k, words, groups, groups2, free_zone_flag=False):
        df = pd.DataFrame()
        sort_words = sorted(words.items(), key=lambda x: x[0])
        text = [word for k, word in sort_words]
        context = ''.join(text)
        if '购买方' in context or '销售方' in context:
            y = k[1]
            x = k[2][0][0]
            same_rects_k = self._find_and_sort_rect_in_same_line(y, groups)
            target_index = self._index_of_y(x, same_rects_k)
            target_k = same_rects_k[target_index]
            group_context = groups2[target_k]
            prefix = '购买方' if '购买方' in context else '销售方'
            for pos, text in group_context.items():
                if '名称' in text:
                    name = re.sub(r'名称:', '', text)
                    df.loc[0, prefix+'名称'] = name
                elif '纳税人识别号' in text:
                    tax_man_id = re.sub(r'纳税人识别号:', '', text)
                    df.loc[0, prefix+'纳税人识别号'] = tax_man_id
                elif '地址、电话' in text:
                    addr = re.sub(r'地址、电话:', '', text)
                    df.loc[0, prefix+'地址电话'] = addr
                elif '开户行及账号' in text:
                    account = re.sub(r'开户行及账号:', '', text)
                    df.loc[0, prefix+'开户行及账号'] = account
        elif '密码区' in context:
            y = k[1]
            x = k[2][0][0]
            same_rects_k = self._find_and_sort_rect_in_same_line(y, groups)
            target_index = self._index_of_y(x, same_rects_k)
            target_k = same_rects_k[target_index]
            words = groups2[target_k]
            context = [v for k, v in words.items()]
            context = ''.join(context)
            df.loc[0, '密码区'] = context
        elif '价税合计' in context:
            y = k[1]
            x = k[2][0][0]
            same_rects_k = self._find_and_sort_rect_in_same_line(y, groups)
            target_index = self._index_of_y(x, same_rects_k)
            target_k = same_rects_k[target_index]
            group_words = groups2[target_k]
            group_context = ''.join([w for k, w in group_words.items()])
            items = re.split(r'[((]小写[))]', group_context)
            b = items[0] if items and len(items) > 0 else ''
            s = items[1] if items and len(items) > 1 else ''
            df.loc[0, '价税合计(大写)'] = b
            df.loc[0, '价税合计(小写)'] = s
        elif '备注' in context:
            y = k[1]
            x = k[2][0][0]
            same_rects_k = self._find_and_sort_rect_in_same_line(y, groups)
            target_index = self._index_of_y(x, same_rects_k)
            if target_index:
                target_k = same_rects_k[target_index]
                group_words = groups2[target_k]
                group_context = ''.join([w for k, w in group_words.items()])
                df.loc[0, '备注'] = group_context
            else:
                df.loc[0, '备注'] = ''
        else:
            if free_zone_flag:
                return df, free_zone_flag
            y = k[1]
            x = k[2][0][0]
            same_rects_k = self._find_and_sort_rect_in_same_line(y, groups)
            if len(same_rects_k) == 8:
                free_zone_flag = True
                for kk in same_rects_k:
                    yy = kk[1]
                    xx = kk[2][0][0]
                    words = groups2[kk]
                    words = sorted(words.items(), key=lambda x: x[0]) if words and len(
                        words) > 0 else None
                    key = words[0][1] if words and len(words) > 0 else None
                    val = [word[1] for word in words[1:]
                           ] if key and words and len(words) > 1 else ''
                    val = '\n'.join(val) if val else ''
                    if key:
                        df.loc[0, key] = val
        return df, free_zone_flag

    def extract(self):
        data = self._load_data()
        words = data['words']
        lines = data['lines']

        lines = self._fill_line(lines)
        hlines = lines['hlines']
        vlines = lines['vlines']

        cross_points = self._find_cross_points(hlines, vlines)
        rects = self._find_rects(cross_points)

        word_groups = self._put_words_into_rect(words, rects)
        word_groups2 = self._split_words_into_diff_line(word_groups)

        df = pd.DataFrame()
        free_zone_flag = False
        for k, words in word_groups2.items():
            if k[0] == 'OUT':
                df_item = self._find_outer(k, words)
            else:
                df_item, free_zone_flag = self._find_inner(
                    k, words, word_groups, word_groups2, free_zone_flag)
            df = pd.concat([df, df_item], axis=1)
        return df

if __name__=="__main__":
    path=r'data.pdf'
    data = Extractor(path).extract()
    print(data)

   转载规则


《PDF电子发票内容提取》 查永春 采用 知识共享署名 4.0 国际许可协议 进行许可。
  目录