Feat: Threaded MutationsBatcher #722
Conversation
google/cloud/bigtable/batcher.py
Outdated
mutation_count = len(item._get_mutations())

if mutation_count > MAX_MUTATIONS:
What's the MAX_MUTATIONS constraint here? Is it checking the concurrent requests being sent to Bigtable? I don't think throwing an error is the correct behavior here. Instead we should block on adding more elements to the batcher.
This is the max number of mutations for a single row (100,000). In this case I think it should raise an error, since we shouldn't split the mutations for one row.
Blocking additional elements to the batcher is already done by the Python Queue itself. If we try to queue more than 100 elements, the put will block and the item won't be added until items have been popped from the queue.
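The blocking behavior described here can be seen with a minimal stdlib sketch (a small maxsize is used for brevity; the batcher reportedly uses 100):

```python
import queue
import threading
import time

q = queue.Queue(maxsize=2)  # producer blocks once 2 items are queued
q.put("row-1")
q.put("row-2")

def consumer():
    time.sleep(0.1)
    q.get()  # popping one item unblocks the producer

threading.Thread(target=consumer).start()

start = time.monotonic()
q.put("row-3")  # blocks until the consumer pops an item
elapsed = time.monotonic() - start
print(f"put blocked for ~{elapsed:.2f}s")
```

So a bounded queue already gives back-pressure on producers without any extra code in the batcher.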
Force-pushed from b99695f to 3e627ce
It seems like we don't really have any flow control here. Do you plan on adding it in a follow up PR?
google/cloud/bigtable/batcher.py
Outdated
-FLUSH_COUNT = 1000
+FLUSH_COUNT = 100
 MAX_MUTATIONS = 100000
-MAX_ROW_BYTES = 5242880  # 5MB
+MAX_ROW_BYTES = 20 * 1024 * 1024  # 20MB
+MAX_MUTATIONS_SIZE = 100 * 1024 * 1024  # 100MB
Please add comments explaining what these constants control.
google/cloud/bigtable/batcher.py
Outdated
self.total_size = 0
self.max_row_bytes = max_row_bytes

def get(self, block=True, timeout=None):
When would block=False be used?
google/cloud/bigtable/batcher.py
Outdated
exc = future.exception()
if exc:
    raise exc
Will this throw the original exception, or will it wrap it?
If it's the original exception, then the stack trace will be rooted in the executor. If it's wrapped, then you are leaking implementation details.
In Java we kept the original exception but added a synthetic stack trace of the caller to help callers diagnose where they called the failed RPC.
HBase does something even hackier: it modifies the stack trace and inserts the caller's stack at the top.
I don't know which approach is idiomatic in Python, but we should be intentional here.
This will re-raise the original exception. I will see how to do that in Python.
Can you share a link to how it was done in Java?
In this case the exception will have the stack trace all the way to the original spot where it was raised (line 303 below). The exception itself carries a list of individual error codes, so the user can still go through it to find out what failed.
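For illustration, Python preserves the worker-side traceback when an exception pulled from a future is re-raised, so the caller sees the original raise site. A minimal stdlib sketch (the function name failing_rpc is a stand-in, not the batcher code):

```python
import concurrent.futures
import traceback

def failing_rpc():
    # Stands in for the mutate_rows call that fails inside the executor.
    raise ValueError("mutation failed")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(failing_rpc)

exc = future.exception()  # returns the original exception object, unwrapped
try:
    if exc:
        raise exc  # re-raising keeps the traceback rooted in failing_rpc
except ValueError as err:
    tb = "".join(traceback.format_exception(type(err), err, err.__traceback__))

# The worker frame is visible to the caller, so no synthetic stack trace is needed.
print("failing_rpc" in tb)
```

Because the exception object retains its __traceback__ from the worker thread, a plain `raise exc` gives the behavior described above.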
google/cloud/bigtable/batcher.py
Outdated
responses.append(result)

if has_error:
    raise MutationsBatchError(status_codes=response)
I noticed Status codes can be converted into exceptions using google.api_core.exceptions.from_grpc_status. Maybe it would be better to raise those directly, to give the full context, rather than using the raw status codes?
google/cloud/bigtable/batcher.py
Outdated
FLUSH_COUNT = 1000

# Max number of items in the queue. Queue will be flushed if this number is reached
I think these variable names are a bit confusing. Maybe we can refactor them to be:
- batch_element_count: after this many elements are accumulated, they will be wrapped up in a batch and sent.
- batch_byte_size: after this many bytes are accumulated, they will be wrapped up in a batch and sent.
- max_outstanding_elements: after this many elements are sent, block until the previous batch is processed.
- max_outstanding_bytes: after this many bytes are sent, block until the previous batch is processed.
What do you think?
google/cloud/bigtable/batcher.py
Outdated
if (
    self.total_mutation_count >= MAX_MUTATIONS
    or self.total_size >= self.max_row_bytes
    or self._queue.full()
Why do we need to check if the queue is full? In the batcher code, we flush when the Batcher is full:
if self._rows.full():
    self.flush_async()
I'm not sure the behavior is still correct. I think full() should only check for two things:
- the number of elements has reached the batch element threshold
- the number of bytes has reached the batch bytes threshold
The queue is only used to block the user from adding more elements when it's full. We don't trigger another flush when the queue is full.
Force-pushed from 9eb4dff to 69d864d
google/cloud/bigtable/batcher.py
Outdated
mutations_count = 0
mutations_size = 0
rows_count = 0
batch_info = BatchInfo()
I'm a bit confused by BatchInfo. Doesn't it duplicate the rows_count, mutations_count and mutations_size variables?
It's similar info, but for different "buckets". There are two "queues":
- self._rows: for storing the rows we want to mutate
- batch_info: for storing info about the rows that are being mutated, while we're waiting for the result/response from the backend. This gets passed to the batch_completed_callback. That's where we can release the flow control.
google/cloud/bigtable/batcher.py
Outdated
self.flow_control.release(processed_rows)
del self.futures_mapping[future]

def flush_rows(self, rows_to_flush=None):
Having both flush() and flush_rows() seems a little confusing, especially since this doesn't seem to be "flushing" from the cache in any way. I could see people calling this with no arguments, thinking it is the main flush function.
Maybe this should be called mutate_rows? Or just made into an internal helper function?
Agree this is confusing. There is already a public mutate_rows from before, and it has different behavior than this one. I'm changing it to private since the user isn't expected to call it manually.
google/cloud/bigtable/batcher.py
Outdated
MAX_ROW_BYTES = 20 * 1024 * 1024  # 20MB # after this many bytes, send out the batch

MAX_MUTATIONS_SIZE = 100 * 1024 * 1024  # 100MB # max inflight byte size.
Maybe rename this variable to indicate it's for flow control:
-MAX_MUTATIONS_SIZE = 100 * 1024 * 1024  # 100MB # max inflight byte size.
+MAX_OUTSTANDING_BYTES = 100 * 1024 * 1024  # 100MB # max inflight byte size.
google/cloud/bigtable/batcher.py
Outdated
class FlowControl(object):
    def __init__(self, max_mutations=MAX_MUTATIONS_SIZE, max_row_bytes=MAX_ROW_BYTES):
The defaults should be:
-def __init__(self, max_mutations=MAX_MUTATIONS_SIZE, max_row_bytes=MAX_ROW_BYTES):
+def __init__(self, max_mutations=MAX_OUTSTANDING_ELEMENTS, max_row_bytes=MAX_MUTATION_SIZE):
And these numbers are:
MAX_OUTSTANDING_ELEMENTS = 100000
MAX_MUTATION_SIZE = 20 MB
correct?
google/cloud/bigtable/batcher.py
Outdated
self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.inflight_rows_count += batch_info.rows_count
I don't think we care about this; it's also not used in is_blocked.
I was using this for debugging. I will remove it before merging.
I've made adjustments based on previous reviews and feedback. Please take another look.
LGTM with some nits!
Force-pushed from 1a470c1 to edb7d5c
Co-authored-by: Mattie Fu <[email protected]>
- Remove unneeded error - Make some functions internal
- Remove debugging variable - Update variable names
Force-pushed from edb7d5c to 9899e45
google/cloud/bigtable/batcher.py
Outdated
mutations_size: int = 0


class FlowControl(object):
Should this be prefixed with an underscore since this is an internal class?
Sure, I will add that in the next commit.
google/cloud/bigtable/batcher.py
Outdated
self.inflight_mutations += batch_info.mutations_count
self.inflight_size += batch_info.mutations_size
self.set_flow_control_status()
self.wait()
Won't this cause a deadlock with a large row? If max_inflight_bytes is 2 and the row size is 4, this will just get stuck.
I adjusted the logic, moving the wait to the flush_async function. If a batch is what's causing the event to be blocked, it will be sent through, but subsequent threads will block and wait. This allows the oversized batch to go through.
LGTM, but please have Daniel take a look at the api and confirm it works for him in the async client
Some of the API will need to change for the async client, due to different model classes and asyncio patterns, but the general shape of the solution should be mostly consistent. LGTM
Reverts #722. This PR caused beam bigtableio.py failures (https://togithub.com/apache/beam/issues/26673) and is blocking the beam release. We're unclear why it caused the failure, so we will revert this change and cut another release to unblock beam, then investigate separately.