error fixing
This commit is contained in:
@@ -52,10 +52,12 @@ class ReportConsumer:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing report message: {e}", exc_info=True)
|
||||
# Rollback explícito en caso de error
|
||||
self.user_repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_create_report(self, message: ReportMessage):
|
||||
"""Handle report create event"""
|
||||
"""Handle report create event con manejo de transacciones cruzadas"""
|
||||
try:
|
||||
logger.info(f"Creating report: {message.id_reporte} from user {message.id_usuario}")
|
||||
|
||||
@@ -73,20 +75,33 @@ class ReportConsumer:
|
||||
fecha_creacion=fecha_creacion
|
||||
)
|
||||
|
||||
# Save to repository
|
||||
saved_report = self.repo.save(report)
|
||||
logger.info(f"Report created successfully: {message.id_reporte}")
|
||||
try:
|
||||
# Save to MongoDB repository
|
||||
saved_report = self.repo.save(report)
|
||||
logger.info(f"Report created successfully in MongoDB: {message.id_reporte}")
|
||||
except Exception as mongo_error:
|
||||
logger.error(f"Error saving report to MongoDB: {mongo_error}", exc_info=True)
|
||||
raise
|
||||
|
||||
# Increment user's report counter
|
||||
self.user_repo.increment_reports(message.id_usuario)
|
||||
logger.info(f"Incremented report counter for user: {message.id_usuario}")
|
||||
try:
|
||||
# Increment user's report counter in MySQL
|
||||
self.user_repo.increment_reports(message.id_usuario)
|
||||
logger.info(f"Incremented report counter for user: {message.id_usuario}")
|
||||
except Exception as sql_error:
|
||||
logger.error(f"Error incrementing report counter: {sql_error}", exc_info=True)
|
||||
# Rollback SQL transaction
|
||||
self.user_repo.db.rollback()
|
||||
# Note: MongoDB save cannot be rolled back, log for manual cleanup
|
||||
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} saved to MongoDB but user counter not incremented for user {message.id_usuario}")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating report: {e}", exc_info=True)
|
||||
self.user_repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_update_visibility(self, message: ReportMessage):
|
||||
"""Handle report visibility update event"""
|
||||
"""Handle report visibility update event con manejo de transacciones"""
|
||||
try:
|
||||
logger.info(f"Updating visibility for report: {message.id_reporte}")
|
||||
|
||||
@@ -96,9 +111,13 @@ class ReportConsumer:
|
||||
logger.warning(f"Report not found: {message.id_reporte}")
|
||||
return
|
||||
|
||||
# Update visibility
|
||||
self.repo.update_visibility(message.id_reporte, message.visibilidad)
|
||||
logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}")
|
||||
# Update visibility in MongoDB
|
||||
try:
|
||||
self.repo.update_visibility(message.id_reporte, message.visibilidad)
|
||||
logger.info(f"Report visibility updated: {message.id_reporte} -> {message.visibilidad}")
|
||||
except Exception as mongo_error:
|
||||
logger.error(f"Error updating report visibility in MongoDB: {mongo_error}", exc_info=True)
|
||||
raise
|
||||
|
||||
# Penalize author if visibility is very low (shadowban)
|
||||
if message.penalize_author and message.visibilidad < 20:
|
||||
@@ -109,11 +128,17 @@ class ReportConsumer:
|
||||
new_rating = max(0, user.calificacion - 5)
|
||||
self.user_repo.update_rating(report.id_usuario, new_rating)
|
||||
logger.info(f"Author penalized: user {report.id_usuario} rating reduced to {new_rating}")
|
||||
except Exception as e:
|
||||
logger.error(f"Error penalizing author: {e}")
|
||||
else:
|
||||
logger.warning(f"User not found for penalty: {report.id_usuario}")
|
||||
except Exception as penalty_error:
|
||||
logger.error(f"Error penalizing author: {penalty_error}", exc_info=True)
|
||||
self.user_repo.db.rollback()
|
||||
logger.critical(f"INCONSISTENCY: Report {message.id_reporte} visibility updated but author penalty failed")
|
||||
raise
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating report visibility: {e}", exc_info=True)
|
||||
self.user_repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_delete_report(self, message: ReportMessage):
|
||||
@@ -129,6 +154,7 @@ class ReportConsumer:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting report: {e}", exc_info=True)
|
||||
self.user_repo.db.rollback()
|
||||
raise
|
||||
|
||||
def start(self):
|
||||
@@ -142,6 +168,13 @@ class ReportConsumer:
|
||||
except Exception as e:
|
||||
logger.error(f"Consumer error: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
# Asegurar cierre de sesión SQL
|
||||
if self.user_repo.db:
|
||||
try:
|
||||
self.user_repo.db.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing database session: {e}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -50,6 +50,8 @@ class UserConsumer:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing user message: {e}", exc_info=True)
|
||||
# Rollback en caso de error en el procesamiento del mensaje
|
||||
self.repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_create_user(self, message: UserMessage):
|
||||
@@ -75,12 +77,13 @@ class UserConsumer:
|
||||
biografia=message.biografia
|
||||
)
|
||||
|
||||
# Save to repository
|
||||
# Save to repository with transaction handling
|
||||
saved_user = self.repo.save(user)
|
||||
logger.info(f"User created successfully: {saved_user.user_id} - {saved_user.email}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating user: {e}", exc_info=True)
|
||||
self.repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_update_user(self, message: UserMessage):
|
||||
@@ -104,12 +107,13 @@ class UserConsumer:
|
||||
if message.biografia is not None:
|
||||
user.biografia = message.biografia
|
||||
|
||||
# Save to repository
|
||||
# Save to repository with transaction handling
|
||||
updated_user = self.repo.update(user)
|
||||
logger.info(f"User updated successfully: {message.user_id}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating user: {e}", exc_info=True)
|
||||
self.repo.db.rollback()
|
||||
raise
|
||||
|
||||
def _handle_delete_user(self, message: UserMessage):
|
||||
@@ -125,6 +129,7 @@ class UserConsumer:
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting user: {e}", exc_info=True)
|
||||
self.repo.db.rollback()
|
||||
raise
|
||||
|
||||
def start(self):
|
||||
@@ -138,6 +143,13 @@ class UserConsumer:
|
||||
except Exception as e:
|
||||
logger.error(f"Consumer error: {e}", exc_info=True)
|
||||
raise
|
||||
finally:
|
||||
# Asegurar cierre de sesión
|
||||
if self.repo.db:
|
||||
try:
|
||||
self.repo.db.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing database session: {e}")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
||||
@@ -14,9 +14,13 @@ SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
|
||||
Base = declarative_base()
|
||||
|
||||
def get_db():
|
||||
"""Obtiene una sesión de base de datos"""
|
||||
"""Obtiene una sesión de base de datos con manejo de rollback"""
|
||||
db = SessionLocal()
|
||||
try:
|
||||
yield db
|
||||
db.commit() # Commit explícito si no hubo errores
|
||||
except Exception as e:
|
||||
db.rollback() # Rollback si ocurre cualquier error
|
||||
raise e
|
||||
finally:
|
||||
db.close()
|
||||
db.close() # Siempre cerrar la sesión
|
||||
|
||||
@@ -3,6 +3,9 @@ from domain.users import User
|
||||
from infrastructure.adapters.persistence.models import UserModel
|
||||
from infrastructure.adapters.persistence.db import SessionLocal
|
||||
from typing import List, Optional
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class UserRepositorySQL(UserRepository):
|
||||
"""Implementación del repositorio de Usuarios usando SQLAlchemy (MySQL)"""
|
||||
@@ -11,48 +14,68 @@ class UserRepositorySQL(UserRepository):
|
||||
self.db = db_session or SessionLocal()
|
||||
|
||||
def save(self, user: User) -> User:
|
||||
"""Guarda un nuevo usuario"""
|
||||
db_user = UserModel(
|
||||
nombre=user.nombre,
|
||||
apellido=user.apellido,
|
||||
email=user.email,
|
||||
fecha_nacimiento=user.fecha_nacimiento,
|
||||
fecha_creacion=user.fecha_creacion,
|
||||
calificacion=user.calificacion,
|
||||
numero_reportes=user.numero_reportes,
|
||||
url_foto_perfil=user.url_foto_perfil,
|
||||
biografia=user.biografia
|
||||
)
|
||||
self.db.add(db_user)
|
||||
self.db.commit()
|
||||
self.db.refresh(db_user)
|
||||
|
||||
# Convertir de vuelta a dominio
|
||||
return self._to_domain(db_user)
|
||||
"""Guarda un nuevo usuario con manejo de transacciones"""
|
||||
try:
|
||||
db_user = UserModel(
|
||||
nombre=user.nombre,
|
||||
apellido=user.apellido,
|
||||
email=user.email,
|
||||
fecha_nacimiento=user.fecha_nacimiento,
|
||||
fecha_creacion=user.fecha_creacion,
|
||||
calificacion=user.calificacion,
|
||||
numero_reportes=user.numero_reportes,
|
||||
url_foto_perfil=user.url_foto_perfil,
|
||||
biografia=user.biografia
|
||||
)
|
||||
self.db.add(db_user)
|
||||
self.db.commit()
|
||||
self.db.refresh(db_user)
|
||||
logger.info(f"Usuario guardado exitosamente: {db_user.user_id}")
|
||||
return self._to_domain(db_user)
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
logger.error(f"Error al guardar usuario: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def find_by_id(self, user_id: int) -> Optional[User]:
|
||||
"""Obtiene un usuario por ID"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
return self._to_domain(db_user)
|
||||
return None
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
return self._to_domain(db_user)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error al buscar usuario por ID {user_id}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def find_by_email(self, email: str) -> Optional[User]:
|
||||
"""Obtiene un usuario por email"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.email == email).first()
|
||||
if db_user:
|
||||
return self._to_domain(db_user)
|
||||
return None
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.email == email).first()
|
||||
if db_user:
|
||||
return self._to_domain(db_user)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error al buscar usuario por email {email}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def find_all(self) -> List[User]:
|
||||
"""Obtiene todos los usuarios"""
|
||||
db_users = self.db.query(UserModel).all()
|
||||
return [self._to_domain(user) for user in db_users]
|
||||
try:
|
||||
db_users = self.db.query(UserModel).all()
|
||||
return [self._to_domain(user) for user in db_users]
|
||||
except Exception as e:
|
||||
logger.error(f"Error al obtener todos los usuarios: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def update(self, user: User) -> User:
|
||||
"""Actualiza un usuario"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first()
|
||||
if db_user:
|
||||
"""Actualiza un usuario con manejo de transacciones"""
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user.user_id).first()
|
||||
if not db_user:
|
||||
logger.warning(f"Usuario no encontrado para actualizar: {user.user_id}")
|
||||
return user
|
||||
|
||||
db_user.nombre = user.nombre
|
||||
db_user.apellido = user.apellido
|
||||
db_user.calificacion = user.calificacion
|
||||
@@ -61,32 +84,59 @@ class UserRepositorySQL(UserRepository):
|
||||
db_user.biografia = user.biografia
|
||||
self.db.commit()
|
||||
self.db.refresh(db_user)
|
||||
logger.info(f"Usuario actualizado exitosamente: {user.user_id}")
|
||||
return self._to_domain(db_user)
|
||||
return user
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
logger.error(f"Error al actualizar usuario {user.user_id}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def delete(self, user_id: int) -> bool:
|
||||
"""Elimina un usuario"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
self.db.delete(db_user)
|
||||
self.db.commit()
|
||||
return True
|
||||
return False
|
||||
"""Elimina un usuario con manejo de transacciones"""
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
self.db.delete(db_user)
|
||||
self.db.commit()
|
||||
logger.info(f"Usuario eliminado exitosamente: {user_id}")
|
||||
return True
|
||||
logger.warning(f"Usuario no encontrado para eliminar: {user_id}")
|
||||
return False
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
logger.error(f"Error al eliminar usuario {user_id}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def increment_reports(self, user_id: int) -> None:
|
||||
"""Incrementa el contador de reportes de un usuario"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
db_user.numero_reportes += 1
|
||||
self.db.commit()
|
||||
"""Incrementa el contador de reportes con manejo de transacciones"""
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
db_user.numero_reportes += 1
|
||||
self.db.commit()
|
||||
logger.info(f"Contador de reportes incrementado para usuario: {user_id}")
|
||||
else:
|
||||
logger.warning(f"Usuario no encontrado para incrementar reportes: {user_id}")
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
logger.error(f"Error al incrementar reportes del usuario {user_id}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def update_rating(self, user_id: int, new_rating: float) -> None:
|
||||
"""Actualiza la calificación de un usuario"""
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
# Asegurar que la calificación esté en el rango 0-100
|
||||
db_user.calificacion = max(0, min(100, new_rating))
|
||||
self.db.commit()
|
||||
"""Actualiza la calificación de un usuario con manejo de transacciones"""
|
||||
try:
|
||||
db_user = self.db.query(UserModel).filter(UserModel.user_id == user_id).first()
|
||||
if db_user:
|
||||
# Asegurar que la calificación esté en el rango 0-100
|
||||
db_user.calificacion = max(0, min(100, new_rating))
|
||||
self.db.commit()
|
||||
logger.info(f"Calificación actualizada para usuario {user_id}: {db_user.calificacion}")
|
||||
else:
|
||||
logger.warning(f"Usuario no encontrado para actualizar calificación: {user_id}")
|
||||
except Exception as e:
|
||||
self.db.rollback()
|
||||
logger.error(f"Error al actualizar calificación del usuario {user_id}: {e}", exc_info=True)
|
||||
raise
|
||||
|
||||
def _to_domain(self, db_user: UserModel) -> User:
|
||||
"""Convierte un modelo SQLAlchemy a un objeto de dominio"""
|
||||
|
||||
Reference in New Issue
Block a user