Commit ddceab3e authored by Manuel Leithner's avatar Manuel Leithner
Browse files

Initial commit

parent 109ba77a
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
Slixmpp OMEMO plugin
Copyright (C) 2010 Nathanael C. Fritz
Copyright (C) 2019 Maxime “pep” Buquet <>
This file is part of slixmpp-omemo.
See the file LICENSE for copying permission.
import os
import re
import sys
import asyncio
import logging
import select
from getpass import getpass
from argparse import ArgumentParser
from pathlib import Path
from slixmpp import ClientXMPP, JID
from slixmpp.exceptions import IqTimeout, IqError
from slixmpp.stanza import Message
import slixmpp_omemo
from slixmpp_omemo import PluginCouldNotLoad, MissingOwnKey, EncryptionPrepareException
from slixmpp_omemo import UndecidedException, UntrustedException, NoAvailableSession
from omemo.exceptions import MissingBundleException
log = logging.getLogger(__name__)
# Used by the Bruh
class Bruh(ClientXMPP):
A simple Slixmpp bot that will echo encrypted messages it receives, along
with a short thank you message.
For details on how to build a client with slixmpp, look at examples in the
slixmpp repository.
eme_ns = 'eu.siacs.conversations.axolotl'
cmd_prefix = '!'
debug_level: int = LEVEL_DEBUG # or LEVEL_ERROR
def __init__(self, jid, password, file_name=None, watch_dest_jid=None):
ClientXMPP.__init__(self, jid, password)
self.file_to_watch = file_name
# This is NOT the destination JID in all cases, only when new lines
# appear in self.file_to_watch
self.watch_dest_jid = JID(watch_dest_jid)
self.prefix_re: re.Pattern = re.compile('^%s' % self.cmd_prefix)
self.cmd_re: re.Pattern = re.compile('^%s(?P<command>\w+)(?:\s+(?P<args>.*))?' % self.cmd_prefix)
self.add_event_handler("session_start", self.start)
self.add_event_handler("message", self.message_handler)
def start(self, _event) -> None:
Process the session_start event.
Typical actions for the session_start event are
requesting the roster and broadcasting an initial
presence stanza.
event -- An empty dictionary. The session_start
event does not provide any additional
def is_command(self, body: str) -> bool:
return self.prefix_re.match(body) is not None
async def handle_command(self, mto: JID, mtype: str, body: str) -> None:
match = self.cmd_re.match(body)
if match is None:
return None
groups = match.groupdict()
cmd = groups['command']
# args = groups['args']
if cmd == 'help':
await self.cmd_help(mto, mtype)
elif cmd == 'verbose':
await self.cmd_verbose(mto, mtype)
elif cmd == 'error':
await self.cmd_error(mto, mtype)
elif cmd == 'chain_length':
await self.cmd_chain_length(mto, mtype)
return None
async def cmd_help(self, mto: JID, mtype: str) -> None:
body = (
'I\'m the slixmpp-omemo echo bot! '
'The following commands are available:\n'
'{prefix}verbose Send message or reply with log messages\n'
'{prefix}error Send message or reply only on error\n'
return await self.encrypted_reply(mto, mtype, body)
async def cmd_verbose(self, mto: JID, mtype: str) -> None:
self.debug_level = LEVEL_DEBUG
body = '''Debug level set to 'verbose'.'''
return await self.encrypted_reply(mto, mtype, body)
async def cmd_error(self, mto: JID, mtype: str) -> None:
self.debug_level = LEVEL_ERROR
body = '''Debug level set to 'error'.'''
return await self.encrypted_reply(mto, mtype, body)
async def cmd_chain_length(self, mto: JID, mtype: str) -> None:
body = (
'lengths: %r\n' % self['xep_0384']._chain_lengths(mto) +
'should heartbeat: %r' % self['xep_0384'].should_heartbeat(mto)
return await self.encrypted_reply(mto, mtype, body)
def message_handler(self, msg: Message) -> None:
async def message(self, msg: Message, allow_untrusted: bool = False) -> None:
Process incoming message stanzas. Be aware that this also
includes MUC messages and error messages. It is usually
a good idea to check the messages's type before processing
or sending replies.
msg -- The received message stanza. See the documentation
for stanza objects and the Message stanza to see
how it may be used.
mfrom = mto = msg['from']
mtype = msg['type']
if mtype not in ('chat', 'normal'):
return None
if not self['xep_0384'].is_encrypted(msg):
if self.debug_level == LEVEL_DEBUG:
await self.plain_reply(mto, mtype, 'Echo unencrypted message:%(body)s' % msg)
return None
encrypted = msg['omemo_encrypted']
body = self['xep_0384'].decrypt_message(encrypted, mfrom, allow_untrusted)
decoded = body.decode('utf8')
if self.is_command(decoded):
await self.handle_command(mto, mtype, decoded)
elif self.debug_level == LEVEL_DEBUG:
await self.encrypted_reply(mto, mtype, 'Echo: %s' % decoded)
return None
except (MissingOwnKey,):
# The message is missing our own key, it was not encrypted for
# us, and we can't decrypt it.
await self.plain_reply(
mto, mtype,
'Error: Message not encrypted for me.',
return None
except (NoAvailableSession,) as exn:
# We received a message from that contained a session that we
# don't know about (deleted session storage, etc.). We can't
# decrypt the message, and it's going to be lost.
# Here, as we need to initiate a new encrypted session, it is
# best if we send an encrypted message directly. XXX: Is it
# where we talk about self-healing messages?
await self.encrypted_reply(
mto, mtype,
'Error: Message uses an encrypted '
'session I don\'t know about.',
return None
except (UndecidedException, UntrustedException) as exn:
# We received a message from an untrusted device. We can
# choose to decrypt the message nonetheless, with the
# `allow_untrusted` flag on the `decrypt_message` call, which
# we will do here. This is only possible for decryption,
# encryption will require us to decide if we trust the device
# or not. Clients _should_ indicate that the message was not
# trusted, or in undecided state, if they decide to decrypt it
# anyway.
await self.plain_reply(
mto, mtype,
"Error: Your device '%s' is not in my trusted devices." % exn.device,
# We resend, setting the `allow_untrusted` parameter to True.
await self.message(msg, allow_untrusted=True)
return None
except (EncryptionPrepareException,):
# Slixmpp tried its best, but there were errors it couldn't
# resolve. At this point you should have seen other exceptions
# and given a chance to resolve them already.
await self.plain_reply(mto, mtype, 'Error: I was not able to decrypt the message.')
return None
except (Exception,) as exn:
await self.plain_reply(mto, mtype, 'Error: Exception occured while attempting decryption.\n%r' % exn)
return None
async def plain_reply(self, mto: JID, mtype: str, body):
Helper to reply to messages
msg = self.make_message(mto=mto, mtype=mtype)
msg['body'] = body
return msg.send()
async def encrypted_reply(self, mto: JID, mtype: str, body):
"""Helper to reply with encrypted messages"""
msg = self.make_message(mto=mto, mtype=mtype)
msg['eme']['namespace'] = self.eme_ns
msg['eme']['name'] = self['xep_0380'].mechanisms[self.eme_ns]
expect_problems = {} # type: Optional[Dict[JID, List[int]]]
while True:
# `encrypt_message` excepts the plaintext to be sent, a list of
# bare JIDs to encrypt to, and optionally a dict of problems to
# expect per bare JID.
# Note that this function returns an `<encrypted/>` object,
# and not a full Message stanza. This combined with the
# `recipients` parameter that requires for a list of JIDs,
# allows you to encrypt for 1:1 as well as groupchats (MUC).
# `expect_problems`: See EncryptionPrepareException handling.
recipients = [mto]
encrypt = await self['xep_0384'].encrypt_message(body, recipients, expect_problems)
return msg.send()
except UndecidedException as exn:
# The library prevents us from sending a message to an
# untrusted/undecided barejid, so we need to make a decision here.
# This is where you prompt your user to ask what to do. In
# this bot we will automatically trust undecided recipients.
self['xep_0384'].trust(exn.bare_jid, exn.device, exn.ik)
# TODO: catch NoEligibleDevicesException
except EncryptionPrepareException as exn:
# This exception is being raised when the library has tried
# all it could and doesn't know what to do anymore. It
# contains a list of exceptions that the user must resolve, or
# explicitely ignore via `expect_problems`.
# TODO: We might need to bail out here if errors are the same?
for error in exn.errors:
if isinstance(error, MissingBundleException):
# We choose to ignore MissingBundleException. It seems
# to be somewhat accepted that it's better not to
# encrypt for a device if it has problems and encrypt
# for the rest, rather than error out. The "faulty"
# device won't be able to decrypt and should display a
# generic message. The receiving end-user at this
# point can bring up the issue if it happens.
mto, mtype,
'Could not find keys for device "%d" of recipient "%s". Skipping.' %
(error.device, error.bare_jid),
jid = JID(error.bare_jid)
device_list = expect_problems.setdefault(jid, [])
except (IqError, IqTimeout) as exn:
mto, mtype,
'An error occured while fetching information on a recipient.\n%r' % exn,
return None
except Exception as exn:
await self.plain_reply(
mto, mtype,
'An error occured while attempting to encrypt.\n%r' % exn,
return None
async def watch_file(self):
if not self.file_to_watch or not self.watch_dest_jid:
log.warning('WATCHER: No file or destination')
return None
# Make sure the file exists
except PermissionError:
with open(self.file_to_watch, mode='rt', buffering=1) as fd:
# Seek to end, 2)
line = ''
while True:
tmp = fd.readline()
if tmp is not None and len(tmp) > 0:
# There's something new in the file
line += tmp
if line.endswith("\n"):
await self.encrypted_reply(
self.watch_dest_jid, 'chat', line)
line = ''
# Nothing new, suspend this thread
await asyncio.sleep(5.0)
if __name__ == '__main__':
# Setup the command line arguments.
parser = ArgumentParser(description=Bruh.__doc__)
# Output verbosity options.
parser.add_argument("-q", "--quiet", help="set logging to ERROR",
action="store_const", dest="loglevel",
const=logging.ERROR, default=logging.INFO)
parser.add_argument("-d", "--debug", help="set logging to DEBUG",
action="store_const", dest="loglevel",
const=logging.DEBUG, default=logging.INFO)
# JID and password options.
parser.add_argument("-j", "--jid", dest="jid",
help="JID to use")
parser.add_argument("-p", "--password", dest="password",
help="password to use")
parser.add_argument("-f", "--file", dest="file_name",
help="Log file to watch")
parser.add_argument("-t", "--to", dest="watch_dest_jid",
help="JID of the user to send new lines to")
# Data dir for omemo plugin
DATA_DIR = os.path.join(
parser.add_argument("--data-dir", dest="data_dir",
help="data directory", default=DATA_DIR)
args = parser.parse_args()
# Setup logging.
format='%(levelname)-8s %(message)s')
if args.jid is None:
args.jid = input("Username: ")
if args.password is None:
args.password = getpass("Password: ")
# Setup the Bruh and register plugins. Note that while plugins may
# have interdependencies, the order in which you register them does
# not matter.
# Ensure OMEMO data dir is created
os.makedirs(args.data_dir, exist_ok=True)
xmpp = Bruh(args.jid, args.password, args.file_name, args.watch_dest_jid)
xmpp.register_plugin('xep_0030') # Service Discovery
xmpp.register_plugin('xep_0199') # XMPP Ping
xmpp.register_plugin('xep_0380') # Explicit Message Encryption
'data_dir': args.data_dir,
except (PluginCouldNotLoad,):
log.exception('And error occured when loading the omemo plugin.')
# Connect to the XMPP server and start processing XMPP stanzas.
Supports Markdown
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment