Here's our Python code for running a Twitter bot

Here's the code for 2 of our Twitter bots 

You can email me ([email protected]) for any clarifications!
 
 
BOT1 - runs a simple ruby quiz

import tweepy
import logging
import time
import urllib
import random
import subprocess
import commands

# Author: Vaibhav Goel, [email protected]

# Consumer keys and access tokens, used for OAuth
consumer_key = REDACTED
consumer_secret = REDACTED
access_token = REDACTED
access_token_secret = REDACTED
 
# OAuth process, using the keys and tokens
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
 
# Creation of the actual interface, using authentication
api = tweepy.API(auth)
 
# Sample method, used to update a status
#x = api.update_status('test 3 Hello Python Central! #Bot2.0')
#print x.id
#y = api.update_status("very nice to see you, handshake", in_reply_to_status_id = x.id)

def diff(first, second):
        second = set(second)
	secondIds = [x.id for x in second]
        return [item for item in first if item.id not in secondIds]

def tweet_url(t):
    return "https://twitter.com/%s/status/%s" % (t.user.screen_name, t.id)



def get_replies(tweet):
    user = tweet.user.screen_name
    tweet_id = tweet.id
    max_id = None
    logging.info("looking for replies to: %s" % tweet_url(tweet))
    while True:
        try:
            replies = api.search(q="to:learning_pt", since_id=tweet_id, max_id=max_id, count=100)
        except tweepy.TweepError as e:
            logging.error("caught twitter api error: %s", e)
            time.sleep(60)
            continue
        for reply in replies:
            logging.info("examining: %s" % tweet_url(reply))
            if reply.in_reply_to_status_id == tweet_id:
                logging.info("found reply: %s" % tweet_url(reply))
                yield reply
                # recursive magic to also get the replies to this reply
                for reply_to_reply in get_replies(reply):
                    yield reply_to_reply
            max_id = reply.id
        if len(replies) != 100:
            break


questions = [("For a string of semi-colon separated integers, write Ruby code to sum the first N numbers (results separated by semi-colons). (1 <= N <= 1000000)? \n If Input is 1;2;4 the output should be 1;3;10 (sum of first 1 number = 1, first 2 numbers = 3.. so on)",0,"1;100;1000000;10000000","1;5050;500000500000;50000005000000")]



questionTweets = [] 

for quizQ in questions:
	if quizQ[1] == 0:
		x = api.update_status(quizQ[0] + "  #TLPCodeQuizBot2")
	else:
		x = api.update_with_media(quizQ[1], quizQ[0] + " Respond directly to this tweet (NOT to tweets below). #TLPCodeQuizBot4")
	questionTweets += [x]
	

scanned_replies = []

congratulatoryStrings = [ a + " " + b for a in ["Aha, that is ", "Yes! That is ", " You got it, that is ", "Yup! That's", "Well done, that is "] for b in ["the right answer", "the correct answer", "correct!", " spot on! Congratulations"]]

incorrect = ["Thanks for trying. Try a bit more!", "Thanks for taking a stab. Try again!", " Think a bit harder :)", "Not yet there ... take another shot","That's not the answer I'm afraid.", "Not really. Try again!", "Thanks for trying. Try a bit harder!",  "Thx for trying. Try a bit harder!", "Thx for trying. Take another shot at it!", "Nopes :( Try again."]

solved = [0 for x in questions ]

def checkAnswer(reply, answer, query):
	command = 'echo \"' + query + '\"| ruby -e \'' + reply.strip() + '\''
	print command
	print reply
	replyOutput =  commands.getoutput(command) #open('outputAnswer','r').read()
	print replyOutput
	print answer
	if answer in replyOutput:
		return True
	else:
		return False
	
	

while (True):
	for i in xrange(0,len(questions)):
		if solved[i] == 1:
			continue
		x = questionTweets[i]
		q = questions[i] 
		replies = get_replies(x)
		reply_list = set(diff(list(replies), scanned_replies))
		print "*** Replies**"
		for r in reply_list:
			print r.id
			print r.in_reply_to_status_id
		print "**"
		print len(reply_list)
		for reply in set(replies):
			print reply.text 
		for reply in (set(diff(reply_list, scanned_replies))):
			if reply.in_reply_to_status_id == x.id:
				try:
					reply_text = reply.text
					print reply_text
					for url in reply.entities['urls']:
						reply_text = reply_text.replace(url['url'],url['display_url'])
					print reply_text
					if checkAnswer(reply_text,q[3],q[2]):
						api.update_status("@" + reply.author.screen_name + " " +  random.choice(congratulatoryStrings) + "  #TLPBot3", reply.id) 
						linkToRightAnswer = "https://twitter.com/" + str(reply.author.screen_name) + "/status/" + str(reply.id)
						api.update_status("Congratulations @" + reply.author.screen_name + " Solved!  #TLPBot " + linkToRightAnswer) 
						solved[i] = 1
					else:
						api.update_status("@" + reply.author.screen_name + " " + random.choice(incorrect) + " #TLPBot", reply.id) 
						print "@" + reply.author.screen_name + " Thanks for trying. #TLPBot"
				except tweepy.TweepError as e:
					logging.error("caught twitter api error: %s", e)
					continue
			
		scanned_replies += list(reply_list)
		print "*** Scanned Replies**"
		for scanned in scanned_replies:
			print scanned.id
			print scanned.in_reply_to_status_id
		print "**"
	time.sleep(10)



BOT 2 - Tries to make a sketch out of an image 

import tweepy
import logging
import time
import urllib
import random
import urllib
import cv2
import numpy as np
import matplotlib.pyplot as plt

# Author: Vaibhav Goel, [email protected]
# Modified version of bots created by Prashant :) 
def convert(): img = cv2.imread("/home/vaibhav1994/Desktop/tweepy/test.JPG") sketch_gray, sketch_color = cv2.pencilSketch(img, sigma_s=50, sigma_r=0.07, shade_factor=0.05) stylized = cv2.stylization(img, sigma_s=60, sigma_r=0.07) cv2.imwrite("test_sketch.JPG",stylized) # Consumer keys and access tokens, used for OAuth consumer_key = REDACTED consumer_secret = REDACTED #'oaImVO5gCpxIV7L23ZW8TzaKkNX5LhXpG1hmqfMRws' access_token = REDACTED #'549856297-aQCu355sQXw5YNrbaLPmOE0u2qlNspATfgH9emAA' access_token_secret = REDACTED #'nKMSB6mKbcn6a5j4t2LbWHclPswr5vm3dBhsEKVnCVTGW' # OAuth process, using the keys and tokens auth = tweepy.OAuthHandler(consumer_key, consumer_secret) auth.set_access_token(access_token, access_token_secret) # Creation of the actual interface, using authentication api = tweepy.API(auth) # Sample method, used to update a status #x = api.update_status('test 3 Hello Python Central! #Bot2.0') #print x.id #y = api.update_status("very nice to see you, handshake", in_reply_to_status_id = x.id) def diff(first, second): second = set(second) secondIds = [x.id for x in second] return [item for item in first if item.id not in secondIds] def tweet_url(t): return "https://twitter.com/%s/status/%s" % (t.user.screen_name, t.id) def get_replies(tweet): user = tweet.user.screen_name tweet_id = tweet.id max_id = None logging.info("looking for replies to: %s" % tweet_url(tweet)) while True: try: replies = api.search(q="#TLPSketchPic to:learning_pt", since_id=tweet_id, max_id=max_id, count=100) except tweepy.TweepError as e: logging.error("caught twitter api error: %s", e) time.sleep(60) continue for reply in replies: logging.info("examining: %s" % tweet_url(reply)) if reply.in_reply_to_status_id == tweet_id: logging.info("found reply: %s" % tweet_url(reply)) yield reply # recursive magic to also get the replies to this reply for reply_to_reply in get_replies(reply): yield reply_to_reply max_id = reply.id if len(replies) != 100: break questions = ["#BotTesting Want to see your pic 'Stylized'? Follow us, reply to this tweet with the hashtag #TLPSketchPic and attach a JPG image."] questionTweets = [] for quizQ in questions: x = api.update_status(quizQ) questionTweets += [x] scanned_replies = [] congratulatoryStrings = [ a + " " + b for a in ["Aha, that is ", "Yes! That is ", " You got it, that is ", "Yup! That's", "Well done, that is "] for b in ["the right answer", "the correct answer", "correct!", " spot on! Congratulations"]] sketch = ["Here you go!", "Here's the sketch!", "And here we have a pencil sketch for you..", "The magic of picture to sketch is right here for you!", "Here it is!", "It's ready!" ] solved = [0 for x in questions ] def checkAnswer(reply): if "tlpsketchpic" in reply.lower(): return True else: return False while (True): for i in xrange(0,len(questions)): if solved[i] == 1: continue x = questionTweets[i] q = questions[i] replies = get_replies(x) reply_list = set(diff(list(replies), scanned_replies)) print "*** Replies**" for r in reply_list: print r.id print r.in_reply_to_status_id print "**" print len(reply_list) for reply in set(replies): print reply.text for reply in (set(diff(reply_list, scanned_replies))): try: #if (api.show_friendship(target_screen_name=reply.author.screen_name)[1].following) or (reply.author.screen_name == "learning_pt"): media = reply.entities.get('media', []) urllib.urlretrieve(media[0]['media_url'], "test.JPG") convert() api.update_with_media("test_sketch.JPG", "@" + reply.author.screen_name + " " + random.choice(sketch) + " #TLPBot", in_reply_to_status_id = reply.id) #else: #api.update("@" + reply.author.screen_name + " - Well, you need to follow us first!", reply.id) except tweepy.TweepError as e: logging.error("caught twitter api error: %s", e) continue scanned_replies += list(reply_list) print "*** Scanned Replies**" for scanned in scanned_replies: print scanned.id print scanned.in_reply_to_status_id print "**" time.sleep(30)
Comments