Skip to content

Kern-API

Diese Seite enthält die Referenz für die Kernkomponenten von text2speech.

Text2Speech

text2speech.text2speech.Text2Speech

Text-to-Speech (TTS) class with configurable settings.

This class provides text-to-speech functionality using either the ElevenLabs or the Kokoro model, with configuration support via YAML files.

Source code in text2speech/text2speech.py
class Text2Speech:
    """Text-to-Speech (TTS) class with configurable settings.

    This class provides text-to-speech functionality using either the
    ElevenLabs or the Kokoro engine, with configuration loaded through
    ``Config`` (YAML files). When the queue is enabled, playback requests are
    serialized through an ``AudioQueueManager`` worker thread.
    """

    def __init__(
        self,
        el_api_key: Optional[str] = None,
        verbose: Optional[bool] = None,
        config_path: Optional[str] = None,
        config: Optional[Config] = None,
        enable_queue: bool = True,
        max_queue_size: int = DEFAULT_QUEUE_SIZE,
        duplicate_timeout: float = DEFAULT_DUPLICATE_TIMEOUT,
    ) -> None:
        """Initialize the Text2Speech instance.

        Args:
            el_api_key (Optional[str]): API key for ElevenLabs.
            verbose (Optional[bool]): If True, prints debug info. Overrides config if set.
            config_path (Optional[str]): Path to config.yaml file.
            config (Optional[Config]): Pre-loaded Config object.
            enable_queue (bool): If True, uses AudioQueueManager for thread-safe playback.
            max_queue_size (int): Maximum queued messages.
            duplicate_timeout (float): Skip duplicate messages within this window (seconds).

        Raises:
            TTSEngineNotAvailable: If neither ElevenLabs nor Kokoro could be initialized.
            AudioDeviceError: If the configured output device cannot be set.
        """
        # Load configuration
        if config is not None:
            self.config = config
        else:
            self.config = Config(config_path)

        # Override verbose setting if explicitly provided
        self._verbose = verbose if verbose is not None else self.config.verbose

        # Setup logging
        self._setup_logging()

        # Store API key and validate
        self._el_api_key = el_api_key
        self._use_elevenlabs = self._validate_elevenlabs_key(el_api_key)

        # Initialize TTS engine
        self._engine: Optional[TTSEngine] = None
        self._initialize_tts_engine()

        # Set audio output device from config
        self._setup_audio_device()

        # Initialize audio queue manager
        self._enable_queue = enable_queue
        self._audio_queue: Optional[AudioQueueManager] = None

        if enable_queue:
            self._audio_queue = AudioQueueManager(
                tts_callable=self.speak_sync,
                max_queue_size=max_queue_size,
                duplicate_timeout=duplicate_timeout,
                logger=self.logger,
            )
            self._audio_queue.start()
            self.logger.info("Audio queue manager enabled")

    def _setup_logging(self) -> None:
        """Configure the shared ``"text2speech"`` logger with a sensitive-data filter.

        The level comes from ``logging.log_level``. It may be an int or a level
        name (case-insensitive); unknown values fall back to INFO and a
        warning is logged. A console handler is added only if the logger has
        no handlers yet. A file handler is added when ``logging.log_file`` is
        set, and every handler gets a ``SensitiveDataFilter``.

        ``logging.getLogger`` returns the same logger object for every
        instance, so a filter or file handler that is already attached is not
        added again. Without this, each new instance would duplicate every
        file log line and open another handle on the same file.
        """
        import os

        self.logger = logging.getLogger("text2speech")

        log_level = self.config.get("logging.log_level", "INFO")
        if isinstance(log_level, int):
            level = log_level
        else:
            # getLevelName maps a registered level name to its numeric value
            # and returns a "Level ..." string for unknown names.
            level = logging.getLevelName(str(log_level).upper())
        level_is_valid = isinstance(level, int)
        self.logger.setLevel(level if level_is_valid else logging.INFO)

        formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
        if not self.logger.handlers:
            console_handler = logging.StreamHandler()
            console_handler.setFormatter(formatter)
            self.logger.addHandler(console_handler)

        log_file = self.config.get("logging.log_file")
        if log_file:
            # FileHandler records the absolute path of its file in baseFilename.
            target = os.path.abspath(log_file)
            has_file_handler = any(
                isinstance(handler, logging.FileHandler) and handler.baseFilename == target
                for handler in self.logger.handlers
            )
            if not has_file_handler:
                file_handler = logging.FileHandler(log_file)
                file_handler.setFormatter(formatter)
                self.logger.addHandler(file_handler)

        # Add sensitive data filter to every handler that does not have one yet
        sensitive_filter = SensitiveDataFilter()
        for handler in self.logger.handlers:
            if not any(isinstance(existing, SensitiveDataFilter) for existing in handler.filters):
                handler.addFilter(sensitive_filter)

        if not level_is_valid:
            self.logger.warning(f"Unknown log level {log_level!r}; falling back to INFO")

    def _setup_audio_device(self) -> None:
        """Configure the audio output device."""
        if HAS_SOUNDDEVICE and sd is not None:
            device_id = self.config.audio_output_device
            if device_id is not None:
                try:
                    sd.default.device[1] = device_id
                    self.logger.info(f"Audio output device set to: {device_id}")
                except Exception as e:
                    self.logger.error(f"Failed to set audio device {device_id}: {e}")
                    raise AudioDeviceError(f"Invalid audio device: {device_id}") from e

    def _validate_elevenlabs_key(self, api_key: Optional[str]) -> bool:
        """Validate ElevenLabs API key format."""
        if not api_key or not isinstance(api_key, str):
            return False
        return api_key.startswith(API_KEY_PREFIX) and len(api_key) >= MIN_API_KEY_LENGTH

    def _initialize_tts_engine(self) -> None:
        """Initialize the TTS engine with fallback."""
        if self._use_elevenlabs:
            try:
                model = self.config.get("tts.elevenlabs.model", "eleven_multilingual_v2")
                self._engine = ElevenLabsEngine(api_key=self._el_api_key, model=model)  # type: ignore
                self.logger.info("Initialized ElevenLabs TTS")
                return
            except Exception as e:
                self.logger.warning(f"ElevenLabs initialization failed: {e}. Falling back to Kokoro.")

        try:
            self._engine = KokoroEngine(lang_code=self.config.kokoro_lang_code)
            self.logger.info("Initialized Kokoro TTS")
        except Exception as e:
            self.logger.error(f"Failed to initialize any TTS engine: {e}")
            raise TTSEngineNotAvailable("No TTS engine available") from e

    def speak(self, text: str, priority: int = 0, blocking: bool = False) -> bool:
        """Queue text for speech synthesis.

        Args:
            text (str): Text to speak.
            priority (int): Priority level (0-100).
            blocking (bool): If True, wait for speech to complete.

        Returns:
            bool: True if successfully queued/spoken.
        """
        if self._audio_queue and self._enable_queue:
            success = self._audio_queue.enqueue(text, priority=priority)
            if blocking and success:
                self._wait_for_queue()
            return success

        if blocking:
            self.speak_sync(text)
            return True

        thread = threading.Thread(target=self.speak_sync, args=(text,))
        thread.start()
        return True

    def _wait_for_queue(self) -> None:
        """Wait for the audio queue to be empty.

        NOTE(review): this returns once the queue is empty plus a fixed 0.5 s
        grace period. A message that takes longer than that to play may still
        be playing when this returns.
        """
        import time

        if self._audio_queue:
            while not self._audio_queue._queue.empty():
                time.sleep(0.1)
            # Wait a bit more for the last message to finish playing
            time.sleep(0.5)

    def speak_sync(self, text: str) -> None:
        """Synchronous TTS call.

        Args:
            text (str): Text to speak.
        """
        if not self._engine:
            self.logger.error("No TTS engine initialized")
            return

        try:
            voice = (
                self.config.kokoro_voice
                if isinstance(self._engine, KokoroEngine)
                else self.config.get("tts.elevenlabs.voice", "Brian")
            )
            speed = self.config.kokoro_speed

            self.logger.debug(f"Synthesizing: {text[:50]}...")
            for _, _, audio in self._engine.synthesize(text, voice=voice, speed=speed):
                self._play_audio_safely(audio, original_sample_rate=self.config.sample_rate, volume=self.config.audio_volume)
                if HAS_SOUNDDEVICE and sd:
                    sd.wait()
        except Exception as e:
            self.logger.error(f"Speech synthesis error: {e}")

    @staticmethod
    def _play_audio_safely(
        audio_tensor: torch.Tensor,
        original_sample_rate: int = 24000,
        device: Optional[int] = None,
        volume: float = DEFAULT_VOLUME,
    ) -> None:
        """Play audio safely with resampling and volume control.

        Args:
            audio_tensor (torch.Tensor): Audio data to play.
            original_sample_rate (int): Sample rate of the audio data.
            device (Optional[int]): Audio device ID to use.
            volume (float): Playback volume (0.0 to 1.0).
        """
        if not HAS_SOUNDDEVICE or sd is None:
            logging.getLogger("text2speech").warning("sounddevice not available, skipping playback")
            return

        try:
            if device is None:
                device = sd.default.device[1]

            device_info = sd.query_devices(device, "output")
            supported_rate = int(device_info["default_samplerate"])

            if original_sample_rate != supported_rate:
                resampler = torchaudio.transforms.Resample(orig_freq=original_sample_rate, new_freq=supported_rate)
                audio_tensor = resampler(audio_tensor)

            # Peak-normalize, then apply volume and clamp to avoid clipping.
            peak = torch.abs(audio_tensor).max()
            if peak > 0:
                audio_tensor = audio_tensor / peak
            audio_tensor = torch.clamp(audio_tensor * volume, -0.95, 0.95)

            sd.play(audio_tensor.cpu().numpy(), samplerate=supported_rate, device=device)
        except Exception as e:
            logging.getLogger("text2speech").error(f"Audio playback error: {e}")

    # Deprecated methods
    def call_text2speech_async(self, text: str) -> threading.Thread:
        """Deprecated: Use speak(blocking=False) instead."""
        warnings.warn("call_text2speech_async is deprecated, use speak(blocking=False)", DeprecationWarning, stacklevel=2)
        thread = threading.Thread(target=self.speak_sync, args=(text,))
        thread.start()
        return thread

    def call_text2speech(self, text: str) -> None:
        """Deprecated: Use speak(blocking=True) instead."""
        warnings.warn("call_text2speech is deprecated, use speak(blocking=True)", DeprecationWarning, stacklevel=2)
        self.speak_sync(text)

    def shutdown(self, timeout: float = 5.0) -> None:
        """Shutdown the TTS system.

        This is skipped when the queue is disabled or its worker has already
        stopped. Calling it more than once (for example explicitly, and again
        during garbage collection) therefore does not enqueue another shutdown
        sentinel or log the final queue statistics a second time.

        Args:
            timeout (float): Maximum seconds to wait for shutdown.
        """
        if self._audio_queue is not None and self._audio_queue.is_running():
            self._audio_queue.shutdown(timeout=timeout)

    def __del__(self) -> None:
        """Best-effort cleanup of the audio queue when the instance is collected.

        ``__init__`` may raise before ``_audio_queue`` is assigned, for example
        when no TTS engine is available. At interpreter exit, module globals
        may already be torn down. Failures here are therefore ignored rather
        than surfacing as "Exception ignored in __del__" noise.
        """
        if getattr(self, "_audio_queue", None) is None:
            return
        try:
            self.shutdown()
        except Exception:
            pass

    def set_voice(self, voice: str) -> None:
        """Set the voice for TTS.

        Args:
            voice (str): Name of the voice to use.
        """
        self.config.set("tts.kokoro.voice", voice)
        self.config.set("tts.elevenlabs.voice", voice)
        self.logger.info(f"Voice changed to: {voice}")

    def set_speed(self, speed: float) -> None:
        """Set the speech speed.

        Args:
            speed (float): Speech speed (0.5 to 2.0).
        """
        if 0.5 <= speed <= 2.0:
            self.config.set("tts.kokoro.speed", speed)
            self.logger.info(f"Speed changed to: {speed}")
        else:
            self.logger.warning(f"Speed {speed} out of range (0.5-2.0)")

    def set_volume(self, volume: float) -> None:
        """Set the playback volume.

        Args:
            volume (float): Playback volume (0.0 to 1.0).
        """
        if 0.0 <= volume <= 1.0:
            self.config.set("audio.default_volume", volume)
            self.logger.info(f"Volume changed to: {volume}")
        else:
            self.logger.warning(f"Volume {volume} out of range (0.0-1.0)")

    def get_available_devices(self) -> List[Dict[str, Any]]:
        """Get available audio devices.

        Returns:
            List[Dict[str, Any]]: List of available audio devices.
        """
        if HAS_SOUNDDEVICE and sd:
            return list(sd.query_devices())
        return []

    def is_using_elevenlabs(self) -> bool:
        """Check if using ElevenLabs."""
        if not self._engine:
            return False
        # Handle both real classes and mocks
        return "ElevenLabsEngine" in str(self._engine) or "ElevenLabsEngine" in str(type(self._engine))

    def get_queue_stats(self) -> Dict[str, Any]:
        """Get queue statistics.

        Returns:
            Dict[str, Any]: Dictionary containing queue statistics.
        """
        if self._audio_queue:
            return dict(self._audio_queue.get_stats())
        return {}

__init__(el_api_key=None, verbose=None, config_path=None, config=None, enable_queue=True, max_queue_size=DEFAULT_QUEUE_SIZE, duplicate_timeout=DEFAULT_DUPLICATE_TIMEOUT)

Initialize the Text2Speech instance.

Parameters:

Name Type Description Default
el_api_key Optional[str]

API key for ElevenLabs.

None
verbose Optional[bool]

If True, prints debug info. Overrides config if set.

None
config_path Optional[str]

Path to config.yaml file.

None
config Optional[Config]

Pre-loaded Config object.

None
enable_queue bool

If True, uses AudioQueueManager for thread-safe playback.

True
max_queue_size int

Maximum queued messages.

DEFAULT_QUEUE_SIZE
duplicate_timeout float

Skip duplicate messages within this window (seconds).

DEFAULT_DUPLICATE_TIMEOUT
Source code in text2speech/text2speech.py
def __init__(
    self,
    el_api_key: Optional[str] = None,
    verbose: Optional[bool] = None,
    config_path: Optional[str] = None,
    config: Optional[Config] = None,
    enable_queue: bool = True,
    max_queue_size: int = DEFAULT_QUEUE_SIZE,
    duplicate_timeout: float = DEFAULT_DUPLICATE_TIMEOUT,
) -> None:
    """Create a Text2Speech instance.

    Args:
        el_api_key (Optional[str]): ElevenLabs API key; used only if it passes format validation.
        verbose (Optional[bool]): Explicit verbosity flag; when None the config value is used.
        config_path (Optional[str]): Path to a config.yaml file (ignored if ``config`` is given).
        config (Optional[Config]): Already-loaded Config object.
        enable_queue (bool): Route playback through an AudioQueueManager worker.
        max_queue_size (int): Capacity of the playback queue.
        duplicate_timeout (float): Window (seconds) in which identical messages are dropped.
    """
    # A ready-made Config takes precedence over loading one from disk.
    self.config = config if config is not None else Config(config_path)

    # An explicit verbose flag beats the configured default.
    self._verbose = self.config.verbose if verbose is None else verbose

    self._setup_logging()

    # ElevenLabs is only attempted when the key looks well-formed.
    self._el_api_key = el_api_key
    self._use_elevenlabs = self._validate_elevenlabs_key(el_api_key)

    self._engine: Optional[TTSEngine] = None
    self._initialize_tts_engine()

    self._setup_audio_device()

    self._enable_queue = enable_queue
    self._audio_queue: Optional[AudioQueueManager] = None
    if not enable_queue:
        return

    manager = AudioQueueManager(
        tts_callable=self.speak_sync,
        max_queue_size=max_queue_size,
        duplicate_timeout=duplicate_timeout,
        logger=self.logger,
    )
    self._audio_queue = manager
    manager.start()
    self.logger.info("Audio queue manager enabled")

speak(text, priority=0, blocking=False)

Queue text for speech synthesis.

Parameters:

Name Type Description Default
text str

Text to speak.

required
priority int

Priority level (0-100).

0
blocking bool

If True, wait for speech to complete.

False

Returns:

Name Type Description
bool bool

True if successfully queued/spoken.

Source code in text2speech/text2speech.py
def speak(self, text: str, priority: int = 0, blocking: bool = False) -> bool:
    """Speak *text*, through the audio queue when one is active.

    Args:
        text (str): Text to speak.
        priority (int): Priority level (0-100), forwarded to the queue.
        blocking (bool): If True, wait until the speech has been handled.

    Returns:
        bool: The queue's ``enqueue`` result when the queue is used, otherwise True.
    """
    queue_manager = self._audio_queue
    if not (queue_manager and self._enable_queue):
        # No queue: speak inline, or fire-and-forget on a new thread.
        if not blocking:
            threading.Thread(target=self.speak_sync, args=(text,)).start()
            return True
        self.speak_sync(text)
        return True

    accepted = queue_manager.enqueue(text, priority=priority)
    if accepted and blocking:
        self._wait_for_queue()
    return accepted

speak_sync(text)

Synchronous TTS call.

Parameters:

Name Type Description Default
text str

Text to speak.

required
Source code in text2speech/text2speech.py
def speak_sync(self, text: str) -> None:
    """Synthesize *text* and play each audio chunk, blocking until done.

    Any error during synthesis or playback is logged instead of raised.

    Args:
        text (str): Text to speak.
    """
    engine = self._engine
    if not engine:
        self.logger.error("No TTS engine initialized")
        return

    try:
        # Kokoro and ElevenLabs keep their voice under different config keys.
        if isinstance(engine, KokoroEngine):
            voice = self.config.kokoro_voice
        else:
            voice = self.config.get("tts.elevenlabs.voice", "Brian")
        speed = self.config.kokoro_speed

        self.logger.debug(f"Synthesizing: {text[:50]}...")
        chunks = engine.synthesize(text, voice=voice, speed=speed)
        for _, _, audio in chunks:
            self._play_audio_safely(audio, original_sample_rate=self.config.sample_rate, volume=self.config.audio_volume)
            # Block until this chunk has finished before playing the next one.
            if HAS_SOUNDDEVICE and sd:
                sd.wait()
    except Exception as e:
        self.logger.error(f"Speech synthesis error: {e}")

set_voice(voice)

Set the voice for TTS.

Parameters:

Name Type Description Default
voice str

Name of the voice to use.

required
Source code in text2speech/text2speech.py
def set_voice(self, voice: str) -> None:
    """Use *voice* for both TTS engines.

    Writes the name to ``tts.kokoro.voice`` and ``tts.elevenlabs.voice``.

    Args:
        voice (str): Name of the voice to use.
    """
    for key in ("tts.kokoro.voice", "tts.elevenlabs.voice"):
        self.config.set(key, voice)
    self.logger.info(f"Voice changed to: {voice}")

set_speed(speed)

Set the speech speed.

Parameters:

Name Type Description Default
speed float

Speech speed (0.5 to 2.0).

required
Source code in text2speech/text2speech.py
def set_speed(self, speed: float) -> None:
    """Store a new Kokoro speech speed if it lies in [0.5, 2.0].

    Out-of-range values are rejected with a warning and leave the config unchanged.

    Args:
        speed (float): Speech speed (0.5 to 2.0).
    """
    if not 0.5 <= speed <= 2.0:
        self.logger.warning(f"Speed {speed} out of range (0.5-2.0)")
        return
    self.config.set("tts.kokoro.speed", speed)
    self.logger.info(f"Speed changed to: {speed}")

set_volume(volume)

Set the playback volume.

Parameters:

Name Type Description Default
volume float

Playback volume (0.0 to 1.0).

required
Source code in text2speech/text2speech.py
def set_volume(self, volume: float) -> None:
    """Store a new default playback volume if it lies in [0.0, 1.0].

    Out-of-range values are rejected with a warning and leave the config unchanged.

    Args:
        volume (float): Playback volume (0.0 to 1.0).
    """
    if not 0.0 <= volume <= 1.0:
        self.logger.warning(f"Volume {volume} out of range (0.0-1.0)")
        return
    self.config.set("audio.default_volume", volume)
    self.logger.info(f"Volume changed to: {volume}")

shutdown(timeout=5.0)

Shutdown the TTS system.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for shutdown.

5.0
Source code in text2speech/text2speech.py
def shutdown(self, timeout: float = 5.0) -> None:
    """Shutdown the TTS system.

    This is skipped when the queue is disabled or its worker has already
    stopped. Calling it more than once (for example explicitly, and again
    during garbage collection) therefore does not enqueue another shutdown
    sentinel or log the final queue statistics a second time.

    Args:
        timeout (float): Maximum seconds to wait for shutdown.
    """
    if self._audio_queue is not None and self._audio_queue.is_running():
        self._audio_queue.shutdown(timeout=timeout)

get_available_devices()

Get available audio devices.

Returns:

Type Description
List[Dict[str, Any]]

List of available audio devices.

Source code in text2speech/text2speech.py
def get_available_devices(self) -> List[Dict[str, Any]]:
    """List the audio devices reported by sounddevice.

    Returns:
        List[Dict[str, Any]]: The entries of ``sd.query_devices()``, or an
        empty list when sounddevice is not available.
    """
    if not (HAS_SOUNDDEVICE and sd):
        return []
    return list(sd.query_devices())

get_queue_stats()

Get queue statistics.

Returns:

Type Description
Dict[str, Any]

Dictionary containing queue statistics.

Source code in text2speech/text2speech.py
def get_queue_stats(self) -> Dict[str, Any]:
    """Return a plain-dict snapshot of the audio queue statistics.

    Returns:
        Dict[str, Any]: Copy of the queue's stats, or ``{}`` when no queue exists.
    """
    manager = self._audio_queue
    return dict(manager.get_stats()) if manager else {}

AudioQueueManager

text2speech.audio_queue.AudioQueueManager

Thread-safe audio queue manager that serializes TTS playback.

Features
  • Single worker thread for sequential audio playback
  • Priority queue for urgent messages
  • Automatic cleanup on shutdown
  • Skip duplicate messages within timeout
  • Non-blocking queueing
Source code in text2speech/audio_queue.py
class AudioQueueManager:
    """Thread-safe audio queue manager that serializes TTS playback.

    Features:
        - Single worker thread for sequential audio playback
        - Priority queue for urgent messages
        - Automatic cleanup on shutdown
        - Skip duplicate messages within timeout
        - Non-blocking queueing
    """

    def __init__(
        self,
        tts_callable: Callable[[str], None],
        max_queue_size: int = 50,
        duplicate_timeout: float = 2.0,
        logger: Optional[logging.Logger] = None,
    ) -> None:
        """Initialize the audio queue manager.

        Args:
            tts_callable: Synchronous function that performs TTS (blocks until done).
            max_queue_size: Maximum number of pending messages. While the queue
                is full, new messages are skipped (not queued).
            duplicate_timeout: Skip duplicate messages within this window (seconds).
            logger: Optional logger instance (creates one if None).
        """
        self._tts_callable: Callable[[str], None] = tts_callable
        self._max_queue_size: int = max_queue_size
        self._duplicate_timeout: float = duplicate_timeout

        # Logging
        self._logger: logging.Logger = logger or logging.getLogger(__name__)

        # Priority queue (uses __lt__ from AudioTask for ordering)
        self._queue: queue.PriorityQueue[AudioTask] = queue.PriorityQueue(maxsize=max_queue_size)

        # Worker thread
        self._worker_thread: Optional[threading.Thread] = None
        self._shutdown_event: threading.Event = threading.Event()

        # Recent messages tracking (for duplicate detection) using TTL cache
        self._recent_messages: TTLCache[str, float] = TTLCache(maxsize=MAX_RECENT_MESSAGES, ttl=duplicate_timeout)
        self._recent_lock: threading.Lock = threading.Lock()

        # Statistics
        self._stats: Dict[str, int] = {
            "messages_queued": 0,
            "messages_played": 0,
            "messages_skipped_duplicate": 0,
            "messages_skipped_full": 0,
            "errors": 0,
        }
        self._stats_lock: threading.Lock = threading.Lock()

    def start(self) -> None:
        """Start the worker thread."""
        if self._worker_thread is not None and self._worker_thread.is_alive():
            self._logger.warning("Worker thread already running")
            return

        self._shutdown_event.clear()
        self._worker_thread = threading.Thread(target=self._worker_loop, daemon=True, name="AudioQueueWorker")
        self._worker_thread.start()
        self._logger.debug("Audio queue manager started")

    def shutdown(self, timeout: float = 5.0) -> None:
        """Stop the worker thread and wait for completion.

        After a clean shutdown the finished thread is forgotten, so calling
        this again returns immediately. A repeated call does not enqueue
        another sentinel or log the final statistics twice.

        Args:
            timeout: Maximum seconds to wait for shutdown.
        """
        if self._worker_thread is None:
            return

        self._logger.debug("Shutting down audio queue manager...")
        # Set the flag before waking the worker: the worker stops on the flag,
        # not on the identity of the wake-up task.
        self._shutdown_event.set()

        # Signal queue to unblock
        try:
            # Add a sentinel task to wake up worker
            self._queue.put_nowait(AudioTask("", priority=-1000))
        except queue.Full:
            # The worker re-checks the shutdown flag at least once per second anyway.
            pass

        self._worker_thread.join(timeout=timeout)

        if self._worker_thread.is_alive():
            self._logger.warning("Worker thread did not shut down cleanly")
        else:
            self._logger.debug("Audio queue manager shut down successfully")
            self._worker_thread = None

        # Log final stats
        self._log_statistics()

    def enqueue(self, text: str, priority: int = 0) -> bool:
        """Queue a message for audio playback (non-blocking).

        Args:
            text: Message to speak.
            priority: Priority (higher = more urgent, range 0-100).

        Returns:
            bool: True if queued successfully, False if skipped/failed.
        """
        if not text or not text.strip():
            return False

        # Check for duplicates
        if self._is_duplicate(text):
            with self._stats_lock:
                self._stats["messages_skipped_duplicate"] += 1
            self._logger.debug(f"Skipped duplicate: {text[:50]}")
            return False

        # Create task
        task = AudioTask(text=text, priority=priority)

        # Try to queue (non-blocking)
        try:
            self._queue.put_nowait(task)

            with self._stats_lock:
                self._stats["messages_queued"] += 1

            # Track recent message
            self._track_message(text)

            self._logger.debug(f"Queued (priority={priority}): {text[:50]}")
            return True

        except queue.Full:
            # Queue full - log and skip
            with self._stats_lock:
                self._stats["messages_skipped_full"] += 1
            self._logger.warning(f"Queue full, skipped: {text[:50]}")
            return False

    def clear_queue(self) -> None:
        """Clear all pending messages from queue.

        Each removed message is marked with ``task_done()`` so the queue's
        unfinished-task count (used by ``Queue.join()``) stays balanced.
        """
        cleared = 0
        while True:
            try:
                self._queue.get_nowait()
            except queue.Empty:
                break
            self._queue.task_done()
            cleared += 1

        if cleared > 0:
            self._logger.info(f"Cleared {cleared} messages from queue")

    def get_stats(self) -> MappingProxyType[str, int]:
        """Get playback statistics."""
        with self._stats_lock:
            return MappingProxyType(self._stats.copy())

    def is_running(self) -> bool:
        """Check if worker thread is active."""
        return self._worker_thread is not None and self._worker_thread.is_alive()

    # Private methods

    def _worker_loop(self) -> None:
        """Worker thread main loop: play queued tasks one at a time until shutdown.

        Shutdown is detected through ``_shutdown_event`` only. ``shutdown()``
        sets that event before enqueuing its wake-up sentinel, so the sentinel
        never has to be recognized by its priority. Recognizing it by
        ``priority == -1000`` would also stop the worker, and drop the message,
        whenever a caller enqueued real text with that priority.
        ``task_done()`` is called exactly once for every dequeued task.
        """
        self._logger.debug("Worker thread started")

        while not self._shutdown_event.is_set():
            try:
                # Block with a timeout so the shutdown flag is re-checked regularly.
                task = self._queue.get(timeout=1.0)
            except queue.Empty:
                continue

            try:
                if self._shutdown_event.is_set():
                    break
                if not task.text:
                    # enqueue() never admits blank text, so an empty task is a
                    # sentinel left behind by an earlier shutdown(); skip it.
                    continue

                # Play audio (blocking)
                self._play_audio(task.text)
            except Exception as e:
                self._logger.error(f"Worker error: {e}", exc_info=True)
                with self._stats_lock:
                    self._stats["errors"] += 1
            finally:
                # Mark task as done
                self._queue.task_done()

        self._logger.debug("Worker thread exiting")

    def _play_audio(self, text: str) -> None:
        """Play audio using TTS callable.

        Args:
            text: Message to speak.
        """
        try:
            self._logger.debug(f"Playing: {text[:50]}")

            # Call TTS function (blocking)
            self._tts_callable(text)

            with self._stats_lock:
                self._stats["messages_played"] += 1

            self._logger.debug("Playback complete")

        except Exception as e:
            self._logger.error(f"TTS error: {e}", exc_info=True)
            with self._stats_lock:
                self._stats["errors"] += 1

    def _is_duplicate(self, text: str) -> bool:
        """Check if message is a recent duplicate."""
        with self._recent_lock:
            # TTL cache handles expiration automatically
            return text in self._recent_messages

    def _track_message(self, text: str) -> None:
        """Track message to detect duplicates."""
        with self._recent_lock:
            # TTL cache handles size and expiration automatically
            self._recent_messages[text] = time.time()

    def _log_statistics(self) -> None:
        """Log playback statistics."""
        stats = self.get_stats()
        self._logger.info(
            f"Audio Queue Stats: "
            f"queued={stats['messages_queued']}, "
            f"played={stats['messages_played']}, "
            f"skipped_dup={stats['messages_skipped_duplicate']}, "
            f"skipped_full={stats['messages_skipped_full']}, "
            f"errors={stats['errors']}"
        )

__init__(tts_callable, max_queue_size=50, duplicate_timeout=2.0, logger=None)

Initialize the audio queue manager.

Parameters:

Name Type Description Default
tts_callable Callable[[str], None]

Synchronous function that performs TTS (blocks until done).

required
max_queue_size int

Maximum number of queued messages (new messages are skipped while the queue is full).

50
duplicate_timeout float

Skip duplicate messages within this window (seconds).

2.0
logger Optional[Logger]

Optional logger instance (creates one if None).

None
Source code in text2speech/audio_queue.py
def __init__(
    self,
    tts_callable: Callable[[str], None],
    max_queue_size: int = 50,
    duplicate_timeout: float = 2.0,
    logger: Optional[logging.Logger] = None,
) -> None:
    """Initialize the audio queue manager.

    Args:
        tts_callable: Synchronous function that performs TTS (blocks until done).
        max_queue_size: Maximum number of pending messages. While the queue
            is full, new messages are skipped (not queued).
        duplicate_timeout: Skip duplicate messages within this window (seconds).
        logger: Optional logger instance (creates one if None).
    """
    self._tts_callable: Callable[[str], None] = tts_callable
    self._max_queue_size: int = max_queue_size
    self._duplicate_timeout: float = duplicate_timeout

    # Logging
    self._logger: logging.Logger = logger or logging.getLogger(__name__)

    # Priority queue (uses __lt__ from AudioTask for ordering)
    self._queue: queue.PriorityQueue[AudioTask] = queue.PriorityQueue(maxsize=max_queue_size)

    # Worker thread
    self._worker_thread: Optional[threading.Thread] = None
    self._shutdown_event: threading.Event = threading.Event()

    # Recent messages tracking (for duplicate detection) using TTL cache
    self._recent_messages: TTLCache[str, float] = TTLCache(maxsize=MAX_RECENT_MESSAGES, ttl=duplicate_timeout)
    self._recent_lock: threading.Lock = threading.Lock()

    # Statistics
    self._stats: Dict[str, int] = {
        "messages_queued": 0,
        "messages_played": 0,
        "messages_skipped_duplicate": 0,
        "messages_skipped_full": 0,
        "errors": 0,
    }
    self._stats_lock: threading.Lock = threading.Lock()

start()

Start the worker thread.

Source code in text2speech/audio_queue.py
def start(self) -> None:
    """Launch the daemon worker thread unless one is already alive."""
    current = self._worker_thread
    already_running = current is not None and current.is_alive()
    if already_running:
        self._logger.warning("Worker thread already running")
        return

    # Reset the stop flag left over from a previous shutdown.
    self._shutdown_event.clear()
    worker = threading.Thread(target=self._worker_loop, daemon=True, name="AudioQueueWorker")
    self._worker_thread = worker
    worker.start()
    self._logger.debug("Audio queue manager started")

shutdown(timeout=5.0)

Stop the worker thread and wait for completion.

Parameters:

Name Type Description Default
timeout float

Maximum seconds to wait for shutdown.

5.0
Source code in text2speech/audio_queue.py
def shutdown(self, timeout: float = 5.0) -> None:
    """Signal the worker to stop and wait up to ``timeout`` seconds for it.

    Does nothing when no worker thread has been created. Otherwise it:
    sets the shutdown event, tries to enqueue an empty-text wake-up task
    (silently skipped if the queue is full), joins the thread, logs whether
    it exited, and finally logs the accumulated statistics.

    Args:
        timeout: Maximum seconds to wait for shutdown.
    """
    worker = self._worker_thread
    if worker is None:
        return

    self._logger.debug("Shutting down audio queue manager...")
    self._shutdown_event.set()

    # Wake-up sentinel for a worker that may be blocked on the queue.
    # NOTE(review): how the worker treats this empty-text, priority -1000 task
    # depends on _worker_loop / AudioTask ordering, which are not visible here.
    wake_up = AudioTask("", priority=-1000)
    try:
        self._queue.put_nowait(wake_up)
    except queue.Full:
        pass

    worker.join(timeout=timeout)

    if not worker.is_alive():
        self._logger.debug("Audio queue manager shut down successfully")
    else:
        self._logger.warning("Worker thread did not shut down cleanly")

    # Report final counters regardless of whether the stop was clean.
    self._log_statistics()

enqueue(text, priority=0)

Queue a message for audio playback (non-blocking).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `text` | `str` | Message to speak. | *required* |
| `priority` | `int` | Priority (higher = more urgent, range 0-100). | `0` |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if queued successfully, False if skipped/failed. |

Source code in text2speech/audio_queue.py
def enqueue(self, text: str, priority: int = 0) -> bool:
    """Try to add ``text`` to the playback queue without blocking.

    Empty or whitespace-only text is rejected outright. Text reported as a
    duplicate by ``self._is_duplicate`` is rejected and counted under
    ``messages_skipped_duplicate``. When the queue is full, the *new* message
    is rejected (already-queued entries are kept) and counted under
    ``messages_skipped_full``.

    Args:
        text: Message to speak.
        priority: Priority (higher = more urgent, range 0-100). The range is
            not validated here.

    Returns:
        bool: True if queued successfully, False if skipped/failed.
    """
    if not (text and text.strip()):
        return False

    if self._is_duplicate(text):
        with self._stats_lock:
            self._stats["messages_skipped_duplicate"] += 1
        self._logger.debug(f"Skipped duplicate: {text[:50]}")
        return False

    try:
        self._queue.put_nowait(AudioTask(text=text, priority=priority))
    except queue.Full:
        with self._stats_lock:
            self._stats["messages_skipped_full"] += 1
        self._logger.warning(f"Queue full, skipped: {text[:50]}")
        return False

    with self._stats_lock:
        self._stats["messages_queued"] += 1

    # Record the text only once it was actually queued, so rejected messages
    # are not tracked.
    self._track_message(text)

    self._logger.debug(f"Queued (priority={priority}): {text[:50]}")
    return True

clear_queue()

Clear all pending messages from queue.

Source code in text2speech/audio_queue.py
def clear_queue(self) -> None:
    """Discard every message currently waiting in the queue.

    Each removed item is acknowledged with ``task_done()``. This keeps the
    queue's unfinished-task counter balanced, so ``queue.join()`` cannot block
    forever on messages that were dropped rather than played. Items already
    taken off the queue by a worker are not affected. The number of discarded
    messages is logged at INFO level when it is non-zero.
    """
    cleared = 0
    while True:
        try:
            self._queue.get_nowait()
        except queue.Empty:
            break
        # Balance the put() that added this item: dropped work counts as done.
        self._queue.task_done()
        cleared += 1

    if cleared:
        self._logger.info(f"Cleared {cleared} messages from queue")

get_stats()

Get playback statistics.

Source code in text2speech/audio_queue.py
def get_stats(self) -> MappingProxyType[str, int]:
    """Return a read-only snapshot of the playback counters.

    The counters are copied while holding ``self._stats_lock``. Later updates
    are not reflected in the returned mapping, and the mapping itself cannot
    be modified.
    """
    with self._stats_lock:
        snapshot = dict(self._stats)
    return MappingProxyType(snapshot)

is_running()

Check if worker thread is active.

Source code in text2speech/audio_queue.py
def is_running(self) -> bool:
    """Report whether a worker thread exists and is currently alive."""
    worker = self._worker_thread
    if worker is None:
        return False
    return worker.is_alive()

Config

text2speech.config.Config

Configuration manager for text2speech settings.

Source code in text2speech/config.py
class Config:
    """Configuration manager for text2speech settings.

    Holds a nested dictionary seeded from ``DEFAULT_CONFIG`` and optionally
    deep-merged with a user YAML file. Every instance works on its own deep
    copy of the defaults, so ``set()`` on one instance never alters
    ``DEFAULT_CONFIG`` or any other instance.
    """

    DEFAULT_CONFIG: Dict[str, Any] = {
        "audio": {
            "output_device": None,
            "default_volume": DEFAULT_VOLUME,
            "sample_rate": DEFAULT_SAMPLE_RATE,
        },
        "tts": {
            "engine": "kokoro",
            "kokoro": {
                "lang_code": "a",
                "voice": "af_heart",
                "speed": DEFAULT_SPEED,
                "split_pattern": r"\n+",
            },
            "elevenlabs": {
                "voice": "Brian",
                "model": "eleven_multilingual_v2",
            },
        },
        "logging": {
            "verbose": False,
            "log_file": None,
            "log_level": "INFO",
        },
        "performance": {
            "use_gpu": True,
            "num_threads": 1,
        },
    }

    def __init__(self, config_path: Optional[str] = None) -> None:
        """Initialize configuration.

        Args:
            config_path (Optional[str]): Path to config.yaml file. If None, searches in common locations.
                A path that does not exist is silently ignored and the defaults are used.
        """
        import copy

        # Deep copy: a shallow .copy() would share the nested section dicts with
        # the class-level DEFAULT_CONFIG, so set() would mutate the defaults.
        self._config: Dict[str, Any] = copy.deepcopy(self.DEFAULT_CONFIG)

        if config_path is None:
            config_path = self._find_config_file()

        if config_path and os.path.exists(config_path):
            self.load_from_file(config_path)

    def _find_config_file(self) -> Optional[str]:
        """Search for config.yaml in common locations.

        Locations are tried in order: ./config.yaml, ./config.yml,
        ~/.text2speech/config.yaml, ~/.config/text2speech/config.yaml,
        /etc/text2speech/config.yaml.

        Returns:
            Path to the first existing regular file found, None otherwise.
        """
        search_paths: List[str] = [
            "config.yaml",
            "config.yml",
            os.path.expanduser("~/.text2speech/config.yaml"),
            os.path.expanduser("~/.config/text2speech/config.yaml"),
            "/etc/text2speech/config.yaml",
        ]

        for path in search_paths:
            # isfile rather than exists: a directory with a matching name cannot be loaded.
            if os.path.isfile(path):
                return path

        return None

    def load_from_file(self, config_path: str) -> None:
        """Load configuration from YAML file.

        The file's contents are deep-merged over a fresh copy of
        ``DEFAULT_CONFIG``. This replaces any previous settings, including
        values changed with ``set()``.

        Args:
            config_path (str): Path to the YAML configuration file.

        Raises:
            FileNotFoundError: If config file doesn't exist.
            yaml.YAMLError: If config file is invalid YAML or its top level is not a mapping.
        """
        import copy

        try:
            with open(config_path, "r", encoding="utf-8") as f:
                user_config: Any = yaml.safe_load(f) or {}
        except FileNotFoundError as e:
            raise FileNotFoundError(f"Config file not found: {config_path}") from e
        except yaml.YAMLError as e:
            raise yaml.YAMLError(f"Invalid YAML in config file: {e}") from e

        # A scalar or list at the top level cannot be merged into the section dicts.
        if not isinstance(user_config, dict):
            raise yaml.YAMLError(
                f"Config file must contain a mapping at the top level: {config_path}"
            )

        # Deep merge user config with an independent copy of the defaults.
        self._config = self._deep_merge(copy.deepcopy(self.DEFAULT_CONFIG), user_config)

    @staticmethod
    def _deep_merge(base: Dict[str, Any], update: Dict[str, Any]) -> Dict[str, Any]:
        """Deep merge two dictionaries.

        Keys whose values are dicts on both sides are merged recursively; any
        other value in ``update`` replaces the one in ``base``. ``base`` itself
        is not modified at the top level (a shallow copy is taken at each level).

        Args:
            base (Dict[str, Any]): Base dictionary.
            update (Dict[str, Any]): Dictionary with updates to apply.

        Returns:
            Dict[str, Any]: Merged dictionary.
        """
        result: Dict[str, Any] = base.copy()

        for key, value in update.items():
            if key in result and isinstance(result[key], dict) and isinstance(value, dict):
                result[key] = Config._deep_merge(result[key], value)
            else:
                result[key] = value

        return result

    def get(self, key_path: str, default: Any = None) -> Any:
        """Get configuration value using dot notation.

        Args:
            key_path (str): Dot-separated path to config value (e.g., 'audio.output_device').
            default (Any): Default value if key not found.

        Returns:
            Any: Configuration value or default.
        """
        keys: List[str] = key_path.split(".")
        value: Any = self._config

        for key in keys:
            if isinstance(value, dict) and key in value:
                value = value[key]
            else:
                return default

        return value

    def set(self, key_path: str, value: Any) -> None:
        """Set configuration value using dot notation.

        Missing intermediate sections are created as empty dicts.

        Args:
            key_path (str): Dot-separated path to config value (e.g., 'audio.output_device').
            value (Any): Value to set.
        """
        keys: List[str] = key_path.split(".")
        config: Dict[str, Any] = self._config

        for key in keys[:-1]:
            if key not in config:
                config[key] = {}
            config = config[key]

        config[keys[-1]] = value

    def to_dict(self) -> Dict[str, Any]:
        """Get full configuration as dictionary.

        Returns:
            A deep copy of the configuration. Mutating it, including its nested
            sections, does not affect this Config.
        """
        import copy

        return copy.deepcopy(self._config)

    def save_to_file(self, config_path: str) -> None:
        """Save current configuration to YAML file.

        The resolved path must lie inside the home directory, the current
        working directory, or the system temp directory. Missing parent
        directories are created.

        Args:
            config_path (str): Path where to save the configuration.

        Raises:
            ValueError: If path is outside allowed directories.
        """
        path = Path(config_path).resolve()

        # Restrict to safe directories
        allowed_prefixes = [
            Path.home().resolve(),
            Path.cwd().resolve(),
            Path(tempfile.gettempdir()).resolve(),
        ]

        is_allowed = False
        for prefix in allowed_prefixes:
            try:
                path.relative_to(prefix)
                is_allowed = True
                break
            except ValueError:
                continue

        if not is_allowed:
            raise ValueError(f"Config path outside allowed directories: {config_path}")

        # Create directory if it doesn't exist
        path.parent.mkdir(parents=True, exist_ok=True)

        with open(path, "w") as f:
            yaml.dump(self._config, f, default_flow_style=False, sort_keys=False)

    @property
    def audio_output_device(self) -> Optional[int]:
        """Get audio output device ID (``None`` when unset)."""
        val = self.get("audio.output_device")
        return int(val) if val is not None else None

    @property
    def audio_volume(self) -> float:
        """Get default audio volume."""
        return float(self.get("audio.default_volume", DEFAULT_VOLUME))

    @property
    def sample_rate(self) -> int:
        """Get sample rate."""
        return int(self.get("audio.sample_rate", DEFAULT_SAMPLE_RATE))

    @property
    def tts_engine(self) -> str:
        """Get TTS engine name."""
        return str(self.get("tts.engine", "kokoro"))

    @property
    def kokoro_lang_code(self) -> str:
        """Get Kokoro language code."""
        return str(self.get("tts.kokoro.lang_code", "a"))

    @property
    def kokoro_voice(self) -> str:
        """Get Kokoro voice."""
        return str(self.get("tts.kokoro.voice", "af_heart"))

    @property
    def kokoro_speed(self) -> float:
        """Get Kokoro speech speed."""
        return float(self.get("tts.kokoro.speed", DEFAULT_SPEED))

    @property
    def kokoro_split_pattern(self) -> str:
        """Get Kokoro text split pattern."""
        return str(self.get("tts.kokoro.split_pattern", r"\n+"))

    @property
    def verbose(self) -> bool:
        """Get verbose logging setting."""
        return bool(self.get("logging.verbose", False))

    @property
    def use_gpu(self) -> bool:
        """Get GPU usage setting."""
        return bool(self.get("performance.use_gpu", True))

audio_output_device property

Get audio output device ID.

audio_volume property

Get default audio volume.

sample_rate property

Get sample rate.

__init__(config_path=None)

Initialize configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config_path` | `Optional[str]` | Path to config.yaml file. If None, searches in common locations. | `None` |
Source code in text2speech/config.py
def __init__(self, config_path: Optional[str] = None) -> None:
    """Initialize configuration.

    Args:
        config_path (Optional[str]): Path to config.yaml file. If None, searches in common locations.
            A path that does not exist is silently ignored and the defaults are used.
    """
    import copy

    # Deep copy: a shallow .copy() would share the nested section dicts with
    # the class-level DEFAULT_CONFIG, so set() would mutate the defaults.
    self._config: Dict[str, Any] = copy.deepcopy(self.DEFAULT_CONFIG)

    if config_path is None:
        config_path = self._find_config_file()

    if config_path and os.path.exists(config_path):
        self.load_from_file(config_path)

load_from_file(config_path)

Load configuration from YAML file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config_path` | `str` | Path to the YAML configuration file. | *required* |

Raises:

| Type | Description |
|------|-------------|
| `FileNotFoundError` | If config file doesn't exist. |
| `YAMLError` | If config file is invalid YAML. |

Source code in text2speech/config.py
def load_from_file(self, config_path: str) -> None:
    """Load configuration from YAML file.

    The file's contents are deep-merged over a fresh copy of
    ``DEFAULT_CONFIG``. This replaces any previous settings, including values
    changed with ``set()``.

    Args:
        config_path (str): Path to the YAML configuration file.

    Raises:
        FileNotFoundError: If config file doesn't exist.
        yaml.YAMLError: If config file is invalid YAML or its top level is not a mapping.
    """
    import copy

    try:
        with open(config_path, "r", encoding="utf-8") as f:
            user_config: Any = yaml.safe_load(f) or {}
    except FileNotFoundError as e:
        raise FileNotFoundError(f"Config file not found: {config_path}") from e
    except yaml.YAMLError as e:
        raise yaml.YAMLError(f"Invalid YAML in config file: {e}") from e

    # A scalar or list at the top level cannot be merged into the section dicts.
    if not isinstance(user_config, dict):
        raise yaml.YAMLError(
            f"Config file must contain a mapping at the top level: {config_path}"
        )

    # Deep merge user config with an independent copy of the defaults.
    self._config = self._deep_merge(copy.deepcopy(self.DEFAULT_CONFIG), user_config)

get(key_path, default=None)

Get configuration value using dot notation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key_path` | `str` | Dot-separated path to config value (e.g., 'audio.output_device'). | *required* |
| `default` | `Any` | Default value if key not found. | `None` |

Returns:

| Type | Description |
|------|-------------|
| `Any` | Configuration value or default. |

Source code in text2speech/config.py
def get(self, key_path: str, default: Any = None) -> Any:
    """Look up a configuration value by dot-separated path.

    Args:
        key_path (str): Dot-separated path to config value (e.g., 'audio.output_device').
        default (Any): Default value if key not found.

    Returns:
        Any: The value stored at ``key_path``, which may itself be ``None`` or a
        nested dict. Returns ``default`` if a path segment is missing, or if a
        non-dict value is reached before the path ends.
    """
    node: Any = self._config
    for segment in key_path.split("."):
        if not isinstance(node, dict) or segment not in node:
            return default
        node = node[segment]
    return node

set(key_path, value)

Set configuration value using dot notation.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key_path` | `str` | Dot-separated path to config value (e.g., 'audio.output_device'). | *required* |
| `value` | `Any` | Value to set. | *required* |
Source code in text2speech/config.py
def set(self, key_path: str, value: Any) -> None:
    """Store ``value`` at a dot-separated path, creating missing sections.

    Intermediate sections that do not exist yet are created as empty dicts.
    Existing entries along the path are reused as they are. If one of them
    is not a dict, the lookup or assignment typically fails with a TypeError.

    Args:
        key_path (str): Dot-separated path to config value (e.g., 'audio.output_device').
        value (Any): Value to set.
    """
    *parents, leaf = key_path.split(".")
    node: Dict[str, Any] = self._config

    for segment in parents:
        if segment not in node:
            node[segment] = {}
        node = node[segment]

    node[leaf] = value

save_to_file(config_path)

Save current configuration to YAML file.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `config_path` | `str` | Path where to save the configuration. | *required* |

Raises:

| Type | Description |
|------|-------------|
| `ValueError` | If path is outside allowed directories. |

Source code in text2speech/config.py
def save_to_file(self, config_path: str) -> None:
    """Write the current configuration to a YAML file.

    The target path is resolved first (symlinks and ``..`` components are
    resolved). It must then lie inside the user's home directory, the current
    working directory, or the system temp directory. Missing parent
    directories are created before writing.

    Args:
        config_path (str): Path where to save the configuration.

    Raises:
        ValueError: If path is outside allowed directories.
    """
    target = Path(config_path).resolve()

    safe_roots = (
        Path.home().resolve(),
        Path.cwd().resolve(),
        Path(tempfile.gettempdir()).resolve(),
    )

    def _inside(root: Path) -> bool:
        # relative_to raises ValueError when target is not below root.
        try:
            target.relative_to(root)
        except ValueError:
            return False
        return True

    if not any(_inside(root) for root in safe_roots):
        raise ValueError(f"Config path outside allowed directories: {config_path}")

    target.parent.mkdir(parents=True, exist_ok=True)

    with open(target, "w") as handle:
        yaml.dump(self._config, handle, default_flow_style=False, sort_keys=False)