Python源码示例:praw.Reddit()
示例1
def main():
    """Set heatware-link flair for top-level commenters of one submission.

    Loads every comment of the submission ``link_id``.  For each root comment
    whose body matches ``regex`` and whose author has no flair text yet, the
    matched URL is stored as the author's flair text in ``subreddit`` (keeping
    any existing flair CSS class).  Comments that already carry a reply from
    ``username`` are skipped.

    Any exception is logged through ``logger`` and swallowed.
    """
    try:
        logger.debug('Logging in as /u/' + username)
        r = praw.Reddit(client_id=app_key,
                        client_secret=app_secret,
                        username=username,
                        password=password,
                        user_agent=username)

        # Get the submission and the comments
        submission = r.submission(id=link_id)
        submission.comments.replace_more(limit=None, threshold=0)

        for comment in submission.comments.list():
            logger.debug("Processing comment: " + comment.id)
            if not hasattr(comment, 'author'):
                continue
            if comment.is_root is not True:
                continue
            heatware = re.search(regex, comment.body)
            if not heatware:
                continue
            url = heatware.group(0)
            if comment.author_flair_text:
                continue

            # The loop variable used to be called `reply`, which shadowed the
            # `reply` message text used below: with no replies the name was
            # unbound, otherwise a Comment object was posted as the reply.
            already_answered = any(
                child.author and str(child.author.name) == username
                for child in comment.replies.list()
            )
            if already_answered or not comment.author:
                continue

            css_class = comment.author_flair_css_class or ''
            r.subreddit(subreddit).flair.set(comment.author, url, css_class)
            logger.info('Set ' + comment.author.name + '\'s heatware to ' + url)
            # NOTE(review): `reply` is presumably a module-level message
            # string (empty to disable replying) - confirm against the file.
            if reply:
                comment.reply(reply)
    except Exception as e:
        logger.error(e)
示例2
def main():
    """Bulk-import user flairs into ``subreddit`` from a JSON or CSV file.

    Command line: ``-f/--file FILE`` (validated by ``extant_file``) and
    ``-t/--type {json,csv}``; both are required.
    """
    cli = argparse.ArgumentParser(description="Import flairs to subreddit")
    cli.add_argument(
        "-f", "--file",
        dest="filename",
        help="input file",
        metavar="FILE",
        type=extant_file,
        required=True,
    )
    cli.add_argument(
        "-t", "--type",
        dest="filetype",
        help="json or csv",
        metavar="TYPE",
        type=str,
        choices=['json', 'csv'],
        required=True,
    )
    args = cli.parse_args()

    r = praw.Reddit(client_id=app_key,
                    client_secret=app_secret,
                    username=username,
                    password=password,
                    user_agent=username)

    # Pick the loader matching the declared file type.
    if args.filetype == "json":
        loader = load_json
    elif args.filetype == "csv":
        loader = load_csv
    else:
        return
    r.subreddit(subreddit).flair.update(loader(args.filename))
示例3
def get_reddit_object(token):
    """Create a ``praw.Reddit`` client from *token* and verify it works.

    *token* is a mapping with the keys ``client_id``, ``client_secret``,
    ``username`` and ``password``.  The credentials are checked by calling
    ``reddit.user.me()``.

    Returns a dict with ``status`` (``'success'`` or ``'error'``) and ``data``
    (the client on success, an error message otherwise).
    """
    try:
        client = praw.Reddit(user_agent='reddit_utils web app by Roman Kuleshov',
                             client_id=token['client_id'],
                             client_secret=token['client_secret'],
                             username=token['username'],
                             password=token['password'])
        client.user.me()
    except OAuthException:
        failure = 'Error: Unable to get API access, please make sure API credentials are correct and try again (check the username and password first)'
    except ResponseException as err:
        failure = 'Error: ResponseException: ' + str(err)
    except Exception as err:
        failure = 'Unexpected Error: ' + str(err)
    else:
        return {'status': 'success', 'data': client}
    return {'status': 'error', 'data': failure}
示例4
def _getAllComments(self, curComments):
"""
Get all the comments of a Reddit submission in a nice, JSON-formatted hierarchy of comments and replies.
Uses recursion to get the comment hierarchy.
:type curComments: list
:rtype: dict
"""
comments = {}
for comment in curComments:
if isinstance(comment, praw.objects.Comment): # Make sure it isn't a MoreComments object
author = comment.author
if author is None:
author = "[Deleted]"
else:
author = author.name
if comments.get(
author) is not None: # We make this a list in case the author comments multiple times in the submission on the same level of the comment tree
comments[author].append({'Body': comment.body, 'Replies': self._getAllComments(comment.replies)})
else:
comments[author] = [{'Body': comment.body, 'Replies': self._getAllComments(comment.replies)}]
return comments
示例5
def login(reddit_oauth_credentials_path):
    """Create a ``praw.Reddit`` client configured with OAuth app details.

    The user agent, client id, client secret and redirect URI are read via
    ``read_oauth_credentials`` from *reddit_oauth_credentials_path*.
    """
    credentials = read_oauth_credentials(reddit_oauth_credentials_path)
    user_agent, client_id, client_secret, redirect_uri = credentials

    session = praw.Reddit(user_agent=user_agent)
    session.set_oauth_app_info(client_id=client_id,
                               client_secret=client_secret,
                               redirect_uri=redirect_uri)

    # Keep the raw JSON results around as well, for storage.
    session.config.store_json_result = True
    return session
示例6
def load_log():
    """Return the processed post ids stored in ``POSTS_LOG``.

    When the log file does not exist yet it is created empty.

    Returns
    -------
    list
        One Reddit post id per line of the log file (empty for a new file).

    """
    try:
        log_file = open(POSTS_LOG, "r", encoding="utf-8")
    except FileNotFoundError:
        # Opening in append mode creates the missing file.
        open(POSTS_LOG, "a", encoding="utf-8").close()
        return []

    with log_file:
        return log_file.read().splitlines()
示例7
def update_log(post_id):
    """Append *post_id* as a new line to the ``POSTS_LOG`` file.

    Parameters
    ----------
    post_id : str
        A Reddit post id.

    Returns
    -------
    int
        The value returned by the underlying ``write`` call.

    """
    with open(POSTS_LOG, "a", encoding="utf-8") as log_file:
        written = log_file.write("{}\n".format(post_id))
    return written
示例8
def submissions_top(redditor):
    """Summarise up to ten of a redditor's top submissions of all time.

    Parameters
    ----------
    redditor
        Object exposing ``submissions.top('all', limit=10)``.

    Returns
    -------
    list of dict
        One dict per submission with title, URL, subreddit, creation date,
        score, comment/crosspost counts and the mature-content flag, plus
        embedded-media / reddit-video fields when the submission has them.
    """
    top_s = []
    for submission in redditor.submissions.top('all', limit=10):
        sub = {
            'Title': submission.title.encode('utf-8'),
            'URL': submission.url.encode('utf-8'),
            'Subreddit': str(submission.subreddit),
            'Created Date': datetime.fromtimestamp(submission.created).strftime("%D %H:%M"),
            'Score': submission.score,
            'Comments': submission.num_comments,
            'Crossposts': submission.num_crossposts,
            'Mature Content': submission.over_18,
        }
        media = submission.media
        if media:
            if 'oembed' in media:
                oembed = media['oembed']
                # The description used to be guarded by a check for 'url',
                # which raised KeyError when only the URL was present.
                if 'description' in oembed:
                    sub['Embedded Media Description'] = oembed['description']
                if 'url' in oembed:
                    sub['Embedded Media URL'] = oembed['url']
            if 'reddit_video' in media:
                video = media['reddit_video']
                if 'fallback_url' in video:
                    sub['Reddit Video URL'] = video['fallback_url']
        top_s.append(sub)
    return top_s
示例9
def main(username):
    """Collect statistics about the redditor *username*.

    Returns a dict of report sections.  The extra sections are gathered only
    when ``EXTRA_VERBOSE`` is truthy.  If a ``NotFound`` error is raised while
    collecting, the sections gathered so far are kept and the error text is
    stored under ``'Error'``.
    """
    user_stats = {}
    reddit = praw.Reddit(
        client_id=vault.get_key('reddit_id'),
        client_secret=vault.get_key('reddit_secret'),
        user_agent='Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.11 (KHTML, like Gecko) Chrome/23.0.1271.64 Safari/537.11',
    )
    redditor = reddit.redditor(username)

    try:
        collectors = [
            ('Redditor Stats', redditor_stats),
            ('Top 10 Submitted to Subreddits', submission_stats),
            ('Top 10 Commented in Subreddits', comment_stats),
        ]
        if EXTRA_VERBOSE:
            collectors += [
                ('Top Submissions', submissions_top),
                ('Top Comments', comments_top),
                ('Contriversial Posts', controversial_stats),
            ]
        for label, collect in collectors:
            user_stats[label] = collect(redditor)
    except NotFound as e:
        user_stats['Error'] = str(e)
    return user_stats
示例10
def main():
    """Delete the configured user's recent comments containing a marker text.

    Iterates over the newest comments of ``config.R_USERNAME``, deletes every
    comment whose body contains ``match_text`` and prints how many were
    removed.
    """
    match_text = "INSERT TEXT HERE"
    r = praw.Reddit(
        user_agent=config.USER_AGENT,
        client_id=config.R_CLIENT_ID,
        client_secret=config.R_CLIENT_SECRET,
        username=config.R_USERNAME,
        password=config.R_PASSWORD,
    )
    user = r.redditor(config.R_USERNAME)
    # Fetch the whole listing up front, before any comment is deleted.
    comments = list(user.comments.new())
    count = 0
    for c in comments:
        if match_text in c.body:
            c.delete()
            count += 1
    # Call form instead of the Python 2 print statement: valid on both
    # Python 2 and Python 3 and prints the same text.
    print("Comments deleted: {}".format(count))
示例11
def log(message, alert=False):
    """Log a timestamped message and optionally alert the admin.

    The message is prefixed with the local time and appended to
    ``config.LOG_FILE`` when that is set; otherwise it is printed.  When
    *alert* is true and ``config.ADMIN`` is set, the same text is also sent
    to the admin's reddit inbox with the subject "CompileBot Alert".
    """
    t = time.strftime('%y-%m-%d %H:%M:%S', time.localtime())
    message = "{}: {}\n".format(t, message)
    # NOTE(review): encoding to bytes before a text-mode write/print looks
    # Python 2 specific - confirm the target interpreter before changing it.
    message = message.encode('utf8', 'replace')
    if config.LOG_FILE:
        with open(config.LOG_FILE, 'a') as f:
            f.write(message)
    else:
        print(message, end='')
    if alert and config.ADMIN:
        # Build the client with the same keyword credentials used elsewhere
        # in this file.  The previous `praw.Reddit(USER_AGENT)` + `r.login()`
        # pair belongs to the PRAW 3 API and cannot coexist with the
        # `r.redditor(...).message(...)` call below.
        r = praw.Reddit(
            user_agent=config.USER_AGENT,
            client_id=config.R_CLIENT_ID,
            client_secret=config.R_CLIENT_SECRET,
            username=config.R_USERNAME,
            password=config.R_PASSWORD,
        )
        admin_alert = message
        subject = "CompileBot Alert"
        r.redditor(config.ADMIN).message(subject, admin_alert)
示例12
def main():
    """Process every unread inbox item once.

    Refreshes ``config.BANNED_USERS`` when a subreddit is configured, then
    hands each unread item to ``process_unread``.  A failure while processing
    an item is reported to the admin via ``log(..., alert=True)``.  Every
    item is marked as read whether or not processing succeeded.
    """
    r = praw.Reddit(
        user_agent=config.USER_AGENT,
        client_id=config.R_CLIENT_ID,
        client_secret=config.R_CLIENT_SECRET,
        username=config.R_USERNAME,
        password=config.R_PASSWORD,
    )
    if config.SUBREDDIT:
        # (A stray no-op `config.BANNED_USERS` expression used to sit here.)
        config.BANNED_USERS = get_banned(r)
    # Iterate though each new comment/message in the inbox and
    # process it appropriately.
    for new in r.inbox.unread():
        try:
            process_unread(new, r)
        except Exception:
            # `Exception` rather than a bare except, so KeyboardInterrupt and
            # SystemExit stop the bot instead of being reported as errors.
            tb = traceback.format_exc()
            # Notify admin of any errors
            log("Error processing comment {c.id}\n"
                "{traceback}".format(c=new, traceback=code_block(tb)), alert=True)
        finally:
            new.mark_read()
示例13
def start():
    """Start the sneakpeek application.

    Builds the Reddit client from ``config`` and runs ``scan`` on the
    configured subreddit.  Any exception escaping the scan is logged at
    critical level.
    """
    logging.info("Starting application")
    logging.info("Instantiating Reddit instance")
    credentials = {
        "client_id": config.CLIENT["ID"],
        "client_secret": config.CLIENT["SECRET"],
        "user_agent": config.USER_AGENT,
        "username": config.USERNAME,
        "password": config.PASSWORD,
    }
    reddit = praw.Reddit(**credentials)
    try:
        scan(reddit.subreddit(config.SUBREDDIT))
    except Exception as error:
        # Not expected to happen: it interrupts the continuous subreddit
        # monitoring provided by subreddit.stream.submissions()
        logging.critical("Exception occurred while scanning. This should never happen.")
        logging.critical(error)
示例14
def search_subreddit(name, query, sort="relevance", limit=500, filter_nsfw=True):
    """Search subreddit *name* for *query* and return the submissions found.

    With *filter_nsfw* set, submissions flagged ``over_18`` are dropped.
    Returns ``None`` when Reddit access is unavailable (a warning is logged)
    or when the request raises ``ResponseException`` / ``RequestException``
    (the error is logged).
    """
    if not has_reddit_access():
        logger.warning("WARNING: No reddit access!")
        return None

    try:
        found = list(get_subreddit(name).search(query, sort=sort, limit=limit))
        if filter_nsfw:
            found = [post for post in found if not post.over_18]
        return found
    except (ResponseException, RequestException) as err:
        logger.error("Exception with accessing Reddit: {}".format(err))
    return None
示例15
def createPrawConfig(client_id, client_secret,
                     praw_path='praw.ini'):
    """
    Write a praw.ini file holding the Reddit app credentials.

    The file contains a single ``bot1`` section with the client id, the
    client secret and the fixed user agent ``FreshScript``.

    Parameters
    ----------
    client_id: str
        Reddit app client id.
    client_secret: str
        Reddit app client secret.
    praw_path: str
        Path to praw.ini.
    """
    section = {
        'client_id': client_id,
        'client_secret': client_secret,
        'user_agent': 'FreshScript',
    }
    ini = ConfigParser()
    ini.read_dict({'bot1': section})
    with open(praw_path, 'w') as handle:
        ini.write(handle)
示例16
def setupCron(self):
    """Create a ``CronTab`` and prompt the user with a cron-field legend.

    The legend explaining the five cron fields is used as the ``input``
    prompt.  The visible code does not use the answer any further.
    """
    cron = CronTab()
    legend = textwrap.dedent("""\
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6) (Sunday to Saturday;
│ │ │ │ │ 7 is also Sunday on some systems)
│ │ │ │ │
│ │ │ │ │
* * * * * command to execute
""")
    answer = input(legend)
# Reddit class
示例17
def get_reddit_posts(subreddit_info):
    """Return the hot posts of a subreddit that pass the configured filters.

    Up to ``POST_LIMIT`` hot submissions are inspected.  A submission is
    skipped (with a console message) when it is NSFW, a self post or a
    spoiler and the matching ``*_ALLOWED`` setting is ``False``, or when it
    is stickied.  The result maps submission id to submission.
    """
    post_dict = {}
    print('[ OK ] Getting posts from Reddit...')
    for submission in subreddit_info.hot(limit=POST_LIMIT):
        if submission.over_18 and NSFW_POSTS_ALLOWED is False:
            # NSFW posts are disabled in the config file
            reason = 'because it is marked as NSFW'
        elif submission.is_self and SELF_POSTS_ALLOWED is False:
            # Self posts are disabled in the config file
            reason = 'because it is a self post'
        elif submission.spoiler and SPOILERS_ALLOWED is False:
            # Spoiler posts are disabled in the config file
            reason = 'because it is marked as a spoiler'
        elif submission.stickied:
            reason = 'because it is stickied'
        else:
            # Accepted: remember the submission under its id
            post_dict[submission.id] = submission
            continue
        print('[ OK ] Skipping', submission.id, reason)
    return post_dict
示例18
def get_reddit_posts(subreddit_info):
    """Collect the hot submissions that are allowed by the configuration.

    Walks up to ``POST_LIMIT`` hot submissions of *subreddit_info*.  NSFW
    posts, self posts and spoilers are dropped when their ``*_ALLOWED`` flag
    is ``False``; stickied posts are always dropped.  Each drop is announced
    on the console.  Returns a dict keyed by submission id.
    """
    post_dict = {}
    print('[ OK ] Getting posts from Reddit...')
    for submission in subreddit_info.hot(limit=POST_LIMIT):
        skip_reason = None
        if submission.over_18 and NSFW_POSTS_ALLOWED is False:
            # NSFW posts are disabled in the config file
            skip_reason = 'because it is marked as NSFW'
        elif submission.is_self and SELF_POSTS_ALLOWED is False:
            # Self posts are disabled in the config file
            skip_reason = 'because it is a self post'
        elif submission.spoiler and SPOILERS_ALLOWED is False:
            # Spoiler posts are disabled in the config file
            skip_reason = 'because it is marked as a spoiler'
        elif submission.stickied:
            skip_reason = 'because it is stickied'

        if skip_reason is None:
            # Accepted: store the submission under its id
            post_dict[submission.id] = submission
        else:
            print('[ OK ] Skipping', submission.id, skip_reason)
    return post_dict
示例19
def Load_Configuration():
    """Load the Reddit settings for this plugin from the JSON config file.

    Reads ``plugins/common/config/config.json`` and looks up the section
    named ``Plugin_Name.lower()``.

    Returns a list ``[client_id, client_secret, user_agent, username,
    password, subreddits]`` when all six values are truthy, otherwise
    ``None``.  ``None`` is also returned (after a warning is logged) when the
    file cannot be read or a key is missing.
    """
    # NOTE(review): '__file__' is a string literal here, so the path is
    # resolved against the current working directory, not this module's
    # location - confirm that this is intended before changing it.
    File_Dir = os.path.dirname(os.path.realpath('__file__'))
    Configuration_File = os.path.join(File_Dir, 'plugins/common/config/config.json')

    # str.strip('plugins.') strips a *set of characters* from both ends and
    # can eat letters of the module name itself; remove the prefix instead.
    Prefix = 'plugins.'
    Module_Name = __name__[len(Prefix):] if __name__.startswith(Prefix) else __name__
    logging.info(f"{General.Date()} - {Module_Name} - Loading configuration data.")

    try:
        with open(Configuration_File) as JSON_File:
            Configuration_Data = json.load(JSON_File)

        Reddit_Details = Configuration_Data[Plugin_Name.lower()]
        Settings = [
            Reddit_Details['client_id'],
            Reddit_Details['client_secret'],
            Reddit_Details['user_agent'],
            Reddit_Details['username'],
            Reddit_Details['password'],
            Reddit_Details["subreddits"],
        ]
        if all(Settings):
            return Settings
        return None

    except Exception:
        # Narrowed from a bare except so Ctrl-C / SystemExit still propagate.
        logging.warning(f"{General.Date()} - {Module_Name} - Failed to load Reddit details.")
        return None
示例20
def scrape_reddit(url):
    """Dump a submission's title and top-level comments to a JSON file.

    Parameters
    ----------
    url : str
        URL of the Reddit submission to read.

    The result is written to ``reddit_output.json`` in the working directory
    as ``{"Submission": title, "Comments": [{"Author", "content"}, ...]}``.
    Comments whose author name cannot be read are labelled ``"Hidden"``.
    """
    reddit = praw.Reddit(user_agent='Comment Extraction (by /u/USERNAME)',
                         client_id='INSERT CLIENT ID', client_secret="INSERT CLIENT SECRET")
    # Use the `url` argument; the code previously read an unrelated global
    # (`URL2`) and ignored what the caller passed in.
    submission = reddit.submission(url=url)

    output = dict()
    output['Submission'] = submission.title
    output['Comments'] = list()

    submission.comments.replace_more(limit=0)
    for top_level_comment in submission.comments:
        ans = dict()
        try:
            ans['Author'] = top_level_comment.author.name
        except AttributeError:
            # No author object (e.g. ``author`` is None).
            ans['Author'] = "Hidden"
        ans['content'] = top_level_comment.body
        output['Comments'].append(ans)

    json_object = json.dumps(output, ensure_ascii=False, indent=4)
    # ensure_ascii=False can emit non-ASCII text, so write UTF-8 explicitly,
    # and close the file deterministically.
    with io.open('reddit_output.json', 'w', encoding='utf-8') as out_file:
        out_file.write(json_object)
示例21
def reddit() -> praw.Reddit:
    """Return the shared PRAW client, creating it on first use.

    Asserts on every call that the four ``REDDIT_*`` credential settings are
    set.  The client is stored in the module-level ``_reddit_instance``.
    """
    global _reddit_instance

    assert REDDIT_CLIENT_ID
    assert REDDIT_CLIENT_SECRET
    assert REDDIT_USER
    assert REDDIT_PASSWORD

    if _reddit_instance:
        return _reddit_instance

    _reddit_instance = praw.Reddit(
        client_id=REDDIT_CLIENT_ID,
        client_secret=REDDIT_CLIENT_SECRET,
        user_agent=USERAGENT,
        username=REDDIT_USER,
        password=REDDIT_PASSWORD,
    )
    return _reddit_instance
示例22
def reddit_authorize_callback():
    """Handle the redirect back from Reddit's OAuth authorization page.

    When the request carries a ``code``, it is exchanged for access
    information and the Reddit user name is looked up.  With a name, the
    matching ``Streamer`` is fetched or created, committed and logged in;
    without one an error is flashed.  Finally redirects to the URL stored in
    the session under ``next_url_after_login`` (default: the streamer page).
    """
    r = praw.Reddit(user_agent=app.config["REDDIT_WEB_APP_USER_AGENT"])
    r.set_oauth_app_info(
        app.config['REDDIT_API_ID'],
        app.config['REDDIT_API_SECRET'],
        url_for('.reddit_authorize_callback', _external=True),
    )

    name = None
    code = request.args.get('code', '')
    if code:
        r.get_access_information(code)
        name = r.get_me().name

    if name:
        user = get_or_create(Streamer, reddit_username=name)
        db.session.commit()
        login_user(user, remember=True)
        flash("Logged in successfully!", 'success')
    else:
        flash("An error occurred while trying to log in.", 'error')

    return redirect(
        session.pop('next_url_after_login', url_for("streamer_page", streamer_name=name))
    )
示例23
def __init__(self,
             parser,
             user_name,
             user_pass,
             subreddits='all',
             user_agent='redditreplier v{} by /u/naiyt'.format(__version__),
             limit=1000,
             debug=False):
    """Store the bot settings and create the PRAW client.

    All arguments are kept as attributes of the same name.  A ``praw.Reddit``
    client is created from *user_agent*, the blacklist is loaded from
    ``BLACKLIST.txt`` and the counters are initialised.
    """
    print("Setting things up...")

    # Caller-supplied settings
    self.parser = parser
    self.user_agent = user_agent
    self.subreddits = subreddits
    self.user_name, self.user_pass = user_name, user_pass
    self.limit = limit
    self.debug = debug

    # Derived state
    self.r = praw.Reddit(self.user_agent)
    self.blacklist = self._setup_blacklist('BLACKLIST.txt')
    self.rest_time = 3
    self.comments_replied_to = 0
示例24
def prepare_the_message(spool):
    """Render the reply text for a list of book records.

    Each record in *spool* becomes one formatted entry followed by a
    horizontal-rule separator; the bot's footer is appended at the end.
    The book description is converted with ``html_to_md``.
    """
    entry_layout = ("**Name**: {0}\n\n**Author**: {1}\n\n**Avg Rating**: "
                    "{2} by {3} users\n\n**Description**: {4}\n\n Pages: "
                    "{5}, Year: {6}")
    divider = '\n\n---\n\n'
    footer = ('^(Bleep, Blop, Bleep! I am still in beta, please be nice. '
              'Contact )[^(my creator)](https://www.reddit.com/message/'
              'compose/?to=avinassh) ^(for feedback, bug reports or just to '
              'say thanks! The code is on )[^github](https://github.com/'
              'avinassh/Reddit-GoodReads-Bot)^.')

    entries = []
    for book in spool:
        fields = (
            book['title'],
            book['authors'],
            book['average_rating'],
            book['ratings_count'],
            html_to_md(book['description']),
            book['num_pages'],
            book['publication_year'],
        )
        entries.append(entry_layout.format(*fields) + divider)
    return "".join(entries) + footer
示例25
def main():
    """Create the Reddit client from the ``API`` settings and run URS."""
    credential_keys = (
        "client_id",
        "client_secret",
        "user_agent",
        "username",
        "password",
    )
    reddit = praw.Reddit(**{key: API[key] for key in credential_keys})
    Run(reddit).run_urs()
示例26
def login():
    """Build a ``praw.Reddit`` client from the module-level credentials.

    The account name doubles as the user agent.
    """
    credentials = dict(
        client_id=app_key,
        client_secret=app_secret,
        username=username,
        password=password,
        user_agent=username,
    )
    return praw.Reddit(**credentials)
示例27
def login():
    """Return a ``praw.Reddit`` client set up with the script credentials.

    ``app_key`` / ``app_secret`` identify the app, ``username`` / ``password``
    the account; the user agent is the account name.
    """
    return praw.Reddit(
        client_id=app_key,
        client_secret=app_secret,
        username=username,
        password=password,
        user_agent=username,
    )
示例28
def main():
# GUI entry point: restores or creates the extractor state, wires a
# queue-backed message receiver onto a QThread, shows the main window and
# hands control to the Qt event loop until the application exits.
# NOTE(review): indentation was lost in this listing, so the extent of the
# `else:` branch below cannot be read off the text - confirm against the
# original file before re-indenting (in particular whether the
# `validate_certs` assignment applies to both branches).
app = QApplication(sys.argv)
# A None result from loadState() leads to a freshly constructed extractor.
rddtDataExtractor = loadState()
if rddtDataExtractor is None:
rddtDataExtractor = RedditDataExtractor()
else:
# If something weird happened to cause currentlyDownloading to be saved as True, set it back to False
rddtDataExtractor.currentlyDownloading = False
# reinstantiate the praw instance because it doesn't shelve properly
# praw shelve issue causes http.validate_certs to be uninstantiated
rddtDataExtractor._r = praw.Reddit(user_agent='Data Extractor for reddit v1.1 by /u/VoidXC')
rddtDataExtractor._r.http.validate_certs = 'RedditDataExtractor/cacert.pem'
# The receiver object is moved onto its own QThread; the GUI's append_text
# slot is connected to the receiver's queuePutSignal.
queue = Queue()
thread = QThread()
recv = QueueMessageReceiver(queue)
mainGUIWindow = RddtDataExtractorGUI(rddtDataExtractor, queue, recv)
recv.queuePutSignal.connect(mainGUIWindow.append_text)
recv.moveToThread(thread)
# recv.run is invoked once the thread has started
thread.started.connect(recv.run)
# Add clean up finished signals so the threads end appropriately when the program ends
recv.finished.connect(thread.quit)
recv.finished.connect(recv.deleteLater)
thread.finished.connect(thread.deleteLater)
# start the receiver
thread.start()
# show the GUI
mainGUIWindow.show()
# display Imgur API pop up if not hidden by user and client-id isn't set
if rddtDataExtractor.showImgurAPINotification and rddtDataExtractor.imgurAPIClientID is None:
mainGUIWindow.notifyImgurAPI()
# and wait for the user to exit
sys.exit(app.exec_())
示例29
def __init__(self, _subreddit, _outfile, _start_date, _end_date, step=3600):
    """Set up a retriever for one subreddit over a date range.

    *_start_date* and *_end_date* are ``dd/mm/YYYY`` strings; they are
    converted to integer timestamps with ``time.mktime``.  *step* is stored
    as given.  The post/comment counters start at zero and the current
    timestamp starts at the range start.
    """
    def to_timestamp(day):
        # Parse a dd/mm/YYYY string into an integer timestamp.
        parsed = datetime.datetime.strptime(day, "%d/%m/%Y")
        return int(time.mktime(parsed.timetuple()))

    self.r = praw.Reddit(site_name='graphbrain', user_agent='GraphBrain (http://graphbrain.org)')
    self.subreddit = _subreddit
    self.output_file = _outfile
    self.step = step
    self.start_ts = to_timestamp(_start_date)
    self.end_ts = to_timestamp(_end_date)
    self.cur_ts = self.start_ts
    self.posts = 0
    self.comments = 0
    self.retry_wait = 30
示例30
def get_reddit(credfile='reddit_credentials.json'):
    '''
    Create a praw.Reddit client from the credentials stored in credfile.

    credfile is a JSON file of the form:
    {"client_id":"XX1234XXX",
     "client_secret":"xxx1234xxx",
     "user_agent":"linux:xxx (by /u/xxx)"}
    '''
    with open(credfile) as handle:
        settings = json.load(handle)

    wanted = ('client_id', 'client_secret', 'user_agent')
    return praw.Reddit(**{key: settings[key] for key in wanted})