# allycat / 2_process_files.py
# niloydebbarma's picture
# Upload 50 files
# a7d2416 verified
import os, sys
import shutil
from pathlib import Path
from docling.document_converter import DocumentConverter
import html2text
import logging
import hashlib
from my_config import MY_CONFIG
# Root logging is configured at WARNING; this module's own logger is then
# raised to INFO so its progress messages are still emitted.
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.WARNING,
)
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def cleanup_duplicate_markdown_files(processed_dir):
    """
    Remove duplicate markdown files based on content hash.

    Scans the top level of ``processed_dir`` for ``*.md`` files, hashes the
    decoded text of each one and deletes every file whose content has already
    been seen. Files are visited in sorted path order, so the file kept for
    each unique content is always the first one in that order.

    A file that cannot be read, decoded or deleted is logged and skipped; it
    never aborts the run.

    Args:
        processed_dir (str): Directory containing the markdown files.

    Returns:
        int: Number of duplicate files that were removed.
    """
    processed_path = Path(processed_dir)
    # glob() yields entries in whatever order the filesystem returns them;
    # sorting makes "first file encountered" reproducible between runs.
    md_files = sorted(processed_path.glob('*.md'))
    if not md_files:
        logger.info("No markdown files found for deduplication")
        return 0

    content_hashes = {}  # content hash -> first path seen with that content
    duplicates_removed = 0
    for md_file in md_files:
        try:
            content = md_file.read_text(encoding='utf-8')
            # MD5 is used purely as a content fingerprint here.
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            if content_hash in content_hashes:
                md_file.unlink()
                duplicates_removed += 1
                logger.info(f"Removed duplicate: {md_file} (same content as {content_hashes[content_hash]})")
            else:
                content_hashes[content_hash] = md_file
        except (OSError, UnicodeError) as e:
            # Best effort: an unreadable or undeletable file is reported and
            # left in place.
            logger.warning(f"Error processing {md_file} for deduplication: {e}")

    logger.info(f"✅ Deduplication complete. Removed {duplicates_removed} duplicate files")
    return duplicates_removed
## --- end of cleanup_duplicate_markdown_files ---
def _claim_markdown_name(input_file, taken_names):
    """Pick an output file name for ``input_file`` that no earlier input used.

    ``taken_names`` holds the lower-cased names already handed out and is
    updated in place. The first input with a given stem gets ``<stem>.md``;
    a later one gets ``<stem>_<ext>.md`` (then ``<stem>_<ext>_2.md``, ...).
    """
    stem = input_file.stem
    ext_tag = input_file.suffix.lower().lstrip('.')
    name = f"{stem}.md"
    if name.lower() in taken_names:
        # e.g. report.html and report.pdf would both map to report.md;
        # tag the later one with its extension instead of overwriting.
        base = f"{stem}_{ext_tag}" if ext_tag else stem
        name = f"{base}.md"
        serial = 2
        while name.lower() in taken_names:
            name = f"{base}_{serial}.md"
            serial += 1
    taken_names.add(name.lower())
    return name


def process_files(crawl_dir, processed_dir):
    """
    Process all files in the crawl directory and convert them to markdown.

    Uses html2text for HTML/HTM files, copies TXT files verbatim, and hands
    every other file to docling. The processed directory is deleted and
    recreated before any output is written. Only regular files directly
    inside ``crawl_dir`` are considered, in sorted order.

    Each markdown file is named after the input file's stem. When two inputs
    share a stem, the later one gets its extension appended (for example
    ``report_pdf.md``) so neither output is overwritten.

    Args:
        crawl_dir (str): Directory containing files to process
        processed_dir (str): Directory to save processed markdown files

    Returns:
        tuple: ``(files_processed, errors, file_type_stats)`` where
        ``files_processed`` is the number of markdown files written,
        ``errors`` is the number of inputs whose conversion raised, and
        ``file_type_stats`` maps lower-cased file extension to the number
        of files written for it.
    """
    input_path = Path(crawl_dir)
    # Sub-directories cannot be converted and would only be counted as
    # errors, so keep regular files only. Sorting fixes the processing order.
    input_files = sorted(p for p in input_path.glob('*') if p.is_file())
    logger.info(f"Found {len(input_files)} files to process in {input_path}")

    # Start from an empty output directory on every run.
    shutil.rmtree(processed_dir, ignore_errors=True)
    os.makedirs(processed_dir, exist_ok=True)
    logger.info(f"✅ Cleared processed data directory : {processed_dir}")

    # Initialize converters
    # NOTE(review): this format_options value is carried over unchanged;
    # docling's documentation describes the argument as a mapping from input
    # format to option objects - confirm "preserve_links" has any effect.
    docling_converter = DocumentConverter(format_options={"preserve_links": True})
    html_converter = html2text.HTML2Text()
    html_converter.ignore_links = False
    html_converter.ignore_images = False

    files_processed = 0
    errors = 0
    file_type_stats = {}
    taken_names = set()  # lower-cased output names already written

    for input_file in input_files:
        file_ext = input_file.suffix.lower()
        markdown_content = None
        try:
            # Process HTML/HTM files with html2text
            if file_ext in ('.html', '.htm'):
                html_content = input_file.read_text(encoding='utf-8', errors='ignore')
                markdown_content = html_converter.handle(html_content)
                logger.debug(f"Converted HTML '{input_file}' with html2text")
            # Process TXT files directly
            elif file_ext == '.txt':
                markdown_content = input_file.read_text(encoding='utf-8', errors='ignore')
                logger.debug(f"Processed TXT '{input_file}' directly")
            # Process PDF and other documents with docling
            else:
                result = docling_converter.convert(input_file)
                markdown_content = result.document.export_to_markdown()
                logger.debug(f"Converted '{input_file}' with docling")

            if not markdown_content:
                # Nothing to write; not an error, but say so instead of
                # dropping the file silently.
                logger.warning(f"No content extracted from {input_file}; skipping")
                continue

            # Save markdown file
            md_file_name = os.path.join(
                processed_dir, _claim_markdown_name(input_file, taken_names)
            )
            with open(md_file_name, "w", encoding="utf-8") as md_file:
                md_file.write(markdown_content)
            files_processed += 1
            file_type_stats[file_ext] = file_type_stats.get(file_ext, 0) + 1
        except Exception as e:
            # Per-file boundary: one bad input must not stop the batch.
            errors += 1
            logger.warning(f"Error processing {input_file}: {e}")

    logger.info(f"✅ Processed {files_processed} files. Errors: {errors}")
    # Print file type statistics in compact dictionary format
    if file_type_stats:
        logger.info(f"📊 File type statistics: {dict(sorted(file_type_stats.items()))}")
    return files_processed, errors, file_type_stats
## --- end of process_files ---
def main():
    """
    Main function to run the file processing pipeline.

    Converts the files in ``MY_CONFIG.CRAWL_DIR`` to markdown under
    ``MY_CONFIG.PROCESSED_DATA_DIR`` and then removes duplicate markdown
    files from that directory.

    Returns:
        int: 0 when the pipeline finished, 1 when it raised an exception.
        The value is used as the process exit status.
    """
    logger.info("🚀 Starting file processing pipeline")
    try:
        files_processed, errors, _ = process_files(MY_CONFIG.CRAWL_DIR, MY_CONFIG.PROCESSED_DATA_DIR)
        duplicates_removed = cleanup_duplicate_markdown_files(MY_CONFIG.PROCESSED_DATA_DIR)
    except Exception as e:
        # Top-level boundary: logger.exception keeps the traceback, so a
        # failure can be diagnosed from the log alone.
        logger.exception(f"❌ File processing pipeline failed: {e}")
        return 1
    else:
        logger.info(f"✅ Final summary: {files_processed} files processed, {errors} errors, {duplicates_removed} duplicates removed")
        logger.info("✅ File processing pipeline completed successfully")
        return 0


if __name__ == "__main__":
    sys.exit(main())