import logging
from threading import Thread

import monotonic
import backoff
import json

from analytics.request import post, APIError, DatetimeSerializer

try:
    from queue import Empty
except ImportError:
    from Queue import Empty

MAX_MSG_SIZE = 32 << 10
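# A single message may be at most 32 << 10 = 32768 bytes (32KB); next() drops
# any item larger than this instead of sending it.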

# Our servers only accept batches smaller than 500KB. The limit here is set
# slightly lower to leave room for extra data added later, e.g. "sentAt".
BATCH_SIZE_LIMIT = 475000


class Consumer(Thread):
"""Consumes the messages from the client's queue."""
log = logging.getLogger('segment')

    def __init__(self, queue, write_key, upload_size=100, host=None,
                 on_error=None, upload_interval=0.5, gzip=False, retries=10,
                 timeout=15, proxies=None):
"""Create a consumer thread."""
Thread.__init__(self)
# Make consumer a daemon thread so that it doesn't block program exit
self.daemon = True
self.upload_size = upload_size
self.upload_interval = upload_interval
self.write_key = write_key
self.host = host
self.on_error = on_error
self.queue = queue
self.gzip = gzip
# It's important to set running in the constructor: if we are asked to
# pause immediately after construction, we might set running to True in
# run() *after* we set it to False in pause... and keep running
# forever.
self.running = True
self.retries = retries
self.timeout = timeout
self.proxies = proxies

    def run(self):
"""Runs the consumer."""
self.log.debug('consumer is running...')
while self.running:
self.upload()
self.log.debug('consumer exited.')

    def pause(self):
"""Pause the consumer."""
self.running = False

    def upload(self):
"""Upload the next batch of items, return whether successful."""
success = False
batch = self.next()
if len(batch) == 0:
return False
try:
self.request(batch)
success = True
except Exception as e:
self.log.error('error uploading: %s', e)
success = False
if self.on_error:
self.on_error(e, batch)
finally:
# mark items as acknowledged from queue
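            # task_done() is called even when the upload fails, so a pending
            # queue.join() never blocks forever.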
for _ in batch:
self.queue.task_done()
return success

    def next(self):
"""Return the next batch of items to upload."""
queue = self.queue
items = []
start_time = monotonic.monotonic()
total_size = 0
while len(items) < self.upload_size:
elapsed = monotonic.monotonic() - start_time
if elapsed >= self.upload_interval:
break
try:
item = queue.get(
block=True, timeout=self.upload_interval - elapsed)
item_size = len(json.dumps(
item, cls=DatetimeSerializer).encode())
if item_size > MAX_MSG_SIZE:
                    self.log.error(
                        'Item exceeds 32KB limit, dropping. (%s)', str(item))
continue
items.append(item)
total_size += item_size
if total_size >= BATCH_SIZE_LIMIT:
self.log.debug(
'hit batch size limit (size: %d)', total_size)
break
except Empty:
break
return items

    def request(self, batch):
        """Attempt to upload the batch, retrying before raising an error."""

        def fatal_exception(exc):
if isinstance(exc, APIError):
                # Give up (don't retry) on client errors (4xx), except for
                # 429 (rate limited), which is retried along with server
                # errors.
                return (400 <= exc.status < 500) and exc.status != 429
            else:
                # Retry on all other errors (e.g. network failures).
                return False
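
        # backoff.expo retries send_request() with exponentially growing
        # delays, up to self.retries + 1 total attempts, unless
        # fatal_exception() reports the error as fatal (then it re-raises).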
@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception)
def send_request():
post(self.write_key, self.host, gzip=self.gzip,
timeout=self.timeout, batch=batch, proxies=self.proxies)
send_request()
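

# Usage sketch (an assumption for illustration, not part of this module): the
# client owns a shared queue, starts a Consumer on it, and enqueues message
# dicts for background upload:
#
#     from queue import Queue
#     q = Queue()
#     consumer = Consumer(q, write_key='YOUR_WRITE_KEY')  # placeholder key
#     consumer.start()
#     q.put({'type': 'track', 'event': 'Signed Up'})
#     consumer.pause()   # stop after the in-flight batch
#     consumer.join()    # wait for the thread to exit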