Remove support for sharding and the Azure platform

parent 42dfc79a
# Azure Federations
Implementing Federations inside a new Doctrine Sharding Extension. Some extensions to the DBAL and ORM core have to be done to get this working.
1. DBAL (Database Abstraction Layer)
* Add support for Database Schema Operations
* CREATE FEDERATION
* CREATE TABLE ... FEDERATED ON
* Add support to create a multi-tenant schema from any given schema
* Add API to pick a shard based on distribution key and atomic value
* Add API to ask about federations, federation members and so on.
* Add Sharding Abstraction
* If a shard is picked via distribution key and atomic value fire queries against this only
* Or query the global database.
2. ORM (Object-Relational Mapper)
* Federation Key has to be part of the clustered index of the table
* Test with a pure Multi-Tenant App with Filtering = ON (TaskList)
* Test with sharded app (Weather)
## Implementation Details
SQL Azure requires one and exactly one clustered index. It makes no difference if the primary key
or any other key is the clustered index. Sharding requires an external ID generation (no auto-increment)
such as GUIDs. GUIDs have negative properties with regard to clustered index performance, so that
typically you would add a "created" timestamp for example that holds the clustered index instead
of making the GUID a clustered index.
## Example API:
@@@ php
<?php
use Doctrine\DBAL\DriverManager;
$dbParams = array(
'dbname' => 'tcp:dbname.database.windows.net',
'sharding' => array(
'federationName' => 'Orders_Federation',
'distributionKey' => 'CustID',
'distributionType' => 'integer',
'filteringEnabled' => false,
),
// ...
);
$conn = DriverManager::getConnection($dbParams);
$shardManager = $conn->getShardManager();
// Example 1: query against root database
$sql = "SELECT * FROM Products";
$rows = $conn->executeQuery($sql);
// Example 2: query against the selected shard with CustomerId = 100
$aCustomerID = 100;
$shardManager->selectShard($aCustomerID); // Using Default federationName and distributionKey
// Query: "USE FEDERATION Orders_Federation (CustID = $aCustomerID) WITH RESET, FILTERING OFF;"
$sql = "SELECT * FROM Customers";
$rows = $conn->executeQuery($sql);
// Example 3: Reset API to root database again
$shardManager->selectGlobal();
## ID Generation
With sharding all the ids have to be generated for global uniqueness. There are three strategies for this.
1. Use GUIDs as described here http://blogs.msdn.com/b/cbiyikoglu/archive/2011/06/20/id-generation-in-federations-identity-sequences-and-guids-uniqueidentifier.aspx
2. Having a central table that is accessed with a second connection to generate sequential ids
3. Using natural keys from the domain.
The second approach has the benefit of having numerical primary keys, however also a central failure location. The third strategy can seldom be used, because the domains don't allow this. Identity columns cannot be used at all.
@@@ php
<?php
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Id\TableHiLoIdGenerator;
$dbParams = array(
'dbname' => 'dbname.database.windows.net',
// ...
);
$conn = DriverManager::getConnection($dbParams);
$idGenerator = new TableHiLoIdGenerator($conn, 'id_table_name', $multiplicator = 1);
// only once, create this table
$idGenerator->createTable();
$nextId = $idGenerator->generateId('for_table_name');
$nextOtherId = $idGenerator->generateId('for_other_table');
The connection for the table generator has to be a different one than the one used for the main app to avoid transaction clashes.
# Doctrine Shards
Doctrine Extension to support horizontal sharding in the Doctrine ORM.
## Idea
Implement sharding inside Doctrine at a level that is as unobtrusive to the developer as possible.
Problems to tackle:
1. Where to send INSERT statements?
2. How to generate primary keys?
3. How to pick shards for update, delete statements?
4. How to pick shards for select operations?
5. How to merge select queries that span multiple shards?
6. How to handle/prevent multi-shard queries that cannot be merged (GROUP BY)?
7. How to handle non-sharded data? (static metadata tables for example)
8. How to handle multiple connections?
9. Implementation on the DBAL or ORM level?
## Roadmap
Version 1: DBAL 2.3 (Multi-Tenant Apps)
1. ID Generation support (in DBAL + ORM done)
2. Multi-Tenant Support: Either pick a global metadata database or exactly one shard.
3. Fan-out queries over all shards (or a subset) by result appending
Version 2: ORM related (complex):
4. ID resolving (Pick shard for a new ID)
5. Query resolving (Pick shards a query should send to)
6. Shard resolving (Pick shards an ID could be on)
7. Transactions
8. Read Only objects
## Technical Requirements for Database Schemas
Sharded tables require the sharding-distribution key as one of their columns. This will affect your code compared to a normalized db-schema. If you have a Blog <-> BlogPost <-> PostComments entity setup sharded by `blog_id` then even the PostComment table needs this column, even if an "unsharded", normalized DB-Schema does not need this information.
## Implementation Details
Assumptions:
* For querying you either want to query ALL or just exactly one shard.
* IDs for ALL sharded tables have to be unique across all shards.
* Non-sharded data is replicated between all shards. They redundantly keep the information available. This is necessary so join queries on shards to reference data work.
* If you retrieve an object A from a shard, then all references and collections of this object reside on the same shard.
* The database schema on all shards is the same (or compatible)
### SQL Azure Federations
SQL Azure is a special case, points 1, 2, 3, 4, 7 and 8 are partly handled on the database level. This makes it a perfect test-implementation for just the subset of features in points 5-6. However there needs to be a way to configure SchemaTool to generate the correct Schema on SQL Azure.
* SELECT Operations: The most simple assumption is to always query all shards unless the user specifies otherwise explicitly.
* Queries can be merged in PHP code, this obviously does not work for DISTINCT, GROUP BY and ORDER BY queries.
### Generic Sharding
More features are necessary to implement sharding on the PHP level, independent from database support:
1. Configuration of multiple connections, one connection = one shard.
2. Primary Key Generation mechanisms (UUID, central table, sequence emulation)
## Primary Use-Cases
1. Multi-Tenant Applications
These are easier to support as you have some value to determine the shard id for the whole request very early on.
Here also queries can always be limited to a single shard.
2. Scale-Out by some attribute (Round-Robin?)
This strategy requires access to multiple shards in a single request based on the data accessed.
This diff is collapsed.
This diff is collapsed.
......@@ -13,8 +13,6 @@
reference/schema-representation
reference/events
reference/security
reference/sharding
reference/sharding_azure_tutorial
reference/supporting-other-databases
reference/portability
reference/caching
......
# Sharding with SQLAzure Example
This example demonstrates Sharding with SQL Azure Federations.
## Requirements
1. Windows Azure Account
2. SQL Azure Database
3. Composer for dependencies
## Install
composer install
Change "examples/sharding/bootstrap.php" to contain Database connection.
## Order to execute Scripts
1. create_schema.php
2. view_federation_members.php
3. insert_data.php
4. split_federation.php
5. insert_data_after_split.php
6. query_filtering_off.php
7. query_filtering_on.php
<?php
// bootstrap.php
use Doctrine\DBAL\DriverManager;
use Doctrine\Shards\DBAL\SQLAzure\SQLAzureShardManager;
require_once "vendor/autoload.php";
$config = array(
'dbname' => 'SalesDB',
'host' => 'tcp:dbname.windows.net',
'user' => 'user@dbname',
'password' => 'XXX',
'sharding' => array(
'federationName' => 'Orders_Federation',
'distributionKey' => 'CustId',
'distributionType' => 'integer',
)
);
if ($config['host'] == "tcp:dbname.windows.net") {
die("You have to change the configuration to your Azure account.\n");
}
$conn = DriverManager::getConnection($config);
$shardManager = new SQLAzureShardManager($conn);
{
"require": {
"doctrine/dbal": "*",
"doctrine/shards": "0.3"
}
}
<?php
// create_schema.php
use Doctrine\DBAL\Schema\Schema;
use Doctrine\Shards\DBAL\SQLAzure\SQLAzureSchemaSynchronizer;
require_once 'bootstrap.php';
$schema = new Schema();
$products = $schema->createTable('Products');
$products->addColumn('ProductID', 'integer');
$products->addColumn('SupplierID', 'integer');
$products->addColumn('ProductName', 'string');
$products->addColumn('Price', 'decimal', array('scale' => 2, 'precision' => 12));
$products->setPrimaryKey(array('ProductID'));
$products->addOption('azure.federated', true);
$customers = $schema->createTable('Customers');
$customers->addColumn('CustomerID', 'integer');
$customers->addColumn('CompanyName', 'string');
$customers->addColumn('FirstName', 'string');
$customers->addColumn('LastName', 'string');
$customers->setPrimaryKey(array('CustomerID'));
$customers->addOption('azure.federated', true);
$customers->addOption('azure.federatedOnColumnName', 'CustomerID');
$orders = $schema->createTable('Orders');
$orders->addColumn('CustomerID', 'integer');
$orders->addColumn('OrderID', 'integer');
$orders->addColumn('OrderDate', 'datetime');
$orders->setPrimaryKey(array('CustomerID', 'OrderID'));
$orders->addOption('azure.federated', true);
$orders->addOption('azure.federatedOnColumnName', 'CustomerID');
$orderItems = $schema->createTable('OrderItems');
$orderItems->addColumn('CustomerID', 'integer');
$orderItems->addColumn('OrderID', 'integer');
$orderItems->addColumn('ProductID', 'integer');
$orderItems->addColumn('Quantity', 'integer');
$orderItems->setPrimaryKey(array('CustomerID', 'OrderID', 'ProductID'));
$orderItems->addOption('azure.federated', true);
$orderItems->addOption('azure.federatedOnColumnName', 'CustomerID');
// Create the Schema + Federation:
$synchronizer = new SQLAzureSchemaSynchronizer($conn, $shardManager);
// Or just look at the SQL:
echo implode("\n", $synchronizer->getCreateSchema($schema));
$synchronizer->createSchema($schema);
<?php
// insert_data.php
require_once "bootstrap.php";
$shardManager->selectShard(0);
$conn->insert("Products", array(
"ProductID" => 386,
"SupplierID" => 1001,
"ProductName" => 'Titanium Extension Bracket Left Hand',
"Price" => 5.25,
));
$conn->insert("Products", array(
"ProductID" => 387,
"SupplierID" => 1001,
"ProductName" => 'Titanium Extension Bracket Right Hand',
"Price" => 5.25,
));
$conn->insert("Products", array(
"ProductID" => 388,
"SupplierID" => 1001,
"ProductName" => 'Fusion Generator Module 5 kV',
"Price" => 10.50,
));
$conn->insert("Products", array(
"ProductID" => 389,
"SupplierID" => 1001,
"ProductName" => 'Bypass Filter 400 MHz Low Pass',
"Price" => 10.50,
));
$conn->insert("Customers", array(
'CustomerID' => 10,
'CompanyName' => 'Van Nuys',
'FirstName' => 'Catherine',
'LastName' => 'Abel',
));
$conn->insert("Customers", array(
'CustomerID' => 20,
'CompanyName' => 'Abercrombie',
'FirstName' => 'Kim',
'LastName' => 'Branch',
));
$conn->insert("Customers", array(
'CustomerID' => 30,
'CompanyName' => 'Contoso',
'FirstName' => 'Frances',
'LastName' => 'Adams',
));
$conn->insert("Customers", array(
'CustomerID' => 40,
'CompanyName' => 'A. Datum Corporation',
'FirstName' => 'Mark',
'LastName' => 'Harrington',
));
$conn->insert("Customers", array(
'CustomerID' => 50,
'CompanyName' => 'Adventure Works',
'FirstName' => 'Keith',
'LastName' => 'Harris',
));
$conn->insert("Customers", array(
'CustomerID' => 60,
'CompanyName' => 'Alpine Ski House',
'FirstName' => 'Wilson',
'LastName' => 'Pais',
));
$conn->insert("Customers", array(
'CustomerID' => 70,
'CompanyName' => 'Baldwin Museum of Science',
'FirstName' => 'Roger',
'LastName' => 'Harui',
));
$conn->insert("Customers", array(
'CustomerID' => 80,
'CompanyName' => 'Blue Yonder Airlines',
'FirstName' => 'Pilar',
'LastName' => 'Pinilla',
));
$conn->insert("Customers", array(
'CustomerID' => 90,
'CompanyName' => 'City Power & Light',
'FirstName' => 'Kari',
'LastName' => 'Hensien',
));
$conn->insert("Customers", array(
'CustomerID' => 100,
'CompanyName' => 'Coho Winery',
'FirstName' => 'Peter',
'LastName' => 'Brehm',
));
$conn->executeUpdate("
DECLARE @orderId INT
DECLARE @customerId INT
SET @orderId = 10
SELECT @customerId = CustomerId FROM Customers WHERE LastName = 'Hensien' and FirstName = 'Kari'
INSERT INTO Orders (CustomerId, OrderId, OrderDate)
VALUES (@customerId, @orderId, GetDate())
INSERT INTO OrderItems (CustomerID, OrderID, ProductID, Quantity)
VALUES (@customerId, @orderId, 388, 4)
SET @orderId = 20
SELECT @customerId = CustomerId FROM Customers WHERE LastName = 'Harui' and FirstName = 'Roger'
INSERT INTO Orders (CustomerId, OrderId, OrderDate)
VALUES (@customerId, @orderId, GetDate())
INSERT INTO OrderItems (CustomerID, OrderID, ProductID, Quantity)
VALUES (@customerId, @orderId, 389, 2)
SET @orderId = 30
SELECT @customerId = CustomerId FROM Customers WHERE LastName = 'Brehm' and FirstName = 'Peter'
INSERT INTO Orders (CustomerId, OrderId, OrderDate)
VALUES (@customerId, @orderId, GetDate())
INSERT INTO OrderItems (CustomerID, OrderID, ProductID, Quantity)
VALUES (@customerId, @orderId, 387, 3)
SET @orderId = 40
SELECT @customerId = CustomerId FROM Customers WHERE LastName = 'Pais' and FirstName = 'Wilson'
INSERT INTO Orders (CustomerId, OrderId, OrderDate)
VALUES (@customerId, @orderId, GetDate())
INSERT INTO OrderItems (CustomerID, OrderID, ProductID, Quantity)
VALUES (@customerId, @orderId, 388, 1)");
<?php
// insert_data_aftersplit.php
require_once 'bootstrap.php';
$newCustomerId = 55;
$shardManager->selectShard($newCustomerId);
$conn->insert("Customers", array(
"CustomerID" => $newCustomerId,
"CompanyName" => "Microsoft",
"FirstName" => "Brian",
"LastName" => "Swan",
));
$conn->insert("Orders", array(
"CustomerID" => 55,
"OrderID" => 37,
"OrderDate" => date('Y-m-d H:i:s'),
));
$conn->insert("OrderItems", array(
"CustomerID" => 55,
"OrderID" => 37,
"ProductID" => 387,
"Quantity" => 1,
));
<?php
// query_filtering_off.php
require_once "bootstrap.php";
$shardManager->selectShard(0);
$data = $conn->fetchAll('SELECT * FROM Customers');
print_r($data);
<?php
// query_filtering_on.php
require_once "bootstrap.php";
$shardManager->setFilteringEnabled(true);
$shardManager->selectShard(55);
$data = $conn->fetchAll('SELECT * FROM Customers');
print_r($data);
<?php
// split_federation.php
require_once 'bootstrap.php';
$shardManager->splitFederation(60);
<?php
// view_federation_members.php
require_once "bootstrap.php";
$shards = $shardManager->getShards();
foreach ($shards as $shard) {
print_r($shard);
}
......@@ -144,17 +144,6 @@ final class DriverManager
}
}
// URL support for PoolingShardConnection
if (isset($params['global'])) {
$params['global'] = self::parseDatabaseUrl($params['global']);
}
if (isset($params['shards'])) {
foreach ($params['shards'] as $key => $shardParams) {
$params['shards'][$key] = self::parseDatabaseUrl($shardParams);
}
}
self::_checkParams($params);
$className = $params['driverClass'] ?? self::$_driverMap[$params['driver']];
......
<?php
namespace Doctrine\DBAL\Platforms;
use Doctrine\DBAL\Schema\Table;
/**
* Platform to ensure compatibility of Doctrine with SQL Azure
*
* On top of SQL Server 2008 the following functionality is added:
*
* - Create tables with the FEDERATED ON syntax.
*
* @deprecated
*/
class SQLAzurePlatform extends SQLServer2008Platform
{
/**
* {@inheritDoc}
*/
public function getCreateTableSQL(Table $table, $createFlags = self::CREATE_INDEXES)
{
$sql = parent::getCreateTableSQL($table, $createFlags);
if ($table->hasOption('azure.federatedOnColumnName')) {
$distributionName = $table->getOption('azure.federatedOnDistributionName');
$columnName = $table->getOption('azure.federatedOnColumnName');
$stmt = ' FEDERATED ON (' . $distributionName . ' = ' . $columnName . ')';
$sql[0] .= $stmt;
}
return $sql;
}
}
<?php
namespace Doctrine\DBAL\Sharding;
use Doctrine\Common\EventManager;
use Doctrine\DBAL\Configuration;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Driver;
use Doctrine\DBAL\Driver\Connection as DriverConnection;
use Doctrine\DBAL\Event\ConnectionEventArgs;
use Doctrine\DBAL\Events;
use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
use InvalidArgumentException;
use function array_merge;
use function is_numeric;
use function is_string;
/**
* Sharding implementation that pools many different connections
* internally and serves data from the currently active connection.
*
* The internals of this class are:
*
* - All sharding clients are specified and given a shard-id during
* configuration.
* - By default, the global shard is selected. If no global shard is configured
* an exception is thrown on access.
* - Selecting a shard by distribution value delegates the mapping
* "distributionValue" => "client" to the ShardChoser interface.
* - An exception is thrown if trying to switch shards during an open
* transaction.
*
* Instantiation through the DriverManager looks like:
*
* @deprecated
*
* @example
*
* $conn = DriverManager::getConnection(array(
* 'wrapperClass' => 'Doctrine\DBAL\Sharding\PoolingShardConnection',
* 'driver' => 'pdo_mysql',
* 'global' => array('user' => '', 'password' => '', 'host' => '', 'dbname' => ''),
* 'shards' => array(
* array('id' => 1, 'user' => 'slave1', 'password', 'host' => '', 'dbname' => ''),
* array('id' => 2, 'user' => 'slave2', 'password', 'host' => '', 'dbname' => ''),
* ),
* 'shardChoser' => 'Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser',
* ));
* $shardManager = $conn->getShardManager();
* $shardManager->selectGlobal();
* $shardManager->selectShard($value);
*/
class PoolingShardConnection extends Connection
{
/** @var DriverConnection[] */
private $activeConnections = [];
/** @var string|int|null */
private $activeShardId;
/** @var mixed[] */
private $connectionParameters = [];
/**
* {@inheritDoc}
*
* @throws InvalidArgumentException
*/
public function __construct(array $params, Driver $driver, ?Configuration $config = null, ?EventManager $eventManager = null)
{
if (! isset($params['global'], $params['shards'])) {
throw new InvalidArgumentException("Connection Parameters require 'global' and 'shards' configurations.");
}
if (! isset($params['shardChoser'])) {
throw new InvalidArgumentException("Missing Shard Choser configuration 'shardChoser'");
}
if (is_string($params['shardChoser'])) {
$params['shardChoser'] = new $params['shardChoser']();
}
if (! ($params['shardChoser'] instanceof ShardChoser)) {
throw new InvalidArgumentException("The 'shardChoser' configuration is not a valid instance of Doctrine\DBAL\Sharding\ShardChoser\ShardChoser");
}
$this->connectionParameters[0] = array_merge($params, $params['global']);
foreach ($params['shards'] as $shard) {
if (! isset($shard['id'])) {
throw new InvalidArgumentException("Missing 'id' for one configured shard. Please specify a unique shard-id.");
}
if (! is_numeric($shard['id']) || $shard['id'] < 1) {
throw new InvalidArgumentException('Shard Id has to be a non-negative number.');
}
if (isset($this->connectionParameters[$shard['id']])) {
throw new InvalidArgumentException('Shard ' . $shard['id'] . ' is duplicated in the configuration.');
}
$this->connectionParameters[$shard['id']] = array_merge($params, $shard);
}
parent::__construct($params, $driver, $config, $eventManager);
}
/**
* Get active shard id.
*
* @return string|int|null
*/
public function getActiveShardId()
{
return $this->activeShardId;
}
/**
* {@inheritdoc}
*/
public function getParams()
{
return $this->activeShardId ? $this->connectionParameters[$this->activeShardId] : $this->connectionParameters[0];
}
/**
* {@inheritdoc}
*/
public function getHost()
{
$params = $this->getParams();
return $params['host'] ?? parent::getHost();
}
/**
* {@inheritdoc}
*/
public function getPort()
{
$params = $this->getParams();
return $params['port'] ?? parent::getPort();
}
/**
* {@inheritdoc}
*/
public function getUsername()
{
$params = $this->getParams();
return $params['user'] ?? parent::getUsername();
}
/**
* {@inheritdoc}
*/
public function getPassword()
{
$params = $this->getParams();
return $params['password'] ?? parent::getPassword();
}
/**
* Connects to a given shard.
*
* @param string|int|null $shardId
*
* @return bool
*
* @throws ShardingException
*/
public function connect($shardId = null)
{
if ($shardId === null && $this->_conn) {
return false;
}
if ($shardId !== null && $shardId === $this->activeShardId) {
return false;
}
if ($this->getTransactionNestingLevel() > 0) {
throw new ShardingException('Cannot switch shard when transaction is active.');
}
$activeShardId = $this->activeShardId = (int) $shardId;
if (isset($this->activeConnections[$activeShardId])) {
$this->_conn = $this->activeConnections[$activeShardId];
return false;
}
$this->_conn = $this->activeConnections[$activeShardId] = $this->connectTo($activeShardId);
if ($this->_eventManager->hasListeners(Events::postConnect)) {
$eventArgs = new ConnectionEventArgs($this);
$this->_eventManager->dispatchEvent(Events::postConnect, $eventArgs);
}
return true;
}
/**
* Connects to a specific connection.
*
* @param string|int $shardId
*
* @return \Doctrine\DBAL\Driver\Connection
*/
protected function connectTo($shardId)
{
$params = $this->getParams();
$driverOptions = $params['driverOptions'] ?? [];
$connectionParams = $this->connectionParameters[$shardId];
$user = $connectionParams['user'] ?? null;
$password = $connectionParams['password'] ?? null;
return $this->_driver->connect($connectionParams, $user, $password, $driverOptions);
}
/**
* @param string|int|null $shardId
*
* @return bool
*/
public function isConnected($shardId = null)
{
if ($shardId === null) {
return $this->_conn !== null;
}
return isset($this->activeConnections[$shardId]);
}
/**
* @return void
*/
public function close()
{
$this->_conn = null;
$this->activeConnections = [];
$this->activeShardId = null;
}
}
<?php
namespace Doctrine\DBAL\Sharding;
use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
use RuntimeException;
/**
* Shard Manager for the Connection Pooling Shard Strategy
*
* @deprecated
*/
class PoolingShardManager implements ShardManager
{
/** @var PoolingShardConnection */
private $conn;
/** @var ShardChoser */
private $choser;
/** @var string|null */
private $currentDistributionValue;
public function __construct(PoolingShardConnection $conn)
{
$params = $conn->getParams();
$this->conn = $conn;
$this->choser = $params['shardChoser'];
}
/**
* {@inheritDoc}
*/
public function selectGlobal()
{
$this->conn->connect(0);
$this->currentDistributionValue = null;
}
/**
* {@inheritDoc}
*/
public function selectShard($distributionValue)
{
$shardId = $this->choser->pickShard($distributionValue, $this->conn);
$this->conn->connect($shardId);
$this->currentDistributionValue = $distributionValue;
}
/**
* {@inheritDoc}
*/
public function getCurrentDistributionValue()
{
return $this->currentDistributionValue;
}
/**
* {@inheritDoc}
*/
public function getShards()
{
$params = $this->conn->getParams();
$shards = [];
foreach ($params['shards'] as $shard) {
$shards[] = ['id' => $shard['id']];
}
return $shards;
}
/**
* {@inheritDoc}
*
* @throws RuntimeException
*/
public function queryAll($sql, array $params, array $types)
{
$shards = $this->getShards();
if (! $shards) {
throw new RuntimeException('No shards found.');
}
$result = [];
$oldDistribution = $this->getCurrentDistributionValue();
foreach ($shards as $shard) {
$this->conn->connect($shard['id']);
foreach ($this->conn->fetchAll($sql, $params, $types) as $row) {
$result[] = $row;
}
}
if ($oldDistribution === null) {
$this->selectGlobal();
} else {
$this->selectShard($oldDistribution);
}
return $result;
}
}
<?php
namespace Doctrine\DBAL\Sharding\SQLAzure;
use Closure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Synchronizer\AbstractSchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SchemaSynchronizer;
use Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer;
use Doctrine\DBAL\Types\Type;
use Doctrine\DBAL\Types\Types;
use RuntimeException;
use function array_merge;
/**
* SQL Azure Schema Synchronizer.
*
* Will iterate over all shards when performing schema operations. This is done
* by partitioning the passed schema into subschemas for the federation and the
* global database and then applying the operations step by step using the
* {@see \Doctrine\DBAL\Schema\Synchronizer\SingleDatabaseSynchronizer}.
*
* @deprecated
*/
class SQLAzureFederationsSynchronizer extends AbstractSchemaSynchronizer
{
public const FEDERATION_TABLE_FEDERATED = 'azure.federated';
public const FEDERATION_DISTRIBUTION_NAME = 'azure.federatedOnDistributionName';
/** @var SQLAzureShardManager */
private $shardManager;
/** @var SchemaSynchronizer */
private $synchronizer;
public function __construct(Connection $conn, SQLAzureShardManager $shardManager, ?SchemaSynchronizer $sync = null)
{
parent::__construct($conn);
$this->shardManager = $shardManager;
$this->synchronizer = $sync ?: new SingleDatabaseSynchronizer($conn);
}
/**
* {@inheritdoc}
*/
public function getCreateSchema(Schema $createSchema)
{
$sql = [];
[$global, $federation] = $this->partitionSchema($createSchema);
$globalSql = $this->synchronizer->getCreateSchema($global);
if ($globalSql) {
$sql[] = "-- Create Root Federation\n" .
'USE FEDERATION ROOT WITH RESET;';
$sql = array_merge($sql, $globalSql);
}
$federationSql = $this->synchronizer->getCreateSchema($federation);
if ($federationSql) {
$defaultValue = $this->getFederationTypeDefaultValue();
$sql[] = $this->getCreateFederationStatement();
$sql[] = 'USE FEDERATION ' . $this->shardManager->getFederationName() . ' (' . $this->shardManager->getDistributionKey() . ' = ' . $defaultValue . ') WITH RESET, FILTERING = OFF;';
$sql = array_merge($sql, $federationSql);
}
return $sql;
}
/**
* {@inheritdoc}
*/
public function getUpdateSchema(Schema $toSchema, $noDrops = false)
{
return $this->work($toSchema, static function ($synchronizer, $schema) use ($noDrops) {
return $synchronizer->getUpdateSchema($schema, $noDrops);
});
}
/**
* {@inheritdoc}
*/
public function getDropSchema(Schema $dropSchema)
{
return $this->work($dropSchema, static function ($synchronizer, $schema) {
return $synchronizer->getDropSchema($schema);
});
}
/**
* {@inheritdoc}
*/
public function createSchema(Schema $createSchema)
{
$this->processSql($this->getCreateSchema($createSchema));
}
/**
* {@inheritdoc}
*/
public function updateSchema(Schema $toSchema, $noDrops = false)
{
$this->processSql($this->getUpdateSchema($toSchema, $noDrops));
}
/**
* {@inheritdoc}
*/
public function dropSchema(Schema $dropSchema)
{
$this->processSqlSafely($this->getDropSchema($dropSchema));
}
/**
* {@inheritdoc}
*/
public function getDropAllSchema()
{
$this->shardManager->selectGlobal();
$globalSql = $this->synchronizer->getDropAllSchema();
if ($globalSql) {
$sql[] = "-- Work on Root Federation\nUSE FEDERATION ROOT WITH RESET;";
$sql = array_merge($sql, $globalSql);
}
$shards = $this->shardManager->getShards();
foreach ($shards as $shard) {
$this->shardManager->selectShard($shard['rangeLow']);
$federationSql = $this->synchronizer->getDropAllSchema();
if (! $federationSql) {
continue;
}
$sql[] = '-- Work on Federation ID ' . $shard['id'] . "\n" .
'USE FEDERATION ' . $this->shardManager->getFederationName() . ' (' . $this->shardManager->getDistributionKey() . ' = ' . $shard['rangeLow'] . ') WITH RESET, FILTERING = OFF;';
$sql = array_merge($sql, $federationSql);
}
$sql[] = 'USE FEDERATION ROOT WITH RESET;';
$sql[] = 'DROP FEDERATION ' . $this->shardManager->getFederationName();
return $sql;
}
/**
* {@inheritdoc}
*/
public function dropAllSchema()
{
$this->processSqlSafely($this->getDropAllSchema());
}
/**
* @return Schema[]
*/
private function partitionSchema(Schema $schema)
{
return [
$this->extractSchemaFederation($schema, false),
$this->extractSchemaFederation($schema, true),
];
}
/**
* @param bool $isFederation
*
* @return Schema
*
* @throws RuntimeException
*/
private function extractSchemaFederation(Schema $schema, $isFederation)
{
$partitionedSchema = clone $schema;
foreach ($partitionedSchema->getTables() as $table) {
if ($isFederation) {
$table->addOption(self::FEDERATION_DISTRIBUTION_NAME, $this->shardManager->getDistributionKey());
}
if ($table->hasOption(self::FEDERATION_TABLE_FEDERATED) !== $isFederation) {
$partitionedSchema->dropTable($table->getName());
} else {
foreach ($table->getForeignKeys() as $fk) {
$foreignTable = $schema->getTable($fk->getForeignTableName());
if ($foreignTable->hasOption(self::FEDERATION_TABLE_FEDERATED) !== $isFederation) {
throw new RuntimeException('Cannot have foreign key between global/federation.');
}
}
}
}
return $partitionedSchema;
}
/**
* Work on the Global/Federation based on currently existing shards and
* perform the given operation on the underlying schema synchronizer given
* the different partitioned schema instances.
*
* @return string[]
*/
private function work(Schema $schema, Closure $operation)
{
[$global, $federation] = $this->partitionSchema($schema);
$sql = [];
$this->shardManager->selectGlobal();
$globalSql = $operation($this->synchronizer, $global);
if ($globalSql) {
$sql[] = "-- Work on Root Federation\nUSE FEDERATION ROOT WITH RESET;";
$sql = array_merge($sql, $globalSql);
}
$shards = $this->shardManager->getShards();
foreach ($shards as $shard) {
$this->shardManager->selectShard($shard['rangeLow']);
$federationSql = $operation($this->synchronizer, $federation);
if (! $federationSql) {
continue;
}
$sql[] = '-- Work on Federation ID ' . $shard['id'] . "\n" .
'USE FEDERATION ' . $this->shardManager->getFederationName() . ' (' . $this->shardManager->getDistributionKey() . ' = ' . $shard['rangeLow'] . ') WITH RESET, FILTERING = OFF;';
$sql = array_merge($sql, $federationSql);
}
return $sql;
}
/**
* @return string
*/
private function getFederationTypeDefaultValue()
{
$federationType = Type::getType($this->shardManager->getDistributionType());
switch ($federationType->getName()) {
case Types::GUID:
$defaultValue = '00000000-0000-0000-0000-000000000000';
break;
case Types::INTEGER:
case Types::SMALLINT:
case Types::BIGINT:
$defaultValue = '0';
break;
default:
$defaultValue = '';
break;
}
return $defaultValue;
}
/**
* @return string
*/
private function getCreateFederationStatement()
{
$federationType = Type::getType($this->shardManager->getDistributionType());
$federationTypeSql = $federationType->getSQLDeclaration([], $this->conn->getDatabasePlatform());
return "--Create Federation\n" .
'CREATE FEDERATION ' . $this->shardManager->getFederationName() . ' (' . $this->shardManager->getDistributionKey() . ' ' . $federationTypeSql . ' RANGE)';
}
}
<?php
namespace Doctrine\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Sharding\ShardingException;
use Doctrine\DBAL\Sharding\ShardManager;
use Doctrine\DBAL\Types\Type;
use RuntimeException;
use function sprintf;
/**
* Sharding using the SQL Azure Federations support.
*
* @deprecated
*/
class SQLAzureShardManager implements ShardManager
{
/** @var string */
private $federationName;
/** @var bool */
private $filteringEnabled;
/** @var string */
private $distributionKey;
/** @var string */
private $distributionType;
/** @var Connection */
private $conn;
/** @var string|null */
private $currentDistributionValue;
/**
* @throws ShardingException
*/
public function __construct(Connection $conn)
{
$this->conn = $conn;
$params = $conn->getParams();
if (! isset($params['sharding']['federationName'])) {
throw ShardingException::missingDefaultFederationName();
}
if (! isset($params['sharding']['distributionKey'])) {
throw ShardingException::missingDefaultDistributionKey();
}
if (! isset($params['sharding']['distributionType'])) {
throw ShardingException::missingDistributionType();
}
$this->federationName = $params['sharding']['federationName'];
$this->distributionKey = $params['sharding']['distributionKey'];
$this->distributionType = $params['sharding']['distributionType'];
$this->filteringEnabled = (bool) ($params['sharding']['filteringEnabled'] ?? false);
}
/**
* Gets the name of the federation.
*
* @return string
*/
public function getFederationName()
{
return $this->federationName;
}
/**
* Gets the distribution key.
*
* @return string
*/
public function getDistributionKey()
{
return $this->distributionKey;
}
/**
* Gets the Doctrine Type name used for the distribution.
*
* @return string
*/
public function getDistributionType()
{
return $this->distributionType;
}
/**
* Sets Enabled/Disable filtering on the fly.
*
* @param bool $flag
*
* @return void
*/
public function setFilteringEnabled($flag)
{
$this->filteringEnabled = (bool) $flag;
}
/**
* {@inheritDoc}
*/
public function selectGlobal()
{
if ($this->conn->isTransactionActive()) {
throw ShardingException::activeTransaction();
}
$sql = 'USE FEDERATION ROOT WITH RESET';
$this->conn->exec($sql);
$this->currentDistributionValue = null;
}
/**
* {@inheritDoc}
*/
public function selectShard($distributionValue)
{
if ($this->conn->isTransactionActive()) {
throw ShardingException::activeTransaction();
}
$platform = $this->conn->getDatabasePlatform();
$sql = sprintf(
'USE FEDERATION %s (%s = %s) WITH RESET, FILTERING = %s;',
$platform->quoteIdentifier($this->federationName),
$platform->quoteIdentifier($this->distributionKey),
$this->conn->quote($distributionValue),
($this->filteringEnabled ? 'ON' : 'OFF')
);
$this->conn->exec($sql);
$this->currentDistributionValue = $distributionValue;
}
/**
* {@inheritDoc}
*/
public function getCurrentDistributionValue()
{
return $this->currentDistributionValue;
}
/**
* {@inheritDoc}
*/
public function getShards()
{
$sql = 'SELECT member_id as id,
distribution_name as distribution_key,
CAST(range_low AS CHAR) AS rangeLow,
CAST(range_high AS CHAR) AS rangeHigh
FROM sys.federation_member_distributions d
INNER JOIN sys.federations f ON f.federation_id = d.federation_id
WHERE f.name = ' . $this->conn->quote($this->federationName);
return $this->conn->fetchAll($sql);
}
/**
* {@inheritDoc}
*/
public function queryAll($sql, array $params = [], array $types = [])
{
$shards = $this->getShards();
if (! $shards) {
throw new RuntimeException('No shards found for ' . $this->federationName);
}
$result = [];
$oldDistribution = $this->getCurrentDistributionValue();
foreach ($shards as $shard) {
$this->selectShard($shard['rangeLow']);
foreach ($this->conn->fetchAll($sql, $params, $types) as $row) {
$result[] = $row;
}
}
if ($oldDistribution === null) {
$this->selectGlobal();
} else {
$this->selectShard($oldDistribution);
}
return $result;
}
/**
* Splits Federation at a given distribution value.
*
* @param mixed $splitDistributionValue
*
* @return void
*/
public function splitFederation($splitDistributionValue)
{
$type = Type::getType($this->distributionType);
$sql = 'ALTER FEDERATION ' . $this->getFederationName() . ' ' .
'SPLIT AT (' . $this->getDistributionKey() . ' = ' .
$this->conn->quote($splitDistributionValue, $type->getBindingType()) . ')';
$this->conn->exec($sql);
}
}
<?php
namespace Doctrine\DBAL\Sharding\SQLAzure\Schema;
use Doctrine\DBAL\Schema\Column;
use Doctrine\DBAL\Schema\ForeignKeyConstraint;
use Doctrine\DBAL\Schema\Index;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Schema\Sequence;
use Doctrine\DBAL\Schema\Table;
use Doctrine\DBAL\Schema\Visitor\Visitor;
use RuntimeException;
use function in_array;
/**
* Converts a single tenant schema into a multi-tenant schema for SQL Azure
* Federations under the following assumptions:
*
* - Every table is part of the multi-tenant application, only explicitly
* excluded tables are non-federated. The behavior of the tables being in
* global or federated database is undefined. It depends on you selecting a
* federation before DDL statements or not.
* - Every Primary key of a federated table is extended by another column
* 'tenant_id' with a default value of the SQLAzure function
* `federation_filtering_value('tenant_id')`.
* - You always have to work with `filtering=On` when using federations with this
* multi-tenant approach.
* - Primary keys are either using globally unique ids (GUID, Table Generator)
* or you explicitly add the tenant_id in every UPDATE or DELETE statement
* (otherwise they will affect the same-id rows from other tenants as well).
* SQLAzure throws errors when you try to create IDENTIY columns on federated
* tables.
*
* @deprecated
*/
class MultiTenantVisitor implements Visitor
{
/** @var string[] */
private $excludedTables = [];
/** @var string */
private $tenantColumnName;
/** @var string */
private $tenantColumnType = 'integer';
/**
* Name of the federation distribution, defaulting to the tenantColumnName
* if not specified.
*
* @var string
*/
private $distributionName;
/**
* @param string[] $excludedTables
* @param string $tenantColumnName
* @param string|null $distributionName
*/
public function __construct(array $excludedTables = [], $tenantColumnName = 'tenant_id', $distributionName = null)
{
$this->excludedTables = $excludedTables;
$this->tenantColumnName = $tenantColumnName;
$this->distributionName = $distributionName ?: $tenantColumnName;
}
/**
* {@inheritdoc}
*/
public function acceptTable(Table $table)
{
if (in_array($table->getName(), $this->excludedTables)) {
return;
}
$table->addColumn($this->tenantColumnName, $this->tenantColumnType, [
'default' => "federation_filtering_value('" . $this->distributionName . "')",
]);
$clusteredIndex = $this->getClusteredIndex($table);
$indexColumns = $clusteredIndex->getColumns();
$indexColumns[] = $this->tenantColumnName;
if ($clusteredIndex->isPrimary()) {
$table->dropPrimaryKey();
$table->setPrimaryKey($indexColumns);
} else {
$table->dropIndex($clusteredIndex->getName());
$table->addIndex($indexColumns, $clusteredIndex->getName());
$table->getIndex($clusteredIndex->getName())->addFlag('clustered');
}
}
/**
* @param Table $table
*
* @return Index
*
* @throws RuntimeException
*/
private function getClusteredIndex($table)
{
foreach ($table->getIndexes() as $index) {
if ($index->isPrimary() && ! $index->hasFlag('nonclustered')) {
return $index;
}
if ($index->hasFlag('clustered')) {
return $index;
}
}
throw new RuntimeException('No clustered index found on table ' . $table->getName());
}
/**
* {@inheritdoc}
*/
public function acceptSchema(Schema $schema)
{
}
/**
* {@inheritdoc}
*/
public function acceptColumn(Table $table, Column $column)
{
}
/**
* {@inheritdoc}
*/
public function acceptForeignKey(Table $localTable, ForeignKeyConstraint $fkConstraint)
{
}
/**
* {@inheritdoc}
*/
public function acceptIndex(Table $table, Index $index)
{
}
/**
* {@inheritdoc}
*/
public function acceptSequence(Sequence $sequence)
{
}
}
<?php
namespace Doctrine\DBAL\Sharding\ShardChoser;
use Doctrine\DBAL\Sharding\PoolingShardConnection;
/**
* The MultiTenant Shard choser assumes that the distribution value directly
* maps to the shard id.
*
* @deprecated
*/
class MultiTenantShardChoser implements ShardChoser
{
/**
* {@inheritdoc}
*/
public function pickShard($distributionValue, PoolingShardConnection $conn)
{
return $distributionValue;
}
}
<?php
namespace Doctrine\DBAL\Sharding\ShardChoser;
use Doctrine\DBAL\Sharding\PoolingShardConnection;
/**
* Given a distribution value this shard-choser strategy will pick the shard to
* connect to for retrieving rows with the distribution value.
*
* @deprecated
*/
interface ShardChoser
{
/**
* Picks a shard for the given distribution value.
*
* @param string|int $distributionValue
*
* @return string|int
*/
public function pickShard($distributionValue, PoolingShardConnection $conn);
}
<?php
namespace Doctrine\DBAL\Sharding;
/**
* Sharding Manager gives access to APIs to implementing sharding on top of
* Doctrine\DBAL\Connection instances.
*
* For simplicity and developer ease-of-use (and understanding) the sharding
* API only covers single shard queries, no fan-out support. It is primarily
* suited for multi-tenant applications.
*
* The assumption about sharding here
* is that a distribution value can be found that gives access to all the
* necessary data for all use-cases. Switching between shards should be done with
* caution, especially if lazy loading is implemented. Any query is always
* executed against the last shard that was selected. If a query is created for
* a shard Y but then a shard X is selected when its actually executed you
* will hit the wrong shard.
*
* @deprecated
*/
interface ShardManager
{
/**
* Selects global database with global data.
*
* This is the default database that is connected when no shard is
* selected.
*
* @return void
*/
public function selectGlobal();
/**
* Selects the shard against which the queries after this statement will be issued.
*
* @param string $distributionValue
*
* @return void
*
* @throws ShardingException If no value is passed as shard identifier.
*/
public function selectShard($distributionValue);
/**
* Gets the distribution value currently used for sharding.
*
* @return string|null
*/
public function getCurrentDistributionValue();
/**
* Gets information about the amount of shards and other details.
*
* Format is implementation specific, each shard is one element and has an
* 'id' attribute at least.
*
* @return mixed[][]
*/
public function getShards();
/**
* Queries all shards in undefined order and return the results appended to
* each other. Restore the previous distribution value after execution.
*
* Using {@link \Doctrine\DBAL\Connection::fetchAll} to retrieve rows internally.
*
* @param string $sql
* @param mixed[] $params
* @param int[]|string[] $types
*
* @return mixed[]
*/
public function queryAll($sql, array $params, array $types);
}
<?php
namespace Doctrine\DBAL\Sharding;
use Doctrine\DBAL\DBALException;
/**
* Sharding related Exceptions
*
* @deprecated
*/
class ShardingException extends DBALException
{
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function notImplemented()
{
return new self('This functionality is not implemented with this sharding provider.', 1331557937);
}
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function missingDefaultFederationName()
{
return new self('SQLAzure requires a federation name to be set during sharding configuration.', 1332141280);
}
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function missingDefaultDistributionKey()
{
return new self('SQLAzure requires a distribution key to be set during sharding configuration.', 1332141329);
}
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function activeTransaction()
{
return new self('Cannot switch shard during an active transaction.', 1332141766);
}
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function noShardDistributionValue()
{
return new self('You have to specify a string or integer as shard distribution value.', 1332142103);
}
/**
* @return \Doctrine\DBAL\Sharding\ShardingException
*/
public static function missingDistributionType()
{
return new self("You have to specify a sharding distribution type such as 'integer', 'string', 'guid'.");
}
}
......@@ -11,8 +11,6 @@ use Doctrine\DBAL\Driver\PDOSqlite\Driver as PDOSqliteDriver;
use Doctrine\DBAL\Driver\SQLSrv\Driver as SQLSrvDriver;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Platforms\AbstractPlatform;
use Doctrine\DBAL\Sharding\PoolingShardConnection;
use Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser;
use Doctrine\Tests\DbalTestCase;
use stdClass;
use function get_class;
......@@ -131,42 +129,6 @@ class DriverManagerTest extends DbalTestCase
self::assertEquals('baz_slave', $params['slaves']['slave1']['dbname']);
}
public function testDatabaseUrlShard() : void
{
$options = [
'driver' => 'pdo_mysql',
'shardChoser' => MultiTenantShardChoser::class,
'global' => ['url' => 'mysql://foo:bar@localhost:11211/baz'],
'shards' => [
[
'id' => 1,
'url' => 'mysql://foo:bar@localhost:11211/baz_slave',
],
],
'wrapperClass' => PoolingShardConnection::class,
];
$conn = DriverManager::getConnection($options);
$params = $conn->getParams();
self::assertInstanceOf(PDOMySQLDriver::class, $conn->getDriver());
$expected = [
'user' => 'foo',
'password' => 'bar',
'host' => 'localhost',
'port' => 11211,
];
foreach ($expected as $key => $value) {
self::assertEquals($value, $params['global'][$key]);
self::assertEquals($value, $params['shards'][0][$key]);
}
self::assertEquals('baz', $params['global']['dbname']);
self::assertEquals('baz_slave', $params['shards'][0]['dbname']);
}
/**
* @param mixed $url
* @param mixed $expected
......
<?php
namespace Doctrine\Tests\DBAL\Platforms;
use Doctrine\DBAL\Platforms\SQLAzurePlatform;
use Doctrine\DBAL\Schema\Table;
use Doctrine\Tests\DbalTestCase;
/**
* @group DBAL-222
*/
class SQLAzurePlatformTest extends DbalTestCase
{
/** @var SQLAzurePlatform */
private $platform;
protected function setUp() : void
{
$this->platform = new SQLAzurePlatform();
}
public function testCreateFederatedOnTable() : void
{
$table = new Table('tbl');
$table->addColumn('id', 'integer');
$table->addOption('azure.federatedOnDistributionName', 'TblId');
$table->addOption('azure.federatedOnColumnName', 'id');
self::assertEquals(['CREATE TABLE tbl (id INT NOT NULL) FEDERATED ON (TblId = id)'], $this->platform->getCreateTableSQL($table));
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding;
use Doctrine\DBAL\Sharding\PoolingShardConnection;
use Doctrine\DBAL\Sharding\PoolingShardManager;
use Doctrine\DBAL\Sharding\ShardChoser\ShardChoser;
use PHPUnit\Framework\MockObject\MockObject;
use PHPUnit\Framework\TestCase;
class PoolingShardManagerTest extends TestCase
{
/**
* @return PoolingShardConnection|MockObject
*/
private function createConnectionMock() : PoolingShardConnection
{
return $this->getMockBuilder(PoolingShardConnection::class)
->onlyMethods(['connect', 'getParams', 'fetchAll'])
->disableOriginalConstructor()
->getMock();
}
private function createPassthroughShardChoser() : ShardChoser
{
$mock = $this->createMock(ShardChoser::class);
$mock->expects($this->any())
->method('pickShard')
->will($this->returnCallback(static function ($value) {
return $value;
}));
return $mock;
}
private function createStaticShardChooser() : ShardChoser
{
$mock = $this->createMock(ShardChoser::class);
$mock->expects($this->any())
->method('pickShard')
->willReturn(1);
return $mock;
}
public function testSelectGlobal() : void
{
$conn = $this->createConnectionMock();
$conn->expects($this->once())->method('connect')->with($this->equalTo(0));
$conn->method('getParams')
->willReturn([
'shardChoser' => $this->createMock(ShardChoser::class),
]);
$shardManager = new PoolingShardManager($conn);
$shardManager->selectGlobal();
self::assertNull($shardManager->getCurrentDistributionValue());
}
public function testSelectShard() : void
{
$shardId = 10;
$conn = $this->createConnectionMock();
$conn->expects($this->at(0))->method('getParams')->will($this->returnValue(['shardChoser' => $this->createPassthroughShardChoser()]));
$conn->expects($this->at(1))->method('connect')->with($this->equalTo($shardId));
$shardManager = new PoolingShardManager($conn);
$shardManager->selectShard($shardId);
self::assertEquals($shardId, $shardManager->getCurrentDistributionValue());
}
public function testGetShards() : void
{
$conn = $this->createConnectionMock();
$conn->expects($this->any())->method('getParams')->will(
$this->returnValue(
['shards' => [ ['id' => 1], ['id' => 2] ], 'shardChoser' => $this->createPassthroughShardChoser()]
)
);
$shardManager = new PoolingShardManager($conn);
$shards = $shardManager->getShards();
self::assertEquals([['id' => 1], ['id' => 2]], $shards);
}
public function testQueryAll() : void
{
$sql = 'SELECT * FROM table';
$params = [1];
$types = [1];
$conn = $this->createConnectionMock();
$conn->expects($this->at(0))->method('getParams')->will($this->returnValue(
['shards' => [ ['id' => 1], ['id' => 2] ], 'shardChoser' => $this->createPassthroughShardChoser()]
));
$conn->expects($this->at(1))->method('getParams')->will($this->returnValue(
['shards' => [ ['id' => 1], ['id' => 2] ], 'shardChoser' => $this->createPassthroughShardChoser()]
));
$conn->expects($this->at(2))->method('connect')->with($this->equalTo(1));
$conn->expects($this->at(3))
->method('fetchAll')
->with($this->equalTo($sql), $this->equalTo($params), $this->equalTo($types))
->will($this->returnValue([ ['id' => 1] ]));
$conn->expects($this->at(4))->method('connect')->with($this->equalTo(2));
$conn->expects($this->at(5))
->method('fetchAll')
->with($this->equalTo($sql), $this->equalTo($params), $this->equalTo($types))
->will($this->returnValue([ ['id' => 2] ]));
$shardManager = new PoolingShardManager($conn);
$result = $shardManager->queryAll($sql, $params, $types);
self::assertEquals([['id' => 1], ['id' => 2]], $result);
}
public function testQueryAllWithStaticShardChoser() : void
{
$sql = 'SELECT * FROM table';
$params = [1];
$types = [1];
$conn = $this->createConnectionMock();
$conn->expects($this->at(0))->method('getParams')->will($this->returnValue(
['shards' => [ ['id' => 1], ['id' => 2] ], 'shardChoser' => $this->createStaticShardChooser()]
));
$conn->expects($this->at(1))->method('getParams')->will($this->returnValue(
['shards' => [ ['id' => 1], ['id' => 2] ], 'shardChoser' => $this->createStaticShardChooser()]
));
$conn->expects($this->at(2))->method('connect')->with($this->equalTo(1));
$conn->expects($this->at(3))
->method('fetchAll')
->with($this->equalTo($sql), $this->equalTo($params), $this->equalTo($types))
->will($this->returnValue([ ['id' => 1] ]));
$conn->expects($this->at(4))->method('connect')->with($this->equalTo(2));
$conn->expects($this->at(5))
->method('fetchAll')
->with($this->equalTo($sql), $this->equalTo($params), $this->equalTo($types))
->will($this->returnValue([ ['id' => 2] ]));
$shardManager = new PoolingShardManager($conn);
$result = $shardManager->queryAll($sql, $params, $types);
self::assertEquals([['id' => 1], ['id' => 2]], $result);
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\DriverManager;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Sharding\SQLAzure\SQLAzureShardManager;
use PHPUnit\Framework\TestCase;
use function strpos;
abstract class AbstractTestCase extends TestCase
{
/** @var Connection */
protected $conn;
/** @var SQLAzureShardManager */
protected $sm;
protected function setUp() : void
{
if (! isset($GLOBALS['db_type']) || strpos($GLOBALS['db_type'], 'sqlsrv') === false) {
$this->markTestSkipped('No driver or sqlserver driver specified.');
}
$params = [
'driver' => $GLOBALS['db_type'],
'dbname' => $GLOBALS['db_name'],
'user' => $GLOBALS['db_username'],
'password' => $GLOBALS['db_password'],
'host' => $GLOBALS['db_host'],
'sharding' => [
'federationName' => 'Orders_Federation',
'distributionKey' => 'CustID',
'distributionType' => 'integer',
'filteringEnabled' => false,
],
'driverOptions' => ['MultipleActiveResultSets' => false],
];
$this->conn = DriverManager::getConnection($params);
$serverEdition = $this->conn->fetchColumn("SELECT CONVERT(NVARCHAR(128), SERVERPROPERTY('Edition'))");
if (strpos($serverEdition, 'SQL Azure') !== 0) {
$this->markTestSkipped('SQL Azure only test.');
}
// assume database is created and schema is:
// Global products table
// Customers, Orders, OrderItems federation tables.
// See http://cloud.dzone.com/articles/using-sql-azure-federations
$this->sm = new SQLAzureShardManager($this->conn);
}
protected function createShopSchema() : Schema
{
$schema = new Schema();
$products = $schema->createTable('Products');
$products->addColumn('ProductID', 'integer');
$products->addColumn('SupplierID', 'integer');
$products->addColumn('ProductName', 'string');
$products->addColumn('Price', 'decimal', ['scale' => 2, 'precision' => 12]);
$products->setPrimaryKey(['ProductID']);
$products->addOption('azure.federated', true);
$customers = $schema->createTable('Customers');
$customers->addColumn('CustomerID', 'integer');
$customers->addColumn('CompanyName', 'string');
$customers->addColumn('FirstName', 'string');
$customers->addColumn('LastName', 'string');
$customers->setPrimaryKey(['CustomerID']);
$customers->addOption('azure.federated', true);
$customers->addOption('azure.federatedOnColumnName', 'CustomerID');
$orders = $schema->createTable('Orders');
$orders->addColumn('CustomerID', 'integer');
$orders->addColumn('OrderID', 'integer');
$orders->addColumn('OrderDate', 'datetime');
$orders->setPrimaryKey(['CustomerID', 'OrderID']);
$orders->addOption('azure.federated', true);
$orders->addOption('azure.federatedOnColumnName', 'CustomerID');
$orderItems = $schema->createTable('OrderItems');
$orderItems->addColumn('CustomerID', 'integer');
$orderItems->addColumn('OrderID', 'integer');
$orderItems->addColumn('ProductID', 'integer');
$orderItems->addColumn('Quantity', 'integer');
$orderItems->setPrimaryKey(['CustomerID', 'OrderID', 'ProductID']);
$orderItems->addOption('azure.federated', true);
$orderItems->addOption('azure.federatedOnColumnName', 'CustomerID');
return $schema;
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Sharding\SQLAzure\SQLAzureFederationsSynchronizer;
use function count;
class FunctionalTest extends AbstractTestCase
{
public function testSharding() : void
{
$schema = $this->createShopSchema();
$synchronizer = new SQLAzureFederationsSynchronizer($this->conn, $this->sm);
$synchronizer->dropAllSchema();
$synchronizer->createSchema($schema);
$this->sm->selectShard(0);
$this->conn->insert('Products', [
'ProductID' => 1,
'SupplierID' => 2,
'ProductName' => 'Test',
'Price' => 10.45,
]);
$this->conn->insert('Customers', [
'CustomerID' => 1,
'CompanyName' => 'Foo',
'FirstName' => 'Benjamin',
'LastName' => 'E.',
]);
$query = 'SELECT * FROM Products';
$data = $this->conn->fetchAll($query);
self::assertGreaterThan(0, count($data));
$query = 'SELECT * FROM Customers';
$data = $this->conn->fetchAll($query);
self::assertGreaterThan(0, count($data));
$data = $this->sm->queryAll('SELECT * FROM Customers');
self::assertGreaterThan(0, count($data));
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Platforms\SQLAzurePlatform;
use Doctrine\DBAL\Schema\Schema;
use Doctrine\DBAL\Sharding\SQLAzure\Schema\MultiTenantVisitor;
use PHPUnit\Framework\TestCase;
class MultiTenantVisitorTest extends TestCase
{
public function testMultiTenantPrimaryKey() : void
{
$platform = new SQLAzurePlatform();
$visitor = new MultiTenantVisitor();
$schema = new Schema();
$foo = $schema->createTable('foo');
$foo->addColumn('id', 'string');
$foo->setPrimaryKey(['id']);
$schema->visit($visitor);
self::assertEquals(['id', 'tenant_id'], $foo->getPrimaryKey()->getColumns());
self::assertTrue($foo->hasColumn('tenant_id'));
}
public function testMultiTenantNonPrimaryKey() : void
{
$platform = new SQLAzurePlatform();
$visitor = new MultiTenantVisitor();
$schema = new Schema();
$foo = $schema->createTable('foo');
$foo->addColumn('id', 'string');
$foo->addColumn('created', 'datetime');
$foo->setPrimaryKey(['id']);
$foo->addIndex(['created'], 'idx');
$foo->getPrimaryKey()->addFlag('nonclustered');
$foo->getIndex('idx')->addFlag('clustered');
$schema->visit($visitor);
self::assertEquals(['id'], $foo->getPrimaryKey()->getColumns());
self::assertTrue($foo->hasColumn('tenant_id'));
self::assertEquals(['created', 'tenant_id'], $foo->getIndex('idx')->getColumns());
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Sharding\SQLAzure\SQLAzureFederationsSynchronizer;
class SQLAzureFederationsSynchronizerTest extends AbstractTestCase
{
public function testCreateSchema() : void
{
$schema = $this->createShopSchema();
$synchronizer = new SQLAzureFederationsSynchronizer($this->conn, $this->sm);
$sql = $synchronizer->getCreateSchema($schema);
self::assertEquals([
"--Create Federation\nCREATE FEDERATION Orders_Federation (CustID INT RANGE)",
'USE FEDERATION Orders_Federation (CustID = 0) WITH RESET, FILTERING = OFF;',
'CREATE TABLE Products (ProductID INT NOT NULL, SupplierID INT NOT NULL, ProductName NVARCHAR(255) NOT NULL, Price NUMERIC(12, 2) NOT NULL, PRIMARY KEY (ProductID))',
'CREATE TABLE Customers (CustomerID INT NOT NULL, CompanyName NVARCHAR(255) NOT NULL, FirstName NVARCHAR(255) NOT NULL, LastName NVARCHAR(255) NOT NULL, PRIMARY KEY (CustomerID))',
'CREATE TABLE Orders (CustomerID INT NOT NULL, OrderID INT NOT NULL, OrderDate DATETIME2(6) NOT NULL, PRIMARY KEY (CustomerID, OrderID))',
'CREATE TABLE OrderItems (CustomerID INT NOT NULL, OrderID INT NOT NULL, ProductID INT NOT NULL, Quantity INT NOT NULL, PRIMARY KEY (CustomerID, OrderID, ProductID))',
], $sql);
}
public function testUpdateSchema() : void
{
$schema = $this->createShopSchema();
$synchronizer = new SQLAzureFederationsSynchronizer($this->conn, $this->sm);
$synchronizer->dropAllSchema();
$sql = $synchronizer->getUpdateSchema($schema);
self::assertEquals([], $sql);
}
public function testDropSchema() : void
{
$schema = $this->createShopSchema();
$synchronizer = new SQLAzureFederationsSynchronizer($this->conn, $this->sm);
$synchronizer->dropAllSchema();
$synchronizer->createSchema($schema);
$sql = $synchronizer->getDropSchema($schema);
self::assertCount(5, $sql);
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\SQLAzure;
use Doctrine\DBAL\Connection;
use Doctrine\DBAL\Sharding\ShardingException;
use Doctrine\DBAL\Sharding\SQLAzure\SQLAzureShardManager;
use PHPUnit\Framework\TestCase;
class SQLAzureShardManagerTest extends TestCase
{
public function testNoFederationName() : void
{
$this->expectException(ShardingException::class);
$this->expectExceptionMessage('SQLAzure requires a federation name to be set during sharding configuration.');
$conn = $this->createConnection(['sharding' => ['distributionKey' => 'abc', 'distributionType' => 'integer']]);
new SQLAzureShardManager($conn);
}
public function testNoDistributionKey() : void
{
$this->expectException(ShardingException::class);
$this->expectExceptionMessage('SQLAzure requires a distribution key to be set during sharding configuration.');
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionType' => 'integer']]);
new SQLAzureShardManager($conn);
}
public function testNoDistributionType() : void
{
$this->expectException(ShardingException::class);
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionKey' => 'foo']]);
new SQLAzureShardManager($conn);
}
public function testGetDefaultDistributionValue() : void
{
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionKey' => 'foo', 'distributionType' => 'integer']]);
$sm = new SQLAzureShardManager($conn);
self::assertNull($sm->getCurrentDistributionValue());
}
public function testSelectGlobalTransactionActive() : void
{
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionKey' => 'foo', 'distributionType' => 'integer']]);
$conn->expects($this->at(1))->method('isTransactionActive')->will($this->returnValue(true));
$this->expectException(ShardingException::class);
$this->expectExceptionMessage('Cannot switch shard during an active transaction.');
$sm = new SQLAzureShardManager($conn);
$sm->selectGlobal();
}
public function testSelectGlobal() : void
{
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionKey' => 'foo', 'distributionType' => 'integer']]);
$conn->expects($this->at(1))->method('isTransactionActive')->will($this->returnValue(false));
$conn->expects($this->at(2))->method('exec')->with($this->equalTo('USE FEDERATION ROOT WITH RESET'));
$sm = new SQLAzureShardManager($conn);
$sm->selectGlobal();
}
public function testSelectShard() : void
{
$conn = $this->createConnection(['sharding' => ['federationName' => 'abc', 'distributionKey' => 'foo', 'distributionType' => 'integer']]);
$conn->expects($this->at(1))->method('isTransactionActive')->will($this->returnValue(true));
$this->expectException(ShardingException::class);
$this->expectExceptionMessage('Cannot switch shard during an active transaction.');
$sm = new SQLAzureShardManager($conn);
$sm->selectShard(1234);
self::assertEquals(1234, $sm->getCurrentDistributionValue());
}
/**
* @param mixed[] $params
*/
private function createConnection(array $params) : Connection
{
$conn = $this->getMockBuilder(Connection::class)
->onlyMethods(['getParams', 'exec', 'isTransactionActive'])
->disableOriginalConstructor()
->getMock();
$conn->expects($this->at(0))->method('getParams')->will($this->returnValue($params));
return $conn;
}
}
<?php
namespace Doctrine\Tests\DBAL\Sharding\ShardChoser;
use Doctrine\DBAL\Sharding\PoolingShardConnection;
use Doctrine\DBAL\Sharding\ShardChoser\MultiTenantShardChoser;
use PHPUnit\Framework\TestCase;
class MultiTenantShardChoserTest extends TestCase
{
public function testPickShard() : void
{
$choser = new MultiTenantShardChoser();
$conn = $this->createConnectionMock();
self::assertEquals(1, $choser->pickShard(1, $conn));
self::assertEquals(2, $choser->pickShard(2, $conn));
}
private function createConnectionMock() : PoolingShardConnection
{
return $this->getMockBuilder(PoolingShardConnection::class)
->onlyMethods(['connect', 'getParams', 'fetchAll'])
->disableOriginalConstructor()
->getMock();
}
}
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment