關于kafka的這些概念和理論請見這篇博客 https://www.jianshu.com/p/d3e963ff8b70,
在下只簡單闡述一下自己遇到的問題以及解決辦法? 由于之前自己配置的maven版本和消費kafka信息的姿勢有問題,所以導致上線時瘋狂報錯:Attempt to join group failed due to fatal error: The configured groupId is i? xxx
?
kafka創建topic、現貼上正確的使用套路? ? 上maven? 這里要注意的是? 如果你kafka配置jar包不指定版本,那么默認會追隨springboot的版本,所以當你使用低版本的springboot的時候建議帶上版本號
<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-freemarker</artifactId></dependency><dependency><groupId>org.springframework.kafka</groupId><artifactId>spring-kafka</artifactId><version>1.3.5.RELEASE</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.1.15</version></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId></dependency><!--jdbc --><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-jdbc</artifactId></dependency></dependencies>
yml文件
server:port: 8082
spring:kafka:consumer:enable-auto-commit: false
# group-id: test-group-1auto-offset-reset: earliestbootstrap-servers: 192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092,192.168.10.xx:9092
java代碼
package com.xuebusi.consumer;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.dao.DataAccessException;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import com.alibaba.fastjson.JSONObject;/*** 消費者* 指定topics*/
@Component
public class KafkaConsumer {@AutowiredJdbcTemplate jdbcTemplate;@KafkaListener(topics = {"spectrum-data-vod"}, groupId = "test-group-1")public void receive(String message) {try {JSONObject json = JSONObject.parseObject(message);JSONObject videoInfo = json.getJSONObject("video_info");//唯一列String sndlvl_id = videoInfo.getString("sndlvl_id");String is_charge = videoInfo.getString("is_charge");String sndlvl_name = json.getString("sndlvl_name");String clum_id = json.getString("clum_id");String sqls="insert into iptv_content_info(sndlvl_id,is_charge,sndlvl_name,clum_id) values ('"+sndlvl_id+"','"+is_charge+"','"+sndlvl_name+"','"+clum_id+"')";System.out.println("vod: " + sndlvl_name + " ----- " + sndlvl_id + " ----- " + clum_id + " ----- " + is_charge + " ----- ");try {/*** 添加數據到數據庫,并且抓取異常 因為主鍵的關系,重復添加將會報錯,所以做異常處理*/jdbcTemplate.update(sqls);} catch (Exception e) {}} catch (Exception e) {e.printStackTrace();}}@KafkaListener(topics = {"spectrum-data-device"}, groupId = "test-group-2")public void receive2(String message) {try {JSONObject json = JSONObject.parseObject(message);String model = json.getString("model");String manufacturers = json.getString("manufacturers");String apk_version = json.getString("apk_version");String mac = json.getString("mac");String user_id = json.getString("user_id");String license = json.getString("license");System.out.println("device: " + model + " ----- " + manufacturers + " ----- " + apk_version + " ----- " + mac + " ----- " + user_id + "-----" + license);jdbcTemplate.update("insert into iptv_content_device(user_id,model,manufacturers,apk_version,mac,license)values ('"+user_id +"','"+model+"','"+manufacturers+"','"+apk_version+"','"+mac+"','"+license+"') ON DUPLICATE KEY UPDATE model='"+model+"',manufacturers='"+manufacturers+"',apk_version='"+apk_version+"'");} catch (Exception e) {e.printStackTrace();}}
}
指定topics 以及 groupId 只要在方法的注解上標示就好了,但是問題是,假設你的jar包版本有問題的話是沒有groupid這個選項的,它會報錯,所以得注意下jar包的版本
版权声明:本站所有资料均为网友推荐收集整理而来,仅供学习和研究交流使用。
工作时间:8:00-18:00
客服电话
电子邮件
admin@qq.com
扫码二维码
获取最新动态