Commit 72dcd04f authored by Matthias Pigulla's avatar Matthias Pigulla

Fix that MysqliStatement cannot handle streams

The blob type maps BLOB (and also TEXT) columns to PHP streams.
Internally, they use the ParameterType::LARGE_OBJECT (i. e. \PDO::PARAM_LOB)
binding type, which suggests that efficient handling of PHP stream resources
was intended.

However, at least when using the mysqli driver, stream resources passed into
insert() or update() are simply cast to strings. As a result, a literal
string like "Resource id #126" will end up in the database.

This PR fixes the issue by correctly processing streams in the
MysqliStatement when they are passed with the ParameterType::LARGE_OBJECT
binding type. It uses the mysqli::send_long_data() method to pass stream
data in chunks to the MySQL server, thus keeping the memory footprint low.

This method does not (despite claims to the contrary) allow bypassing the
max_allowed_packet size!

The pdo_mysql driver was already capable of handling streams this way.
Now this is covered by tests.

Helpful documentation:

- http://php.net/manual/en/mysqli-stmt.send-long-data.php
- http://php.net/manual/en/mysqli-stmt.bind-param.php - see first "Note"
- http://php.net/manual/en/pdo.lobs.php
- https://blogs.oracle.com/oswald/phps-mysqli-extension:-storing-and-retrieving-blobs

Additional notes on MySQL's max_allowed_packet:

This change does not intend to work around the max_allowed_packet setting,
and quick tests show that this is not possible: When MySQL is configured to use
a low max_allowed_packet value, an error will be triggered stating

  Parameter of prepared statement which is set through
  mysql_send_long_data() is longer than 'max_allowed_packet' bytes.

Documentation for the underlying mysql_stmt_send_long_data() C API function
suggests that max_allowed_packet is always a hard limit.

References:

- https://dev.mysql.com/doc/refman/8.0/en/mysql-stmt-send-long-data.html
- https://dev.mysql.com/doc/refman/8.0/en/server-system-variables.html#sysvar_max_allowed_packet
- https://bugs.mysql.com/bug.php?id=83958

What mysqli::send_long_data() seems to do is that every chunk of data
passed to it is immediately sent out to the network. I have confirmed this
using tcpdump, and so the advantage might be that we can keep the memory
footprint low on the PHP side while processing streams.
parent e9a62962
......@@ -21,11 +21,16 @@ namespace Doctrine\DBAL\Driver\Mysqli;
use Doctrine\DBAL\Driver\Statement;
use Doctrine\DBAL\Driver\StatementIterator;
use Doctrine\DBAL\Exception\InvalidArgumentException;
use Doctrine\DBAL\FetchMode;
use Doctrine\DBAL\ParameterType;
use function array_combine;
use function array_fill;
use function count;
use function feof;
use function fread;
use function get_resource_type;
use function is_resource;
use function str_repeat;
/**
......@@ -42,7 +47,7 @@ class MysqliStatement implements \IteratorAggregate, Statement
ParameterType::BOOLEAN => 'i',
ParameterType::NULL => 's',
ParameterType::INTEGER => 'i',
ParameterType::LARGE_OBJECT => 's',
ParameterType::LARGE_OBJECT => 'b',
];
/**
......@@ -169,9 +174,11 @@ class MysqliStatement implements \IteratorAggregate, Statement
throw new MysqliException($this->_stmt->error, $this->_stmt->errno);
}
} else {
if (! $this->_stmt->bind_param($this->types, ...$this->_bindedValues)) {
list($types, $values, $streams) = $this->separateBoundValues();
if (! $this->_stmt->bind_param($types, ...$values)) {
throw new MysqliException($this->_stmt->error, $this->_stmt->sqlstate, $this->_stmt->errno);
}
$this->sendLongData($streams);
}
}
......@@ -228,6 +235,63 @@ class MysqliStatement implements \IteratorAggregate, Statement
return true;
}
/**
 * Partitions the bound values into regular values and stream resources.
 *
 * Stream resources bound as LARGE_OBJECT are collected separately so that they can be
 * transferred with mysqli::send_long_data(); their slot in the regular value list is
 * filled with NULL. Non-resource values bound as LARGE_OBJECT are downgraded to the
 * string binding type, and positions without a type default to the string binding type.
 *
 * @return array<int, array<int|string, mixed>|string> The type string, the values to bind
 *                                                     and the streams, in that order.
 *
 * @throws InvalidArgumentException If a non-stream resource is bound as LARGE_OBJECT.
 */
private function separateBoundValues()
{
    $stringType = static::$_paramTypeMap[ParameterType::STRING];
    $lobType    = static::$_paramTypeMap[ParameterType::LARGE_OBJECT];

    $types   = $this->types;
    $values  = [];
    $streams = [];

    foreach ($this->_bindedValues as $position => $bound) {
        // Bound values are keyed one-based, the type string is zero-based.
        $offset = $position - 1;

        if (! isset($types[$offset])) {
            $types[$offset] = $stringType;
        }

        // Anything that is not a large object is bound the usual way.
        if ($types[$offset] !== $lobType) {
            $values[$position] = $bound;
            continue;
        }

        // Large objects given as plain values are bound as strings.
        if (! is_resource($bound)) {
            $types[$offset]    = $stringType;
            $values[$position] = $bound;
            continue;
        }

        if (get_resource_type($bound) !== 'stream') {
            throw new InvalidArgumentException('Resources passed with the LARGE_OBJECT parameter type must be stream resources.');
        }

        // The stream is sent separately; bind a NULL placeholder in its position.
        $streams[$position] = $bound;
        $values[$position]  = null;
    }

    return [$types, $values, $streams];
}
/**
 * Sends the given stream resources to the server in chunks via mysqli_stmt::send_long_data().
 *
 * Intended to be called after the regular query parameters have been bound. Each stream
 * is read until EOF in chunks of 8192 bytes, and every chunk is passed on immediately.
 *
 * @param array<int, resource> $streams Stream resources keyed by their one-based parameter
 *                                      position; the position is converted to the zero-based
 *                                      index expected by send_long_data().
 *
 * @throws MysqliException If reading from a stream fails or the driver rejects a chunk.
 */
private function sendLongData($streams)
{
    foreach ($streams as $paramNr => $stream) {
        while (! feof($stream)) {
            $chunk = fread($stream, 8192);

            if ($chunk === false) {
                // "{$var}" replaces the "${var}" form, which is deprecated as of PHP 8.2;
                // the resulting message is unchanged.
                throw new MysqliException("Failed reading the stream resource for parameter offset {$paramNr}.");
            }

            if (! $this->_stmt->send_long_data($paramNr - 1, $chunk)) {
                throw new MysqliException($this->_stmt->error, $this->_stmt->sqlstate, $this->_stmt->errno);
            }
        }
    }
}
/**
* Binds an array of values to bound parameters.
*
......
......@@ -3,11 +3,13 @@
namespace Doctrine\Tests\DBAL\Functional;
use Doctrine\DBAL\Driver\PDOSqlsrv\Driver as PDOSQLSrvDriver;
use Doctrine\DBAL\FetchMode;
use Doctrine\DBAL\ParameterType;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Types\Type;
use const CASE_LOWER;
use function array_change_key_case;
use function fopen;
use function in_array;
use function str_repeat;
use function stream_get_contents;
/**
......@@ -49,6 +51,28 @@ class BlobTest extends \Doctrine\Tests\DbalFunctionalTestCase
self::assertEquals(1, $ret);
}
/**
 * Inserting a stream resource bound as LARGE_OBJECT must store the stream's contents.
 */
public function testInsertProcessesStream()
{
    $platformName = $this->_conn->getDatabasePlatform()->getName();

    // https://github.com/doctrine/dbal/issues/3288 for DB2
    // https://github.com/doctrine/dbal/issues/3290 for Oracle
    if (in_array($platformName, ['oracle', 'db2'], true)) {
        $this->markTestIncomplete('Platform does not support stream resources as parameters');
    }

    // Four times the 8192 byte chunk size, so the data is sent in 4 chunks.
    $payload = str_repeat('x', 4 * 8192);
    $source  = fopen('data://text/plain,' . $payload, 'r');

    $row = [
        'id'        => 1,
        'clobfield' => 'ignored',
        'blobfield' => $source,
    ];

    $types = [
        ParameterType::INTEGER,
        ParameterType::STRING,
        ParameterType::LARGE_OBJECT,
    ];

    $this->_conn->insert('blob_table', $row, $types);

    $this->assertBlobContains($payload);
}
public function testSelect()
{
$this->_conn->insert('blob_table', [
......@@ -86,14 +110,63 @@ class BlobTest extends \Doctrine\Tests\DbalFunctionalTestCase
$this->assertBlobContains('test2');
}
/**
 * Updating a row with a stream resource bound as LARGE_OBJECT must store the stream's contents.
 */
public function testUpdateProcessesStream()
{
    $platformName = $this->_conn->getDatabasePlatform()->getName();

    // https://github.com/doctrine/dbal/issues/3288 for DB2
    // https://github.com/doctrine/dbal/issues/3290 for Oracle
    if (in_array($platformName, ['oracle', 'db2'], true)) {
        $this->markTestIncomplete('Platform does not support stream resources as parameters');
    }

    // Seed the row with a plain string value first.
    $initialRow = [
        'id'        => 1,
        'clobfield' => 'ignored',
        'blobfield' => 'test',
    ];

    $initialTypes = [
        ParameterType::INTEGER,
        ParameterType::STRING,
        ParameterType::LARGE_OBJECT,
    ];

    $this->_conn->insert('blob_table', $initialRow, $initialTypes);

    // Then overwrite the blob column from a stream.
    $replacement = fopen('data://text/plain,test2', 'r');

    $changes = [
        'id'        => 1,
        'blobfield' => $replacement,
    ];

    $changeTypes = [
        ParameterType::INTEGER,
        ParameterType::LARGE_OBJECT,
    ];

    $this->_conn->update('blob_table', $changes, ['id' => 1], $changeTypes);

    $this->assertBlobContains('test2');
}
/**
 * A stream assigned to a by-reference bound variable after bindParam() must be stored on execute().
 */
public function testBindParamProcessesStream()
{
    $platformName = $this->_conn->getDatabasePlatform()->getName();

    // https://github.com/doctrine/dbal/issues/3288 for DB2
    // https://github.com/doctrine/dbal/issues/3290 for Oracle
    if (in_array($platformName, ['oracle', 'db2'], true)) {
        $this->markTestIncomplete('Platform does not support stream resources as parameters');
    }

    $sql       = "INSERT INTO blob_table(id, clobfield, blobfield) VALUES (1, 'ignored', ?)";
    $statement = $this->_conn->prepare($sql);

    $lob = null;
    $statement->bindParam(1, $lob, ParameterType::LARGE_OBJECT);

    // bindParam() binds by reference, so the stream is deliberately opened only after binding.
    $lob = fopen('data://text/plain,test', 'r');

    $statement->execute();

    $this->assertBlobContains('test');
}
private function assertBlobContains($text)
{
$rows = $this->_conn->fetchAll('SELECT * FROM blob_table');
$rows = $this->_conn->query('SELECT blobfield FROM blob_table')->fetchAll(FetchMode::COLUMN);
self::assertCount(1, $rows);
$row = array_change_key_case($rows[0], CASE_LOWER);
$blobValue = Type::getType('blob')->convertToPHPValue($row['blobfield'], $this->_conn->getDatabasePlatform());
$blobValue = Type::getType('blob')->convertToPHPValue($rows[0], $this->_conn->getDatabasePlatform());
self::assertInternalType('resource', $blobValue);
self::assertEquals($text, stream_get_contents($blobValue));
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment