Throttling commands

Hi,

I’m making queries to the AWS API, which has a throttle on the number of calls per second you can make.
Unfortunately some of their actions take a single object (instead of a list), which forces me to generate one call per object. That means I sometimes end up generating hundreds of individual calls to the API, which breaks their rate limit of 100 requests per second if they all run at the same time.

I tried using Task.sequence to make sure they don’t run concurrently, but those requests complete very quickly and I’m still breaking the rate limit of the API sometimes. An easy fix would be to retry until it works, but that’s very dirty.

What would be a good way to get those hundreds of Cmd to be throttled, maybe running one every 100ms or something to that effect?
The only thing I can think of would be to queue them in the model and have update be called every 100ms to unqueue them. I’m kind of hoping there’s an easier way, something like Cmd.batchThrottled 100 myVeryLongList for example.

Thanks!

Could you use the Process.sleep Task? Between each of your request Tasks you could put a Process.sleep, which would space them out.

Assuming all these requests can be made into tasks, you could delay all of them using Process.sleep as mentioned by @rupert above using code like this:

-- Requires: import Process, import Task exposing (Task)

delay : Float -> Task e a -> Task e a
delay millis task =
    Process.sleep millis
        |> Task.andThen (\_ -> task)

You could then combine that with Task.sequence like:

listOfHttpTasks
  |> List.map (delay 100)
  |> Task.sequence
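
To actually run it, you would hand the combined task to Task.attempt, for example (GotResponses being a made-up Msg variant):

listOfHttpTasks
    |> List.map (delay 100)
    |> Task.sequence
    |> Task.attempt GotResponses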

Now I am wondering… is there something clever that can be done to make the throttle adapt to a specific rate?

100 requests per second is 10ms per request. But suppose the requests take 5ms and we space them 10ms apart; that comes to 15ms per request, or only about 66 requests per second. The problem is we don’t really know in advance how long the requests will take, and it will likely change as network conditions change, with random jitter in the timings too.

Could we somehow factor in a Time.now Task that gets the current timestamp, subtracts the timestamp of the previous request, and adjusts the Process.sleep amount, to try to keep the requests running 10ms apart more accurately?

You would need to run them in sequence and check the execution time against a throttle value. It would look like this:


-- Requires: import Process, import Task exposing (Task), import Time

nowInMillis : Task x Int
nowInMillis =
    Task.map Time.posixToMillis Time.now


sequenceHelper : Int -> Task x a -> Task x ( Int, List a ) -> Task x ( Int, List a )
sequenceHelper throttle task accTask =
    let
        -- If enough time has passed since the previous request, run the task
        -- right away; otherwise sleep for the remainder of the interval first.
        throttleIfNeeded prevTime acc currentTime =
            if currentTime - prevTime > throttle then
                Task.map (\t -> ( currentTime, acc ++ [ t ] )) task

            else
                Process.sleep (toFloat <| prevTime + throttle - currentTime)
                    |> Task.andThen
                        (\_ ->
                            Task.map2 (\newTime t -> ( newTime, acc ++ [ t ] ))
                                nowInMillis
                                task
                        )
    in
    accTask
        |> Task.andThen
            (\( prevTime, acc ) ->
                nowInMillis
                    |> Task.andThen (throttleIfNeeded prevTime acc)
            )


sequenceWithThrottle : Int -> List (Task x a) -> Task x (List a)
sequenceWithThrottle throttle tasks =
    case tasks of
        [] ->
            Task.succeed []

        first :: rest ->
            let
                firstTask =
                    Task.map2 (\time value -> ( time, [ value ] )) nowInMillis first
            in
            List.foldl (sequenceHelper throttle) firstTask rest
                |> Task.map Tuple.second
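
For reference, a hypothetical call site (GotResponses is a made-up Msg) would run the throttled sequence as a single command:

sequenceWithThrottle 10 listOfHttpTasks
    |> Task.attempt GotResponses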


See it here in action. The actual tasks are just Time.now with a Debug.log that prints the time to the console.


Very good @pdamoc. That implements what I was trying to describe.

I also think there can be an even more accurate implementation, the so-called “token bucket” throttle:

If you just look at the difference between the last timestamp and the current time when deciding whether to throttle, things will drift over the long term. For example, if some requests take > 100ms, later requests will not be allowed to run faster to compensate and bring the overall average back on track to 100ms.

In the token bucket throttle, you take an overall start time, and every time you look at the clock you know how many requests should have been able to run within that time (available tokens). If you have already sent that many or more (no tokens left), sleep for a bit. If you have sent fewer, send the next one.
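
A minimal sketch of that idea as a Task helper (the name is made up; it reuses nowInMillis from @pdamoc’s code above). Request number n may not start before startTime + n * interval, so requests that fall behind are sent back-to-back until the schedule catches up:

sequenceWithTokenBucket : Int -> List (Task x a) -> Task x (List a)
sequenceWithTokenBucket interval tasks =
    nowInMillis
        |> Task.andThen
            (\startTime ->
                tasks
                    |> List.indexedMap
                        (\n task ->
                            nowInMillis
                                |> Task.andThen
                                    (\now ->
                                        -- request n is scheduled at startTime + n * interval
                                        Process.sleep (toFloat (max 0 (startTime + n * interval - now)))
                                    )
                                |> Task.andThen (\_ -> task)
                        )
                    |> Task.sequence
            )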

Definitely! The current implementation just makes sure there is no more than one call per throttle interval (low-hanging fruit), but a more sophisticated implementation would actually make sure the target number of requests per second is being served.

Later edit:

I have implemented a max tasks per second version too.

This is also a case where you are probably better off writing it as a model with commands than as a task. Amongst other things, that makes it easier to stop sending requests if you lose interest. It also means that you can deliver incremental results back to your main model sooner.
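
A minimal sketch of that model-plus-subscription shape (all names hypothetical, and the Msg variants for the responses themselves are omitted):

-- Requires: import Time

type alias Model =
    { queue : List (Cmd Msg) }


type Msg
    = Tick Time.Posix


update : Msg -> Model -> ( Model, Cmd Msg )
update msg model =
    case msg of
        Tick _ ->
            case model.queue of
                [] ->
                    ( model, Cmd.none )

                next :: rest ->
                    -- release one queued request per tick
                    ( { model | queue = rest }, next )


subscriptions : Model -> Sub Msg
subscriptions model =
    if List.isEmpty model.queue then
        -- nothing pending: stop ticking
        Sub.none

    else
        Time.every 100 Tick

Dropping the queue (for example when the user navigates away) stops any further requests, and each response can update the model as soon as it arrives.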

Yes, I tried @pdamoc’s code and it works very well, but unfortunately AWS’s throttling doesn’t seem to be as simple as that. I can’t find any clear information on the actual limits, but the CloudFormation API has some very low ones, and I end up having to throttle down to about 1 req/sec to get it to work reliably, which of course means the whole page takes forever to load.

I’ve been playing around with a Cmd queue in my model and a subscription to unqueue commands one by one at different rates depending on the endpoint. That lets the page load bit by bit, and I can discard the queue if the user changes to some other page, but CloudFormation’s limits are so low that I’m still getting denied from time to time.

I’ll have to do both: queuing/throttling plus some kind of retry logic when a query fails.
Http.Error’s BadStatus only carries the status code, and in AWS’s case all failures return a 400 with the actual error in the JSON body, so I’m now trying to find a way to get at that body, to know when the error was throttling and not something else (like an expired token), in order to implement retries.

You will need Http.expectStringResponse and some decoding.

Maybe it would make sense to map the errors into a custom error type that you can handle better.

You would then use Http.riskyTask and Task.onError to pattern match on the throttling error and just retry the task after some delay.
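
For example, a sketch along those lines (MyError is a made-up error type, and the delay and retry count are arbitrary):

type MyError
    = Throttled
    | Other Http.Error


retryOnThrottle : Int -> Task MyError a -> Task MyError a
retryOnThrottle retriesLeft task =
    task
        |> Task.onError
            (\err ->
                case err of
                    Throttled ->
                        if retriesLeft > 0 then
                            -- wait a bit, then retry the same task
                            Process.sleep 500
                                |> Task.andThen (\_ -> retryOnThrottle (retriesLeft - 1) task)

                        else
                            Task.fail err

                    Other _ ->
                        Task.fail err
            )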

It’s not that simple, since I’m using @rupert’s elm-aws-core, so I’m not generating the Task myself. I’ll see tomorrow if there’s any way around it, or if I need to make a PR to add some way to handle this to the package, since it will probably affect other users as well at some point.

I think the AWS core libraries use exponential backoff with jitter for their retry strategy.

You could use elm-retry to handle exponential backoff; you give it a task and it adds the retry ability.

originalTask
    |> Retry.with
        [ Retry.maxDuration 7000
        , Retry.exponentialBackoff { interval = 500, maxInterval = 3000 }
        ]
    |> Task.attempt DidOriginalTask

Yes, not so keen on the every-error-is-a-400 approach… but it is what it is.

This should help you:

https://package.elm-lang.org/packages/the-sett/elm-aws-core/latest/AWS-Core-Http#setResponseParser

It gives you the Http.Response:

https://package.elm-lang.org/packages/elm/http/latest/Http#Response

Which includes the body.
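
For reference, Http.Response in elm/http 2.0 is:

type Response body
    = BadUrl_ String
    | Timeout_
    | NetworkError_
    | BadStatus_ Metadata body
    | GoodStatus_ Metadata body

so both BadStatus_ and GoodStatus_ hand you the raw body to decode.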


I created an issue for this, as I can see how it can be improved by tailoring the error reporting better to how AWS works.

Hi all,

I didn’t know elm-retry existed, that’s very handy actually.
I got it to work using both setResponseParser and elm-retry; here is the code:


-- H is elm/http's Http module; Http is AWS.Core.Http from elm-aws-core
-- (aliasing assumed from the rest of my code).

-- Retry policy: only retry when the response parser has mapped the
-- throttling error to the made-up status code 442.
awsThrottling =
    let
        nextPolicy _ samePolicy err =
            case err of
                H.BadStatus 442 ->
                    Task.succeed samePolicy

                _ ->
                    Task.fail err
    in
    Retry.Policy nextPolicy


-- Response parser: on a 400, decode the AWS error body and turn a
-- "Throttling" error code into the 442 sentinel.
retryHandler decoder resp =
    case resp of
        H.BadStatus_ meta body ->
            if meta.statusCode == 400 then
                case Decode.decodeString (Decode.at [ "Error", "Code" ] Decode.string) body of
                    Ok "Throttling" ->
                        Err <| H.BadStatus 442

                    _ ->
                        Err <| H.BadStatus meta.statusCode

            else
                Err <| H.BadStatus meta.statusCode

        H.GoodStatus_ meta body ->
            case Decode.decodeString decoder body of
                Ok r ->
                    Ok r

                Err e ->
                    Err <| H.BadBody <| Decode.errorToString e

        _ ->
            Err <| H.BadStatus 500

Http.requestWithJsonDecoder "DescribeStackResources" GET "/" Http.emptyBody stackResourcesDecoder
    |> Http.addQuery params
    |> Http.setResponseParser (retryHandler stackResourcesDecoder)
    |> Http.send (cf region) creds
    |> Retry.with
        [ Retry.maxDuration 7000
        , awsThrottling
        , Retry.exponentialBackoff { interval = 500, maxInterval = 3000 }
        ]
    |> Task.attempt msg

That’s just a very quick try that worked; I’ll clean it up now and handle errors properly. Basically I’m using setResponseParser to change the status code 400 to 442 when the body says the error was throttling, and then a retry Policy that only retries on status code 442. With that plus my existing throttling I’m still getting a few 400s, but they’re retried a bit later and it seems to load fine.

Thanks a lot for all the help!

