#! /usr/bin/env python3.5 """ Fixing bluetooth stereo headphone/headset problem in ubuntu 16.04 and also debian jessie, with bluez5. This version is adapted to work on ubuntu 16.10 Workaround for bug: https://b...content-available-to-author-only...d.net/ubuntu/+source/indicator-sound/+bug/1577197 Run it with python3.5 or higher after pairing/connecting the bluetooth stereo headphone. Only works with bluez5. See `a2dp.py -h` for info. https://g...content-available-to-author-only...b.com/pylover/d68be364adac5f946887b85e6ed6e7ae Vahid Mardani """ __version__ = '0.1.0' import sys import re import asyncio import subprocess as sb import argparse HEX_PATTERN = '[0-9A-F]' BYTE_HEX_PATTERN = '%s{2}' % HEX_PATTERN class SubprocessError(Exception): pass class BluetoothctlProtocol(asyncio.SubprocessProtocol): def __init__(self, exit_future, echo=True): self.exit_future = exit_future self.transport = None self.output = None self.echo = echo def listen_output(self): self.output = '' def not_listen_output(self): self.output = None def pipe_data_received(self, fd, raw): d = raw.decode() if self.echo: print(d, end='') if self.output is not None: self.output += d def process_exited(self): self.exit_future.set_result(True) def connection_made(self, transport): self.transport = transport print('Connection MADE') async def send_command(self, c): stdin_transport = self.transport.get_pipe_transport(0) stdin_transport._pipe.write(('%s\n' % c).encode()) async def search_in_output(self, expression, fail_expression=None): if self.output is None: return None for l in self.output.splitlines(): if fail_expression and re.search(fail_expression, l, re.IGNORECASE): if re.search(expression, l, re.IGNORECASE): return True async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'): try: self.listen_output() await self.send_command(cmd) while not await self.search_in_output(wait_expression.lower(), fail_expression=fail_expression): await asyncio.sleep(.3) finally: self.not_listen_output() async def disconnect(self, mac): await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected') async def connect(self, mac): await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful') async def trust(self, mac): await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded') async def quit(self): await self.send_command('quit') async def list_devices(self): result = [] try: self.listen_output() await self.send_command('devices') await asyncio.sleep(1) for l in self.output.splitlines(): m = re.match(r'^.*Device\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l) if m: result.append(m.groups()) return result finally: self.not_listen_output() async def list_controllers(self): result = [] try: self.listen_output() await self.send_command('list') await asyncio.sleep(1) for l in self.output.splitlines(): m = re.match(r'^.*Controller\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l) if m: result.append(m.groups()) return result finally: self.not_listen_output() async def select_device(self): devices = await self.list_devices() count = len(devices) if count < 1: elif count == 1: return devices[0] for i, d in enumerate(devices): print('%d. %s %s' % (i+1, d[0], d[1])) print('Select device[1]:') selected = input() return devices[int(selected) - 1] async def execute_command(cmd): p = await asyncio.create_subprocess_shell(cmd, stdout=sb.PIPE, stderr=sb.PIPE) stdout, stderr = await p.communicate() stdout, stderr = \ stdout.decode() if stdout is not None else '', \ stderr.decode() if stderr is not None else '' if p.returncode != 0 or stderr.strip() != '': raise SubprocessError('Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr)) return stdout async def execute_find(cmd, pattern): stdout = await execute_command(cmd) match = re.search(pattern, stdout) if match: return match.group() return match async def find_dev_id(mac, tries=12): async def fetch(): return await execute_find( 'pactl list cards short', 'bluez_card.%s' % '_'.join(mac)) result = await fetch() while tries > 0 and result is None: await asyncio.sleep(.5) tries -= 1 print('Cannot get device id, retrying %d more times.' % (tries+1)) result = await fetch() return result async def find_sink(mac, tries=12): async def fetch(): return await execute_find( 'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac)) result = await fetch() while tries > 0 and result is None: await asyncio.sleep(.5) tries -= 1 print('Cannot find any sink, retrying %d more times.' % (tries+1)) result = await fetch() return result async def set_profile(device_id, profile='a2dp_sink'): return await execute_command('pactl set-card-profile %s %s' % (device_id, profile)) async def set_default_sink(sink): return await execute_command('pacmd set-default-sink %s' % sink) async def main(args): mac = args.mac exit_future = asyncio.Future() transport, protocol = await loop.subprocess_exec( lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl' ) try: if mac is None: print('Selecting device:') mac, device_name = await protocol.select_device() mac = mac.split(':' if ':' in mac else '_') print('Device MAC: %s' % ':'.join(mac)) device_id = await find_dev_id(mac, tries=5) if device_id is None: await protocol.trust(mac) await protocol.connect(mac) device_id = await find_dev_id(mac) sink = await find_sink(mac) if sink is None: await set_profile(device_id) sink = await find_sink(mac) print('Device ID: %s' % device_id) print('Sink: %s' % sink) print('Turning off audio profile.') await set_profile(device_id, profile='off') print('Disconnecting the device.') await protocol.disconnect(mac) print('Connecting again.') await protocol.connect(mac) print('Setting A2DP profile') device_id = await find_dev_id(mac) print('Device ID: %s' % device_id) await set_profile(device_id) print('Updating default sink') await set_default_sink(sink) except SubprocessError as ex: print(str(ex)) return 1 finally: print('Exiting bluetoothctl') await protocol.quit() await exit_future # Close the stdout pipe transport.close() if __name__ == '__main__': parser = argparse.ArgumentParser(prog=sys.argv[0]) parser.add_argument('-e', '--echo', action='store_true', default=False) parser.add_argument('mac', nargs='?', default=None) args = parser.parse_args() loop = asyncio.get_event_loop()
Standard input is empty
prog.c:1:2: error: invalid preprocessing directive #!
#! /usr/bin/env python3.5
^
prog.c:2:3: warning: missing terminating " character
"""
^
prog.c:2:1: error: expected identifier or ‘(’ before string constant
"""
^~
prog.c:2:3: error: missing terminating " character
"""
^
prog.c:13:5: error: stray ‘`’ in program
See `a2dp.py -h` for info.
^
prog.c:13:16: error: stray ‘`’ in program
See `a2dp.py -h` for info.
^
prog.c:17:3: warning: missing terminating " character
"""
^
prog.c:17:3: error: missing terminating " character
prog.c:19:15: warning: character constant too long for its type
__version__ = '0.1.0'
^~~~~~~
prog.c:28:15: warning: character constant too long for its type
HEX_PATTERN = '[0-9A-F]'
^~~~~~~~~~
prog.c:29:20: warning: character constant too long for its type
BYTE_HEX_PATTERN = '%s{2}' % HEX_PATTERN
^~~~~~~
prog.c:44:23: error: empty character constant
self.output = ''
^~
prog.c:52:26: error: empty character constant
print(d, end='')
^~
prog.c:62:15: warning: character constant too long for its type
print('Connection MADE')
^~~~~~~~~~~~~~~~~
prog.c:66:38: warning: multi-character character constant [-Wmultichar]
stdin_transport._pipe.write(('%s\n' % c).encode())
^~~~~~
prog.c:74:39: warning: character constant too long for its type
raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression))
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:79:73: warning: multi-character character constant [-Wmultichar]
async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'):
^~~~~~
prog.c:89:34: warning: character constant too long for its type
await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
^~~~~~~~~~~~~~~
prog.c:89:67: warning: character constant too long for its type
await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
^~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:92:34: warning: character constant too long for its type
await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
^~~~~~~~~~~~
prog.c:92:64: warning: character constant too long for its type
await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
^~~~~~~~~~~~~~~~~~~~~~~
prog.c:95:34: warning: character constant too long for its type
await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
^~~~~~~~~~
prog.c:95:62: warning: character constant too long for its type
await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
^~~~~~~~~~~~~~~~~
prog.c:98:33: warning: multi-character character constant [-Wmultichar]
await self.send_command('quit')
^~~~~~
prog.c:104:37: warning: character constant too long for its type
await self.send_command('devices')
^~~~~~~~~
prog.c:107:31: warning: unknown escape sequence: '\s'
m = re.match(r'^.*Device\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:107:31: warning: unknown escape sequence: '\s'
prog.c:107:31: warning: character constant too long for its type
prog.c:118:37: warning: multi-character character constant [-Wmultichar]
await self.send_command('list')
^~~~~~
prog.c:121:31: warning: unknown escape sequence: '\s'
m = re.match(r'^.*Controller\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:121:31: warning: unknown escape sequence: '\s'
prog.c:121:31: warning: character constant too long for its type
prog.c:133:35: warning: character constant too long for its type
raise SubprocessError('There is no connected device.')
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:138:19: warning: character constant too long for its type
print('%d. %s %s' % (i+1, d[0], d[1]))
^~~~~~~~~~~
prog.c:139:15: warning: character constant too long for its type
print('Select device[1]:')
^~~~~~~~~~~~~~~~~~~
prog.c:148:52: error: empty character constant
stdout.decode() if stdout is not None else '', \
^~
prog.c:149:52: error: empty character constant
stderr.decode() if stderr is not None else ''
^~
prog.c:150:47: error: empty character constant
if p.returncode != 0 or stderr.strip() != '':
^~
prog.c:151:31: warning: character constant too long for its type
raise SubprocessError('Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr))
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:167:13: warning: character constant too long for its type
'pactl list cards short',
^~~~~~~~~~~~~~~~~~~~~~~~
prog.c:168:13: warning: character constant too long for its type
'bluez_card.%s' % '_'.join(mac))
^~~~~~~~~~~~~~~
prog.c:174:15: warning: character constant too long for its type
print('Cannot get device id, retrying %d more times.' % (tries+1))
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:183:13: warning: character constant too long for its type
'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac))
^~~~~~~~~~~~~~~~~~
prog.c:183:33: warning: character constant too long for its type
'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac))
^~~~~~~~~~~~~~~
prog.c:189:15: warning: character constant too long for its type
print('Cannot find any sink, retrying %d more times.' % (tries+1))
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:194:42: warning: character constant too long for its type
async def set_profile(device_id, profile='a2dp_sink'):
^~~~~~~~~~~
prog.c:195:34: warning: character constant too long for its type
return await execute_command('pactl set-card-profile %s %s' % (device_id, profile))
^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:199:34: warning: character constant too long for its type
return await execute_command('pacmd set-default-sink %s' % sink)
^~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:207:68: warning: character constant too long for its type
lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl'
^~~~~~~~~~~~~~
prog.c:213:19: warning: character constant too long for its type
print('Selecting device:')
^~~~~~~~~~~~~~~~~~~
prog.c:217:15: warning: character constant too long for its type
print('Device MAC: %s' % ':'.join(mac))
^~~~~~~~~~~~~~~~
prog.c:232:15: warning: character constant too long for its type
print('Device ID: %s' % device_id)
^~~~~~~~~~~~~~~
prog.c:233:15: warning: character constant too long for its type
print('Sink: %s' % sink)
^~~~~~~~~~
prog.c:235:15: warning: character constant too long for its type
print('Turning off audio profile.')
^~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:236:46: warning: multi-character character constant [-Wmultichar]
await set_profile(device_id, profile='off')
^~~~~
prog.c:238:15: warning: character constant too long for its type
print('Disconnecting the device.')
^~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:241:15: warning: character constant too long for its type
print('Connecting again.')
^~~~~~~~~~~~~~~~~~~
prog.c:244:15: warning: character constant too long for its type
print('Setting A2DP profile')
^~~~~~~~~~~~~~~~~~~~~~
prog.c:246:15: warning: character constant too long for its type
print('Device ID: %s' % device_id)
^~~~~~~~~~~~~~~
prog.c:249:15: warning: character constant too long for its type
print('Updating default sink')
^~~~~~~~~~~~~~~~~~~~~~~
prog.c:256:15: warning: character constant too long for its type
print('Exiting bluetoothctl')
^~~~~~~~~~~~~~~~~~~~~~
prog.c:260:11: error: invalid preprocessing directive #Close; did you mean #else?
# Close the stdout pipe
^~~~~
else
prog.c:264:16: warning: character constant too long for its type
if __name__ == '__main__':
^~~~~~~~~~
prog.c:267:25: warning: multi-character character constant [-Wmultichar]
parser.add_argument('-e', '--echo', action='store_true', default=False)
^~~~
prog.c:267:31: warning: character constant too long for its type
parser.add_argument('-e', '--echo', action='store_true', default=False)
^~~~~~~~
prog.c:267:48: warning: character constant too long for its type
parser.add_argument('-e', '--echo', action='store_true', default=False)
^~~~~~~~~~~~
prog.c:268:25: warning: multi-character character constant [-Wmultichar]
parser.add_argument('mac', nargs='?', default=None)
^~~~~
Standard output is empty