
Timothy Spann. 🇺🇦

Originally published at datainmotion.dev

Apache Flink REST and Metrics


After seeing Caito Scherr's amazing talk, I want to build some useful dashboards. My first step is exploring all the available APIs in my CSA/Flink environment. The easiest way to discover them was to turn on the Developer Console in Chrome while using the Flink Dashboard, which is a great dashboard in its own right. However, it does not present some of the key metrics customers are asking about in a format that is easy for end users to read.

Some URLs I have been using:

Overview of Flink Cluster

http://FLINKCLUSTER:8078/overview

{"taskmanagers":1,"slots-total":2,"slots-available":1,"jobs-running":1,"jobs-finished":0,"jobs-cancelled":3,"jobs-failed":0,"flink-version":"1.10.0-csa1.2.0.0","flink-commit":"664a5f5"}
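As a first dashboard building block, here is a minimal Python sketch that reads the cluster overview and turns it into a slot-utilization figure. It assumes the same placeholder host and port as the URL above, and the helper names `fetch_json` and `slot_utilization` are hypothetical, not part of Flink.

```python
import json
from urllib.request import urlopen

# Assumption: placeholder host and port copied from the URLs in this post.
FLINK_URL = "http://FLINKCLUSTER:8078"


def fetch_json(path, base_url=FLINK_URL, timeout=10):
    """GET a REST path and return the decoded JSON document."""
    with urlopen(base_url + path, timeout=timeout) as response:
        return json.load(response)


def slot_utilization(overview):
    """Return the fraction of task slots in use, given an /overview response."""
    total = overview["slots-total"]
    if total == 0:
        # Avoid dividing by zero when no task managers are registered.
        return 0.0
    return (total - overview["slots-available"]) / total
```

With a reachable cluster, `slot_utilization(fetch_json("/overview"))` would give a single number that is easy to show on an end-user dashboard.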

Overview of All Flink Jobs

http://FLINKCLUSTER:8078/jobs/overview

{"jobs":[{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"last-modification":1599576486876,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"faeb308856db337ce628af5fea24b895","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599674296089,"end-time":1599766705456,"duration":92409367,"last-modification":1599766705456,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"5d6ae4f72ab9fca3cea28ba6d4905ca7","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599576795487,"end-time":1599579517485,"duration":2721998,"last-modification":1599579517485,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"ec80a32b6ab59d96f649f5b3e493ec67","name":"Streaming WordCount","state":"FINISHED","start-time":1599571302659,"end-time":1599571318768,"duration":16109,"last-modification":1599571318768,"cluster":null,"tasks":{"total":5,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":5,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"ad949e727a8c0267c9f2550c6a9b6000","name":"default: select * from itemprice","state":"CANCELED","start-time":1599676984684,"end-time":1599677004620,"duration":19936,"last-modification":1599677004620,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"8c1a6903b81e7b926b7105720e24aee8","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"CANCELED","start-time":1599576540243,"end-time":1599582887998,"duration":6347755,"last-modification":1599582887998,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"4cb8a5983b0bd3a14fe90618e17e2488","name":"default: select * from krogerprices","state":"CANCELED","start-time":1599674323425,"end-time":1599676592438,"duration":2269013,"last-modification":1599676592438,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}},{"jid":"01988557ccd71cbab899ded9babab606","name":"default: insert into krogerprices\nselect upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid\nfrom itemprice\nwhere originstore = 'kroger'","state":"RUNNING","start-time":1599673893701,"end-time":-1,"duration":103791030,"last-modification":1599673903811,"cluster":{"url":"http://ec2-3-86-165-80.compute-1.amazonaws.com:8088/proxy/application_1599570933443_0003/","originalUrl":"http://ec2-3-86-165-80.compute-1.amazonaws.com:35981","id":"application_1599570933443_0003","hostAndPort":"ec2-3-86-165-80.compute-1.amazonaws.com:35981"},"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":1,"finished":0,"canceling":0,"canceled":0,"failed":0,"reconciling":0}},{"jid":"7c7932678f193f51c32cd3a2ebff6d59","name":"default: select * from itemprice","state":"CANCELED","start-time":1599573232967,"end-time":1599576425840,"duration":3192873,"last-modification":1599576425840,"cluster":null,"tasks":{"total":1,"created":0,"scheduled":0,"deploying":0,"running":0,"finished":0,"canceling":0,"canceled":1,"failed":0,"reconciling":0}}]}
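A response like this is hard to read raw, so here is a small Python sketch that condenses it. It assumes the JSON has already been parsed into a dict (for example with `json.loads`), and the function names are hypothetical. The `start-time` values in the sample look like epoch milliseconds, which is what the conversion assumes.

```python
from collections import Counter
from datetime import datetime, timezone


def jobs_by_state(jobs_overview):
    """Count jobs per state in a parsed /jobs/overview response."""
    return Counter(job["state"] for job in jobs_overview["jobs"])


def start_time_utc(job):
    """Convert a job's epoch-millisecond start-time to an aware UTC datetime."""
    return datetime.fromtimestamp(job["start-time"] / 1000, tz=timezone.utc)


def running_jobs(jobs_overview):
    """Return (jid, name) pairs for jobs currently in the RUNNING state."""
    return [
        (job["jid"], job["name"])
        for job in jobs_overview["jobs"]
        if job["state"] == "RUNNING"
    ]
```

The per-state counts are the kind of number end users tend to ask for first, and the `jid` values returned by `running_jobs` can be plugged into the per-job URLs.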

Flink Job Details

http://FLINKCLUSTER:8078/jobs/7c01884b74ff981a896307c4a06f2b15

{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","isStoppable":false,"state":"CANCELED","start-time":1599576455857,"end-time":1599576486876,"duration":31019,"now":1599576486888,"timestamps":{"FAILING":0,"CANCELED":1599576486876,"RECONCILING":0,"SUSPENDED":0,"CREATED":1599576455857,"CANCELLING":1599576486855,"FAILED":0,"RESTARTING":0,"FINISHED":0,"RUNNING":1599576455905},"vertices":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","name":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","parallelism":1,"status":"CANCELED","start-time":1599576455941,"end-time":1599576486876,"duration":30935,"tasks":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"metrics":{"read-bytes":0,"read-bytes-complete":true,"write-bytes":0,"write-bytes-complete":true,"read-records":0,"read-records-complete":true,"write-records":0,"write-records-complete":true}}],"status-counts":{"RUNNING":0,"FAILED":0,"RECONCILING":0,"FINISHED":0,"CANCELING":0,"CANCELED":1,"SCHEDULED":0,"CREATED":0,"DEPLOYING":0},"plan":{"jid":"7c01884b74ff981a896307c4a06f2b15","name":"default: select * from itemprice","nodes":[{"id":"cbc357ccb763df2852fee8c4fc7d55f2","parallelism":1,"operator":"","operator_strategy":"","description":"Source: KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr) -> SourceConversion(table=[registry.default_database.itemprice, source: [KafkaTableSource(upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr)]], fields=[upc, originstore, updatedate, longdescription, itemdescription, brandname, displayimage, price, msrp, tpr]) -> SinkConversionToTuple2 -> Sink: SQL Client Stream Collect Sink","optimizer_properties":{}}]}}
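The job detail response carries per-vertex record counts and a `timestamps` map, both of which are useful for a dashboard. The Python sketch below works on the already-parsed JSON. The function names are hypothetical, and treating a timestamp of `0` as "state never entered" is an assumption based only on how the sample above looks.

```python
def vertex_summary(job_details):
    """Return (vertex id, status, records read, records written) per vertex."""
    rows = []
    for vertex in job_details["vertices"]:
        metrics = vertex["metrics"]
        rows.append((
            vertex["id"],
            vertex["status"],
            metrics["read-records"],
            metrics["write-records"],
        ))
    return rows


def startup_latency_ms(job_details):
    """Milliseconds between the CREATED and RUNNING timestamps, or None.

    Assumption: a timestamp of 0 means the job never entered that state.
    """
    timestamps = job_details["timestamps"]
    if timestamps["RUNNING"] == 0 or timestamps["CREATED"] == 0:
        return None
    return timestamps["RUNNING"] - timestamps["CREATED"]
```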



**Flink Cluster Configuration**

**http://FLINKCLUSTER:8078/config**

{"refresh-interval":10000,"timezone-name":"Coordinated Universal Time","timezone-offset":0,"flink-version":"1.10.0-csa1.2.0.0","flink-revision":"664a5f5 @ 29.04.2020 @ 14:13:26 UTC","features":{"web-submit":false,"global-dashboard":true}}




**Jobs - Specific Job - Checkpoints**

**http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobs/01988557ccd71cbab899ded9babab606/checkpoints**

{"counts":{"restored":0,"total":0,"in_progress":0,"completed":0,"failed":0},"summary":{"state_size":{"min":0,"max":0,"avg":0},"end_to_end_duration":{"min":0,"max":0,"avg":0},"alignment_buffered":{"min":0,"max":0,"avg":0}},"latest":{"completed":null,"savepoint":null,"failed":null,"restored":null},"history":[]}
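Checkpoint health is one of the metrics that is easy to summarize for end users. This is a minimal Python sketch over the parsed response. The function name and the shape of the returned dict are hypothetical, and it guards against the all-zero case shown above, where the job has not taken any checkpoints yet.

```python
def checkpoint_health(checkpoints):
    """Summarize a parsed /jobs/<jid>/checkpoints response for a dashboard."""
    counts = checkpoints["counts"]
    total = counts["total"]
    return {
        "total": total,
        "failed": counts["failed"],
        # No checkpoints yet means there is nothing to fail, so report 0.0.
        "failure_ratio": counts["failed"] / total if total else 0.0,
        "has_completed": checkpoints["latest"]["completed"] is not None,
    }
```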



**Job Manager Configuration**

**http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/config**



[{"key":"yarn.flink-dist-jar","value":"file:/opt/cloudera/parcels/FLINK-1.10.0-csa1.2.0.0-cdh7.1.1.0-326-2901802/lib/flink/lib/flink-dist_2.11-1.10.0-csa1.2.0.0.jar"},{"key":"high-availability.cluster-id","value":"application_1599570933443_0003"},{"key":"jobmanager.rpc.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"high-availability.storageDir","value":"hdfs:///user/flink/ha"},{"key":"state.backend.rocksdb.timer-service.factory","value":"ROCKSDB"},{"key":"io.tmp.dirs","value":"/yarn/nm/usercache/root/appcache/application_1599570933443_0003"},{"key":"historyserver.cli.fallback","value":"true"},{"key":"parallelism.default","value":"1"},{"key":"execution.buffer-timeout","value":"100"},{"key":"jobmanager.heap.size","value":"1073741824"},{"key":"execution.checkpointing.mode","value":"EXACTLY_ONCE"},{"key":"taskmanager.memory.process.size","value":"2 gb"},{"key":"web.port","value":"0"},{"key":"state.backend.local-recovery","value":"true"},{"key":"state.backend.rocksdb.memory.managed","value":"true"},{"key":"yarn.tags","value":"flink"},{"key":"state.backend.incremental","value":"true"},{"key":"taskmanager.memory.network.fraction","value":"0.1"},{"key":"yarn.container-start-command-template","value":"%java% %jvmmem% %jvmopts% -DyarnContainerId=$CONTAINER_ID %logging% %class% %args% %redirects%"},{"key":"web.tmpdir","value":"/tmp/flink-web-37be6c08-f457-468a-9c4b-5fde92c042b1"},{"key":"jobmanager.rpc.port","value":"33566"},{"key":"execution.checkpointing.timeout","value":"60000"},{"key":"high-availability.zookeeper.quorum","value":"ec2-3-86-165-80.compute-1.amazonaws.com:2181"},{"key":"taskmanager.memory.managed.fraction","value":"0.4"},{"key":"rest.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.backend","value":"FILESYSTEM"},{"key":"logging.configuration.file","value":"/etc/flink/conf/log4j.properties"},{"key":"execution.checkpointing.max-concurrent-checkpoints","value":"1"},{"key":"high-availability.zookeeper.client.acl","value":"open"},{"key":"historyserver.web.address","value":"ec2-3-86-165-80.compute-1.amazonaws.com"},{"key":"state.checkpoints.num-retained","value":"3"},{"key":"historyserver.web.port","value":"8078"},{"key":"pipeline.auto-watermark-interval","value":"200"},{"key":"state.savepoints.dir","value":"hdfs:///user/flink/savepoints"},{"key":"pipeline.generic-types","value":"true"},{"key":"yarn.maximum-failed-containers","value":"100"},{"key":"yarn.application-attempts","value":"5"},{"key":"taskmanager.numberOfTaskSlots","value":"2"},{"key":"state.backend.rocksdb.memory.write-buffer-ratio","value":"0.5"},{"key":"jobmanager.archive.fs.dir","value":"hdfs:///user/flink/applicationHistory"},{"key":"execution.target","value":"yarn-per-job"},{"key":"pipeline.object-reuse","value":"false"},{"key":"internal.io.tmpdirs.use-local-default","value":"true"},{"key":"state.backend.rocksdb.memory.high-prio-pool-ratio","value":"0.1"},{"key":"taskmanager.memory.network.max","value":"2147483648"},{"key":"execution.attached","value":"false"},{"key":"internal.cluster.execution-mode","value":"NORMAL"},{"key":"execution.checkpointing.externalized-checkpoint-retention","value":"RETAIN_ON_CANCELLATION"},{"key":"high-availability","value":"ZOOKEEPER"},{"key":"execution.checkpointing.min-pause","value":"0"},{"key":"execution.checkpointing.snapshot-compression","value":"false"},{"key":"state.checkpoints.dir","value":"hdfs:///user/flink/checkpoints"}]
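The configuration comes back as a list of `key`/`value` pairs, which is awkward to query directly. A short Python sketch, assuming the list has already been parsed from JSON, flattens it into a dict and picks out a few settings. The function names and the default selection of keys are illustrative choices, and every value stays a string, as it is in the response.

```python
def config_to_dict(entries):
    """Flatten [{"key": ..., "value": ...}, ...] into a plain dict."""
    return {entry["key"]: entry["value"] for entry in entries}


def select_settings(entries, keys=("state.backend", "parallelism.default",
                                   "taskmanager.numberOfTaskSlots")):
    """Return only the requested settings; keys missing from the config map to None."""
    config = config_to_dict(entries)
    return {key: config.get(key) for key in keys}
```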

Job Manager - Log

http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/log

2020-09-11 13:37:42,586 INFO org.apache.flink.yarn.YarnResourceManager - Disconnect job manager 9c2e3f25dae1d548dc730941d6484cbb@akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:33566/user/jobmanager_7 for job 67a2de8fb291333bbd90b334f8f83def from the resource manager.
2020-09-11 13:37:42,586 INFO org.apache.flink.runtime.leaderelection.ZooKeeperLeaderElectionService - Stopping ZooKeeperLeaderElectionService ZooKeeperLeaderElectionService{leaderPath='/leader/67a2de8fb291333bbd90b334f8f83def/job_manager_lock'}.
2020-09-11 13:37:42,591 INFO org.apache.flink.runtime.jobmanager.ZooKeeperJobGraphStore - Removed job graph 67a2de8fb291333bbd90b334f8f83def from ZooKeeper.



**Job Manager - Standard Out**

**http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/jobmanager/stdout**




**Task Managers**

**http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers**





{"taskmanagers":[{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832789681,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939}}]}

Task Managers - Container

http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002

{"id":"container_1599570933443_0003_04_000002","path":"akka.tcp://flink@ec2-3-86-165-80.compute-1.amazonaws.com:38435/user/taskmanager_0","dataPort":41920,"timeSinceLastHeartbeat":1599832809720,"slotsNumber":2,"freeSlots":1,"hardware":{"cpuCores":16,"physicalMemory":133838598144,"freeMemory":669515776,"managedMemory":665719939},"metrics":{"heapUsed":200512944,"heapCommitted":664272896,"heapMax":664272896,"nonHeapUsed":145389616,"nonHeapCommitted":149569536,"nonHeapMax":780140544,"directCount":5111,"directUsed":168414439,"directMax":168414435,"mappedCount":0,"mappedUsed":0,"mappedMax":0,"memorySegmentsAvailable":5079,"memorySegmentsTotal":5079,"garbageCollectors":[{"name":"PS_Scavenge","count":91,"time":453},{"name":"PS_MarkSweep","count":4,"time":358}]}}
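The `metrics` block of a single task manager is a good source for memory and garbage collection panels. The Python sketch below works on the parsed response. The function names are hypothetical, and the GC figures are simply summed across collectors as reported.

```python
def heap_usage_ratio(task_manager):
    """Return heapUsed / heapMax for a parsed task manager detail response."""
    metrics = task_manager["metrics"]
    return metrics["heapUsed"] / metrics["heapMax"]


def gc_totals(task_manager):
    """Return (total collection count, total reported time) across all collectors."""
    collectors = task_manager["metrics"]["garbageCollectors"]
    total_count = sum(gc["count"] for gc in collectors)
    total_time = sum(gc["time"] for gc in collectors)
    return total_count, total_time
```

For the sample response above, the heap ratio comes out at roughly 0.3, so this task manager still has plenty of heap headroom.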

Task Managers - Container - Log

http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/log

2020-09-09 17:58:17,456 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer - Starting FlinkKafkaInternalProducer (1/1) to produce into default topic krogerprices
2020-09-09 17:58:17,465 INFO org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase - Consumer subtask 0 has no restore state.
2020-09-09 17:58:17,475 INFO org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 



**Task Managers - Container - Standard Out**

**http://YARNCLUSTER:8088/proxy/application_1599570933443_0003/taskmanagers/container_1599570933443_0003_04_000002/stdout**





[![](https://1.bp.blogspot.com/-DNlb2WnE75M/X1uRD_nnyKI/AAAAAAAAbmg/8gHSfuXtqwcNuZEqRXDwr7jl1i1RzHvFQCLcBGAsYHQ/w625-h48/flinkrestapijobdetails.png)](https://1.bp.blogspot.com/-DNlb2WnE75M/X1uRD_nnyKI/AAAAAAAAbmg/8gHSfuXtqwcNuZEqRXDwr7jl1i1RzHvFQCLcBGAsYHQ/s2559/flinkrestapijobdetails.png)

[![](https://1.bp.blogspot.com/-v-9rSv090vg/X1uREujpx9I/AAAAAAAAbmk/DUn3o5gwpRQ1fBSt4J3E0Xfpq-Z-QyCiwCLcBGAsYHQ/w625-h75/flinkrestapijobs.png)](https://1.bp.blogspot.com/-v-9rSv090vg/X1uREujpx9I/AAAAAAAAbmk/DUn3o5gwpRQ1fBSt4J3E0Xfpq-Z-QyCiwCLcBGAsYHQ/s2560/flinkrestapijobs.png)


Talks

For the definitive way to read and use these, see Caito's awesome talk here:

https://www.youtube.com/watch?v=MQQ7qaKKKc8

Command Line (https://docs.cloudera.com/csa/1.2.0/job-lifecycle/topics/csa-supported-cli.html)

flink list

20/09/11 13:45:34 INFO cli.CliFrontend: Successfully retrieved list of jobs

------------------ Running/Restarting Jobs -------------------

09.09.2020 17:51:33 : 01988557ccd71cbab899ded9babab606 : default: insert into krogerprices

select upc,updatedate,itemdescription,brandname,CAST(price as float) as price, UUID() as uuid

from itemprice

where originstore = 'kroger' (RUNNING)
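The job ID printed by `flink list` is the same `jid` used in the REST URLs, so the CLI output can feed the REST calls. The Python sketch below pulls the start time, job ID, and first line of the job name out of captured output. The line pattern is inferred only from the output shown above, so treat it as an assumption rather than a documented format. The function name is hypothetical.

```python
import re

# Assumed line shape, based on the sample output in this post:
#   DD.MM.YYYY HH:MM:SS : <32 hex chars> : <job name, possibly continued on later lines>
JOB_LINE = re.compile(
    r"^(\d{2}\.\d{2}\.\d{4} \d{2}:\d{2}:\d{2}) : ([0-9a-f]{32}) : (.*)$"
)


def parse_job_lines(output):
    """Extract start time, job ID, and first name line from `flink list` output."""
    jobs = []
    for line in output.splitlines():
        match = JOB_LINE.match(line.strip())
        if match:
            started, jid, name_start = match.groups()
            jobs.append({"started": started, "jid": jid, "name": name_start})
    return jobs
```

Multi-line job names, like the SQL statement above, only contribute their first line here, which is enough to look the job up by `jid`.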

