import http.client as http
import queue

import os
import json
import time
import threading

from datetime import datetime


NETWORK_ERRORS = http.HTTPException, ConnectionError, OSError

BOARD_URL = '2ch.hk'
CATALOG_URL = '/{}/catalog.json'
THREAD_URL = '/{}/res/{}.json'
RESOURCE_URL = '/{}/{}'
DEFAULT_SECTION = 'b'
FILE_FOLDER = 'files'
HEADERS = {
    'User-Agent': "Mozilla/5.0 (Windows NT 6.3; WOW64; rv:40.0) Gecko/20100101 Firefox/40.0",
    'Accept-Charset': 'utf-8',
}

# 'webm' plus typos and wrong-keyboard-layout spellings ('цуиь' is 'webm'
# typed on a Russian layout). Defined for searching but unused below.
QUERY = r'(webm | цуиь | шebm | wbem)'
WEBM = 6  # value of a file's 'type' field that marks a .webm in the board API


DOWNLOADERS = 2
SEARCHERS = 2
STOP_SIGNAL = (None, None, None)
VERBOSITY_LEVEL = 2
INFO, WARNING, ERROR = 3, 2, 1

def inform(msg, level=INFO):
    # Print the message only if it is important enough for the configured
    # verbosity; lower level values are more important.
    if level <= VERBOSITY_LEVEL:
        print(msg)


class Connection(object):

    def __init__(self, host=BOARD_URL):
        self._host = host
        self._connect()

    # Decorator factory: retry the wrapped method forever, reconnecting and
    # sleeping a second after any of the given exceptions.
    def _repeat_on(exceptions):
        def _repeat_on_error(function):
            def _f(self, *args, **kwargs):
                while True:
                    try:
                        return function(self, *args, **kwargs)
                    except exceptions as e:
                        inform(e, level=WARNING)
                        self._connect()
                        time.sleep(1)
            return _f
        return _repeat_on_error

    @_repeat_on(NETWORK_ERRORS)
    def _connect(self):
        self._conn = http.HTTPSConnection(self._host)
        self._set_headers()
        inform('Connected to {}'.format(self._host))

    def _set_cookie(self):
        resp = self.get_response('/')
        self._headers['Cookie'] = resp.getheader('set-cookie')
        resp.read()

    def _set_headers(self):
        # Copy, so that _set_cookie cannot mutate the shared HEADERS dict.
        self._headers = dict(HEADERS)

    def _get_response(self, request):
        # inform('Requesting {}'.format(request))
        self._conn.request('GET', request, headers=self._headers)
        resp = self._conn.getresponse()
        # inform('Response is {}: {}'.format(resp.status, resp.reason))
        return resp

    @_repeat_on(NETWORK_ERRORS)
    def get_response(self, request):
        return self._get_response(request)

    @_repeat_on(TypeError)
    def get_json(self, request):
        json_data = {}

        resp = self.get_response(request)
        data = resp.read()

        try:
            data = data.decode('utf8')
        except (AttributeError, UnicodeDecodeError):
            pass

        if resp.status == 200:
            try:
                json_data = json.loads(data)
            except ValueError:
                inform('Cannot decode json')

        return resp.status, json_data

    @_repeat_on(NETWORK_ERRORS)
    def get_file(self, request):
        return self._get_response(request).read()


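# A minimal usage sketch (never called in this script): Connection can be
# used on its own to pull a board catalog, with the retry decorator hiding
# transient network errors. The function name and the use of DEFAULT_SECTION
# are illustrative assumptions, not part of the original flow.
def _example_fetch_catalog(section=DEFAULT_SECTION):
    conn = Connection()
    status, catalog = conn.get_json(CATALOG_URL.format(section))
    if status == 200:
        print('Catalog of /{}/ holds {} threads'.format(
            section, len(catalog.get('threads', []))))
    return status, catalog

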
class Thread:

    def __init__(self, section, number):
        self._section = section
        self._last_post = 0
        self._url = THREAD_URL.format(section, number)

    def get_webms(self, fetch_tool):
        # inform('Searching for webms. Last post: {}'.format(self._last_post))
        webms = []
        status, data = fetch_tool(self._url)

        if status == 404:
            return None
        elif status != 200:
            return webms

        found_webms_count = 0

        for post in data['threads'][0]['posts']:

            if post['num'] <= self._last_post:
                continue

            try:
                files = post['files']
            except KeyError:
                continue

            for f in files:
                if f.get('type', None) == WEBM:
                    found_webms_count += 1
                    webm = RESOURCE_URL.format(self._section, f['path'])
                    thumb = RESOURCE_URL.format(self._section, f['thumbnail'])
                    md5 = f['md5']
                    webms.append((webm, thumb, md5))

        inform('Found {} webms'.format(found_webms_count))
        self._last_post = data['threads'][0]['posts'][-1]['num']
        return webms

    def __str__(self):
        return self._url


class Catalog:

    def __init__(self, section):

        self._url = CATALOG_URL.format(section)
        self._section = section
        self._threads = set()

    def get_threads(self, fetch_tool):
        alive_threads = set()
        result = []
        status, data = fetch_tool(self._url)

        if status != 200:
            return result

        try:
            threads = data['threads']
        except KeyError as e:
            inform('{}: got keys {} from {}'.format(
                e, list(data.keys()), self._url), level=WARNING)
            return result

        for thread in threads:
            number = int(thread['num'])
            if number not in self._threads:
                result.append(Thread(self._section, number))
                self._threads.add(number)

            alive_threads.add(number)

        inform('Found {} threads'.format(len(result)))
        # Forget threads that fell out of the catalog.
        self._threads = alive_threads
        return result

    def __str__(self):
        return self._url


def except_key_interrupt(function):
    def _f(self):
        try:
            return function(self)
        except (KeyboardInterrupt, SystemExit):
            return

    return _f


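# Note: CPython delivers KeyboardInterrupt (Ctrl+C) to the main thread only,
# so for Searcher.work running in a worker thread this decorator mainly
# guards against SystemExit; the real interrupt handling happens in
# MainWorker.work below.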
class Searcher(Connection):

    def __init__(self, lock, thread_Q, file_Q):
        self._lock = lock
        self._thread_Q = thread_Q
        self._file_Q = file_Q

        super().__init__()

    @except_key_interrupt
    def work(self):
        while True:
            thread = self._thread_Q.get()
            if thread == STOP_SIGNAL:
                return

            webms = thread.get_webms(self.get_json)
            if webms is not None:
                for webm in webms:
                    self._file_Q.put(webm)
                # Re-queue the thread for another pass; a 404 (None) means
                # the thread is dead and gets dropped here.
                self._thread_Q.put(thread)


class Downloader(Connection):

    def __init__(self, lock, file_Q, log_Q=None, webms=False):
        self._lock = lock
        self._file_Q = file_Q
        self._log_Q = log_Q
        self._webms = webms  # when False, only thumbnails are fetched

        super().__init__()

    def _log(self, data):
        if not self._log_Q:
            return

        with self._lock:
            self._log_Q.put(';'.join(data))

    def _download(self, data):
        webm, thumb, md5 = data
        filename, webm_file, thumb_file = None, None, None

        if self._webms:
            webm_file = self.get_file(webm)

        thumb_file = self.get_file(thumb)
        # Base name without extension, taken from the thumbnail path.
        filename = os.path.basename(thumb).split('.')[0]

        return filename, webm_file, thumb_file

    def work(self, callback=lambda *args: None):
        while True:
            data = self._file_Q.get()
            if data == STOP_SIGNAL:
                return

            filename, webm, thumb = self._download(data)
            self._log(data)

            callback(filename, webm, thumb)


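# Sketch of a save-to-disk callback for Downloader.work. FILE_FOLDER is
# defined above but never used, so this is one plausible intent rather than
# the original behavior; the '.jpg' thumbnail extension is an assumption.
# Usage would be d.work(callback=_example_save_callback).
def _example_save_callback(filename, webm_file, thumb_file):
    os.makedirs(FILE_FOLDER, exist_ok=True)
    if thumb_file is not None:
        with open(os.path.join(FILE_FOLDER, filename + '.jpg'), 'wb') as f:
            f.write(thumb_file)
    if webm_file is not None:
        with open(os.path.join(FILE_FOLDER, filename + '.webm'), 'wb') as f:
            f.write(webm_file)

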
class Logger:

    def __init__(self, lock, log):
        self._lock = lock
        self._log = log
        self._time = time.time()
        self._wait_time = 5  # seconds between flushes to disk
        # Dashes instead of colons keep the file name valid on Windows too.
        self._file = open(datetime.now().strftime('%Y-%m-%d %H-%M'), 'w')

    def work(self):
        while True:
            time.sleep(2)
            t = time.time()
            if t - self._time > self._wait_time:
                self._time = t
                with self._lock:
                    inform('Writing')
                    lines = []
                    while not self._log.empty():
                        lines.append(self._log.get())
                    if lines:
                        self._file.write('\n'.join(lines) + '\n')
                        self._file.flush()

    def __del__(self):
        self._file.close()


class MainWorker(Connection):

    def __init__(self, sections, searcher=Searcher, downloader=Downloader):
        self._lock = threading.RLock()
        self._thread_Q = queue.Queue()
        self._file_Q = queue.Queue()
        self._log_Q = queue.Queue()

        self._Searcher = searcher
        self._Downloader = downloader

        self.catalogs = [Catalog(section) for section in sections]

        super().__init__()

    def _start_workers(self):
        for _ in range(SEARCHERS):
            s = self._Searcher(self._lock, self._thread_Q, self._file_Q)
            t = threading.Thread(target=s.work)
            t.start()

        for _ in range(DOWNLOADERS):
            d = self._Downloader(
                self._lock, self._file_Q, self._log_Q, webms=False)
            t = threading.Thread(target=d.work)
            t.start()

        logger = Logger(self._lock, self._log_Q)
        # Daemon, so the logger cannot keep the process alive after _exit().
        threading.Thread(target=logger.work, daemon=True).start()

    def _exit(self):
        # Poison pills: one stop signal per worker on each queue.
        for _ in range(DOWNLOADERS + SEARCHERS):
            self._file_Q.put(STOP_SIGNAL)
            self._thread_Q.put(STOP_SIGNAL)

    def work(self):
        try:
            self._start_workers()
            while True:
                for catalog in self.catalogs:
                    for thread in catalog.get_threads(self.get_json):
                        self._thread_Q.put(thread)
                time.sleep(5)
        except (KeyboardInterrupt, SystemExit):
            self._exit()


if __name__ == '__main__':
    sections = ['vg', 'b', 'a', 'mov']
    main_worker = MainWorker(sections)
    main_worker.work()
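
# Running the script (file name is illustrative): `python webm_scraper.py`.
# It polls the catalogs of /vg/, /b/, /a/ and /mov/, queues new threads for
# the searchers, downloads thumbnails only (webms=False in _start_workers),
# and appends 'webm;thumb;md5' lines to a timestamped log file in the
# working directory. Stop it with Ctrl+C.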