3种自定义Celery日志记录处理程序的策略

celery 日志设置

3种自定义Celery日志记录处理程序的策略

python日志处理程序可以自定义日志消息,例如,我们想把日志消息写入屏幕,文件和日志管理服务等,在这种情况下,我们能将三个日志处理程序添加到应用程序的根记录器中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
import logging

logger = logging.getLogger()
formatter = logging.Formatter('[%(asctime)s] [%(levelname)s] [%(name)s] %(message)s [%(lineno)d]')

# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)

# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)

在celery 中添加自定义日志处理程序(通常在 celery 中配置日志记录)可能涉及很多麻烦。celery 文档有点少,而且在各种论坛(比如 stackoverflow中也是充满了各种矛盾的答案),各种各样的文章都提出了相当复杂的解决办法和补丁。

在本文,我将展示三种配置 celery 记录器的替代策略,并说明每种策略的工作方式和原因。提供的代码也是可以运行的独立脚本。要求是 python3.6+和 celery4.2.0+结合使用。

首先启动celery

1
celery worker --app=app.app --concurrency=1 --loglevel=INFO

异步方式启动任务

1
python app.py

celery logging

celery logging 比较复杂且不易设置。底层的 python 日志记录系统需要支持 celery 支持的所有并发的设置:eventlet,greenlet,threads 等。但现实的情况是现在的 python 日志记录系统并不支持所有这些不同的配置。

celery 在 celery.app.log 中提供了特殊的get_task_logger 功能。这将返回一个继承自记录器celery的特殊记录器 celery.task,该记录器自动获取任务名称以及唯一 ID 作为日志的一部分。

但是,我们也可以使用标准getlogger方式获取日志记录对象,原因是我们很可能在 celery 或者 web 应用程序中调用代码。如果我们使用 logging.getlogger(name),可以使我们的底层代码与执行代码的上下文保持干净整洁。

第一种策略:增加Celery 记录器

celery 提供了after_setup_logger在Celery设置记录器之后触发的信号,信号传递记录器对象,我们可以方便地自定义处理程序然后添加到记录器中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
import os
import logging
from celery import Celery
from celery.signals import after_setup_logger


for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)


logger = logging.getLogger(__name__)


app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
}})


@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)


@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result


if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')

第二种策略:覆盖 celery 根记录器

可以通过连接setup_logging 信号来阻止celery 配置任何记录器,这样,我们就可以完全自定义自己的日志记录配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import os
import logging
from celery import Celery
from celery.signals import setup_logging

app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
}})


for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)


logger = logging.getLogger(__name__)


@setup_logging.connect
def setup_loggers(*args, **kwargs):
logger = logging.getLogger()
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')

# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)


# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)



@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result


if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')

第三种策略:停用 celery 记录器配置

另一种解决方案就是让 celery 设计其记录器但是不使用,并防止其劫持根记录器。默认情况下,celery 会在根记录器上先删除所有先前的配置的处理程序。如果要自定义自己的日志处理程序而不会妨碍celery,则可以通过设置禁用此行为 worker_hijack_root_logger=True。这将使我们能够收回对于根记录器的控制权,并退回到标准的 python 记录器设置。但是需要谨慎使用这种方案,因为我们需要确保python 日志记录与 celery 设置完全兼容(event,greenlet,threads 等)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import os
import logging
from celery import Celery
from celery.signals import setup_logging

app = Celery('app')
app.conf.update({
'broker_url': 'filesystem://',
'broker_transport_options': {
'data_folder_in': './broker/out',
'data_folder_out': './broker/out',
'data_folder_processed': './broker/processed'
},
'worker_hijack_root_logger': False})


# setup folder for message broking
for f in ['./broker/out', './broker/processed']:
if not os.path.exists(f):
os.makedirs(f)


formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
logger = logging.getLogger()

# StreamHandler
sh = logging.StreamHandler()
sh.setFormatter(formatter)
logger.addHandler(sh)


# FileHandler
fh = logging.FileHandler('logs.log')
fh.setFormatter(formatter)
logger.addHandler(fh)

# SysLogHandler
slh = logging.handlers.SysLogHandler(address=('logsN.papertrailapp.com', '...'))
slh.setFormatter(formatter)
logger.addHandler(slh)


logger = logging.getLogger(__name__)


@app.task()
def add(x, y):
result = x + y
logger.info(f'Add: {x} + {y} = {result}')
return result


if __name__ == '__main__':
task = add.s(x=2, y=3).delay()
print(f'Started task: {task}')

我们采用第二种方法来定制celery 的日志格式

  • 添加 logging_config
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import logging.config

LOG_CONFIG = {
'version': 1,
'disable_existing_loggers': False,
'formatters': {
'simple': {
'datefmt': '%Y-%m-%d %H:%M:%S',
'format': '{"timestamp": "%(asctime)s", "app": "bs-whatweb", '
'"logger": "%(name)s", "level": "%(levelname)s", '
'"pathname": "%(pathname)s", "module": "%(module)s", '
'"funcName": "%(funcName)s", "lineno": "%(lineno)d", '
'"message": "%(message)s"}'

},
'json': {
'class': 'project.api.tasks.logger.JSONFormatter'
}
},
'handlers': {
'celery': {
'level': 'INFO',
'formatter': 'simple',
'class': 'logging.StreamHandler'
},
'celery_json': {
'level': 'INFO',
'formatter': 'json',
'class': 'logging.StreamHandler'
},
'sentry': {
'level': "CRITICAL",
'formatter': 'simple',
'class': 'raven.handlers.logging.SentryHandler',
'args': ('https://facc2ededdfa45ba955dca1eb485915a@sentry.xxx.org/7',)
},
},
'loggers': {
'celery_logger': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery.task': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery.worker': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'celery': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
'project': {
'handlers': ['celery_json'],
'level': 'INFO',
'propagate': False,
},
}
}

logging.config.dictConfig(LOG_CONFIG)
  • 设置 JSON 格式化,并添加 task_id 及 task_name 两个参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
def __init__(self, tags=None, hostname=None, fqdn=False, message_type='JSON',
indent=None):
super().__init__()
*********************新增******************
try:
from celery._state import get_current_task
self.get_current_task = get_current_task
except ImportError:
self.get_current_task = lambda: None
******************************************
"""
:param tags: a list of tags to add to every messages
:hostname: force a specific hostname
:fqdn: a boolean to use the FQDN instead of the machine's hostname
:message_type: the message type for Logstash formatters
:indent: indent level of the JSON output
"""
self.message_type = message_type
self.tags = tags if tags is not None else []
self.extra_tags = []
self.indent = indent

if hostname:
self.host = hostname
elif fqdn:
self.host = socket.getfqdn()
else:
self.host = socket.gethostname()

def format(self, record, serialize=True):
****************************新增***********************
task = self.get_current_task()
if task and task.request:
record.__dict__.update(task_id=task.request.id,
task_name=task.name)

else:
record.__dict__.setdefault('task_name', '')
record.__dict__.setdefault('task_id', '')
if record.__dict__.get("data"):
record.__dict__.pop("data")
*****************************************************
new_message = record.getMessage()
# Create message dict
message = {
'timestamp': self.format_timestamp(record.created),
'app': os.environ.get('APP_NAME'),
'host': self.host,
'environment': os.environ.get('FLASK_ENV'),
'logger': record.name,
'level': record.levelname,
'message': new_message,
'path': record.pathname,
'tags': self.tags[:]
}

# Add extra fields
message.update(self.get_extra_fields(record))

# Add extra tags
if self.extra_tags:
message['tags'].extend(self.extra_tags)

# If exception, add debug info
if record.exc_info or record.exc_text:
message.update(self.get_debug_fields(record))

if serialize:
return self.serialize(message, indent=self.indent)
return message

  • 拦截 celery 信号
1
2
3
4
from celery.signals import setup_logging
@setup_logging.connect
def setup_logger(*args, **kwargs):
from project.api.tasks import logging_config