This week, I will be presenting at PyLadies. Just a little pet project of mine.
Clyde is a lamp that we can hack. It’s Arduino-based and has some cool functionality in its basic software. However, there isn’t much space left to code anything else if you want to keep that out-of-the-box functionality. Since serial communication is supported, I came up with the idea of pairing it with a Raspberry Pi and coding the Pi instead.
Goal: Get the Pi to monitor Twitter for specific hashtags and control the color of Clyde’s eye based on the trendiness of the target hashtags. Three hashtags can be monitored, mapped to R, G and B. It’s best if the hashtags are *not* trending. Trying to follow the Super Bowl was not a success, as the lamp was simply white (at max value) all the time. But it works well when you want to be notified of activity on a non-trending hashtag.
CODE:
from twython import Twython | |
import time | |
import serial | |
import sys | |
# Countdown value given to a newly seen tweet (see startHashTagSearch); it is
# decremented once per polling cycle by calculateCountDown.
COUNTDOWN = 6
# Seconds to sleep between polling cycles.
DELAY = 60
# Number of tweets requested per hashtag search.
COUNTTWEETS = 5
# Hashtags mapped to the red, blue and green channels respectively.
R_HASH = "#ClydeRed"
B_HASH = "#ClydeBlue"
G_HASH = "#ClydeGreen"
# Last colour computed by calculateRGB, as [red, green, blue].
ClydeRGB = [0, 0, 0]
firstMin = True
# 0 + 1 + ... + (COUNTDOWN - 1)
colorInc = sum(range(COUNTDOWN))
# Factor applied to each tweet's remaining countdown in calculateColor.
# Floor division keeps this an integer on every Python version (plain "/"
# yields 3 on Python 2 but 3.4 on Python 3, which would produce fractional
# channel values in the serial command).
COLORMULTIPLIER = 255 // (COUNTTWEETS * colorInc)
print("%s %s %s %s" % (COLORMULTIPLIER, COUNTTWEETS, COUNTDOWN, colorInc))
########################################################################
def authenticate():  # https://apps.twitter.com/
    """Create the Twython client and store it in the module-level ``t``.

    The four placeholder strings must be replaced with your own application
    credentials before the script can talk to Twitter.
    """
    global t
    TWITTER_APP_KEY = 'GET_YOUR_OWN_KEY'
    TWITTER_APP_KEY_SECRET = 'GET_YOUR_OWN_KEY'
    TWITTER_ACCESS_TOKEN = 'GET_YOUR_OWN_KEY'
    TWITTER_ACCESS_TOKEN_SECRET = 'GET_YOUR_OWN_KEY'
    t = Twython(app_key=TWITTER_APP_KEY,
                app_secret=TWITTER_APP_KEY_SECRET,
                oauth_token=TWITTER_ACCESS_TOKEN,
                oauth_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
########################################################################
def calculateColor(intag, multiplier=None):
    """Return one colour channel value (0-255) for a set of tracked tweets.

    intag maps tweet ids to their remaining countdown. Each countdown is
    scaled by ``multiplier`` and the results are summed, capped at 255.

    multiplier defaults to the module-level COLORMULTIPLIER; pass a value to
    use a different scale.
    """
    if multiplier is None:
        multiplier = COLORMULTIPLIER
    total = sum(intag[tweet_id] * multiplier for tweet_id in intag)
    # Truncate so the channel is always a whole number, even if the
    # multiplier is a float.
    return int(min(total, 255))
########################################################################
def calculateRGB():
    """Recompute the module-level ClydeRGB list and return it.

    Reads the module-level rHashtags, gHashtags and bHashtags dicts and runs
    each through calculateColor, in red, green, blue order.
    """
    global ClydeRGB
    ClydeRGB = [calculateColor(tags)
                for tags in (rHashtags, gHashtags, bHashtags)]
    return ClydeRGB
########################################################################
def calculateCountDown(hashtags):
    """Decrement every positive countdown in ``hashtags`` by one.

    The dict is modified in place and also returned. Entries already at 0
    are left untouched (and are not removed).
    """
    # Only values of existing keys change, so iterating the dict is safe.
    for tweet_id in hashtags:
        if hashtags[tweet_id] > 0:
            hashtags[tweet_id] -= 1
    return hashtags
########################################################################
def displayUser(inTweet):
    """Print the screen name of the user who posted ``inTweet``.

    inTweet is a tweet dict with a 'user' entry that itself contains
    'screen_name'.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(inTweet['user']['screen_name'])
########################################################################
def searchKeyword(keyword):
    """Search for ``keyword`` (one result) and print who tweeted it and when.

    Uses the module-level Twython client ``t`` created by authenticate().
    """
    search = t.search(q=keyword, count=1)
    for tweet in search['statuses']:
        displayUser(tweet)
        # One pre-formatted string so the output is the same on Python 2 and
        # 3; the spacing matches what the original multi-item print
        # statement produced. tweet['text'] could be added here as well.
        print("%s \n%s \n\n\n" % (tweet['id_str'], tweet['created_at']))
########################################################################
def startHashTagSearch(inTag, firstMin):
    """Search for ``inTag`` and return {tweet id: initial countdown}.

    Requests COUNTTWEETS results through the module-level Twython client
    ``t``. When firstMin is true every tweet gets a countdown of 0;
    otherwise every tweet gets COUNTDOWN.
    """
    search = t.search(q=inTag, count=COUNTTWEETS)
    initial = 0 if firstMin else COUNTDOWN
    return {tweet['id_str']: initial for tweet in search['statuses']}
########################################################################
def comparetags(inhashtags, inNewHashtags):
    """Merge previously tracked tweets into the latest search results.

    inNewHashtags is updated in place with every entry of inhashtags and
    then returned, so a tweet present in both keeps its existing countdown
    from inhashtags. inhashtags itself is not modified.
    """
    inNewHashtags.update(inhashtags)
    return inNewHashtags
########################################################################
# Main script: open the serial link to Clyde, record the tweets that already
# exist, then poll Twitter every DELAY seconds and send the colour to the lamp.
ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0)
try:
    print(ser.name)
    # End the command with "\n" like the command sent inside the loop;
    # without it this command would run into the next one on the wire.
    # NOTE(review): assumes Clyde treats "\n" as the command terminator.
    myrgbstr = "SET_AMBIENT 0 0 0\n"
    # Encode so write() receives bytes on Python 3 (a no-op for Python 2 str).
    ser.write(myrgbstr.encode('ascii'))
    authenticate()
    # firstMin=True: tweets found at start-up get a countdown of 0, so they
    # do not contribute to the colour.
    rHashtags = startHashTagSearch(R_HASH, True)
    gHashtags = startHashTagSearch(G_HASH, True)
    bHashtags = startHashTagSearch(B_HASH, True)
    try:
        while True:
            print(R_HASH)
            rHashtags = comparetags(rHashtags, startHashTagSearch(R_HASH, False))
            rHashtags = calculateCountDown(rHashtags)
            print(G_HASH)
            gHashtags = comparetags(gHashtags, startHashTagSearch(G_HASH, False))
            gHashtags = calculateCountDown(gHashtags)
            print(B_HASH)
            bHashtags = comparetags(bHashtags, startHashTagSearch(B_HASH, False))
            bHashtags = calculateCountDown(bHashtags)
            calculateRGB()
            myrgbstr = "SET_AMBIENT " + " ".join([str(x) for x in ClydeRGB]) + "\n"
            print(myrgbstr)
            ser.write(myrgbstr.encode('ascii'))
            # Double spaces reproduce the original print statement's output.
            print("waiting  %s  min" % (DELAY // 60))
            time.sleep(DELAY)
    except BaseException:
        # Deliberately broad: Ctrl-C is the normal way to stop the loop, and
        # any other failure is reported the same way.
        print("dead %s" % (sys.exc_info()[0],))
finally:
    # Release the port whether the loop stopped or the setup above failed.
    ser.close()
from twython import Twython | |
import time | |
import serial | |
import sys | |
# Countdown value given to a newly seen tweet (see startHashTagSearch); it is
# decremented once per polling cycle by calculateCountDown.
COUNTDOWN = 6
# Seconds to sleep between polling cycles.
DELAY = 60
# Number of tweets requested per hashtag search.
COUNTTWEETS = 5
# Hashtags mapped to the red, blue and green channels respectively.
R_HASH = "#ClydeRed"
B_HASH = "#ClydeBlue"
G_HASH = "#ClydeGreen"
# Last colour computed by calculateRGB, as [red, green, blue].
ClydeRGB = [0, 0, 0]
firstMin = True
# 0 + 1 + ... + (COUNTDOWN - 1)
colorInc = sum(range(COUNTDOWN))
# Factor applied to each tweet's remaining countdown in calculateColor.
# Floor division keeps this an integer on every Python version (plain "/"
# yields 3 on Python 2 but 3.4 on Python 3, which would produce fractional
# channel values in the serial command).
COLORMULTIPLIER = 255 // (COUNTTWEETS * colorInc)
print("%s %s %s %s" % (COLORMULTIPLIER, COUNTTWEETS, COUNTDOWN, colorInc))
########################################################################
def authenticate():  # https://apps.twitter.com/
    """Create the Twython client and store it in the module-level ``t``.

    The four placeholder strings must be replaced with your own application
    credentials before the script can talk to Twitter.
    """
    global t
    TWITTER_APP_KEY = 'GET_YOUR_OWN_KEY'
    TWITTER_APP_KEY_SECRET = 'GET_YOUR_OWN_KEY'
    TWITTER_ACCESS_TOKEN = 'GET_YOUR_OWN_KEY'
    TWITTER_ACCESS_TOKEN_SECRET = 'GET_YOUR_OWN_KEY'
    t = Twython(app_key=TWITTER_APP_KEY,
                app_secret=TWITTER_APP_KEY_SECRET,
                oauth_token=TWITTER_ACCESS_TOKEN,
                oauth_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
########################################################################
def calculateColor(intag, multiplier=None):
    """Return one colour channel value (0-255) for a set of tracked tweets.

    intag maps tweet ids to their remaining countdown. Each countdown is
    scaled by ``multiplier`` and the results are summed, capped at 255.

    multiplier defaults to the module-level COLORMULTIPLIER; pass a value to
    use a different scale.
    """
    if multiplier is None:
        multiplier = COLORMULTIPLIER
    total = sum(intag[tweet_id] * multiplier for tweet_id in intag)
    # Truncate so the channel is always a whole number, even if the
    # multiplier is a float.
    return int(min(total, 255))
########################################################################
def calculateRGB():
    """Recompute the module-level ClydeRGB list and return it.

    Reads the module-level rHashtags, gHashtags and bHashtags dicts and runs
    each through calculateColor, in red, green, blue order.
    """
    global ClydeRGB
    ClydeRGB = [calculateColor(tags)
                for tags in (rHashtags, gHashtags, bHashtags)]
    return ClydeRGB
########################################################################
def calculateCountDown(hashtags):
    """Decrement every positive countdown in ``hashtags`` by one.

    The dict is modified in place and also returned. Entries already at 0
    are left untouched (and are not removed).
    """
    # Only values of existing keys change, so iterating the dict is safe.
    for tweet_id in hashtags:
        if hashtags[tweet_id] > 0:
            hashtags[tweet_id] -= 1
    return hashtags
########################################################################
def displayUser(inTweet):
    """Print the screen name of the user who posted ``inTweet``.

    inTweet is a tweet dict with a 'user' entry that itself contains
    'screen_name'.
    """
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(inTweet['user']['screen_name'])
########################################################################
def searchKeyword(keyword):
    """Search for ``keyword`` (one result) and print who tweeted it and when.

    Uses the module-level Twython client ``t`` created by authenticate().
    """
    search = t.search(q=keyword, count=1)
    for tweet in search['statuses']:
        displayUser(tweet)
        # One pre-formatted string so the output is the same on Python 2 and
        # 3; the spacing matches what the original multi-item print
        # statement produced. tweet['text'] could be added here as well.
        print("%s \n%s \n\n\n" % (tweet['id_str'], tweet['created_at']))
########################################################################
def startHashTagSearch(inTag, firstMin):
    """Search for ``inTag`` and return {tweet id: initial countdown}.

    Requests COUNTTWEETS results through the module-level Twython client
    ``t``. When firstMin is true every tweet gets a countdown of 0;
    otherwise every tweet gets COUNTDOWN.
    """
    search = t.search(q=inTag, count=COUNTTWEETS)
    initial = 0 if firstMin else COUNTDOWN
    return {tweet['id_str']: initial for tweet in search['statuses']}
########################################################################
def comparetags(inhashtags, inNewHashtags):
    """Merge previously tracked tweets into the latest search results.

    inNewHashtags is updated in place with every entry of inhashtags and
    then returned, so a tweet present in both keeps its existing countdown
    from inhashtags. inhashtags itself is not modified.
    """
    inNewHashtags.update(inhashtags)
    return inNewHashtags
########################################################################
# Main script: open the serial link to Clyde, record the tweets that already
# exist, then poll Twitter every DELAY seconds and send the colour to the lamp.
ser = serial.Serial('/dev/ttyACM0', 9600, timeout=0)
try:
    print(ser.name)
    # End the command with "\n" like the command sent inside the loop;
    # without it this command would run into the next one on the wire.
    # NOTE(review): assumes Clyde treats "\n" as the command terminator.
    myrgbstr = "SET_AMBIENT 0 0 0\n"
    # Encode so write() receives bytes on Python 3 (a no-op for Python 2 str).
    ser.write(myrgbstr.encode('ascii'))
    authenticate()
    # firstMin=True: tweets found at start-up get a countdown of 0, so they
    # do not contribute to the colour.
    rHashtags = startHashTagSearch(R_HASH, True)
    gHashtags = startHashTagSearch(G_HASH, True)
    bHashtags = startHashTagSearch(B_HASH, True)
    try:
        while True:
            print(R_HASH)
            rHashtags = comparetags(rHashtags, startHashTagSearch(R_HASH, False))
            rHashtags = calculateCountDown(rHashtags)
            print(G_HASH)
            gHashtags = comparetags(gHashtags, startHashTagSearch(G_HASH, False))
            gHashtags = calculateCountDown(gHashtags)
            print(B_HASH)
            bHashtags = comparetags(bHashtags, startHashTagSearch(B_HASH, False))
            bHashtags = calculateCountDown(bHashtags)
            calculateRGB()
            myrgbstr = "SET_AMBIENT " + " ".join([str(x) for x in ClydeRGB]) + "\n"
            print(myrgbstr)
            ser.write(myrgbstr.encode('ascii'))
            # Double spaces reproduce the original print statement's output.
            print("waiting  %s  min" % (DELAY // 60))
            time.sleep(DELAY)
    except BaseException:
        # Deliberately broad: Ctrl-C is the normal way to stop the loop, and
        # any other failure is reported the same way.
        print("dead %s" % (sys.exc_info()[0],))
finally:
    # Release the port whether the loop stopped or the setup above failed.
    ser.close()