当前位置: 首页 > news >正文

做视频资源网站有哪些济南网站制作公司

做视频资源网站有哪些,济南网站制作公司,应用软件免费下载,wordpress漏洞复现Kafka发送消息是异步发送的#xff0c;所以我们不知道消息是否发送成功#xff0c;所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失#xff0c;那么主要有三种解决方法。 生产者(producer)端处理 生产者默认发送消息… Kafka发送消息是异步发送的所以我们不知道消息是否发送成功所以会可能造成消息丢失。而且Kafka架构是由生产者-服务器端-消费者三种组成部分构成的。要保证消息不丢失那么主要有三种解决方法。 生产者(producer)端处理 生产者默认发送消息代码如下 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties;public class KafkaMessageProducer {public static void main(String[] args) {// 配置Kafka生产者Properties props new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建Kafka生产者实例ProducerString, String producer new KafkaProducer(props);String topic test; // Kafka主题try {// 发送消息到Kafkafor (int i 0; i 10; i) {String message Message i;ProducerRecordString, String record new ProducerRecord(topic, message);producer.send(record);System.out.println(Sent message: message);}} catch (Exception e) {e.printStackTrace();} finally {// 关闭Kafka生产者producer.close();}} } 生产者端要保证消息发送成功可以有两个方法 1.把异步发送改成同步发送这样producer就能实时知道消息的发送结果。 要将 Kafka 发送方法改为同步发送可以使用 send() 方法的返回值FutureRecordMetadata 并调用 get() 方法来等待发送完成。 以下是将 Kafka 发送方法改为同步发送的示例代码 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.concurrent.ExecutionException; import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka 集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建 Kafka 生产者实例ProducerString, String producer new KafkaProducer(props);String topic test; // Kafka 主题try {// 发送消息到 Kafkafor (int i 0; i 10; i) {String message Message i;ProducerRecordString, String record new ProducerRecord(topic, message);RecordMetadata metadata producer.send(record).get(); // 同步发送并等待发送完成System.out.println(Sent message: message , offset: metadata.offset());}} catch (InterruptedException | ExecutionException e) {e.printStackTrace();} finally {// 关闭 Kafka 生产者producer.close();}} } 在这个示例代码中通过调用 send(record).get() 实现了同步发送其中 get() 方法会阻塞当前线程直到发送完成并返回消息的元数据。 2.添加异步回调函数来监听消息发送的结果如果发送失败可以在回调函数里重新发送。 要保持发送消息成功并添加回调函数你可以在发送消息的时候指定一个回调函数作为参数。回调 函数将在消息发送完成后被调用以便你可以在回调函数中处理发送结果。 import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata;public class KafkaMessageProducer {public static void main(String[] args) {// 配置 Kafka 生产者Properties props new Properties();props.put(bootstrap.servers, localhost:9092); // Kafka 集群地址props.put(key.serializer, org.apache.kafka.common.serialization.StringSerializer); // 键的序列化器props.put(value.serializer, org.apache.kafka.common.serialization.StringSerializer); // 值的序列化器// 创建 Kafka 生产者实例ProducerString, String producer new KafkaProducer(props);String topic test; // Kafka 主题try {// 发送消息到 Kafkafor (int i 0; i 10; i) {String message Message i;ProducerRecordString, String record new ProducerRecord(topic, message);// 发送消息并指定回调函数producer.send(record, new Callback() {Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {if (exception null) {System.out.println(Sent message: message , offset: metadata.offset());} else {// 这里重新发送消息producer.send(record);exception.printStackTrace();}}});}} finally {// 关闭 Kafka 生产者producer.close();}} } 在这个示例代码中我们使用了 send(record, callback) 方法来发送消息并传递了一个实现了 Callback 接口的匿名内部类作为回调函数。当消息发送完成后回调函数的 onCompletion() 方法会被调用。你可以根据 RecordMetadata 和 Exception 参数来处理发送结果。另外producer还提供了一个重试参数这个参数叫retries如果因为网络问题或者Broker故障导致producer发送消息失败那么producer会根据这个参数的值进行重试发送消息。 服务器端(Broker)端 Kafka Broker服务器端通过以下方式来确保生产者端消息发送的成功和不丢失 1. 消息持久化异步刷盘Kafka Broker将接收到的消息持久化到磁盘上的日志文件中。这样即使在消息发送后发生故障Broker能够恢复并确保消息不会丢失。注意持久化是由操作系统调度的如果持久化之前系统崩溃了那么就因为不能持久化导致数据丢失但是Kafka没提供同步刷盘策略 2. 复制与高可用性Kafka支持分布式部署可以将消息分布到多个Broker上形成一个Broker集群。在集群中消息被复制到多个副本中以提供冗余和高可用性。生产者发送消息时它可以将消息发送到任何一个Broker然后Broker将确保消息在集群中的所有副本中都被复制成功。 3. 消息提交确认当生产者发送消息后在收到Broker的确认响应之前生产者会等待。如果消息成功写入并复制到了指定的副本中Broker会发送确认响应给生产者。如果生产者在指定的时间内没有收到确认响应它将会尝试重新发送消息以确保消息不会丢失。 4. 可靠性设置同步刷盘生产者可以配置一些参数来提高消息发送的可靠性。例如可以设置acks参数来指定需要收到多少个Broker的确认响应才认为消息发送成功。可以将acks设置为all表示需要收到所有副本的确认响应才算发送成功。 总之Kafka Broker通过持久化和复制机制以及消息确认和可靠性设置确保生产者端的消息发送成功且不丢失。同时应注意及时处理可能的错误情况并根据生产者端需求和场景合理配置相应的参数。 对于使用YAML文件进行Kafka配置的情况你可以按照以下格式设置acks参数 # Kafka生产者配置 producer:bootstrap.servers: your-kafka-server:9092acks: all # 设置acks参数为allkey.serializer: org.apache.kafka.common.serialization.StringSerializervalue.serializer: org.apache.kafka.common.serialization.StringSerializer 消费者Consumer处理 Kafka Consumer 默认会确保消息的至少一次传递at least once delivery。这意味着当 Consumer 完成对一条消息的处理后会向 Kafka 提交消息的偏移量offset告知 Kafka 这条消息已被成功处理。如果 Consumer 在处理消息时发生错误可以通过回滚偏移量来重试处理之前的消息。 以下是一些确保消息消费成功的方法 使用自动提交偏移量Auto Commit Offsets手动提交偏移量Manual Commit Offsets设置消费者的最大重试次数设置适当的消费者参数 尽管 Kafka 提供了可靠的消息传递机制但仍然需要在消费者端实现适当的错误处理和重试逻辑以处理可能发生的错误情况。
http://www.yingshimen.cn/news/5103/

相关文章:

  • 手机怎样使用域名访问网站企微app下载安装
  • 凡科网建立网站后怎么修改免费电子版个人简历
  • 在什么网站做兼职响水县住房建设局网站
  • 室内设计效果图的网站付费电影网站源码
  • 网站 报价方案马鞍山的网站建设公司哪家好
  • 计算机网站建设的能力动画设计专业介绍
  • 郴州网站建设哪家公司好龙岗门户
  • 手机网站建设网站报价怎么办
  • 怎么用电脑做网站服务器做网站要具备些什么
  • 汾湖做网站室内装修设计网
  • 互联网大赛建设网站策划书网站制作需求表
  • 网站制作 长沙企业级软件
  • 网站建设设计总结网站 地区加关键词
  • 购物网站开发 项目描述建筑设计方案大全
  • 驻马店市住房和城乡建设局网站长沙专业建设网站企业
  • 四视图网站网站如何做excel预览
  • 网站备案密码有什么用wordpress缩略图幻灯展现
  • 做网站注册的商标类别网站的制作建站人
  • 网站后台图片个人网站设计成品
  • 番禺网站建设wwiw宁波seo服务推广平台
  • 网站建设 报告北京市建设工程招标网站
  • 杭州 网站定制图片渐隐 网站头部flash
  • wordpress中国加速360优化大师官方官网
  • 网站建设每年需要交多少钱宿州住房和城乡建设局网站
  • 辛集市住房和城乡建设厅网站网站建设的人性分析
  • 为什么网站开发需要写php网站建设是
  • 机场建设管理投资有限责任公司网站做微商怎样加入网站卖东西赚钱
  • 会展免费网站模板模板建站小程序
  • 厦门行业网站建设微信小程序连接wordpress
  • 中铁建设集团华东分公司网站游戏网站建设计划书