kafka創建topic,springboot消費kafka設置topics 以及 groupId

 2023-11-19 阅读 41 评论 0

摘要:關于kafka的這些概念和理論請見這篇博客 https://www.jianshu.com/p/d3e963ff8b70, 在下只簡單闡述一下自己遇到的問題以及解決辦法? 由于之前自己配置的maven版本和消費kafka信息的姿勢有問題,所以導致上線時瘋狂報錯:Attempt to join group failed due

關于kafka的這些概念和理論請見這篇博客 https://www.jianshu.com/p/d3e963ff8b70,

在下只簡單闡述一下自己遇到的問題以及解決辦法? 由于之前自己配置的maven版本和消費kafka信息的姿勢有問題,所以導致上線時瘋狂報錯:Attempt to join group failed due to fatal error: The configured groupId is i? xxx

?

kafka創建topic、現貼上正確的使用套路? ? 上maven? 這里要注意的是? 如果你kafka配置jar包不指定版本,那么默認會追隨springboot的版本,所以當你使用低版本的springboot的時候建議帶上版本號

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.3.5.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.15</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--jdbc --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency></dependencies>

yml文件

server:port: 8082
spring:kafka:consumer:enable-auto-commit: false
#      group-id: test-group-1auto-offset-reset: earliestbootstrap-servers: 192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092

java代碼

package com.xuebusi.consumer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;/*** 消費者* 指定topics*/
@Component
public class KafkaConsumer {@AutowiredJdbcTemplate jdbcTemplate;@KafkaListener(topics = {"spectrum-data-vod"}, groupId = "test-group-1")public void receive(String message) {try {JSONObject json = JSONObject.parseObject(message);JSONObject videoInfo = json.getJSONObject("video_info");//唯一列String sndlvl_id = videoInfo.getString("sndlvl_id");String is_charge = videoInfo.getString("is_charge");String sndlvl_name = json.getString("sndlvl_name");String clum_id = json.getString("clum_id");String sqls="insert into iptv_content_info(sndlvl_id,is_charge,sndlvl_name,clum_id) values ('"+sndlvl_id+"','"+is_charge+"','"+sndlvl_name+"','"+clum_id+"')";System.out.println("vod:   " + sndlvl_name + " ----- " + sndlvl_id + " ----- " + clum_id + " ----- " + is_charge + " ----- ");try {/*** 添加數據到數據庫,并且抓取異常  因為主鍵的關系,重復添加將會報錯,所以做異常處理*/jdbcTemplate.update(sqls);} catch (Exception e) {}} catch (Exception e) {e.printStackTrace();}}@KafkaListener(topics = {"spectrum-data-device"}, groupId = "test-group-2")public void receive2(String message) {try {JSONObject json = JSONObject.parseObject(message);String model = json.getString("model");String manufacturers = json.getString("manufacturers");String apk_version = json.getString("apk_version");String mac = json.getString("mac");String user_id = json.getString("user_id");String license = json.getString("license");System.out.println("device:   " + model + " ----- " + manufacturers + " ----- " + apk_version + " ----- " + mac + " ----- " + user_id + "-----" + license);jdbcTemplate.update("insert into iptv_content_device(user_id,model,manufacturers,apk_version,mac,license)values ('"+user_id +"','"+model+"','"+manufacturers+"','"+apk_version+"','"+mac+"','"+license+"') ON DUPLICATE KEY UPDATE model='"+model+"',manufacturers='"+manufacturers+"',apk_version='"+apk_version+"'");} catch (Exception e) {e.printStackTrace();}}
}

指定topics 以及 groupId 只要在方法的注解上標示就好了,但是問題是,假設你的jar包版本有問題的話是沒有groupid這個選項的,它會報錯,所以得注意下jar包的版本

版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。

原文链接:https://808629.com/184445.html

发表评论:

本站为非赢利网站,部分文章来源或改编自互联网及其他公众平台,主要目的在于分享信息,版权归原作者所有,内容仅供读者参考,如有侵权请联系我们删除!

Copyright © 2022 86后生记录生活 Inc. 保留所有权利。

底部版权信息