I'll discuss an approach to making things run in parallel to collect data from an HTTP endpoint. Far from perfect, but good as a starter for exploring this area.
Situation: we have some old code (written by me years ago) that was a quick fix to collect incident data from Service Manager 9 (formerly an HP product, now under Micro Focus). The code was an ugly hack done in roughly one day just to get the thing working somehow. I opted back then for a dumb shell solution. As we all know, living in the Linux shell means you can do a lot, albeit not very efficiently. So the script grabbed the numbers of non-closed incidents from the DB. For each incident it executed curl to get the status (and the clumsy design required you to ask for the incident together with a state - either open or closed - so for 1000 incidents you had to run 2000 queries). The script capped the number of concurrently running queries at 24. This means we constantly bombarded the kernel with new processes just to get some data from an HTTPS endpoint.
Version 0.1
I decided it was time to have some fun and use Common Lisp to create a representation of the algorithm that deals with parallel execution. For this I chose Clozure Common Lisp, put a basic Quicklisp there and loaded some libraries.
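Getting the dependencies in place might look like this (assuming Quicklisp is already installed; log4cl provides the log: calls used further down):
(ql:quickload '(:lparallel :drakma :flexi-streams :log4cl :jsown))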
Parallel
The first package to use is lparallel, which enables parallel processing without much coding on my side. Things are easy here: you set lparallel:*kernel*
with the number of workers available for parallel tasks, define a channel to receive results and start coding. I actually used an approach that does not even require a channel for the results.
(defvar *drakma-stream* nil) ; defined globally so the :bindings entry below refers to a special variable
(defparameter worker-count 32)
(setf lparallel:*kernel*
      (lparallel:make-kernel worker-count
                             :bindings `((*standard-output* . ,*standard-output*)
                                         (*error-output* . ,*error-output*)
                                         (*drakma-stream* . nil))))
(defparameter *master-channel* (lparallel:make-channel))
You might raise an eyebrow: what are the bindings for? Well, any program has one input, one output and one error stream/file descriptor when running. We are basically telling the parallel kernel to establish the same standard streams in all worker threads. The last piece, *drakma-stream*, is initially set to nil so that each parallel task can have its own connection available.
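One more special variable is needed before the querying code below will compile: the endpoint URL. The value here is a hypothetical placeholder.
;; hypothetical endpoint URL - replace with the real SM9 rws address
(defparameter TARGET-URI "https://sm9.example.org/rws/ticket")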
Drakma continuous querying
We will reuse the connection between HTTP requests to save time on the SSL handshake and reduce the overall load on the target server. We are talking to a Tomcat server configured with a 120-second keep-alive. This means we should take care to properly close and re-open the connection when the keep-alive expires. After some experiments, we defined a minimal base function that wraps the drakma calls.
First we define a helper macro that wraps the drakma:http-request call and hands it the stream we are reusing; when the stream is NIL, drakma simply opens a new connection. We expect TARGET-URI
to hold the value of the endpoint URL. The content
is the POST form content that is used in the query.
(defmacro DRAKMA-REQUEST-CONT (content a-stream)
  "Macro constructing a drakma:http-request call that reuses A-STREAM when it is non-NIL"
  `(ignore-errors
     (multiple-value-list
      (drakma:http-request TARGET-URI
                           :method :post
                           :verify nil
                           :content-type "application/xml"
                           :close nil
                           :content ,content
                           :stream ,a-stream))))
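To see what the macro produces, we can macroexpand a call; with the definition above it expands roughly to:
(macroexpand-1 '(DRAKMA-REQUEST-CONT content *drakma-stream*))
;; => (IGNORE-ERRORS
;;      (MULTIPLE-VALUE-LIST
;;        (DRAKMA:HTTP-REQUEST TARGET-URI
;;                             :METHOD :POST
;;                             :VERIFY NIL
;;                             :CONTENT-TYPE "application/xml"
;;                             :CLOSE NIL
;;                             :CONTENT CONTENT
;;                             :STREAM *DRAKMA-STREAM*)))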
Next we define the function that does the handling.
(defmacro PARSE-RESULT-OCTET (result-octet)
  `(flexi-streams:octets-to-string ,result-octet :external-format :utf-8))
(defun SM9-QUERY (content)
  "Query SM9 via rws, reusing the connection stored in *drakma-stream*"
  ;; check the state of *drakma-stream* before firing the request
  (cond ((null *drakma-stream*)
         (log:info "Stream not opened, creating new connection."))
        ((and (streamp *drakma-stream*)
              (not (open-stream-p *drakma-stream*)))
         (log:debug "Stream is closed, setting stream to nil")
         (setf *drakma-stream* nil))
        (t (log:debug "Stream opened, keeping it.")))
  (let* ((result-values (DRAKMA-REQUEST-CONT content *drakma-stream*))
         ;; drakma returns (body status headers uri stream must-close reason);
         ;; when must-close is true we cannot reuse the stream
         (result-stream (if (nth 5 result-values) nil (nth 4 result-values)))
         (status-code (cadr result-values)))
    (if (null result-stream)
        (progn (when *drakma-stream* (close *drakma-stream*))
               (setf *drakma-stream* nil))
        (setf *drakma-stream* result-stream))
    (format t ".")
    (values (PARSE-RESULT-OCTET (car result-values)) status-code)))
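A quick single-threaded smoke test from the REPL could look like this sketch (the request body here is a hypothetical placeholder, and we assume the endpoint is reachable):
(multiple-value-bind (body status)
    (SM9-QUERY "<ticketRetrieveRequest>...</ticketRetrieveRequest>")
  (log:info "HTTP status ~a, ~a characters received" status (length body)))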
With this in hand, we have the base for fetching information in parallel while reusing connections. How do we get this rolling?
Execution
We will define a special variable *result*
for holding what we collect.
(defparameter *result* nil)
We somehow get the ticket ids from the DB and hold them in the *open-tickets*
list. The easiest way to run the collection is:
(progn
  (setf *result* (lparallel:pmapcar #'SM9-GET-TICKET-BY-ID *open-tickets*))
  (SM9-close-sockets)
  "Execution finished")
It is very important to close the streams we used during collection. We defined SM9-close-sockets
as a function that broadcasts a task to all parallel worker threads.
(defun SM9-close-sockets ()
  "Closes all sockets inside worker threads"
  (lparallel:broadcast-task
   (lambda ()
     (when (and *drakma-stream* (open-stream-p *drakma-stream*))
       (close *drakma-stream*))
     (setf *drakma-stream* nil))))
The last piece is SM9-GET-TICKET-BY-ID,
which takes a ticket id and creates the data (content) to be sent to the target to obtain results.
It could look like this (it expects SM9-USER
to be defined):
(defun SM9-REQUEST-TICKET (ticketId status)
(concatenate 'string
"<?xml version=\"1.0\" encoding=\"UTF-8\" standalone=\"yes\"?>
<ticketRetrieveRequest>
<criteria>
<customerDetails>
<companyId>****</companyId>
<locationId></locationId>
<managementRegion>*****</managementRegion>
<requestedById>" SM9-USER "</requestedById>
<requestedForId>" SM9-USER "</requestedForId>
</customerDetails>
<systemType>Incident</systemType>
<status>" status "</status>
<ticketId>" ticketId "</ticketId>
</criteria>
</ticketRetrieveRequest>"))
(defun SM9-GET-TICKET-BY-ID (ticketId)
  "Get given ticketId from SM9 via rws server, tries both open and closed statuses, returns the usable raw response or NIL"
  (let* ((result1 (SM9-QUERY (SM9-REQUEST-TICKET ticketId "open")))
         (result2 (SM9-QUERY (SM9-REQUEST-TICKET ticketId "closed"))))
    ;; at least one of the two should parse and contain tickets
    (or (when (ignore-errors (jsown:val (jsown:parse result1) "tickets")) result1)
        (when (ignore-errors (jsown:val (jsown:parse result2) "tickets")) result2))))
So now that we have version 0.1 ready, we can play and test. An obvious question might be: why do we have to wait for all the processing to be done? Can't we write results out as we go, whenever something is ready?
The reasons for doing so are obvious - if the searched scope is too big (like millions of requests) we might run out of memory, and we would only find out after a long wait. And second, we may want to see the data as soon as we get it, even if we are just polling.
Let us make V 0.2
We will explore further and create a writing thread that receives results along the way (it can obviously do something other than write, but hey, this is the basic version you can easily extend :)).
Still not perfect, but slightly better.
Let's define a file-writer
function that works on queues.
(defun file-writer (output-queue signal-queue feedback-queue target-file-name)
  "Write messages from OUTPUT-QUEUE to TARGET-FILE-NAME until told to stop via SIGNAL-QUEUE"
  ;;open file for results, truncating any previous content
  (with-open-file (stream target-file-name :direction :output
                          :if-exists :supersede :if-does-not-exist :create)
    ;;loop through output-queue, watch signal-queue for final run
    (loop while (lparallel.queue:queue-empty-p signal-queue)
          do (multiple-value-bind (message exists)
                 (lparallel.queue:try-pop-queue output-queue :timeout 1)
               (when exists
                 (format stream "~a~%" message))))
    ;read signal; the loop above only exits once one is present
    (unless (lparallel.queue:queue-empty-p signal-queue)
      (format t "Got signal ~a~%" (lparallel.queue:pop-queue signal-queue)))
    ; we might get the signal sooner than all results are ready, so drain the rest
    (loop until (lparallel.queue:queue-empty-p output-queue)
          do (format stream "~a~%" (lparallel.queue:pop-queue output-queue))))
  ;the file is closed at this point; tell the caller we are done
  (lparallel.queue:push-queue "writer finished" feedback-queue))
The function uses the lparallel.queue
package that comes with lparallel
when quicklisp
loads it. It takes target-file-name,
which it truncates at the beginning. Next it checks whether signal-queue
is empty and tries to read data from output-queue
with a timeout. So the function drains the output queue while watching signal-queue for a signal to finish processing. After receiving the signal on signal-queue, it reads it, empties what remains in output-queue (processes it) and sends a message to feedback-queue for synchronization. All of this is to ensure we properly close the output stream; if we don't, we might end up with truncated output on the filesystem (at 16k or 72k or wherever the buffer happens to end).
Lparallel uses bordeaux-threads,
so we do not have to ask for it separately (quicklisp loads it as a dependency). In order to start our new function, we define the file name and the queues, and run the writer.
(defparameter *output-queue* (lparallel.queue:make-queue :fixed-capacity 500))
(defparameter *signal-queue* (lparallel.queue:make-queue))
(defparameter *feedback-queue* (lparallel.queue:make-queue))
(defparameter *target-file-name* "sm9-tickets.out") ; hypothetical file name
(defun run-in-background (fn &optional (name "Generic Background task") &rest params)
  (let ((background-task (bt:make-thread (lambda () (apply fn params))
                                         :name name)))
    background-task))
;run writer thread
(run-in-background #'file-writer "SM9-Writer-Thread" *output-queue* *signal-queue* *feedback-queue* *target-file-name*)
;-------------------------
;main processing goes here
;-------------------------
;you can instruct file-writer to finish, close the file and let you know it is done, by executing
(lparallel.queue:push-queue "finish" *signal-queue* )
;and wait for it to finish by blocking on pop
(lparallel.queue:pop-queue *feedback-queue* )
;-------------------------
;this piece can run only after we processed all records
;-------------------------
Now we have the basics in place, but nothing feeds the writer with data yet. Well, actually it is not that difficult. Instead of returning the result from querying, we push it to the output queue [ (lparallel.queue:push-queue data output-queue) ].
(defun SM9-GET-TICKET-BY-ID (ticketId)
  "Get given ticketId from SM9 via rws server, tries both open and closed statuses, pushes the usable result to *output-queue*"
  (let* ((result1 (SM9-QUERY (SM9-REQUEST-TICKET ticketId "open")))
         (result2 (SM9-QUERY (SM9-REQUEST-TICKET ticketId "closed"))))
    ;; push whichever result parses and contains tickets; at least one should be ok
    (cond ((ignore-errors (jsown:val (jsown:parse result1) "tickets"))
           (lparallel.queue:push-queue result1 *output-queue*))
          ((ignore-errors (jsown:val (jsown:parse result2) "tickets"))
           (lparallel.queue:push-queue result2 *output-queue*)))))
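Putting V 0.2 together, a complete collection run could look like this sketch (using the variables defined above):
;; start the writer, collect in parallel, then signal and wait for the writer
(run-in-background #'file-writer "SM9-Writer-Thread"
                   *output-queue* *signal-queue* *feedback-queue* *target-file-name*)
(lparallel:pmapcar #'SM9-GET-TICKET-BY-ID *open-tickets*) ; results land in *output-queue*
(SM9-close-sockets)
(lparallel.queue:push-queue "finish" *signal-queue*)
(lparallel.queue:pop-queue *feedback-queue*) ; blocks until the writer confirms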
An initial benchmark shows we went from an overloaded system (shell scripts forking curl, jq, awk and whatnot) to balanced execution with a much shorter run time (keep-alive, reused connections).
That's it for the first write-up; I expect constructive criticism. If you like this, let me know - I'll appreciate it.
Cheers!