Kafka Study Notes — Using Kafka to Record App Operation Logs
Views: 7,014
Published: 2019-06-28

This article is about 18,698 characters long; reading it takes roughly 62 minutes.


The previous article covered how Kafka works and showed basic usage code. This article turns to a real application: the app's front end periodically pushes its operation logs to the server over HTTP, the server publishes them to Kafka, and a consumer persists the messages so they can later be aggregated into operational reports.

Let's first look at the project's directory structure (the screenshot is not preserved in this copy).
The Kafka version is declared in the pom as:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.10</artifactId>
    <version>0.10.2.0</version>
</dependency>

Let's walk through the code, class by class:

Error-code constants: CodeConstant.java

public class CodeConstant {
    // request body is empty
    public static int NULL_ERROR = -1;
    // request parameter is invalid
    public static int PARAM_ERROR = -2;
    // token is invalid
    public static int TOKEN_ERROR = -3;
}
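Plain int constants work, but they keep each code apart from its message text. As a hedged alternative sketch (not part of the original project), an enum can pair the two:

```java
// Hypothetical alternative to CodeConstant: pair each code with its default message.
enum ErrorCode {
    NULL_ERROR(-1, "the request data is null"),
    PARAM_ERROR(-2, "the request parameter is invalid"),
    TOKEN_ERROR(-3, "the token is invalid");

    private final int code;
    private final String message;

    ErrorCode(int code, String message) {
        this.code = code;
        this.message = message;
    }

    int code() { return code; }
    String message() { return message; }
}
```

With this shape, a handler can fill a JsonMsg from one value instead of setting code and message separately.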

Response message entity: JsonMsg.java

public class JsonMsg {
    private int code;
    private String message;

    public int getCode() { return code; }
    public void setCode(int code) { this.code = code; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }

    // RespUtil writes msg.toString() into the response body, so emit JSON here;
    // without this override, Object.toString() would be written instead.
    @Override
    public String toString() {
        return "{\"code\":" + code + ",\"message\":\"" + message + "\"}";
    }
}

Kafka message entity: Message.java

/**
 * Kafka message entity.
 * @author fuyuwei
 */
public class Message implements Serializable {

    private static final long serialVersionUID = -6170235919490993626L;

    /** message id */
    protected String messageId;
    /** id of the source message this message replies to */
    protected String sourceMessageId;
    /** sender information (local IP address) */
    protected String sender;
    /** message body */
    protected byte[] messageBody;
    /** creation time */
    protected long createTime;

    public Message(byte[] messageBody) {
        this.sender = getIps();
        createMessageId();
        this.messageBody = messageBody;
        this.createTime = System.currentTimeMillis();
    }

    public String getIps() {
        try {
            return InetAddress.getLocalHost().getHostAddress();
        } catch (UnknownHostException e) {
            e.printStackTrace();
        }
        return "";
    }

    /**
     * Serializes the message for transport over the message broker.
     */
    public String toJSONString() throws BusinessException {
        createMessageId();
        return JsonUtil.toJSon(this);
    }

    /**
     * Deserializes received content back into a message entity.
     * @param content message content
     */
    public Message toMessage(String content) throws BusinessException {
        return JsonUtil.readValue(content, Message.class);
    }

    @Override
    public String toString() {
        String date = null;
        try {
            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            date = sdf.format(new Date(createTime));
        } catch (Exception e) {
        }
        StringBuilder sb = new StringBuilder();
        sb.append("messageId:" + this.messageId + "\r\n")
          // fixed: this line previously appended messageId again instead of sourceMessageId
          .append("sourceMessageId:" + this.sourceMessageId + "\r\n")
          .append("sender:" + sender + "\r\n")
          .append("messageBody:" + messageBody + "\r\n")
          .append("createTime=" + date + "\r\n");
        return sb.toString();
    }

    public String getMessageId() { return messageId; }

    private void createMessageId() {
        this.messageId = sender + createUUID();
    }

    private String createUUID() {
        String id = UUID.randomUUID().toString();
        // drop the "-" separators
        return id.substring(0, 8) + id.substring(9, 13) + id.substring(14, 18)
             + id.substring(19, 23) + id.substring(24);
    }

    public String getSender() { return sender; }
    public void setSender(String sender) { this.sender = sender; }
    public long getCreateTime() { return createTime; }
    public void setCreateTime(long createTime) { this.createTime = createTime; }
    public String getSourceMessageId() { return sourceMessageId; }
    public void setSourceMessageId(String sourceMessageId) { this.sourceMessageId = sourceMessageId; }
    public byte[] getMessageBody() { return messageBody; }
    public void setMessageBody(byte[] messageBody) { this.messageBody = messageBody; }
    public void setMessageId(String messageId) { this.messageId = messageId; }
}
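createUUID strips the four dash separators by index arithmetic. Since the canonical UUID form is always 8-4-4-4-12, this is equivalent to a plain replace — a small self-contained sketch:

```java
import java.util.UUID;

public class UuidDemo {
    // Simpler, equivalent form of Message.createUUID(): strip the dashes.
    static String stripDashes(String id) {
        return id.replace("-", "");
    }

    // The original index-based version, for comparison.
    static String bySubstring(String id) {
        return id.substring(0, 8) + id.substring(9, 13) + id.substring(14, 18)
             + id.substring(19, 23) + id.substring(24);
    }

    public static void main(String[] args) {
        String id = UUID.randomUUID().toString();
        System.out.println(stripDashes(id).equals(bySubstring(id))); // true
        System.out.println(stripDashes(id).length()); // 32
    }
}
```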

HTTP endpoint: KafkaLogController.java

/**
 * Receives app logs over HTTP and forwards them to Kafka.
 * @author fuyuwei
 */
@Controller
public class KafkaLogController {

    private static MessageProducer producer = MessageProducer.getInstance();

    /**
     * Receives the log payload posted by the front end.
     * Since this is a plain HTTP endpoint, transport security matters:
     * requests are validated by an interceptor before they reach here.
     */
    @RequestMapping(value = "/kafka/log/receiveLog.do", method = RequestMethod.POST)
    public void receiveLog(HttpServletRequest req, HttpServletResponse resp) throws Throwable {
        ServletInputStream is = req.getInputStream();
        byte[] bytes = readStream(is);
        if (bytes == null || bytes.length == 0) {
            JsonMsg msg = new JsonMsg();
            msg.setCode(CodeConstant.NULL_ERROR);
            msg.setMessage("the request data is null");
            // cachetime 0 disables caching of the response
            RespUtil.responJson(resp, msg, 0);
            return;
        }
        Message message = new Message(bytes);
        producer.sendMessage("appLog", message);
        BizLogger.info("receiveLog", "appLog", message.getMessageId());
    }

    /**
     * Drains the request body into a byte array.
     */
    public static byte[] readStream(InputStream inStream) {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        BufferedInputStream inputStream = new BufferedInputStream(inStream);
        try {
            byte[] buffer = new byte[1024];
            int len;
            while ((len = inputStream.read(buffer)) != -1) {
                outStream.write(buffer, 0, len);
            }
            return outStream.toByteArray();
        } catch (IOException e) {
            BizLogger.error(e, "inputStream.read failure...");
        }
        return null;
    }
}
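readStream is plain stream-draining logic, so it can be exercised without a servlet container by feeding it an in-memory stream — a minimal sketch with the controller's buffer-copy loop inlined:

```java
import java.io.*;

public class ReadStreamDemo {
    // Same drain loop as KafkaLogController.readStream.
    static byte[] readStream(InputStream inStream) throws IOException {
        ByteArrayOutputStream outStream = new ByteArrayOutputStream();
        BufferedInputStream in = new BufferedInputStream(inStream);
        byte[] buffer = new byte[1024];
        int len;
        while ((len = in.read(buffer)) != -1) {
            outStream.write(buffer, 0, len);
        }
        return outStream.toByteArray();
    }

    public static void main(String[] args) throws IOException {
        byte[] payload = "{\"event\":\"click\"}".getBytes("UTF-8");
        byte[] roundTripped = readStream(new ByteArrayInputStream(payload));
        System.out.println(java.util.Arrays.equals(payload, roundTripped)); // true
    }
}
```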

Spring lifecycle bean: InitMessageConsumer.java

public class InitMessageConsumer implements InitializingBean, DisposableBean {

    public MessageConsumer consumer;

    @Override
    public void destroy() throws Exception {
    }

    @Override
    public void afterPropertiesSet() throws Exception {
    }

    public void initMethod() {
        BizLogger.info("init MessageReceiver start");
        consumer = new MessageConsumer("appLog", 2, "app-group", new MessageConsumerExecutor());
        try {
            consumer.receiveMessage();
        } catch (Exception e) {
            BizLogger.error(e, "InitAndDestroySeqBean initMethod");
        }
        BizLogger.info("MessageReceiver init finish!");
    }

    public void destroyMethod() {
        if (null != consumer) {
            consumer.close();
        }
    }
}
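initMethod and destroyMethod are not Spring lifecycle callbacks on their own; they have to be named in the bean definition. A sketch of that wiring (the bean id and package name here are assumptions, not taken from the original project):

```xml
<!-- Hypothetical wiring: invoke initMethod on startup, destroyMethod on shutdown -->
<bean id="initMessageConsumer"
      class="com.example.kafka.InitMessageConsumer"
      init-method="initMethod"
      destroy-method="destroyMethod"/>
```

Alternatively, the class already implements InitializingBean/DisposableBean, so the same logic could live in afterPropertiesSet and destroy, which Spring calls automatically.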

Request interceptor: AccessInteceptor.java

public class AccessInteceptor implements HandlerInterceptor {

    @Override
    public void afterCompletion(HttpServletRequest req, HttpServletResponse res,
            Object o, Exception e) throws Exception {
    }

    @Override
    public void postHandle(HttpServletRequest req, HttpServletResponse res,
            Object o, ModelAndView m) throws Exception {
    }

    @Override
    public boolean preHandle(HttpServletRequest req, HttpServletResponse res,
            Object o) throws Exception {
        String flagImmei = req.getHeader("flagImmei");
        String tk = req.getHeader("token");
        // reject missing or oversized device ids (getHeader returns null when absent)
        if (flagImmei == null || flagImmei.length() > 40) {
            JsonMsg msg = new JsonMsg();
            msg.setCode(CodeConstant.PARAM_ERROR);
            msg.setMessage("the request parameter is invalid");
            // cachetime 0 disables caching of the response
            RespUtil.responJson(res, msg, 0);
            return false;
        }
        if (!AppAESUtil.check(tk, flagImmei)) {
            JsonMsg msg = new JsonMsg();
            msg.setCode(CodeConstant.TOKEN_ERROR);
            msg.setMessage("the token is error");
            RespUtil.responJson(res, msg, 0);
            return false;
        }
        return true;
    }
}
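AppAESUtil.check is not shown in the post. Purely as a hedged illustration, token schemes like this are often implemented by deriving the token from the device id with a keyed hash; everything below — the class name, the key handling, the hex encoding — is an assumption, not the project's actual AppAESUtil:

```java
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

public class TokenCheckDemo {
    // Placeholder key for illustration; a real deployment would load this securely.
    private static final byte[] KEY = "demo-secret-key".getBytes();

    // Derive a token from the device id with HMAC-SHA256, hex-encoded.
    static String tokenFor(String flagImmei) throws Exception {
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(KEY, "HmacSHA256"));
        byte[] digest = mac.doFinal(flagImmei.getBytes("UTF-8"));
        StringBuilder sb = new StringBuilder();
        for (byte b : digest) sb.append(String.format("%02x", b & 0xff));
        return sb.toString();
    }

    // Stand-in for AppAESUtil.check: recompute the token and compare.
    static boolean check(String token, String flagImmei) throws Exception {
        return token != null && token.equals(tokenFor(flagImmei));
    }

    public static void main(String[] args) throws Exception {
        String imei = "860000000000001";
        String token = tokenFor(imei);
        System.out.println(check(token, imei));       // true
        System.out.println(check("bad-token", imei)); // false
    }
}
```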

Message producer: MessageProducer.java

public class MessageProducer implements MessageService {

    private Producer<String, byte[]> producer;
    private static MessageProducer instance = null;

    /**
     * Initializes the producer from producer.properties on the classpath.
     */
    private MessageProducer() {
        try {
            Properties properties = new Properties();
            properties.load(new ClassPathResource("producer.properties").getInputStream());
            producer = new KafkaProducer<>(properties);
        } catch (IOException e) {
            BizLogger.error(e, "load producer file fail!");
        }
    }

    /**
     * Lazily initialized singleton; the method itself is synchronized,
     * so no extra locking is needed inside.
     * @author fuyuwei
     */
    public static synchronized MessageProducer getInstance() {
        if (instance == null) {
            instance = new MessageProducer();
        }
        return instance;
    }

    /**
     * Sends a single message.
     */
    public boolean sendMessage(String topic, Message message) throws Exception {
        Collection<Message> messages = new ArrayList<>();
        messages.add(message);
        return sendMessage(topic, messages);
    }

    /**
     * Sends a batch of messages.
     */
    public boolean sendMessage(String topic, Collection<Message> messages) throws Exception {
        if (messages == null || messages.isEmpty()) {
            return false;
        }
        for (Message message : messages) {
            ProducerRecord<String, byte[]> km =
                new ProducerRecord<>(topic, message.getMessageId(), message.getMessageBody());
            producer.send(km);
        }
        return true;
    }

    /**
     * Closes the producer client.
     */
    public void close() {
        producer.close();
    }
}
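getInstance above takes the class lock on every call, even after initialization. A lock-free alternative — a sketch, not part of the original code — is the initialization-on-demand holder idiom, which the JVM's class-loading rules make both lazy and thread-safe:

```java
public class HolderSingletonDemo {
    // Stand-in for the expensive-to-construct producer wrapper.
    static class ProducerWrapper {
        private ProducerWrapper() { }
    }

    // The holder class is loaded (and INSTANCE constructed) only on first access.
    private static class Holder {
        static final ProducerWrapper INSTANCE = new ProducerWrapper();
    }

    public static ProducerWrapper getInstance() {
        return Holder.INSTANCE; // no synchronization on the hot path
    }

    public static void main(String[] args) {
        System.out.println(getInstance() == getInstance()); // true: same instance
    }
}
```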

Message consumer: MessageConsumer.java

public class MessageConsumer {

    private String topic;
    private int partitionsNum;
    private String topicConsumerGroup;
    private MessageExecutor executor;
    private ConsumerConnector connector;
    private ExecutorService threadPool;

    public MessageConsumer(String topic, int partitionsNum, String topicConsumerGroup,
            MessageExecutor executor) {
        this.topic = topic;
        this.executor = executor;
        this.partitionsNum = partitionsNum;
        this.topicConsumerGroup = topicConsumerGroup;
        createConsumerConnector();
    }

    /**
     * Initializes the consumer: loads consumer.properties and creates the connector.
     * @author fuyuwei
     */
    private boolean createConsumerConnector() {
        try {
            Properties properties = new Properties();
            properties.load(new ClassPathResource("consumer.properties").getInputStream());
            properties.put("group.id", topicConsumerGroup);
            ConsumerConfig config = new ConsumerConfig(properties);
            connector = Consumer.createJavaConsumerConnector(config);
            return true;
        } catch (IOException e) {
            BizLogger.error(e, "MessageConsumer", "init kafka consumer properties error");
        }
        return false;
    }

    /**
     * Creates one stream per partition and hands each stream to a pooled worker thread.
     * @author fuyuwei
     */
    public void receiveMessage() throws Exception {
        Map<String, Integer> topics = new HashMap<>();
        topics.put(topic, partitionsNum);
        Map<String, List<KafkaStream<byte[], byte[]>>> streams =
            connector.createMessageStreams(topics);
        List<KafkaStream<byte[], byte[]>> partitions = streams.get(topic);
        threadPool = Executors.newFixedThreadPool(partitionsNum);
        for (KafkaStream<byte[], byte[]> partition : partitions) {
            threadPool.submit(new ReceiverMessageRunner(partition));
        }
    }

    public void close() {
        try {
            if (threadPool != null)
                threadPool.shutdownNow();
        } catch (Exception e) {
            BizLogger.error(e, "MessageConsumer", "close fail");
        } finally {
            if (connector != null)
                connector.shutdown();
        }
    }

    private class ReceiverMessageRunner implements Runnable {

        private KafkaStream<byte[], byte[]> partition;

        public ReceiverMessageRunner(KafkaStream<byte[], byte[]> partition) {
            this.partition = partition;
        }

        public void run() {
            ConsumerIterator<byte[], byte[]> it = partition.iterator();
            while (it.hasNext()) {
                MessageAndMetadata<byte[], byte[]> item = it.next();
                executor.execute(item.message());
            }
        }
    }
}
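The pattern above — one blocking iterator per partition, each drained by its own pool thread — can be exercised without a broker by substituting BlockingQueues for the KafkaStreams. This is purely an illustration; all names here are made up:

```java
import java.util.*;
import java.util.concurrent.*;

public class PartitionWorkerDemo {
    // Drains `expected` messages spread across per-partition queues, one worker per queue.
    static Set<String> drain(List<BlockingQueue<String>> streams, int expected)
            throws InterruptedException {
        Set<String> seen = ConcurrentHashMap.newKeySet();
        ExecutorService pool = Executors.newFixedThreadPool(streams.size());
        CountDownLatch done = new CountDownLatch(expected);
        // One worker per partition stream, mirroring ReceiverMessageRunner.
        for (BlockingQueue<String> stream : streams) {
            pool.submit(() -> {
                try {
                    while (true) {
                        seen.add(stream.take()); // blocks like ConsumerIterator.next()
                        done.countDown();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        done.await(5, TimeUnit.SECONDS);
        pool.shutdownNow(); // interrupts the workers, like MessageConsumer.close()
        return seen;
    }

    public static void main(String[] args) throws Exception {
        List<BlockingQueue<String>> streams = new ArrayList<>();
        streams.add(new LinkedBlockingQueue<>());
        streams.add(new LinkedBlockingQueue<>());
        streams.get(0).put("m1");
        streams.get(0).put("m2");
        streams.get(1).put("m3");
        streams.get(1).put("m4");
        System.out.println(drain(streams, 4).size()); // 4
    }
}
```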

Persisting the messages: MessageConsumerExecutor.java

public class MessageConsumerExecutor implements MessageExecutor {

    @Override
    public void execute(byte[] message) {
        try {
            BizLogger.info("ReceiverMessageExecutor", "start Resolve message");
            String random = randomString();
            int totalLength = message.length;
            if (totalLength <= 4) {
                BizLogger.info("message length is not correct");
                return; // fixed: the original logged but fell through and read past the buffer
            }
            byte[] header = new byte[4]; // 4-byte header-length prefix
            System.arraycopy(message, 0, header, 0, 4);
            int headerLength = Utility.byte2Int(header);
            if (headerLength >= totalLength) {
                BizLogger.info("message header is not correct", "headerLength", headerLength,
                        "totalLength", totalLength);
                return;
            }
            byte[] headerMessage = new byte[headerLength];
            System.arraycopy(message, 4, headerMessage, 0, headerLength);
            BizLogger.info("start parse headerMessage");
            NYMobStatHeader mobheader = NYMobStatHeader.parseFrom(headerMessage);
            BizLogger.info("header", mobheader.getAppVer(), mobheader.getSysVer(),
                    mobheader.getSdkVer(), mobheader.getDeviceName(), mobheader.getTelcom(),
                    mobheader.getImei(), mobheader.getNetworkType(), mobheader.getAppId(),
                    mobheader.getUserId(), random + mobheader.getFileName());
            int currentLength = 4 + headerLength;
            while (currentLength < totalLength) {
                byte[] bodyMessageFlag = new byte[4]; // 4-byte body-length prefix
                System.arraycopy(message, currentLength, bodyMessageFlag, 0, 4);
                int bodyLength = Utility.byte2Int(bodyMessageFlag);
                if (bodyLength >= totalLength) {
                    BizLogger.info("message body is not correct");
                    return;
                }
                byte[] bodyMessage = new byte[bodyLength];
                currentLength = currentLength + 4;
                System.arraycopy(message, currentLength, bodyMessage, 0, bodyLength);
                currentLength = currentLength + bodyLength;
                NYMobStatModel statModel = NYMobStatModel.parseFrom(bodyMessage);
                Map<String, String> maps = statModel.getEventAttributesMap();
                StringBuffer keys = new StringBuffer();
                if (maps != null) {
                    for (Map.Entry<String, String> entry : maps.entrySet()) {
                        keys.append(entry.getKey()).append(":")
                            .append(entry.getValue()).append(",");
                    }
                }
                BizLogger.info("body", statModel.getDataType(), statModel.getCtime(),
                        statModel.getEventId(), statModel.getEventLabel(), keys.toString(),
                        statModel.getPageId(), statModel.getFromPageId(), statModel.getUserId(),
                        random + mobheader.getFileName());
            }
        } catch (InvalidProtocolBufferException e) {
            BizLogger.info("protobuff parse fail");
            ErrorMessageLogger.info("ReceiverMessageExecutor", "protobuff parse fail");
        } catch (Exception e) {
            BizLogger.info("parse fail");
            ErrorMessageLogger.info("ReceiverMessageExecutor", "parse fail");
        }
    }

    public static String randomString() {
        String s = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz";
        char[] c = s.toCharArray();
        Random random = new Random();
        StringBuffer buffer = new StringBuffer();
        for (int i = 0; i < 5; i++) {
            buffer.append(c[random.nextInt(c.length)]);
        }
        return buffer.toString();
    }
}
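The payload layout the executor assumes is: a 4-byte header length, the protobuf header, then repeated (4-byte length, protobuf body) records. Here is a self-contained sketch of that framing with plain byte arrays standing in for the protobuf blobs; Utility.byte2Int is assumed to read a big-endian int, so ByteBuffer is used in its place:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

public class FramingDemo {
    // Encode: 4-byte header length + header, then (4-byte length + body) per record.
    static byte[] encode(byte[] header, List<byte[]> bodies) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(ByteBuffer.allocate(4).putInt(header.length).array());
        out.write(header);
        for (byte[] body : bodies) {
            out.write(ByteBuffer.allocate(4).putInt(body.length).array());
            out.write(body);
        }
        return out.toByteArray();
    }

    // Decode the bodies back, mirroring the loop in MessageConsumerExecutor.execute.
    static List<byte[]> decodeBodies(byte[] message) {
        int headerLength = ByteBuffer.wrap(message, 0, 4).getInt();
        int current = 4 + headerLength; // skip the header record
        List<byte[]> bodies = new ArrayList<>();
        while (current < message.length) {
            int bodyLength = ByteBuffer.wrap(message, current, 4).getInt();
            current += 4;
            byte[] body = new byte[bodyLength];
            System.arraycopy(message, current, body, 0, bodyLength);
            current += bodyLength;
            bodies.add(body);
        }
        return bodies;
    }

    public static void main(String[] args) throws IOException {
        byte[] header = "HDR".getBytes();
        List<byte[]> bodies = new ArrayList<>();
        bodies.add("event-1".getBytes());
        bodies.add("event-2".getBytes());
        List<byte[]> decoded = decodeBodies(encode(header, bodies));
        System.out.println(new String(decoded.get(0))); // event-1
        System.out.println(new String(decoded.get(1))); // event-2
    }
}
```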

Message-handling interface: MessageExecutor.java

public interface MessageExecutor {
    public void execute(byte[] message);
}

Message-sending interface: MessageService.java

public interface MessageService {

    /**
     * Sends a single message.
     * @param message the message
     */
    public boolean sendMessage(String topic, Message message) throws Exception;

    /**
     * Sends a batch of messages.
     * @param messages the message collection
     */
    public boolean sendMessage(String topic, Collection<Message> messages) throws Exception;
}

Serialization utility GoogleprotobufUtils.java and the JSON response helper RespUtil.java (only RespUtil is shown below):

public class RespUtil {

    /**
     * Writes a JsonMsg to the response as JSON.
     * @author fuyuwei
     * @param cachetime max-age in seconds; 0 disables caching
     */
    public static void responJson(HttpServletResponse resp, JsonMsg msg, int cachetime)
            throws IOException {
        resp.setHeader("Access-Control-Allow-Origin", "*");
        if (cachetime == 0) {
            resp.setHeader("Cache-Control", "no-cache");
            resp.setHeader("Pragma", "no-cache");
        } else {
            resp.setHeader("Cache-Control", "max-age=" + cachetime);
        }
        resp.setContentType("application/json;charset=utf-8");
        resp.getWriter().write(msg.toString());
        resp.getWriter().close();
    }
}

Consumer configuration: consumer.properties

zookeeper.connect=127.0.0.1:2181
# timeout in ms for connecting to zookeeper
zookeeper.session.timeout.ms=20000
zookeeper.connectiontimeout.ms=1000000
zookeeper.sync.time.ms=20000
auto.commit.enable=true
auto.commit.interval.ms=1000
rebalance.max.retries=5
# max number of message chunks buffered on the consumer (default 10)
queued.max.message.chunks=50
# minimum total bytes per fetch; raising this consumes more consumer-side memory
fetch.min.bytes=6553600
# how long the server blocks when too little data is available before answering the fetch
fetch.wait.max.ms=5000
socket.receive.buffer.bytes=655360
auto.offset.reset=largest
# decoder class; defaults to kafka.serializer.DefaultDecoder, i.e. byte[]
derializer.class=kafka.serializer.DefaultDecoder

Producer configuration: producer.properties

bootstrap.servers=127.0.0.1:9092
partitioner.class=org.apache.kafka.clients.producer.internals.DefaultPartitioner
value.serializer=org.apache.kafka.common.serialization.ByteArraySerializer
key.serializer=org.apache.kafka.common.serialization.StringSerializer
buffer.memory=33554432
linger.ms=0
acks=1
request.timeout.ms=10000

Spring MVC configuration: spring-mvc.xml

Only the content-negotiation media-type mappings of this file survive in this copy:

json=application/json
xml=application/xml

Reposted from: https://my.oschina.net/rightemperor/blog/920362
