import logging
from threading import Thread
import monotonic
import backoff
import json

from analytics.request import post, APIError, DatetimeSerializer

try:
from queue import Empty
except ImportError:
from Queue import Empty
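
# A single serialized message may be at most MAX_MSG_SIZE bytes (32KB);
# next() below drops anything larger instead of sending it.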
MAX_MSG_SIZE = 32 << 10

# Our servers only accept batches smaller than 500KB. The limit here is set
# slightly lower to leave room for extra data that will be added later,
# e.g. "sentAt".
BATCH_SIZE_LIMIT = 475000


class Consumer(Thread):
"""Consumes the messages from the client's queue."""
    log = logging.getLogger('segment')

def __init__(self, queue, write_key, upload_size=100, host=None,
on_error=None, upload_interval=0.5, gzip=False, retries=10,
timeout=15, proxies=None):
"""Create a consumer thread."""
Thread.__init__(self)
# Make consumer a daemon thread so that it doesn't block program exit
self.daemon = True
self.upload_size = upload_size
self.upload_interval = upload_interval
self.write_key = write_key
self.host = host
self.on_error = on_error
self.queue = queue
self.gzip = gzip
        # It's important to set running in the constructor: if it were set in
        # run() instead, a pause() called immediately after construction could
        # set it to False only for run() to flip it back to True, and the
        # consumer would keep running forever.
self.running = True
self.retries = retries
self.timeout = timeout
        self.proxies = proxies

def run(self):
"""Runs the consumer."""
self.log.debug('consumer is running...')
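        # upload() blocks for up to upload_interval while gathering a batch,
        # so this loop does not busy-spin when the queue is empty.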
while self.running:
self.upload()
        self.log.debug('consumer exited.')

def pause(self):
"""Pause the consumer."""
        self.running = False

def upload(self):
"""Upload the next batch of items, return whether successful."""
success = False
batch = self.next()
        if not batch:
return False
try:
self.request(batch)
success = True
except Exception as e:
self.log.error('error uploading: %s', e)
success = False
if self.on_error:
self.on_error(e, batch)
finally:
# mark items as acknowledged from queue
for _ in batch:
self.queue.task_done()
        return success

def next(self):
"""Return the next batch of items to upload."""
queue = self.queue
items = []
start_time = monotonic.monotonic()
total_size = 0
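
        # Block on the queue until the batch fills, the byte limit is hit, or
        # upload_interval seconds have elapsed since this batch was started.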
while len(items) < self.upload_size:
elapsed = monotonic.monotonic() - start_time
if elapsed >= self.upload_interval:
break
try:
item = queue.get(
block=True, timeout=self.upload_interval - elapsed)
item_size = len(json.dumps(
item, cls=DatetimeSerializer).encode())
if item_size > MAX_MSG_SIZE:
                    self.log.error(
                        'Item exceeds 32KB limit, dropping. (%s)', str(item))
continue
items.append(item)
total_size += item_size
if total_size >= BATCH_SIZE_LIMIT:
self.log.debug(
'hit batch size limit (size: %d)', total_size)
break
except Empty:
break
        return items

def request(self, batch):
"""Attempt to upload the batch and retry before raising an error """
        def fatal_exception(exc):
            if isinstance(exc, APIError):
                # Give up on client errors (4xx), except 429 (rate limited);
                # retry on server errors (5xx).
                return (400 <= exc.status < 500) and exc.status != 429
            else:
                # Retry on all other errors (e.g. network failures).
                return False
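
        # backoff's max_tries includes the initial attempt, so retries + 1
        # allows `self.retries` retries after the first failure.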
@backoff.on_exception(
backoff.expo,
Exception,
max_tries=self.retries + 1,
giveup=fatal_exception)
def send_request():
post(self.write_key, self.host, gzip=self.gzip,
timeout=self.timeout, batch=batch, proxies=self.proxies)
send_request()
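

# Example usage (a minimal sketch; the write key and message dict below are
# placeholders, not real values):
#
#     from queue import Queue
#
#     q = Queue()
#     consumer = Consumer(q, 'YOUR_WRITE_KEY')
#     consumer.start()    # drain the queue on a background daemon thread
#     q.put({'type': 'track', 'event': 'Signed Up', 'userId': 'user-1'})
#     q.join()            # blocks until every queued item is task_done()
#     consumer.pause()    # run() exits once the current upload completes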