nordvpntray/venv/lib/python3.12/site-packages/pystray/_xorg.py
2024-12-23 11:03:07 +01:00

491 lines
15 KiB
Python

# coding=utf-8
# pystray
# Copyright (C) 2016-2022 Moses Palmér
#
# This program is free software: you can redistribute it and/or modify it under
# the terms of the GNU Lesser General Public License as published by the Free
# Software Foundation, either version 3 of the License, or (at your option) any
# later version.
#
# This program is distributed in the hope that it will be useful, but WITHOUT
# ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
# FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
# details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import contextlib
import functools
import six
import sys
import threading
import types
import PIL
import Xlib.display
import Xlib.threaded
import Xlib.XK
from six.moves import queue
from . import _base
# Create a display to verify that we have an X connection
display = Xlib.display.Display()
display.close()
del display
class XError(Exception):
"""An error that is thrown at the end of a code block managed by a
:func:`display_manager` if an *X* error occurred.
"""
pass
@contextlib.contextmanager
def display_manager(display):
"""Traps *X* errors and raises an :class:`XError` at the end if any
error occurred.
This handler also ensures that the :class:`Xlib.display.Display` being
managed is sync'd.
:param Xlib.display.Display display: The *X* display.
"""
errors = []
def handler(*args):
errors.append(args)
old_handler = display.set_error_handler(handler)
try:
yield
display.sync()
finally:
display.set_error_handler(old_handler)
if errors:
raise XError(errors)
class Icon(_base.Icon):
_XEMBED_VERSION = 0
_XEMBED_MAPPED = 1
_SYSTEM_TRAY_REQUEST_DOCK = 0
# We support only the default action
HAS_MENU = False
# We support no menu
HAS_MENU_RADIO = False
# No notification (yet)!
HAS_NOTIFICATION = False
def __init__(self, *args, **kwargs):
super(Icon, self).__init__(*args, **kwargs)
#: The properly scaled version of the icon image
self._icon_data = None
#: The window currently embedding this icon
self._systray_manager = None
# This is a mapping from X event codes to handlers used by the mainloop
self._message_handlers = {
Xlib.X.ButtonPress: self._on_button_press,
Xlib.X.ConfigureNotify: self._on_expose,
Xlib.X.DestroyNotify: self._on_destroy_notify,
Xlib.X.Expose: self._on_expose}
self._queue = queue.Queue()
# Connect to X
self._display = Xlib.display.Display()
with display_manager(self._display):
# Create the atoms; some of these are required when creating
# the window
self._create_atoms()
# Create the window and get a graphics context
self._window = self._create_window()
self._gc = self._window.create_gc()
# Rewrite the platform implementation methods to ensure they
# are executed in this thread
self._rewrite_implementation(
self._show,
self._hide,
self._update_icon,
self._update_title,
self._stop)
def __del__(self):
try:
# Destroying the window will stop the mainloop thread
if self._running:
self._stop()
if threading.current_thread().ident != self._thread.ident:
self._thread.join()
finally:
self._display.close()
def _show(self):
"""The implementation of :meth:`_show`, executed in the mainloop
thread.
"""
try:
self._assert_docked()
except AssertionError:
# There is no systray selection owner, so we cannot dock; ignore
# and dock later
self._log.error(
'Failed to dock icon', exc_info=True)
def _hide(self):
"""The implementation of :meth:`_hide`, executed in the mainloop
thread.
"""
if self._systray_manager:
self._undock_window()
def _update_icon(self):
"""The implementation of :meth:`_update_icon`, executed in the mainloop
thread.
"""
try:
self._assert_docked()
except AssertionError:
# If we are not docked, we cannot update the icon
self._log.error(
'Failed to dock icon', exc_info=True)
return
# Setting _icon_data to None will force regeneration of the icon
# from _icon
self._icon_data = None
self._draw()
self._icon_valid = True
def _update_title(self):
"""The implementation of :meth:`_update_title`, executed in the
mainloop thread.
"""
# The title is the window name
self._window.set_wm_name(self.title)
def _update_menu(self):
# Menus are not supported on X
pass
def _run(self):
self._mark_ready()
# Run the event loop
self._thread = threading.current_thread()
self._mainloop()
def _run_detached(self):
threading.Thread(target=lambda: self.run()).start()
def _stop(self):
"""Stops the mainloop.
"""
self._window.destroy()
self._display.flush()
def _mainloop(self):
"""The body of the main loop thread.
This method retrieves all events from *X* and makes sure to dispatch
clicks.
"""
try:
for event in self._events():
# If the systray window is destroyed, the icon has been hidden
if (event.type == Xlib.X.DestroyNotify and
event.window == self._window):
break
self._message_handlers.get(event.type, lambda e: None)(event)
except:
self._log.error(
'An error occurred in the main loop', exc_info=True)
def _on_button_press(self, event):
"""Handles ``Xlib.X.ButtonPress``.
This method calls the activate callback. It will only be called for
left button clicks.
"""
if event.detail == 1:
self()
def _on_destroy_notify(self, event):
"""Handles ``Xlib.X.DestroyNotify``.
This method clears :attr:`_systray_manager` if it is destroyed.
"""
# Handle only the systray manager window; the destroy notification
# for our own window is handled in the event loop
if event.window.id != self._systray_manager.id:
return
# Try to locate a new systray selection owner
self._systray_manager = None
try:
self._assert_docked()
except AssertionError:
# There is no new selection owner; we must retry later
self._log.error(
'Failed to dock icon', exc_info=True)
def _on_expose(self, event):
"""Handles ``Xlib.X.ConfigureNotify`` and ``Xlib.X.Expose``.
This method redraws the window.
"""
# Redraw only our own window
if event.window.id != self._window.id:
return
self._draw()
def _create_atoms(self):
"""Creates the atoms used by the *XEMBED* and *systray* specifications.
"""
self._xembed_info = self._display.intern_atom(
'_XEMBED_INFO')
self._net_system_tray_sx = self._display.intern_atom(
'_NET_SYSTEM_TRAY_S%d' % (
self._display.get_default_screen()))
self._net_system_tray_opcode = self._display.intern_atom(
'_NET_SYSTEM_TRAY_OPCODE')
def _rewrite_implementation(self, *args):
"""Overwrites the platform implementation methods with ones causing the
mainloop to execute the code instead.
:param args: The methods to rewrite.
"""
def dispatcher(original, atom):
@functools.wraps(original)
def inner(self):
# Just invoke the method if we are currently in the correct
# thread
if threading.current_thread().ident == self._thread.ident:
original()
else:
self._send_message(self._window, atom)
self._display.flush()
# Wait for the mainloop to execute the actual method, wait
# for completion and reraise any exceptions
result = self._queue.get()
if result is not True:
six.reraise(*result)
return types.MethodType(inner, self)
def wrapper(original):
@functools.wraps(original)
def inner():
try:
original()
self._queue.put(True)
except:
self._queue.put(sys.exc_info())
return inner
def on_client_message(event):
handlers.get(event.client_type, lambda: None)()
# Create the atoms and a mapping from atom to actual implementation
atoms = [
self._display.intern_atom(
'_PYSTRAY_%s' % original.__name__.upper())
for original in args]
handlers = {
atom: wrapper(original)
for original, atom in zip(args, atoms)}
# Replace the old methods
for original, atom in zip(args, atoms):
setattr(
self,
original.__name__,
dispatcher(original, atom))
# Make sure that we handle ClientMessage
self._message_handlers[Xlib.X.ClientMessage] = on_client_message
def _create_window(self):
"""Creates the system tray icon window.
:return: a window
"""
with display_manager(self._display):
# Create the window
screen = self._display.screen()
window = screen.root.create_window(
-1, -1, 1, 1, 0, screen.root_depth,
event_mask=Xlib.X.ExposureMask | Xlib.X.StructureNotifyMask,
window_class=Xlib.X.InputOutput)
flags = (
Xlib.Xutil.PPosition | Xlib.Xutil.PSize | Xlib.Xutil.PMinSize)
window.set_wm_class('%sSystemTrayIcon' % self.name, self.name)
window.set_wm_name(self.title)
window.set_wm_normal_hints(
flags=flags,
min_width=24,
min_height=24)
# Enable XEMBED for the window
window.change_property(self._xembed_info, self._xembed_info, 32, [
self._XEMBED_VERSION,
self._XEMBED_MAPPED])
return window
def _draw(self):
"""Paints the icon image.
"""
try:
dim = self._window.get_geometry()
self._assert_icon_data(dim.width, dim.height)
self._window.put_pil_image(self._gc, 0, 0, self._icon_data)
except Xlib.error.BadDrawable:
# The window has been destroyed; ignore
pass
def _assert_icon_data(self, width, height):
"""Asserts that the cached icon data matches the requested dimensions.
If no cached icon data exists, or its dimensions do not match the
requested size, the image is generated.
:param int width: The requested width.
:param int height: The requested height.
"""
if self._icon_data and self._icon_data.size == (width, height):
return
self._icon_data = PIL.Image.new(
'RGB',
(width, height))
self._icon_data.paste(self._icon.resize(
(width, height),
PIL.Image.LANCZOS))
self._icon_data.tostring = self._icon_data.tobytes
def _assert_docked(self):
"""Asserts that the icon is docked in the systray.
:raises AssertionError: if the window is not docked
"""
self._dock_window()
assert self._systray_manager
def _dock_window(self):
"""Docks the window in the systray.
"""
# Get the selection owner
systray_manager = self._get_systray_manager()
if not systray_manager:
return
self._systray_manager = systray_manager
# Request being docked
self._send_message(
self._systray_manager,
self._net_system_tray_opcode,
self._SYSTEM_TRAY_REQUEST_DOCK,
self._window.id)
# Make sure we get destroy notifications
systray_manager.change_attributes(
event_mask=Xlib.X.StructureNotifyMask)
self._display.flush()
self._systray_manager = systray_manager
def _undock_window(self):
"""Undocks the window from the systray.
"""
# Make sure we get do not get any notifications
try:
self._systray_manager.change_attributes(
event_mask=Xlib.X.NoEventMask)
except XError:
# The systray manager may have been destroyed
self._log.error(
'Failed to stop notifications', exc_info=True)
self._window.unmap()
self._window.reparent(self._display.screen().root, 0, 0)
self._systray_manager = None
self._display.flush()
def _get_systray_manager(self):
"""Returns the *X* window that owns the systray selection.
:return: the window owning the selection, or ``None`` if no window owns
it
"""
self._display.grab_server()
try:
systray_manager = self._display.get_selection_owner(
self._net_system_tray_sx)
finally:
self._display.ungrab_server()
self._display.flush()
if systray_manager != Xlib.X.NONE:
return self._display.create_resource_object(
'window',
systray_manager.id)
def _send_message(self, window, client_type, l0=0, l1=0, l2=0, l3=0):
"""Sends a generic client message message.
This method does not trap *X* errors; that is up to the caller.
:param int l0: Message specific data.
:param int l1: Message specific data.
:param int l2: Message specific data.
:param int l3: Message specific data.
"""
self._display.send_event(
window,
Xlib.display.event.ClientMessage(
type=Xlib.X.ClientMessage,
client_type=client_type,
window=window.id,
data=(
32,
(Xlib.X.CurrentTime, l0, l1, l2, l3))),
event_mask=Xlib.X.NoEventMask)
def _events(self):
"""Yields all events.
"""
while True:
event = self._display.next_event()
if not event:
break
else:
yield event