from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError

from domain.MicroservicioA import MicroservicioA
from application.ports.microservicio_a_repository import MicroservicioARepository
from infrastructure.adapters.persistence.db import SessionLocal
from infrastructure.adapters.persistence.models import MsaModel
from application.exceptions import MicroserviceAlreadyExists


class SqlMicroservicioARepository(MicroservicioARepository):
    """SQLAlchemy-backed implementation of ``MicroservicioARepository``.

    Every operation goes through the single session stored in ``self.db``
    and translates between ``MsaModel`` rows and ``MicroservicioA`` domain
    objects (fields ``msa_id``, ``perfiles``, ``idioma``, ``calidad``).
    """

    # Kept in one place so every conflict path raises the same message.
    _CONFLICT_MESSAGE = "There's already one registry with that data."

    def __init__(self):
        """Create the session used by all repository operations."""
        # The session lives as long as this repository; call close() when done.
        self.db: Session = SessionLocal()

    def close(self) -> None:
        """Close the underlying session held in ``self.db``."""
        self.db.close()

    @staticmethod
    def _to_domain(model: MsaModel) -> MicroservicioA:
        """Build a ``MicroservicioA`` from the fields of a persisted model."""
        return MicroservicioA(
            msa_id=model.msa_id,
            perfiles=model.perfiles,
            idioma=model.idioma,
            calidad=model.calidad,
        )

    def _find_model(self, msa_id: int) -> MsaModel | None:
        """Return the first ``MsaModel`` whose ``msa_id`` matches, or ``None``."""
        return (
            self.db.query(MsaModel)
            .filter(MsaModel.msa_id == msa_id)
            .first()
        )

    def _commit_or_conflict(self) -> None:
        """Commit the session, mapping ``IntegrityError`` to a domain error.

        Raises:
            MicroserviceAlreadyExists: if the commit raises ``IntegrityError``.
                The session is rolled back first and the original error is
                attached as the cause.
        """
        try:
            self.db.commit()
        except IntegrityError as err:
            # Roll back so the session stays usable after the failed commit.
            self.db.rollback()
            raise MicroserviceAlreadyExists(self._CONFLICT_MESSAGE) from err

    def save(self, msa: MicroservicioA) -> MicroservicioA:
        """Persist a new record built from ``msa`` and return the stored entity.

        Args:
            msa: Domain object whose ``perfiles``, ``idioma`` and ``calidad``
                are copied into a new ``MsaModel``.

        Returns:
            A ``MicroservicioA`` rebuilt from the refreshed model, including
            its ``msa_id``.

        Raises:
            MicroserviceAlreadyExists: if the commit raises ``IntegrityError``.
        """
        model = MsaModel(
            perfiles=msa.perfiles,
            idioma=msa.idioma,
            calidad=msa.calidad,
        )
        self.db.add(model)
        self._commit_or_conflict()
        # Reload the row so values populated at commit time are visible.
        self.db.refresh(model)
        return self._to_domain(model)

    def viewAll(self) -> list[MicroservicioA]:
        """Return every stored record as a list of ``MicroservicioA``."""
        return [self._to_domain(model) for model in self.db.query(MsaModel).all()]

    def viewById(self, msa_id: int) -> MicroservicioA | None:
        """Return the record with the given ``msa_id``, or ``None`` if absent."""
        model = self._find_model(msa_id)
        if model is None:
            return None
        return self._to_domain(model)

    def remove(self, msa_id: int) -> bool:
        """Delete the record with the given ``msa_id``.

        Returns:
            ``True`` when a record was found and deleted, ``False`` when no
            record matches ``msa_id``.

        Raises:
            MicroserviceAlreadyExists: if the commit raises ``IntegrityError``.
        """
        model = self._find_model(msa_id)
        if model is None:
            return False
        self.db.delete(model)
        # NOTE(review): an IntegrityError on delete is reported with the
        # "already exists" exception and message, which looks misleading for
        # this operation. Kept as-is because callers may depend on it; confirm
        # whether a dedicated exception is wanted here.
        self._commit_or_conflict()
        return True

    def editById(self, msa_id: int, msa: MicroservicioA) -> MicroservicioA | None:
        """Overwrite the fields of the record identified by ``msa_id``.

        Args:
            msa_id: Identifier of the record to update.
            msa: Domain object supplying the new ``perfiles``, ``idioma`` and
                ``calidad`` values.

        Returns:
            The updated ``MicroservicioA``, or ``None`` when no record matches
            ``msa_id``.

        Raises:
            MicroserviceAlreadyExists: if the commit raises ``IntegrityError``.
        """
        model = self._find_model(msa_id)
        if model is None:
            return None
        model.perfiles = msa.perfiles
        model.idioma = msa.idioma
        model.calidad = msa.calidad
        self._commit_or_conflict()
        self.db.refresh(model)
        return self._to_domain(model)