import os, sys
import shutil
from pathlib import Path
from docling.document_converter import DocumentConverter
import html2text
import logging
import hashlib

from my_config import MY_CONFIG

logging.basicConfig(level=logging.WARNING, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
logger.setLevel(logging.INFO)
def cleanup_duplicate_markdown_files(processed_dir):
    """
    Remove duplicate markdown files based on content hash.
    Keeps the first file encountered for each unique content.
    """
    processed_path = Path(processed_dir)
    md_files = list(processed_path.glob('*.md'))

    if not md_files:
        logger.info("No markdown files found for deduplication")
        return 0

    content_hashes = {}
    duplicates_removed = 0

    for md_file in md_files:
        try:
            with open(md_file, 'r', encoding='utf-8') as f:
                content = f.read()
            content_hash = hashlib.md5(content.encode('utf-8')).hexdigest()
            if content_hash in content_hashes:
                os.remove(md_file)
                duplicates_removed += 1
                logger.info(f"Removed duplicate: {md_file} (same content as {content_hashes[content_hash]})")
            else:
                content_hashes[content_hash] = md_file
        except Exception as e:
            logger.warning(f"Error processing {md_file} for deduplication: {e}")

    logger.info(f"✅ Deduplication complete. Removed {duplicates_removed} duplicate files")
    return duplicates_removed
## --- end of cleanup_duplicate_markdown_files ---
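
# Note: the deduplication above reads each file fully into memory before
# hashing, which is fine for typical markdown output. For very large files,
# an incremental (chunked) hash avoids that; a minimal sketch using only
# hashlib from the standard library (the helper name `file_md5` is
# illustrative, not part of this script):
#
#   def file_md5(path, chunk_size=65536):
#       h = hashlib.md5()
#       with open(path, 'rb') as f:
#           for chunk in iter(lambda: f.read(chunk_size), b''):
#               h.update(chunk)
#       return h.hexdigest()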
def process_files(crawl_dir, processed_dir):
    """
    Process all files in the crawl directory and convert them to markdown.
    Uses html2text for HTML/HTM files and docling for PDFs and other documents.

    Args:
        crawl_dir (str): Directory containing files to process
        processed_dir (str): Directory to save processed markdown files
    """
    input_path = Path(crawl_dir)
    # Skip subdirectories; only regular files are converted
    input_files = [f for f in input_path.glob('*') if f.is_file()]
    logger.info(f"Found {len(input_files)} files to process in {input_path}")

    shutil.rmtree(processed_dir, ignore_errors=True)
    os.makedirs(processed_dir, exist_ok=True)
    logger.info(f"✅ Cleared processed data directory: {processed_dir}")

    # Initialize converters
    docling_converter = DocumentConverter()
    html_converter = html2text.HTML2Text()
    html_converter.ignore_links = False
    html_converter.ignore_images = False

    files_processed = 0
    errors = 0
    file_type_stats = {}

    for input_file in input_files:
        file_ext = input_file.suffix.lower()
        markdown_content = None
        try:
            # Process HTML/HTM files with html2text
            if file_ext in ['.html', '.htm']:
                with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
                    html_content = f.read()
                markdown_content = html_converter.handle(html_content)
                logger.debug(f"Converted HTML '{input_file}' with html2text")
            # Process TXT files directly
            elif file_ext == '.txt':
                with open(input_file, 'r', encoding='utf-8', errors='ignore') as f:
                    markdown_content = f.read()
                logger.debug(f"Processed TXT '{input_file}' directly")
            # Process PDF and other documents with docling
            else:
                result = docling_converter.convert(input_file)
                markdown_content = result.document.export_to_markdown()
                logger.debug(f"Converted '{input_file}' with docling")

            # Save markdown file
            if markdown_content:
                md_file_name = os.path.join(processed_dir, f"{input_file.stem}.md")
                with open(md_file_name, "w", encoding="utf-8") as md_file:
                    md_file.write(markdown_content)
                files_processed += 1
                file_type_stats[file_ext] = file_type_stats.get(file_ext, 0) + 1
        except Exception as e:
            errors += 1
            logger.warning(f"Error processing {input_file}: {e}")

    logger.info(f"✅ Processed {files_processed} files. Errors: {errors}")

    # Print file type statistics in compact dictionary format
    if file_type_stats:
        logger.info(f"📊 File type statistics: {dict(sorted(file_type_stats.items()))}")

    return files_processed, errors, file_type_stats
## --- end of process_files ---
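
# Note: DocumentConverter() above runs with docling's default set of input
# formats. If you want docling to reject everything except specific formats,
# its constructor accepts an allowed_formats list; a minimal sketch (assumes
# docling's InputFormat enum, and that only PDF and DOCX should be accepted):
#
#   from docling.datamodel.base_models import InputFormat
#   docling_converter = DocumentConverter(allowed_formats=[InputFormat.PDF, InputFormat.DOCX])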
def main():
    """
    Main function to run the file processing pipeline.
    """
    logger.info("🚀 Starting file processing pipeline")
    try:
        files_processed, errors, file_type_stats = process_files(MY_CONFIG.CRAWL_DIR, MY_CONFIG.PROCESSED_DATA_DIR)
        duplicates_removed = cleanup_duplicate_markdown_files(MY_CONFIG.PROCESSED_DATA_DIR)
        logger.info(f"✅ Final summary: {files_processed} files processed, {errors} errors, {duplicates_removed} duplicates removed")
        logger.info("✅ File processing pipeline completed successfully")
        return 0
    except Exception as e:
        logger.error(f"❌ File processing pipeline failed: {e}")
        return 1


if __name__ == "__main__":
    sys.exit(main())
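
# The import at the top assumes a sibling module `my_config.py` exposing a
# MY_CONFIG object with (at least) CRAWL_DIR and PROCESSED_DATA_DIR
# attributes. A minimal sketch of what that module might contain (the
# SimpleNamespace approach and the example paths are assumptions, not part
# of this script):
#
#   # my_config.py
#   from types import SimpleNamespace
#
#   MY_CONFIG = SimpleNamespace(
#       CRAWL_DIR="crawl",              # input: files fetched by the crawler
#       PROCESSED_DATA_DIR="processed", # output: converted .md files
#   )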