Error: KafkaStorageException: Too many open files

甘苦 2022-08-06 08:22:35


Problem description: a Flink job in our cluster both produces to and consumes from Kafka as part of a big-data pipeline, and relies on its restart strategy and checkpointing to recover from processing exceptions. After the project went live, more and more devices were connected, Kafka topics were created dynamically for them in ever-growing numbers, and the Flink job began to fail with:

java.io.IOException: Could not perform checkpoint 87 for operator Sink: Unnamed (34/90)#99.
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1274)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointBarrierHandler.notifyCheckpoint(CheckpointBarrierHandler.java:147)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.triggerCheckpoint(SingleCheckpointBarrierHandler.java:287)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.access$100(SingleCheckpointBarrierHandler.java:64)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler$ControllerImpl.triggerGlobalCheckpoint(SingleCheckpointBarrierHandler.java:493)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.triggerGlobalCheckpoint(AbstractAlignedBarrierHandlerState.java:74)
at org.apache.flink.streaming.runtime.io.checkpointing.AbstractAlignedBarrierHandlerState.barrierReceived(AbstractAlignedBarrierHandlerState.java:66)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.lambda$processBarrier$2(SingleCheckpointBarrierHandler.java:234)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.markCheckpointAlignedAndTransformState(SingleCheckpointBarrierHandler.java:262)
at org.apache.flink.streaming.runtime.io.checkpointing.SingleCheckpointBarrierHandler.processBarrier(SingleCheckpointBarrierHandler.java:231)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.handleEvent(CheckpointedInputGate.java:181)
at org.apache.flink.streaming.runtime.io.checkpointing.CheckpointedInputGate.pollNext(CheckpointedInputGate.java:159)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:110)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:496)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:203)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:809)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:761)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:958)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:937)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:766)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.runtime.checkpoint.CheckpointException: Could not complete snapshot 87 for operator Sink: Unnamed (34/90)#99. Failure reason: Checkpoint was declined.
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:265)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:170)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.snapshotState(AbstractStreamOperator.java:348)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.checkpointStreamOperator(RegularOperatorChain.java:233)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.buildOperatorSnapshotFutures(RegularOperatorChain.java:206)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.snapshotState(RegularOperatorChain.java:186)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.takeSnapshotSync(SubtaskCheckpointCoordinatorImpl.java:605)
at org.apache.flink.streaming.runtime.tasks.SubtaskCheckpointCoordinatorImpl.checkpointState(SubtaskCheckpointCoordinatorImpl.java:315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$performCheckpoint$14(StreamTask.java:1329)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:50)
at org.apache.flink.streaming.runtime.tasks.StreamTask.performCheckpoint(StreamTask.java:1315)
at org.apache.flink.streaming.runtime.tasks.StreamTask.triggerCheckpointOnBarrier(StreamTask.java:1258)
... 22 more
Caused by: org.apache.flink.streaming.connectors.kafka.FlinkKafkaException: Failed to send data to Kafka: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.checkErroneous(FlinkKafkaProducer.java:1429)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.flush(FlinkKafkaProducer.java:1117)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:1014)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.preCommit(FlinkKafkaProducer.java:102)
at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.snapshotState(TwoPhaseCommitSinkFunction.java:345)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.snapshotState(FlinkKafkaProducer.java:1122)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.trySnapshotFunctionState(StreamingFunctionUtils.java:118)
at org.apache.flink.streaming.util.functions.StreamingFunctionUtils.snapshotFunctionState(StreamingFunctionUtils.java:99)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.snapshotState(AbstractUdfStreamOperator.java:87)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.snapshotState(StreamOperatorStateHandler.java:219)
... 33 more
Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 27 record(s) for topic-call_XAucjhIN-0:120000 ms has passed since batch creation
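The "120000 ms" in the final TimeoutException matches the Kafka producer's default delivery.timeout.ms (two minutes): batches sat in the producer buffer without being acknowledged until they expired, so the exactly-once sink's pre-commit flush failed and took the checkpoint down with it. The root cause turned out to be on the broker side (see below), but if short broker hiccups are expected, the producer can be given more headroom before it expires batches. A minimal sketch of the relevant producer settings; the values here are illustrative, not taken from the original job:

# Total time a record may wait for acknowledgement before expiring (default 120000)
delivery.timeout.ms=300000
# Per-request timeout; must stay below delivery.timeout.ms
request.timeout.ms=60000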

Meanwhile, one of the brokers in the Kafka cluster had gone down, with the following error in its server log:

[2022-08-01 14:55:22,453] ERROR Error while writing to checkpoint file /home/kafka-logs/fan_sink_29-1/leader-epoch-checkpoint (kafka.server.LogDirFailureChannel)
java.io.FileNotFoundException: /home/kafka-logs/topic_min/leader-epoch-checkpoint.tmp (Too many open files)
at java.io.FileOutputStream.open0(Native Method)
at java.io.FileOutputStream.open(FileOutputStream.java:270)
at java.io.FileOutputStream.<init>(FileOutputStream.java:213)
at java.io.FileOutputStream.<init>(FileOutputStream.java:162)
at kafka.server.checkpoints.CheckpointFile.liftedTree1$1(CheckpointFile.scala:94)
at kafka.server.checkpoints.CheckpointFile.write(CheckpointFile.scala:92)
at kafka.server.checkpoints.LeaderEpochCheckpointFile.write(LeaderEpochCheckpointFile.scala:70)
at kafka.server.epoch.LeaderEpochFileCache.flush(LeaderEpochFileCache.scala:292)
at kafka.server.epoch.LeaderEpochFileCache.assign(LeaderEpochFileCache.scala:61)
at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1(Log.scala:1368)
at kafka.log.Log.$anonfun$maybeAssignEpochStartOffset$1$adapted(Log.scala:1367)
at scala.Option.foreach(Option.scala:437)
at kafka.log.Log.maybeAssignEpochStartOffset(Log.scala:1367)
at kafka.cluster.Partition.$anonfun$makeLeader$1(Partition.scala:592)
at kafka.cluster.Partition.makeLeader(Partition.scala:547)
at kafka.server.ReplicaManager.$anonfun$makeLeaders$5(ReplicaManager.scala:1568)
at kafka.utils.Implicits$MapExtensionMethods$.$anonfun$forKeyValue$1(Implicits.scala:62)
at scala.collection.mutable.HashMap$Node.foreachEntry(HashMap.scala:633)
at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:499)
at kafka.server.ReplicaManager.makeLeaders(ReplicaManager.scala:1566)
at kafka.server.ReplicaManager.becomeLeaderOrFollower(ReplicaManager.scala:1411)
at kafka.server.KafkaApis.handleLeaderAndIsrRequest(KafkaApis.scala:258)
at kafka.server.KafkaApis.handle(KafkaApis.scala:171)
at kafka.server.KafkaRequestHandler.run(KafkaRequestHandler.scala:74)
at java.lang.Thread.run(Thread.java:748)
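This broker-side error explains the expiring batches above: the broker could no longer open even its own checkpoint files. Kafka holds file descriptors for every log segment in every partition directory (the .log file plus its .index and .timeindex files), in addition to client and replication sockets, so a dynamically growing topic count steadily eats into the limit. A quick way to compare actual usage against the limit (assuming the broker PID is 19694, as found later in this walkthrough):

# Number of file descriptors the broker currently holds open
[root@host ~]# ls /proc/19694/fd | wc -l

# The open-file limit the running process was started with
[root@host ~]# grep "open files" /proc/19694/limits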

The fix was as follows:

# Find files or directories whose names contain "kafka", to locate kafka.service
# (quote the pattern so the shell does not expand it before find runs)
[root@host ~]# cd /
[root@host /]# find / -name "*kafka*"
/etc/systemd/system/kafka.service

[root@host /]# cd /etc/systemd/system/

# Edit the unit file and raise the limit on open file descriptors
[root@host system]# vi kafka.service

# Add this line in the [Service] section (a full unit-file sketch follows below):
LimitNOFILE=65535
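For context, here is a minimal sketch of what the complete unit file might look like after the change. Everything apart from the LimitNOFILE line is an assumption about this installation; the actual paths, user, and start scripts will differ:

[Unit]
Description=Apache Kafka broker
After=network.target

[Service]
Type=simple
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
ExecStop=/opt/kafka/bin/kafka-server-stop.sh
# Raise the per-process open-file limit; systemd services do not read
# /etc/security/limits.conf, so the limit must be set here in the unit.
LimitNOFILE=65535

[Install]
WantedBy=multi-user.target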

# Reload systemd so the edited unit file takes effect
[root@host system]# systemctl daemon-reload

# Restart Kafka so the new limit applies to the running process
[root@host system]# systemctl stop kafka
[root@host system]# systemctl start kafka
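After restarting, it may be worth confirming the service actually came back up (a generic check, not specific to this installation):

[root@host system]# systemctl is-active kafka
active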

# Find the Kafka process
[root@host system]# ps -ef | grep kafka

The Kafka process ID here is 19694.

# Check the limits the running process actually has
[root@host system]# cat /proc/19694/limits
Limit                     Soft Limit  Hard Limit  Units
Max cpu time              unlimited   unlimited   seconds
Max file size             unlimited   unlimited   bytes
Max data size             unlimited   unlimited   bytes
Max stack size            8388608     unlimited   bytes
Max core file size        0           unlimited   bytes
Max resident set          unlimited   unlimited   bytes
Max processes             2062355     2062355     processes
Max open files            65535       65535       files
Max locked memory         65536      65536        bytes
Max address space         unlimited   unlimited   bytes
Max file locks            unlimited   unlimited   locks
Max pending signals       2062355     2062355     signals
Max msgqueue size         819200      819200      bytes
Max nice priority         0           0
Max realtime priority     0           0
Max realtime timeout      unlimited   unlimited   us

Max open files has indeed increased to 65535.
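The same limit can also be read straight from systemd, without looking up the PID (assuming the unit is named kafka.service as above):

[root@host system]# systemctl show kafka -p LimitNOFILE
LimitNOFILE=65535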

With that, the "Too many open files" problem is resolved.
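One caveat: 65535 is not a magic number. Descriptor usage grows roughly with the number of partitions and segments plus client connections, so if topics keep being created dynamically the limit may need to grow again. Two quick ways to gauge the scale (standard Kafka CLI tools assumed; the broker address is a placeholder):

# How many files currently live under the broker's log directory
[root@host ~]# find /home/kafka-logs -type f | wc -l

# How many topics the cluster currently has
[root@host ~]# kafka-topics.sh --bootstrap-server localhost:9092 --list | wc -l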

Copyright notice: this article was written by [甘苦]; please include a link to the original when reposting: https://blog.csdn.net/learnworm/article/details/126106548