
python - Deduplication by Text Blocks with filtering - Stack Overflow


I am trying to create a Python script that will remove consecutive duplicate text blocks from a text file, i.e. do something like this:

  1. Locate text blocks that are the same, based on the number of rows (with a max block size X that sets the initial max rows per text block, then reduces down to 1 row at a time)
  2. If there are consecutive X-row duplicate text blocks, remove them and insert a simple "...has an additional X similar entries..." line after the initial text block, so the first occurrence is kept and the following duplicates are replaced.

I also want to ensure, when comparing, that:

  1. any run of 2+ spaces in a text row is replaced with a single space for the comparison
  2. any digits are removed from each row (including "words" that start with a number and end with a number or a period, which covers dates e.g. XX/XX/XX, comma/decimal numbers e.g. 3,444.22, and list points e.g. 1.)
  3. empty lines (after strip()) are skipped, i.e. ignored, when comparing text blocks

The result will be a new txt file that has the duplicate text blocks removed.
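
For illustration, with made-up sample text (2-row entries where only the numbers and dates change), the idea is roughly:

Input:

Statement for account 12345
Balance: $1,234.56 on 17/01/2020
Statement for account 67890
Balance: $99.10 on 18/01/2020
Statement for account 55555
Balance: $3,444.22 on 19/01/2020

Output:

Statement for account 12345
Balance: $1,234.56 on 17/01/2020
...has an additional 2 similar entries...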

I have the row-cleaning function for the comparison like this:

import re

def clean_for_comparison(line):
    # Remove dollar amounts (e.g., $1,234.56) - additional example of cleaning row
    line = re.sub(r'\$[\d,]+(?:\.\d+)?', '', line)
    # Remove numbers with commas and decimals (e.g., 1,234.56)
    line = re.sub(r'[\d,]+(?:\.\d+)?', '', line)
    # Remove date-like patterns (e.g., 17/01/2020 or 17-01-2020)
    line = re.sub(r'\d{1,2}[-/.\s]?\d{1,2}[-/.\s]?\d{2,4}', '', line)
    # Remove words starting and ending with digits (e.g., 123abc456)
    line = re.sub(r'\b\d+[a-zA-Z]+\d+\b', '', line)
    # Normalize spaces and strip
    line = re.sub(r'\s+', ' ', line)
    line = line.strip()
    return line
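
For example (sample rows made up), two rows that differ only in the amounts and dates clean down to the same string. Note the leftover "//" comes from the date separators, because the digit rule runs before the date rule; it is identical on both rows, so the comparison still works:

>>> clean_for_comparison("Balance:   $1,234.56  on 17/01/2020")
'Balance: on //'
>>> clean_for_comparison("Balance: $99.10 on 18/01/2020")
'Balance: on //'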

I am trying to use a hash method for the text block comparison, constructing each block from X rows, but am having issues with how it should work efficiently.

I use these helper functions:

import hashlib

def get_block_hash(block):
    cleaned_lines = [clean_for_comparison(line) for line in block]
    non_blank_cleaned_lines = [line for line in cleaned_lines if line]
    cleaned_block_content = ''.join(non_blank_cleaned_lines)
    return hashlib.md5(cleaned_block_content.encode()).hexdigest()
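
So two blocks whose rows differ only in the numeric parts produce the same digest, e.g. (sample rows made up, assuming the imports and clean_for_comparison above):

block_a = ["Invoice 1001 issued 17/01/2020\n", "Total: $1,234.56\n"]
block_b = ["Invoice 1002 issued 18/01/2020\n", "Total: $99.10\n"]
print(get_block_hash(block_a) == get_block_hash(block_b))  # True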

def get_block(lines, start_index, max_size):
    block = []
    i = start_index
    row_count = 0

    while i < len(lines) and row_count < max_size:
        line = lines[i].strip()
        if line:
            block.append(lines[i])
            row_count += 1
        i += 1

        if i >= len(lines):
            break
        if lines[i - 1].strip() == "" and lines[i].strip() == "":
            break

    return block, i
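
As a quick sanity check with a made-up list of lines: the helper collects up to max_size non-blank rows, returns the index to resume from, and stops early at a double blank line:

lines = ["Header\n", "Detail row\n", "\n", "\n", "Next section\n"]
block, next_i = get_block(lines, 0, 4)
print(block)   # ['Header\n', 'Detail row\n']
print(next_i)  # 3  (stopped at the double blank line)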

but it gets confusing trying to do it properly/efficiently... so far I was looking at this approach, but it's not quite right:

def process_file(file_path, max_consecutive_rows_to_check=4):
    with open(file_path, 'r') as file:
        lines = file.readlines()

    filtered_lines = []
    i = 0
    removed_count = 0

    while i < len(lines):
        block, j = get_block(lines, i, max_consecutive_rows_to_check)
        if not block:  # Skip processing.
            i = j  # Increment
            continue  # Skip empty block

        # Sliding window comparison
        for size in range(min(len(block), max_consecutive_rows_to_check), 0, -1):
            block_to_compare = block[:size]
            position_row = initial_block_start

            while True:
                compare_start = position_row + 1 # use the next row outside the initial block to compare?
                compare_block, next_k = get_block(lines, compare_start, size)
                compare_line = compare_block

                if len(compare_block) != size:
                    break  # Done with compare - no more
                if get_block_hash(block_to_compare) == get_block_hash(compare_block):
                    print("DUPE FOUND!")
                    found_dupes = True
                    total_dupes += 1 # dupe counter
                    position_row = compare_start #set next dupe position to compare consecutively with
                    i += compare_start
                    #continue #continue if it finds?
                else: #Did not find anything
                    print("Not a dupe.")
                    break

        # If duplicates were found:
        if found_dupes:
            print(f"Total duplicates found: {total_dupes}")
            filtered_lines.extend(block)
            filtered_lines.append(f"....<with an additional {total_dupes} entries found>...\n")
            removed_count += total_dupes

            i = position_row+max_consecutive_rows_to_check #+1 + size?

        # If no duplicates were found:
        else:
            print("No duplicates found, adding to filtered lines.")
            filtered_lines.extend(block)

        # Advance to next block
        i =+max_consecutive_rows_to_check

    # Output to new file
    output_filename = file_path
    if output_filename.endswith(".txt"):
        output_filename = output_filename[:-4]
    if not output_filename.endswith("-truncated"):
        output_filename = f"{output_filename}-truncated.txt"

    with open(output_filename, "w") as output_file:
        output_file.writelines(filtered_lines)

    print(f"Processed {len(lines)} lines from {file_path}")
    print(f"Output saved to {output_filename}")
    if removed_count > 0:
        print(f"Removed {removed_count} duplicate blocks.")

....etc..

Any help with the logic flow or efficiency would be appreciated, or does anyone know of an existing library that de-dupes text files based on consecutive text blocks?

Thanks


asked Mar 15 at 16:59 by jkeys, edited Mar 15 at 19:02 by joanis

1 Answer


I believe I have resolved this. So for anyone else wanting to do something similar, here is the code:

import hashlib
import re
import glob
import string
import sys

def clean_for_comparison(line):
    line = line.lower()     # Convert text to lowercase
    line = line.translate(str.maketrans("", "", string.punctuation))    # Remove punctuation using string.punctuation
    line = re.sub(r'\$[\d,]+(?:\.\d+)?', '', line)    # Remove dollar amounts (e.g., $1,234.56)
    line = re.sub(r'[\d,]+(?:\.\d+)?', '', line)    # Remove numbers with commas and decimals (e.g., 1,234.56)
    line = re.sub(r'\d{1,2}[-/.\s]?\d{1,2}[-/.\s]?\d{2,4}', '', line)    # Remove date-like patterns (e.g., 17/01/2020 or 17-01-2020)
    line = re.sub(r'\b\d+[a-zA-Z]+\d+\b', '', line)    # Remove words starting and ending with digits (e.g., 123abc456)
    line = re.sub(r'\s+', ' ', line).strip()    # Normalize spaces by collapsing multiple spaces into one
    return line
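
With the lowercasing and punctuation stripping added here, rows now also compare case- and punctuation-insensitively, e.g. (sample strings made up):

>>> clean_for_comparison("Balance:   $1,234.56 on 17/01/2020")
'balance on'
>>> clean_for_comparison("balance $99.10 on 18/01/2020")
'balance on'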

def get_block_hash(block):
    cleaned_lines = [clean_for_comparison(line) for line in block]
    non_blank_cleaned_lines = [line for line in cleaned_lines if line]
    cleaned_block_content = ''.join(non_blank_cleaned_lines)
    return hashlib.md5(cleaned_block_content.encode()).hexdigest()

def get_block(lines, start_index, block_size):
    block = []
    i = start_index
    count = 0  # Track the number of content lines added to the block
    while count < block_size and i < len(lines):
        line = lines[i].strip()
        if line:  # Skip empty lines
            block.append(line)
            count += 1
        i += 1
    return block, i

def process_file(file_path, max_consecutive_rows_to_check=6):
    try:
        with open(file_path, 'r') as file:
            lines = file.readlines()
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return

    output_lines = []
    i = 0
    removed_count = 0
    
    while i < len(lines):
        block_found = False
        for current_rows_to_check in range(max_consecutive_rows_to_check, 0, -1):
            if i + current_rows_to_check > len(lines):
                continue
            block, j = get_block(lines, i, current_rows_to_check)
            if not block:
                break  # Skip if no more blocks
            block_hash = get_block_hash(block)
            consecutive_duplicates = 0
            next_i = j  # Index after the current block
            # Compare consecutive blocks
            while True:
                if next_i + current_rows_to_check > len(lines):
                    break
                compare_block, next_j = get_block(lines, next_i, current_rows_to_check)
                if not compare_block:
                    break
                compare_block_hash = get_block_hash(compare_block)
                if block_hash == compare_block_hash:
                    consecutive_duplicates += 1
                    next_i = next_j  # Move to the next block after the duplicate
                else:
                    break  # No match, stop the duplicate search

            if consecutive_duplicates > 0:
                # Append the original block of text (not the cleaned version)
                output_lines.append(''.join(lines[i:i+current_rows_to_check]) + '\n')
                output_lines.append(f"....<with an additional {consecutive_duplicates} similar entries found>...\n")
                removed_count += consecutive_duplicates
                i = next_i  # Move to the next block after the last duplicate
                block_found = True
                break
                
        if not block_found:
            # Clean the line only to check whether anything survives the cleaning
            cleaned_line = clean_for_comparison(lines[i])
            # Keep the original line in the output, but drop it if it cleans down to nothing
            if cleaned_line:
                output_lines.append(lines[i])
            i += 1  # Next row of the whole document

    # Output to new file
    output_filename = file_path
    if output_filename.endswith(".txt"):
        output_filename = output_filename[:-4]
    if not output_filename.endswith("-truncated"):
        output_filename = f"{output_filename}-truncated.txt"

    try:
        with open(output_filename, "w") as output_file:
            output_file.writelines(output_lines)  # Save the processed original lines
        print(f"Output saved to {output_filename}")
    except Exception as e:
        print(f"Error writing to file {output_filename}: {e}")