From 76c2f33ea6aa4bd077de93d31681998c7e07dd6b Mon Sep 17 00:00:00 2001 From: "Juan M. Ley" Date: Sun, 22 Mar 2026 12:42:47 -0600 Subject: [PATCH] error fixing --- src/consumers/report_consumer.py | 61 +++++-- src/consumers/user_consumer.py | 16 +- src/infrastructure/adapters/persistence/db.py | 8 +- .../persistence/user_repository_sql.py | 150 ++++++++++++------ 4 files changed, 167 insertions(+), 68 deletions(-) diff --git a/src/consumers/report_consumer.py b/src/consumers/report_consumer.py index 94df7c0..318fb38 100644 --- a/src/consumers/report_consumer.py +++ b/src/consumers/report_consumer.py @@ -52,10 +52,12 @@ class ReportConsumer: except Exception as e: logger.error(f"Error processing report message: {e}", exc_info=True) + # Rollback explícito en caso de error + self.user_repo.db.rollback() raise def _handle_create_report(self, message: ReportMessage): - """Handle report create event""" + """Handle report create event con manejo de transacciones cruzadas""" try: logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}") @@ -73,20 +75,33 @@ class ReportConsumer: fecha_creacion=fecha_creacion ) - # Save to repository - saved_report = self.repo.save(report) - logger.info(f"Report created successfully: {message.id_reporte}") + try: + # Save to MongoDB repository + saved_report = self.repo.save(report) + logger.info(f"Report created successfully in MongoDB: {message.id_reporte}") + except Exception as mongo_error: + logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True) + raise - # Increment user's report counter - self.user_repo.increment_reports(message.id_usuario) - logger.info(f"Incremented report counter for user: {message.id_usuario}") + try: + # Increment user's report counter in MySQL + self.user_repo.increment_reports(message.id_usuario) + logger.info(f"Incremented report counter for user: {message.id_usuario}") + except Exception as sql_error: + logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True) + # Rollback SQL transaction + self.user_repo.db.rollback() + # Note: MongoDB save cannot be rolled back, log for manual cleanup + logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}") + raise except Exception as e: logger.error(f"Error creating report: {e}", exc_info=True) + self.user_repo.db.rollback() raise def _handle_update_visibility(self, message: ReportMessage): - """Handle report visibility update event""" + """Handle report visibility update event con manejo de transacciones""" try: logger.info(f"Updating visibility for report: {message.id_reporte}") @@ -96,9 +111,13 @@ class ReportConsumer: logger.warning(f"Report not found: {message.id_reporte}") return - # Update visibility - self.repo.update_visibility(message.id_reporte, message.visibilidad) - logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}") + # Update visibility in MongoDB + try: + self.repo.update_visibility(message.id_reporte, message.visibilidad) + logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}") + except Exception as mongo_error: + logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True) + raise # Penalize author if visibility is very low (shadowban) if message.penalize_author and message.visibilidad < 20: @@ -109,11 +128,17 @@ class ReportConsumer: new_rating = max(0, user.calificacion - 5) self.user_repo.update_rating(report.id_usuario, new_rating) logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}") - except Exception as e: - logger.error(f"Error penalizing author: {e}") - + else: + logger.warning(f"User not found for penalty: {report.id_usuario}") + except Exception as penalty_error: + logger.error(f"Error penalizing author: {penalty_error}", exc_info=True) + self.user_repo.db.rollback() + logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed") + raise + except Exception as e: logger.error(f"Error updating report visibility: {e}", exc_info=True) + self.user_repo.db.rollback() raise def _handle_delete_report(self, message: ReportMessage): @@ -129,6 +154,7 @@ class ReportConsumer: except Exception as e: logger.error(f"Error deleting report: {e}", exc_info=True) + self.user_repo.db.rollback() raise def start(self): @@ -142,6 +168,13 @@ class ReportConsumer: except Exception as e: logger.error(f"Consumer error: {e}", exc_info=True) raise + finally: + # Asegurar cierre de sesión SQL + if self.user_repo.db: + try: + self.user_repo.db.close() + except Exception as e: + logger.error(f"Error closing database session: {e}") if __name__ == '__main__': diff --git a/src/consumers/user_consumer.py b/src/consumers/user_consumer.py index 983f434..c0a0679 100644 --- a/src/consumers/user_consumer.py +++ b/src/consumers/user_consumer.py @@ -50,6 +50,8 @@ class UserConsumer: except Exception as e: logger.error(f"Error processing user message: {e}", exc_info=True) + # Rollback en caso de error en el procesamiento del mensaje + self.repo.db.rollback() raise def _handle_create_user(self, message: UserMessage): @@ -75,12 +77,13 @@ class UserConsumer: biografia=message.biografia ) - # Save to repository + # Save to repository with transaction handling saved_user = self.repo.save(user) logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}") except Exception as e: logger.error(f"Error creating user: {e}", exc_info=True) + self.repo.db.rollback() raise def _handle_update_user(self, message: UserMessage): @@ -104,12 +107,13 @@ class UserConsumer: if message.biografia is not None: user.biografia = message.biografia - # Save to repository + # Save to repository with transaction handling updated_user = self.repo.update(user) logger.info(f"User updated successfully: {message.user_id}") except Exception as e: logger.error(f"Error updating user: {e}", exc_info=True) + self.repo.db.rollback() raise def _handle_delete_user(self, message: UserMessage): @@ -125,6 +129,7 @@ class UserConsumer: except Exception as e: logger.error(f"Error deleting user: {e}", exc_info=True) + self.repo.db.rollback() raise def start(self): @@ -138,6 +143,13 @@ class UserConsumer: except Exception as e: logger.error(f"Consumer error: {e}", exc_info=True) raise + finally: + # Asegurar cierre de sesión + if self.repo.db: + try: + self.repo.db.close() + except Exception as e: + logger.error(f"Error closing database session: {e}") if __name__ == '__main__': diff --git a/src/infrastructure/adapters/persistence/db.py b/src/infrastructure/adapters/persistence/db.py index 9864d36..86ae976 100644 --- a/src/infrastructure/adapters/persistence/db.py +++ b/src/infrastructure/adapters/persistence/db.py @@ -14,9 +14,13 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine) Base = declarative_base() def get_db(): - """Obtiene una sesión de base de datos""" + """Obtiene una sesión de base de datos con manejo de rollback""" db = SessionLocal() try: yield db + db.commit() # Commit explícito si no hubo errores + except Exception as e: + db.rollback() # Rollback si ocurre cualquier error + raise e finally: - db.close() + db.close() # Siempre cerrar la sesión diff --git a/src/infrastructure/adapters/persistence/user_repository_sql.py b/src/infrastructure/adapters/persistence/user_repository_sql.py index 8eced27..1a2b5ac 100644 --- a/src/infrastructure/adapters/persistence/user_repository_sql.py +++ b/src/infrastructure/adapters/persistence/user_repository_sql.py @@ -3,6 +3,9 @@ from domain.users import User from infrastructure.adapters.persistence.models import UserModel from infrastructure.adapters.persistence.db import SessionLocal from typing import List, Optional +import logging + +logger = logging.getLogger(__name__) class UserRepositorySQL(UserRepository): """Implementación del repositorio de Usuarios usando SQLAlchemy (MySQL)""" @@ -11,48 +14,68 @@ class UserRepositorySQL(UserRepository): self.db = db_session or SessionLocal() def save(self, user: User) -> User: - """Guarda un nuevo usuario""" - db_user = UserModel( - nombre=user.nombre, - apellido=user.apellido, - email=user.email, - fecha_nacimiento=user.fecha_nacimiento, - fecha_creacion=user.fecha_creacion, - calificacion=user.calificacion, - numero_reportes=user.numero_reportes, - url_foto_perfil=user.url_foto_perfil, - biografia=user.biografia - ) - self.db.add(db_user) - self.db.commit() - self.db.refresh(db_user) - - # Convertir de vuelta a dominio - return self._to_domain(db_user) + """Guarda un nuevo usuario con manejo de transacciones""" + try: + db_user = UserModel( + nombre=user.nombre, + apellido=user.apellido, + email=user.email, + fecha_nacimiento=user.fecha_nacimiento, + fecha_creacion=user.fecha_creacion, + calificacion=user.calificacion, + numero_reportes=user.numero_reportes, + url_foto_perfil=user.url_foto_perfil, + biografia=user.biografia + ) + self.db.add(db_user) + self.db.commit() + self.db.refresh(db_user) + logger.info(f"Usuario guardado exitosamente: {db_user.user_id}") + return self._to_domain(db_user) + except Exception as e: + self.db.rollback() + logger.error(f"Error al guardar usuario: {e}", exc_info=True) + raise def find_by_id(self, user_id: int) -> Optional[User]: """Obtiene un usuario por ID""" - db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() - if db_user: - return self._to_domain(db_user) - return None + try: + db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() + if db_user: + return self._to_domain(db_user) + return None + except Exception as e: + logger.error(f"Error al buscar usuario por ID {user_id}: {e}", exc_info=True) + raise def find_by_email(self, email: str) -> Optional[User]: """Obtiene un usuario por email""" - db_user = self.db.query(UserModel).filter(UserModel.email == email).first() - if db_user: - return self._to_domain(db_user) - return None + try: + db_user = self.db.query(UserModel).filter(UserModel.email == email).first() + if db_user: + return self._to_domain(db_user) + return None + except Exception as e: + logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True) + raise def find_all(self) -> List[User]: """Obtiene todos los usuarios""" - db_users = self.db.query(UserModel).all() - return [self._to_domain(user) for user in db_users] + try: + db_users = self.db.query(UserModel).all() + return [self._to_domain(user) for user in db_users] + except Exception as e: + logger.error(f"Error al obtener todos los usuarios: {e}", exc_info=True) + raise def update(self, user: User) -> User: - """Actualiza un usuario""" - db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first() - if db_user: + """Actualiza un usuario con manejo de transacciones""" + try: + db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first() + if not db_user: + logger.warning(f"Usuario no encontrado para actualizar: {user.user_id}") + return user + db_user.nombre = user.nombre db_user.apellido = user.apellido db_user.calificacion = user.calificacion @@ -61,32 +84,59 @@ class UserRepositorySQL(UserRepository): db_user.biografia = user.biografia self.db.commit() self.db.refresh(db_user) + logger.info(f"Usuario actualizado exitosamente: {user.user_id}") return self._to_domain(db_user) - return user + except Exception as e: + self.db.rollback() + logger.error(f"Error al actualizar usuario {user.user_id}: {e}", exc_info=True) + raise def delete(self, user_id: int) -> bool: - """Elimina un usuario""" - db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() - if db_user: - self.db.delete(db_user) - self.db.commit() - return True - return False + """Elimina un usuario con manejo de transacciones""" + try: + db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() + if db_user: + self.db.delete(db_user) + self.db.commit() + logger.info(f"Usuario eliminado exitosamente: {user_id}") + return True + logger.warning(f"Usuario no encontrado para eliminar: {user_id}") + return False + except Exception as e: + self.db.rollback() + logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True) + raise def increment_reports(self, user_id: int) -> None: - """Incrementa el contador de reportes de un usuario""" - db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() - if db_user: - db_user.numero_reportes += 1 - self.db.commit() + """Incrementa el contador de reportes con manejo de transacciones""" + try: + db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() + if db_user: + db_user.numero_reportes += 1 + self.db.commit() + logger.info(f"Contador de reportes incrementado para usuario: {user_id}") + else: + logger.warning(f"Usuario no encontrado para incrementar reportes: {user_id}") + except Exception as e: + self.db.rollback() + logger.error(f"Error al incrementar reportes del usuario {user_id}: {e}", exc_info=True) + raise def update_rating(self, user_id: int, new_rating: float) -> None: - """Actualiza la calificación de un usuario""" - db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() - if db_user: - # Asegurar que la calificación esté en el rango 0-100 - db_user.calificacion = max(0, min(100, new_rating)) - self.db.commit() + """Actualiza la calificación de un usuario con manejo de transacciones""" + try: + db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first() + if db_user: + # Asegurar que la calificación esté en el rango 0-100 + db_user.calificacion = max(0, min(100, new_rating)) + self.db.commit() + logger.info(f"Calificación actualizada para usuario {user_id}: {db_user.calificacion}") + else: + logger.warning(f"Usuario no encontrado para actualizar calificación: {user_id}") + except Exception as e: + self.db.rollback() + logger.error(f"Error al actualizar calificación del usuario {user_id}: {e}", exc_info=True) + raise def _to_domain(self, db_user: UserModel) -> User: """Convierte un modelo SQLAlchemy a un objeto de dominio"""