import sys

import markdown
import requests
from PyQt6.QtCore import Qt, QThread, pyqtSignal
from PyQt6.QtGui import QFont
from PyQt6.QtWidgets import (
    QApplication,
    QButtonGroup,
    QHBoxLayout,
    QLabel,
    QMainWindow,
    QMessageBox,
    QPushButton,
    QRadioButton,
    QTextBrowser,
    QTextEdit,
    QVBoxLayout,
    QWidget,
)

from config import (
    API_BASE_URL,
    API_HEALTH_ENDPOINT,
    API_MESSAGE_DOCS_ENDPOINT,
    API_MESSAGE_ENDPOINT,
    REQUEST_TIMEOUT,
    WINDOW_HEIGHT,
    WINDOW_TITLE,
    WINDOW_WIDTH,
)


def markdown_to_html(text: str) -> str:
    """Convert Markdown text to HTML.

    The conversion enables the ``tables``, ``sane_lists`` and ``extra``
    Python-Markdown extensions.

    Args:
        text: Markdown source to render.

    Returns:
        The rendered HTML fragment placed inside the wrapper template.
    """
    html = markdown.markdown(
        text,
        extensions=['tables', 'sane_lists', 'extra'],
    )
    # NOTE(review): the original comment here said CSS is added to improve the
    # presentation, but the template visible in this file contains no style
    # markup -- confirm whether it was lost.
    styled_html = f"""
{html}
"""
    return styled_html


class APIWorker(QThread):
    """Worker thread that performs API requests without blocking the UI.

    Signals:
        response_received(str): emitted for each piece of response text.
        error_occurred(str): emitted with a user-readable error message.

    Completion is reported through the ``finished`` signal inherited from
    ``QThread``, which Qt emits once ``run()`` has returned. Declaring a
    custom ``finished`` signal would shadow the built-in one and would be
    emitted while the thread is still executing.
    """

    response_received = pyqtSignal(str)
    error_occurred = pyqtSignal(str)

    def __init__(self, endpoint, message, use_streaming=True):
        """Store the request parameters.

        Args:
            endpoint: URL the message is POSTed to.
            message: Text sent as the ``message`` query parameter.
            use_streaming: When true, the response is consumed incrementally.
        """
        super().__init__()
        self.endpoint = endpoint
        self.message = message
        self.use_streaming = use_streaming

    def run(self):
        """Perform the request and translate failures into ``error_occurred``."""
        try:
            if self.use_streaming:
                self._handle_streaming_response()
            else:
                self._handle_normal_response()
        except requests.exceptions.ConnectionError:
            self.error_occurred.emit(
                "Error de conexión: No se puede conectar a la API.\n"
                f"Asegúrate de que el servidor esté corriendo en {API_BASE_URL}"
            )
        except requests.exceptions.Timeout:
            self.error_occurred.emit(
                "Error de timeout: La API tardó demasiado en responder."
            )
        except Exception as e:
            # Thread boundary: nothing above us can handle the exception, so
            # report it to the UI instead of letting the thread die silently.
            self.error_occurred.emit(f"Error: {str(e)}")

    def _handle_streaming_response(self):
        """POST the message and emit the response body chunk by chunk.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts or a non-success HTTP status.
        """
        # The context manager releases the connection even if reading fails.
        with requests.post(
            self.endpoint,
            params={"message": self.message},
            stream=True,
            timeout=REQUEST_TIMEOUT,
        ) as response:
            response.raise_for_status()
            # Without a declared charset, iter_content(decode_unicode=True)
            # yields raw bytes, which cannot go through a str signal.
            if response.encoding is None:
                response.encoding = "utf-8"
            for chunk in response.iter_content(chunk_size=10, decode_unicode=True):
                if chunk:
                    self.response_received.emit(chunk)

    def _handle_normal_response(self):
        """POST the message and emit the whole response body at once.

        Raises:
            requests.exceptions.RequestException: on connection problems,
                timeouts or a non-success HTTP status.
        """
        response = requests.post(
            self.endpoint,
            params={"message": self.message},
            timeout=REQUEST_TIMEOUT,
        )
        response.raise_for_status()
        self.response_received.emit(response.text)


class MainWindow(QMainWindow):
    """Main application window: status bar, mode selector, input and response."""

    def __init__(self):
        """Build the window and run an initial API health check."""
        super().__init__()
        self.setWindowTitle(WINDOW_TITLE)
        self.setGeometry(100, 100, WINDOW_WIDTH, WINDOW_HEIGHT)

        self.worker = None  # APIWorker of the current/last request, if any
        self.api_status = False  # result of the last health check
        self.response_buffer = ""  # accumulates the streamed response text

        self.initUI()
        self.check_api_status()

    def initUI(self):
        """Create the widgets and lay them out."""
        central_widget = QWidget()
        self.setCentralWidget(central_widget)
        main_layout = QVBoxLayout()

        # Status section
        status_layout = QHBoxLayout()
        self.status_label = QLabel("● Estado: Verificando...")
        self.status_label.setFont(QFont("Arial", 10))
        status_layout.addWidget(self.status_label)
        status_layout.addStretch()
        refresh_btn = QPushButton("Actualizar estado")
        refresh_btn.clicked.connect(self.check_api_status)
        status_layout.addWidget(refresh_btn)
        main_layout.addLayout(status_layout)

        # Mode selector
        mode_layout = QHBoxLayout()
        mode_label = QLabel("Modo:")
        mode_layout.addWidget(mode_label)
        self.mode_group = QButtonGroup()
        self.radio_normal = QRadioButton("Mensaje normal")
        self.radio_docs = QRadioButton("Con documentos")
        self.radio_normal.setChecked(True)
        self.mode_group.addButton(self.radio_normal)
        self.mode_group.addButton(self.radio_docs)
        mode_layout.addWidget(self.radio_normal)
        mode_layout.addWidget(self.radio_docs)
        mode_layout.addStretch()
        main_layout.addLayout(mode_layout)

        # Input area
        input_label = QLabel("Mensaje:")
        main_layout.addWidget(input_label)
        self.input_text = QTextEdit()
        self.input_text.setPlaceholderText("Escribe tu mensaje aquí...")
        self.input_text.setMaximumHeight(120)
        main_layout.addWidget(self.input_text)

        # Action buttons
        button_layout = QHBoxLayout()
        self.send_btn = QPushButton("Enviar")
        self.send_btn.clicked.connect(self.send_message)
        self.clear_btn = QPushButton("Limpiar")
        self.clear_btn.clicked.connect(self.clear_all)
        button_layout.addWidget(self.send_btn)
        button_layout.addWidget(self.clear_btn)
        button_layout.addStretch()
        main_layout.addLayout(button_layout)

        # Response area
        response_label = QLabel("Respuesta:")
        main_layout.addWidget(response_label)
        self.response_text = QTextBrowser()
        self.response_text.setMarkdown("La respuesta de la API aparecerá aquí...")
        main_layout.addWidget(self.response_text)

        central_widget.setLayout(main_layout)

    def check_api_status(self):
        """Query the health endpoint and update the status label.

        The API counts as connected only when the endpoint answers with HTTP
        200. Any request failure is treated as "disconnected".

        Note: the request is made synchronously on the calling thread.
        """
        try:
            response = requests.get(API_HEALTH_ENDPOINT, timeout=REQUEST_TIMEOUT)
        except requests.exceptions.RequestException:
            # Best-effort probe: an unreachable API is an expected state here.
            connected = False
        else:
            connected = response.status_code == 200

        self.api_status = connected
        if connected:
            self.status_label.setText("● Estado: API conectada ✓")
            self.status_label.setStyleSheet("color: green;")
        else:
            self.status_label.setText("● Estado: API desconectada ✗")
            self.status_label.setStyleSheet("color: red;")

    def send_message(self):
        """Validate the input and start a worker that sends it to the API."""
        if not self.api_status:
            QMessageBox.warning(self, "Error", "La API no está conectada.")
            return

        message = self.input_text.toPlainText().strip()
        if not message:
            QMessageBox.warning(self, "Advertencia", "Por favor escribe un mensaje.")
            return

        # Pick the endpoint according to the selected mode.
        use_docs = self.radio_docs.isChecked()
        endpoint = API_MESSAGE_DOCS_ENDPOINT if use_docs else API_MESSAGE_ENDPOINT

        # Discard the previous response.
        self.response_buffer = ""
        self.response_text.clear()

        # Prevent a second request while this one is in flight.
        self.send_btn.setEnabled(False)
        self.send_btn.setText("Procesando...")

        # Make sure the previous thread has fully exited before its QThread
        # object is dropped; destroying a running QThread aborts the process.
        if self.worker is not None:
            self.worker.wait()

        self.worker = APIWorker(endpoint, message, use_streaming=True)
        self.worker.response_received.connect(self.append_response)
        self.worker.error_occurred.connect(self.handle_error)
        self.worker.finished.connect(self.on_request_finished)
        self.worker.start()

    def append_response(self, chunk):
        """Append a response chunk and re-render the accumulated Markdown.

        Args:
            chunk: Newly received piece of response text.
        """
        self.response_buffer += chunk

        # The whole buffer is re-rendered so that Markdown constructs spanning
        # several chunks (tables, lists) are displayed correctly.
        html = markdown_to_html(self.response_buffer)
        self.response_text.setHtml(html)

        # Keep the view scrolled to the end.
        scrollbar = self.response_text.verticalScrollBar()
        scrollbar.setValue(scrollbar.maximum())

    def handle_error(self, error_message):
        """Show an error in the response area and in a modal dialog.

        Args:
            error_message: Human-readable description of the failure.
        """
        error_md = f"**Error:** {error_message}"
        html = markdown_to_html(error_md)
        self.response_text.setHtml(html)
        QMessageBox.critical(self, "Error", error_message)

    def on_request_finished(self):
        """Re-enable the send button and refresh the API status."""
        self.send_btn.setEnabled(True)
        self.send_btn.setText("Enviar")
        self.check_api_status()

    def clear_all(self):
        """Clear the message input and the response, then focus the input."""
        self.input_text.clear()
        self.response_buffer = ""
        self.response_text.clear()
        self.input_text.setFocus()


def main():
    """Create the Qt application, show the main window and run the event loop."""
    app = QApplication(sys.argv)
    window = MainWindow()
    window.show()
    sys.exit(app.exec())


if __name__ == "__main__":
    main()