Chunked processing is crucial when dealing with datasets that are too large to fit in memory. This technique involves processing data in smaller, manageable pieces.
Detailed Implementation:
Example of Chunked Processor
class ChunkedProcessor {
    private $chunkSize;
    private $maxMemoryUsage;
    private $logHandler;

    // The logger is optional; a default LogHandler is created when none is supplied
    public function __construct($chunkSize = 1000, $maxMemoryUsage = '256M', ?LogHandler $logHandler = null) {
        $this->chunkSize = $chunkSize;
        $this->maxMemoryUsage = $maxMemoryUsage;
        $this->logHandler = $logHandler ?? new LogHandler();
    }

    public function processLargeDataset($filename, callable $processor) {
        try {
            // Validate and set memory limit (ini_set() returns false on failure)
            if (ini_set('memory_limit', $this->maxMemoryUsage) === false) {
                throw new Exception("Failed to set memory limit.");
            }
            if (!file_exists($filename)) {
                throw new Exception("File not found: $filename");
            }
            $handle = fopen($filename, 'r');
            if ($handle === false) {
                throw new Exception("Failed to open file: $filename");
            }
            $stats = [
                'processed_rows' => 0,
                'failed_rows' => 0,
                'start_time' => microtime(true),
                'memory_peak' => 0
            ];
            // Process file in chunks
            while (!feof($handle)) {
                try {
                    $chunk = [];
                    $count = 0;
                    // Build chunk
                    while ($count < $this->chunkSize && ($line = fgets($handle)) !== false) {
                        $decodedLine = json_decode($line, true);
                        if (json_last_error() === JSON_ERROR_NONE) {
                            $chunk[] = $decodedLine;
                        } else {
                            $this->logHandler->warning("Failed to decode line: $line");
                            $stats['failed_rows']++;
                        }
                        $count++;
                    }
                    // Skip empty chunks (e.g., when the file ends exactly on a chunk boundary)
                    if (empty($chunk)) {
                        continue;
                    }
                    // Process current chunk
                    $processor($chunk);
                    // Update statistics
                    $stats['processed_rows'] += count($chunk);
                    $stats['memory_peak'] = max($stats['memory_peak'], memory_get_peak_usage(true));
                    // Log progress
                    $this->logProgress($stats);
                    // Clean up
                    unset($chunk);
                    // Periodically force garbage collection
                    if ($stats['processed_rows'] % ($this->chunkSize * 10) === 0) {
                        gc_collect_cycles();
                    }
                } catch (Exception $e) {
                    $stats['failed_rows'] += isset($chunk) ? count($chunk) : 0;
                    $this->logHandler->error("Chunk processing failed: " . $e->getMessage());
                    continue;
                }
            }
            fclose($handle);
            return $this->generateReport($stats);
        } catch (Exception $e) {
            throw new Exception("Dataset processing failed: " . $e->getMessage());
        }
    }

    private function logProgress(array $stats) {
        $memoryUsage = memory_get_usage(true) / 1024 / 1024;
        $timeElapsed = microtime(true) - $stats['start_time'];
        // Guard against division by zero on very fast first chunks
        $rowsPerSecond = $timeElapsed > 0 ? $stats['processed_rows'] / $timeElapsed : 0;
        $this->logHandler->info(sprintf(
            "Processed %d rows | Memory: %.2f MB | Speed: %.2f rows/sec",
            $stats['processed_rows'],
            $memoryUsage,
            $rowsPerSecond
        ));
    }

    private function generateReport(array $stats) {
        return [
            'total_processed' => $stats['processed_rows'],
            'total_failed' => $stats['failed_rows'],
            'memory_peak_mb' => $stats['memory_peak'] / 1024 / 1024,
            'time_taken_sec' => microtime(true) - $stats['start_time']
        ];
    }
}
// A minimal LogHandler implementation used by the examples
class LogHandler {
    public function info($message) {
        echo "INFO: $message\n";
    }

    public function error($message) {
        echo "ERROR: $message\n";
    }

    public function warning($message) {
        echo "WARNING: $message\n";
    }
}
Example of Use:
$logHandler = new LogHandler();
$processor = new ChunkedProcessor(1000, '512M', $logHandler);
$result = $processor->processLargeDataset(dirname(__FILE__) . '/large_data.json', function($chunk) {
    // Custom processing logic for each chunk
    foreach ($chunk as $data) {
        // Simulate processing (e.g., database insert, API call, etc.)
        echo "Processing: " . json_encode($data) . "\n";
    }
});
echo "\n\n";
print_r($result);
Key Benefits:
Memory efficient: Only loads a small portion of data at a time
Fault tolerant: Errors in one chunk don't affect others
Progress tracking: Enables monitoring of long-running processes
Generators provide another memory-efficient way to iterate over large datasets: by yielding values one at a time, they avoid loading the entire dataset into memory.
Example of Dataset Generator
class DatasetGenerator {
    private $bufferSize;

    public function __construct($bufferSize = 8192) {
        $this->bufferSize = $bufferSize;
    }

    public function readLargeFile($filename) {
        $handle = fopen($filename, 'r');
        if ($handle === false) {
            throw new Exception("Failed to open file: $filename");
        }
        $remainder = '';
        while (!feof($handle)) {
            $buffer = $remainder . fread($handle, $this->bufferSize);
            $lines = explode("\n", $buffer);
            // The last element may be an incomplete line; carry it over to the next read
            $remainder = array_pop($lines);
            foreach ($lines as $line) {
                if (trim($line) !== '') {
                    yield json_decode($line, true);
                }
            }
        }
        // Yield whatever remains after the final read
        if (trim($remainder) !== '') {
            yield json_decode($remainder, true);
        }
        fclose($handle);
    }

    public function processBatchedData($filename, $batchSize = 100) {
        $batch = [];
        foreach ($this->readLargeFile($filename) as $item) {
            $batch[] = $item;
            if (count($batch) >= $batchSize) {
                yield $batch;
                $batch = [];
            }
        }
        // Yield the final, possibly smaller, batch
        if (!empty($batch)) {
            yield $batch;
        }
    }
}
Example of Use:
// Instantiate the DatasetGenerator with an optional buffer size
// You can set the buffer size here or use the default
$generator = new DatasetGenerator(4096);
// Specify the path to the large file
$filename = dirname(__FILE__) . '/large_data.json';
// Process the file in batches of 100 items
foreach ($generator->processBatchedData($filename, 100) as $batch) {
    // Perform processing on each batch (e.g., save to a database or further transform data)
    foreach ($batch as $item) {
        // Example of handling each item in the batch
        echo 'Processing item: ' . json_encode($item) . PHP_EOL;
        // Add your specific logic here, such as inserting data into a database or validating items
    }
}
2. Distributed Storage Systems
A. HDFS Integration
Hadoop Distributed File System (HDFS) is a core component of the Apache Hadoop ecosystem, designed to handle the storage and distribution of massive datasets across a network of commodity hardware. It is optimized for storing large files and is engineered to deliver high throughput access to application data, making it well-suited for batch processing and big data applications.
Key characteristics of HDFS include:
Scalability: HDFS can handle petabytes of data by spreading it across many machines, making it highly scalable. It supports clusters with thousands of nodes.
Fault Tolerance: HDFS replicates data blocks across multiple nodes to ensure data reliability and resiliency against hardware failures.
High Throughput: Optimized for high data throughput, HDFS enables quick data transfer for tasks that require streaming access patterns, where data is read in large sequential chunks.
Data Locality: HDFS is designed to move computations to where data is stored, reducing network congestion and improving processing speeds.
Typical use cases for HDFS include data warehousing, machine learning, log processing, and any application that requires storage of large-scale data. HDFS forms the foundation for other Hadoop ecosystem tools like MapReduce, YARN, and Apache Hive, enabling a complete solution for distributed data processing and analysis.
Example of HDFS Manager
// Assumes an external WebHDFSClient (a thin wrapper around the WebHDFS REST API)
// and an HDFSException class are defined elsewhere
class HDFSManager {
    private $webhdfsClient;
    private $configuration;

    public function __construct(array $config) {
        $this->configuration = $config;
        $this->webhdfsClient = new WebHDFSClient($config['namenode_host'],
            $config['namenode_port']);
    }

    public function writeFile($localPath, $hdfsPath, $replication = 3) {
        try {
            // Create file with specified replication
            $this->webhdfsClient->create($hdfsPath, [
                'overwrite' => 'true',
                'replication' => $replication,
                'blocksize' => $this->configuration['block_size']
            ]);
            // Stream file content
            $handle = fopen($localPath, 'r');
            if ($handle === false) {
                throw new Exception("Failed to open local file: $localPath");
            }
            while (!feof($handle)) {
                $chunk = fread($handle, 8192);
                $this->webhdfsClient->append($hdfsPath, $chunk);
            }
            fclose($handle);
            return [
                'status' => 'success',
                'path' => $hdfsPath,
                'size' => filesize($localPath)
            ];
        } catch (Exception $e) {
            throw new HDFSException("Failed to write file: " . $e->getMessage());
        }
    }

    public function readFile($hdfsPath, $processor) {
        try {
            $metadata = $this->webhdfsClient->getFileStatus($hdfsPath);
            $blockSize = $metadata['blockSize'];
            $fileSize = $metadata['length'];
            // Read file in blocks
            for ($offset = 0; $offset < $fileSize; $offset += $blockSize) {
                $data = $this->webhdfsClient->read($hdfsPath, $offset, $blockSize);
                $processor($data);
            }
        } catch (Exception $e) {
            throw new HDFSException("Failed to read file: " . $e->getMessage());
        }
    }
}
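Example of Use (a hypothetical sketch: the host, port, block size, and paths below are illustrative values, and it assumes concrete WebHDFSClient and HDFSException implementations are available):
// Illustrative configuration; 9870 is the default WebHDFS HTTP port in Hadoop 3.x
$hdfs = new HDFSManager([
    'namenode_host' => 'namenode.example.com',
    'namenode_port' => 9870,
    'block_size'    => 134217728 // 128 MB
]);
// Upload a local file with 3-way replication
$result = $hdfs->writeFile('/tmp/large_data.json', '/data/large_data.json', 3);
print_r($result);
// Stream the file back block by block and process each block as it arrives
$hdfs->readFile('/data/large_data.json', function($block) {
    echo "Received block of " . strlen($block) . " bytes\n";
});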
B. Object Storage Implementation
Object storage systems are designed to store and manage large amounts of unstructured data, such as images, videos, and backups, by storing data as discrete units (objects) instead of traditional file hierarchies. Implementing an object storage solution with features like multipart uploads and retry logic enhances the system’s reliability, efficiency, and fault tolerance. Here’s how these features work in practice:
Key Features of Object Storage Implementation
Multipart Uploads:
Multipart uploads allow a large file to be split into smaller parts that are uploaded independently; once every part has been uploaded, the storage service combines them into a single object.
This reduces the risk of failed uploads for large files: if a part fails, only that part needs to be re-uploaded rather than the entire file.
Multipart uploads are especially beneficial when transferring large files over unstable networks, where uploading in chunks ensures that a network interruption does not force the whole transfer to restart.
Object storage systems that support multipart uploads include Amazon S3, Azure Blob Storage, and Google Cloud Storage; a short S3-based sketch follows below.
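As a concrete illustration, the following sketch uses the AWS SDK for PHP (aws/aws-sdk-php) to perform a multipart upload to Amazon S3; the region, bucket, key, and file path are illustrative assumptions, and credentials are expected to come from the SDK's default provider chain:
require 'vendor/autoload.php';

use Aws\S3\S3Client;
use Aws\S3\MultipartUploader;
use Aws\Exception\MultipartUploadException;

// Illustrative region; credentials are resolved from the environment/default chain
$s3 = new S3Client([
    'version' => 'latest',
    'region'  => 'us-east-1'
]);

// The uploader splits the source file into parts and uploads them independently
$uploader = new MultipartUploader($s3, '/tmp/large_backup.tar.gz', [
    'bucket' => 'example-bucket',
    'key'    => 'backups/large_backup.tar.gz'
]);

try {
    $uploader->upload();
    echo "Multipart upload completed\n";
} catch (MultipartUploadException $e) {
    // Only the failed parts need to be retried; the exception carries the upload
    // state, so the transfer can be resumed rather than restarted from scratch
    echo "Upload failed: " . $e->getMessage() . "\n";
}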
Retry Logic:
Retry logic is a mechanism to automatically attempt the upload or download of an object again if an error or interruption occurs. This is essential for maintaining data integrity and ensuring that operations complete successfully even in cases of temporary issues, like network instability or server errors.
A typical implementation will have a set number of retries and a delay between attempts, which might increase gradually (exponential backoff) to avoid server overload.
Retry logic is crucial for operations that need to ensure data availability and consistency, and it can be configured to handle various error types, such as connection timeouts or service unavailability.
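To make the backoff idea concrete, here is a minimal, generic retry helper (a sketch: the attempt limit, delays, and the simulated flaky operation are illustrative assumptions rather than part of any particular SDK):
// Retry a callable with exponential backoff and random jitter
function retryWithBackoff(callable $operation, $maxAttempts = 5, $baseDelayMs = 200) {
    for ($attempt = 1; ; $attempt++) {
        try {
            return $operation();
        } catch (Exception $e) {
            if ($attempt >= $maxAttempts) {
                // Out of attempts: surface the last error to the caller
                throw $e;
            }
            // Exponential backoff (200 ms, 400 ms, 800 ms, ...) plus jitter so
            // many clients do not retry in lockstep against the same server
            $delayMs = $baseDelayMs * (2 ** ($attempt - 1)) + random_int(0, 100);
            usleep($delayMs * 1000);
        }
    }
}

// Demo: a deliberately flaky operation that fails twice before succeeding,
// standing in for an upload or download call against an object store
$attempts = 0;
$result = retryWithBackoff(function () use (&$attempts) {
    $attempts++;
    if ($attempts < 3) {
        throw new Exception("Temporary failure (attempt $attempts)");
    }
    return 'uploaded';
});
echo "Result after $attempts attempts: $result\n";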
Example of a robust object storage system integration with features like multipart uploads and retry logic.