>>> socket.gethostbyname (socket.gethostname ()) '10 .185.143.196 '>>> socket.gethostname ()' ip-10-185-143-196 '>>> socket.getfqdn ()' ip-10- 185-143-196'
ользую кластерную среду Airflow, где у меня есть четыре экземпляра AWS ec2 для серверов.
EC2-экземпляры
Сервер 1: Веб-сервер, Планировщик, Очередь Redis, База данных PostgreSQLСервер 2: веб-серверСервер 3: РабочийСервер 4: РабочийМоя установка работала отлично в течение трех месяцев, но время от времени, примерно раз в неделю, я получаю исключение Broken Pipe, когда Airflow пытается что-то записать.
*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-13T00:00:00/1.log
[2018-07-16 00:00:15,521] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:00:15,698] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1197} INFO - Dependencies all met for <TaskInstance: foobar.task_1 2018-07-13 00:00:00 [queued]>
[2018-07-16 00:00:15,710] {models.py:1407} INFO -
--------------------------------------------------------------------------------
Starting attempt 1 of 1
--------------------------------------------------------------------------------
[2018-07-16 00:00:15,719] {models.py:1428} INFO - Executing <Task(OmegaFileSensor): task_1> on 2018-07-13 00:00:00
[2018-07-16 00:00:15,720] {base_task_runner.py:115} INFO - Running: ['bash', '-c', 'airflow run foobar task_1 2018-07-13T00:00:00 --job_id 1320 --raw -sd DAGS_FOLDER/datalake_digitalplatform_arl_workflow_schedule_test_2.py']
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {configuration.py:206} WARNING - section/key [celery/celery_ssl_active] not found in config
[2018-07-16 00:00:16,532] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,532] {default_celery.py:41} WARNING - Celery Executor will run without SSL
[2018-07-16 00:00:16,534] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,533] {__init__.py:45} INFO - Using executor CeleryExecutor
[2018-07-16 00:00:16,597] {base_task_runner.py:98} INFO - Subtask: [2018-07-16 00:00:16,597] {models.py:189} INFO - Filling up the DagBag from /home/ec2-user/airflow/dags/datalake_digitalplatform_arl_workflow_schedule_test_2.py
[2018-07-16 00:00:16,768] {cli.py:374} INFO - Running on host ip-1-2-3-4
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - --- Logging error ---
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - Traceback (most recent call last):
[2018-07-16 00:16:24,931] {logging_mixin.py:84} WARNING - File "/usr/lib64/python3.6/logging/__init__.py", line 996, in emit
self.flush()
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - File "/usr/lib64/python3.6/logging/__init__.py", line 976, in flush
self.stream.flush()
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - BrokenPipeError: [Errno 32] Broken pipe
[2018-07-16 00:16:24,932] {logging_mixin.py:84} WARNING - Call stack:
[2018-07-16 00:16:24,933] {logging_mixin.py:84} WARNING - File "/usr/bin/airflow", line 27, in <module>
args.func(args)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/bin/cli.py", line 392, in run
pool=args.pool,
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/utils/db.py", line 50, in wrapper
result = func(*args, **kwargs)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
result = task_copy.execute(context=context)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
while not self.poke(context):
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
directory = os.listdir(full_path)
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 36, in handle_timeout
self.log.error("Process timed out")
[2018-07-16 00:16:24,934] {logging_mixin.py:84} WARNING - Message: 'Process timed out'
Arguments: ()
[2018-07-16 00:16:24,942] {models.py:1595} ERROR - Timeout
Traceback (most recent call last):
File "/usr/local/lib/python3.6/site-packages/airflow/models.py", line 1488, in _run_raw_task
result = task_copy.execute(context=context)
File "/usr/local/lib/python3.6/site-packages/airflow/operators/sensors.py", line 78, in execute
while not self.poke(context):
File "/home/ec2-user/airflow/plugins/custom_plugins.py", line 35, in poke
directory = os.listdir(full_path)
File "/usr/local/lib/python3.6/site-packages/airflow/utils/timeout.py", line 37, in handle_timeout
raise AirflowTaskTimeout(self.error_message)
airflow.exceptions.AirflowTaskTimeout: Timeout
[2018-07-16 00:16:24,942] {models.py:1624} INFO - Marking task as FAILED.
[2018-07-16 00:16:24,956] {models.py:1644} ERROR - Timeout
Иногда ошибка также скажет
*** Log file isn't local.
*** Fetching here: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
*** Failed to fetch log file from worker. 404 Client Error: NOT FOUND for url: http://ip-1-2-3-4:8793/log/foobar/task_1/2018-07-12T00:00:00/1.log
Я не уверен, почему журналы работают ~ 95% времени, но иногда случаются сбои. Вот мои настройки журнала в моем файле Airflow.cfg,
# The folder where airflow should store its log files
# This path must be absolute
base_log_folder = /home/ec2-user/airflow/logs
# Airflow can store logs remotely in AWS S3 or Google Cloud Storage. Users
# must supply an Airflow connection id that provides access to the storage
# location.
remote_log_conn_id =
encrypt_s3_logs = False
# Logging level
logging_level = INFO
# Logging class
# Specify the class that will specify the logging configuration
# This class has to be on the python classpath
# logging_config_class = my.path.default_local_settings.LOGGING_CONFIG
logging_config_class =
# Log format
log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s
simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s
# Name of handler to read task instance logs.
# Default to use file task handler.
task_log_reader = file.task
# Log files for the gunicorn webserver. '-' means log to stderr.
access_logfile = -
error_logfile =
# The amount of time (in secs) webserver will wait for initial handshake
# while fetching logs from other worker machine
log_fetch_timeout_sec = 5
# When you start an airflow worker, airflow starts a tiny web server
# subprocess to serve the workers local log files to the airflow main
# web server, who then builds pages and sends them to users. This defines
# the port on which the logs are served. It needs to be unused, and open
# visible from the main web server to connect into the workers.
worker_log_server_port = 8793
# How often should stats be printed to the logs
print_stats_interval = 30
child_process_log_directory = /home/ec2-user/airflow/logs/scheduler
Мне интересно, возможно, мне стоит попробовать другую технику для ведения журналов, например, запись в S3 Bucket или есть что-то еще, что я могу сделать, чтобы решить эту проблему.
Обновить:
Запись журналов в S3 не решила эту проблему. Кроме того, ошибка является более последовательной (все еще спорадической). Это происходит примерно в 50% случаев. Одна вещь, которую я заметил, заключается в том, что задача, над которой она выполняется, - это задача создания AWS EMR. Запуск кластера EMS AWS занимает около 20 минут, а затем задача должна дождаться запуска команд Spark в кластере EMR. Таким образом, одна задача выполняется около 30 минут. Мне интересно, если это слишком долго для запуска задачи Airflow, и поэтому она начинает сбой записи в журналы. Если это так, то я мог бы разбить задачу EMR, чтобы была одна задача для создания EMR, а затем другая задача для команд Spark в кластере EMR.
Примечание:
Я также создал новый билет с ошибкой на Jira Airflow здесьhttps://issues.apache.org/jira/browse/AIRFLOW-2844