#!python
# Copyright 2012, 2014 Kodi Arfer
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish, dis-
# tribute, sublicense, and/or sell copies of the Software, and to permit
# persons to whom the Software is furnished to do so, subject to the fol-
# lowing conditions:
#
# The above copyright notice and this permission notice shall be included
# in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS
# OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL-
# ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
# SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.
import argparse # Hence, Python 2.7 is required.
import sys
import os.path
import string
import inspect
import datetime, calendar
import boto.mturk.connection, boto.mturk.price, boto.mturk.question, boto.mturk.qualification
from boto.compat import json
# --------------------------------------------------
# Globals
# -------------------------------------------------
# True when the module is run as a script; the mainline code sets it.
interactive = False
# The MTurk connection and the worker-facing website host; both are
# filled in by init().
con = None
mturk_website = None

# HIT nicknames: a mapping of nickname -> HIT ID, stored as JSON.
default_nicknames_path = os.path.expanduser('~/.boto_mturkcli_hit_nicknames')
nicknames = {}
# Candidate nicknames for new HITs: the single letters "a" to "z".
nickname_pool = set(string.ascii_lowercase)

get_assignments_page_size = 100

# Number of seconds in each unit that parse_duration understands.
time_units = {
    's': 1,
    'min': 60,
    'h': 60 * 60,
    'd': 24 * 60 * 60}

# Friendly names for Qualification Type IDs, as accepted by
# parse_qualification.
qual_requirements = {
    'Adult': '00000000000000000060',
    'Locale': '00000000000000000071',
    'NumberHITsApproved': '00000000000000000040',
    'PercentAssignmentsSubmitted': '00000000000000000000',
    'PercentAssignmentsAbandoned': '00000000000000000070',
    'PercentAssignmentsReturned': '000000000000000000E0',
    'PercentAssignmentsApproved': '000000000000000000L0',
    'PercentAssignmentsRejected': '000000000000000000S0'}

# Comparator symbol (as written in a qualification string) -> the
# comparator name passed on to boto.
qual_comparators = {
    '<': 'LessThan',
    '<=': 'LessThanOrEqualTo',
    '>': 'GreaterThan',
    '>=': 'GreaterThanOrEqualTo',
    '==': 'EqualTo',
    '!=': 'NotEqualTo',
    'exists': 'Exists'}
# Sample JSON configuration for the "new" subcommand; shown as the
# epilog of that subcommand's --help output. Its keys correspond to
# the parameters of make_hit.
example_config_file = '''Example configuration file:
{
"title": "Pick your favorite color",
"description": "In this task, you are asked to pick your favorite color.",
"reward": 0.50,
"assignments": 10,
"duration": "20 min",
"keywords": ["color", "favorites", "survey"],
"lifetime": "7 d",
"approval_delay": "14 d",
"qualifications": [
"PercentAssignmentsApproved > 90",
"Locale == US",
"2ARFPLSP75KLA8M8DH1HTEQVJT3SY6 exists"
],
"question_url": "http://example.com/myhit",
"question_frame_height": 450
}'''
# --------------------------------------------------
# Subroutines
# --------------------------------------------------
def unjson(path):
    '''Opens the file at `path` and returns its contents decoded
    from JSON.'''
    with open(path) as source:
        decoded = json.load(source)
    return decoded
def add_argparse_arguments(parser):
    '''Adds the options shared by every command to `parser`:
    -P/--production (stored as `sandbox`, which defaults to True)
    and --nicknames (stored as `nicknames_path`).'''
    parser.add_argument(
        '-P', '--production',
        action = 'store_false', dest = 'sandbox', default = True,
        help = 'use the production site (default: use the sandbox)')
    nicknames_help = 'where to store HIT nicknames (default: {})'.format(
        default_nicknames_path)
    parser.add_argument(
        '--nicknames',
        metavar = 'PATH', dest = 'nicknames_path',
        default = default_nicknames_path,
        help = nicknames_help)
def init_by_args(args):
    '''Calls init() with the `sandbox` and `nicknames_path`
    attributes of an argparse namespace (see
    add_argparse_arguments).'''
    sandbox, nicknames_path = args.sandbox, args.nicknames_path
    init(sandbox, nicknames_path)
def init(sandbox = False, nicknames_path = default_nicknames_path):
    '''Sets up the module-level state: the MTurk connection `con`,
    the worker-facing host `mturk_website`, and the nickname table.

    `sandbox` selects the sandbox hosts instead of the production
    ones. Nicknames are read from the JSON file at `nicknames_path`;
    if that file can't be opened, the table starts out empty. A
    copy of the loaded table is kept in `original_nicknames` so
    save_nicknames() can tell whether anything changed.'''
    global con, mturk_website, nicknames, original_nicknames
    if sandbox:
        mturk_website = 'workersandbox.mturk.com'
        api_host = 'mechanicalturk.sandbox.amazonaws.com'
    else:
        mturk_website = 'www.mturk.com'
        api_host = 'mechanicalturk.amazonaws.com'
    con = boto.mturk.connection.MTurkConnection(host = api_host)
    try:
        nicknames = unjson(nicknames_path)
    except IOError:
        nicknames = {}
    original_nicknames = nicknames.copy()
def save_nicknames(nicknames_path = default_nicknames_path):
    '''Writes the nickname table to `nicknames_path` as indented,
    key-sorted JSON followed by a newline. Does nothing if the
    table is unchanged since init() loaded it.'''
    if nicknames == original_nicknames:
        return
    with open(nicknames_path, 'w') as out:
        json.dump(nicknames, out, sort_keys = True, indent = 4)
        out.write('\n')
def parse_duration(s):
    '''Converts a duration string to a number of seconds.

    Accepts an integer optionally followed by whitespace and a unit
    from `time_units`, e.g. "2 d", "48 h", "2880 min", "172800 s",
    or just "172800" (taken as seconds).'''
    tokens = s.split()
    count = int(tokens[0])
    if len(tokens) == 1:
        unit = 's'
    else:
        unit = tokens[1]
    return count * time_units[unit]
def display_duration(n):
    '''Formats a duration of `n` seconds using the largest unit in
    `time_units` that divides it evenly, e.g. 172800 -> "2 d" and
    90 -> "90 s".

    Returns None if no unit divides `n` evenly, which can only
    happen when `n` isn't a whole number, since the smallest unit
    is one second.'''
    for unit, m in sorted(time_units.items(), key = lambda x: -x[1]):
        if n % m == 0:
            # Floor division, so the count is a whole number whether
            # or not true division is in effect ("2 d", not "2.0 d").
            return '{} {}'.format(n // m, unit)
    return None
def parse_qualification(inp):
    '''Builds a boto requirement object from a string of the form
    "NAME COMPARATOR [VALUE]", such as
    "PercentAssignmentsApproved > 90", "Locale == US", or
    "2ARFPLSP75KLA8M8DH1HTEQVJT3SY6 exists".

    NAME is either a key of `qual_requirements` or a literal
    Qualification Type ID. COMPARATOR must be a key of
    `qual_comparators`.'''
    tokens = inp.split()
    name = tokens.pop(0)
    comparator = tokens.pop(0)
    value = tokens[0] if tokens else None
    # A name we don't recognize is treated as a Qualification Type
    # ID in its own right.
    qtid = qual_requirements.get(name, name)
    if qtid == qual_requirements['Locale']:
        return boto.mturk.qualification.LocaleRequirement(
            qual_comparators[comparator],
            value,
            required_to_preview = False)
    # Only the Adult requirement is required in order to preview.
    adults_only = qtid == qual_requirements['Adult']
    return boto.mturk.qualification.Requirement(
        qtid,
        qual_comparators[comparator],
        value,
        required_to_preview = adults_only)
def preview_url(hit):
    '''Returns the preview URL for `hit` on the current
    `mturk_website`, built from the HIT's `HITTypeId`.'''
    template = 'https://{}/mturk/preview?groupId={}'
    return template.format(mturk_website, hit.HITTypeId)
def parse_timestamp(s):
    '''Converts a UTC timestamp string such as
    "2012-11-24T16:34:41Z" to a naive datetime object expressed in
    the local time zone.'''
    utc_time = datetime.datetime.strptime(s, '%Y-%m-%dT%H:%M:%SZ')
    # timegm interprets the time tuple as UTC, giving seconds since
    # the epoch; fromtimestamp then converts that to local time.
    epoch_seconds = calendar.timegm(utc_time.timetuple())
    return datetime.datetime.fromtimestamp(epoch_seconds)
def get_hitid(nickname_or_hitid):
    '''Returns the HIT ID recorded for a nickname. Anything that
    isn't a known nickname is returned unchanged, on the assumption
    that it is already a HIT ID.'''
    hitid = nicknames.get(nickname_or_hitid)
    if hitid:
        return hitid
    return nickname_or_hitid
def get_nickname(hitid):
    '''Returns a nickname that maps to `hitid` in the nickname
    table, or None if there isn't one.'''
    matches = (nick for nick, known_id in nicknames.items()
        if known_id == hitid)
    return next(matches, None)
def display_datetime(dt):
    '''Formats the datetime `dt` for display as day, abbreviated
    month, year, and 12-hour time with an am/pm marker.'''
    layout = '%e %b %Y, %l:%M %P'
    return dt.strftime(layout)
def display_hit(hit, verbose = False):
    '''Returns a multi-line, newline-terminated text summary of
    `hit`: nickname, title, price, duration, status, IDs, preview
    URL, creation and expiration times, and assignment counts.

    With `verbose`, the description and keywords are appended.'''
    expires_at = parse_timestamp(hit.Expiration)
    lines = [
        '{} - {} ({}, {}, {})'.format(
            get_nickname(hit.HITId),
            hit.Title,
            hit.FormattedPrice,
            display_duration(int(hit.AssignmentDurationInSeconds)),
            hit.HITStatus),
        'HIT ID: ' + hit.HITId,
        'Type ID: ' + hit.HITTypeId,
        'Group ID: ' + hit.HITGroupId,
        'Preview: ' + preview_url(hit)]

    created = display_datetime(parse_timestamp(hit.CreationTime))
    if expires_at <= datetime.datetime.now():
        expiry = 'Expired'
    else:
        expiry = 'Expires ' + display_datetime(expires_at)
    lines.append('Created {} {}'.format(created, expiry))

    # For some reason, SearchHITs includes the
    # NumberOfAssignmentsFoobar fields but GetHIT doesn't.
    if hasattr(hit, 'NumberOfAssignmentsAvailable'):
        lines.append(
            'Assignments: {} -- {} avail, {} pending, {} reviewable, {} reviewed'.format(
                hit.MaxAssignments,
                hit.NumberOfAssignmentsAvailable,
                hit.NumberOfAssignmentsPending,
                # Reviewable = whatever is left after the other
                # three categories.
                int(hit.MaxAssignments) - (
                    int(hit.NumberOfAssignmentsAvailable) +
                    int(hit.NumberOfAssignmentsPending) +
                    int(hit.NumberOfAssignmentsCompleted)),
                hit.NumberOfAssignmentsCompleted))
    else:
        lines.append('Assignments: {} total'.format(hit.MaxAssignments))

    if verbose:
        lines.append('\nDescription: ' + hit.Description)
        lines.append('\nKeywords: ' + hit.Keywords)
    return '\n'.join(lines) + '\n'
def digest_assignment(a):
    '''Boils an assignment object down to a plain dictionary of
    strings: the timing, ID, and status attributes, plus an
    "answers" dictionary mapping each question ID in `a.answers[0]`
    to the first field of its answer.'''
    answers = {}
    for answer in a.answers[0]:
        answers[str(answer.qid)] = str(answer.fields[0])
    digest = {'answers': answers}
    for attr in (
            'AcceptTime', 'SubmitTime',
            'HITId', 'AssignmentId', 'WorkerId',
            'AssignmentStatus'):
        digest[attr] = str(getattr(a, attr))
    return digest
# --------------------------------------------------
# Commands
# --------------------------------------------------
def get_balance():
    '''Returns the account balance as reported by the MTurk
    connection.'''
    balance = con.get_account_balance()
    return balance
def show_hit(hit):
    '''Looks up the HIT with ID `hit` and returns its verbose
    display_hit() summary.'''
    found = con.get_hit(hit)
    return display_hit(found[0], verbose = True)
def list_hits():
    '''Returns summaries of your 10 most recently created HITs,
    joined by newlines, with the most recent one last.'''
    newest_first = con.search_hits(
        sort_by = 'CreationTime',
        sort_direction = 'Descending',
        page_size = 10)
    summaries = [display_hit(h) for h in newest_first]
    summaries.reverse()
    return '\n'.join(summaries)
def make_hit(title, description, keywords, reward, question_url, question_frame_height, duration, assignments, approval_delay, lifetime, qualifications = []):
    '''Creates a HIT whose question is the external page at
    `question_url`, and gives it the alphabetically first unused
    nickname from `nickname_pool`, if any are left.

    `duration`, `approval_delay`, and `lifetime` are strings for
    parse_duration(); each element of `qualifications` is a string
    for parse_qualification().

    In interactive mode, prints the nickname, HIT ID, and preview
    URL and returns None. Otherwise, returns the first element of
    the result of the create_hit call.'''
    keyword_string = con.get_keywords_as_string(keywords)
    price = con.get_price_as_price(reward)
    question = boto.mturk.question.ExternalQuestion(
        question_url,
        question_frame_height)
    duration_seconds = parse_duration(duration)
    requirements = boto.mturk.qualification.Qualifications(
        [parse_qualification(q) for q in qualifications])
    approval_delay_seconds = parse_duration(approval_delay)
    lifetime_seconds = parse_duration(lifetime)
    created = con.create_hit(
        title = title,
        description = description,
        keywords = keyword_string,
        reward = price,
        question = question,
        duration = duration_seconds,
        qualifications = requirements,
        max_assignments = assignments,
        approval_delay = approval_delay_seconds,
        lifetime = lifetime_seconds)

    nick = None
    unused_nicks = nickname_pool.difference(nicknames)
    if unused_nicks:
        nick = min(unused_nicks)
        nicknames[nick] = created[0].HITId

    if not interactive:
        return created[0]
    print('Nickname: %s' % (nick,))
    print('HIT ID: %s' % (created[0].HITId,))
    print('Preview: %s' % (preview_url(created[0]),))
def extend_hit(hit, assignments_increment = None, expiration_increment = None):
    '''Passes `hit` and the two increments straight through to the
    connection's extend_hit method. The mainline code supplies a
    number of assignments to add and a number of seconds (from
    parse_duration) to add to the expiration date.'''
    increments = (assignments_increment, expiration_increment)
    con.extend_hit(hit, *increments)
def expire_hit(hit):
    '''Asks the connection to expire the HIT with ID `hit`. The
    nickname table is left alone.'''
    hitid = hit
    con.expire_hit(hitid)
def delete_hit(hit):
    '''Deletes a HIT by way of DisableHIT, then drops every
    nickname that pointed at it.

    Assignments that haven't been reviewed are approved
    automatically, and assignments not yet submitted are approved
    automatically once they are submitted.

    According to the API docs, DisableHIT doesn't work with
    Reviewable HITs, but apparently, it does.'''
    global nicknames
    con.disable_hit(hit)
    nicknames = dict(
        (nick, hitid) for nick, hitid in nicknames.items()
        if hitid != hit)
def list_assignments(hit, only_reviewable = False):
    '''Collects every assignment of the HIT with ID `hit`, digested
    by digest_assignment(). With `only_reviewable`, only
    assignments whose status is "Submitted" are requested.

    In interactive mode, prints the assignments as JSON, then a
    line of assignment IDs, then a line of WORKER_ID,ASSIGNMENT_ID
    pairs, and returns None. Otherwise, returns the list of
    digests.'''
    # Accumulate all relevant assignments, one page of results at
    # a time.
    assignments = []
    page = 1
    while True:
        rs = con.get_assignments(
            hit_id = hit,
            page_size = get_assignments_page_size,
            page_number = page,
            status = 'Submitted' if only_reviewable else None)
        batch = [digest_assignment(a) for a in rs]
        assignments += batch
        # An empty page means there's nothing further to fetch.
        # Stop then even if the reported total hasn't been reached,
        # so an inconsistent total can't make us loop forever.
        if not batch or len(assignments) >= int(rs.TotalNumResults):
            break
        page += 1
    if interactive:
        print(json.dumps(assignments, sort_keys = True, indent = 4))
        print(' '.join([a['AssignmentId'] for a in assignments]))
        print(' '.join([a['WorkerId'] + ',' + a['AssignmentId'] for a in assignments]))
    else:
        return assignments
def grant_bonus(message, amount, pairs):
    '''Grants a bonus of `amount` with the reason `message` for
    each (worker ID, assignment ID) pair in `pairs`. In
    interactive mode, reports each worker as it goes.'''
    for worker_id, assignment_id in pairs:
        bonus = con.get_price_as_price(amount)
        con.grant_bonus(worker_id, assignment_id, bonus, message)
        if interactive:
            print('Bonused %s' % (worker_id,))
def approve_assignments(message, assignments):
    '''Approves each assignment ID in `assignments`, passing
    `message` along as feedback. In interactive mode, reports each
    ID as it goes.'''
    for assignment_id in assignments:
        con.approve_assignment(assignment_id, message)
        if interactive:
            print('Approved %s' % (assignment_id,))
def reject_assignments(message, assignments):
    '''Rejects each assignment ID in `assignments`, passing
    `message` along as feedback. In interactive mode, reports each
    ID as it goes.'''
    for assignment_id in assignments:
        con.reject_assignment(assignment_id, message)
        if interactive:
            print('Rejected %s' % (assignment_id,))
def unreject_assignments(message, assignments):
    '''Approves each previously rejected assignment ID in
    `assignments`, passing `message` along as feedback. In
    interactive mode, reports each ID as it goes.'''
    for assignment_id in assignments:
        con.approve_rejected_assignment(assignment_id, message)
        if interactive:
            print('Unrejected %s' % (assignment_id,))
def notify_workers(subject, text, workers):
    '''Sends one message with the given `subject` and `text` to
    the worker IDs in `workers`, in a single call to the
    connection.'''
    recipients = workers
    con.notify_workers(recipients, subject, text)
def give_qualification(qualification, workers, value = 1, notify = True):
    '''Assigns the qualification with ID `qualification` to each
    worker ID in `workers`, forwarding `value` and `notify` to the
    connection. In interactive mode, reports each worker as it
    goes.'''
    for worker_id in workers:
        con.assign_qualification(qualification, worker_id, value, notify)
        if interactive:
            print('Gave to %s' % (worker_id,))
def revoke_qualification(qualification, workers, message = None):
    '''Revokes the qualification with ID `qualification` from each
    worker ID in `workers`, forwarding `message` to the connection.
    In interactive mode, reports each worker as it goes.'''
    for worker_id in workers:
        con.revoke_qualification(worker_id, qualification, message)
        if interactive:
            print('Revoked from %s' % (worker_id,))
# --------------------------------------------------
# Mainline code
# --------------------------------------------------
if __name__ == '__main__':
    # Command-line interface. Each subcommand stores two defaults
    # on the parsed namespace: `f`, the function implementing the
    # command, and `a`, a thunk producing its arguments (a list for
    # positional arguments or a dict for keyword arguments). The
    # thunks refer to the global `args`, which is only assigned
    # once parsing is done, so they must not be called before then.
    interactive = True

    parser = argparse.ArgumentParser()
    add_argparse_arguments(parser)
    subs = parser.add_subparsers()

    sub = subs.add_parser('bal',
        help = 'display your prepaid balance')
    sub.set_defaults(f = get_balance, a = lambda: [])

    sub = subs.add_parser('hit',
        help = 'get information about a HIT')
    sub.add_argument('HIT',
        help = 'nickname or ID of the HIT to show')
    sub.set_defaults(f = show_hit, a = lambda:
        [get_hitid(args.HIT)])

    # list_hits only fetches the 10 most recently created HITs, so
    # the help text says so rather than claiming to list them all.
    sub = subs.add_parser('hits',
        help = 'list your 10 most recently created HITs')
    sub.set_defaults(f = list_hits, a = lambda: [])

    sub = subs.add_parser('new',
        help = 'create a new HIT (external questions only)',
        epilog = example_config_file,
        formatter_class = argparse.RawDescriptionHelpFormatter)
    sub.add_argument('JSON_PATH',
        help = 'path to JSON configuration file for the HIT')
    sub.add_argument('-u', '--question-url', dest = 'question_url',
        metavar = 'URL',
        help = 'URL for the external question')
    sub.add_argument('-a', '--assignments', dest = 'assignments',
        type = int, metavar = 'N',
        help = 'number of assignments')
    sub.add_argument('-r', '--reward', dest = 'reward',
        type = float, metavar = 'PRICE',
        help = 'reward amount, in USD')
    # Settings from the JSON file, overridden by whichever of the
    # command-line options were actually given.
    sub.set_defaults(f = make_hit, a = lambda: dict(
        list(unjson(args.JSON_PATH).items()) + [(k, getattr(args, k))
            for k in ('question_url', 'assignments', 'reward')
            if getattr(args, k) is not None]))

    sub = subs.add_parser('extend',
        help = 'add assignments or time to a HIT')
    sub.add_argument('HIT',
        help = 'nickname or ID of the HIT to extend')
    sub.add_argument('-a', '--assignments', dest = 'assignments',
        metavar = 'N', type = int,
        help = 'number of assignments to add')
    sub.add_argument('-t', '--time', dest = 'time',
        metavar = 'T',
        help = 'amount of time to add to the expiration date')
    sub.set_defaults(f = extend_hit, a = lambda:
        [get_hitid(args.HIT), args.assignments,
            args.time and parse_duration(args.time)])

    sub = subs.add_parser('expire',
        help = 'force a HIT to expire without deleting it')
    sub.add_argument('HIT',
        help = 'nickname or ID of the HIT to expire')
    sub.set_defaults(f = expire_hit, a = lambda:
        [get_hitid(args.HIT)])

    sub = subs.add_parser('rm',
        help = 'delete a HIT')
    sub.add_argument('HIT',
        help = 'nickname or ID of the HIT to delete')
    sub.set_defaults(f = delete_hit, a = lambda:
        [get_hitid(args.HIT)])

    sub = subs.add_parser('as',
        help = "list a HIT's submitted assignments")
    sub.add_argument('HIT',
        help = 'nickname or ID of the HIT to get assignments for')
    sub.add_argument('-r', '--reviewable', dest = 'only_reviewable',
        action = 'store_true',
        help = 'show only unreviewed assignments')
    sub.set_defaults(f = list_assignments, a = lambda:
        [get_hitid(args.HIT), args.only_reviewable])

    for command, fun, helpmsg in [
            ('approve', approve_assignments, 'approve assignments'),
            ('reject', reject_assignments, 'reject assignments'),
            ('unreject', unreject_assignments, 'approve previously rejected assignments')]:
        sub = subs.add_parser(command, help = helpmsg)
        sub.add_argument('ASSIGNMENT', nargs = '+',
            help = 'ID of an assignment')
        sub.add_argument('-m', '--message', dest = 'message',
            metavar = 'TEXT',
            help = 'feedback message shown to workers')
        sub.set_defaults(f = fun, a = lambda:
            [args.message, args.ASSIGNMENT])

    sub = subs.add_parser('bonus',
        help = 'give some workers a bonus')
    sub.add_argument('AMOUNT', type = float,
        help = 'bonus amount, in USD')
    sub.add_argument('MESSAGE',
        help = 'the reason for the bonus (shown to workers in an email sent by MTurk)')
    sub.add_argument('WIDAID', nargs = '+',
        help = 'a WORKER_ID,ASSIGNMENT_ID pair')
    sub.set_defaults(f = grant_bonus, a = lambda:
        [args.MESSAGE, args.AMOUNT,
            [p.split(',') for p in args.WIDAID]])

    sub = subs.add_parser('notify',
        help = 'send a message to some workers')
    sub.add_argument('SUBJECT',
        help = 'subject of the message')
    sub.add_argument('MESSAGE',
        help = 'text of the message')
    sub.add_argument('WORKER', nargs = '+',
        help = 'ID of a worker')
    sub.set_defaults(f = notify_workers, a = lambda:
        [args.SUBJECT, args.MESSAGE, args.WORKER])

    sub = subs.add_parser('give-qual',
        help = 'give a qualification to some workers')
    sub.add_argument('QUAL',
        help = 'ID of the qualification')
    sub.add_argument('WORKER', nargs = '+',
        help = 'ID of a worker')
    sub.add_argument('-v', '--value', dest = 'value',
        metavar = 'N', type = int, default = 1,
        help = 'value of the qualification')
    sub.add_argument('--dontnotify', dest = 'notify',
        action = 'store_false', default = True,
        help = "don't notify workers")
    sub.set_defaults(f = give_qualification, a = lambda:
        [args.QUAL, args.WORKER, args.value, args.notify])

    sub = subs.add_parser('revoke-qual',
        help = 'revoke a qualification from some workers')
    sub.add_argument('QUAL',
        help = 'ID of the qualification')
    sub.add_argument('WORKER', nargs = '+',
        help = 'ID of a worker')
    sub.add_argument('-m', '--message', dest = 'message',
        metavar = 'TEXT',
        help = 'the reason the qualification was revoked (shown to workers in an email sent by MTurk)')
    sub.set_defaults(f = revoke_qualification, a = lambda:
        [args.QUAL, args.WORKER, args.message])

    args = parser.parse_args()

    init_by_args(args)

    f = args.f
    a = args.a()
    if isinstance(a, dict):
        # We do some introspective gymnastics so we can produce a
        # less incomprehensible error message if some arguments
        # are missing.
        spec = inspect.getargspec(f)
        # getargspec reports defaults as None, not an empty tuple,
        # when the function has no default arguments.
        n_optional = len(spec.defaults or ())
        required = spec.args[: len(spec.args) - n_optional]
        missing = set(required) - set(a.keys())
        if missing:
            raise ValueError('Missing arguments: ' + ', '.join(sorted(missing)))
        doit = lambda: f(**a)
    else:
        doit = lambda: f(*a)

    try:
        x = doit()
    except boto.mturk.connection.MTurkRequestError as e:
        print('MTurk error: %s' % (e.error_message,))
        sys.exit(1)

    if x is not None:
        print(x)

    # Save to the same file the nicknames were loaded from, so that
    # --nicknames is honored for writing as well as reading.
    save_nicknames(args.nicknames_path)