I’m making queries to the AWS API, which has a throttle on the number of calls per second you can make.
Unfortunately some of their actions take a single object (instead of a list), which forces me to generate one call per object. That means I sometimes end up generating hundreds of individual calls to the API, which breaks their rate limit of 100 requests per second if they all run at the same time.
I tried using Task.sequence to make sure they don’t run concurrently, but those requests complete very quickly and I’m still breaking the rate limit of the API sometimes. An easy fix would be to retry until it works, but that’s very dirty.
What would be a good way to get those hundreds of Cmd to be throttled, maybe run one every 100ms or something to that effect?
The only thing I can think of would be to queue them in the model and have update be called every 100ms to unqueue them. I’m kind of hoping there’s an easier way, something like Cmd.batchThrottled 100 myVeryLongList for example.
Assuming all these requests can be made into tasks, you could delay all of them using Process.sleep as mentioned by @rupert above, using code like this:
delay : Float -> Task e a -> Task e a
delay millis task =
    Process.sleep millis
        |> Task.andThen (\_ -> task)
You could then combine that with Task.sequence like:
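For example (a sketch, assuming the requests are in a list called `tasks`):

```elm
throttledSequence : List (Task e a) -> Task e (List a)
throttledSequence tasks =
    tasks
        -- delay each request by 100ms before it runs
        |> List.map (delay 100)
        |> Task.sequence
```

Since `Task.sequence` runs the tasks one after another, each request then waits 100ms after the previous one completes.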
Now I am wondering… is there something clever that can be done to try and make the throttle more adaptive to a specific rate?
100 requests per second is 10ms per request. But suppose the requests take 5ms and we space them 10ms apart; that comes to 15ms per request, or only about 66 requests per second. The problem is we don’t really know in advance how long the requests will take, and it will likely change as network conditions change, with random jitter in the times too.
Could we somehow factor in a Time.now Task with this, that will get the current timestamp, and subtract the timestamp for the previous request, and adjust the Process.sleep amount, in order to try and keep the requests running 10ms apart more accurately?
You would need to run them in sequence and check the execution time against a throttle value. It would look like this:
nowInMillis : Task x Int
nowInMillis =
    Task.map Time.posixToMillis Time.now


sequenceHelper : Int -> Task x a -> Task x ( Int, List a ) -> Task x ( Int, List a )
sequenceHelper throttle task accTask =
    let
        throttleIfNeeded prevTime acc currentTime =
            if currentTime - prevTime > throttle then
                Task.map (\t -> ( currentTime, acc ++ [ t ] )) task

            else
                Process.sleep (toFloat <| prevTime + throttle - currentTime)
                    |> Task.andThen
                        (\_ ->
                            Task.map2 (\newTime t -> ( newTime, acc ++ [ t ] ))
                                nowInMillis
                                task
                        )
    in
    accTask
        |> Task.andThen
            (\( prevTime, acc ) ->
                nowInMillis
                    |> Task.andThen (throttleIfNeeded prevTime acc)
            )


sequenceWithThrottle : Int -> List (Task x a) -> Task x (List a)
sequenceWithThrottle throttle tasks =
    case tasks of
        [] ->
            Task.succeed []

        first :: rest ->
            let
                firstTask =
                    Task.map2 (\time value -> ( time, [ value ] )) nowInMillis first
            in
            List.foldl (sequenceHelper throttle) firstTask rest
                |> Task.map Tuple.second
See it here in action. The actual tasks are simple Time.now with a Debug.log that outputs the time in the console.
Very good @pdamoc. That implements what I was trying to describe.
I also think there can be an even more accurate implementation - the so-called “token bucket” throttle:
If you just look at the difference between the last timestamp and the current time when deciding whether to throttle, things will drift in the long term. For example, suppose some requests take > 100ms: later requests will not be allowed to run faster to compensate and bring the overall average back on track to 100ms.
In the token bucket throttle, you take an overall start time, and every time you look at the clock you know how many requests should have been able to run within that time (available tokens). If you have already sent that many or more (spent tokens), sleep for a bit; if you have sent fewer, send the next one.
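A sketch of that idea in Elm, reusing the `nowInMillis` helper from the post above (the function name and wiring here are hypothetical):

```elm
-- Delay before the (sent + 1)-th request so that request n never starts
-- before startTime + n * interval, but late requests are not pushed back:
-- if we are behind schedule, we run immediately to catch up.
tokenBucketDelay : Int -> Int -> Int -> Task x ()
tokenBucketDelay interval startTime sent =
    nowInMillis
        |> Task.andThen
            (\now ->
                let
                    -- the time at which the next token accrues
                    due =
                        startTime + sent * interval
                in
                if now >= due then
                    Task.succeed ()

                else
                    Process.sleep (toFloat (due - now))
            )
```

Threading `startTime` and a `sent` counter through the fold in `sequenceWithThrottle` would then keep the long-term average at one request per `interval` milliseconds, rather than only enforcing a minimum gap between consecutive requests.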
Definitely! The current implementation just makes sure there is no more than one call per throttle interval (low-hanging fruit), but a more sophisticated implementation would make sure that the target number of requests per second is actually served.
This is also a case where you are probably better off writing it as a model with commands than as a task. Amongst other things, that makes it easier to stop sending requests if you lose interest. It also means that you can deliver incremental results back to your main model sooner.
Yes, I tried @pdamoc’s code and that works very well, but unfortunately AWS’s throttling doesn’t seem as simple as that. I can’t find any clear information as to the actual limits, but the CloudFormation API has some very low limits and I end up having to throttle down to about 1 req/sec to get it to work reliably, which of course means the whole page takes forever to load.
I’ve been playing around with a Cmd queue in my model and a subscription to unqueue commands one by one at different rates, depending on the endpoint. That allows the page to load bit by bit, and I can discard the queue if the user changes to some other page, but CloudFormation’s limits are so low that I’m still getting denied from time to time.
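That queue-and-subscription pattern might look roughly like this (a minimal sketch; all names are hypothetical):

```elm
import Time

type alias Model =
    { queue : List (Cmd Msg) }

type Msg
    = Dequeue Time.Posix

-- Only tick while there is work queued, so the subscription
-- shuts itself off once the queue drains.
subscriptions : Model -> Sub Msg
subscriptions model =
    if List.isEmpty model.queue then
        Sub.none

    else
        Time.every 100 Dequeue

update : Msg -> Model -> ( Model, Cmd Msg )
update msg model =
    case msg of
        Dequeue _ ->
            case model.queue of
                [] ->
                    ( model, Cmd.none )

                next :: rest ->
                    ( { model | queue = rest }, next )
```

Dropping `model.queue` (for example on page change) cancels everything that has not been sent yet, which is one advantage of the model-based approach over a single long task.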
I’ll have to do both, queuing / throttling and having some kind of retry logic when a query fails.
Http.Error’s BadStatus only has the status code, and in AWS’s case all failures return a 400 with the actual error in the JSON body, so I’m now trying to find a way to get that body to know when the error was throttling and not something else (like an expired token), to implement retries.
It’s not that simple since I’m using @rupert’s elm-aws-core, so I’m not generating the Task myself. I’ll see tomorrow if there’s any way around it, or if I need to make a PR to add some way to handle this to the package, since that’ll probably affect other users as well at some point.
I didn’t know elm-retry existed, that’s very handy actually.
I got it to work by using both setResponseParser and elm-retry, here is the code:
awsThrottling =
    let
        nextPolicy _ sameTask err =
            case err of
                H.BadStatus 442 ->
                    Task.succeed sameTask

                _ ->
                    Task.fail err
    in
    Retry.Policy nextPolicy


retryHandler decoder resp =
    case resp of
        H.BadStatus_ meta body ->
            if meta.statusCode == 400 then
                case Decode.decodeString (Decode.at [ "Error", "Code" ] Decode.string) body of
                    Ok "Throttling" ->
                        Err <| H.BadStatus 442

                    _ ->
                        Err <| H.BadStatus meta.statusCode

            else
                Err <| H.BadStatus meta.statusCode

        H.GoodStatus_ _ body ->
            case Decode.decodeString decoder body of
                Ok r ->
                    Ok r

                Err e ->
                    Err <| H.BadBody <| Decode.errorToString e

        _ ->
            Err <| H.BadStatus 500


Http.requestWithJsonDecoder "DescribeStackResources" GET "/" Http.emptyBody stackResourcesDecoder
    |> Http.addQuery params
    |> Http.setResponseParser (retryHandler stackResourcesDecoder)
    |> Http.send (cf region) creds
    |> Retry.with
        [ Retry.maxDuration 7000
        , awsThrottling
        , Retry.exponentialBackoff { interval = 500, maxInterval = 3000 }
        ]
    |> Task.attempt msg
That’s just a very quick try that worked; I’ll clean it up now and handle errors properly, but basically I’m just using setResponseParser to change status code 400 to 442, and then I have a retry Policy that only retries when the status code is 442. With that plus my existing throttling I’m still getting a few 400s, but they’re retried a bit later and it seems to load fine.