Maker.io main logo

Getting Started with Asyncio in MicroPython (Raspberry Pi Pico)

2 028

2021-08-09 | By ShawnHymel

License: Attribution

Concurrent programming allows us to run multiple tasks simultaneously in a program. It’s extremely useful if you need to have tasks perform something periodically (e.g. taking a measurement from a sensor) or wait for a function to complete (e.g. waiting for a response from a web page). During these downtimes, you can have the program run another task in order to use the processor more efficiently.

Asyncio is a Python library that allows us to create functions called “coroutines” that are capable of pausing to allow another coroutine to run. With asyncio, we can create cooperative multitasking programs. This is different from preemptive multitasking, in which the scheduler can force a task to stop in order to run another task.

With many preemptive schedulers, the scheduler runs at a given interval to see if a new thread needs to be run. The scheduler can also run when a thread specifically yields.

Preemptive scheduling time slicing

Full preemptive multitasking is possible in Python with the threading library. However, Python implements a feature known as the “global interpreter lock” (GIL) to prevent race conditions. While this helps beginners create functioning concurrent programs without running into tricky bugs, it limits the capabilities of multitasking and prevents full optimization of processor utilization.

Asyncio allows us to create cooperative multitasking programs where tasks must explicitly give up control of the processor to allow other tasks to run. While this puts the onus on the programmer to make sure the tasks play nicely, it helps prevent race conditions caused by schedulers that can (seemingly randomly) pause a thread to execute another thread.

With cooperative multitasking, each task must make sure to yield the processor to allow other tasks to run.

Cooperative multitasking time slicing

MicroPython implements a version of asyncio called uasyncio that contains a subset of the functions available in the full asyncio library. In this tutorial, we’ll give a brief example of how to use the uasyncio library to create a cooperative multitasking program.

You can see this tutorial in video form here:

 

 

Hardware Hookup

We’ll use the Raspberry Pi Pico for this demo (most microcontrollers that run MicroPython should also work). Make sure that you have the latest version of MicroPython running on your Raspberry Pi Pico (here is the easiest way to do that).

Connect a single pushbutton to the Pico as shown. Note that we will be using an internal pull-up resistor for the button.

Raspberry Pi Pico pushbutton circuit

Queue Library

In order to demonstrate queues, we’ll need a separate library, as the MicroPython version of asyncio does not support queues (at this time).

Head to https://github.com/peterhinch/micropython-async/blob/master/v3/primitives/queue.py and copy the code (click Raw). Paste it into a new document in Thonny. Click File > Save As. Create a new folder on your Raspberry Pi named /lib. Save this file as queue.py in that /lib folder.

Saving queue library in Thonny

Code

In a new document, enter the following code:

Copy Code
import machine
import uasyncio
import utime
import queue

# Settings
led = machine.Pin(25, machine.Pin.OUT)
btn = machine.Pin(15, machine.Pin.IN, machine.Pin.PULL_UP)

# Coroutine: blink on a timer
async def blink(q):
    delay_ms = 0
    while True:
        if not q.empty():
            delay_ms = await q.get()
        led.toggle()
        await uasyncio.sleep_ms(delay_ms)
        
# Coroutine: only return on button press
async def wait_button():
    btn_prev = btn.value()
    while (btn.value() == 1) or (btn.value() == btn_prev):
        btn_prev = btn.value()
        await uasyncio.sleep(0.04)
        
# Coroutine: entry point for asyncio program
async def main():
    
    # Queue for passing messages
    q = queue.Queue()
    
    # Start coroutine as a task and immediately return
    uasyncio.create_task(blink(q))
    
    # Main loop
    timestamp = utime.ticks_ms()
    while True:
        
        # Calculate time between button presses
        await wait_button()
        new_time = utime.ticks_ms()
        delay_time = new_time - timestamp
        timestamp = new_time
        print(delay_time)
        
        # Send calculated time to blink task
        delay_time = min(delay_time, 2000)
        await q.put(delay_time)
    
# Start event loop and run entry point coroutine
uasyncio.run(main())

Run the program. When you press the button, the LED should flash using a delay determined by the time between button presses.

Pressing button

In addition, you should see the time between button presses (in milliseconds) appear in the console window of Thonny.

Output of time between button presses

Code Discussion

A coroutine is a function that is capable of pausing its own execution to allow other code to run. This is a useful feature if you want to create a periodic task or have the processor work on something else while waiting for a return value (e.g. from a sensor or a website).

To define a coroutine, we use the async keyword in front of the function definition. For example:

 

Copy Code
# Coroutine: blink on a timer
async def blink(q):
    delay_ms = 0
    while True:
        if not q.empty():
            delay_ms = await q.get()
        led.toggle()
        await uasyncio.sleep_ms(delay_ms)

Here, we create a coroutine named “blink” that can be called as a function. In it, we define a forever loop that looks for elements in a queue (q). If an element is found, it updates the delay_ms variable with the integer value found in the queue element. It then toggles the onboard LED and sleeps for the amount given by delay_ms

The await keyword is another special term in the asyncio library. It yields the processor while waiting for the given function (after the await keyword) to return. Note that we must use the special sleep_ms function that’s part of the uasyncio library. That’s because the regular time.sleep_ms() function does not yield the processor. 

You must use specially crafted coroutines that yield the processor when using await. You can learn more about creating your own awaitable objects/functions here.

Our wait_button() coroutine uses a similar cooperative structure:

 

Copy Code
# Coroutine: only return on button press
async def wait_button():
    btn_prev = btn.value()
    while (btn.value() == 1) or (btn.value() == btn_prev):
        btn_prev = btn.value()
        await uasyncio.sleep(0.04)

It simply checks the state of the button every 0.04 seconds while yielding the processor between those checks. If the button has been pressed, the function returns.

In main, we create Queue object that we pass to our blink task:

 

Copy Code
    # Queue for passing messages
    q = queue.Queue()
    
    # Start coroutine as a task and immediately return
    uasyncio.create_task(blink(q))

Here, create_task() runs a function and then returns as soon as the function yields. This allows us to execute code in an apparently simultaneous manner.

While blink is running, the program enters the main (while True) loop. In it, it calls the following:

Copy Code
        await wait_button()

This yields the processor while waiting for the button to be pressed, all while still blinking the LED.

Once the button has been pressed, wait_button() returns, and we calculate the time between this and the last button press. That time is capped at 2000 ms and added to the queue with:

Copy Code
        await q.put(delay_time)

The next time the blink() coroutine runs, it will get that value from the queue and update the delay time.

Finally, you might have noticed that we must create a main entry point for our asyncio program by defining the main coroutine with async:

Copy Code
async def main():

We can only call coroutines from within another coroutine, as there is a background scheduler task that runs to switch tasks. This scheduler starts and runs as part of the asyncio “event loop.” The top-level coroutine must be called with the run() command:

 

Copy Code
# Start event loop and run entry point coroutine
uasyncio.run(main())

You should only have one run() coroutine running at a time, as that controls the starting and stopping of the scheduler. Inside the event loop, you can call as many coroutines as you’d like.

Going Further

Asyncio is a complex library with lots to learn when it comes to creating cooperative multitasking programs, but I hope this guide has given you a start to using it in your MicroPython projects. If you’d like to learn more about asyncio and uasyncio, check out these links:

Mfr Part # SC0915
RASPBERRY PI PICO RP2040
Raspberry Pi
Add all DigiKey Parts to Cart
Have questions or comments? Continue the conversation on TechForum, DigiKey's online community and technical resource.