作业完成 Kafka 插件指南
在配置后,jobcomp/kafka 插件尝试将每个已完成作业记录的部分字段发布到 Kafka 服务器。从 Slurm 25.05 开始,还可以选择在作业首次开始运行时发送作业记录字段的子集。
要求
该插件在每次尝试发布之前将字段子集序列化为 JSON。序列化是使用 Slurm 序列化插件完成的,因此 libjson-c 开发文件是该插件的间接先决条件。
该插件将部分客户端生产者工作卸载到 librdkafka 并使用其 API,因此库开发文件是另一个先决条件。
- librdkafka 开发文件
- libjson-c 开发文件
配置
该插件使用以下 slurm.conf 选项进行配置:
-
JobCompType 应设置为 jobcomp/kafka。
JobCompType=jobcomp/kafka
-
JobCompLoc 该字符串表示包含 'key=value' 对的文件的绝对路径,用于配置 librdkafka 属性。为了使插件正常工作,该文件必须存在,并且至少需要配置 bootstrap.servers 属性。
JobCompLoc=/arbitrary/path/to/rdkafka.conf
注意:当配置此插件时,JobCompLoc 没有默认值,因此需要显式设置。
注意:此选项引用的文件中配置的 librdkafka 参数在 slurmctld 重启时生效。
注意:该插件不会验证这些参数,但如果传递给库 API 函数 rd_kafka_conf_set() 的任何参数失败,则只会记录错误并失败。
示例配置文件可能如下所示:
bootstrap.servers=kafkahost1:9092 debug=broker,topic,msg linger.ms=400 log_level=7
-
JobCompParams 逗号分隔的额外可配置参数列表。有关具体细节,请参阅 slurm.conf 手册页。示例:
JobCompParams=flush_timeout=200,poll_interval=3,requeue_on_msg_timeout, topic=mycluster
注意:对该选项的更改不需要重启 slurmctld。重新配置或 SIGHUP 就足够使其生效。
注意:请参阅手册页以配置作业启动事件。
-
DebugFlags 可选的 JobComp 调试标志,用于额外的插件特定日志记录。
DebugFlags=JobComp
插件功能
对于每个完成(或可选开始运行)的作业,执行插件 jobcomp_p_record_job_[end|start] 操作。作业记录字段的子集通过 Slurm 序列化插件序列化为 JSON 字符串。然后尝试使用 librdkafka rd_kafka_producev() API 调用生成序列化字符串。
即使 Kafka 服务器宕机,也可以向 librdkafka 生成消息。但是,从此调用返回的错误会导致消息被丢弃。生成的消息在 librdkafka 输出队列中累积,最多 "linger.ms" 毫秒(可配置的 librdkafka 参数)后,才会从累积的消息中构建消息集。
然后,librdkafka 库发送生成请求。在未收到 "ACK" 之前,根据库文档,消息在概念上被视为 "在飞行中"。然后库接收生成响应,可以通过以下两种方式之一进行处理:
- 可重试错误
- 如果没有达到库限制参数,库将自动尝试重试。
- 永久错误 或 成功
- 消息将从库输出队列中删除,并被暂存到库交付报告队列中。
以下图示说明了所描述的功能:

jobcomp/kafka 插件有一个后台轮询处理线程,定期调用 librdkafka API rd_kafka_poll() 函数。线程调用的频率可以通过 JobCompParams=poll_interval 配置。该调用使得库交付报告队列中的消息被拉取并处理回插件交付报告回调,根据库设置的错误消息采取不同的操作。默认情况下,如果启用 DebugFlags=JobComp,成功的消息仅被记录,而具有永久错误的消息则被丢弃,除非错误是消息超时,并且 JobCompParams 配置了 "requeue_on_msg_timeout",这将指示回调尝试再次生成该消息。
在插件终止时,执行 fini() 操作。调用 rd_kafka_purge() 库 API 函数以清除 librdkafka 输出队列消息。还调用 rd_kafka_flush() API 调用,该调用等待直到所有未完成的生成请求(以及可能的其他类型请求)完成。等待的时间也可以通过 JobCompParams=flush_timeout 参数进行配置。被清除的消息始终保存在 Slurm StateSaveLocation 中的插件状态文件中,而在 "在飞行中" 时被清除的消息将被丢弃。
注意:您必须使用 librdkafka v1.0.0 或更高版本才能利用上述清除功能。在以前的版本中,无法在关闭时将输出队列清除到状态文件,这意味着在 kafka 插件终止之前未交付的任何消息将丢失。
在插件初始化时,解析配置后,加载状态中的保存消息并尝试再次生成。因此,未交付的消息应该能够抵御 slurmctld 重启。
Kafka 代理 "host:port" 列表应在上述 JobCompLoc 选项引用的文件中显式配置。默认主题是配置的 Slurm ClusterName,但也可以通过 JobCompParams=topic 参数进行配置。
jobcomp/kafka 插件主要将信息消息记录到 JobComp DebugFlag,错误消息除外。librdkafka 默认记录到应用程序的 stderr,但该插件将库配置为强制记录到 syslog。库的日志级别和调试上下文也可以通过 JobCompLoc 引用的文件进行配置,以及其他库配置参数。
最后修改于 2025 年 2 月 3 日