fork download
  1. import os
  2. import logging
  3. from datetime import datetime, timedelta
  4. import pandas as pd
  5. import glob
  6. import airflow
  7. from airflow.models import DAG
  8. from airflow.operators.dummy_operator import DummyOperator
  9. from airflow.operators.python_operator import PythonOperator
  10. from airflow.utils.db import provide_session
  11.  
  12. logging.basicConfig(format='%(levelname)s: %(asctime)s %(message)s', filename='updaglog.log', level=logging.debug)
  13.  
  14. ##############################################################################################################################
  15.  
  16. def function_1():
  17. fst = pd.read_excel('/folder/list.xlsx')
  18.  
  19. old_files = glob.glob1('/folder/', 'up_dag_**')
  20. for f in old_files: os.remove(f)
  21.  
  22. now_date = datetime.now().strftime('%Y%m-%H%M%S')
  23. upload_date = fst.T
  24. upload_date.to_excel('/folder/up_dag_' + now_date + '_.xlsx')
  25.  
  26. return True
  27.  
  28. def function_2():
  29. lst = pd.read_excel('/folder/list.xlsx')
  30.  
  31. old_files = glob.glob1('/folder/', 'up2_dag_**')
  32. for f in old_files: os.remove(f)
  33.  
  34. now_date = datetime.now().strftime('%Y%m-%H%M%S')
  35. upload_date = lst.T
  36. upload_date.to_excel('/folder/up2_dag_' + now_date + '_.xlsx')
  37.  
  38. return True
  39.  
  40.  
  41. ###############################################################################################################################
  42.  
  43. times = timedelta(hours=1)
  44.  
  45. dag = DAG(
  46. dag_id='TSnew_test_dag',
  47. description = '',
  48. start_date = datetime(2020, 8, 31),
  49. schedule_interval = timedelta(hours=1)
  50. )
  51.  
  52. with dag:
  53. load_data_node = PythonOperator(
  54. task_id='fst_unit',
  55. python_callable=function_1,
  56. provide_context=True
  57. )
  58.  
  59. load2_data_node = PythonOperator(
  60. task_id='lst_unit',
  61. python_callable=function_2,
  62. provide_context=True
  63. )
  64.  
  65. load_data_node >> load2_data_node
Runtime error #stdin #stdout #stderr 0.54s 65948KB
stdin
Standard input is empty
stdout
Standard output is empty
stderr
Traceback (most recent call last):
  File "./prog.py", line 6, in <module>
ModuleNotFoundError: No module named 'airflow'