I am trying to create a Python script that will remove consecutive duplicate text blocks from a text file, i.e. do something like this:
- Locate text blocks that are the same, based on the number of rows (with a max block size X that determines the initial max rows per text block, then reducing down to 1 row at a time)
- If there are consecutive X-row duplicate text blocks, remove them and replace the duplicates with a simple "...has an additional X similar entries..." line after the initial text block that the duplicates follow.
I also want to ensure that when comparing:
- any run of 2+ spaces in a text row is replaced with a single space for the comparison
- any digits are removed (including "words" that start with a number and end with a number or period - which will remove dates e.g. XX/XX/XX, comma/decimal numbers e.g. 3,444.22, and list points e.g. 1.) from each row
- empty lines (anything blank after strip()) are skipped (i.e. ignored) when building text blocks for comparison
The result will be a new txt file that has the duplicate text blocks removed.
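To make the goal concrete, here is a made-up example of what I mean (block size 2, with the dates/amounts/invoice numbers ignored by the cleaning rules above):

Input:
    ACME Corp    Invoice 10001    17/01/2020    $1,234.56
    Payment received - thank you

    ACME Corp    Invoice 10002    18/02/2020    $994.10
    Payment received - thank you

    ACME Corp    Invoice 10003    19/03/2020    $87.00
    Payment received - thank you

Desired output:
    ACME Corp    Invoice 10001    17/01/2020    $1,234.56
    Payment received - thank you
    ...has an additional 2 similar entries...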
I have this row-cleaning function for the comparison:
def clean_for_comparison(line):
    # Remove dollar amounts (e.g., $1,234.56) - additional example of cleaning row
    line = re.sub(r'\$[\d,]+(?:\.\d+)?', '', line)
    # Remove numbers with commas and decimals (e.g., 1,234.56)
    line = re.sub(r'[\d,]+(?:\.\d+)?', '', line)
    # Remove date-like patterns (e.g., 17/01/2020 or 17-01-2020)
    line = re.sub(r'\d{1,2}[-/.\s]?\d{1,2}[-/.\s]?\d{2,4}', '', line)
    # Remove words starting and ending with digits (e.g., 123abc456)
    line = re.sub(r'\b\d+[a-zA-Z]+\d+\b', '', line)
    # Normalize spaces and strip
    line = re.sub(r'\s+', ' ', line)
    line = line.strip()
    return line
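As a sanity check, with the function as written above, a (made-up) line cleans like this:

    >>> clean_for_comparison("Invoice   1234  total   $56.78")
    'Invoice total'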
I am trying to use a hash method for the text block comparison, constructing each block from X rows, but I'm having issues with how it should work efficiently.
I use these helper functions:
def get_block_hash(block):
    cleaned_lines = [clean_for_comparison(line) for line in block]
    non_blank_cleaned_lines = [line for line in cleaned_lines if line]
    cleaned_block_content = ''.join(non_blank_cleaned_lines)
    return hashlib.md5(cleaned_block_content.encode()).hexdigest()

def get_block(lines, start_index, max_size):
    block = []
    i = start_index
    row_count = 0
    while i < len(lines) and row_count < max_size:
        line = lines[i].strip()
        if line:
            block.append(lines[i])
            row_count += 1
        i += 1
        if i >= len(lines):
            break
        if lines[i - 1].strip() == "" and lines[i].strip() == "":
            break
    return block, i
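As a quick sanity check of those two helpers on a made-up list of lines:

    lines = ["Item A   100\n", "\n", "Item B   200\n", "Item A 300\n"]
    block, next_i = get_block(lines, 0, 2)
    # block == ["Item A   100\n", "Item B   200\n"], next_i == 3 (blank line skipped)
    other, _ = get_block(lines, 3, 2)  # == ["Item A 300\n"]
    get_block_hash(block[:1]) == get_block_hash(other)  # True: digits and extra spaces are ignored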
but then it gets confusing trying to do it properly/efficiently... so far I was looking at this approach, but it's not quite right...
def process_file(file_path, max_consecutive_rows_to_check=4):
    with open(file_path, 'r') as file:
        lines = file.readlines()
    filtered_lines = []
    i = 0
    removed_count = 0
    while i < len(lines):
        block, j = get_block(lines, i, max_consecutive_rows_to_check)
        if not block:  # Skip processing.
            i = j  # Increment
            continue  # Skip empty block
        # Sliding window comparison
        for size in range(min(len(block), max_consecutive_rows_to_check), 0, -1):
            block_to_compare = block[:size]
            position_row = initial_block_start
            while True:
                compare_start = position_row + 1  # use the next row outside the initial block to compare?
                compare_block, next_k = get_block(lines, compare_start, size)
                compare_line = compare_block
                if len(compare_block) != size:
                    break  # Done with compare - no more
                if get_block_hash(block_to_compare) == get_block_hash(compare_block):
                    print("DUPE FOUND!")
                    found_dupes = True
                    total_dupes += 1  # dupe counter
                    position_row = compare_start  # set next dupe position to compare consecutively with
                    i += compare_start
                    # continue  # continue if it finds?
                else:  # Did not find anything
                    print("Not a dupe.")
                    break
        # If duplicates were found:
        if found_dupes:
            print(f"Total duplicates found: {total_dupes}")
            filtered_lines.extend(block)
            filtered_lines.append(f"....<with an additional {total_dupes} entries found>...\n")
            removed_count += total_dupes
            i = position_row + max_consecutive_rows_to_check  # +1 + size?
        # If no duplicates were found:
        else:
            print("No duplicates found, adding to filtered lines.")
            filtered_lines.extend(block)
            # Advance to next block
            i =+ max_consecutive_rows_to_check
    # Output to new file
    output_filename = file_path
    if output_filename.endswith(".txt"):
        output_filename = output_filename[:-4]
    if not output_filename.endswith("-truncated"):
        output_filename = f"{output_filename}-truncated.txt"
    with open(output_filename, "w") as output_file:
        output_file.writelines(filtered_lines)
    print(f"Processed {len(lines)} lines from {file_path}")
    print(f"Output saved to {output_filename}")
    if removed_count > 0:
        print(f"Removed {removed_count} duplicate blocks.")
....etc..
Any help with the logic flow or efficiency would be appreciated, or does anyone know of an existing library that de-dupes text files based on consecutive text blocks?
Thanks
1 Answer
I believe I have resolved this. So for anyone else wanting to do something similar, here is the code:
import hashlib
import re
import glob
import string
import sys
def clean_for_comparison(line):
    line = line.lower()  # Convert text to lowercase
    line = line.translate(str.maketrans("", "", string.punctuation))  # Remove punctuation using string.punctuation
    line = re.sub(r'\$[\d,]+(?:\.\d+)?', '', line)  # Remove dollar amounts (e.g., $1,234.56)
    line = re.sub(r'[\d,]+(?:\.\d+)?', '', line)  # Remove numbers with commas and decimals (e.g., 1,234.56)
    line = re.sub(r'\d{1,2}[-/.\s]?\d{1,2}[-/.\s]?\d{2,4}', '', line)  # Remove date-like patterns (e.g., 17/01/2020 or 17-01-2020)
    line = re.sub(r'\b\d+[a-zA-Z]+\d+\b', '', line)  # Remove words starting and ending with digits (e.g., 123abc456)
    line = re.sub(r'\s+', ' ', line).strip()  # Normalize spaces by collapsing multiple spaces into one
    return line
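For reference, with the lowercasing and punctuation stripping added, a (made-up) line now cleans like this:

    >>> clean_for_comparison("Ref# 10023 -- PAID  $99.00")
    'ref paid'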
def get_block_hash(block):
    cleaned_lines = [clean_for_comparison(line) for line in block]
    non_blank_cleaned_lines = [line for line in cleaned_lines if line]
    cleaned_block_content = ''.join(non_blank_cleaned_lines)
    return hashlib.md5(cleaned_block_content.encode()).hexdigest()

def get_block(lines, start_index, block_size):
    block = []
    i = start_index
    count = 0  # Track the number of content lines added to the block
    while count < block_size and i < len(lines):
        line = lines[i].strip()
        if line:  # Skip empty lines
            block.append(line)
            count += 1
        i += 1
    return block, i
def process_file(file_path, max_consecutive_rows_to_check=6):
    try:
        with open(file_path, 'r') as file:
            lines = file.readlines()
    except Exception as e:
        print(f"Error reading file {file_path}: {e}")
        return
    output_lines = []
    i = 0
    removed_count = 0
    while i < len(lines):
        block_found = False
        for current_rows_to_check in range(max_consecutive_rows_to_check, 0, -1):
            if i + current_rows_to_check > len(lines):
                continue
            block, j = get_block(lines, i, current_rows_to_check)
            if not block:
                break  # Skip if no more blocks
            cleaned_block = [clean_for_comparison(line) for line in block]  # Clean the block *before* hashing and printing
            block_hash = get_block_hash(block)
            consecutive_duplicates = 0
            next_i = j  # Index after the current block
            # Compare consecutive blocks
            while True:
                if next_i + current_rows_to_check > len(lines):
                    break
                compare_block, next_j = get_block(lines, next_i, current_rows_to_check)
                if not compare_block:
                    break
                cleaned_compare_block = [clean_for_comparison(line) for line in compare_block]  # Clean the compare_block *before* hashing and comparison
                compare_block_hash = get_block_hash(compare_block)
                if block_hash == compare_block_hash:
                    consecutive_duplicates += 1
                    next_i = next_j  # Move to the next block after the duplicate
                else:
                    break  # No match, stop the duplicate search
            if consecutive_duplicates > 0:
                output_lines.append(''.join(lines[i:i+current_rows_to_check]) + '\n')  # Append the original block of text (not the cleaned version)
                output_lines.append(f"....<with an additional {consecutive_duplicates} similar entries found>...\n")
                removed_count += consecutive_duplicates
                i = next_i  # Move to the next block after the last duplicate
                block_found = True
                break
        if not block_found:
            cleaned_line = clean_for_comparison(lines[i])  # Apply the cleaned line to skip empty lines from the cleaning function
            # Append original line for printing in the -truncated document - but ignore if the CLEANED line is blank
            if cleaned_line:
                output_lines.append(lines[i])  # No duplicates and line remains
            i += 1  # Next row of the whole document
    # Output to new file
    output_filename = file_path
    if output_filename.endswith(".txt"):
        output_filename = output_filename[:-4]
    if not output_filename.endswith("-truncated"):
        output_filename = f"{output_filename}-truncated.txt"
    try:
        with open(output_filename, "w") as output_file:
            output_file.writelines(output_lines)  # Save the processed original lines
        print(f"Output saved to {output_filename}")
    except Exception as e:
        print(f"Error writing to file {output_filename}: {e}")