Mastodon: Added user autocompletion module (not yet implemented on GUI)

This commit is contained in:
Manuel Cortez 2023-10-11 11:29:39 -06:00
parent a5dba08b06
commit 14f48e4bbe
No known key found for this signature in database
GPG Key ID: 9E0735CA15EFE790
9 changed files with 394 additions and 0 deletions

View File

@ -0,0 +1,2 @@
# -*- coding: utf-8 -*-
""" Autocompletion users for TWBlue. This package contains all needed code to support this feature, including automatic addition of users, management and code to show the autocompletion menu when an user is composing a post. """

View File

@ -0,0 +1,66 @@
# -*- coding: utf-8 -*-
""" Module to display the user autocompletion menu in post dialogs. """
import output
from . import storage
from . import wx_menu
class autocompletionUsers(object):
def __init__(self, window, session_id):
""" Class constructor. Displays a menu with users matching the specified pattern for autocompletion.
:param window: A wx control where the menu should be displayed. Normally this is going to be the wx.TextCtrl indicating the tweet's text or direct message recipient.
:type window: wx.Dialog
:param session_id: Session ID which calls this class. We will load the users database from this session.
:type session_id: str.
"""
super(autocompletionUsers, self).__init__()
self.window = window
self.db = storage.storage(session_id)
def show_menu(self, mode="mastodon"):
""" displays a menu with possible users matching the specified pattern.
this menu can be displayed in dialogs where an username is expected. For Mastodon's post dialogs, the string should start with an at symbol (@), otherwise it won't match the pattern.
Of course, users must be already loaded in database before attempting this.
If no users are found, an error message will be spoken.
:param mode: this controls how the dialog will behave. Possible values are 'mastodon' and 'free'. In mastodon mode, the matching pattern will be @user (@ is required), while in 'free' mode the matching pattern will be anything written in the text control.
:type mode: str
"""
if mode == "mastodon":
position = self.window.text.GetInsertionPoint()
text = self.window.text.GetValue()
text = text[:position]
try:
pattern = text.split()[-1]
except IndexError:
output.speak(_(u"You have to start writing"))
return
if pattern.startswith("@") == True:
menu = wx_menu.menu(self.window.text, pattern[1:], mode=mode)
users = self.db.get_users(pattern[1:])
if len(users) > 0:
menu.append_options(users)
self.window.PopupMenu(menu, self.window.text.GetPosition())
menu.destroy()
else:
output.speak(_(u"There are no results in your users database"))
else:
output.speak(_(u"Autocompletion only works for users."))
elif mode == "free":
text = self.window.cb.GetValue()
try:
pattern = text.split()[-1]
except IndexError:
output.speak(_(u"You have to start writing"))
return
menu = wx_menu.menu(self.window.cb, pattern, mode=mode)
users = self.db.get_users(pattern)
if len(users) > 0:
menu.append_options(users)
self.window.PopupMenu(menu, self.window.cb.GetPosition())
menu.destroy()
else:
output.speak(_(u"There are no results in your users database"))

View File

@ -0,0 +1,57 @@
# -*- coding: utf-8 -*-
""" Management of users in the local database for autocompletion. """
import time
import widgetUtils
from wxUI import commonMessageDialogs
from . import storage, wx_manage
from .mastodon import scan as mastodon
class autocompletionManage(object):
def __init__(self, session):
""" class constructor. Manages everything related to user autocompletion.
:param session: Sessiom where the autocompletion management has been requested.
:type session: sessions.base.Session.
"""
super(autocompletionManage, self).__init__()
self.session = session
# Instantiate database so we can perform modifications on it.
self.database = storage.storage(self.session.session_id)
def show_settings(self):
""" display user management dialog and connect events associated to it. """
self.dialog = wx_manage.autocompletionManageDialog()
self.users = self.database.get_all_users()
self.dialog.put_users(self.users)
widgetUtils.connect_event(self.dialog.add, widgetUtils.BUTTON_PRESSED, self.add_user)
widgetUtils.connect_event(self.dialog.remove, widgetUtils.BUTTON_PRESSED, self.remove_user)
self.dialog.get_response()
def update_list(self):
""" update users list in management dialog. This function is normallhy used after we modify the database in any way, so we can reload all users in the autocompletion user management list. """
item = self.dialog.users.get_selected()
self.dialog.users.clear()
self.users = self.database.get_all_users()
self.dialog.put_users(self.users)
self.dialog.users.select_item(item)
def add_user(self, *args, **kwargs):
""" Add a new username to the autocompletion database. """
usr = self.dialog.get_user()
if usr == False:
return
user_added = False
if self.session.type == "mastodon":
user_added = mastodon.add_user(session=self.session, database=self.database, user=usr)
if user_added == False:
self.dialog.show_invalid_user_error()
return
self.update_list()
def remove_user(self, *args, **kwargs):
""" Remove focused user from the autocompletion database. """
if commonMessageDialogs.delete_user_from_db() == widgetUtils.YES:
item = self.dialog.users.get_selected()
user = self.users[item]
self.database.remove_user(user[0])
self.update_list()

View File

@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
""" Scanning code for autocompletion feature on TWBlue. This module can retrieve user objects from the selected Mastodon account automatically. """
import time
import wx
import widgetUtils
import output
from pubsub import pub
from . import wx_scan
from extra.autocompletionUsers import manage, storage
class autocompletionScan(object):
def __init__(self, config, buffer, window):
""" Class constructor. This class will take care of scanning the selected Mastodon account to populate the database with users automatically upon request.
:param config: Config for the session that will be scanned in search for users.
:type config: dict
:param buffer: home buffer for the focused session.
:type buffer: controller.buffers.mastodon.base.baseBuffer
:param window: Main Window of TWBlue.
:type window:wx.Frame
"""
super(autocompletionScan, self).__init__()
self.config = config
self.buffer = buffer
self.window = window
def show_dialog(self):
""" displays a dialog to confirm which buffers should be scanned (followers or following users). """
self.dialog = wx_scan.autocompletionScanDialog()
self.dialog.set("friends", self.config["mysc"]["save_friends_in_autocompletion_db"])
self.dialog.set("followers", self.config["mysc"]["save_followers_in_autocompletion_db"])
if self.dialog.get_response() == widgetUtils.OK:
confirmation = wx_scan.confirm()
return confirmation
def prepare_progress_dialog(self):
self.progress_dialog = wx_scan.autocompletionScanProgressDialog()
# connect method to update progress dialog
pub.subscribe(self.on_update_progress, "on-update-progress")
self.progress_dialog.Show()
def on_update_progress(self):
wx.CallAfter(self.progress_dialog.progress_bar.Pulse)
def scan(self):
""" Attempts to add all users selected by current user to the autocomplete database. """
self.config["mysc"]["save_friends_in_autocompletion_db"] = self.dialog.get("friends")
self.config["mysc"]["save_followers_in_autocompletion_db"] = self.dialog.get("followers")
output.speak(_("Updating database... You can close this window now. A message will tell you when the process finishes."))
database = storage.storage(self.buffer.session.session_id)
percent = 0
users = []
if self.dialog.get("friends") == True:
first_page = self.buffer.session.api.account_following(id=self.buffer.session.db["user_id"], limit=80)
pub.sendMessage("on-update-progress")
if first_page != None:
for user in first_page:
users.append(user)
next_page = first_page
while next_page != None:
next_page = self.buffer.session.api.fetch_next(next_page)
pub.sendMessage("on-update-progress")
if next_page == None:
break
for user in next_page:
users.append(user)
# same step, but for followers.
if self.dialog.get("followers") == True:
first_page = self.buffer.session.api.account_followers(id=self.buffer.session.db["user_id"], limit=80)
pub.sendMessage("on-update-progress")
if first_page != None:
for user in first_page:
if user not in users:
users.append(user)
next_page = first_page
while next_page != None:
next_page = self.buffer.session.api.fetch_next(next_page)
pub.sendMessage("on-update-progress")
if next_page == None:
break
for user in next_page:
if user not in users:
users.append(user)
# except TweepyException:
# wx.CallAfter(wx_scan.show_error)
# return self.done()
for user in users:
name = user.display_name if user.display_name != None and user.display_name != "" else user.username
database.set_user(user.acct, name, 1)
wx.CallAfter(wx_scan .show_success, len(users))
self.done()
def done(self):
wx.CallAfter(self.progress_dialog.Destroy)
wx.CallAfter(self.dialog.Destroy)
pub.unsubscribe(self.on_update_progress, "on-update-progress")
def add_user(session, database, user):
""" Adds an user to the database. """
user = session.api.account_lookup(user)
if user != None:
name = user.display_name if user.display_name != None and user.display_name != "" else user.username
database.set_user(user.acct, name, 1)

View File

@ -0,0 +1,45 @@
# -*- coding: utf-8 -*-
import wx
import widgetUtils
import application
class autocompletionScanDialog(widgetUtils.BaseDialog):
def __init__(self):
super(autocompletionScanDialog, self).__init__(parent=None, id=-1, title=_(u"Autocomplete users' settings"))
panel = wx.Panel(self)
sizer = wx.BoxSizer(wx.VERTICAL)
self.followers = wx.CheckBox(panel, -1, _("Add followers to database"))
self.friends = wx.CheckBox(panel, -1, _("Add following to database"))
sizer.Add(self.followers, 0, wx.ALL, 5)
sizer.Add(self.friends, 0, wx.ALL, 5)
ok = wx.Button(panel, wx.ID_OK)
cancel = wx.Button(panel, wx.ID_CANCEL)
sizerBtn = wx.BoxSizer(wx.HORIZONTAL)
sizerBtn.Add(ok, 0, wx.ALL, 5)
sizer.Add(cancel, 0, wx.ALL, 5)
sizer.Add(sizerBtn, 0, wx.ALL, 5)
panel.SetSizer(sizer)
self.SetClientSize(sizer.CalcMin())
class autocompletionScanProgressDialog(widgetUtils.BaseDialog):
def __init__(self, *args, **kwargs):
super(autocompletionScanProgressDialog, self).__init__(parent=None, id=wx.ID_ANY, title=_("Updating autocompletion database"), *args, **kwargs)
panel = wx.Panel(self)
sizer = wx.BoxSizer(wx.VERTICAL)
self.progress_bar = wx.Gauge(parent=panel)
sizer.Add(self.progress_bar)
panel.SetSizerAndFit(sizer)
def confirm():
with wx.MessageDialog(None, _("This process will retrieve the users you selected from your Mastodon account, and add them to the user autocomplete database. Please note that if there are many users or you have tried to perform this action less than 15 minutes ago, TWBlue may reach a limit in API calls when trying to load the users into the database. If this happens, we will show you an error, in which case you will have to try this process again in a few minutes. If this process ends with no error, you will be redirected back to the account settings dialog. Do you want to continue?"), _("Attention"), style=wx.ICON_QUESTION|wx.YES_NO) as result:
if result.ShowModal() == wx.ID_YES:
return True
return False
def show_success(users):
with wx.MessageDialog(None, _("TWBlue has imported {} users successfully.").format(users), _("Done")) as dlg:
dlg.ShowModal()
def show_error():
with wx.MessageDialog(None, _("Error adding users from Mastodon. Please try again in about 15 minutes."), _("Error"), style=wx.ICON_ERROR) as dlg:
dlg.ShowModal()

View File

@ -0,0 +1,52 @@
# -*- coding: utf-8 -*-
import os, sqlite3, paths
class storage(object):
def __init__(self, session_id):
self.connection = sqlite3.connect(os.path.join(paths.config_path(), "%s/autocompletionUsers.dat" % (session_id)))
self.cursor = self.connection.cursor()
if self.table_exist("users") == False:
self.create_table()
def table_exist(self, table):
ask = self.cursor.execute("SELECT name FROM sqlite_master WHERE type='table' AND name='%s'" % (table))
answer = ask.fetchone()
if answer == None:
return False
else:
return True
def get_all_users(self):
self.cursor.execute("""select * from users""")
return self.cursor.fetchall()
def get_users(self, term):
self.cursor.execute("""SELECT * FROM users WHERE UPPER(user) LIKE :term OR UPPER(name) LIKE :term""", {"term": "%{}%".format(term.upper())})
return self.cursor.fetchall()
def set_user(self, screen_name, user_name, from_a_buffer):
self.cursor.execute("""insert or ignore into users values(?, ?, ?)""", (screen_name, user_name, from_a_buffer))
self.connection.commit()
def remove_user(self, user):
self.cursor.execute("""DELETE FROM users WHERE user = ?""", (user,))
self.connection.commit()
return self.cursor.fetchone()
def remove_by_buffer(self, bufferType):
""" Removes all users saved on a buffer. BufferType is 0 for no buffer, 1 for friends and 2 for followers"""
self.cursor.execute("""DELETE FROM users WHERE from_a_buffer = ?""", (bufferType,))
self.connection.commit()
return self.cursor.fetchone()
def create_table(self):
self.cursor.execute("""
create table users(
user TEXT UNIQUE,
name TEXT,
from_a_buffer INTEGER
)""")
def __del__(self):
self.cursor.close()
self.connection.close()

View File

@ -0,0 +1,44 @@
# -*- coding: utf-8 -*-
import wx
import widgetUtils
from multiplatform_widgets import widgets
import application
class autocompletionManageDialog(widgetUtils.BaseDialog):
def __init__(self):
super(autocompletionManageDialog, self).__init__(parent=None, id=-1, title=_(u"Manage Autocompletion database"))
panel = wx.Panel(self)
sizer = wx.BoxSizer(wx.VERTICAL)
label = wx.StaticText(panel, -1, _(u"Editing {0} users database").format(application.name,))
self.users = widgets.list(panel, _(u"Username"), _(u"Name"), style=wx.LC_REPORT)
sizer.Add(label, 0, wx.ALL, 5)
sizer.Add(self.users.list, 0, wx.ALL, 5)
self.add = wx.Button(panel, -1, _(u"Add user"))
self.remove = wx.Button(panel, -1, _(u"Remove user"))
optionsBox = wx.BoxSizer(wx.HORIZONTAL)
optionsBox.Add(self.add, 0, wx.ALL, 5)
optionsBox.Add(self.remove, 0, wx.ALL, 5)
sizer.Add(optionsBox, 0, wx.ALL, 5)
ok = wx.Button(panel, wx.ID_OK)
cancel = wx.Button(panel, wx.ID_CANCEL)
sizerBtn = wx.BoxSizer(wx.HORIZONTAL)
sizerBtn.Add(ok, 0, wx.ALL, 5)
sizer.Add(cancel, 0, wx.ALL, 5)
sizer.Add(sizerBtn, 0, wx.ALL, 5)
panel.SetSizer(sizer)
self.SetClientSize(sizer.CalcMin())
def put_users(self, users):
for i in users:
j = [i[0], i[1]]
self.users.insert_item(False, *j)
def get_user(self):
usr = False
userDlg = wx.TextEntryDialog(None, _(u"Twitter username"), _(u"Add user to database"))
if userDlg.ShowModal() == wx.ID_OK:
usr = userDlg.GetValue()
return usr
def show_invalid_user_error(self):
wx.MessageDialog(None, _(u"The user does not exist"), _(u"Error!"), wx.ICON_ERROR).ShowModal()

View File

@ -0,0 +1,25 @@
# -*- coding: utf-8 -*-
import wx
class menu(wx.Menu):
def __init__(self, window, pattern, mode):
super(menu, self).__init__()
self.window = window
self.pattern = pattern
self.mode = mode
def append_options(self, options):
for i in options:
item = wx.MenuItem(self, wx.ID_ANY, "%s (@%s)" % (i[1], i[0]))
self.Append(item)
self.Bind(wx.EVT_MENU, lambda evt, temp=i[0]: self.select_text(evt, temp), item)
def select_text(self, ev, text):
if self.mode == "mastodon":
self.window.ChangeValue(self.window.GetValue().replace("@"+self.pattern, "@"+text+" "))
elif self.mode == "free":
self.window.SetValue(self.window.GetValue().replace(self.pattern, text))
self.window.SetInsertionPointEnd()
def destroy(self):
self.Destroy()