diff --git a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala index 567a5b283a2..fd6b6e4b198 100644 --- a/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala +++ b/tests/spark-it/src/test/scala/org/apache/celeborn/tests/spark/EvictMemoryToTieredStorageTest.scala @@ -71,9 +71,9 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite val s3url = container.getS3URL val augmentedConfiguration = Map( - CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,HDD,S3", - CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,HDD,S3", - // CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3", + CelebornConf.ACTIVE_STORAGE_TYPES.key -> "MEMORY,S3", + CelebornConf.WORKER_STORAGE_CREATE_FILE_POLICY.key -> "MEMORY,S3", + CelebornConf.WORKER_STORAGE_EVICT_POLICY.key -> "MEMORY,S3", // note that in S3 (and Minio) you cannot upload parts smaller than 5MB, so we trigger eviction only when there // is enough data CelebornConf.WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE.key -> "5MB", @@ -199,6 +199,52 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite celebornSparkSession.stop() } + test("celeborn spark integration test - memory evict to s3 after partition split") { + assumeS3LibraryIsLoaded() + + val sparkConf = new SparkConf().setAppName("celeborn-demo").setMaster("local[2]") + val celebornSparkSession = SparkSession.builder() + .config(updateSparkConfWithStorageTypes(sparkConf, ShuffleMode.HASH, "MEMORY,S3")) + // Set split threshold equal to WORKER_MEMORY_FILE_STORAGE_MAX_FILE_SIZE (5MB) to trigger + // the following sequence that reproduces the production failure: + // 1. MemoryTierWriter accumulates ~5MB → eviction → DfsTierWriter (S3) created. + // getDiskFileInfo() is now non-null, enabling the regular split-threshold check. + // (Without prior eviction to disk, getDiskFileInfo() == null and no split fires.) + // 2. The S3 file grows past 5MB → SOFT_SPLIT response sent to the Spark client. + // 3. ChangePartitionManager calls allocateFromCandidates, which builds the new location + // with type=MEMORY (storageTypes.head for "MEMORY,S3") and availableTypes=MEMORY|S3. + // 4. The new MemoryTierWriter fills again → eviction triggered on the MEMORY-typed + // location → StorageManager.createDiskFile must handle type=MEMORY as a valid S3 + // target (using availableStorageTypes) rather than rejecting it. + .config(s"spark.${CelebornConf.SHUFFLE_PARTITION_SPLIT_THRESHOLD.key}", "5MB") + .getOrCreate() + + // 20MB covers all three phases: + // ~5MB to fill the first MemoryTierWriter and trigger eviction to S3, + // ~5MB of additional S3 writes to exceed the split threshold and fire SOFT_SPLIT, + // ~10MB remaining for the second MemoryTierWriter to fill and trigger the second eviction. + val sampleSeq: immutable.Seq[(String, Int)] = buildDataSet(20 * 1024 * 1024) + + repartition(celebornSparkSession, sequence = sampleSeq, partitions = 1) + + // After splits there are more than 2 committed locations (one per epoch), so we assert + // type and path for each rather than an exact count. + assert(seenPartitionLocationsOpenReader.size >= 2) + seenPartitionLocationsOpenReader.asScala.foreach(location => { + assert( + location.getStorageInfo.getType == Type.MEMORY || location.getStorageInfo.getType == Type.S3) + assert(location.getStorageInfo.getFilePath == "") + }) + assert(seenPartitionLocationsUpdateFileGroups.size >= 2) + seenPartitionLocationsUpdateFileGroups.asScala.foreach { location => + if (location.getStorageInfo.getType == Type.MEMORY) + assert(location.getStorageInfo.getFilePath == "") + else if (location.getStorageInfo.getType == Type.S3) + assert(location.getStorageInfo.getFilePath.startsWith("s3://")) + } + celebornSparkSession.stop() + } + test("celeborn spark integration test - push fails no way of evicting") { assumeS3LibraryIsLoaded() @@ -216,16 +262,15 @@ class EvictMemoryToTieredStorageTest extends AnyFunSuite celebornSparkSession.stop() } - private def buildBigDataSet = { + private def buildBigDataSet: immutable.Seq[(String, Int)] = buildDataSet(10 * 1024 * 1024) + + private def buildDataSet(sizeBytes: Int): immutable.Seq[(String, Int)] = { val big1KBString: String = StringUtils.repeat(' ', 1024) - val partitionSize = 10 * 1024 * 1024 - val numValues = partitionSize / big1KBString.length - // we need to write enough to trigger eviction from MEMORY to S3 - val sampleSeq: immutable.Seq[(String, Int)] = (1 to numValues) + val numValues = sizeBytes / big1KBString.length + (1 to numValues) .map(i => big1KBString + i) // all different keys .toList .map(v => (v.toUpperCase, Random.nextInt(12) + 1)) - sampleSeq } def interceptLocationsSeenByClient(): Unit = { diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala index 116f431b85b..6d375932b82 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StorageManager.scala @@ -1124,9 +1124,15 @@ final private[worker] class StorageManager(conf: CelebornConf, workerSource: Abs fileName: String, userIdentifier: UserIdentifier, partitionType: PartitionType, - partitionSplitEnabled: Boolean): (Flusher, DiskFileInfo, File) = { + partitionSplitEnabled: Boolean, + overrideStorageType: StorageInfo.Type = null): (Flusher, DiskFileInfo, File) = { val suggestedMountPoint = location.getStorageInfo.getMountPoint - val storageType = location.getStorageInfo.getType + + val storageType = + if (overrideStorageType != null) + overrideStorageType + else location.getStorageInfo.getType + var retryCount = 0 var exception: IOException = null val shuffleKey = Utils.makeShuffleKey(appId, shuffleId) diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala index 028e54bd7a1..7168a809dc7 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/storage/StoragePolicy.scala @@ -49,7 +49,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: true) } } - logError(s"Create evict file failed for ${partitionDataWriterContext.getPartitionLocation}") + logError(s"Create evict file failed for ${partitionDataWriterContext.getPartitionLocation} - no policy for ${celebornFile.storageType.name()}") null } @@ -94,6 +94,7 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } def tryCreateFileByType(storageInfoType: StorageInfo.Type): TierWriterBase = { + val overrideType = if (evict) storageInfoType else location.getStorageInfo.getType try { storageInfoType match { case StorageInfo.Type.MEMORY => @@ -118,11 +119,14 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: partitionDataWriterContext, storageManager) } else { + logWarning( + s"Not creating ${storageInfoType} file from ${location.getStorageInfo.getType} for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") null } case StorageInfo.Type.HDD | StorageInfo.Type.SSD | StorageInfo.Type.HDFS | StorageInfo.Type.OSS | StorageInfo.Type.S3 => if (storageManager.localOrDfsStorageAvailable) { - logDebug(s"create non-memory file for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") + logDebug( + s"create non-memory file type $storageInfoType (evict=$evict, override=$overrideType) for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") val (flusher, diskFileInfo, workingDir) = storageManager.createDiskFile( location, partitionDataWriterContext.getAppId, @@ -130,7 +134,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: location.getFileName, partitionDataWriterContext.getUserIdentifier, partitionDataWriterContext.getPartitionType, - partitionDataWriterContext.isPartitionSplitEnabled) + partitionDataWriterContext.isPartitionSplitEnabled, + overrideType // this is different from location type, in case of eviction + ) partitionDataWriterContext.setWorkingDir(workingDir) val metaHandler = getPartitionMetaHandler(diskFileInfo) if (flusher.isInstanceOf[LocalFlusher] @@ -167,7 +173,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } } catch { case e: Exception => - logError(s"create celeborn file for storage $storageInfoType failed", e) + logError( + s"create celeborn file for storage $storageInfoType failed for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}", + e) null } } @@ -193,7 +201,8 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } val maxSize = order.get.length - for (i <- tryCreateFileTypeIndex until maxSize) { + val firstIndex = tryCreateFileTypeIndex + for (i <- firstIndex until maxSize) { val storageStr = order.get(i) val storageInfoType = StorageInfo.fromStrToType(storageStr) val file = tryCreateFileByType(storageInfoType) @@ -203,9 +212,9 @@ class StoragePolicy(conf: CelebornConf, storageManager: StorageManager, source: } logError( - s"Could not create file for storage type ${location.getStorageInfo.getType}") + s"Could not create file for storage type ${location.getStorageInfo.getType}, tried ${order.get} firstIndex $firstIndex for ${partitionDataWriterContext.getShuffleKey} ${partitionDataWriterContext.getPartitionLocation.getFileName}") throw new CelebornIOException( - s"Create file failed for context ${partitionDataWriterContext.toString}") + s"Create file failed for context ${partitionDataWriterContext.toString}, tried ${order.get} firstIndex $firstIndex") } } diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala index 640a67113a5..bdf9ce7cc45 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase1.scala @@ -66,6 +66,7 @@ class StoragePolicyCase1 extends CelebornFunSuite { any(), any(), any(), + any(), any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile)) val memoryHintPartitionLocation = diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala index 7a7ab956daa..9dcec7e524b 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase2.scala @@ -66,6 +66,7 @@ class StoragePolicyCase2 extends CelebornFunSuite { any(), any(), any(), + any(), any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile)) val memoryHintPartitionLocation = diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala index 9097037001d..8f21a7f4cbc 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase3.scala @@ -66,6 +66,7 @@ class StoragePolicyCase3 extends CelebornFunSuite { any(), any(), any(), + any(), any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile)) val memoryHintPartitionLocation = diff --git a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala index e25837d2b92..dc321738e74 100644 --- a/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala +++ b/worker/src/test/scala/org/apache/celeborn/service/deploy/worker/storage/storagePolicy/StoragePolicyCase4.scala @@ -66,6 +66,7 @@ class StoragePolicyCase4 extends CelebornFunSuite { any(), any(), any(), + any(), any())).thenAnswer((mockedFlusher, mockedDiskFile, mockedFile)) val memoryHintPartitionLocation =