Sleigh Decompiler Parallelization

In the upcoming Cerbero Suite 5.2 we used our new multi-processing technology (part 1, part 2) to parallelize the Sleigh decompiler by running it in a different process. This guarantees complete stability in case Sleigh encounters an issue and makes every decompiling operation safe to cancel.

We didn’t notice any slow-downs from running the decompiler in a different process; in fact, it’s still blazingly fast.

By parallelizing the decompiler we were also able to initialize it during the loading of the file / database. Thus, when the decompiler is invoked for the first time there is no initial delay.

Although the decompiler doesn’t take much time to load, the preloading makes it extra-snappy.

It is also possible to choose to run the decompiler in the same process as before from the Carbon settings.

Remote Containers

In our previous post we introduced multi-processing as implemented in the upcoming Cerbero Suite 5.2 and Cerbero Engine 2.2. In this post we’re going to talk about remote containers, which are an additional functionality of our multi-processing technology.

Containers (NTContainer) are a way to encapsulate any kind of raw data (e.g.: memory, files) and are used ubiquitously. There might be occasions in which a manager wants to share a container with a worker.

The API to accomplish this is very simple: all the manager has to do is to share the container using shareContainer() and the worker can access the container using getSharedContainer().

In the following example, a 10-megabyte container is created with a signature appended at the end. A local search and a remote search are then performed to find the signature.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTContainer, MB_SIZE, NTTime
from Pro.MP import *
import time
remote_code = r"""
from Pro.Core import NTTime
from Pro.MP import *
def main():
c = proWorkerObject().getSharedContainer('NAME')
# remote search
magic = b'\xAA\xBB\xCC\xDD'
t = NTTime()
t.start()
match = c.findFirst(magic)
print('remote search (ms): ' + str(t.elapsed()))
main()
"""
def main():
magic = b"\xAA\xBB\xCC\xDD"
buf = b"\xFF" * 10 * MB_SIZE + magic
c = NTContainer()
c.setData(buf)
# local search
t = NTTime()
t.start()
match = c.findFirst(magic)
print("local search (ms):", t.elapsed())
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
m.shareContainer("NAME", c)
worker_id = m.startWorker()
m.evalPythonCode(worker_id, remote_code)
while m.isBusy():
m.processMessages()
time.sleep(0.1)
main()
from Pro.Core import NTContainer, MB_SIZE, NTTime from Pro.MP import * import time remote_code = r""" from Pro.Core import NTTime from Pro.MP import * def main(): c = proWorkerObject().getSharedContainer('NAME') # remote search magic = b'\xAA\xBB\xCC\xDD' t = NTTime() t.start() match = c.findFirst(magic) print('remote search (ms): ' + str(t.elapsed())) main() """ def main(): magic = b"\xAA\xBB\xCC\xDD" buf = b"\xFF" * 10 * MB_SIZE + magic c = NTContainer() c.setData(buf) # local search t = NTTime() t.start() match = c.findFirst(magic) print("local search (ms):", t.elapsed()) m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) m.shareContainer("NAME", c) worker_id = m.startWorker() m.evalPythonCode(worker_id, remote_code) while m.isBusy(): m.processMessages() time.sleep(0.1) main()
from Pro.Core import NTContainer, MB_SIZE, NTTime
from Pro.MP import *
import time

# Python source handed to the worker through evalPythonCode(). In the worker it
# fetches the container shared under 'NAME', searches it for the signature and
# prints the elapsed time. The literal is sent verbatim, so it must stay valid
# stand-alone Python (hence its own imports and its own main()).
remote_code = r"""
from Pro.Core import NTTime
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer('NAME')
    # remote search
    magic = b'\xAA\xBB\xCC\xDD'
    t = NTTime()
    t.start()
    match = c.findFirst(magic)
    print('remote search (ms): ' + str(t.elapsed()))

main()
"""

def main():
    """Time a signature search locally, then repeat it from a worker process.

    Builds a container of filler bytes with a signature appended at the end,
    searches it in-process, shares it under the name "NAME" and lets a worker
    evaluate ``remote_code`` against the shared container.
    """
    signature = b"\xAA\xBB\xCC\xDD"
    # Filler bytes followed by the signature, so the match sits at the very end.
    payload = b"\xFF" * 10 * MB_SIZE + signature
    container = NTContainer()
    container.setData(payload)

    # Local search: timed in this process.
    timer = NTTime()
    timer.start()
    hit = container.findFirst(signature)
    print("local search (ms):", timer.elapsed())

    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    # Publish the container under the name the worker code asks for.
    manager.shareContainer("NAME", container)

    worker = manager.startWorker()
    manager.evalPythonCode(worker, remote_code)

    # Pump manager messages until the worker has finished its task.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.1)
    
main()

The output is:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
local search (ms): 17
remote search (ms): 351
local search (ms): 17 remote search (ms): 351
local search (ms): 17
remote search (ms): 351

The reason for the time difference is, of course, that accessing the remote data is comparatively slower. This factor needs to be taken into consideration when working with remote containers.

Yet another limitation regarding remote containers is that they are read-only. This is for security reasons, as it wouldn’t be safe to allow other processes to change the original container.

In the next example the code asks the user to choose a Windows executable (PE), opens it and shares the container. The import table of the PE is then parsed from the worker process.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time
remote_code = r'''
from Pro.Core import *
from Pro.MP import *
from Pro.PE import *
def main():
c = proWorkerObject().getSharedContainer("PE")
# print imported modules
obj = PEObject()
if not obj.Load(c):
print("error: couldn't load file")
return
imp = obj.ImportDirectory()
it = CFFStructIt(imp)
while it.hasNext():
cur = it.next()
name_rva = cur.Uns("Name")
name_offs = obj.RvaToOffset(name_rva)
if name_offs != INVALID_STREAM_OFFSET:
name = obj.ReadUInt8String(name_offs, 1000)[0]
name = name.decode("utf-8", errors="ignore")
print("imported module: " + name)
main()
'''
def main():
fname = proContext().getOpenFileName("Select Windows executable...", str(), "Executable files (*.exe)")
if not fname:
return
c = createContainerFromFile(fname)
if c.isNull():
return
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
m.shareContainer("PE", c)
worker_id = m.startWorker()
m.evalPythonCode(worker_id, remote_code)
while m.isBusy():
m.processMessages()
time.sleep(0.1)
main()
from Pro.Core import * from Pro.UI import * from Pro.MP import * import time remote_code = r''' from Pro.Core import * from Pro.MP import * from Pro.PE import * def main(): c = proWorkerObject().getSharedContainer("PE") # print imported modules obj = PEObject() if not obj.Load(c): print("error: couldn't load file") return imp = obj.ImportDirectory() it = CFFStructIt(imp) while it.hasNext(): cur = it.next() name_rva = cur.Uns("Name") name_offs = obj.RvaToOffset(name_rva) if name_offs != INVALID_STREAM_OFFSET: name = obj.ReadUInt8String(name_offs, 1000)[0] name = name.decode("utf-8", errors="ignore") print("imported module: " + name) main() ''' def main(): fname = proContext().getOpenFileName("Select Windows executable...", str(), "Executable files (*.exe)") if not fname: return c = createContainerFromFile(fname) if c.isNull(): return m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) m.shareContainer("PE", c) worker_id = m.startWorker() m.evalPythonCode(worker_id, remote_code) while m.isBusy(): m.processMessages() time.sleep(0.1) main()
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

# Python source handed to the worker through evalPythonCode(). In the worker it
# loads the container shared under "PE" as a PE object, walks the import
# directory and prints the name of each imported module. The literal is sent
# verbatim, so its content must not be altered.
remote_code = r'''
from Pro.Core import *
from Pro.MP import *
from Pro.PE import *

def main():
    c = proWorkerObject().getSharedContainer("PE")
    # print imported modules
    obj = PEObject()
    if not obj.Load(c):
        print("error: couldn't load file")
        return
    imp = obj.ImportDirectory()
   
    it = CFFStructIt(imp)
    while it.hasNext():
        cur = it.next()
        name_rva = cur.Uns("Name")
        name_offs = obj.RvaToOffset(name_rva)
        if name_offs != INVALID_STREAM_OFFSET:
            name = obj.ReadUInt8String(name_offs, 1000)[0]
            name = name.decode("utf-8", errors="ignore")
            print("imported module: " + name)
            

main()
'''

def main():
    """Let the user pick an executable and have a worker list its imports.

    The chosen file is wrapped in a container, shared under the name "PE",
    and ``remote_code`` is evaluated in a freshly started worker. Returns
    early if no file is chosen or the container cannot be created.
    """
    path = proContext().getOpenFileName("Select Windows executable...", str(), "Executable files (*.exe)")
    if not path:
        # Dialog cancelled: nothing to do.
        return

    container = createContainerFromFile(path)
    if container.isNull():
        return

    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    # Publish the file data under the name the worker code asks for.
    manager.shareContainer("PE", container)

    worker = manager.startWorker()
    manager.evalPythonCode(worker, remote_code)

    # Pump manager messages until the worker has finished its task.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.1)
    
main()

An example of output is:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
imported module: KERNEL32.dll
imported module: SHLWAPI.dll
imported module: KERNEL32.dll imported module: SHLWAPI.dll
imported module: KERNEL32.dll
imported module: SHLWAPI.dll

In the following example the shared container is shown in a hex view from the worker process.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time
remote_code = r'''
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
def main():
c = proWorkerObject().getSharedContainer("DATA")
ctx = proContext()
hv = ctx.createView(ProView.Type_Hex, "Remote Container Data")
hv.setData(c)
dlg = ctx.createDialog(hv)
dlg.show()
main()
'''
def main():
fname = proContext().getOpenFileName("Select a file...")
if not fname:
return
c = createContainerFromFile(fname)
if c.isNull():
return
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
m.shareContainer("DATA", c)
worker_id = m.startWorker()
m.evalPythonCode(worker_id, remote_code)
while m.isBusy():
m.processMessages()
time.sleep(0.1)
main()
from Pro.Core import * from Pro.UI import * from Pro.MP import * import time remote_code = r''' from Pro.Core import * from Pro.UI import * from Pro.MP import * def main(): c = proWorkerObject().getSharedContainer("DATA") ctx = proContext() hv = ctx.createView(ProView.Type_Hex, "Remote Container Data") hv.setData(c) dlg = ctx.createDialog(hv) dlg.show() main() ''' def main(): fname = proContext().getOpenFileName("Select a file...") if not fname: return c = createContainerFromFile(fname) if c.isNull(): return m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) m.shareContainer("DATA", c) worker_id = m.startWorker() m.evalPythonCode(worker_id, remote_code) while m.isBusy(): m.processMessages() time.sleep(0.1) main()
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *
import time

# Python source handed to the worker through evalPythonCode(). In the worker it
# fetches the container shared under "DATA" and displays it in a hex view
# hosted in a dialog. The literal is sent verbatim, so its content must not be
# altered.
remote_code = r'''
from Pro.Core import *
from Pro.UI import *
from Pro.MP import *

def main():
    c = proWorkerObject().getSharedContainer("DATA")
    ctx = proContext()
    hv = ctx.createView(ProView.Type_Hex, "Remote Container Data")
    hv.setData(c)
    dlg = ctx.createDialog(hv)
    dlg.show()

main()
'''

def main():
    """Let the user pick a file and have a worker show it in a hex view.

    The chosen file is wrapped in a container, shared under the name "DATA",
    and ``remote_code`` is evaluated in a freshly started worker. Returns
    early if no file is chosen or the container cannot be created.
    """
    path = proContext().getOpenFileName("Select a file...")
    if not path:
        # Dialog cancelled: nothing to do.
        return

    container = createContainerFromFile(path)
    if container.isNull():
        return

    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    # Publish the file data under the name the worker code asks for.
    manager.shareContainer("DATA", container)

    worker = manager.startWorker()
    manager.evalPythonCode(worker, remote_code)

    # Pump manager messages until the worker has finished its task.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.1)
    
main()

This is a screenshot from running the last example.

Sharing containers with workers is a very inexpensive operation in terms of resources. Therefore, sharing many containers is not an issue.

Introducing Multi-Processing

We’re proud to announce the introduction of multi-processing in the upcoming Cerbero Suite 5.2 and Cerbero Engine 2.2.

Our products make use of parallel processing in terms of multi-threading whenever possible, but there are limitations to the capabilities of multi-threading.

Some of the advantages offered by multi-processing are:

  • Possible process isolation
  • Increased stability for 3rd party components
  • Overcoming the Global Interpreter Lock (GIL) in Python

When designing our multi-processing technology, we briefly took into consideration Python’s multiprocessing library, but we discarded the idea, because it wasn’t flexible enough for our intended purposes and we wanted to have an API not limited to Python.

We wanted our API not only to be flexible but also easy to use: when dealing with multi-processing there are challenges which we wanted to solve upfront, so that our users wouldn’t have to worry about them when using our API.

Additionally, since we wanted our multi-processing technology to also be fast and stable, we built it on top of ZeroMQ, an established ultra-fast messaging library which can be used for clustered solutions.

Introduction

In our API there are managers and workers. The manager (ProManager) is the object assigning tasks to workers (ProWorker). A worker is a separate process launched in the background which awaits instructions from the manager.

We can create as many managers as we want from our process and a manager can have as many workers as permitted by the resources of the system.

The manager can be created from within any thread, but must be accessed from within a single thread. Periodically the processMessages method of ProManager should be called to process internal messages.

The worker processes messages from a dedicated thread and every task assigned to it is guaranteed to be executed in the main thread. That’s very important, because it allows workers to access the user-interface API if needed.

The manager and worker maintain a regular communication. When a worker exits, the manager is informed about it. When the manager stops responding to a worker, the worker exits. This behavior guarantees that workers don’t become zombie processes.

The following is a basic code example.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
m.startWorker()
for i in range(3):
m.processMessages()
time.sleep(1)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() m.startWorker() for i in range(3): m.processMessages() time.sleep(1) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Start one idle worker and keep it alive for three seconds."""
    manager = ProManager()
    manager.startWorker()

    # One message-processing pass per second, three passes in total.
    for _ in range(3):
        manager.processMessages()
        time.sleep(1)

    print("finished!")
    
main()

This code creates a manager, starts a worker and processes messages for three seconds. It doesn’t do anything apart from keeping the worker alive.

We can build upon the previous code by launching a test message box.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
worker_id = m.startWorker()
m.testMessageBox(worker_id)
while m.isBusy():
m.processMessages()
time.sleep(0.5)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() worker_id = m.startWorker() m.testMessageBox(worker_id) while m.isBusy(): m.processMessages() time.sleep(0.5) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Show a test message box from a worker and wait until it is closed."""
    manager = ProManager()

    # Start a worker and immediately ask it to display the test message box.
    manager.testMessageBox(manager.startWorker())

    # The manager stays busy for as long as the message box is open.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.5)

    print("finished!")
    
main()

The main code finishes as soon as the message box is closed.

It is also possible to create multiple workers which all do the same task by using the special id ProWorker_All.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
for i in range(3):
m.startWorker()
m.testMessageBox(ProWorker_All)
while m.isBusy():
m.processMessages()
time.sleep(0.5)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() for i in range(3): m.startWorker() m.testMessageBox(ProWorker_All) while m.isBusy(): m.processMessages() time.sleep(0.5) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Show a test message box from three workers and wait for all of them."""
    manager = ProManager()

    for _ in range(3):
        manager.startWorker()

    # ProWorker_All addresses every worker started by this manager.
    manager.testMessageBox(ProWorker_All)

    # Busy until every message box has been closed.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.5)

    print("finished!")
    
main()

This time the main code finishes when all three message boxes are closed.

Output Redirection

Let’s now print something out from one of the workers.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
# we must specify this option in order to obtain the output from the workers
m.setOptions(ProMPOpt_RedirectOutput)
m.testMessage(m.startWorker())
for i in range(3):
m.processMessages()
time.sleep(1)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() # we must specify this option in order to obtain the output from the workers m.setOptions(ProMPOpt_RedirectOutput) m.testMessage(m.startWorker()) for i in range(3): m.processMessages() time.sleep(1) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Print a test message from a worker and forward it to this process."""
    manager = ProManager()
    # Without this option the output of the workers is not forwarded
    # to the manager.
    manager.setOptions(ProMPOpt_RedirectOutput)

    worker = manager.startWorker()
    manager.testMessage(worker)

    # One message-processing pass per second, three passes in total.
    for _ in range(3):
        manager.processMessages()
        time.sleep(1)

    print("finished!")
    
main()

The output of the code is:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
Test message.
finished!
Test message. finished!
Test message.
finished!

As explained in the code, the ProMPOpt_RedirectOutput option must be set to obtain the output from the workers.

This option automatically simplifies one of the challenges when using multi-processing.

Let’s now launch multiple workers with a snippet of Python code to evaluate.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_RedirectOutput)
for i in range(5):
m.startWorker()
m.evalPythonCode(ProWorker_All, "print('remote script')")
while m.isBusy():
m.processMessages()
time.sleep(0.5)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_RedirectOutput) for i in range(5): m.startWorker() m.evalPythonCode(ProWorker_All, "print('remote script')") while m.isBusy(): m.processMessages() time.sleep(0.5) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Evaluate a one-line print in five workers with plain output redirection."""
    manager = ProManager()
    manager.setOptions(ProMPOpt_RedirectOutput)

    for _ in range(5):
        manager.startWorker()

    # Every worker evaluates the same snippet in parallel.
    manager.evalPythonCode(ProWorker_All, "print('remote script')")

    # Pump manager messages until all workers are done.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.5)

    print("finished!")
    
main()

The output is a bit confusing:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
remote scriptremote script
remote scriptremote scriptremote script
finished!
remote scriptremote script remote scriptremote scriptremote script finished!
remote scriptremote script

remote scriptremote scriptremote script


finished!

The reason for this is that the print function of Python internally writes the string and the new-line separately. Since in our case the execution is parallel, the strings and new-lines get mixed up.

To remedy this problem we can set the ProMPOpt_AtomicOutput option. This option does nothing else than to discard writes of standalone new-lines and append a new-line to every incoming string if a new-line at the end is missing.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_RedirectOutput | ProMPOpt_AtomicOutput)
for i in range(5):
m.startWorker()
m.evalPythonCode(ProWorker_All, "print('remote script')")
while m.isBusy():
m.processMessages()
time.sleep(0.5)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_RedirectOutput | ProMPOpt_AtomicOutput) for i in range(5): m.startWorker() m.evalPythonCode(ProWorker_All, "print('remote script')") while m.isBusy(): m.processMessages() time.sleep(0.5) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Evaluate a one-line print in five workers with atomic output enabled."""
    manager = ProManager()
    # Atomic output keeps each printed string and its new-line together, so
    # lines coming from parallel workers do not get interleaved.
    manager.setOptions(ProMPOpt_RedirectOutput | ProMPOpt_AtomicOutput)

    for _ in range(5):
        manager.startWorker()

    manager.evalPythonCode(ProWorker_All, "print('remote script')")

    # Pump manager messages until all workers are done.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.5)

    print("finished!")
    
main()

Now the output is what would be expected:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
remote script
remote script
remote script
remote script
remote script
finished!
remote script remote script remote script remote script remote script finished!
remote script
remote script
remote script
remote script
remote script
finished!

ProMPOpt_AtomicOutput can be used in conjunction with ProMPOpt_RedirectOutput or by itself, since it makes ProMPOpt_RedirectOutput implicit.

We can also execute a Python script on disk:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
for i in range(5):
m.startWorker()
m.executePythonScript(ProWorker_All, r"path/to/remote.py")
while m.isBusy():
m.processMessages()
time.sleep(0.2)
print("finished!")
main()
from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) for i in range(5): m.startWorker() m.executePythonScript(ProWorker_All, r"path/to/remote.py") while m.isBusy(): m.processMessages() time.sleep(0.2) print("finished!") main()
from Pro.MP import *
import time

def main():
    """Run a Python script stored on disk in five workers."""
    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    for _ in range(5):
        manager.startWorker()

    # Placeholder path: point it at the script the workers should execute.
    manager.executePythonScript(ProWorker_All, r"path/to/remote.py")

    # Pump manager messages until all workers are done.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.2)

    print("finished!")
    
main()

Calling Python Functions

If we need to call a function, we can use evalPythonFunction and executePythonFunction. These two methods are the counterparts of evalPythonCode and executePythonScript.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTVariantList
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
m.startWorker()
code = """
def sum(a, b):
print(a + b)
"""
args = NTVariantList()
args.append(4)
args.append(5)
m.evalPythonFunction(ProWorker_All, code, "sum", args)
for i in range(10):
m.processMessages()
time.sleep(0.2)
print("finished!")
main()
from Pro.Core import NTVariantList from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) m.startWorker() code = """ def sum(a, b): print(a + b) """ args = NTVariantList() args.append(4) args.append(5) m.evalPythonFunction(ProWorker_All, code, "sum", args) for i in range(10): m.processMessages() time.sleep(0.2) print("finished!") main()
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    """Call a Python function in a worker and let it print its own result.

    The function source is passed as a string together with the name of the
    function to call and its arguments; the worker prints the sum, and the
    output is forwarded to this process.
    """
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.startWorker()

    # Source evaluated in the worker; "sum" below names the function to call.
    code = """
def sum(a, b):
    print(a + b)
"""

    args = NTVariantList()
    args.append(4)
    args.append(5)

    m.evalPythonFunction(ProWorker_All, code, "sum", args)

    # Wait for the worker to actually finish rather than polling for a fixed
    # two seconds, matching the other evalPython* examples.
    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)

    print("finished!")
    
main()

The result of the call is outputted, but what if we want to retrieve the result from the remote call in our code?

In that case we can set the last argument of evalPythonFunction to True, which will cause the result of the call to be sent to the manager.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTVariantList
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
worker_id = m.startWorker()
code = """
def sum(a, b):
return a + b
"""
args = NTVariantList()
args.append(4)
args.append(5)
m.evalPythonFunction(worker_id, code, "sum", args, True)
while m.isBusy():
m.processMessages()
time.sleep(0.1)
res = m.takeResult(worker_id)
print("result:", res)
print("finished!")
main()
from Pro.Core import NTVariantList from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) worker_id = m.startWorker() code = """ def sum(a, b): return a + b """ args = NTVariantList() args.append(4) args.append(5) m.evalPythonFunction(worker_id, code, "sum", args, True) while m.isBusy(): m.processMessages() time.sleep(0.1) res = m.takeResult(worker_id) print("result:", res) print("finished!") main()
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    """Call a Python function in a worker and fetch its return value.

    Passing True as the last argument of evalPythonFunction makes the worker
    send the result of the call back to the manager, where takeResult()
    retrieves it.
    """
    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    worker = manager.startWorker()

    # Source evaluated in the worker; "sum" below names the function to call.
    source = """
def sum(a, b):
    return a + b
"""

    params = NTVariantList()
    for value in (4, 5):
        params.append(value)

    manager.evalPythonFunction(worker, source, "sum", params, True)

    # Pump manager messages until the worker has finished the call.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.1)

    outcome = manager.takeResult(worker)
    print("result:", outcome)

    print("finished!")
    
main()

Similarly, we can launch multiple workers and collect the results from all of them:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTVariantList
from Pro.MP import *
import time
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
for i in range(10):
m.startWorker()
code = """
import random
def genRandom():
return random.randint(0, 1000)
"""
m.evalPythonFunction(ProWorker_All, code, "genRandom", NTVariantList(), True)
while m.isBusy():
m.processMessages()
time.sleep(0.1)
while m.hasResults():
res = m.takeResult(ProWorker_Any)
print("result:", res)
print("finished!")
main()
from Pro.Core import NTVariantList from Pro.MP import * import time def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) for i in range(10): m.startWorker() code = """ import random def genRandom(): return random.randint(0, 1000) """ m.evalPythonFunction(ProWorker_All, code, "genRandom", NTVariantList(), True) while m.isBusy(): m.processMessages() time.sleep(0.1) while m.hasResults(): res = m.takeResult(ProWorker_Any) print("result:", res) print("finished!") main()
from Pro.Core import NTVariantList
from Pro.MP import *
import time

def main():
    """Generate a random number in each of ten workers and print every result."""
    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    for _ in range(10):
        manager.startWorker()

    # Source evaluated in every worker; "genRandom" names the function to call.
    source = """
import random

def genRandom():
    return random.randint(0, 1000)
"""

    no_args = NTVariantList()
    # True: each worker sends its return value back to the manager.
    manager.evalPythonFunction(ProWorker_All, source, "genRandom", no_args, True)

    # Pump manager messages until all workers are done.
    while manager.isBusy():
        manager.processMessages()
        time.sleep(0.1)

    # Drain the collected results, whichever worker they came from.
    while manager.hasResults():
        print("result:", manager.takeResult(ProWorker_Any))

    print("finished!")
    
main()

The random output:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
result: 4
result: 619
result: 277
result: 141
result: 542
result: 670
result: 541
result: 506
result: 248
result: 803
finished!
result: 4 result: 619 result: 277 result: 141 result: 542 result: 670 result: 541 result: 506 result: 248 result: 803 finished!
result: 4
result: 619
result: 277
result: 141
result: 542
result: 670
result: 541
result: 506
result: 248
result: 803
finished!

Custom Messaging

Many times we would want to establish a custom communication between the manager and the worker. For this purpose, we can define our own messages and send them.

A ProMPMessage consists of an id and optional data. We can define our own message ids in the range of 0 – 0x7FFFFFFF (higher values are reserved for internal purposes).

The following snippet of code launches a worker with a snippet of Python code which waits for a request and sends a response. The manager sends a request and waits for a response. If the response is received, it prints out the content as a string.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
worker_id = m.startWorker()
code = """
from Pro.MP import *
w = proWorkerObject()
if w.waitForMessage(1000):
msg = w.getMessage()
if msg.id == 1:
resp = ProMPMessage(2)
resp.data = b'remote message'
w.sendMessage(resp)
"""
m.evalPythonCode(worker_id, code)
req = ProMPMessage(1)
m.sendMessage(worker_id, req)
if m.waitForMessage(worker_id, 1000):
msg = m.getMessage(worker_id)
if msg.id == 2:
print(msg.data.decode("utf-8"))
else:
print("unknown message:", msg.id)
else:
print("no message")
print("finished!")
main()
from Pro.MP import * def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) worker_id = m.startWorker() code = """ from Pro.MP import * w = proWorkerObject() if w.waitForMessage(1000): msg = w.getMessage() if msg.id == 1: resp = ProMPMessage(2) resp.data = b'remote message' w.sendMessage(resp) """ m.evalPythonCode(worker_id, code) req = ProMPMessage(1) m.sendMessage(worker_id, req) if m.waitForMessage(worker_id, 1000): msg = m.getMessage(worker_id) if msg.id == 2: print(msg.data.decode("utf-8")) else: print("unknown message:", msg.id) else: print("no message") print("finished!") main()
from Pro.MP import *

def main():
    """Exchange a custom request/response message pair with a worker.

    The worker code waits for a message with id 1 and answers with a message
    with id 2 carrying a byte string. The manager sends the request, waits up
    to 1000 for the reply and prints its payload as text.
    """
    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    worker = manager.startWorker()

    # Worker side of the exchange, evaluated remotely.
    worker_source = """
from Pro.MP import *

w = proWorkerObject()
if w.waitForMessage(1000):
    msg = w.getMessage()
    if msg.id == 1:
        resp = ProMPMessage(2)
        resp.data = b'remote message'
        w.sendMessage(resp)
"""
    manager.evalPythonCode(worker, worker_source)

    request = ProMPMessage(1)
    manager.sendMessage(worker, request)

    if not manager.waitForMessage(worker, 1000):
        print("no message")
    else:
        reply = manager.getMessage(worker)
        if reply.id != 2:
            print("unknown message:", reply.id)
        else:
            print(reply.data.decode("utf-8"))

    print("finished!")
    
main()

The output is:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
remote message
finished!
remote message finished!
remote message
finished!

Multi-level Processing

As already mentioned, a single process can create multiple managers. That’s true even for worker processes.

Let’s take into consideration the following snippet which must be launched from the command-line using the “-r” argument:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.MP import *
import time
if proWorkerProcessLevel() < 5:
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
m.startWorker()
m.executePythonScript(ProWorker_Any, __file__)
while m.isBusy():
m.processMessages()
time.sleep(0.2)
m = None
else:
# last worker
print("Hello, world!")
from Pro.MP import * import time if proWorkerProcessLevel() < 5: m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) m.startWorker() m.executePythonScript(ProWorker_Any, __file__) while m.isBusy(): m.processMessages() time.sleep(0.2) m = None else: # last worker print("Hello, world!")
from Pro.MP import *
import time

if proWorkerProcessLevel() >= 5:
    # last worker
    print("Hello, world!")
else:
    # Any level below 5: become a manager, start one worker and make that
    # worker run this very script, then pump messages until it is done.
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    m.startWorker()
    m.executePythonScript(ProWorker_Any, __file__)

    while m.isBusy():
        m.processMessages()
        time.sleep(0.2)

    # This code is at module level, so drop the reference to the manager
    # explicitly; otherwise the root process may not terminate.
    m = None

proWorkerProcessLevel returns the level of the worker process. The first process we launch has a level of 0, which means it’s the manager or, in this case, the root manager.

As long as proWorkerProcessLevel is less than 5, the code creates a manager, starts a worker and tells the worker to run itself. The last worker (level 5) prints out a message.

The output of the root process is:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
Hello, world!
Hello, world!
Hello, world!

The reason is that the output of the last worker is forwarded from worker to worker until it reaches the root manager.

Also important to notice is the following line in the script:

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
m = None
m = None
    m = None

Since the code is not in a function, we don’t want to leave a reference to the manager: otherwise the root process may not terminate, and neither will its workers.

Wait Objects

Managers and workers support wait objects. A wait object can be a wait dialog box or any other type of wait object.

Let’s take this basic code snippet which runs in a single process. The function doSomething performs a task until it finishes or until the user aborts the operation from the wait dialog.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.UI import *
def doSomething(wo):
import time
i = 1
while not wo.wasAborted() and i < 101:
time.sleep(0.05)
wo.msg("Completed: " + str(i) + "%")
wo.progress(i)
wo.processEvents()
i += 1
def main():
wait = proContext().startWait("Doing something...")
doSomething(wait)
wait.stop()
main()
from Pro.UI import * def doSomething(wo): import time i = 1 while not wo.wasAborted() and i < 101: time.sleep(0.05) wo.msg("Completed: " + str(i) + "%") wo.progress(i) wo.processEvents() i += 1 def main(): wait = proContext().startWait("Doing something...") doSomething(wait) wait.stop() main()
from Pro.UI import *

def doSomething(wo):
    """Simulate a task that reports progress from 1 to 100 on *wo*.

    Before every step the wait object is asked whether the operation was
    aborted; if so the loop ends early. Each step sleeps briefly, updates the
    message and the progress value, and lets the wait object process events.
    """
    from time import sleep

    percent = 1
    while True:
        # The abort check comes first, exactly once per pass (including the
        # final pass that ends the loop after 100 has been reported).
        if wo.wasAborted() or percent > 100:
            break
        sleep(0.05)
        wo.msg("Completed: " + str(percent) + "%")
        wo.progress(percent)
        wo.processEvents()
        percent += 1

def main():
    """Run doSomething in this process behind a wait dialog.

    The wait object is stopped in a ``finally`` clause so that the dialog is
    dismissed even when doSomething raises.
    """
    wait = proContext().startWait("Doing something...")
    try:
        doSomething(wait)
    finally:
        wait.stop()
    
main()

Let’s now write the same sample using multi-processing. This time doSomething is executed in a different process.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTVariantList
from Pro.MP import *
from Pro.UI import *
import time
remote_code = """
def doSomething(wo):
import time
i = 1
while not wo.wasAborted() and i < 101:
time.sleep(0.05)
wo.msg('Completed: ' + str(i) + '%')
wo.progress(i)
i += 1
def stub():
from Pro.MP import proWorkerObject
doSomething(proWorkerObject().waitObject())
"""
def main():
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
worker_id = m.startWorker()
ui_wait = proContext().startWait("Doing something...")
wait = m.createWaitObject(worker_id, ui_wait)
m.evalPythonFunction(worker_id, remote_code, "stub", NTVariantList())
while m.isBusy():
m.processMessages()
time.sleep(0.02)
wait.processEvents()
wait.stop()
main()
from Pro.Core import NTVariantList from Pro.MP import * from Pro.UI import * import time remote_code = """ def doSomething(wo): import time i = 1 while not wo.wasAborted() and i < 101: time.sleep(0.05) wo.msg('Completed: ' + str(i) + '%') wo.progress(i) i += 1 def stub(): from Pro.MP import proWorkerObject doSomething(proWorkerObject().waitObject()) """ def main(): m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) worker_id = m.startWorker() ui_wait = proContext().startWait("Doing something...") wait = m.createWaitObject(worker_id, ui_wait) m.evalPythonFunction(worker_id, remote_code, "stub", NTVariantList()) while m.isBusy(): m.processMessages() time.sleep(0.02) wait.processEvents() wait.stop() main()
from Pro.Core import NTVariantList
from Pro.MP import *
from Pro.UI import *
import time

# Source evaluated in the worker process: main() passes it to
# evalPythonFunction together with the function name "stub", and stub() runs
# doSomething against the worker's wait object.
remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.msg('Completed: ' + str(i) + '%')
        wo.progress(i)
        i += 1
        
def stub():
    from Pro.MP import proWorkerObject
    doSomething(proWorkerObject().waitObject())
"""

def main():
    """Run the remote ``stub`` function in a worker behind a wait dialog.

    A UI wait dialog is started and bridged to the worker through
    createWaitObject. The manager then evaluates ``stub`` from remote_code in
    the worker and pumps messages until the worker is no longer busy.

    The bridged wait object is stopped in a ``finally`` clause so that it is
    always stopped, even if evaluation or message processing raises.
    """
    m = ProManager()
    m.setOptions(ProMPOpt_AtomicOutput)

    worker_id = m.startWorker()

    # Both wait objects must stay referenced for as long as they are needed:
    # createWaitObject does not add a reference to ui_wait.
    ui_wait = proContext().startWait("Doing something...")
    wait = m.createWaitObject(worker_id, ui_wait)

    try:
        m.evalPythonFunction(worker_id, remote_code, "stub", NTVariantList())

        while m.isBusy():
            m.processMessages()
            time.sleep(0.02)
            wait.processEvents()
    finally:
        wait.stop()
    
main()

The code of doSomething remained the same. We only removed the call to processEvents as we didn’t need to process UI events any longer, but we could have left it there as it wouldn’t have had any effect.

The important thing to remember is that both wait objects must remain referenced as long as we need them, since createWaitObject doesn’t add a reference to ui_wait.

User Interface

We can now further expand our use of managers and workers to the context of a user interface. Let’s say we want to keep an interface responsive while also performing some CPU-intensive operation.

An obvious solution is to launch a worker to do the heavy lifting for us and just wait for a response. The way we process messages in a UI context is to start idle processing using startIdleNotifications on a custom view. This will enable the custom view to receive pvnIdle notifications, which in turn can be used to call processMessages at fixed intervals.

The following code sample creates a custom view with a text control and inserts the text of every incoming message from the worker into the text control.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.UI import *
from Pro.MP import *
def MPViewCallback(cv, m, code, view, data):
if code == pvnInit:
cv.startIdleNotifications()
return 1
elif code == pvnIdle:
m.processMessages()
if m.hasMessage(ProWorker_Any):
text_view = cv.getView(1)
while True:
msg = m.getMessage(ProWorker_Any)
if msg.id == 1:
text_view.setSelectedText(msg.data.decode("utf-8"))
if not m.hasMessage(ProWorker_Any):
break
return 0
def main():
ctx = proContext()
v = ctx.createView(ProView.Type_Custom, "MP View")
m = ProManager()
m.setOptions(ProMPOpt_AtomicOutput)
worker_id = m.startWorker()
code = """
from Pro.MP import *
import time
w = proWorkerObject()
wo = w.waitObject()
msg = ProMPMessage(1)
i = 0
while not wo.wasAborted():
msg.data = b'remote message ' + str(i).encode('utf-8') + b'\\n'
w.sendMessage(msg)
time.sleep(1)
i += 1
"""
m.evalPythonCode(worker_id, code)
v.setup("<ui><hs><text id='1'/></hs></ui>", MPViewCallback, m)
ctx.addView(v)
main()
from Pro.UI import * from Pro.MP import * def MPViewCallback(cv, m, code, view, data): if code == pvnInit: cv.startIdleNotifications() return 1 elif code == pvnIdle: m.processMessages() if m.hasMessage(ProWorker_Any): text_view = cv.getView(1) while True: msg = m.getMessage(ProWorker_Any) if msg.id == 1: text_view.setSelectedText(msg.data.decode("utf-8")) if not m.hasMessage(ProWorker_Any): break return 0 def main(): ctx = proContext() v = ctx.createView(ProView.Type_Custom, "MP View") m = ProManager() m.setOptions(ProMPOpt_AtomicOutput) worker_id = m.startWorker() code = """ from Pro.MP import * import time w = proWorkerObject() wo = w.waitObject() msg = ProMPMessage(1) i = 0 while not wo.wasAborted(): msg.data = b'remote message ' + str(i).encode('utf-8') + b'\\n' w.sendMessage(msg) time.sleep(1) i += 1 """ m.evalPythonCode(worker_id, code) v.setup("<ui><hs><text id='1'/></hs></ui>", MPViewCallback, m) ctx.addView(v) main()
from Pro.UI import *
from Pro.MP import *

def MPViewCallback(cv, m, code, view, data):
    """Custom-view callback that shows worker messages in a text control.

    *m* is the manager handed to the view's setup() call. On pvnInit idle
    notifications are started and 1 is returned. On pvnIdle the manager
    processes its messages and the payload of every queued message with id 1
    is decoded as UTF-8 and inserted into the text control with id 1.
    Every path other than pvnInit returns 0.
    """
    if code == pvnInit:
        cv.startIdleNotifications()
        return 1

    if code != pvnIdle:
        return 0

    m.processMessages()
    if not m.hasMessage(ProWorker_Any):
        return 0

    # Look the text control up once, then drain the whole message queue.
    text_view = cv.getView(1)
    pending = True
    while pending:
        msg = m.getMessage(ProWorker_Any)
        if msg.id == 1:
            text_view.setSelectedText(msg.data.decode("utf-8"))
        pending = m.hasMessage(ProWorker_Any)
    return 0

def main():
    """Open the "MP View" custom view fed by a message-sending worker.

    A worker is started and given a script that sends one message with id 1
    per loop pass until its wait object reports an abort. The manager is
    passed to the view's setup() call so MPViewCallback receives it.
    """
    context = proContext()
    mp_view = context.createView(ProView.Type_Custom, "MP View")

    manager = ProManager()
    manager.setOptions(ProMPOpt_AtomicOutput)

    worker = manager.startWorker()

    # Worker-side script: send 'remote message <n>' lines, sleeping one
    # second between them.
    worker_script = """
from Pro.MP import *
import time

w = proWorkerObject()
wo = w.waitObject()
msg = ProMPMessage(1)
i = 0
while not wo.wasAborted():
    msg.data = b'remote message ' + str(i).encode('utf-8') + b'\\n'
    w.sendMessage(msg)
    time.sleep(1)
    i += 1
"""
    manager.evalPythonCode(worker, worker_script)

    mp_view.setup("<ui><hs><text id='1'/></hs></ui>", MPViewCallback, manager)
    context.addView(mp_view)

main()

For the final code example we not only work with the UI, but also with wait objects.

We launch 10 workers. Each worker has a custom wait object which updates a progress bar in our view. The user can abort each worker by clicking on the ‘Stop’ button next to its progress bar.

Plain text
Copy to clipboard
Open code in new window
EnlighterJS 3 Syntax Highlighter
from Pro.Core import NTSimpleWait
from Pro.UI import *
from Pro.MP import *
remote_code = """
def doSomething(wo):
import time
i = 1
while not wo.wasAborted() and i < 101:
time.sleep(0.05)
wo.progress(i)
i += 1
from Pro.MP import proWorkerObject
doSomething(proWorkerObject().waitObject())
"""
class ProgressWait(NTSimpleWait):
def __init__(self, ctrl):
super(ProgressWait, self).__init__()
self.ctrl = ctrl
def progress(self, i):
self.ctrl.setValue(i)
class MPView(object):
def __init__(self):
pass
@staticmethod
def callback(cv, self, code, view, data):
if code == pvnInit:
self.worker_ids = []
# note: we must keep references to all wait objects
self.ui_wait_objects = []
self.mp_wait_object = []
# create workers
for i in range(self.worker_count):
worker_id = self.manager.startWorker()
self.worker_ids.append(worker_id)
ui_wo = ProgressWait(self.view.getView(i))
self.ui_wait_objects.append(ui_wo)
mp_wo = self.manager.createWaitObject(worker_id, ui_wo)
self.mp_wait_object.append(mp_wo)
self.manager.evalPythonCode(worker_id, remote_code)
cv.startIdleNotifications()
return 1
elif code == pvnIdle:
# process messages
self.manager.processMessages()
elif code == pvnButtonClicked:
view.setEnabled(False)
worker_id = self.worker_ids[view.id() - 1000]
self.manager.abortOperation(worker_id, 1000)
return 0
@staticmethod
def create():
ctx = proContext()
self = MPView()
self.worker_count = 10
# create manager
self.manager = ProManager()
self.manager.setOptions(ProMPOpt_AtomicOutput)
# create view
self.view = ctx.createView(ProView.Type_Custom, "MP View")
ui = "<ui><gl margin='20' spacing='20' align='top'>"
for i in range(self.worker_count):
ui += "<progbar id='%d'/><btn id='%d' text='Stop'/><nl/>" % (i, i + 1000)
ui += "</gl></ui>"
self.view.setup(ui, MPView.callback, self)
ctx.addView(self.view)
MPView.create()
from Pro.Core import NTSimpleWait from Pro.UI import * from Pro.MP import * remote_code = """ def doSomething(wo): import time i = 1 while not wo.wasAborted() and i < 101: time.sleep(0.05) wo.progress(i) i += 1 from Pro.MP import proWorkerObject doSomething(proWorkerObject().waitObject()) """ class ProgressWait(NTSimpleWait): def __init__(self, ctrl): super(ProgressWait, self).__init__() self.ctrl = ctrl def progress(self, i): self.ctrl.setValue(i) class MPView(object): def __init__(self): pass @staticmethod def callback(cv, self, code, view, data): if code == pvnInit: self.worker_ids = [] # note: we must keep references to all wait objects self.ui_wait_objects = [] self.mp_wait_object = [] # create workers for i in range(self.worker_count): worker_id = self.manager.startWorker() self.worker_ids.append(worker_id) ui_wo = ProgressWait(self.view.getView(i)) self.ui_wait_objects.append(ui_wo) mp_wo = self.manager.createWaitObject(worker_id, ui_wo) self.mp_wait_object.append(mp_wo) self.manager.evalPythonCode(worker_id, remote_code) cv.startIdleNotifications() return 1 elif code == pvnIdle: # process messages self.manager.processMessages() elif code == pvnButtonClicked: view.setEnabled(False) worker_id = self.worker_ids[view.id() - 1000] self.manager.abortOperation(worker_id, 1000) return 0 @staticmethod def create(): ctx = proContext() self = MPView() self.worker_count = 10 # create manager self.manager = ProManager() self.manager.setOptions(ProMPOpt_AtomicOutput) # create view self.view = ctx.createView(ProView.Type_Custom, "MP View") ui = "<ui><gl margin='20' spacing='20' align='top'>" for i in range(self.worker_count): ui += "<progbar id='%d'/><btn id='%d' text='Stop'/><nl/>" % (i, i + 1000) ui += "</gl></ui>" self.view.setup(ui, MPView.callback, self) ctx.addView(self.view) MPView.create()
from Pro.Core import NTSimpleWait
from Pro.UI import *
from Pro.MP import *

# Source evaluated in every worker through evalPythonCode (see
# MPView.callback): it defines doSomething and immediately runs it against
# the worker's wait object, reporting progress values from 1 to 100.
remote_code = """
def doSomething(wo):
    import time
    i = 1
    while not wo.wasAborted() and i < 101:
        time.sleep(0.05)
        wo.progress(i)
        i += 1
        
from Pro.MP import proWorkerObject
doSomething(proWorkerObject().waitObject())
"""

class ProgressWait(NTSimpleWait):
    """Wait object that mirrors progress updates onto a UI control."""

    def __init__(self, ctrl):
        """Remember *ctrl*, the control updated by progress()."""
        super().__init__()
        self.ctrl = ctrl

    def progress(self, i):
        """Set the control's value to the progress value *i*."""
        self.ctrl.setValue(i)
        
class MPView(object):
    """Custom view with one progress bar and one 'Stop' button per worker.

    Instances are built by create(), which sets worker_count, manager and
    view. The remaining attributes (worker_ids, ui_wait_objects,
    mp_wait_object) are filled in when the view receives pvnInit.
    """

    def __init__(self):
        pass

    def _spawn_workers(self):
        """Start worker_count workers, each bound to its own progress bar."""
        self.worker_ids = []
        # note: we must keep references to all wait objects
        self.ui_wait_objects = []
        self.mp_wait_object = []

        manager = self.manager
        for bar_id in range(self.worker_count):
            worker = manager.startWorker()
            self.worker_ids.append(worker)

            # Progress bars use ids 0..worker_count-1 in the UI description.
            ui_wait = ProgressWait(self.view.getView(bar_id))
            self.ui_wait_objects.append(ui_wait)

            mp_wait = manager.createWaitObject(worker, ui_wait)
            self.mp_wait_object.append(mp_wait)

            manager.evalPythonCode(worker, remote_code)

    @staticmethod
    def callback(cv, self, code, view, data):
        """View callback; *self* is the MPView passed to setup().

        Returns 1 after handling pvnInit and 0 otherwise.
        """
        if code == pvnInit:
            self._spawn_workers()
            cv.startIdleNotifications()
            return 1

        if code == pvnIdle:
            # process messages
            self.manager.processMessages()
        elif code == pvnButtonClicked:
            view.setEnabled(False)
            # Button ids are the worker index plus 1000.
            # The second argument of abortOperation is presumably a timeout
            # - confirm against the SDK documentation.
            self.manager.abortOperation(self.worker_ids[view.id() - 1000], 1000)
        return 0

    @staticmethod
    def create():
        """Build the manager and the view, then add the view to the context."""
        ctx = proContext()
        mp_view = MPView()
        mp_view.worker_count = 10

        # create manager
        mp_view.manager = ProManager()
        mp_view.manager.setOptions(ProMPOpt_AtomicOutput)

        # create view
        mp_view.view = ctx.createView(ProView.Type_Custom, "MP View")
        rows = "".join(
            "<progbar id='%d'/><btn id='%d' text='Stop'/><nl/>" % (i, i + 1000)
            for i in range(mp_view.worker_count)
        )
        ui = "<ui><gl margin='20' spacing='20' align='top'>" + rows + "</gl></ui>"
        mp_view.view.setup(ui, MPView.callback, mp_view)
        ctx.addView(mp_view.view)

MPView.create()

An image in this case is worth a thousand words.

We’ll soon publish the official documentation for our multi-processing module.