DEV Community

loading...

Apache Flink REST and Metrics

tspannhw profile image Timothy Spann Originally published at datainmotion.dev on ・8 min read

Apache Flink REST and Metrics

After seeing Caito Scherr's amazing talk, I want to build up some useful dashboards. My first step is exploring all the available APIs in my CSA/Flink environment. The easiest way to discover them was I turned on Developer Console in Chrome while using the Flink Dashboard which is a great dashboard in it's own right. But it is not focused on some key metrics that some customers are asking about in a very easy to read format for end-users.

Some URLs I have been using:

Overview of Flink Cluster

http://FLINKCLUSTER:8078/overview

{" **taskmanagers**":1,"slots-total":2,"slots-available":1," **jobs-running**":1,"jobs-finished":0,"jobs-cancelled":3,"jobs-failed":0," **flink-version**":"1.10.0-csa1.2.0.0","flink-commit":"664a5f5"}

Overview of All Flink Jobs

http://FLINKCLUSTER:8078/jobs/overview

{"jobs":[{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select \* from itemprice","state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"last-modification":1599576486876,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"faeb308856db337ce628af5fea24b895","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599674296089,"end-time":1599766705456,"duration":92409367,"last-modification":1599766705456,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"5d6ae4f72ab9fca3cea28ba6d4905ca7","name":"default: select \* from krogerprices","state":"CANCELED","start-time":1599576795487,"end-time":1599579517485,"duration":2721998,"last-modification":1599579517485,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"ec80a32b6ab59d96f649f5b3e493ec67","name":"Streaming WordCount","state":"FINISHED","start-time":1599571302659,"end-time":1599571318768,"duration":16109,"last-modification":1599571318768,"cluster":null,"tasks":{"total":5,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":5,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"ad949e727a8c0267c9f2550c6a9b6000","name":"default: select \* from itemprice","state":"CANCELED","start-time":1599676984684,"end-time":1599677004620,"duration":19936,"last-modification":1599677004620,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"8c1a6903b81e7b926b7105720e24aee8","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599576540243,"end-time":1599582887998,"duration":6347755,"last-modification":1599582887998,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"4cb8a5983b0bd3a14fe90618e17e2488","name":"default: select \* from krogerprices","state":"CANCELED","start-time":1599674323425,"end-time":1599676592438,"duration":2269013,"last-modification":1599676592438,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"01988557ccd71cbab899ded9babab606","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"RUNNING","start-time":1599673893701,"end-time":-1,"duration":103791030,"last-modification":1599673903811,"cluster":{"url":"http://ec2-3-86-165-80.compute-1.amazonaws.com:8088/proxy/application\_1599570933443\_0003/","originalUrl":"http://ec2-3-86-165-80.compute-1.amazonaws.com:35981","id":"application\_1599570933443\_0003","hostAndPort":"ec2-3-86-165-80.compute-1.amazonaws.com:35981"},"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":1,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"7c7932678f193f51c32cd3a2ebff6d59","name":"default: select \* from itemprice","state":"CANCELED","start-time":1599573232967,"end-time":1599576425840,"duration":3192873,"last-modification":1599576425840,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}}]}

Flink Job Details

http://FLINKCLUSTER:8078/jobs/7c01884b74ff981a896307c4a06f2b15

{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select \* from itemprice","isStoppable":false,"state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"now":1599576486888,"timestamps":{"FAILING":0,"CANCELED":1599576486876,"RECONCILING":0,"SUSPENDED":0,"CREATED":1599576455857,"CANCELLING":1599576486855,"FAILED":0,"RESTARTING":0,"FINISHED":0,"RUNNING":1599576455905},"vertices":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","name":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default\_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","parallelism":1,"status":"CANCELED","start-time":1599576455941,"end-time":1599576486876,"duration":30935,"tasks":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"metrics":{"read-bytes":0,"read-bytes-complete":true,"write-bytes":0,"write-bytes-complete":true,"read-records":0,"read-records-complete":true,"write-records":0,"write-records-complete":true}}],"status-counts":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"plan":{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select \* from itemprice","nodes":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator\_strategy":"","description":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default\_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","optimizer\_properties":{}}]}}



**Flink Cluster Configuration**

**http://FLINKCLUSTER:8078/config**

{"refresh-interval":10000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"1.10.0-csa1.2.0.0","flink-revision":"664a5f5 @ 29.04.2020 @ 14:13:26 UTC","features":{"web-submit":false,"global-dashboard":true}}




**Jobs - Specific Job - Checkpoints**

**http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/jobs/01988557ccd71cbab899ded9babab606/checkpoints**

{"counts":{"restored":0,"total":0,"in\_progress":0,"completed":0,"failed":0},"summary":{"state\_size":{"min":0,"max":0,"avg":0},"end\_to\_end\_duration":{"min":0,"max":0,"avg":0},"alignment\_buffered":{"min":0,"max":0,"avg":0}},"latest":{"completed":null,"savepoint":null,"failed":null,"restored":null},"history":[]}



**Job Manager Configuration**

**http://YARNCLUSTER:**** 8088/proxy/application\_1599570933443\_0003/jobmanager/config**


[{"key":"yarn.flink-dist-jar","value":"file:/opt/cloudera/parcels/FLINK-1.10.0-csa1.2.0.0-cdh7.1.1.0-326-2901802/lib/flink/lib/flink-dist_2.11-1.10.0-csa1.2.0.0.jar"},{"key":"high-availability.cluster-id","value":"application_1599570933443_0003"},{"key":"jobmanager.rpc.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"high-availability.storageDir","value":"hdfs:///user/flink/ha"},{"key":"state.backend.rocksdb.timer-service.factory","value":"ROCKSDB"},{"key":"io.tmp.dirs","value":"/yarn/nm/usercache/root/appcache/application_1599570933443_0003"},{"key":"historyserver.cli.fallback","value":"true"},{"key":"parallelism.default","value":"1"},{"key":"execution.buffer-timeout","value":"100"},{"key":"jobmanager.heap.size","value":"1073741824"},{"key":"execution.checkpointing.mode","value":"EXACTLY_ONCE"},{"key":"taskmanager.memory.process.size","value":"2 gb"},{"key":"web.port","value":"0"},{"key":"state.backend.local-recovery","value":"true"},{"key":"state.backend.rocksdb.memory.managed","value":"true"},{"key":"yarn.tags","value":"flink"},{"key":"state.backend.incremental","value":"true"},{"key":"taskmanager.memory.network.fraction","value":"0.1"},{"key":"yarn.container-start-command-template","value":"%java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%"},{"key":"web.tmpdir","value":"/tmp/flink-web-37be6c08-f457-468a-9c4b-5fde92c042b1"},{"key":"jobmanager.rpc.port","value":"33566"},{"key":"execution.checkpointing.timeout","value":"60000"},{"key":"high-availability.zookeeper.quorum","value":"ec2-3-86-165-80.compute-1.amazonaws.com:2181"},{"key":"taskmanager.memory.managed.fraction","value":"0.4"},{"key":"rest.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.backend","value":"FILESYSTEM"},{"key":"logging.configuration.file","value":"/etc/flink/conf/log4j.properties"},{"key":"execution.checkpointing.max-concurrent-checkpoints","value":"1"},{"key":"high-availability.zookeeper.client.acl","value":"open"},{"key":"historyserver.web.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.checkpoints.num-retained","value":"3"},{"key":"historyserver.web.port","value":"8078"},{"key":"pipeline.auto-watermark-interval","value":"200"},{"key":"state.savepoints.dir","value":"hdfs:///user/flink/savepoints"},{"key":"pipeline.generic-types","value":"true"},{"key":"yarn.maximum-failed-containers","value":"100"},{"key":"yarn.application-attempts","value":"5"},{"key":"taskmanager.numberOfTaskSlots","value":"2"},{"key":"state.backend.rocksdb.memory.write-buffer-ratio","value":"0.5"},{"key":"jobmanager.archive.fs.dir","value":"hdfs:///user/flink/applicationHistory"},{"key":"execution.target","value":"yarn-per-job"},{"key":"pipeline.object-reuse","value":"false"},{"key":"internal.io.tmpdirs.use-local-default","value":"true"},{"key":"state.backend.rocksdb.memory.high-prio-pool-ratio","value":"0.1"},{"key":"taskmanager.memory.network.max","value":"2147483648"},{"key":"execution.attached","value":"false"},{"key":"internal.cluster.execution-mode","value":"NORMAL"},{"key":"execution.checkpointing.externalized-checkpoint-retention","value":"RETAIN_ON_CANCELLATION"},{"key":"high-availability","value":"ZOOKEEPER"},{"key":"execution.checkpointing.min-pause","value":"0"},{"key":"execution.checkpointing.snapshot-compression","value":"false"},{"key":"state.checkpoints.dir","value":"hdfs:///user/flink/checkpoints"}]

Job Manager - Log

http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/jobmanager/log

2020-09-11 13:37:42,586 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 9c2e3f25dae1d548dc730941d6484cbb@akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:33566/user/jobmanager\_7 for job 67a2de8fb291333bbd90b334f8f83def from the resource manager.
2020-09-11 13:37:42,586 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/67a2de8fb291333bbd90b334f8f83def/job\_manager\_lock'}.
2020-09-11 13:37:42,591 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore - Removed job graph 67a2de8fb291333bbd90b334f8f83def from ZooKeeper.



**Job Manager - Standard Out**

**http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/jobmanager/stdout**




**Task Managers**

**http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/taskmanagers**




{"taskmanagers":[{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832789681,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939}}]}

Task Managers - Container

http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/taskmanagers/container\_1599570933443\_0003\_04\_000002

{"id":"container\_1599570933443\_0003\_04\_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager\_0","dataPort":41920,"timeSinceLastHeartbeat":1599832809720,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939},"metrics":{"heapUsed":200512944,"heapCommitted":664272896,"heapMax":664272896,"nonHeapUsed":145389616,"nonHeapCommitted":149569536,"nonHeapMax":780140544,"directCount":5111,"directUsed":168414439,"directMax":168414435,"mappedCount":0,"mappedUsed":0,"mappedMax":0,"memorySegmentsAvailable":5079,"memorySegmentsTotal":5079,"garbageCollectors":[{"name":"PS\_Scavenge","count":91,"time":453},{"name":"PS\_MarkSweep","count":4,"time":358}]}}

Task Managers - Container - Log

http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/taskmanagers/container\_1599570933443\_0003\_04\_000002/log

2020-09-09 17:58:17,456 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic krogerprices
2020-09-09 17:58:17,465 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.
2020-09-09 17:58:17,475 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 



**Task Managers - Container - Standard Out**

**http://YARNCLUSTER:8088/proxy/application\_1599570933443\_0003/taskmanagers/container\_1599570933443\_0003\_04\_000002/stdout**





[![](https://1.bp.blogspot.com/-DNlb2WnE75M/X1uRD_nnyKI/AAAAAAAAbmg/8gHSfuXtqwcNuZEqRXDwr7jl1i1RzHvFQCLcBGAsYHQ/w625-h48/flinkrestapijobdetails.png)](https://1.bp.blogspot.com/-DNlb2WnE75M/X1uRD_nnyKI/AAAAAAAAbmg/8gHSfuXtqwcNuZEqRXDwr7jl1i1RzHvFQCLcBGAsYHQ/s2559/flinkrestapijobdetails.png)

[![](https://1.bp.blogspot.com/-v-9rSv090vg/X1uREujpx9I/AAAAAAAAbmk/DUn3o5gwpRQ1fBSt4J3E0Xfpq-Z-QyCiwCLcBGAsYHQ/w625-h75/flinkrestapijobs.png)](https://1.bp.blogspot.com/-v-9rSv090vg/X1uREujpx9I/AAAAAAAAbmk/DUn3o5gwpRQ1fBSt4J3E0Xfpq-Z-QyCiwCLcBGAsYHQ/s2560/flinkrestapijobs.png)

Talks

For the definitive way to read and use these, see Caito's awesome talk here:

https://www.youtube.com/watch?v=MQQ7qaKKKc8

Command Line (https://docs.cloudera.com/csa/1.2.0/job-lifecycle/topics/csa-supported-cli.html)

flink list

20/09/11 13:45:34 INFO cli.CliFrontend: Successfully retrieved list of jobs

------------------ Running/Restarting Jobs -------------------

09.09.2020 17:51:33 : 01988557ccd71cbab899ded9babab606 : default: insert into krogerprices

select upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid

from itemprice

where originstore = 'kroger' (RUNNING)


Discussion (0)

pic
Editor guide