Clyde as a Twitter Monitor

This week, I will be presenting at PyLadies. Just a little pet project of mine.


Clyde is a lamp that we can hack. It’s Arduino-based and has some cool functionalities in the basic software. However there isn’t much space left to code anything else, if you want to keep that out-of-the-box functionality.  As Serial Communications are supported, I came up with the idea of pairing it with a Raspberry Pi, and coding the Pi instead.

Goal: Getting the Pi to monitor Twitter for specific hashtags, and control the color of Clyde’s eye based on trendiness of the target hashtags.  Three hashtags can be monitored, and mapped to R,G and B.  It’s best if the hashtags are *not* trending. Trying to follow the Superbowl was not a success, as the lamp was simply white (at max value) all the time. But it’s a good thing when you want to be notified of a non-trending hashtag.

CODE:

from twython import Twython
import time
import serial
import sys
COUNTDOWN = 6
DELAY = 60
COUNTTWEETS = 5
R_HASH = "#ClydeRed"
B_HASH = "#ClydeBlue"
G_HASH = "#ClydeGreen"
ClydeRGB = [0,0,0]
firstMin = True
colorInc = 0
for i in range(COUNTDOWN):
colorInc += i
COLORMULTIPLIER = 255 / (COUNTTWEETS * colorInc)
print COLORMULTIPLIER, COUNTTWEETS, COUNTDOWN,colorInc
########################################################################
def authenticate(): #https://apps.twitter.com/
global t
TWITTER_APP_KEY = 'GET_YOUR_OWN_KEY'
TWITTER_APP_KEY_SECRET = 'GET_YOUR_OWN_KEY'
TWITTER_ACCESS_TOKEN = 'GET_YOUR_OWN_KEY'
TWITTER_ACCESS_TOKEN_SECRET = 'GET_YOUR_OWN_KEY'
t = Twython(app_key=TWITTER_APP_KEY,
app_secret=TWITTER_APP_KEY_SECRET,
oauth_token=TWITTER_ACCESS_TOKEN,
oauth_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
########################################################################
def calculateColor(intag):
color = 0
for k in intag:
color = color + (intag[k]*COLORMULTIPLIER)
if color > 255:
color = 255
return color
########################################################################
def calculateRGB():
global ClydeRGB
rColor = calculateColor(rHashtags)
gColor = calculateColor(gHashtags)
bColor = calculateColor(bHashtags)
ClydeRGB = [rColor,gColor,bColor]
return ClydeRGB
########################################################################
def calculateCountDown(hashtags):
for k in hashtags:
if hashtags[k] > 0:
hashtags[k] = hashtags[k]-1
return hashtags
########################################################################
def displayUser(inTweet):
user = inTweet['user']
print user['screen_name']
########################################################################
def searchKeyword(keyword):
search = t.search(q=keyword,
count=1)
tweets = search['statuses']
for tweet in tweets:
displayUser(tweet)
print tweet['id_str'], '\n', tweet['created_at'], '\n\n\n'
# print tweet['id_str'], '\n', tweet['text'], '\n', tweet['created_at'], '\n\n\n'
########################################################################
def startHashTagSearch(inTag,firstMin):
search = t.search(q=inTag,
count=COUNTTWEETS)
tweets = search['statuses']
hashes = {}
for tweet in tweets:
lastTweetId = tweet['id_str']
if firstMin:
hashes[tweet['id_str']] = 0
else:
hashes[tweet['id_str']] = COUNTDOWN
return hashes
########################################################################
def comparetags(inhashtags,inNewHashtags):
inNewHashtags.update(inhashtags)
return inNewHashtags
########################################################################
ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0)
print ser.name
myrgbstr = "SET_AMBIENT 0 0 0"
ser.write(myrgbstr)
authenticate()
rHashtags = startHashTagSearch(R_HASH,True)
gHashtags = startHashTagSearch(G_HASH,True)
bHashtags = startHashTagSearch(B_HASH,True)
try:
while True:
print R_HASH
rHashtags = comparetags(rHashtags, startHashTagSearch(R_HASH,False))
rHashtags = calculateCountDown(rHashtags)
print G_HASH
gHashtags = comparetags(gHashtags, startHashTagSearch(G_HASH,False))
gHashtags = calculateCountDown(gHashtags)
print B_HASH
bHashtags = comparetags(bHashtags, startHashTagSearch(B_HASH,False))
bHashtags = calculateCountDown(bHashtags)
calculateRGB()
myrgbstr = "SET_AMBIENT "+ " ".join([str(x) for x in ClydeRGB]) + "\n"
print myrgbstr
ser.write(myrgbstr)
print "waiting ", DELAY/60, " min"
time.sleep(DELAY)
except:
ser.close()
print "dead", sys.exc_info()[0]
view raw Clyde_Twitter.py hosted with ❤ by GitHub

from twython import Twython
import time
import serial
import sys
COUNTDOWN = 6
DELAY = 60
COUNTTWEETS = 5
R_HASH = "#ClydeRed"
B_HASH = "#ClydeBlue"
G_HASH = "#ClydeGreen"
ClydeRGB = [0,0,0]
firstMin = True
colorInc = 0
for i in range(COUNTDOWN):
colorInc += i
COLORMULTIPLIER = 255 / (COUNTTWEETS * colorInc)
print COLORMULTIPLIER, COUNTTWEETS, COUNTDOWN,colorInc
########################################################################
def authenticate(): #https://apps.twitter.com/
global t
TWITTER_APP_KEY = 'GET_YOUR_OWN_KEY'
TWITTER_APP_KEY_SECRET = 'GET_YOUR_OWN_KEY'
TWITTER_ACCESS_TOKEN = 'GET_YOUR_OWN_KEY'
TWITTER_ACCESS_TOKEN_SECRET = 'GET_YOUR_OWN_KEY'
t = Twython(app_key=TWITTER_APP_KEY,
app_secret=TWITTER_APP_KEY_SECRET,
oauth_token=TWITTER_ACCESS_TOKEN,
oauth_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
########################################################################
def calculateColor(intag):
color = 0
for k in intag:
color = color + (intag[k]*COLORMULTIPLIER)
if color > 255:
color = 255
return color
########################################################################
def calculateRGB():
global ClydeRGB
rColor = calculateColor(rHashtags)
gColor = calculateColor(gHashtags)
bColor = calculateColor(bHashtags)
ClydeRGB = [rColor,gColor,bColor]
return ClydeRGB
########################################################################
def calculateCountDown(hashtags):
for k in hashtags:
if hashtags[k] > 0:
hashtags[k] = hashtags[k]-1
return hashtags
########################################################################
def displayUser(inTweet):
user = inTweet['user']
print user['screen_name']
########################################################################
def searchKeyword(keyword):
search = t.search(q=keyword,
count=1)
tweets = search['statuses']
for tweet in tweets:
displayUser(tweet)
print tweet['id_str'], '\n', tweet['created_at'], '\n\n\n'
# print tweet['id_str'], '\n', tweet['text'], '\n', tweet['created_at'], '\n\n\n'
########################################################################
def startHashTagSearch(inTag,firstMin):
search = t.search(q=inTag,
count=COUNTTWEETS)
tweets = search['statuses']
hashes = {}
for tweet in tweets:
lastTweetId = tweet['id_str']
if firstMin:
hashes[tweet['id_str']] = 0
else:
hashes[tweet['id_str']] = COUNTDOWN
return hashes
########################################################################
def comparetags(inhashtags,inNewHashtags):
inNewHashtags.update(inhashtags)
return inNewHashtags
########################################################################
ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0)
print ser.name
myrgbstr = "SET_AMBIENT 0 0 0"
ser.write(myrgbstr)
authenticate()
rHashtags = startHashTagSearch(R_HASH,True)
gHashtags = startHashTagSearch(G_HASH,True)
bHashtags = startHashTagSearch(B_HASH,True)
try:
while True:
print R_HASH
rHashtags = comparetags(rHashtags, startHashTagSearch(R_HASH,False))
rHashtags = calculateCountDown(rHashtags)
print G_HASH
gHashtags = comparetags(gHashtags, startHashTagSearch(G_HASH,False))
gHashtags = calculateCountDown(gHashtags)
print B_HASH
bHashtags = comparetags(bHashtags, startHashTagSearch(B_HASH,False))
bHashtags = calculateCountDown(bHashtags)
calculateRGB()
myrgbstr = "SET_AMBIENT "+ " ".join([str(x) for x in ClydeRGB]) + "\n"
print myrgbstr
ser.write(myrgbstr)
print "waiting ", DELAY/60, " min"
time.sleep(DELAY)
except:
ser.close()
print "dead", sys.exc_info()[0]
view raw Clyde_Twitter.py hosted with ❤ by GitHub