fork download
  1. #! /usr/bin/env python3.5
  2. """
  3.  
  4. Fixing bluetooth stereo headphone/headset problem in ubuntu 16.04 and also debian jessie, with bluez5.
  5.  
  6. This version is adapted to work on ubuntu 16.10
  7.  
  8. Workaround for bug: https://bugs.launchpad.net/ubuntu/+source/indicator-sound/+bug/1577197
  9. Run it with python3.5 or higher after pairing/connecting the bluetooth stereo headphone.
  10.  
  11. Only works with bluez5.
  12.  
  13. See `a2dp.py -h` for info.
  14.  
  15. https://gist.github.com/pylover/d68be364adac5f946887b85e6ed6e7ae
  16. Vahid Mardani
  17. """
  18.  
  19. __version__ = '0.1.0'
  20.  
  21. import sys
  22. import re
  23. import asyncio
  24. import subprocess as sb
  25. import argparse
  26.  
  27.  
  28. HEX_PATTERN = '[0-9A-F]'
  29. BYTE_HEX_PATTERN = '%s{2}' % HEX_PATTERN
  30.  
  31.  
  32. class SubprocessError(Exception):
  33. pass
  34.  
  35.  
  36. class BluetoothctlProtocol(asyncio.SubprocessProtocol):
  37. def __init__(self, exit_future, echo=True):
  38. self.exit_future = exit_future
  39. self.transport = None
  40. self.output = None
  41. self.echo = echo
  42.  
  43. def listen_output(self):
  44. self.output = ''
  45.  
  46. def not_listen_output(self):
  47. self.output = None
  48.  
  49. def pipe_data_received(self, fd, raw):
  50. d = raw.decode()
  51. if self.echo:
  52. print(d, end='')
  53.  
  54. if self.output is not None:
  55. self.output += d
  56.  
  57. def process_exited(self):
  58. self.exit_future.set_result(True)
  59.  
  60. def connection_made(self, transport):
  61. self.transport = transport
  62. print('Connection MADE')
  63.  
  64. async def send_command(self, c):
  65. stdin_transport = self.transport.get_pipe_transport(0)
  66. stdin_transport._pipe.write(('%s\n' % c).encode())
  67.  
  68. async def search_in_output(self, expression, fail_expression=None):
  69. if self.output is None:
  70. return None
  71.  
  72. for l in self.output.splitlines():
  73. if fail_expression and re.search(fail_expression, l, re.IGNORECASE):
  74. raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression))
  75.  
  76. if re.search(expression, l, re.IGNORECASE):
  77. return True
  78.  
  79. async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'):
  80. try:
  81. self.listen_output()
  82. await self.send_command(cmd)
  83. while not await self.search_in_output(wait_expression.lower(), fail_expression=fail_expression):
  84. await asyncio.sleep(.3)
  85. finally:
  86. self.not_listen_output()
  87.  
  88. async def disconnect(self, mac):
  89. await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
  90.  
  91. async def connect(self, mac):
  92. await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
  93.  
  94. async def trust(self, mac):
  95. await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
  96.  
  97. async def quit(self):
  98. await self.send_command('quit')
  99.  
  100. async def list_devices(self):
  101. result = []
  102. try:
  103. self.listen_output()
  104. await self.send_command('devices')
  105. await asyncio.sleep(1)
  106. for l in self.output.splitlines():
  107. m = re.match(r'^.*Device\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
  108. if m:
  109. result.append(m.groups())
  110. return result
  111. finally:
  112. self.not_listen_output()
  113.  
  114. async def list_controllers(self):
  115. result = []
  116. try:
  117. self.listen_output()
  118. await self.send_command('list')
  119. await asyncio.sleep(1)
  120. for l in self.output.splitlines():
  121. m = re.match(r'^.*Controller\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
  122. if m:
  123. result.append(m.groups())
  124. return result
  125. finally:
  126. self.not_listen_output()
  127.  
  128. async def select_device(self):
  129. devices = await self.list_devices()
  130. count = len(devices)
  131.  
  132. if count < 1:
  133. raise SubprocessError('There is no connected device.')
  134. elif count == 1:
  135. return devices[0]
  136.  
  137. for i, d in enumerate(devices):
  138. print('%d. %s %s' % (i+1, d[0], d[1]))
  139. print('Select device[1]:')
  140. selected = input()
  141. return devices[int(selected) - 1]
  142.  
  143.  
  144. async def execute_command(cmd):
  145. p = await asyncio.create_subprocess_shell(cmd, stdout=sb.PIPE, stderr=sb.PIPE)
  146. stdout, stderr = await p.communicate()
  147. stdout, stderr = \
  148. stdout.decode() if stdout is not None else '', \
  149. stderr.decode() if stderr is not None else ''
  150. if p.returncode != 0 or stderr.strip() != '':
  151. raise SubprocessError('Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr))
  152. return stdout
  153.  
  154.  
  155. async def execute_find(cmd, pattern):
  156. stdout = await execute_command(cmd)
  157. match = re.search(pattern, stdout)
  158. if match:
  159. return match.group()
  160. return match
  161.  
  162.  
  163. async def find_dev_id(mac, tries=12):
  164.  
  165. async def fetch():
  166. return await execute_find(
  167. 'pactl list cards short',
  168. 'bluez_card.%s' % '_'.join(mac))
  169.  
  170. result = await fetch()
  171. while tries > 0 and result is None:
  172. await asyncio.sleep(.5)
  173. tries -= 1
  174. print('Cannot get device id, retrying %d more times.' % (tries+1))
  175. result = await fetch()
  176. return result
  177.  
  178.  
  179. async def find_sink(mac, tries=12):
  180.  
  181. async def fetch():
  182. return await execute_find(
  183. 'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac))
  184.  
  185. result = await fetch()
  186. while tries > 0 and result is None:
  187. await asyncio.sleep(.5)
  188. tries -= 1
  189. print('Cannot find any sink, retrying %d more times.' % (tries+1))
  190. result = await fetch()
  191. return result
  192.  
  193.  
  194. async def set_profile(device_id, profile='a2dp_sink'):
  195. return await execute_command('pactl set-card-profile %s %s' % (device_id, profile))
  196.  
  197.  
  198. async def set_default_sink(sink):
  199. return await execute_command('pacmd set-default-sink %s' % sink)
  200.  
  201.  
  202. async def main(args):
  203. mac = args.mac
  204.  
  205. exit_future = asyncio.Future()
  206. transport, protocol = await loop.subprocess_exec(
  207. lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl'
  208. )
  209.  
  210. try:
  211.  
  212. if mac is None:
  213. print('Selecting device:')
  214. mac, device_name = await protocol.select_device()
  215.  
  216. mac = mac.split(':' if ':' in mac else '_')
  217. print('Device MAC: %s' % ':'.join(mac))
  218.  
  219. device_id = await find_dev_id(mac, tries=5)
  220.  
  221. if device_id is None:
  222. await protocol.trust(mac)
  223. await protocol.connect(mac)
  224.  
  225. device_id = await find_dev_id(mac)
  226.  
  227. sink = await find_sink(mac)
  228. if sink is None:
  229. await set_profile(device_id)
  230. sink = await find_sink(mac)
  231.  
  232. print('Device ID: %s' % device_id)
  233. print('Sink: %s' % sink)
  234.  
  235. print('Turning off audio profile.')
  236. await set_profile(device_id, profile='off')
  237.  
  238. print('Disconnecting the device.')
  239. await protocol.disconnect(mac)
  240.  
  241. print('Connecting again.')
  242. await protocol.connect(mac)
  243.  
  244. print('Setting A2DP profile')
  245. device_id = await find_dev_id(mac)
  246. print('Device ID: %s' % device_id)
  247. await set_profile(device_id)
  248.  
  249. print('Updating default sink')
  250. await set_default_sink(sink)
  251.  
  252. except SubprocessError as ex:
  253. print(str(ex))
  254. return 1
  255. finally:
  256. print('Exiting bluetoothctl')
  257. await protocol.quit()
  258. await exit_future
  259.  
  260. # Close the stdout pipe
  261. transport.close()
  262.  
  263.  
  264. if __name__ == '__main__':
  265.  
  266. parser = argparse.ArgumentParser(prog=sys.argv[0])
  267. parser.add_argument('-e', '--echo', action='store_true', default=False)
  268. parser.add_argument('mac', nargs='?', default=None)
  269. args = parser.parse_args()
  270.  
  271. loop = asyncio.get_event_loop()
  272. sys.exit(loop.run_until_complete(main(args)))
Compilation error #stdin compilation error #stdout 0s 0KB
stdin
Standard input is empty
compilation info
prog.c:1:2: error: invalid preprocessing directive #!
 #! /usr/bin/env python3.5
  ^
prog.c:2:3: warning: missing terminating " character
 """
   ^
prog.c:2:1: error: expected identifier or ‘(’ before string constant
 """
 ^~
prog.c:2:3: error: missing terminating " character
 """
   ^
prog.c:13:5: error: stray ‘`’ in program
 See `a2dp.py -h` for info.
     ^
prog.c:13:16: error: stray ‘`’ in program
 See `a2dp.py -h` for info.
                ^
prog.c:17:3: warning: missing terminating " character
 """
   ^
prog.c:17:3: error: missing terminating " character
prog.c:19:15: warning: character constant too long for its type
 __version__ = '0.1.0'
               ^~~~~~~
prog.c:28:15: warning: character constant too long for its type
 HEX_PATTERN = '[0-9A-F]'
               ^~~~~~~~~~
prog.c:29:20: warning: character constant too long for its type
 BYTE_HEX_PATTERN = '%s{2}' % HEX_PATTERN
                    ^~~~~~~
prog.c:44:23: error: empty character constant
         self.output = ''
                       ^~
prog.c:52:26: error: empty character constant
             print(d, end='')
                          ^~
prog.c:62:15: warning: character constant too long for its type
         print('Connection MADE')
               ^~~~~~~~~~~~~~~~~
prog.c:66:38: warning: multi-character character constant [-Wmultichar]
         stdin_transport._pipe.write(('%s\n' % c).encode())
                                      ^~~~~~
prog.c:74:39: warning: character constant too long for its type
                 raise SubprocessError('Expression "%s" failed with fail pattern: "%s"' % (l, fail_expression))
                                       ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:79:73: warning: multi-character character constant [-Wmultichar]
     async def send_and_wait(self, cmd, wait_expression, fail_expression='fail'):
                                                                         ^~~~~~
prog.c:89:34: warning: character constant too long for its type
         await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
                                  ^~~~~~~~~~~~~~~
prog.c:89:67: warning: character constant too long for its type
         await self.send_and_wait('disconnect %s' % ':'.join(mac), 'Successful disconnected')
                                                                   ^~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:92:34: warning: character constant too long for its type
         await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
                                  ^~~~~~~~~~~~
prog.c:92:64: warning: character constant too long for its type
         await self.send_and_wait('connect %s' % ':'.join(mac), 'Connection successful')
                                                                ^~~~~~~~~~~~~~~~~~~~~~~
prog.c:95:34: warning: character constant too long for its type
         await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
                                  ^~~~~~~~~~
prog.c:95:62: warning: character constant too long for its type
         await self.send_and_wait('trust %s' % ':'.join(mac), 'trust succeeded')
                                                              ^~~~~~~~~~~~~~~~~
prog.c:98:33: warning: multi-character character constant [-Wmultichar]
         await self.send_command('quit')
                                 ^~~~~~
prog.c:104:37: warning: character constant too long for its type
             await self.send_command('devices')
                                     ^~~~~~~~~
prog.c:107:31: warning: unknown escape sequence: '\s'
                 m = re.match(r'^.*Device\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
                               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:107:31: warning: unknown escape sequence: '\s'
prog.c:107:31: warning: character constant too long for its type
prog.c:118:37: warning: multi-character character constant [-Wmultichar]
             await self.send_command('list')
                                     ^~~~~~
prog.c:121:31: warning: unknown escape sequence: '\s'
                 m = re.match(r'^.*Controller\s(?P<mac>[:A-F0-9]{17})\s(?P<name>.*)', l)
                               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:121:31: warning: unknown escape sequence: '\s'
prog.c:121:31: warning: character constant too long for its type
prog.c:133:35: warning: character constant too long for its type
             raise SubprocessError('There is no connected device.')
                                   ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:138:19: warning: character constant too long for its type
             print('%d. %s %s' % (i+1, d[0], d[1]))
                   ^~~~~~~~~~~
prog.c:139:15: warning: character constant too long for its type
         print('Select device[1]:')
               ^~~~~~~~~~~~~~~~~~~
prog.c:148:52: error: empty character constant
         stdout.decode() if stdout is not None else '', \
                                                    ^~
prog.c:149:52: error: empty character constant
         stderr.decode() if stderr is not None else ''
                                                    ^~
prog.c:150:47: error: empty character constant
     if p.returncode != 0 or stderr.strip() != '':
                                               ^~
prog.c:151:31: warning: character constant too long for its type
         raise SubprocessError('Command: %s failed with status: %s\nstderr: %s' % (cmd, p.returncode, stderr))
                               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:167:13: warning: character constant too long for its type
             'pactl list cards short',
             ^~~~~~~~~~~~~~~~~~~~~~~~
prog.c:168:13: warning: character constant too long for its type
             'bluez_card.%s' % '_'.join(mac))
             ^~~~~~~~~~~~~~~
prog.c:174:15: warning: character constant too long for its type
         print('Cannot get device id, retrying %d more times.' % (tries+1))
               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:183:13: warning: character constant too long for its type
             'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac))
             ^~~~~~~~~~~~~~~~~~
prog.c:183:33: warning: character constant too long for its type
             'pacmd list-sinks', 'bluez_sink.%s' % '_'.join(mac))
                                 ^~~~~~~~~~~~~~~
prog.c:189:15: warning: character constant too long for its type
         print('Cannot find any sink, retrying %d more times.' % (tries+1))
               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:194:42: warning: character constant too long for its type
 async def set_profile(device_id, profile='a2dp_sink'):
                                          ^~~~~~~~~~~
prog.c:195:34: warning: character constant too long for its type
     return await execute_command('pactl set-card-profile %s %s' % (device_id, profile))
                                  ^~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:199:34: warning: character constant too long for its type
     return await execute_command('pacmd set-default-sink %s' % sink)
                                  ^~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:207:68: warning: character constant too long for its type
         lambda: BluetoothctlProtocol(exit_future, echo=args.echo), 'bluetoothctl'
                                                                    ^~~~~~~~~~~~~~
prog.c:213:19: warning: character constant too long for its type
             print('Selecting device:')
                   ^~~~~~~~~~~~~~~~~~~
prog.c:217:15: warning: character constant too long for its type
         print('Device MAC: %s' % ':'.join(mac))
               ^~~~~~~~~~~~~~~~
prog.c:232:15: warning: character constant too long for its type
         print('Device ID: %s' % device_id)
               ^~~~~~~~~~~~~~~
prog.c:233:15: warning: character constant too long for its type
         print('Sink: %s' % sink)
               ^~~~~~~~~~
prog.c:235:15: warning: character constant too long for its type
         print('Turning off audio profile.')
               ^~~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:236:46: warning: multi-character character constant [-Wmultichar]
         await set_profile(device_id, profile='off')
                                              ^~~~~
prog.c:238:15: warning: character constant too long for its type
         print('Disconnecting the device.')
               ^~~~~~~~~~~~~~~~~~~~~~~~~~~
prog.c:241:15: warning: character constant too long for its type
         print('Connecting again.')
               ^~~~~~~~~~~~~~~~~~~
prog.c:244:15: warning: character constant too long for its type
         print('Setting A2DP profile')
               ^~~~~~~~~~~~~~~~~~~~~~
prog.c:246:15: warning: character constant too long for its type
         print('Device ID: %s' % device_id)
               ^~~~~~~~~~~~~~~
prog.c:249:15: warning: character constant too long for its type
         print('Updating default sink')
               ^~~~~~~~~~~~~~~~~~~~~~~
prog.c:256:15: warning: character constant too long for its type
         print('Exiting bluetoothctl')
               ^~~~~~~~~~~~~~~~~~~~~~
prog.c:260:11: error: invalid preprocessing directive #Close; did you mean #else?
         # Close the stdout pipe
           ^~~~~
           else
prog.c:264:16: warning: character constant too long for its type
 if __name__ == '__main__':
                ^~~~~~~~~~
prog.c:267:25: warning: multi-character character constant [-Wmultichar]
     parser.add_argument('-e', '--echo', action='store_true', default=False)
                         ^~~~
prog.c:267:31: warning: character constant too long for its type
     parser.add_argument('-e', '--echo', action='store_true', default=False)
                               ^~~~~~~~
prog.c:267:48: warning: character constant too long for its type
     parser.add_argument('-e', '--echo', action='store_true', default=False)
                                                ^~~~~~~~~~~~
prog.c:268:25: warning: multi-character character constant [-Wmultichar]
     parser.add_argument('mac', nargs='?', default=None)
                         ^~~~~
stdout
Standard output is empty