
Building a distributed tracing system with brave + kafka + zipkin + cassandra

Date: 2020-02-14 10:19:32


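First, a look at the architecture diagram (in this setup, Kafka takes the place of Scribe in the diagram).

We start with the brave + kafka + zipkin part. At this stage zipkin keeps its data in its in-memory storage.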

----------------------------------------------------------------------------------------------

After downloading Kafka, unpack it and change into its directory:

tar -xzf kafka_2.11-0.9.0.0.tgz

cd kafka_2.11-0.9.0.0

Start ZooKeeper:

bin/zookeeper-server-start.sh config/zookeeper.properties

(On Windows: bin\windows\zookeeper-server-start.bat config\zookeeper.properties)

Start the Kafka broker:

bin/kafka-server-start.sh config/server.properties

(On Windows: bin\windows\kafka-server-start.bat config\server.properties)
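Before moving on, it can help to confirm that both processes are actually accepting connections. The following is a minimal sketch, not part of the original setup: the class name PortCheck and the method isListening are hypothetical, and the ports 2181 and 9092 are assumed from the addresses used elsewhere in this article (localhost:2181 for ZooKeeper, localhost:9092 for Kafka).

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;

public class PortCheck {

    /** Returns true if a TCP connection to host:port succeeds within timeoutMillis. */
    public static boolean isListening(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Connection refused, timeout, or unresolved host: treat as "not listening".
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumed ports; adjust them if your zookeeper.properties or server.properties differ.
        System.out.println("zookeeper (2181) listening: " + isListening("localhost", 2181, 1000));
        System.out.println("kafka (9092) listening: " + isListening("localhost", 9092, 1000));
    }
}
```

A successful TCP connection only shows that something is listening on the port; it does not prove the broker is healthy.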

----------------------------------------------------------------------------------------------

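Download the zipkin jar:

wget -O zipkin.jar '/remote_content?g=io.zipkin.java&a=zipkin-server&v=LATEST&c=exec'

Run it: java -jar zipkin.jar

If you open zipkin.jar with an archive tool, you will find the file BOOT-INF\classes\zipkin-server-shared.yml. All configuration settings live in this file.

To change a setting, pass it as a -D parameter in the java launch command.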
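To see why a -D parameter can override a setting, consider how a placeholder of the form ${NAME:default} might be resolved. The sketch below is a simplified illustration under assumptions, not the mechanism zipkin actually uses (zipkin-server relies on Spring Boot for this): the class PlaceholderDemo and its resolve method are hypothetical, and the two placeholder strings in main are assumed to resemble entries in zipkin-server-shared.yml.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class PlaceholderDemo {

    // Matches ${NAME}, ${NAME:} and ${NAME:default}.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{([^:}]+):?([^}]*)\\}");

    /**
     * Replaces each placeholder with the JVM system property of that name (set via -D),
     * else the environment variable of that name, else the default after the colon.
     */
    public static String resolve(String value) {
        Matcher m = PLACEHOLDER.matcher(value);
        StringBuffer out = new StringBuffer();
        while (m.find()) {
            String name = m.group(1);
            String resolved = System.getProperty(name);
            if (resolved == null) {
                resolved = System.getenv(name);
            }
            if (resolved == null) {
                resolved = m.group(2); // empty string when no default is given
            }
            m.appendReplacement(out, Matcher.quoteReplacement(resolved));
        }
        m.appendTail(out);
        return out.toString();
    }

    public static void main(String[] args) {
        // Assumed placeholder shapes; check your own zipkin-server-shared.yml for the real entries.
        System.out.println("storage type: " + resolve("${STORAGE_TYPE:mem}"));
        System.out.println("kafka zookeeper: " + resolve("${KAFKA_ZOOKEEPER:}"));
    }
}
```

Running it as java -DSTORAGE_TYPE=cassandra PlaceholderDemo shows the system property taking precedence over the default written in the placeholder.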

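To use Kafka as the data channel, start zipkin with the following parameter (use zipkin.jar instead if you saved the download under that name):

java -DKAFKA_ZOOKEEPER=localhost:2181 -jar zipkin-server-1.19.2-exec.jar

Open the UI at http://localhost:9411/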

----------------------------------------------------------------------------------------------

Below is test code that writes trace data to Kafka.

import com.github.kristofa.brave.*;
import com.twitter.zipkin.gen.Endpoint;
import zipkin.Span;
import zipkin.reporter.AsyncReporter;
import zipkin.reporter.kafka08.KafkaSender;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Random;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class BraveKafkaTest {

    private static Brave brave = null;   // tracer for the "appserver" service
    private static Brave brave2 = null;  // tracer for the "datacenter" service

    private static void braveInit() {
        // KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").messageMaxBytes(10000).build();
        KafkaSender sender = KafkaSender.builder().bootstrapServers("localhost:9092").build();
        AsyncReporter<Span> report = AsyncReporter.builder(sender).build();
        brave = new Brave.Builder("appserver").reporter(report).build();
        brave2 = new Brave.Builder("datacenter").reporter(report).build();
    }

    static class Task {
        String name;
        SpanId spanId;

        public Task(String name, SpanId spanId) {
            super();
            this.name = name;
            this.spanId = spanId;
        }
    }

    public static void main(String[] args) throws Exception {
        braveInit();
        final BlockingQueue<Task> queue = new ArrayBlockingQueue<Task>(10);

        // Worker thread that plays the role of the remote "datacenter" service.
        Thread thread = new Thread() {
            public void run() {
                while (true) {
                    try {
                        Task task = queue.take();
                        dcHandle(task.name, task.spanId);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        };
        thread.start();

        for (int i = 0; i < 10; i++) {
            ServerRequestInterceptor serverRequestInterceptor = brave.serverRequestInterceptor();
            ServerResponseInterceptor serverResponseInterceptor = brave.serverResponseInterceptor();
            ClientRequestInterceptor clientRequestInterceptor = brave.clientRequestInterceptor();
            ClientResponseInterceptor clientResponseInterceptor = brave.clientResponseInterceptor();

            // The "appserver" receives a request.
            serverRequestInterceptor.handle(new ServerRequestAdapterImpl("group_data"));

            // It then makes three client calls; each span ID is handed to the worker thread.
            ClientRequestAdapterImpl clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_radio_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_radio_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_user_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_user_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            clientRequestAdapterImpl = new ClientRequestAdapterImpl("get_program_list");
            clientRequestInterceptor.handle(clientRequestAdapterImpl);
            queue.offer(new Task("get_program_list", clientRequestAdapterImpl.getSpanId()));
            Thread.sleep(10);
            clientResponseInterceptor.handle(new ClientResponseAdapterImpl());

            serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
            Thread.sleep(10);
        }
    }

    // Records the server side of a call in the "datacenter" service, reusing the caller's span ID.
    public static void dcHandle(String spanName, SpanId spanId) {
        ServerRequestInterceptor serverRequestInterceptor = brave2.serverRequestInterceptor();
        ServerResponseInterceptor serverResponseInterceptor = brave2.serverResponseInterceptor();
        serverRequestInterceptor.handle(new ServerRequestAdapterImpl(spanName, spanId));
        serverResponseInterceptor.handle(new ServerResponseAdapterImpl());
    }

    static class ServerRequestAdapterImpl implements ServerRequestAdapter {
        Random randomGenerator = new Random();
        SpanId spanId;
        String spanName;

        // No incoming span ID: start a new trace with a random ID.
        ServerRequestAdapterImpl(String spanName) {
            this.spanName = spanName;
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            this.spanId = spanId;
        }

        // Incoming span ID: continue the caller's trace.
        ServerRequestAdapterImpl(String spanName, SpanId spanId) {
            this.spanName = spanName;
            this.spanId = spanId;
        }

        public TraceData getTraceData() {
            if (this.spanId != null) {
                return TraceData.builder().spanId(this.spanId).build();
            }
            long startId = randomGenerator.nextLong();
            SpanId spanId = SpanId.builder().spanId(startId).traceId(startId).parentId(startId).build();
            return TraceData.builder().spanId(spanId).build();
        }

        public String getSpanName() {
            return spanName;
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }
    }

    static class ServerResponseAdapterImpl implements ServerResponseAdapter {
        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }
    }

    static class ClientRequestAdapterImpl implements ClientRequestAdapter {
        String spanName;
        SpanId spanId;

        ClientRequestAdapterImpl(String spanName) {
            this.spanName = spanName;
        }

        public SpanId getSpanId() {
            return spanId;
        }

        public String getSpanName() {
            return this.spanName;
        }

        public void addSpanIdToRequest(SpanId spanId) {
            // Record the span ID that is passed on to the remote service.
            System.out.println(spanId);
            if (spanId != null) {
                this.spanId = spanId;
                System.out.println(String.format("trace_id=%s, parent_id=%s, span_id=%s",
                        spanId.traceId, spanId.parentId, spanId.spanId));
            }
        }

        public Collection<KeyValueAnnotation> requestAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioid", "165646485468486364");
            collection.add(kv);
            return collection;
        }

        public Endpoint serverAddress() {
            return null;
        }
    }

    static class ClientResponseAdapterImpl implements ClientResponseAdapter {
        public Collection<KeyValueAnnotation> responseAnnotations() {
            Collection<KeyValueAnnotation> collection = new ArrayList<KeyValueAnnotation>();
            KeyValueAnnotation kv = KeyValueAnnotation.create("radioname", "火星人1");
            collection.add(kv);
            return collection;
        }
    }
}

Maven dependencies:

<dependencies>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-core</artifactId>
        <version>3.17.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-spancollector-http</artifactId>
        <version>3.17.0</version>
    </dependency>
    <dependency>
        <groupId>com.google.auto.value</groupId>
        <artifactId>auto-value</artifactId>
        <version>1.3</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.brave</groupId>
        <artifactId>brave-spancollector-kafka</artifactId>
        <version>3.17.0</version>
    </dependency>
    <dependency>
        <groupId>io.zipkin.reporter</groupId>
        <artifactId>zipkin-sender-kafka08</artifactId>
        <version>0.4.3</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.2</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-access</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-classic</artifactId>
        <version>1.1.3</version>
    </dependency>
    <dependency>
        <groupId>ch.qos.logback</groupId>
        <artifactId>logback-core</artifactId>
        <version>1.1.3</version>
    </dependency>
</dependencies>

Once the program has run, 10 traces have been written to Kafka. Open http://localhost:9411/ to see the result.
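The test program relies on how trace, parent, and span IDs relate to each other: each loop iteration starts one root server span, and every client call becomes a child of it. The sketch below models only that relationship with plain Java and no Brave dependency. The class SpanIdSketch and its nested Ids type are hypothetical stand-ins for Brave's SpanId, and the rule "a child keeps the trace ID, takes the caller's span ID as parent, and gets a fresh span ID" is my reading of the test code, not a statement of Brave's internals.

```java
import java.util.concurrent.ThreadLocalRandom;

public class SpanIdSketch {

    /** Simplified stand-in for a span identifier: three 64-bit IDs. */
    public static final class Ids {
        public final long traceId;
        public final long parentId;
        public final long spanId;

        Ids(long traceId, long parentId, long spanId) {
            this.traceId = traceId;
            this.parentId = parentId;
            this.spanId = spanId;
        }

        /** Root span: all three IDs are equal, mirroring the test code's ServerRequestAdapterImpl. */
        public static Ids root() {
            long id = ThreadLocalRandom.current().nextLong();
            return new Ids(id, id, id);
        }

        /** Child span: same trace, this span as parent, and a fresh span ID. */
        public Ids child() {
            long id;
            do {
                id = ThreadLocalRandom.current().nextLong();
            } while (id == spanId); // keep the child distinguishable from its parent
            return new Ids(traceId, spanId, id);
        }

        public boolean isRoot() {
            return parentId == spanId;
        }

        @Override
        public String toString() {
            return String.format("trace_id=%s, parent_id=%s, span_id=%s", traceId, parentId, spanId);
        }
    }

    public static void main(String[] args) {
        Ids server = Ids.root(); // corresponds to the "group_data" server span
        System.out.println("group_data: " + server);
        for (String call : new String[] {"get_radio_list", "get_user_list", "get_program_list"}) {
            // Each client call is a child of the server span. In the test code, the
            // "datacenter" side then reuses these same IDs for its server span.
            System.out.println(call + ": " + server.child());
        }
    }
}
```

Because the "datacenter" side reuses the IDs it receives, the UI can show the client and server halves of each call within a single trace.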

----------------------------------------------------------------------------------------------

Next, use Cassandra to store the data collected from Kafka.

Download apache-cassandra-2.2.8-bin.tar.gz and unpack it.

Start Cassandra: bin\cassandra.bat (on Linux or macOS: bin/cassandra)

Restart zipkin, adding the -DSTORAGE_TYPE=cassandra parameter:

java -DKAFKA_ZOOKEEPER=localhost:2181 -DSTORAGE_TYPE=cassandra -jar zipkin-server-1.19.2-exec.jar
