fork download
  1. import datetime
  2. import calendar
  3.  
  4. __all__ = ["CronExpression", "parse_atom", "DEFAULT_EPOCH", "SUBSTITUTIONS"]
  5. __license__ = "Public Domain"
  6.  
  7. DAY_NAMES = zip(('sun', 'mon', 'tue', 'wed', 'thu', 'fri', 'sat'), xrange(7))
  8. MINUTES = (0, 59)
  9. HOURS = (0, 23)
  10. DAYS_OF_MONTH = (1, 31)
  11. MONTHS = (1, 12)
  12. DAYS_OF_WEEK = (0, 6)
  13. L_FIELDS = (DAYS_OF_WEEK, DAYS_OF_MONTH)
  14. FIELD_RANGES = (MINUTES, HOURS, DAYS_OF_MONTH, MONTHS, DAYS_OF_WEEK)
  15. MONTH_NAMES = zip(('jan', 'feb', 'mar', 'apr', 'may', 'jun',
  16. 'jul', 'aug', 'sep', 'oct', 'nov', 'dec'), xrange(1, 13))
  17. DEFAULT_EPOCH = (1970, 1, 1, 0, 0, 0)
  18. SUBSTITUTIONS = {
  19. "@yearly": "0 0 1 1 *",
  20. "@anually": "0 0 1 1 *",
  21. "@monthly": "0 0 1 * *",
  22. "@weekly": "0 0 * * 0",
  23. "@daily": "0 0 * * *",
  24. "@midnight": "0 0 * * *",
  25. "@hourly": "0 * * * *"
  26. }
  27.  
  28. class CronExpression(object):
  29. def __init__(self, line, epoch=DEFAULT_EPOCH, epoch_utc_offset=0):
  30. """
  31. Instantiates a CronExpression object with an optionally defined epoch.
  32. If the epoch is defined, the UTC offset can be specified one of two
  33. ways: as the sixth element in 'epoch' or supplied in epoch_utc_offset.
  34. The epoch should be defined down to the minute sorted by
  35. descending significance.
  36. """
  37. for key, value in SUBSTITUTIONS.items():
  38. if line.startswith(key):
  39. line = line.replace(key, value)
  40. break
  41.  
  42. fields = line.split(None, 5)
  43. if len(fields) == 5:
  44. fields.append('')
  45.  
  46. minutes, hours, dom, months, dow, self.comment = fields
  47.  
  48. dow = dow.replace('7', '0').replace('?', '*')
  49. dom = dom.replace('?', '*')
  50.  
  51. for monthstr, monthnum in MONTH_NAMES:
  52. months = months.lower().replace(monthstr, str(monthnum))
  53.  
  54. for dowstr, downum in DAY_NAMES:
  55. dow = dow.lower().replace(dowstr, str(downum))
  56.  
  57. self.string_tab = [minutes, hours, dom.upper(), months, dow.upper()]
  58. self.compute_numtab()
  59. if len(epoch) == 5:
  60. y, mo, d, h, m = epoch
  61. self.epoch = (y, mo, d, h, m, epoch_utc_offset)
  62. else:
  63. self.epoch = epoch
  64.  
  65. def __str__(self):
  66. base = self.__class__.__name__ + "(%s)"
  67. cron_line = self.string_tab + [str(self.comment])
  68. if not self.comment:
  69. cron_line.pop()
  70. arguments = '"' + ' '.join(cron_line) + '"'
  71. if self.epoch != DEFAULT_EPOCH:
  72. return base % (arguments + ", epoch=" + repr(self.epoch))
  73. else:
  74. return base % arguments
  75.  
  76. def __repr__(self):
  77. return str(self)
  78.  
  79. def compute_numtab(self):
  80. """
  81. Recomputes the sets for the static ranges of the trigger time.
  82.  
  83. This method should only be called by the user if the string_tab
  84. member is modified.
  85. """
  86. self.numerical_tab = []
  87.  
  88. for field_str, span in zip(self.string_tab, FIELD_RANGES):
  89. split_field_str = field_str.split(',')
  90. if len(split_field_str) > 1 and "*" in split_field_str:
  91. raise ValueError("\"*\" must be alone in a field.")
  92.  
  93. unified = set()
  94. for cron_atom in split_field_str:
  95. # parse_atom only handles static cases
  96. for special_char in ('%', '#', 'L', 'W'):
  97. if special_char in cron_atom:
  98. break
  99. else:
  100. unified.update(parse_atom(cron_atom, span))
  101.  
  102. self.numerical_tab.append(unified)
  103.  
  104. if self.string_tab[2] == "*" and self.string_tab[4] != "*":
  105. self.numerical_tab[2] = set()
  106.  
  107. def check_trigger(self, date_tuple, utc_offset=0):
  108. """
  109. Returns boolean indicating if the trigger is active at the given time.
  110. The date tuple should be in the local time. Unless periodicities are
  111. used, utc_offset does not need to be specified. If periodicities are
  112. used, specifically in the hour and minutes fields, it is crucial that
  113. the utc_offset is specified.
  114. """
  115. year, month, day, hour, mins = date_tuple
  116. given_date = datetime.date(year, month, day)
  117. zeroday = datetime.date(*self.epoch[:3])
  118. last_dom = calendar.monthrange(year, month)[-1]
  119. dom_matched = True
  120.  
  121. # In calendar and datetime.date.weekday, Monday = 0
  122. given_dow = (datetime.date.weekday(given_date) + 1) % 7
  123. first_dow = (given_dow + 1 - day) % 7
  124.  
  125. # Figure out how much time has passed from the epoch to the given date
  126. utc_diff = utc_offset - self.epoch[5]
  127. mod_delta_yrs = year - self.epoch[0]
  128. mod_delta_mon = month - self.epoch[1] + mod_delta_yrs * 12
  129. mod_delta_day = (given_date - zeroday).days
  130. mod_delta_hrs = hour - self.epoch[3] + mod_delta_day * 24 + utc_diff
  131. mod_delta_min = mins - self.epoch[4] + mod_delta_hrs * 60
  132.  
  133. # Makes iterating through like components easier.
  134. quintuple = zip(
  135. (mins, hour, day, month, given_dow),
  136. self.numerical_tab,
  137. self.string_tab,
  138. (mod_delta_min, mod_delta_hrs, mod_delta_day, mod_delta_mon,
  139. mod_delta_day),
  140. FIELD_RANGES)
  141.  
  142. for value, valid_values, field_str, delta_t, field_type in quintuple:
  143. # All valid, static values for the fields are stored in sets
  144. if value in valid_values:
  145. continue
  146.  
  147. # The following for loop implements the logic for context
  148. # sensitive and epoch sensitive constraints. break statements,
  149. # which are executed when a match is found, lead to a continue
  150. # in the outer loop. If there are no matches found, the given date
  151. # does not match expression constraints, so the function returns
  152. # False as seen at the end of this for...else... construct.
  153. for cron_atom in field_str.split(','):
  154. if cron_atom[0] == '%':
  155. if not(delta_t % int(cron_atom[1:])):
  156. break
  157.  
  158. elif field_type == DAYS_OF_WEEK and '#' in cron_atom:
  159. D, N = int(cron_atom[0]), int(cron_atom[2])
  160. # Computes Nth occurence of D day of the week
  161. if (((D - first_dow) % 7) + 1 + 7 * (N - 1)) == day:
  162. break
  163.  
  164. elif field_type == DAYS_OF_MONTH and cron_atom[-1] == 'W':
  165. target = min(int(cron_atom[:-1]), last_dom)
  166. lands_on = (first_dow + target - 1) % 7
  167. if lands_on == 0:
  168. # Shift from Sun. to Mon. unless Mon. is next month
  169. target += 1 if target < last_dom else -2
  170. elif lands_on == 6:
  171. # Shift from Sat. to Fri. unless Fri. in prior month
  172. target += -1 if target > 1 else 2
  173.  
  174. # Break if the day is correct, and target is a weekday
  175. if target == day and (first_dow + target - 7) % 7 > 1:
  176. break
  177.  
  178. elif field_type in L_FIELDS and cron_atom.endswith('L'):
  179. # In dom field, L means the last day of the month
  180. target = last_dom
  181.  
  182. if field_type == DAYS_OF_WEEK:
  183. # Calculates the last occurence of given day of week
  184. desired_dow = int(cron_atom[:-1])
  185. target = (((desired_dow - first_dow) % 7) + 29)
  186. target -= 7 if target > last_dom else 0
  187.  
  188. if target == day:
  189. break
  190. else:
  191. # See 2010.11.15 of CHANGELOG
  192. if field_type == DAYS_OF_MONTH and self.string_tab[4] != '*':
  193. dom_matched = False
  194. continue
  195. elif field_type == DAYS_OF_WEEK and self.string_tab[2] != '*':
  196. # If we got here, then days of months validated so it does
  197. # not matter that days of the week failed.
  198. return dom_matched
  199.  
  200. # None of the expressions matched which means this field fails
  201. return False
  202.  
  203. # Arriving at this point means the date landed within the constraints
  204. # of all fields; the associated trigger should be fired.
  205. return True
  206.  
  207.  
  208. def parse_atom(parse, minmax):
  209. """
  210. Returns a set containing valid values for a given cron-style range of
  211. numbers. The 'minmax' arguments is a two element iterable containing the
  212. inclusive upper and lower limits of the expression.
  213.  
  214. Examples:
  215. >>> parse_atom("1-5",(0,6))
  216. set([1, 2, 3, 4, 5])
  217.  
  218. >>> parse_atom("*/6",(0,23))
  219. set([0, 6, 12, 18])
  220.  
  221. >>> parse_atom("18-6/4",(0,23))
  222. set([18, 22, 0, 4])
  223.  
  224. >>> parse_atom("*/9",(0,23))
  225. set([0, 9, 18])
  226. """
  227. parse = parse.strip()
  228. increment = 1
  229. if parse == '*':
  230. return set(xrange(minmax[0], minmax[1] + 1))
  231. elif parse.isdigit():
  232. # A single number still needs to be returned as a set
  233. value = int(parse)
  234. if value >= minmax[0] and value <= minmax[1]:
  235. return set((value,))
  236. else:
  237. raise ValueError("Invalid bounds: \"%s\"" % parse)
  238. elif '-' in parse or '/' in parse:
  239. divide = parse.split('/')
  240. subrange = divide[0]
  241. if len(divide) == 2:
  242. # Example: 1-3/5 or */7 increment should be 5 and 7 respectively
  243. increment = int(divide[1])
  244.  
  245. if '-' in subrange:
  246. # Example: a-b
  247. prefix, suffix = [int(n) for n in subrange.split('-')]
  248. if prefix < minmax[0] or suffix > minmax[1]:
  249. raise ValueError("Invalid bounds: \"%s\"" % parse)
  250. elif subrange == '*':
  251. # Include all values with the given range
  252. prefix, suffix = minmax
  253. else:
  254. raise ValueError("Unrecognized symbol: \"%s\"" % subrange)
  255.  
  256. if prefix < suffix:
  257. # Example: 7-10
  258. return set(xrange(prefix, suffix + 1, increment))
  259. else:
  260. # Example: 12-4/2; (12, 12 + n, ..., 12 + m*n) U (n_0, ..., 4)
  261. noskips = list(xrange(prefix, minmax[1] + 1))
  262. noskips+= list(xrange(minmax[0], suffix + 1))
  263. return set(noskips[::increment])
  264.  
Compilation error #stdin compilation error #stdout 0s 0KB
stdin
Standard input is empty
compilation info
Traceback (most recent call last):
  File "/usr/lib/python3.7/py_compile.py", line 143, in compile
    _optimize=optimize)
  File "<frozen importlib._bootstrap_external>", line 791, in source_to_code
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "./prog.py", line 67
    cron_line = self.string_tab + [str(self.comment])
                                                   ^
SyntaxError: invalid syntax

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/usr/lib/python3.7/py_compile.py", line 147, in compile
    raise py_exc
py_compile.PyCompileError:   File "./prog.py", line 67
    cron_line = self.string_tab + [str(self.comment])
                                                   ^
SyntaxError: invalid syntax

stdout
Standard output is empty