Using MTurk with Zapier

Published by Dave on

When I first joined the Amazon MTurk team, a colleague at Amazon commented that it was frustrating that MTurk couldn’t be used with services like Zapier to schedule tasks for MTurk Workers to tackle. She had a side business in real estate investment and thought it would be useful to trigger review by MTurk Workers for certain attributes when new listings were posted. She also pointed out it could be used for a variety of other tasks where MTurk Workers could be useful as a step in Zapier workflows (Zaps).

I took that as a challenge, and from time to time I tried to figure out a way to make it work. I unfortunately kept hitting roadblocks, or got busy with other things, and never quite figured it out. Since leaving Amazon I finally got it working for some of my own workflows. While the solution requires a bit more technical skill and AWS work than the average Zapier task, it does open up some powerful options for automation. In the example below we’ll keep things simple by just asking Workers to categorize forum posts but you’ll still need some coding and AWS skills if you want to set this up yourself.

It can’t be done as a single Zap…

On the surface this should be easy: I want to set up a Zap that is triggered by new forum posts, has MTurk Workers categorize them, and then sends an email with the forum post and the category that was assigned. In short, something like this:

The challenge is that using MTurk isn’t just one step: (1) we post the task to the MTurk marketplace, (2) Workers complete it, and (3) we retrieve the responses that Workers provided. This doesn’t fit with how most Actions in Zapier are designed to work. They’re typically designed around making an API call and getting the results back immediately. While MTurk is generally pretty fast for things that involve humans, it’s likely going to take up to 5 minutes to get a response from Workers, if not longer (although I’ve seen many tasks like this completed in less than a minute). As a result we’ll need to revise our approach.

One option would be to add a delay to our Zap to wait for results to be available. We’d have one step to create a task on MTurk, wait 10 minutes, and then have another step to retrieve the results. There are a couple of problems with this approach. First, if it happens to take Workers more than 10 minutes you’re not going to find any results. Second, the only Zapier tool that can call MTurk is the AWS Lambda Action. This Action is great but it currently doesn’t let you use the return value from your Lambda function in subsequent steps. This means that you wouldn’t get the HITId back from your create task step, nor would you get the result that Workers provided in the subsequent step.

It’s a bit frustrating that you can’t do this as one continuous Zap but it’s still possible to make this work. To get around the limitations we’ll leverage some of the notification features that have been built into MTurk.

Two Zap workflow

To make this work we’re going to create two separate Zaps. The first is triggered by an RSS feed and calls an AWS Lambda Action to create a task on the MTurk marketplace. The second is going to be triggered by an Amazon Simple Queue Service (SQS) queue which is where the results will be posted once my task is done.

Of course, to make this work we need to build some back end steps to get the results of our MTurk task onto that SQS queue. To do this we’ll take advantage of the notification feature built into MTurk. You can specify a notification to occur at various stages of a HIT lifecycle. In this case we’ll post a message when the task has been completed by Workers (HITReviewable). We’ll have this message go to an intermediate SNS topic that will be set up to trigger another AWS Lambda function to process the results and put the final answer on our final SQS queue.

Now the question you may be asking is: “Why don’t you just have Zapier listen for the first SNS topic to be ready and trigger off of that?” You can certainly do that but it wouldn’t be very useful because the message that MTurk sends only includes a message indicating that the task is complete, it doesn’t include any of the Worker responses. The extra AWS Lambda step will retrieve the Worker responses and (if we ask multiple Workers) consolidate them into a single response.
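
To make that concrete, here is a minimal sketch of pulling the HIT IDs out of an SNS-delivered MTurk notification. The field names (Records, Sns, Message, Events, EventType, HITId) are the ones the handle Lambda in this post reads; the helper name extract_reviewable_hit_ids and the sample values are made up for illustration. Notice that nothing in the message carries the Workers’ answers.

```python
import json


def extract_reviewable_hit_ids(sns_event):
    """Return the HITId of every HITReviewable event in an SNS-triggered Lambda event."""
    hit_ids = []
    for record in sns_event.get('Records', []):
        # SNS delivers the MTurk notification as a JSON string
        notification = json.loads(record['Sns']['Message'])
        for mturk_event in notification.get('Events', []):
            if mturk_event.get('EventType') == 'HITReviewable':
                hit_ids.append(mturk_event['HITId'])
    return hit_ids


# Illustrative payload only; the IDs are placeholders, not real values
sample_event = {
    'Records': [{
        'Sns': {
            'Message': json.dumps({
                'Events': [{
                    'EventType': 'HITReviewable',
                    'HITId': 'EXAMPLE_HIT_ID',
                    'HITTypeId': 'EXAMPLE_HIT_TYPE_ID',
                }]
            })
        }
    }]
}

print(extract_reviewable_hit_ids(sample_event))
```

All we learn from the notification is which HIT is ready, which is why a second Lambda has to go back to MTurk for the answers.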

Building the AWS resources

Let’s get started by setting up the Lambdas, SNS topic, and SQS queue that are going to be used here. We’ll create two Lambdas called demo-zapier-mturk-create and demo-zapier-mturk-handle. We’ll also create an intermediate SNS topic named demo-zapier-mturk-notification and an SQS queue named demo-zapier-mturk-complete where we will put the results so they can be grabbed by Zapier.

Before we go too much further, note that to do this task you’ll need to set up and fund an MTurk account for use with this project. You’ll also need to link it to your AWS account.

Create the Topic and Queue

As a first step we’ll create an SNS topic to let us know when the HIT is complete and an SQS queue to let Zapier know when our data is ready. To set up the demo-zapier-mturk-notification topic just select Topics from the SNS console and select Create topic.

On the Create topic page you’ll want to provide the name of the topic and set up permissions so that MTurk can publish events to it. More details on doing this can be found in the MTurk API docs or you can simply select Advanced under the Access Policy section and add the following statement (replacing the region and account ID portions below).

    {
      "Effect": "Allow",
      "Principal": {
        "Service": "mturk-requester.amazonaws.com"
      },
      "Action": "SNS:Publish",
      "Resource": "arn:aws:sns:region:YOUR_ACCOUNT_ID:demo-zapier-mturk-notification",
      "Condition": {
        "StringEquals": {
          "aws:SourceAccount": "YOUR_ACCOUNT_ID"
        },
        "Bool": {
          "aws:SecureTransport":"true"
        }
      }
    }
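
If you’d rather not hand-edit the placeholders, a small helper along these lines could fill them in for you. This is only a sketch: the function name build_topic_policy_statement is made up, and the account ID and region in the usage line are placeholders you’d swap for your own.

```python
import json


def build_topic_policy_statement(account_id, region,
                                 topic_name='demo-zapier-mturk-notification'):
    """Return the access policy statement shown above as a JSON string."""
    statement = {
        'Effect': 'Allow',
        'Principal': {'Service': 'mturk-requester.amazonaws.com'},
        'Action': 'SNS:Publish',
        'Resource': 'arn:aws:sns:{}:{}:{}'.format(region, account_id, topic_name),
        'Condition': {
            # Only accept notifications sent on behalf of our own account
            'StringEquals': {'aws:SourceAccount': account_id},
            'Bool': {'aws:SecureTransport': 'true'},
        },
    }
    return json.dumps(statement, indent=2)


# Placeholder account ID and region; replace with your own
print(build_topic_policy_statement('123456789012', 'us-west-2'))
```

The printed statement can be pasted into the Access Policy editor.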

To set up the demo-zapier-mturk-complete SQS queue you can simply select the Quick Create option on the SQS console.

Build the Create Lambda

Now we can create the Lambdas. Start by creating a new Lambda named demo-zapier-mturk-create with the Python 3.x runtime. You can let AWS create a default execution role for you using the basic permissions to start. After you create the function, we can give your Lambda permissions to access MTurk by scrolling down to the Execution Role section and viewing the IAM role that was created for our Lambda.

Click on the role and attach the AmazonMechanicalTurkFullAccess policy to allow our Lambda to create HITs.

Now that we’ve created the function we can add the code below.

import json
import boto3

def lambda_handler(event, context):
    print(event)
    
    ###################################
    # Step 1: Create a client
    ###################################
    endpoint = "https://mturk-requester.us-east-1.amazonaws.com"
    mturk = boto3.client(
        service_name='mturk',
        region_name='us-east-1',
        endpoint_url=endpoint)

    ###################################
    # Step 2: Define the task
    ###################################
    html = '''
        <script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
        <crowd-form answer-format="flatten-objects">
        
            <crowd-classifier
              categories="['Question', 'Request', 'Feedback', 'Other']"
              name="category"
              header="What is best way to describe the topic of this thread?"
            >
              <classification-target>
                <a href="{}" target="_blank">{}</a>
        
              </classification-target>
        
              <short-instructions>
                Choose the appropriate category that best suits the thread.
              </short-instructions>
        
              <full-instructions header="Classification Instructions">
                <p>Read the thread carefully.</p>
                <p>Choose the appropriate category that best suits the thread.</p>
              </full-instructions>
        
            </crowd-classifier>
        </crowd-form>
    '''.format(event['thread_link'], event['thread_title'])
    
    question_xml = '''
        <HTMLQuestion xmlns="http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2011-11-11/HTMLQuestion.xsd">
        <HTMLContent><![CDATA[{}]]></HTMLContent>
        <FrameHeight>0</FrameHeight>
        </HTMLQuestion>'''.format(html)
        
    task_attributes = {
        'MaxAssignments': 3,
        'LifetimeInSeconds': 60 * 60 * 5, # Stay active for 5 hours
        'AssignmentDurationInSeconds': 60 * 10, # Workers have 10 minutes to respond
        'Reward': '0.03',
        'Title': 'Categorize a forum post',
        'Keywords': 'categorize',
        'Description': 'Choose the appropriate category that best suits the thread.',
        'RequesterAnnotation': json.dumps(event)
    }

    ###################################
    # Step 3: Create the HIT
    ###################################
    response = mturk.create_hit(
        **task_attributes,
        Question=question_xml
    )
    hit_type_id = response['HIT']['HITTypeId']
    print('Created HIT {} in HITType {}'.format(response['HIT']['HITId'], hit_type_id))
    
    ###################################
    # Step 4: Attach a notification
    ###################################
    mturk.update_notification_settings(
        HITTypeId=hit_type_id,
        Notification={
            'Destination': 'arn:aws:sns:us-west-2:000000000000:demo-zapier-mturk-notification',
            'Transport': 'SNS',
            'Version': '2014-08-15',
            'EventTypes': ['HITReviewable']
        },
        Active=True
    )
   
    return {
        'statusCode': 200
    }

There are four steps to this function:

  1. Create an MTurk client using the appropriate endpoint.
  2. Define the task we’re going to display to Workers. We start by defining the HTML that will be displayed and wrap it in the QuestionXML format used by MTurk. Then we define the attributes for our HIT including the title and description Workers will see, as well as the amount we will pay per response. Note that we’re going to pass the event that triggered this task in the RequesterAnnotation field. We’ll use this later.
  3. Create our HIT using the client and the attributes we’ve defined.
  4. Update notification settings for this HITType to let us know when all of the Assignments have been completed. Here we’ve included the ARN of our notification SNS topic. For yours you’ll need to update the region and account ID.
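
One thing Step 2 glosses over is that thread_link and thread_title are dropped into the HTML as-is, so a title containing characters like < or & could break the task page. A minimal sketch of escaping them first, using Python’s standard html module, might look like this. The helper name render_target is made up and the template is trimmed to just the link for illustration.

```python
import html

# Trimmed-down version of the link inside <classification-target>
TARGET_TEMPLATE = '<a href="{}" target="_blank">{}</a>'


def render_target(thread_link, thread_title):
    """Escape both values before placing them into the task HTML."""
    safe_link = html.escape(thread_link, quote=True)
    safe_title = html.escape(thread_title, quote=True)
    return TARGET_TEMPLATE.format(safe_link, safe_title)


print(render_target('https://example.com/?a=1&b=2', 'Is <b> ok?'))
```

Because > is escaped, a title can no longer contain the ]]> sequence that would otherwise close the CDATA section in the QuestionXML early.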

Note that I haven’t included any checks to ensure that the RequesterAnnotation values we pass don’t exceed the 255 character limit for this field. I’ve excluded this for brevity, but I’d recommend adding a check if you use this in production.
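
A check like that could be sketched roughly as follows. The helper name build_annotation is made up, and the default limit of 255 is set conservatively as an assumption; confirm it against the current MTurk API reference. The sketch trims the title until the JSON fits, and gives up if the link alone is too long.

```python
import json

# Assumed limit for RequesterAnnotation; verify against the MTurk API reference
MAX_ANNOTATION_LENGTH = 255


def build_annotation(event, limit=MAX_ANNOTATION_LENGTH):
    """Serialize the event compactly, trimming thread_title until it fits."""
    data = dict(event)
    annotation = json.dumps(data, separators=(',', ':'))
    while len(annotation) > limit and data.get('thread_title'):
        # Drop one character from the title and try again
        data['thread_title'] = data['thread_title'][:-1]
        annotation = json.dumps(data, separators=(',', ':'))
    if len(annotation) > limit:
        raise ValueError('Event is too large to store in RequesterAnnotation')
    return annotation


long_event = {'thread_link': 'https://example.com/t/1', 'thread_title': 'x' * 500}
print(len(build_annotation(long_event)))
```

In the create Lambda you would then pass build_annotation(event) instead of json.dumps(event).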

Build the Handle Lambda

Next we can create a demo-zapier-mturk-handle Lambda using the Python 3.x runtime. Similar to the create function, we’ll edit the IAM role to include the AmazonMechanicalTurkFullAccess policy as well as the AmazonSQSFullAccess policy.

To trigger our Lambda from the notification, we’ll add a trigger in our Lambda designer that uses the SNS topic we set up earlier.

To ensure that our function has enough time to run, we’ll increase the timeout to 15 seconds.

Once we’ve completed this setup, we can add the code below.

import json
import boto3
import xml.etree.ElementTree as ET


def lambda_handler(event, context):
    print(json.dumps(event))

    for record in event['Records']:
        notification = json.loads(record['Sns']['Message'])

        for mturk_event in notification['Events']:

            if mturk_event['EventType'] == 'HITReviewable':
                
                ###################################
                # Step 1: Create a client
                ###################################
                endpoint = "https://mturk-requester.us-east-1.amazonaws.com"
                # For testing in the MTurk Sandbox
                #endpoint = "https://mturk-requester-sandbox.us-east-1.amazonaws.com"
                mturk = boto3.client(
                    service_name='mturk',
                    region_name='us-east-1',
                    endpoint_url=endpoint)

                
                ###################################
                # Step 2: Get the HIT and data that
                #         had been submitted.
                ###################################
                hit_response = mturk.get_hit(HITId=mturk_event['HITId'])
                hit = hit_response['HIT']
                source_data = json.loads(hit['RequesterAnnotation'])

                ###################################
                # Step 3: Retrieve the answers that 
                #         were provided by Workers
                ###################################
                response = mturk.list_assignments_for_hit(
                    HITId=mturk_event['HITId'],
                    AssignmentStatuses=['Submitted', 'Approved'])
                assignments = response['Assignments']
                answers = []
                answers_namespace = {'mt': 'http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd'}
                for assignment in assignments:
                    root = ET.fromstring(assignment['Answer'])
                    answers.append(root.find('mt:Answer', answers_namespace).find('mt:FreeText', answers_namespace).text)
                print(answers)

                ###################################
                # Step 4: Consolidate the answers
                ###################################
                scored_answers = {}
                for answer in answers:
                    scored_answers[answer] = scored_answers.get(answer,0) + 1
                result = json.dumps(scored_answers)
                for answer, score in scored_answers.items():
                    if score/len(answers) > .6:
                        result = answer
                print(result)

                ###################################
                # Step 5: Send the result to SQS
                ###################################
                source_data['category'] = result
                message = json.dumps(source_data)
                print(message)
                sqs = boto3.client('sqs')
                sqs.send_message(QueueUrl='https://sqs.us-west-2.amazonaws.com/000000000000/demo-zapier-mturk-complete', MessageBody=message)
                
            else:
                print('Unhandled event type {}'.format(mturk_event['EventType']))
    return {
        'statusCode': 200
    }

This function has five steps to retrieve the results provided by Workers and post them to SQS so Zapier can retrieve them:

  1. Create an MTurk client using the appropriate endpoint.
  2. Retrieve the HIT itself to extract the RequesterAnnotation we stored earlier.
  3. Get the Assignments that were returned by Workers and extract the answers they provided. Here we use the ElementTree library to simplify extracting the answer from the XML format that MTurk uses.
  4. Consolidate the answers provided by Workers. This uses a rudimentary consolidation approach that counts the number of times each response was provided and returns that answer if more than 60% of Workers agree. If this threshold isn’t met the scores themselves are returned.
  5. Send the results along to the complete SQS queue. You’ll need to update the URL to use your queue. Note that we’re including our data from the RequesterAnnotation in the message we’re sending to Zapier. This will allow us to display the original thread alongside the category.
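
If you want to try Steps 3 and 4 without touching AWS, they can be pulled out into two small functions and run locally. This is a sketch of the same logic rather than a drop-in replacement: the names extract_answer and consolidate are made up, and the sample XML (including the category.label identifier) is illustrative rather than copied from a real assignment.

```python
import json
import xml.etree.ElementTree as ET
from collections import Counter

ANSWERS_NS = {'mt': 'http://mechanicalturk.amazonaws.com/AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd'}


def extract_answer(answer_xml):
    """Return the FreeText of the first Answer element, or None if absent."""
    root = ET.fromstring(answer_xml)
    node = root.find('mt:Answer/mt:FreeText', ANSWERS_NS)
    return node.text if node is not None else None


def consolidate(answers, threshold=0.6):
    """Return the majority answer, or the raw counts as JSON if no majority."""
    counts = Counter(a for a in answers if a is not None)
    total = sum(counts.values())
    if total == 0:
        return json.dumps({})
    answer, score = counts.most_common(1)[0]
    if score / total > threshold:
        return answer
    return json.dumps(dict(counts))


# Illustrative answer document; real ones come from list_assignments_for_hit
SAMPLE_XML = (
    '<QuestionFormAnswers xmlns="http://mechanicalturk.amazonaws.com/'
    'AWSMechanicalTurkDataSchemas/2005-10-01/QuestionFormAnswers.xsd">'
    '<Answer><QuestionIdentifier>category.label</QuestionIdentifier>'
    '<FreeText>{}</FreeText></Answer></QuestionFormAnswers>'
)

labels = [extract_answer(SAMPLE_XML.format(label))
          for label in ('Question', 'Question', 'Feedback')]
print(consolidate(labels))
```

With three Workers, two matching answers (about 67%) clear the threshold, while a three-way split falls back to the counts.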

Whew! That took a bit of work but we’re all set and can return to Zapier to get this working.

Zapier setup

As I mentioned earlier, we’ll be setting up two Zaps. The first will look for new entries on a forum using RSS and will pass those to our create Lambda. The second will look for results on our complete SQS queue and send an email with the results.

New forum post Zap

For this example we’ll look for new posts on the AWS MTurk forum. To start we’ll create a new Zap and select the RSS trigger app.

Enter the RSS feed associated with this forum and choose the Different Guid/URL option to use as a trigger.

In the next step we’re prompted to select one of the existing posts to use for a test. Pick whichever you like and continue. After this step we’re prompted to add an Action and will choose the AWS Lambda app. Note that this is a premium app so you’ll need to sign up for a paid plan to use it.

Choose the Invoke Function option and select an AWS Lambda account you want to use. If you haven’t set this up in the past you’ll need to go through account setup and create an IAM user to provide AWS credentials to Zapier.

Now we can select a region and our create Lambda. We’ll also pass two arguments to the Lambda, thread_link and thread_title. You may recall that these are the values our create Lambda inserts into the task template. These will also be passed in our RequesterAnnotation. We’re passing the Title and Link values that were found in our first step.
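
Since the create Lambda reads event['thread_link'] and event['thread_title'] directly, a missing argument would surface as a bare KeyError partway through the function. A small guard at the top of the handler could make that failure easier to spot in the logs. This is a hypothetical helper (validate_event is not part of the code above), assuming Zapier delivers the arguments as top-level keys as the create Lambda expects.

```python
REQUIRED_KEYS = ('thread_link', 'thread_title')


def validate_event(event):
    """Return only the expected arguments, or raise if any are missing or empty."""
    missing = [key for key in REQUIRED_KEYS if not event.get(key)]
    if missing:
        raise KeyError('Missing Zapier arguments: {}'.format(', '.join(missing)))
    return {key: event[key] for key in REQUIRED_KEYS}


# Sample event with made-up values
print(validate_event({
    'thread_link': 'https://example.com/thread/1',
    'thread_title': 'How do I create a HIT?',
}))
```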

We can now submit a test to Lambda. This will submit the test thread to MTurk as a task and, once Workers complete it, result in a message in our complete SQS queue that we can use for testing in the next step. Note that because Zapier doesn’t handle any return values from AWS Lambda you may get an alert that there was an error with your task. If this happens you can just choose to skip the test when prompted again. After you select Finish you’ll be prompted to give your Zap a name and turn it on.

To confirm that everything worked correctly you can check your AWS CloudWatch Logs to review the results for both Lambdas as the task moves through the processing stages.

Categorization complete Zap

You can monitor your complete SQS queue to see when the task is complete and a message has been posted there.
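
If you’d rather check from a script than from the console, a sketch like the one below asks SQS for the approximate number of waiting messages. The helper name pending_message_count is made up; it takes the client as an argument so it can be tried with any boto3 SQS client, and the region and queue URL in the commented usage are placeholders.

```python
def pending_message_count(sqs_client, queue_url):
    """Return the approximate number of messages waiting on the queue."""
    response = sqs_client.get_queue_attributes(
        QueueUrl=queue_url,
        AttributeNames=['ApproximateNumberOfMessages'])
    return int(response['Attributes']['ApproximateNumberOfMessages'])


# Example usage (requires AWS credentials; URL and region are placeholders):
# import boto3
# sqs = boto3.client('sqs', region_name='us-west-2')
# url = 'https://sqs.us-west-2.amazonaws.com/000000000000/demo-zapier-mturk-complete'
# print(pending_message_count(sqs, url))
```

The count is approximate by design, so treat anything above zero as a sign that a result is ready.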

When you have a message there (typically around 10 minutes but maybe less) we can start setting up our next Zap. For this Zap we’ll start with the Amazon SQS trigger app.

Because our message is JSON we can use the New JSON Message trigger and will need to link to an AWS account with SQS access.

After this step we can find our complete SQS queue, use Base64 encoding, and instruct the app to remove consumed messages.

Once we’ve completed this step we’ll be prompted to select a message from the queue to use for testing and can create an Action to handle the message.

We’ll use the Gmail – Send Email action but from here on you can use just about any Action you wish. In our case we’ll compose an email that includes the relevant details about the thread. As you can see below, we’ve included all of the data that was included in the contents of our SQS message.

The steps from here are similar to those above. We can send a test email to confirm it works and enable our Zap.

Human task SaaS

While there are more steps than I’d like in this process, I’ve found it is quite reliable and I typically get results within 30 minutes of a new forum post. The primary reason it typically takes this long is that Zapier only polls SQS and RSS every 15 minutes.

I hope you found this walkthrough of using MTurk with Zapier useful. Feel free to share your thoughts and ideas in the comments. If you have a process you’d like to set up using MTurk with Zapier or similar services, feel free to reach out. I’d be happy to assist your business in building a solution tailored to your needs.

Note that this post originally suggested using an SQS queue for the intermediate step rather than an SNS topic. This was changed to use SNS because the SQS Lambda trigger polls every 4 seconds and this can quickly exceed the monthly free tier if used for multiple tasks.
