"""Transfer-based English -> Vietnamese machine translation.

Pipeline: preprocessing -> syntactic analysis (CFG parse) ->
structural transfer (reorder tree for Vietnamese) -> generation
(lexical transfer + TAM markers + post-processing).
"""

import os
import sys

sys.path.append(os.path.abspath(os.path.join(os.path.dirname(__file__), "..")))

import re
import json
import string
from collections import defaultdict

import nltk
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag
from nltk.parse import ChartParser, ViterbiParser
from nltk.grammar import CFG, PCFG, Nonterminal, ProbabilisticProduction
from nltk.tree import Tree
import contractions
import spacy

nlp = spacy.load("en_core_web_sm")

with open("data/en_vi_dictionary.json", "r", encoding="utf-8") as json_file:
    dictionary = json.load(json_file)

with open("grammar.txt", "r", encoding="utf-8") as text_file:
    grammar = text_file.read()


class TransferBasedMT:
    def __init__(self) -> None:
        # English - Vietnamese dictionary
        self.dictionary = dictionary
        # Define the CFG grammar for English sentence structure
        self.grammar = grammar

    # ######################### STAGE 1: PREPROCESSING SOURCE SENTENCE #########################

    def preprocessing(self, sentence: str) -> str:
        """Preprocess the input sentence: handle named entities, lowercase,
        expand contractions, and tokenize and regroup."""
        # Handle named entities, e.g. New York -> New_York
        doc = nlp(sentence)
        entities = {ent.text: ent.label_ for ent in doc.ents}
        # Longest entities first so a shorter entity that is a substring of a
        # longer one does not break the longer replacement.
        for ent_text in sorted(entities.keys(), key=len, reverse=True):
            ent_joined = ent_text.replace(" ", "_")
            sentence = sentence.replace(ent_text, ent_joined)
        # Lowercase and strip redundant space
        sentence = sentence.lower().strip()
        # Expand contractions, e.g. don't -> do not
        sentence = contractions.fix(sentence)  # type: ignore
        # Tokenize and regroup tokens
        sentence = " ".join(word_tokenize(sentence))
        return sentence

    def safe_tag(self, tag):
        """Convert tags with special characters to safe nonterminal symbols
        (e.g. PRP$ -> PRPS) so they are legal in an NLTK CFG."""
        return tag.replace("$", "S")

    # ########################## STAGE 2: ANALYZE SOURCE SENTENCE ##############################

    def analyze_source(self, sentence: str):
        """Analyze the source sentence: tokenize, POS tag, and parse into a
        syntax tree.

        Punctuation tokens are split off before parsing and reattached to the
        resulting tree afterwards; on any grammar/parse failure a flat
        fallback tree is returned instead.
        """
        doc = nlp(sentence)
        filtered_pos_tagged = []
        punctuation_marks = []
        for i, token in enumerate(doc):
            word = token.text
            tag = token.tag_
            if all(char in string.punctuation for char in word):
                punctuation_marks.append((i, word, tag))
            else:
                # Parse on lemmas so the grammar's terminals stay small.
                filtered_pos_tagged.append((token.lemma_.lower(), tag))

        grammar_str = self.grammar
        # Add terminal rule grammars (one lexical production per token).
        for word, tag in filtered_pos_tagged:
            nt_tag = self.safe_tag(tag)  # renamed: don't shadow self.safe_tag
            escaped_word = word.replace('"', '\\"')
            grammar_str += f'\n{nt_tag} -> "{escaped_word}"'

        try:
            cfg = CFG.fromstring(grammar_str)
            parser = ChartParser(cfg)
            tagged_tokens_only = [word for word, _ in filtered_pos_tagged]
            parses = list(parser.parse(tagged_tokens_only))  # Generate parse trees
            # Use first parse or fallback
            tree = parses[0] if parses else self._create_fallback_tree(filtered_pos_tagged)
            tree = self._add_punctuation_to_tree(tree, punctuation_marks)  # Reattach punctuation
            return tree
        except Exception as e:
            print(f"Grammar creation error: {e}")
            return self._create_fallback_tree(filtered_pos_tagged)  # Fallback on error

    def _create_fallback_tree(self, pos_tagged):
        """Create a simple flat tree when parsing fails."""
        # Create leaf nodes for each token
        children = [Tree(self.safe_tag(tag), [word]) for word, tag in pos_tagged]
        return Tree("S", children)  # Wrap in a sentence node

    def _add_punctuation_to_tree(self, tree, punctuation_marks):
        """Add punctuation marks back to the syntax tree (root "S" only)."""
        if not punctuation_marks:
            return tree
        if tree.label() == "S":  # Only add to root sentence node
            for _, word, tag in sorted(punctuation_marks):
                tree.append(Tree(self.safe_tag(tag), [word]))
        return tree

    # ############################# STAGE 3: TRANSFER GRAMMAR ##################################

    def transfer_grammar(self, tree):
        """Transfer the English parse tree to Vietnamese structure by
        recursively reordering/dropping children per phrase type."""
        if not isinstance(tree, nltk.Tree):
            return tree

        # Sentence level: recurse through children
        if tree.label() == "S":
            return Tree("S", [self.transfer_grammar(child) for child in tree])

        # Verb Phrase: adjust word order
        elif tree.label() == "VP":
            children = [self.transfer_grammar(child) for child in tree]
            child_labels = [c.label() if isinstance(c, Tree) else c for c in children]
            if (len(children) >= 3 and "V" in child_labels
                    and "To" in child_labels and "VP" in child_labels):
                # Remove TO from V TO VP
                return Tree("VP", [children[0], children[2]])
            return Tree("VP", children)  # Default: preserve order

        # Noun Phrase: adjust word order
        elif tree.label() == "NP":
            children = [self.transfer_grammar(child) for child in tree]
            child_labels = [c.label() if isinstance(c, Tree) else c for c in children]
            if (len(children) >= 3 and "Det" in child_labels
                    and "AdjP" in child_labels and "N" in child_labels):
                # Reorder Det Adj N -> Det N Adj
                return Tree("NP", [children[0], children[2], children[1]])
            elif (len(children) >= 2 and "PRPS" in child_labels and "N" in child_labels):
                # Reorder PRPS N -> N PRPS
                return Tree("NP", [children[1], children[0]])
            elif (len(children) >= 2 and "Det" in child_labels and "N" in child_labels):
                # Remove Det from Det N
                return Tree("NP", [children[1]])
            return Tree("NP", children)  # Default: preserve order

        # Prepositional Phrase: adjust word order
        elif tree.label() == "PP":
            children = [self.transfer_grammar(child) for child in tree]
            return Tree("PP", children)  # Default: preserve order

        # Adverbial Phrase: adjust word order
        elif tree.label() == "AdvP":
            children = [self.transfer_grammar(child) for child in tree]
            return Tree("AdvP", children)  # Default: preserve order

        # Adjective Phrase: adjust word order
        elif tree.label() == "AdjP":
            children = [self.transfer_grammar(child) for child in tree]
            return Tree("AdjP", children)  # Default: preserve order

        # Wh-Question: adjust word order
        elif tree.label() == "WhQ":
            children = [self.transfer_grammar(child) for child in tree]
            child_labels = [c.label() if isinstance(c, Tree) else c for c in children]
            if (len(children) >= 4 and "WH_Word" in child_labels and "AUX" in child_labels
                    and "NP" in child_labels and "VP" in child_labels):
                # Remove AUX from WH_Word AUX NP VP
                return Tree("WhQ", [children[2], children[3], children[0]])
            elif (len(children) >= 3 and "WH_Word" in child_labels and "NP" in child_labels
                    and "VP" in child_labels and "AUX" not in child_labels):
                return Tree("WhQ", [children[1], children[2], children[0]])
            elif len(children) >= 2 and "WH_Word" in child_labels and "VP" in child_labels:
                if len(children[1]) >= 2:
                    # WH_Word VP -> WH_Word V NP
                    return Tree("WhQ", [children[1][1], children[1][0], children[0]])
                else:
                    return Tree("WhQ", children)  # Default: preserve order
            # BUG FIX: the original fell through here and returned None for a
            # WhQ matching none of the patterns; preserve order instead.
            return Tree("WhQ", children)

        # Yes/No-Question: adjust word order
        elif tree.label() == "YNQ":
            children = [self.transfer_grammar(child) for child in tree]
            child_labels = [c.label() if isinstance(c, Tree) else c for c in children]
            if (len(children) >= 3 and "AUX" in child_labels
                    and "NP" in child_labels and "VP" in child_labels):
                return Tree("YNQ", [children[1], children[2]])
            elif (len(children) >= 3 and "DO" in child_labels
                    and "NP" in child_labels and "VP" in child_labels):
                return Tree("YNQ", [children[1], children[2]])
            elif (len(children) >= 3 and "MD" in child_labels
                    and "NP" in child_labels and "VP" in child_labels):
                return Tree("YNQ", [children[1], children[2]])
            return Tree("YNQ", children)

        # Other labels: recurse through children
        else:
            return Tree(tree.label(), [self.transfer_grammar(child) for child in tree])

    # ############################# STAGE 4: GENERATION STAGE ##################################

    def generate(self, tree):
        """Generate Vietnamese output from the transformed tree."""
        if not isinstance(tree, nltk.Tree):
            return self._lexical_transfer(tree)  # Translate leaf nodes
        # PERF FIX: recurse exactly once per child (the original evaluated
        # self.generate(child) twice per child — exponential in tree depth).
        words = [w for w in (self.generate(child) for child in tree) if w]
        # Handle questions specifically
        if tree.label() == "WhQ":
            words = self._process_wh_question(tree, words)
        elif tree.label() == "YNQ":
            words = self._process_yn_question(tree, words)
        elif tree.label() == "NP":
            # Add classifiers for nouns
            words = self._add_classifiers(tree, words)
        elif tree.label() == "VP":
            # Apply tense/aspect/mood markers
            words = self._apply_tam_mapping(tree, words)
        words = self._apply_agreement(tree, words)  # Handle agreement (e.g., plurals)
        result = " ".join(words)  # Join words into a string
        return result

    def _process_wh_question(self, tree, words):
        """Process a Wh-question structure for Vietnamese."""
        words = [w for w in words if w]
        wh_word = None
        for word in words:
            if word in ["cái gì", "ai", "ở đâu", "khi nào", "tại sao",
                        "như thế nào", "cái nào", "của ai"]:
                wh_word = word
                break
        if wh_word == "tại sao":
            # "why" goes to the front of the sentence
            if words and words[0] != "tại sao":
                words.remove("tại sao")
                words.insert(0, "tại sao")
        elif wh_word == "như thế nào":
            if "vậy" not in words:
                words.append("vậy")
        # Append a question particle if none is present (except "why")
        question_particles = ["vậy", "thế", "à", "hả"]
        has_particle = any(particle in words for particle in question_particles)
        if not has_particle and wh_word != "tại sao":
            words.append("vậy")
        return words

    def _process_yn_question(self, tree, words):
        """Process a Yes/No question structure for Vietnamese."""
        # Drop empty tokens and untranslated do-support auxiliaries
        words = [w for w in words if w not in ["", "do_vn", "does_vn", "did_vn"]]
        has_question_particle = any(
            w in ["không", "à", "hả", "nhỉ", "chứ"]
            or w in ["không_vn", "à_vn", "hả_vn", "nhỉ_vn", "chứ_vn"]
            for w in words
        )
        if not has_question_particle:
            if "đã" in words or "đã_vn" in words:
                words.append("phải không")
            else:
                words.append("không")
        return words

    def _lexical_transfer(self, word):
        """Translate English words to Vietnamese using the dictionary."""
        if word in self.dictionary:
            return self.dictionary[word]  # Return translation if in dictionary
        return f"{word}_vn"  # Mark untranslated words with _vn suffix

    def _add_classifiers(self, np_tree, words):
        """Add Vietnamese classifiers based on nouns (currently disabled)."""
        # noun_indices = [
        #     i for i, child in enumerate(np_tree) if isinstance(child, Tree)
        #     and child.label() in ["N", "NN", "NNS", "NNP", "NNPS"]
        # ]  # Find noun positions
        # for i in noun_indices:
        #     if len(words) > i and not any(words[i].startswith(prefix) for prefix in ["một_vn", "những_vn", "các_vn"]):  # Check if classifier is needed
        #         if words[i].endswith("_vn"):  # Add default classifier for untranslated nouns
        #             words.insert(i, "cái_vn")
        return words

    def _apply_tam_mapping(self, vp_tree, words):
        """Apply Vietnamese TAM (Tense, Aspect, Mood) markers to the word list.

        Args:
            vp_tree: A parse tree node representing the verb phrase.
            words: List of words to be modified with TAM markers.

        Returns:
            List of words with appropriate Vietnamese TAM markers inserted.
        """
        verb_tense = None
        mood = None
        # Identify verb tense and mood from the verb phrase tree
        for child in vp_tree:
            if isinstance(child, Tree):
                if child.label() in ["V", "VB", "VBD", "VBG", "VBN", "VBP", "VBZ"]:
                    verb_tense = child.label()
                if child.label() == "MD":  # Modal verbs indicating mood
                    mood = "indicative"
                elif child.label() == "TO":  # Infinitive marker, often subjunctive
                    mood = "subjunctive"
        if not verb_tense:
            print("Warning: No verb tense identified in the verb phrase tree.")
            return words
        # Apply TAM markers based on verb tense
        if verb_tense == "VBD":
            words.insert(0, "đã_vn")
        elif verb_tense == "VB":
            if "will_vn" in words:
                words = [w for w in words if w != "will_vn"]
                words.insert(0, "sẽ_vn")
            elif "going_to_vn" in words:
                words = [w for w in words if w != "going_to_vn"]
                words.insert(0, "sẽ_vn")
        elif verb_tense == "VBG":
            words.insert(0, "đang_vn")
            if "đã_vn" in words:
                words.insert(0, "đã_vn")
        elif verb_tense == "VBN":
            words.insert(0, "đã_vn")
        elif verb_tense == "VBP" or verb_tense == "VBZ":
            pass
        # Handle future continuous (e.g., "will be running" -> "sẽ đang")
        if verb_tense == "VBG" and "will_vn" in words:
            words = [w for w in words if w != "will_vn"]
            words.insert(0, "đang_vn")  # Continuous marker
            words.insert(0, "sẽ_vn")  # Future marker
        # Apply mood markers if applicable
        if mood == "subjunctive":
            words.insert(0, "nếu_vn")  # Subjunctive marker (e.g., "if" clause)
        elif mood == "indicative" and "must_vn" in words:
            words = [w for w in words if w != "must_vn"]
            words.insert(0, "phải_vn")  # Necessity marker
        return words

    def _apply_agreement(self, tree, words):
        """Apply agreement rules for Vietnamese (e.g., pluralization)."""
        if tree.label() == "NP":
            # NOTE(review): this inserts into `words` while iterating it with
            # enumerate; safe for single plurals but fragile — consider
            # building a new list instead. Left as-is to preserve behavior.
            for i, word in enumerate(words):
                if "_vn" in word and word.replace("_vn", "").endswith("s"):  # Handle English plurals
                    base_word = word.replace("_vn", "")[:-1] + "_vn"  # Remove 's'
                    words[i] = base_word
                    words.insert(i, "các_vn")  # Add plural marker
        return words

    def _post_process_vietnamese(self, text):
        """Post-process the Vietnamese output: remove _vn, fix punctuation,
        capitalize."""
        text = text.replace("_vn", "")  # Remove untranslated markers

        def fix_entities(word):
            # Rejoined named entities keep their underscores until here.
            if "_" in word:
                word = " ".join([w for w in word.split("_")])
                return word.title()
            return word.lower()  # Lowercase non-entity words

        words = text.split()
        words = [fix_entities(word) for word in words]
        text = " ".join(words)
        for punct in [".", ",", "!", "?", ":", ";"]:  # Attach punctuation directly
            text = text.replace(f" {punct}", punct)
        if text:
            words = text.split()
            words[0] = words[0].capitalize()  # Capitalize first word
            text = " ".join(words)
        return text

    def translate(self, english_sentence):
        """Main translation function that applies all stages of the process."""
        # Step 1: Preprocess input
        preprocessed = self.preprocessing(english_sentence)
        # Step 2: Parse English sentence
        source_tree = self.analyze_source(preprocessed)
        print("English parse tree:")
        source_tree.pretty_print()  # Display English parse tree
        # Step 3: Transform to Vietnamese structure
        target_tree = self.transfer_grammar(source_tree)
        print("Vietnamese structure tree:")
        target_tree.pretty_print()  # Display Vietnamese parse tree
        # Step 4: Generate final translation
        raw_output = self.generate(target_tree)
        vietnamese_output = self._post_process_vietnamese(raw_output)
        return vietnamese_output


if __name__ == "__main__":
    translator = TransferBasedMT()
    test_sentences = [
        "I read books.",
        "The student studies at school.",
        "She has a beautiful house.",
        "They want to buy a new car.",
        "This is a good computer.",
        "Are you ready to listen?",
        "I want to eat.",
        "This is my book.",
        "What is your name?",
        "Do you like books?",
        "Is she at school?",
        "Are you ready to listen?",
        "Can they buy a new car?",
        "Did he read the book yesterday?",
        "What is your name?",
        "Where do you live?",
        "Who is your teacher?",
        "When will you go to school?",
        "Why did he leave early?",
        "How do you feel today?",
        "I live in New York",
    ]
    test_sentences_2 = [
        # YNQ -> BE NP
        "Is the renowned astrophysicist still available for the conference?",
        "Are those adventurous explorers currently in the remote jungle?",
        "Was the mysterious stranger already gone by midnight?",
        # YNQ -> BE NP Adj
        "Is the vibrant annual festival exceptionally spectacular this season?",
        "Are the newly discovered species remarkably resilient to harsh climates?",
        "Were the ancient ruins surprisingly well-preserved after centuries?",
        # YNQ -> BE NP NP
        "Is she the brilliant leader of the innovative research team?",
        "Are they the enthusiastic organizers of the grand charity event?",
        "Was he the sole survivor of the perilous expedition?",
        # YNQ -> BE NP PP
        "Is the priceless artifact still hidden in the ancient underground chamber?",
        "Are the colorful tropical birds nesting high above the lush rainforest canopy?",
        "Was the historic manuscript carefully stored within the fortified library vault?",
    ]
    print("English to Vietnamese Translation Examples:")
    print("-" * 50)
    for sentence in test_sentences_2:
        print(f"English: {sentence}")
        translation = translator.translate(sentence)
        print(f"Vietnamese: {translation}")
        print()