MinerU / magic_pdf / para / block_termination_processor.py
derful's picture
Upload folder using huggingface_hub
240e0a0 verified
from magic_pdf.para.commons import *
# Switch stdout to UTF-8 so that non-ASCII text can be printed without encoding errors.
# ``reconfigure`` is only provided by ``io.TextIOWrapper`` (Python 3.7+). stdout may have been
# replaced by an object that lacks it (``None``, ``StringIO``, some capture wrappers), in which
# case calling it unconditionally would make importing this module fail with AttributeError.
if sys.version_info[0] >= 3 and hasattr(sys.stdout, "reconfigure"):
    sys.stdout.reconfigure(encoding="utf-8")  # type: ignore
class BlockTerminationProcessor:
    """
    Cut the lines of each text block into paragraphs.

    Paragraph boundaries are decided line by line from bounding-box geometry
    (indentation, distance to the left/right text boundaries ``X0``/``X1``,
    vertical distance to the neighbouring lines) and trailing punctuation.
    Bounding boxes are unpacked as ``(x0, y0, x1, y1)``.
    """

    def __init__(self) -> None:
        pass

    @staticmethod
    def _font_signature(line):
        """
        Return ``(size, lower-cased font name)`` of the first span of ``line``,
        or ``None`` when the line has no spans.
        """
        spans = line["spans"]
        if not spans:
            return None
        first_span = spans[0]
        return (first_span["size"], first_span["font"].lower())

    def _is_consistent_lines(
        self,
        curr_line,
        prev_line,
        next_line,
        consistent_direction,  # 0 for prev, 1 for next, 2 for both
    ):
        """
        This function checks if the line is consistent with its neighbors

        Two lines are consistent when the first span of each has the same font
        size and the same font name (compared case-insensitively).

        Parameters
        ----------
        curr_line : dict
            current line
        prev_line : dict
            previous line
        next_line : dict
            next line
        consistent_direction : int
            0 for prev, 1 for next, 2 for both

        Returns
        -------
        bool
            True if the line is consistent with the requested neighbors, False otherwise.
            False is also returned when a requested neighbor is missing, when any
            involved line has no spans, or when ``consistent_direction`` is not 0, 1 or 2.
        """
        if consistent_direction == 0:
            neighbors = (prev_line,)
        elif consistent_direction == 1:
            neighbors = (next_line,)
        elif consistent_direction == 2:
            neighbors = (prev_line, next_line)
        else:
            return False

        # Every requested neighbor has to exist.
        if not all(neighbors):
            return False

        curr_signature = self._font_signature(curr_line)
        if curr_signature is None:
            # A line without spans has no font to compare against.
            return False

        return all(self._font_signature(neighbor) == curr_signature for neighbor in neighbors)

    def _is_regular_line(self, curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_line_height):
        """
        This function checks if the line is a regular line

        Parameters
        ----------
        curr_line_bbox : list
            bbox of the current line
        prev_line_bbox : list
            bbox of the previous line, or None
        next_line_bbox : list
            bbox of the next line, or None
        avg_char_width : float
            average of char widths
        X0 : float
            median of x0 values, which represents the left average boundary of the page
        X1 : float
            median of x1 values, which represents the right average boundary of the page
        avg_line_height : float
            average of line heights

        Returns
        -------
        bool
            True if the line is a regular line, False otherwise.
        """
        horizontal_ratio = 0.5
        vertical_ratio = 0.5

        horizontal_thres = horizontal_ratio * avg_char_width
        vertical_thres = vertical_ratio * avg_line_height

        x0, y0, x1, y1 = curr_line_bbox

        x0_near_X0 = abs(x0 - X0) < horizontal_thres
        x1_near_X1 = abs(x1 - X1) < horizontal_thres

        has_prev = bool(prev_line_bbox)
        has_next = bool(next_line_bbox)

        # The previous line stops more than one character short of the right boundary.
        prev_line_is_end_of_para = has_prev and abs(prev_line_bbox[2] - X1) > avg_char_width

        # NOTE(review): distances are measured y1-to-y1 (above) and y0-to-y0 (below), i.e. they
        # include one line's own height rather than only the gap between the boxes. Kept as is
        # because the thresholds were presumably tuned against this measure -- confirm.
        sufficient_spacing_above = has_prev and (y1 - prev_line_bbox[3]) > vertical_thres
        sufficient_spacing_below = has_next and (next_line_bbox[1] - y0) > vertical_thres

        # bool(): never leak None / an empty bbox out of the ``and``/``or`` chain.
        return bool(
            (sufficient_spacing_above or sufficient_spacing_below)
            or (not x0_near_X0 and not x1_near_X1)
            or prev_line_is_end_of_para
        )

    def _is_possible_start_of_para(self, curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size):
        """
        This function checks if the line is a possible start of a paragraph

        Parameters
        ----------
        curr_line : dict
            current line
        prev_line : dict
            previous line, or None for the first line
        next_line : dict
            next line, or None for the last line
        X0 : float
            median of x0 values, which represents the left average boundary of the page
        X1 : float
            median of x1 values, which represents the right average boundary of the page
        avg_char_width : float
            average of char widths
        avg_font_size : float
            average font size; scales the vertical spacing threshold and is also
            passed to ``_is_regular_line`` as its line-height argument

        Returns
        -------
        tuple
            ``(is_start_of_para, start_confidence, decision_path)``:
            the decision (bool), a score that starts at 0.5 and is increased for each
            satisfied cue, and the list of names of the satisfied cues.
        """
        start_confidence = 0.5  # Initial confidence of the line being a start of a paragraph
        decision_path = []  # Record the decision path

        curr_line_bbox = curr_line["bbox"]
        prev_line_bbox = prev_line["bbox"] if prev_line else None
        next_line_bbox = next_line["bbox"] if next_line else None

        indent_ratio = 1

        vertical_ratio = 1.5
        vertical_thres = vertical_ratio * avg_font_size

        left_horizontal_ratio = 0.5
        left_horizontal_thres = left_horizontal_ratio * avg_char_width

        right_horizontal_ratio = 2.5
        right_horizontal_thres = right_horizontal_ratio * avg_char_width

        x0, y0, x1, y1 = curr_line_bbox

        indent_condition = x0 > X0 + indent_ratio * avg_char_width
        if indent_condition:
            start_confidence += 0.2
            decision_path.append("indent_condition_met")

        x0_near_X0 = abs(x0 - X0) < left_horizontal_thres
        if x0_near_X0:
            start_confidence += 0.1
            decision_path.append("x0_near_X0")

        x1_near_X1 = abs(x1 - X1) < right_horizontal_thres
        if x1_near_X1:
            start_confidence += 0.1
            decision_path.append("x1_near_X1")

        if prev_line is None:
            # The first line of a block has nothing before it, so it opens a paragraph.
            prev_line_is_end_of_para = True
            start_confidence += 0.2
            decision_path.append("no_prev_line")
        else:
            # The line that follows ``prev_line`` is ``curr_line`` (not ``next_line``): the end
            # test must look at this pair so that it agrees with the end decision taken for
            # ``prev_line`` itself when the block is scanned line by line.
            prev_line_is_end_of_para, _, _ = self._is_possible_end_of_para(prev_line, curr_line, X0, X1, avg_char_width)
            if prev_line_is_end_of_para:
                start_confidence += 0.1
                decision_path.append("prev_line_is_end_of_para")

        # Same y1-to-y1 / y0-to-y0 distance measure as in ``_is_regular_line``.
        sufficient_spacing_above = False
        if prev_line_bbox:
            vertical_spacing_above = y1 - prev_line_bbox[3]
            sufficient_spacing_above = vertical_spacing_above > vertical_thres
            if sufficient_spacing_above:
                start_confidence += 0.2
                decision_path.append("sufficient_spacing_above")

        sufficient_spacing_below = False
        if next_line_bbox:
            vertical_spacing_below = next_line_bbox[1] - y0
            sufficient_spacing_below = vertical_spacing_below > vertical_thres
            if sufficient_spacing_below:
                start_confidence += 0.2
                decision_path.append("sufficient_spacing_below")

        is_regular_line = self._is_regular_line(
            curr_line_bbox, prev_line_bbox, next_line_bbox, avg_char_width, X0, X1, avg_font_size
        )
        if is_regular_line:
            start_confidence += 0.1
            decision_path.append("is_regular_line")

        is_start_of_para = bool(
            (sufficient_spacing_above or sufficient_spacing_below)
            or (indent_condition)
            or (not indent_condition and x0_near_X0 and x1_near_X1 and not is_regular_line)
            or prev_line_is_end_of_para
        )

        return (is_start_of_para, start_confidence, decision_path)

    def _is_possible_end_of_para(self, curr_line, next_line, X0, X1, avg_char_width):
        """
        This function checks if the line is a possible end of a paragraph

        Parameters
        ----------
        curr_line : dict
            current line
        next_line : dict
            the line that directly follows ``curr_line``, or None
        X0 : float
            median of x0 values, which represents the left average boundary of the page
        X1 : float
            median of x1 values, which represents the right average boundary of the page
        avg_char_width : float
            average of char widths

        Returns
        -------
        tuple
            ``(is_end_of_para, end_confidence, decision_path)``:
            the decision (bool), a score that starts at 0.5 and is increased for each
            satisfied cue, and the list of names of the satisfied cues.
        """
        end_confidence = 0.5  # Initial confidence of the line being a end of a paragraph
        decision_path = []  # Record the decision path

        curr_line_bbox = curr_line["bbox"]
        next_line_bbox = next_line["bbox"] if next_line else None

        left_horizontal_ratio = 0.5
        right_horizontal_ratio = 0.5

        x0, _, x1, _ = curr_line_bbox
        next_x0 = next_line_bbox[0] if next_line_bbox else 0

        x0_near_X0 = abs(x0 - X0) < left_horizontal_ratio * avg_char_width
        if x0_near_X0:
            end_confidence += 0.1
            decision_path.append("x0_near_X0")

        x1_smaller_than_X1 = x1 < X1 - right_horizontal_ratio * avg_char_width
        if x1_smaller_than_X1:
            end_confidence += 0.1
            decision_path.append("x1_smaller_than_X1")

        # The ``and`` chain short-circuits, so the alignment helper is only called when a
        # next line exists and starts to the right of the left boundary.
        next_line_is_start_of_para = bool(
            next_line_bbox
            and (next_x0 > X0 + left_horizontal_ratio * avg_char_width)
            and (not is_line_left_aligned_from_neighbors(curr_line_bbox, None, next_line_bbox, avg_char_width, direction=1))
        )
        if next_line_is_start_of_para:
            end_confidence += 0.2
            decision_path.append("next_line_is_start_of_para")

        is_line_left_aligned_from_neighbors_bool = is_line_left_aligned_from_neighbors(
            curr_line_bbox, None, next_line_bbox, avg_char_width
        )
        if is_line_left_aligned_from_neighbors_bool:
            end_confidence += 0.1
            decision_path.append("line_is_left_aligned_from_neighbors")

        is_line_right_aligned_from_neighbors_bool = is_line_right_aligned_from_neighbors(
            curr_line_bbox, None, next_line_bbox, avg_char_width
        )
        if not is_line_right_aligned_from_neighbors_bool:
            end_confidence += 0.1
            decision_path.append("line_is_not_right_aligned_from_neighbors")

        # A paragraph can only end on a line that ends with punctuation.
        is_end_of_para = bool(
            end_with_punctuation(curr_line["text"])
            and (
                (x0_near_X0 and x1_smaller_than_X1)
                or (is_line_left_aligned_from_neighbors_bool and not is_line_right_aligned_from_neighbors_bool)
            )
        )

        return (is_end_of_para, end_confidence, decision_path)

    @staticmethod
    def _construct_para(lines, is_block_title, para_title_level):
        """
        Construct a paragraph from given lines.

        Returns a dict with the paragraph bbox, the line texts joined by a single
        space, the font type covering the largest total span width, the mean span
        font size (0 without spans), the most frequent span color (None without
        spans) and the two title attributes passed in.
        """
        spans = [span for line in lines for span in line["spans"]]

        font_sizes = [span["size"] for span in spans]
        avg_font_size = sum(font_sizes) / len(font_sizes) if font_sizes else 0

        font_colors = [span["color"] for span in spans]
        most_common_font_color = max(set(font_colors), key=font_colors.count) if font_colors else None

        # Weight each font type by the horizontal extent it covers instead of by span count.
        font_type_lengths = {}
        for span in spans:
            bbox_width = span["bbox"][2] - span["bbox"][0]
            font_type_lengths[span["font"]] = font_type_lengths.get(span["font"], 0) + bbox_width

        # get the font type with the longest bbox width
        most_common_font_type = max(font_type_lengths, key=font_type_lengths.get) if font_type_lengths else None  # type: ignore

        para_bbox = calculate_para_bbox(lines)
        para_text = " ".join(line["text"] for line in lines)

        return {
            "para_bbox": para_bbox,
            "para_text": para_text,
            "para_font_type": most_common_font_type,
            "para_font_size": avg_font_size,
            "para_font_color": most_common_font_color,
            "is_para_title": is_block_title,
            "para_title_level": para_title_level,
        }

    def _cut_paras_per_block(
        self,
        block,
    ):
        """
        Cut the lines of one block into paragraphs.

        Parameters
        ----------
        block : dict
            A block with a "lines" list. The optional entries "X0", "X1",
            "avg_char_width", "avg_font_size", "is_block_title" and
            "block_title_level" are read through ``safe_get`` with defaults
            0, 0, 0, 0, False and 0.

        Returns
        -------
        block : dict
            The same dict object, with a new "paras" entry that maps
            "para_0", "para_1", ... to the paragraph dicts built by ``_construct_para``.
        """
        block_lines = block["lines"]

        X0 = safe_get(block, "X0", 0)
        X1 = safe_get(block, "X1", 0)
        avg_char_width = safe_get(block, "avg_char_width", 0)
        avg_font_size = safe_get(block, "avg_font_size", 0)

        is_block_title = safe_get(block, "is_block_title", False)
        para_title_level = safe_get(block, "block_title_level", 0)

        # Segment into paragraphs: inclusive (first_line_index, last_line_index) pairs.
        para_ranges = []
        in_paragraph = False
        start_idx_of_para = None

        last_index = len(block_lines) - 1
        for line_index, curr_line in enumerate(block_lines):
            prev_line = block_lines[line_index - 1] if line_index > 0 else None
            next_line = block_lines[line_index + 1] if line_index < last_index else None

            # Check if the line is the start of a paragraph
            is_start_of_para, _, _ = self._is_possible_start_of_para(
                curr_line, prev_line, next_line, X0, X1, avg_char_width, avg_font_size
            )
            if not in_paragraph and is_start_of_para:
                in_paragraph = True
                start_idx_of_para = line_index

            # Check if the line is the end of a paragraph
            is_end_of_para, _, _ = self._is_possible_end_of_para(curr_line, next_line, X0, X1, avg_char_width)
            # The last line always closes an open paragraph, so none is left open after the loop.
            if in_paragraph and (is_end_of_para or not next_line):
                para_ranges.append((start_idx_of_para, line_index))
                start_idx_of_para = None
                in_paragraph = False

        # Create the processed paragraphs
        processed_paras = {}
        for start_idx, end_idx in para_ranges:
            matched_lines = block_lines[start_idx : end_idx + 1]
            processed_paras[f"para_{len(processed_paras)}"] = self._construct_para(
                matched_lines, is_block_title, para_title_level
            )

        # Deal with the remaining lines: everything after the last detected paragraph
        # (or the whole block when no paragraph was detected) becomes one more paragraph.
        end_idx_of_para = para_ranges[-1][1] + 1 if para_ranges else 0
        if end_idx_of_para < len(block_lines):
            unmatched_lines = block_lines[end_idx_of_para:]
            processed_paras[f"para_{len(processed_paras)}"] = self._construct_para(
                unmatched_lines, is_block_title, para_title_level
            )

        block["paras"] = processed_paras

        return block

    def batch_process_blocks(self, pdf_dict):
        """
        Cut the blocks of all pages into paragraphs.

        Parameters
        ----------
        pdf_dict : dict
            PDF dictionary. Only entries whose key starts with "page_" are processed;
            each block in a page's "para_blocks" list is passed to ``_cut_paras_per_block``.

        Returns
        -------
        pdf_dict : dict
            The same dict object. Every processed page has its "para_blocks" replaced by
            the list of processed blocks (an empty list if the page had none), and
            ``pdf_dict["statistics"]["num_paras"]`` holds the total paragraph count.
        """
        num_paras = 0

        for page_id, page in pdf_dict.items():
            if not page_id.startswith("page_"):
                continue

            para_blocks = []
            for input_block in page.get("para_blocks", []):
                new_block = self._cut_paras_per_block(input_block)
                para_blocks.append(new_block)
                num_paras += len(new_block["paras"])

            page["para_blocks"] = para_blocks

        # Create the statistics entry when the caller did not provide one (done after the
        # loop so that the dict is not resized while it is being iterated).
        pdf_dict.setdefault("statistics", {})["num_paras"] = num_paras
        return pdf_dict